Add semaphore to limit concurrent API calls during Kubernetes observer startup #11
Conversation
Closes PrefectHQ#19937

When the observer starts, it receives events for all existing pods/jobs and checks for duplicates via API calls. This can overwhelm the server when there are many pods. This change adds a configurable semaphore (default: 5 concurrent calls) to rate-limit these startup checks.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
📝 Walkthrough

A semaphore-based concurrency control mechanism is implemented to throttle API calls during Kubernetes observer startup.

Estimated code review effort: 🎯 2 (Simple) | ⏱️ ~10 minutes
🚥 Pre-merge checks: ✅ 1 passed | ❌ 2 failed (2 warnings)
@coderabbitai review

✅ Actions performed: Review triggered.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@src/integrations/prefect-kubernetes/prefect_kubernetes/observer.py`:
- Around line 135-159: The semaphore _startup_event_semaphore currently only
guards creation of the EventFilter, leaving the actual API call
orchestration_client.request("/events/filter", ...) unthrottled; move the await
orchestration_client.request(...) call (which sends the
event_filter.model_dump(...)) inside the same async with
_startup_event_semaphore block so the request is executed while the semaphore is
held, ensuring the startup API calls are properly rate-limited and still use the
event_filter constructed earlier.
In `@src/integrations/prefect-kubernetes/prefect_kubernetes/settings.py`:
- Around line 59-64: The Field definition for startup_event_concurrency
currently allows zero or negative values which can create asyncio.Semaphore(0)
and deadlock; update the Field on startup_event_concurrency to include a minimum
constraint by adding ge=1 (e.g., Field(..., ge=1, description=...)) so the
Pydantic model enforces a lower bound and prevents passing 0 or negative values
to the observer startup logic.
```diff
  # Use semaphore to limit concurrent API calls during startup to prevent
  # overwhelming the API server when there are many existing pods/jobs
  async with _startup_event_semaphore:
      # Use the Kubernetes event timestamp for the filter to avoid "Query time range is too large" error
      event_filter = EventFilter(
          event=EventNameFilter(name=[f"prefect.kubernetes.pod.{phase.lower()}"]),
          resource=EventResourceFilter(
              id=[f"prefect.kubernetes.pod.{uid}"],
          ),
          occurred=EventOccurredFilter(
              since=(
                  k8s_created_time
                  if k8s_created_time
                  else (datetime.now(timezone.utc) - timedelta(hours=1))
              )
          ),
      )

  response = await orchestration_client.request(
      "POST",
      "/events/filter",
-     json=dict(filter=event_filter.model_dump(exclude_unset=True, mode="json")),
+     json=dict(
+         filter=event_filter.model_dump(exclude_unset=True, mode="json")
+     ),
  )
```
Semaphore doesn’t cover the API call, so throttling is ineffective.
Right now only EventFilter construction is inside the semaphore; the /events/filter request at Line 153 runs unbounded. Move the request into the async with block.
🐛 Suggested fix

```diff
 async with _startup_event_semaphore:
     # Use the Kubernetes event timestamp for the filter to avoid "Query time range is too large" error
     event_filter = EventFilter(
         event=EventNameFilter(name=[f"prefect.kubernetes.pod.{phase.lower()}"]),
         resource=EventResourceFilter(
             id=[f"prefect.kubernetes.pod.{uid}"],
         ),
         occurred=EventOccurredFilter(
             since=(
                 k8s_created_time
                 if k8s_created_time
                 else (datetime.now(timezone.utc) - timedelta(hours=1))
             )
         ),
     )
-
-response = await orchestration_client.request(
-    "POST",
-    "/events/filter",
-    json=dict(
-        filter=event_filter.model_dump(exclude_unset=True, mode="json")
-    ),
-)
+    response = await orchestration_client.request(
+        "POST",
+        "/events/filter",
+        json=dict(
+            filter=event_filter.model_dump(exclude_unset=True, mode="json")
+        ),
+    )
```
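To see concretely why the placement matters, here is a small self-contained sketch (all names illustrative) that measures peak concurrency when the slow call is held inside the `async with` block versus awaited after the block exits:

```python
import asyncio

async def run(n_tasks: int, guard_request: bool) -> int:
    # Semaphore of 5, matching the setting's default in this sketch.
    sem = asyncio.Semaphore(5)
    active = 0
    peak = 0

    async def slow_request() -> None:
        # Stand-in for the /events/filter API call; tracks how many
        # copies of it are in flight at the same time.
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)
        active -= 1

    async def task() -> None:
        if guard_request:
            async with sem:
                await slow_request()  # request runs while semaphore is held
        else:
            async with sem:
                pass  # only cheap setup is guarded, as in the flagged code
            await slow_request()  # request runs unbounded

    await asyncio.gather(*(task() for _ in range(n_tasks)))
    return peak

peak_guarded = asyncio.run(run(50, guard_request=True))
peak_unguarded = asyncio.run(run(50, guard_request=False))
print(peak_guarded, peak_unguarded)
```

With the request inside the block, peak concurrency never exceeds 5; with only the filter construction guarded, all 50 requests can be in flight at once, which is exactly the failure mode the review describes.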
```python
startup_event_concurrency: int = Field(
    default=5,
    description="Maximum number of concurrent API calls when checking for "
    "duplicate events during observer startup. This helps prevent overloading "
    "the API server when there are many existing pods/jobs in the cluster.",
)
```
🧩 Analysis chain

🏁 Script executed:

```shell
# First, check if the file exists and read the relevant section
head -70 src/integrations/prefect-kubernetes/prefect_kubernetes/settings.py | tail -20
```

🏁 Script executed:

```shell
# Search for usage of startup_event_concurrency in the codebase
rg "startup_event_concurrency" -A 3 -B 3
```

🏁 Script executed:

```shell
# Search for asyncio.Semaphore usage to see if it's directly related to this field
rg "asyncio\.Semaphore|Semaphore" -A 2 -B 2
```
🌐 Web query:
Pydantic v2 Field constraints ge parameter minimum value int
💡 Result:

In Pydantic v2, `ge` on an int field means "greater than or equal to" the value you provide (i.e., it sets the inclusive minimum). You choose the minimum; there's no special built-in "minimum int" beyond Python's `int`. (docs.pydantic.dev)

Example (model validation):

```python
from pydantic import BaseModel, Field

class Model(BaseModel):
    x: int = Field(ge=0)  # x must be >= 0
```

`Model(x=-1)` will fail validation; `Model(x=0)` and `Model(x=5)` are valid. (docs.pydantic.dev)

JSON Schema output: `Field(ge=0)` becomes JSON Schema `"minimum": 0` (and `le=...` becomes `"maximum": ...`). (zenn.dev)

Alternative: constrained int type:

```python
from pydantic import BaseModel, conint

class Model(BaseModel):
    x: conint(ge=0)
```

`conint` explicitly documents `ge` as `int | None` for integer constraints. (docs.pydantic.dev)

If you have a union, apply `ge` only to the int branch:

```python
from typing import Annotated
from pydantic import BaseModel, Field

class Foo(BaseModel):
    bar: str | list | Annotated[int, Field(ge=2)]
```

Otherwise, Pydantic may try to apply `ge` to non-numeric values and raise a TypeError. (stackoverflow.com)
Citations:
- 1: https://docs.pydantic.dev/latest/concepts/fields/?utm_source=openai
- 2: https://docs.pydantic.dev/latest/concepts/fields/?utm_source=openai
- 3: https://zenn.dev/7shi/articles/20251231-pydantic_ge_le?utm_source=openai
- 4: https://docs.pydantic.dev/2.2/api/types/?utm_source=openai
- 5: https://stackoverflow.com/questions/79598174/how-do-conditionally-apply-field-constraints-based-on-value-type-in-pydantic-v2?utm_source=openai
Add validation to prevent startup_event_concurrency from causing deadlock.
Without a lower bound, setting this to 0 or negative will create asyncio.Semaphore(0), which blocks all acquisitions and causes observer initialization to hang. Add ge=1 to enforce a minimum value.
Suggested fix

```diff
 startup_event_concurrency: int = Field(
     default=5,
+    ge=1,
     description="Maximum number of concurrent API calls when checking for "
     "duplicate events during observer startup. This helps prevent overloading "
     "the API server when there are many existing pods/jobs in the cluster.",
 )
```
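The deadlock hazard can be demonstrated with the standard library alone. In this minimal sketch, a timeout stands in for "hangs forever": a semaphore created with zero permits never grants an acquire, so the only way out of the wait is the timeout we impose.

```python
import asyncio

async def try_acquire(initial: int, timeout: float = 0.05) -> bool:
    # A semaphore with zero permits never grants acquire(); without
    # the surrounding wait_for, this coroutine would block forever.
    sem = asyncio.Semaphore(initial)
    try:
        await asyncio.wait_for(sem.acquire(), timeout=timeout)
        return True
    except asyncio.TimeoutError:
        return False

acquired_with_one = asyncio.run(try_acquire(1))   # permit available
acquired_with_zero = asyncio.run(try_acquire(0))  # would hang without timeout
print(acquired_with_one, acquired_with_zero)  # → True False
```

This is why a `ge=1` bound on the setting is worthwhile: validation fails fast at configuration time instead of the observer silently hanging at startup.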
Benchmark PR from agentic-review-benchmarks#11
Summary by CodeRabbit
New Features
Tests