Skip to content

feat(security): add replay attack prevention and HTTPS enforcement#19

Merged
adriannoes merged 15 commits intomainfrom
feature/sprint-s3-replay-attack-prevention
Jan 28, 2026
Merged

feat(security): add replay attack prevention and HTTPS enforcement#19
adriannoes merged 15 commits intomainfrom
feature/sprint-s3-replay-attack-prevention

Conversation

@adriannoes
Copy link
Copy Markdown
Owner

@adriannoes adriannoes commented Jan 27, 2026

This PR implements timestamp validation and HTTPS enforcement to prevent replay attacks and ensure secure connections in production.

Features

1. Timestamp Validation

  • Added MAX_ENVELOPE_AGE_SECONDS (300s) and MAX_FUTURE_TOLERANCE_SECONDS (30s) constants
  • Implemented validate_envelope_timestamp() to reject envelopes outside acceptable time windows
  • Integrated validation in server handler with proper JSON-RPC error responses

2. Optional Nonce Support

  • Added NonceStore protocol and InMemoryNonceStore implementation
  • Implemented validate_envelope_nonce() for duplicate detection
  • Configurable via require_nonce parameter in create_app()
  • TTL-based expiration (10 minutes) with lazy cleanup

3. HTTPS Enforcement

  • Client-side HTTPS validation with localhost exception
  • require_https=True by default (configurable)
  • Clear error messages with override instructions
  • HTTP allowed for localhost/127.0.0.1/::1 with warning

Changes

New Files:

  • src/asap/transport/validators.py - Validation module
  • tests/transport/unit/test_validators.py - Unit tests (11 tests)

Modified Files:

  • src/asap/models/constants.py - Timestamp constants
  • src/asap/errors.py - InvalidTimestampError, InvalidNonceError
  • src/asap/models/envelope.py - Nonce documentation
  • src/asap/transport/server.py - Validation integration
  • src/asap/transport/client.py - HTTPS enforcement
  • tests/transport/test_client.py - HTTPS tests (6 tests)
  • tests/e2e/test_two_agents.py - Fix for HTTPS validation
  • docs/security.md - Replay prevention and HTTPS docs
  • README.md - HTTPS examples and notes

Testing

  • ✅ All 627 tests passing
  • ✅ 11 new unit tests for validators
  • ✅ 6 new tests for HTTPS validation
  • ✅ Coverage: 91.90%
  • ✅ mypy --strict: No errors

Documentation

  • Added comprehensive "Replay Attack Prevention" section to docs/security.md
  • Added "HTTPS Enforcement" section with configuration examples
  • Updated README with HTTPS best practices

Breaking Changes

None - All changes are backward compatible. HTTPS enforcement can be disabled with require_https=False for development.

Related

@adriannoes adriannoes force-pushed the feature/sprint-s3-replay-attack-prevention branch from 9efad43 to c0fbd10 Compare January 27, 2026 23:58
- Add test_validators_edge_cases.py: None timestamp, naive TZ, non-string nonce
- Add test_payloads_validation.py: McpToolResult validator edge cases
- Add test_cli_edge_cases.py: verbose output, JSON root validation
- Add test_run_demo.py: start_process, wait_for_ready, terminate_process

All 657 tests passing with 95.05% coverage.
@adriannoes adriannoes force-pushed the feature/sprint-s3-replay-attack-prevention branch from c0fbd10 to 57e6f2d Compare January 28, 2026 00:00
@adriannoes adriannoes self-assigned this Jan 28, 2026
@adriannoes adriannoes added the enhancement New feature or request label Jan 28, 2026
@adriannoes
Copy link
Copy Markdown
Owner Author

Code Review: PR #19 - feat(security): add replay attack prevention and HTTPS enforcement (Opus 4.5)

1. Executive Summary

  • Impact Analysis: Medium Risk. New security-critical validation code (timestamp and nonce) integrated into the request processing pipeline. HTTPS enforcement affects all client connections.
  • Architecture Check: Yes. Implementation follows SOLID principles, separates validation logic into dedicated module (validators.py), and uses Protocol pattern for extensibility.
  • Blockers: 0 critical issues found.

2. Critical Issues (Must Fix)

No critical security or logic bugs found. The implementation is solid, properly handles edge cases, and includes comprehensive test coverage.


3. Improvements & Refactoring (Strongly Recommended)

The following are quality improvements that would enhance the robustness and maintainability of the implementation.

