Skip to content

Add semaphore to limit concurrent API calls during Kubernetes observer startup#165

Open
tomerqodo wants to merge 2 commits into
qodo_action_req_1_base_add_semaphore_to_limit_concurrent_api_calls_during_kubernetes_observer_startup_pr11from
qodo_action_req_1_head_add_semaphore_to_limit_concurrent_api_calls_during_kubernetes_observer_startup_pr11
Open

Add semaphore to limit concurrent API calls during Kubernetes observer startup#165
tomerqodo wants to merge 2 commits into
qodo_action_req_1_base_add_semaphore_to_limit_concurrent_api_calls_during_kubernetes_observer_startup_pr11from
qodo_action_req_1_head_add_semaphore_to_limit_concurrent_api_calls_during_kubernetes_observer_startup_pr11

Conversation

@tomerqodo
Copy link
Copy Markdown

Benchmark PR from agentic-review-benchmarks#11

desertaxle and others added 2 commits January 25, 2026 12:10
…r startup

closes PrefectHQ#19937

When the observer starts, it receives events for all existing pods/jobs
and checks for duplicates via API calls. This can overwhelm the server
when there are many pods. This change adds a configurable semaphore
(default: 5 concurrent calls) to rate-limit these startup checks.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@qodo-code-review
Copy link
Copy Markdown

Code Review by Qodo

🐞 Bugs (1) 📘 Rule violations (0) 📎 Requirement gaps (0)

Grey Divider


Action required

1. Semaphore misses API call 🐞 Bug ⛯ Reliability
Description
• The new startup semaphore is intended to limit concurrent /events/filter calls, but it is only
  held while building the EventFilter, then released before the actual
  orchestration_client.request(...) runs.
• This defeats the throttling goal and can still overwhelm the Prefect API during startup scans with
  many existing pods.
• The current behavior also adds avoidable scheduling overhead (queueing for a semaphore that
  doesn’t protect the expensive operation).
Code

src/integrations/prefect-kubernetes/prefect_kubernetes/observer.py[R135-159]

+        # Use semaphore to limit concurrent API calls during startup to prevent
+        # overwhelming the API server when there are many existing pods/jobs
+        async with _startup_event_semaphore:
+            # Use the Kubernetes event timestamp for the filter to avoid "Query time range is too large" error
+            event_filter = EventFilter(
+                event=EventNameFilter(name=[f"prefect.kubernetes.pod.{phase.lower()}"]),
+                resource=EventResourceFilter(
+                    id=[f"prefect.kubernetes.pod.{uid}"],
+                ),
+                occurred=EventOccurredFilter(
+                    since=(
+                        k8s_created_time
+                        if k8s_created_time
+                        else (datetime.now(timezone.utc) - timedelta(hours=1))
+                    )
+                ),
+            )

        response = await orchestration_client.request(
            "POST",
            "/events/filter",
-            json=dict(filter=event_filter.model_dump(exclude_unset=True, mode="json")),
+            json=dict(
+                filter=event_filter.model_dump(exclude_unset=True, mode="json")
+            ),
        )
Evidence
The code comment states the semaphore is meant to limit concurrent startup API calls, but the
request() that performs the API call is outside the async with _startup_event_semaphore: block.
Therefore, the semaphore cannot limit concurrent API calls as intended.

src/integrations/prefect-kubernetes/prefect_kubernetes/observer.py[129-159]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
The startup semaphore is intended to limit concurrent `/events/filter` calls, but it currently wraps only `EventFilter` construction. The actual `orchestration_client.request(...)` happens after the semaphore is released, so concurrency is not throttled.

### Issue Context
This branch runs when `event_type is None` (startup/backfill-style events), which is precisely when many concurrent dedupe checks can occur.

### Fix Focus Areas
- src/integrations/prefect-kubernetes/prefect_kubernetes/observer.py[129-162]

### Suggested change
Move the `orchestration_client.request(...)` and the `if response.json()[&quot;events&quot;]: return` logic inside the `async with _startup_event_semaphore:` block so the semaphore actually limits concurrent requests.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools



ⓘ The new review experience is currently in Beta. Learn more

Qodo Logo

Comment on lines +135 to 159
# Use semaphore to limit concurrent API calls during startup to prevent
# overwhelming the API server when there are many existing pods/jobs
async with _startup_event_semaphore:
# Use the Kubernetes event timestamp for the filter to avoid "Query time range is too large" error
event_filter = EventFilter(
event=EventNameFilter(name=[f"prefect.kubernetes.pod.{phase.lower()}"]),
resource=EventResourceFilter(
id=[f"prefect.kubernetes.pod.{uid}"],
),
occurred=EventOccurredFilter(
since=(
k8s_created_time
if k8s_created_time
else (datetime.now(timezone.utc) - timedelta(hours=1))
)
),
)

response = await orchestration_client.request(
"POST",
"/events/filter",
json=dict(filter=event_filter.model_dump(exclude_unset=True, mode="json")),
json=dict(
filter=event_filter.model_dump(exclude_unset=True, mode="json")
),
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

1. Semaphore misses api call 🐞 Bug ⛯ Reliability

• The new startup semaphore is intended to limit concurrent /events/filter calls, but it is only
  held while building the EventFilter, then released before the actual
  orchestration_client.request(...) runs.
• This defeats the throttling goal and can still overwhelm the Prefect API during startup scans with
  many existing pods.
• The current behavior also adds avoidable scheduling overhead (queueing for a semaphore that
  doesn’t protect the expensive operation).
Agent prompt
### Issue description
The startup semaphore is intended to limit concurrent `/events/filter` calls, but it currently wraps only `EventFilter` construction. The actual `orchestration_client.request(...)` happens after the semaphore is released, so concurrency is not throttled.

### Issue Context
This branch runs when `event_type is None` (startup/backfill-style events), which is precisely when many concurrent dedupe checks can occur.

### Fix Focus Areas
- src/integrations/prefect-kubernetes/prefect_kubernetes/observer.py[129-162]

### Suggested change
Move the `orchestration_client.request(...)` and the `if response.json()["events"]: return` logic inside the `async with _startup_event_semaphore:` block so the semaphore actually limits concurrent requests.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants