|
| 1 | +""" |
| 2 | +tests/test_rate_limiting.py |
| 3 | +============================ |
| 4 | +Unit tests for the _RateLimiter class in dashboard_api.py |
| 5 | +IEC 62443-3-3 SR 7.1 — Denial-of-Service protection |
| 6 | +
|
| 7 | +Tests cover: |
| 8 | + - Normal requests under the limit are allowed |
| 9 | + - Requests that exceed the limit are blocked (is_allowed returns False) |
| 10 | + - Counter resets correctly after the sliding window expires |
| 11 | + - retry_after() returns a positive integer |
| 12 | + - RATE_LIMIT_READ_RPM env var overrides default limit |
| 13 | + - Different IPs have independent counters |
| 14 | +""" |
| 15 | + |
| 16 | +from __future__ import annotations |
| 17 | + |
| 18 | +import os |
| 19 | +import time |
| 20 | + |
| 21 | +import pytest |
| 22 | + |
| 23 | + |
| 24 | +# --------------------------------------------------------------------------- |
| 25 | +# Import the private _RateLimiter directly from dashboard_api |
| 26 | +# (tests the class, not the aiohttp middleware, to avoid needing a running server) |
| 27 | +# --------------------------------------------------------------------------- |
| 28 | + |
| 29 | + |
| 30 | +@pytest.fixture() |
| 31 | +def limiter(monkeypatch: pytest.MonkeyPatch): |
| 32 | + """Fresh _RateLimiter with a tiny limit of 5 req/min for fast tests.""" |
| 33 | + monkeypatch.setenv("RATE_LIMIT_READ_RPM", "5") |
| 34 | + # Re-import to get a fresh instance using the patched env var |
| 35 | + import importlib |
| 36 | + import src.interfaces.dashboard_api as mod |
| 37 | + importlib.reload(mod) |
| 38 | + return mod._RateLimiter() |
| 39 | + |
| 40 | + |
| 41 | +class TestRateLimiter: |
| 42 | + |
| 43 | + def test_allows_requests_under_limit(self, limiter) -> None: |
| 44 | + """First 5 requests for an IP must be allowed.""" |
| 45 | + for _ in range(5): |
| 46 | + assert limiter.is_allowed("10.0.0.1") is True |
| 47 | + |
| 48 | + def test_blocks_request_over_limit(self, limiter) -> None: |
| 49 | + """6th request within the same window must be blocked.""" |
| 50 | + for _ in range(5): |
| 51 | + limiter.is_allowed("10.0.0.2") |
| 52 | + assert limiter.is_allowed("10.0.0.2") is False |
| 53 | + |
| 54 | + def test_different_ips_are_independent(self, limiter) -> None: |
| 55 | + """Blocking one IP must not affect another IP's counter.""" |
| 56 | + for _ in range(5): |
| 57 | + limiter.is_allowed("10.0.0.3") |
| 58 | + # IP 3 is now at limit |
| 59 | + assert limiter.is_allowed("10.0.0.3") is False |
| 60 | + # IP 4 is untouched → still allowed |
| 61 | + assert limiter.is_allowed("10.0.0.4") is True |
| 62 | + |
| 63 | + def test_retry_after_positive(self, limiter) -> None: |
| 64 | + """retry_after() must return a positive integer when limit is exceeded.""" |
| 65 | + for _ in range(5): |
| 66 | + limiter.is_allowed("10.0.0.5") |
| 67 | + limiter.is_allowed("10.0.0.5") # trigger block |
| 68 | + retry = limiter.retry_after("10.0.0.5") |
| 69 | + assert isinstance(retry, int) |
| 70 | + assert retry >= 1 |
| 71 | + |
| 72 | + def test_retry_after_unknown_ip_returns_one(self, limiter) -> None: |
| 73 | + """retry_after() for an IP with no history must return 1.""" |
| 74 | + assert limiter.retry_after("10.99.99.99") == 1 |
| 75 | + |
| 76 | + def test_window_evicts_old_timestamps(self, monkeypatch: pytest.MonkeyPatch) -> None: |
| 77 | + """Timestamps from outside the sliding window must be evicted.""" |
| 78 | + import src.interfaces.dashboard_api as mod |
| 79 | + |
| 80 | + # Use a fresh limiter with limit=2 for this test |
| 81 | + monkeypatch.setenv("RATE_LIMIT_READ_RPM", "2") |
| 82 | + lim = mod._RateLimiter() |
| 83 | + ip = "10.0.0.10" |
| 84 | + |
| 85 | + # Simulate 2 old timestamps (61 seconds ago) |
| 86 | + old_ts = time.monotonic() - 61 |
| 87 | + lim._counters[ip] = __import__("collections").deque([old_ts, old_ts]) |
| 88 | + |
| 89 | + # Now a new request should be allowed (old entries evicted) |
| 90 | + assert lim.is_allowed(ip) is True |
| 91 | + # And a second new request too (only 1 in window) |
| 92 | + assert lim.is_allowed(ip) is True |
| 93 | + # Third one is over the limit of 2 |
| 94 | + assert lim.is_allowed(ip) is False |
| 95 | + |
| 96 | + def test_env_override_sets_custom_limit(self, monkeypatch: pytest.MonkeyPatch) -> None: |
| 97 | + """RATE_LIMIT_READ_RPM env var must set the limiter threshold.""" |
| 98 | + monkeypatch.setenv("RATE_LIMIT_READ_RPM", "3") |
| 99 | + import importlib |
| 100 | + import src.interfaces.dashboard_api as mod |
| 101 | + importlib.reload(mod) |
| 102 | + lim = mod._RateLimiter() |
| 103 | + assert lim._read_limit == 3 |
| 104 | + for _ in range(3): |
| 105 | + assert lim.is_allowed("10.0.0.7") is True |
| 106 | + assert lim.is_allowed("10.0.0.7") is False |
0 commit comments