3.1 [Concurrency] Potential Race Condition in InMemoryNonceStore - src/asap/transport/validators.py

  • Location: Lines 169-184 (is_used method)
  • Context: The is_used method performs cleanup and check within a lock, but there's a TOCTOU (time-of-check-time-of-use) race condition between is_used() and mark_used() calls in validate_envelope_nonce().
  • Problem: Two concurrent requests with the same nonce could both pass is_used() before either calls mark_used().
  • Risk: Low. Under typical load, this race is unlikely. However, for a high-security protocol, this should be addressed.
  • Suggestion: Combine check-and-mark into a single atomic operation:
# file: src/asap/transport/validators.py

class InMemoryNonceStore:
    # ... existing code ...
    
    def check_and_mark(self, nonce: str, ttl_seconds: int) -> bool:
        """Atomically check if nonce is used and mark it if not.
        
        Args:
            nonce: The nonce value to check and mark
            ttl_seconds: Time-to-live in seconds for the nonce
            
        Returns:
            True if nonce was already used, False if it was newly marked
        """
        with self._lock:
            self._cleanup_expired()
            if nonce in self._store and self._store[nonce] >= time.time():
                return True  # Already used
            self._store[nonce] = time.time() + ttl_seconds
            return False  # Newly marked

Then in validate_envelope_nonce():

-    if nonce_store.is_used(nonce):
-        raise InvalidNonceError(...)
-    nonce_store.mark_used(nonce, ttl_seconds=600)
+    if nonce_store.check_and_mark(nonce, ttl_seconds=600):
+        raise InvalidNonceError(...)

Note: This would require updating the NonceStore Protocol to include the atomic method.


3.2 [Resilience] Missing Empty Nonce String Validation - src/asap/transport/validators.py

  • Location: Lines 261-269
  • Context: The nonce validation checks for non-string types but doesn't reject empty strings.
  • Problem: An empty string "" would pass validation and be stored, but it offers no replay protection.
  • Suggestion:
# file: src/asap/transport/validators.py (Line 263-269)

    # Validate nonce is a string
-   if not isinstance(nonce, str):
+   if not isinstance(nonce, str) or not nonce:
        raise InvalidNonceError(
            nonce=str(nonce),
-           message=f"Nonce must be a string, got {type(nonce).__name__}",
+           message=f"Nonce must be a non-empty string, got {type(nonce).__name__ if not isinstance(nonce, str) else 'empty string'}",
            details={"envelope_id": envelope.id},
        )

3.3 [Observability] Nonce Exposed in Log Output - src/asap/transport/server.py

  • Location: Lines 885-889
  • Context: The nonce value is logged when validation fails.
  • Problem: Nonces, while not directly sensitive, could be useful to attackers for reconnaissance if exposed in logs.
  • Suggestion: Consider truncating or hashing the nonce in logs:
# file: src/asap/transport/server.py (Lines 885-889)

            except InvalidNonceError as e:
                logger.warning(
                    "asap.request.invalid_nonce",
                    envelope_id=envelope.id,
-                   nonce=e.nonce,
+                   nonce_prefix=e.nonce[:8] + "..." if len(e.nonce) > 8 else e.nonce,
                    error=e.message,
                )

3.4 [Data Integrity] Hardcoded TTL in validate_envelope_nonce - src/asap/transport/validators.py

  • Location: Line 280
  • Context: The nonce TTL is hardcoded to 600 seconds (10 minutes).
  • Problem: This should be configurable or derived from MAX_ENVELOPE_AGE_SECONDS to ensure consistency.
  • Suggestion:
# file: src/asap/transport/validators.py (Line 279-280)

+   # Use 2x envelope age as TTL to ensure nonces expire after the envelope would
+   # have been rejected anyway, providing some buffer
+   nonce_ttl = MAX_ENVELOPE_AGE_SECONDS * 2  # 10 minutes by default
-   nonce_store.mark_used(nonce, ttl_seconds=600)
+   nonce_store.mark_used(nonce, ttl_seconds=nonce_ttl)

3.5 [Defensive Coding] IPv6 Localhost Edge Case - src/asap/transport/client.py

  • Location: Lines 207-224 (_is_localhost method)
  • Context: IPv6 localhost detection handles ::1 but not full forms like [::1].
  • Problem: URLs like http://[::1]:8000 would have hostname [::1] (with brackets) not ::1.
  • Suggestion:
# file: src/asap/transport/client.py (Lines 222-224)

        hostname_lower = hostname.lower()
-       return hostname_lower in ("localhost", "127.0.0.1", "::1")
+       # Handle both ::1 and [::1] (bracket notation from URL parsing)
+       return hostname_lower in ("localhost", "127.0.0.1", "::1", "[::1]")

