Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
cyclonedx-npm --output-format json --output-file sbom.json || true

- name: Setup osv-scanner
uses: google/osv-scanner-action@v1
uses: google/osv-scanner-action@v1.9.0
with:
args: -r . -o vuln_report.txt

Expand Down
Empty file added modules/__init__.py
Empty file.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
18 changes: 16 additions & 2 deletions ratelimit.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@

import time
import functools
import inspect
from typing import Callable, Dict, Tuple
from fastapi import Request
from fastapi.responses import JSONResponse
Expand All @@ -13,6 +15,7 @@ def _key(request: Request) -> str:

def limiter(limit: int = 60, window_sec: int = 60):
def decorator(func: Callable):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
req = None
for a in args:
Expand All @@ -21,9 +24,15 @@ async def wrapper(*args, **kwargs):
break
if req is None:
req = kwargs.get("request", None)

# If no request found (e.g., in tests or direct function calls), execute without rate limiting
if req is None:
return await func(*args, **kwargs)
if inspect.iscoroutinefunction(func):
return await func(*args, **kwargs)
else:
return func(*args, **kwargs)

# Apply rate limiting: check window, count requests, and enforce limits
now = time.time()
k = _key(req)
window_start, count = _STORE.get(k, (now, 0))
Expand All @@ -37,6 +46,11 @@ async def wrapper(*args, **kwargs):
headers={"Retry-After": str(retry_after)},
)
_STORE[k] = (window_start, count + 1)
return await func(*args, **kwargs)

# Call the original function
if inspect.iscoroutinefunction(func):
return await func(*args, **kwargs)
else:
return func(*args, **kwargs)
return wrapper
return decorator
Loading