refactor: use FastAPI app.state for dependency injection #115
amito merged 1 commit into redhat-et:main
Conversation
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the settings.
📝 Walkthrough
Adds an async FastAPI lifespan that initializes shared services into app.state.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Startup as App Startup
    participant Init as init_app_state()
    participant AppState as app.state
    participant Request as HTTP Request
    participant Provider as Dependency Provider (get_*)
    participant Endpoint as Route Handler
    participant K8s as KubernetesClusterManager
    Startup->>Init: call init_app_state (offloaded via asyncio.to_thread)
    Init->>AppState: populate workflow, model_catalog, deployment_generator, yaml_validator, cluster_managers cache
    Request->>Provider: resolve dependency (Depends)
    Provider->>AppState: read service from request.app.state
    Provider->>Endpoint: inject dependency
    Endpoint->>K8s: await get_cluster_manager_or_raise(request, namespace)
    K8s->>AppState: cached per-namespace manager (or created under lock via run_in_threadpool)
    Endpoint->>Endpoint: handle request using injected services
```
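The startup flow in the diagram can be sketched with a stdlib-only stand-in for the FastAPI app: the `lifespan` shape below matches FastAPI's async context-manager lifespan, but `SimpleNamespace` replaces the real app object and the placeholder values are illustrative, so nothing here depends on FastAPI being installed.

```python
import asyncio
from contextlib import asynccontextmanager
from types import SimpleNamespace


def init_app_state(app) -> None:
    # Stand-in for the real init: the PR populates workflow, model_catalog,
    # deployment_generator, yaml_validator, and the cluster_managers cache.
    app.state.cluster_managers = {}
    app.state.workflow = object()  # placeholder for RecommendationWorkflow()


@asynccontextmanager
async def lifespan(app):
    # Offload blocking construction so startup does not stall the event loop.
    await asyncio.to_thread(init_app_state, app)
    yield
    # Shutdown cleanup would run here.


async def main() -> bool:
    app = SimpleNamespace(state=SimpleNamespace())
    async with lifespan(app):
        return hasattr(app.state, "workflow") and app.state.cluster_managers == {}


print(asyncio.run(main()))  # True
```

In real FastAPI code the same `lifespan` function is passed as `FastAPI(lifespan=lifespan)`; the sketch only demonstrates the init-then-yield shape.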
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 3 passed
🧹 Nitpick comments (1)
src/neuralnav/api/dependencies.py (1)
48-50: Empty cleanup function may need implementation.
48-50: `close_app_state` is currently a no-op. If any of the initialized resources (e.g., `RecommendationWorkflow` or the `KubernetesClusterManager` instances in `cluster_managers`) hold connections or require explicit cleanup, this should be implemented to prevent resource leaks on shutdown.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/neuralnav/api/dependencies.py` around lines 48 - 50, close_app_state is currently a no-op; implement shutdown logic to avoid resource leaks by iterating known state entries (e.g., app.state.cluster_managers and app.state.recommendation_workflow), detecting and calling their cleanup methods (common names: close, shutdown, stop, terminate) and awaiting them if they are coroutines; swallow/log exceptions per-item so one failure doesn't block others, and finally clear or delete those app.state attributes to release references. Ensure you reference the symbols close_app_state, RecommendationWorkflow, KubernetesClusterManager, and cluster_managers when locating and updating the function.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 8e1dec2a-00ff-4d77-8ccb-0f48414a523c
📒 Files selected for processing (6)
- src/neuralnav/api/app.py
- src/neuralnav/api/dependencies.py
- src/neuralnav/api/routes/configuration.py
- src/neuralnav/api/routes/intent.py
- src/neuralnav/api/routes/recommendation.py
- src/neuralnav/api/routes/reference_data.py
anfredette left a comment:
I left a couple of comments, but looks good to me otherwise.
src/neuralnav/api/dependencies.py (outdated)

```python
def close_app_state(app: FastAPI) -> None:
    """Close resources and clear state."""
```
This function is a no-op. Should it be doing anything?
Suggested change:

```python
managers = getattr(app.state, "cluster_managers", {})
managers.clear()
```
We can either remove it completely and re-introduce it if we have resources which need cleanup, or use your suggestion. We can still do without cleanup at this point (the cleanup now happens during process shutdown).
WDYT?
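If cleanup is ever needed, a minimal sketch of a non-trivial `close_app_state` along the lines discussed above could look like the following. The `DummyManager` class, the method-name probe list, and the use of `SimpleNamespace` as a stand-in for the FastAPI app are all illustrative, not the project's real API.

```python
import asyncio
import inspect
import logging
from types import SimpleNamespace

logger = logging.getLogger(__name__)


async def close_app_state(app) -> None:
    """Best-effort cleanup: call the first close-like method found on each manager."""
    managers = getattr(app.state, "cluster_managers", {})
    for name, obj in list(managers.items()):
        for method_name in ("close", "shutdown", "stop", "terminate"):
            method = getattr(obj, method_name, None)
            if method is None:
                continue
            try:
                result = method()
                if inspect.isawaitable(result):
                    await result
            except Exception:
                # One failing manager should not block cleanup of the others.
                logger.exception("cleanup failed for %s", name)
            break
    managers.clear()


class DummyManager:
    """Illustrative manager that records whether close() was called."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


app = SimpleNamespace(state=SimpleNamespace())
app.state.cluster_managers = {"default": DummyManager()}
manager = app.state.cluster_managers["default"]
asyncio.run(close_app_state(app))
print(manager.closed, app.state.cluster_managers)  # True {}
```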
```diff
-def get_deployment_generator() -> DeploymentGenerator:
+def get_deployment_generator(request: Request) -> DeploymentGenerator:
```
Nit: get_deployment_mode and set_deployment_mode take Request but are called directly (e.g. configuration.py:62) rather than injected via Depends(). Was this intentional?
This is a very valid point, and not nit-picky at all. Both get_deployment_mode and set_deployment_mode are defined alongside the other dependencies in dependencies.py, but don't follow the same pattern.
I think it makes more sense to inline their logic into the route handlers (get_mode() and set_mode() in configuration.py).
Force-pushed 89eab63 to a3813f6
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/neuralnav/api/dependencies.py`:
- Around line 32-109: Run the code formatter to fix ruff-formatting errors in
src/neuralnav/api/dependencies.py (the CI failure). In practice, run `ruff
format src/neuralnav/api/dependencies.py` (or your repo's pre-commit/format
command) and commit the resulting changes; ensure formatting issues around the
module-level blocks and function definitions like init_app_state,
close_app_state, and get_cluster_manager_or_raise (and constants like
_MAX_CACHED_NAMESPACES) are resolved so `ruff format --check` passes.
- Around line 82-108: get_cluster_manager_or_raise is synchronous and uses
threading.Lock while calling the blocking
KubernetesClusterManager(namespace=...) constructor (which runs subprocess.run),
causing event-loop stalls; convert get_cluster_manager_or_raise to an async
function, replace the module-level threading.Lock (_cluster_manager_lock) with
an asyncio.Lock stored on app.state (or a module-level asyncio.Lock initialized
at import), perform the blocking constructor call inside a threadpool (e.g., via
asyncio.to_thread or loop.run_in_executor) and keep the critical-section cache
logic under the async lock, and then update all call sites in configuration.py
(deploy_to_cluster, get_k8s_deployment_status, delete_deployment,
list_all_deployments) to await get_cluster_manager_or_raise. Ensure exceptions
are preserved (raise HTTPException on KubernetesDeploymentError) and the
namespace cache size check (_MAX_CACHED_NAMESPACES) remains enforced.
🚧 Files skipped from review as they are similar to previous changes (3)
- src/neuralnav/api/routes/intent.py
- src/neuralnav/api/app.py
- src/neuralnav/api/routes/reference_data.py
src/neuralnav/api/dependencies.py (outdated)
```diff
 _cluster_manager_lock = threading.Lock()
 _MAX_CACHED_NAMESPACES = 32


-def get_cluster_manager_or_raise(namespace: str = "default") -> KubernetesClusterManager:
+def get_cluster_manager_or_raise(
+    request: Request, namespace: str = "default"
+) -> KubernetesClusterManager:
     """Get or create a cluster manager, raising an exception if not accessible."""
-    manager = get_cluster_manager(namespace)
-    if manager is None:
-        try:
-            return KubernetesClusterManager(namespace=namespace)
-        except KubernetesDeploymentError as e:
-            from fastapi import HTTPException, status
-
-            raise HTTPException(
-                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
-                detail=f"Kubernetes cluster not accessible: {str(e)}",
-            ) from e
-    return manager
+    managers: dict[str, KubernetesClusterManager] = request.app.state.cluster_managers
+    if namespace not in managers:
+        with _cluster_manager_lock:
+            if namespace not in managers:
+                if len(managers) >= _MAX_CACHED_NAMESPACES:
+                    raise HTTPException(
+                        status_code=status.HTTP_400_BAD_REQUEST,
+                        detail=f"Too many namespaces (limit {_MAX_CACHED_NAMESPACES})",
+                    )
+                try:
+                    managers[namespace] = KubernetesClusterManager(namespace=namespace)
+                    logger.info(
+                        "Kubernetes cluster manager initialized for namespace=%s", namespace
+                    )
+                except KubernetesDeploymentError as e:
+                    raise HTTPException(
+                        status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
+                        detail=f"Kubernetes cluster not accessible: {e}",
+                    ) from e
```
🧩 Analysis chain
🏁 Script executed:

```bash
#!/bin/bash
set -euo pipefail
python - <<'PY'
import ast
from pathlib import Path

# 1) Confirm get_cluster_manager_or_raise is sync and uses lock
dep = next(Path(".").rglob("dependencies.py"))
tree = ast.parse(dep.read_text())
for node in tree.body:
    if isinstance(node, ast.FunctionDef) and node.name == "get_cluster_manager_or_raise":
        print(f"{dep}:{node.lineno} -> sync function")
        uses_lock = any(
            isinstance(n, ast.With) and any(
                isinstance(item.context_expr, ast.Name)
                and item.context_expr.id == "_cluster_manager_lock"
                for item in n.items
            )
            for n in ast.walk(node)
        )
        print(f"uses _cluster_manager_lock via with: {uses_lock}")

# 2) Find async functions calling get_cluster_manager_or_raise
for p in Path(".").rglob("*.py"):
    try:
        t = ast.parse(p.read_text())
    except Exception:
        continue
    for fn in [n for n in ast.walk(t) if isinstance(n, ast.AsyncFunctionDef)]:
        for call in [n for n in ast.walk(fn) if isinstance(n, ast.Call)]:
            if isinstance(call.func, ast.Name) and call.func.id == "get_cluster_manager_or_raise":
                print(f"async caller: {p}:{call.lineno} in async def {fn.name}")
PY
```

Repository: redhat-et/neuralnav
Length of output: 533
🏁 Script executed:

```bash
#!/bin/bash
set -euo pipefail
# Check how get_cluster_manager_or_raise is called in async handlers
echo "=== Call patterns in async handlers ==="
rg -A 3 -B 3 "get_cluster_manager_or_raise" src/neuralnav/api/routes/configuration.py
echo ""
echo "=== Checking KubernetesClusterManager init signature ==="
find . -name "*.py" -type f | xargs grep -l "class KubernetesClusterManager"
```

Repository: redhat-et/neuralnav
Length of output: 1287
🏁 Script executed:

```bash
#!/bin/bash
set -euo pipefail
echo "=== KubernetesClusterManager.__init__ implementation ==="
sed -n '/class KubernetesClusterManager/,/^class /p' src/neuralnav/cluster/manager.py | head -50
echo ""
echo "=== Check if __init__ does blocking I/O (Kubernetes API calls) ==="
rg -A 20 "def __init__" src/neuralnav/cluster/manager.py | head -40
```

Repository: redhat-et/neuralnav
Length of output: 2926
Blocking subprocess calls in async request handlers degrade concurrency.
get_cluster_manager_or_raise is a sync function called directly from 4 async route handlers (deploy_to_cluster, get_k8s_deployment_status, delete_deployment, list_all_deployments) without awaiting. Its KubernetesClusterManager constructor calls subprocess.run(["kubectl", "cluster-info"], timeout=10), which blocks the event loop. Combined with threading.Lock, cache misses can cause up to 10-second stalls across concurrent requests.
Make the function async using `asyncio.Lock` and offload the blocking constructor to a threadpool.
Refactoring steps
- Change the function to async and use `asyncio.Lock`:

```diff
-import threading
+import asyncio
+from starlette.concurrency import run_in_threadpool

-_cluster_manager_lock = threading.Lock()
+_cluster_manager_lock = asyncio.Lock()

-def get_cluster_manager_or_raise(
+async def get_cluster_manager_or_raise(
     request: Request, namespace: str = "default"
 ) -> KubernetesClusterManager:

-    with _cluster_manager_lock:
+    async with _cluster_manager_lock:

-            managers[namespace] = KubernetesClusterManager(namespace=namespace)
+            managers[namespace] = await run_in_threadpool(
+                KubernetesClusterManager, namespace=namespace
+            )
```

- Update all 4 call sites in configuration.py to await:
  - Line 238: `manager = await get_cluster_manager_or_raise(...)`
  - Line 336: `manager = await get_cluster_manager_or_raise(...)`
  - Line 416: `manager = await get_cluster_manager_or_raise(...)`
  - Line 453: `manager = await get_cluster_manager_or_raise(...)`
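The double-checked cache pattern described in these steps can be exercised with a stdlib-only sketch. Here `asyncio.to_thread` plays the role of Starlette's `run_in_threadpool`, and `make_manager` is a hypothetical function standing in for the blocking `KubernetesClusterManager` constructor.

```python
import asyncio

_MAX_CACHED_NAMESPACES = 32


def make_manager(namespace: str) -> str:
    # Stand-in for the blocking KubernetesClusterManager(namespace=...) constructor.
    return f"manager:{namespace}"


async def get_or_create(cache: dict, lock: asyncio.Lock, namespace: str) -> str:
    if namespace not in cache:              # fast path, no lock needed
        async with lock:
            if namespace not in cache:      # re-check under the lock
                if len(cache) >= _MAX_CACHED_NAMESPACES:
                    raise RuntimeError(f"Too many namespaces (limit {_MAX_CACHED_NAMESPACES})")
                # Run the blocking constructor off the event loop.
                cache[namespace] = await asyncio.to_thread(make_manager, namespace)
    return cache[namespace]


async def main() -> None:
    cache: dict[str, str] = {}
    lock = asyncio.Lock()
    # Five concurrent requests for the same namespace create exactly one manager.
    results = await asyncio.gather(*(get_or_create(cache, lock, "default") for _ in range(5)))
    print(results.count("manager:default"), len(cache))  # 5 1


asyncio.run(main())
```

The second `namespace not in cache` check is what makes the pattern safe: waiters that queued on the lock while the first caller was constructing the manager find the cached entry and return without constructing a duplicate.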
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
import asyncio

from starlette.concurrency import run_in_threadpool

_cluster_manager_lock = asyncio.Lock()
_MAX_CACHED_NAMESPACES = 32


async def get_cluster_manager_or_raise(
    request: Request, namespace: str = "default"
) -> KubernetesClusterManager:
    """Get or create a cluster manager, raising an exception if not accessible."""
    managers: dict[str, KubernetesClusterManager] = request.app.state.cluster_managers
    if namespace not in managers:
        async with _cluster_manager_lock:
            if namespace not in managers:
                if len(managers) >= _MAX_CACHED_NAMESPACES:
                    raise HTTPException(
                        status_code=status.HTTP_400_BAD_REQUEST,
                        detail=f"Too many namespaces (limit {_MAX_CACHED_NAMESPACES})",
                    )
                try:
                    managers[namespace] = await run_in_threadpool(
                        KubernetesClusterManager, namespace=namespace
                    )
                    logger.info(
                        "Kubernetes cluster manager initialized for namespace=%s", namespace
                    )
                except KubernetesDeploymentError as e:
                    raise HTTPException(
                        status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                        detail=f"Kubernetes cluster not accessible: {e}",
                    ) from e
```
Just to close the loop, after you make those two changes, I'll be good to go with this one, so I'll approve it, and you can merge it when you're done.
It looks like you also have some lint and formatting issues. At least one of the lint issues looked like a formatting issue, so just running the formatter may take care of both.
Force-pushed a3813f6 to b555548
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/neuralnav/api/routes/configuration.py (1)
296-317: ⚠️ Potential issue | 🟠 Major — Blocking subprocess call bypasses the refactored async pattern.
`get_cluster_status` directly instantiates `KubernetesClusterManager(namespace="default")` at line 305, which:
- calls `subprocess.run(["kubectl", "cluster-info"], timeout=10)` synchronously, blocking the event loop
- bypasses the `app.state.cluster_managers` cache used by all other cluster endpoints
- creates a new instance per request instead of reusing cached managers

This is inconsistent with the refactored async pattern and can cause up to 10-second stalls under concurrent requests.
Proposed fix: use the async cluster manager helper.

```diff
 @router.get("/cluster-status")
-async def get_cluster_status():
+async def get_cluster_status(http_request: Request, namespace: str = "default"):
     """
     Get Kubernetes cluster status.

     Returns:
         Cluster accessibility and basic info
     """
     try:
-        temp_manager = KubernetesClusterManager(namespace="default")
-        deployments = temp_manager.list_inferenceservices()
+        manager = await get_cluster_manager_or_raise(http_request, namespace)
+        deployments = manager.list_inferenceservices()
         return {
             "accessible": True,
-            "namespace": temp_manager.namespace,
+            "namespace": manager.namespace,
             "inference_services": deployments,
             "count": len(deployments),
             "message": "Cluster accessible",
         }
     except Exception as e:
         logger.error(f"Failed to query cluster status: {e}")
         return {"accessible": False, "error": str(e)}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/neuralnav/api/routes/configuration.py` around lines 296 - 317, get_cluster_status currently constructs KubernetesClusterManager() directly (causing a blocking subprocess call and bypassing the shared cache); change the endpoint to accept a Request (e.g., async def get_cluster_status(request: Request)) and obtain the manager from the app-level cache instead of instantiating KubernetesClusterManager — e.g., lookup request.app.state.cluster_managers.get("default") and if the stored value is a coroutine/factory await or call the async helper that initializes managers, otherwise use the cached instance; ensure you import fastapi.Request, handle a missing manager by returning accessible: False with an error, and remove the direct KubernetesClusterManager(...) construction so no synchronous subprocess.run is executed in the request handler.
🚧 Files skipped from review as they are similar to previous changes (3)
- src/neuralnav/api/routes/intent.py
- src/neuralnav/api/routes/reference_data.py
- src/neuralnav/api/routes/recommendation.py
Force-pushed b555548 to 34450bf
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/neuralnav/api/dependencies.py`:
- Around line 44-45: The module-level _cluster_manager_lock must be moved onto
app.state to avoid asyncio.Lock being bound to the wrong event loop; remove the
module-scope _cluster_manager_lock and instead initialize and store an
asyncio.Lock instance on app.state (e.g., app.state._cluster_manager_lock)
alongside app.state.cluster_managers, then update any code that
acquires/releases _cluster_manager_lock to reference
app.state._cluster_manager_lock (and create it lazily on startup if needed) so
the lock is owned by the same loop as the FastAPI app state.
🚧 Files skipped from review as they are similar to previous changes (4)
- src/neuralnav/api/routes/intent.py
- src/neuralnav/api/app.py
- src/neuralnav/api/routes/recommendation.py
- src/neuralnav/api/routes/configuration.py
```python
app.state.cluster_managers = {}  # dict[str, KubernetesClusterManager]
app.state.workflow = RecommendationWorkflow()
```
🛠️ Refactor suggestion | 🟠 Major
🧩 Analysis chain
🌐 Web query:
In Python 3.11+ / 3.12, can a module-level asyncio.Lock() be safely reused across multiple event loops, or can it become bound to the first loop that acquires it and later fail in a different loop?
💡 Result:
No, a module-level asyncio.Lock cannot be safely reused across multiple event loops in Python 3.11+ or 3.12. It becomes bound to the first event loop that uses it (specifically, the first time _get_loop is called during an operation like acquire) and will raise a RuntimeError if attempted to be used with a different loop. This behavior comes from asyncio.Lock inheriting from mixins._LoopBoundMixin, which implements _get_loop to lazily bind the instance to the currently running loop (asyncio.get_running_loop) under a global lock for thread-safety. Once bound (self._loop is set), any subsequent use checks that the current running loop matches the bound loop; otherwise, it raises RuntimeError(f'{self!r} is bound to a different event loop'). Official docs note the removal of the explicit loop parameter in 3.10 (sources 1,4,6), but the binding still occurs dynamically via this mixin (sources 2,14,18,25). Stack Overflow discussions confirm that pre-3.10 locks bound at creation, but post-3.10 they bind at first use—still preventing reuse across different loops (sources 3,7). Creating the lock at module level (outside any loop) means it starts unbound, binds to the first loop that acquires it, and fails in subsequent different loops (e.g., multiple asyncio.run calls). To use across multiple loops, create a new Lock instance per loop, ideally inside the async context.
Citations:
- 1: https://docs.python.org/3.11/library/asyncio-sync.html
- 2: https://github.com/python/cpython/blob/3.11/Lib/asyncio/locks.py
- 3: https://stackoverflow.com/questions/60026975/event-loop-error-in-asyncio-lock-when-instantiated-multiple-times
- 4: https://docs.python.org/3/library/asyncio-sync.html
- 5: https://docs.python.org/3.12/library/asyncio-sync.html
- 6: https://stackoverflow.com/questions/79825053/cant-use-lock-around-gather-in-python-asyncio
- 7: https://github.com/python/cpython/blob/main/Lib/asyncio/locks.py
- 8: avoid the global_lock in asyncio.mixins._LoopBoundMixin (python/cpython#95474)
- 9: https://android.googlesource.com/platform/prebuilts/python/linux-x86/+/refs/heads/emu-33-release/lib/python3.10/asyncio/mixins.py
Move the cluster-manager lock to app.state.
cluster_managers is stored on app.state, but _cluster_manager_lock remains at module scope. A module-level asyncio.Lock binds to the first event loop that acquires it and will raise RuntimeError if used by a different loop. This causes failures in multi-loop scenarios (e.g., loop-per-test setups). Storing the lock on app.state alongside the cache ensures consistent ownership and prevents cross-loop binding issues.
Suggested change:

```diff
 def init_app_state(app: FastAPI) -> None:
     """Initialize all singletons on app.state during lifespan startup."""
     app.state.model_catalog = ModelCatalog()
     app.state.slo_repo = SLOTemplateRepository()
     app.state.deployment_generator = DeploymentGenerator(simulator_mode=False)
     app.state.yaml_validator = YAMLValidator()
     app.state.cluster_managers = {}  # dict[str, KubernetesClusterManager]
+    app.state.cluster_manager_lock = asyncio.Lock()
     app.state.workflow = RecommendationWorkflow()

-_cluster_manager_lock = asyncio.Lock()
 _MAX_CACHED_NAMESPACES = 32

 async def get_cluster_manager_or_raise(
     request: Request, namespace: str = "default"
 ) -> KubernetesClusterManager:
     """Get or create a cluster manager, raising an exception if not accessible."""
     managers: dict[str, KubernetesClusterManager] = request.app.state.cluster_managers
+    cluster_manager_lock = cast(asyncio.Lock, request.app.state.cluster_manager_lock)
     if namespace not in managers:
-        async with _cluster_manager_lock:
+        async with cluster_manager_lock:
```

🤖 Prompt for AI Agents
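The fix can be illustrated with a stdlib-only sketch: the lock is created inside the running loop at startup and stored next to the cache it guards. `SimpleNamespace` again stands in for the FastAPI app, and `object()` for the real manager, so this shows only the ownership pattern, not the project's API.

```python
import asyncio
from types import SimpleNamespace


async def init_app_state(app) -> None:
    # Created inside the running loop, so the lock can only ever bind to
    # the same loop that serves this app's requests.
    app.state.cluster_managers = {}
    app.state.cluster_manager_lock = asyncio.Lock()


async def get_or_create(app, namespace: str) -> object:
    managers = app.state.cluster_managers
    if namespace not in managers:
        async with app.state.cluster_manager_lock:
            if namespace not in managers:
                managers[namespace] = object()  # stand-in for the real manager
    return managers[namespace]


async def main() -> bool:
    app = SimpleNamespace(state=SimpleNamespace())
    await init_app_state(app)
    a = await get_or_create(app, "default")
    b = await get_or_create(app, "default")
    return a is b


print(asyncio.run(main()))  # True
```

Because the lock is recreated per app (and hence per loop), loop-per-test setups that build a fresh app each time never hit the "bound to a different event loop" RuntimeError described above.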
Replace global singleton pattern with FastAPI's app.state and Depends() for cleaner dependency injection. All shared instances are now initialized during app lifespan startup and injected via request.app.state.
Assisted-by: Claude <noreply@anthropic.com>
Signed-off-by: Amit Oren <amoren@redhat.com>
Force-pushed 34450bf to e551bdf
@coderabbitai review
✅ Actions performed: Review triggered.
🧹 Nitpick comments (2)
src/neuralnav/api/routes/configuration.py (2)
61-75: Potential race on `simulator_mode` for concurrent mode changes.
Direct mutation of `gen.simulator_mode` without synchronization could lead to TOCTOU issues if multiple concurrent PUT requests arrive. However, since this is a simple boolean assignment (atomic under CPython's GIL) and mode changes are typically rare admin operations, the practical impact is low. If concurrent mode switching becomes a concern, consider adding a lock around the read/write.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/neuralnav/api/routes/configuration.py` around lines 61 - 75, The get_mode/set_mode handlers directly read/write gen.simulator_mode causing a potential race for concurrent PUTs; wrap accesses with an async lock on the deployment generator (e.g., add or use an asyncio.Lock stored on http_request.app.state.deployment_generator, reference symbols gen, get_mode, set_mode, gen.simulator_mode, and DeploymentModeRequest) so set_mode acquires the lock before mutating simulator_mode and get_mode acquires the lock when reading; ensure the lock is created when the deployment_generator is initialized and use await lock in the handlers to serialize read/write operations.
459-474: Consider parallelizing status fetches for better performance.
The loop makes sequential `await run_in_threadpool(...)` calls for each deployment (2 calls per deployment). For clusters with many InferenceServices, this could become slow. Consider using `asyncio.gather` to parallelize these fetches.
♻️ Optional optimization

```diff
     deployments = []
+    async def fetch_deployment_info(deployment_id):
+        svc_status = await run_in_threadpool(manager.get_inferenceservice_status, deployment_id)
+        pods = await run_in_threadpool(manager.get_deployment_pods, deployment_id)
+        return {"deployment_id": deployment_id, "status": svc_status, "pods": pods}
+
+    deployments = await asyncio.gather(*[fetch_deployment_info(d) for d in deployment_ids])
-    for deployment_id in deployment_ids:
-        svc_status = await run_in_threadpool(manager.get_inferenceservice_status, deployment_id)
-        pods = await run_in_threadpool(manager.get_deployment_pods, deployment_id)
-
-        deployments.append({"deployment_id": deployment_id, "status": svc_status, "pods": pods})
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/neuralnav/api/routes/configuration.py` around lines 459 - 474, The loop is doing sequential await run_in_threadpool calls for manager.get_inferenceservice_status and manager.get_deployment_pods per deployment, causing slowness; refactor to run these I/O calls concurrently by creating tasks (using asyncio.gather) for each deployment: first await run_in_threadpool(manager.list_inferenceservices) to get deployment_ids, then for each deployment_id create coroutines that call run_in_threadpool(manager.get_inferenceservice_status, deployment_id) and run_in_threadpool(manager.get_deployment_pods, deployment_id) (you can gather the two per-id coroutines or create a single per-id task that gathers both), await asyncio.gather over all per-id tasks, and then build the deployments list with {"deployment_id": ..., "status": ..., "pods": ...} from the gathered results.
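The gather-based fan-out can be sketched with stdlib stand-ins: `get_status` and `get_pods` are hypothetical blocking helpers playing the role of the manager's methods, and `asyncio.to_thread` replaces Starlette's `run_in_threadpool`.

```python
import asyncio
import time


def get_status(deployment_id: str) -> str:
    time.sleep(0.01)  # stand-in for a blocking status call
    return f"status:{deployment_id}"


def get_pods(deployment_id: str) -> list:
    time.sleep(0.01)  # stand-in for a blocking pod lookup
    return [f"{deployment_id}-pod-0"]


async def fetch_deployment_info(deployment_id: str) -> dict:
    # Run both blocking calls for one deployment concurrently, off the event loop.
    status, pods = await asyncio.gather(
        asyncio.to_thread(get_status, deployment_id),
        asyncio.to_thread(get_pods, deployment_id),
    )
    return {"deployment_id": deployment_id, "status": status, "pods": pods}


async def list_all(deployment_ids: list) -> list:
    # Fan out across deployments instead of awaiting them one by one.
    return await asyncio.gather(*(fetch_deployment_info(d) for d in deployment_ids))


deployments = asyncio.run(list_all(["a", "b"]))
print([d["deployment_id"] for d in deployments])  # ['a', 'b']
```

`asyncio.gather` preserves input order, so the response list stays aligned with `deployment_ids` even though the underlying calls complete in arbitrary order.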
🚧 Files skipped from review as they are similar to previous changes (3)
- src/neuralnav/api/routes/intent.py
- src/neuralnav/api/app.py
- src/neuralnav/api/routes/recommendation.py
Replace global singleton pattern with FastAPI's app.state and Depends() for cleaner dependency injection. All shared instances are now initialized during app lifespan startup and injected via request.app.state.
Summary by CodeRabbit
New Features
Bug Fixes
Refactor