3.6 [Testing] Missing Integration Test for Validation Order - tests/transport/

  • Location: New test file recommended
  • Context: Timestamp validation runs before nonce validation in the server handler.
  • Problem: No test verifies the validation order or that both validations are consistently applied together.
  • Suggestion: Add an integration test that verifies:
    1. Invalid timestamp is rejected before nonce check
    2. Valid timestamp with invalid nonce is rejected
    3. Both validations return appropriate error codes

4. Nitpicks & Questions

Code Style

  • src/asap/transport/validators.py (Line 10): datetime, timezone import could be grouped with stdlib imports for PEP 8 compliance, but current grouping is acceptable.

  • src/asap/errors.py (Lines 229-234): The conditional dict merge pattern is creative but slightly hard to read:

    **(age_seconds is not None and {"age_seconds": age_seconds} or {})

    Consider explicit if/else for clarity (but this is a nitpick - current code works correctly).

Documentation

  • docs/security.md: Excellent comprehensive documentation. Minor suggestion: add a "Quick Reference" table at the top summarizing the validation constants and their values.

Type Annotations

  • src/asap/transport/client.py (Line 208): parsed_url: Any type annotation loses type safety. Consider using ParseResult from urllib.parse:
    from urllib.parse import ParseResult
    
    @staticmethod
    def _is_localhost(parsed_url: ParseResult) -> bool:

Test Coverage

  • tests/transport/unit/test_validators.py: Tests are well-structured. The test_expired_nonce_allowed_again test uses time.sleep(1.1) which is acceptable for unit tests but could cause flakiness. Consider using time mocking for more reliable tests.

@adriannoes
Copy link
Copy Markdown
Owner Author

5. Domain-Specific Analysis

5.1 Replay Attack Prevention

Timestamp Validation:

  • ✅ Correct implementation of 5-minute age window
  • ✅ Correct implementation of 30-second future tolerance
  • ✅ Timezone handling covers naive and non-UTC timestamps
  • ✅ Error messages include detailed context for debugging

Nonce Validation:

  • ✅ Optional by design (backward compatible)
  • ✅ Lazy cleanup prevents unbounded memory growth
  • ⚠️ Race condition between is_used() and mark_used() (see Section 3.1)
  • ⚠️ Hardcoded 10-minute TTL (see Section 3.4)

5.2 HTTPS Enforcement

Client-Side Validation:

  • ✅ Localhost exception for development is appropriate
  • ✅ Clear error messages with override instructions
  • ✅ Warning log for HTTP localhost usage
  • ⚠️ IPv6 localhost edge case (see Section 3.5)

5.3 Concurrency Safety

Component Thread-Safe Notes
InMemoryNonceStore ✅ Yes Uses RLock for all operations
validate_envelope_timestamp ✅ Yes Stateless function
validate_envelope_nonce ⚠️ Partial TOCTOU race (see Section 3.1)

6. Verification Results

Automated Tests

Check Command Result
Unit Tests uv run pytest tests/transport/unit/test_validators.py -v 11/11 passed
HTTPS Tests uv run pytest tests/transport/test_client.py -v -k HTTPS 6/6 passed
Full Suite uv run pytest 627 passed
Coverage pytest --cov 91.90%

Static Analysis

Check Command Result
Linting uv run ruff check src/asap/transport/ All checks passed
Type Check uv run mypy --strict src/asap/transport/ No issues found

Manual Verification

  • Timestamp validation flow: Traced through server.py lines 856-879, confirms early return on validation failure.
  • Nonce validation flow: Traced through server.py lines 881-904, confirms proper error handling.
  • HTTPS enforcement: Traced through client.py lines 174-197, confirms localhost exception logic.

- Add atomic check_and_mark() method to NonceStore protocol
- Replace separate is_used()/mark_used() calls with atomic operation
- Add validation for empty nonce strings
- Truncate nonce values in logs to prevent full exposure
- Derive nonce TTL from MAX_ENVELOPE_AGE_SECONDS constant
- Fix TOCTOU race condition in InMemoryNonceStore
- Replace conditional dict merge with explicit if/else in errors.py
- Replace Any type annotation with ParseResult in client.py
- Improve code readability and maintainability
- Add integration test for validation order (timestamp before nonce)
- Replace time.sleep with time mocking in expired nonce test
- Update test assertions to match new error messages
- Add test for empty nonce string validation
- Add validation constants summary table
- Add security features status table
- Improve documentation accessibility
@adriannoes adriannoes merged commit c699e70 into main Jan 28, 2026
4 checks passed
@adriannoes adriannoes deleted the feature/sprint-s3-replay-attack-prevention branch January 28, 2026 00:14
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant