Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions src/pyiem/webutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,11 +464,18 @@ def _mcall(
func: Callable,
environ: dict,
start_response: Callable,
ip_throttle_secs: float | Callable,
memcachekey: str | Callable | None,
expire: int | Callable,
content_type: str | Callable,
):
"""Call the function with memcachekey handling."""
if ip_is_throttled(environ, ip_throttle_secs):
start_response(
"429 Too Many Requests",
[("Content-type", "text/plain")],
)
return b"Too many requests from your IP address, slow down."
if memcachekey is None:
return func(environ, start_response)
key = memcachekey if isinstance(memcachekey, str) else memcachekey(environ)
Expand Down Expand Up @@ -593,14 +600,6 @@ def _handle_exp(errormsg, routine=False, code=500):
)
return msg.encode("ascii", errors="replace")

if ip_is_throttled(environ, kwargs.get("ip_throttle_secs", 0)):
start_response(
"429 Too Many Requests",
[("Content-type", "text/plain")],
)
yield b"Too many requests from your IP address, slow down."
return

start_time = datetime.now(timezone.utc)
status_code = 500
try:
Expand All @@ -615,6 +614,7 @@ def _handle_exp(errormsg, routine=False, code=500):
func,
environ,
start_response,
kwargs.get("ip_throttle_secs", 0),
kwargs.get("memcachekey"),
kwargs.get("memcacheexpire", 3600),
kwargs.get("content_type", "application/json"),
Expand Down
14 changes: 11 additions & 3 deletions tests/test_webutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,25 @@ def test_xss_false_positive_ampersand():
def test_ip_throttled_callable():
"""Test that the ip throttle is callable."""

@iemapp(allowed_as_list=["q"], ip_throttle_secs=lambda _x: 0)
class Schema(CGIModel):
"""Test."""

q: Annotated[int, Field(description="A")] = 0

@iemapp(
schema=Schema,
ip_throttle_secs=lambda x: 0 if x["q"] < 0 else 10,
)
def application(_environ, start_response):
"""Test."""
start_response("200 OK", [("Content-type", "text/plain")])
return f"{random.random()}"

eo = {"REMOTE_ADDR": "7.7.7.7"}
c = Client(application)
resp = c.get("/?q=1", environ_overrides=eo)
resp = c.get("/?q=-1", environ_overrides=eo)
assert resp.status_code == 200
resp = c.get("/?q=1", environ_overrides=eo)
resp = c.get("/?q=-1", environ_overrides=eo)
assert resp.status_code == 200


Expand Down
Loading