Skip to content

Commit 5b5da63

Browse files
authored
update runner managment logic (#190)
1 parent 3b37f60 commit 5b5da63

2 files changed

Lines changed: 30 additions & 8 deletions

File tree

src/agentscope_runtime/engine/deployers/utils/service_utils/fastapi_factory.py

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
# -*- coding: utf-8 -*-
22
# pylint:disable=too-many-branches, unused-argument, too-many-return-statements
3-
3+
# pylint:disable=protected-access
44

55
import asyncio
66
import inspect
77
import json
8+
import logging
89
from contextlib import asynccontextmanager
910
from typing import Optional, Callable, Type, Any, List, Dict
1011

@@ -18,6 +19,8 @@
1819
from ..deployment_modes import DeploymentMode
1920
from ...adapter.protocol_adapter import ProtocolAdapter
2021

22+
logger = logging.getLogger(__name__)
23+
2124

2225
async def error_stream(e):
2326
yield (
@@ -122,9 +125,9 @@ async def lifespan(app: FastAPI):
122125
app.state.deployment_mode = mode
123126
app.state.services_config = services_config
124127
app.state.stream_enabled = stream
125-
app.state.response_type = response_type
126128
app.state.custom_func = func
127-
app.state.external_runner = runner
129+
app.state.runner = runner
130+
app.state.response_type = response_type
128131
app.state.endpoint_path = endpoint_path
129132
app.state.protocol_adapters = protocol_adapters # Store for later use
130133
app.state.custom_endpoints = (
@@ -170,6 +173,28 @@ async def _handle_startup(
170173
app.state.runner = external_runner
171174
app.state.runner_managed_externally = True
172175

176+
# in case no runner
177+
if app.state.runner:
178+
app_service_instances = (
179+
app.state.runner._context_manager.service_instances
180+
)
181+
for instance in app_service_instances.values():
182+
# If any instance was not ready, reset runner.
183+
if not await instance.health():
184+
app.state.runner_managed_externally = False
185+
break
186+
187+
if not app.state.runner_managed_externally:
188+
try:
189+
# aexit any possible running instances before set up
190+
# runner
191+
await app.state.runner.__aexit__(None, None, None)
192+
await app.state.runner.__aenter__()
193+
except Exception as e:
194+
logger.error(
195+
f"Warning: Error during runner setup: {e}",
196+
)
197+
173198
elif mode in [
174199
DeploymentMode.DETACHED_PROCESS,
175200
DeploymentMode.STANDALONE,
@@ -243,9 +268,6 @@ def start_celery_worker():
243268
queues=queues,
244269
)
245270
except Exception as e:
246-
import logging
247-
248-
logger = logging.getLogger(__name__)
249271
logger.error(f"Failed to start Celery worker: {e}")
250272

251273
worker_thread = threading.Thread(
@@ -279,7 +301,7 @@ async def _handle_shutdown(
279301
# Clean up runner
280302
await runner.__aexit__(None, None, None)
281303
except Exception as e:
282-
print(f"Warning: Error during runner cleanup: {e}")
304+
logger.error(f"Warning: Error during runner cleanup: {e}")
283305

284306
@staticmethod
285307
async def _create_internal_runner(services_config: ServicesConfig):

tests/unit/test_service_utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ def test_create_app_with_runner(self, mocker):
4444
stream=False,
4545
)
4646

47-
assert app.state.external_runner == mock_runner
47+
assert app.state.runner == mock_runner
4848
assert app.state.endpoint_path == "/api/process"
4949
assert app.state.response_type == "json"
5050
assert app.state.stream_enabled is False

0 commit comments

Comments
 (0)