Skip to content

Commit 7d2dcfe

Browse files
authored
update agentapp run method (#161)
1 parent 86b11b9 commit 7d2dcfe

File tree

12 files changed

+272
-184
lines changed

12 files changed

+272
-184
lines changed

pyproject.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ dependencies = [
2525

2626
[tool.setuptools]
2727
packages = { find = { where = ["src"] } }
28+
include-package-data = true
29+
30+
[tool.setuptools.package-data]
31+
"agentscope_runtime.engine.deployers.utils.service_utils" = ["*.j2"]
2832

2933
[build-system]
3034
requires = ["setuptools>=42", "wheel"]

src/agentscope_runtime/engine/app/agent_app.py

Lines changed: 53 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from contextlib import asynccontextmanager
55
from typing import Optional, Any, Callable, List
66

7+
import uvicorn
78
from fastapi import FastAPI
89
from pydantic import BaseModel
910

@@ -13,6 +14,11 @@
1314
from ..deployers.adapter.responses.response_api_protocol_adapter import (
1415
ResponseAPIDefaultAdapter,
1516
)
17+
from ..deployers.utils.deployment_modes import DeploymentMode
18+
from ..deployers.utils.service_utils.fastapi_factory import FastAPIAppFactory
19+
from ..deployers.utils.service_utils.service_config import (
20+
DEFAULT_SERVICES_CONFIG,
21+
)
1622
from ..runner import Runner
1723
from ..schemas.agent_schemas import AgentRequest
1824
from ..services.context_manager import ContextManager
@@ -57,6 +63,8 @@ def __init__(
5763
self.request_model = request_model
5864
self.before_start = before_start
5965
self.after_finish = after_finish
66+
self.broker_url = broker_url
67+
self.backend_url = backend_url
6068

6169
self._agent = agent
6270
self._runner = None
@@ -116,39 +124,62 @@ def run(
116124
host="0.0.0.0",
117125
port=8090,
118126
embed_task_processor=False,
127+
services_config=None,
119128
**kwargs,
120129
):
130+
"""
131+
Run the AgentApp using FastAPIAppFactory directly.
132+
133+
Args:
134+
host: Host to bind to
135+
port: Port to bind to
136+
embed_task_processor: Whether to embed task processor
137+
services_config: Optional services configuration
138+
**kwargs: Additional keyword arguments
139+
"""
140+
121141
try:
122-
loop = asyncio.get_event_loop()
123-
if self._runner is not None:
124-
loop.run_until_complete(self._runner.__aenter__())
142+
logger.info(
143+
"[AgentApp] Starting AgentApp with FastAPIAppFactory...",
144+
)
145+
146+
# Use default services config if not provided
147+
if services_config is None:
148+
services_config = DEFAULT_SERVICES_CONFIG
149+
150+
# Create FastAPI application using the factory
151+
fastapi_app = FastAPIAppFactory.create_app(
152+
runner=self._runner,
153+
endpoint_path=self.endpoint_path,
154+
request_model=self.request_model,
155+
response_type=self.response_type,
156+
stream=self.stream,
157+
before_start=self.before_start,
158+
after_finish=self.after_finish,
159+
mode=DeploymentMode.DAEMON_THREAD,
160+
services_config=services_config,
161+
protocol_adapters=self.protocol_adapters,
162+
custom_endpoints=self.custom_endpoints,
163+
broker_url=self.broker_url,
164+
backend_url=self.backend_url,
165+
enable_embedded_worker=embed_task_processor,
166+
**kwargs,
167+
)
125168

126-
logger.info("[AgentApp] Runner initialized.")
169+
logger.info(f"[AgentApp] Starting server on {host}:{port}")
127170

128-
super().run(
171+
# Start the FastAPI application with uvicorn
172+
uvicorn.run(
173+
fastapi_app,
129174
host=host,
130175
port=port,
131-
embed_task_processor=embed_task_processor,
132-
**kwargs,
176+
log_level="info",
177+
access_log=True,
133178
)
134179

135180
except Exception as e:
136181
logger.error(f"[AgentApp] Error while running: {e}")
137-
138-
finally:
139-
try:
140-
try:
141-
loop = asyncio.get_event_loop()
142-
except RuntimeError:
143-
loop = asyncio.new_event_loop()
144-
asyncio.set_event_loop(loop)
145-
if self._runner is not None:
146-
loop.run_until_complete(
147-
self._runner.__aexit__(None, None, None),
148-
)
149-
logger.info("[AgentApp] Runner cleaned up.")
150-
except Exception as e:
151-
logger.error(f"[AgentApp] Error while cleaning up runner: {e}")
182+
raise
152183

153184
async def deploy(self, deployer, **kwargs):
154185
"""Deploy the agent app with custom endpoints support"""

src/agentscope_runtime/engine/deployers/base.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
class DeployManager(ABC):
1010
def __init__(self):
1111
self.deploy_id = str(uuid.uuid4())
12+
self._app = None
1213

1314
@abstractmethod
1415
async def deploy(self, *args, **kwargs) -> Dict[str, str]:

src/agentscope_runtime/engine/deployers/local_deployer.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ def __init__(
2929
self,
3030
host: str = "127.0.0.1",
3131
port: int = 8000,
32-
shutdown_timeout: int = 120,
32+
shutdown_timeout: int = 30,
33+
startup_timeout: int = 30,
3334
logger: Optional[logging.Logger] = None,
3435
):
3536
"""Initialize LocalDeployManager.
@@ -44,6 +45,7 @@ def __init__(
4445
self.host = host
4546
self.port = port
4647
self._shutdown_timeout = shutdown_timeout
48+
self._startup_timeout = startup_timeout
4749
self._logger = logger or logging.getLogger(__name__)
4850

4951
# State management
@@ -66,6 +68,7 @@ def __init__(
6668

6769
async def deploy(
6870
self,
71+
app=None,
6972
runner: Optional[Any] = None,
7073
endpoint_path: str = "/process",
7174
request_model: Optional[Type] = None,
@@ -111,6 +114,19 @@ async def deploy(
111114
if self.is_running:
112115
raise RuntimeError("Service is already running")
113116

117+
self._app = app
118+
if self._app is not None:
119+
runner = self._app._runner
120+
endpoint_path = self._app.endpoint_path
121+
response_type = self._app.response_type
122+
stream = self._app.stream
123+
request_model = self._app.request_model
124+
before_start = self._app.before_start
125+
after_finish = self._app.after_finish
126+
backend_url = self._app.backend_url
127+
broker_url = self._app.broker_url
128+
custom_endpoints = self._app.custom_endpoints
129+
protocol_adapters = self._app.protocol_adapters
114130
try:
115131
if mode == DeploymentMode.DAEMON_THREAD:
116132
return await self._deploy_daemon_thread(
@@ -194,7 +210,7 @@ def run_server():
194210
self._server_thread.start()
195211

196212
# Wait for server to start
197-
await self._wait_for_server_ready()
213+
await self._wait_for_server_ready(self._startup_timeout)
198214

199215
self.is_running = True
200216
self.deploy_id = f"daemon_{self.host}_{self.port}"

src/agentscope_runtime/engine/deployers/modelstudio_deployer.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -201,16 +201,6 @@ async def _oss_create_bucket_if_not_exists(client, bucket_name: str) -> None:
201201
)
202202

203203

204-
def _create_bucket_name(prefix: str, base_name: str) -> str:
205-
import re as _re
206-
207-
ts = time.strftime("%Y%m%d-%H%M%S", time.gmtime())
208-
base = _re.sub(r"\s+", "-", base_name)
209-
base = _re.sub(r"[^a-zA-Z0-9-]", "", base).lower().strip("-")
210-
name = f"{prefix}-{base}-{ts}"
211-
return name[:63]
212-
213-
214204
async def _oss_put_and_presign(
215205
client,
216206
bucket_name: str,

src/agentscope_runtime/engine/deployers/utils/service_utils/fastapi_factory.py

Lines changed: 53 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -274,9 +274,10 @@ async def _handle_shutdown(
274274
and not app.state.runner_managed_externally
275275
):
276276
runner = app.state.runner
277-
if runner and hasattr(runner, "context_manager"):
277+
if runner:
278278
try:
279-
await runner.context_manager.__aexit__(None, None, None)
279+
# Clean up runner
280+
await runner.__aexit__(None, None, None)
280281
except Exception as e:
281282
print(f"Warning: Error during runner cleanup: {e}")
282283

@@ -297,15 +298,15 @@ async def _create_internal_runner(services_config: ServicesConfig):
297298
memory_service=services["memory"],
298299
)
299300

300-
# Initialize context manager
301-
await context_manager.__aenter__()
302-
303301
# Create runner (agent will be set later)
304302
runner = Runner(
305303
agent=None, # Will be set by the specific deployment
306304
context_manager=context_manager,
307305
)
308306

307+
# Initialize runner
308+
await runner.__aenter__()
309+
309310
return runner
310311

311312
@staticmethod
@@ -673,18 +674,48 @@ def _create_streaming_parameter_wrapper(
673674
try:
674675
sig = inspect.signature(handler)
675676
params = list(sig.parameters.values())
677+
no_params = False
678+
param_annotation = None
676679

677680
if not params:
678-
# No parameters, call function directly
679-
return handler
681+
no_params = True
682+
else:
683+
# Get the first parameter
684+
first_param = params[0]
685+
param_annotation = first_param.annotation
680686

681-
# Get the first parameter
682-
first_param = params[0]
683-
param_annotation = first_param.annotation
687+
# If no annotation or annotation is Request, goto no params
688+
# logic
689+
if param_annotation in [inspect.Parameter.empty, Request]:
690+
no_params = True
684691

685-
# If no annotation or annotation is Request, pass Request directly
686-
if param_annotation in [inspect.Parameter.empty, Request]:
687-
return handler
692+
if no_params:
693+
if is_async_gen:
694+
695+
async def async_no_param_wrapper():
696+
async def generate():
697+
async for chunk in handler():
698+
yield str(chunk)
699+
700+
return StreamingResponse(
701+
generate(),
702+
media_type="text/plain",
703+
)
704+
705+
return async_no_param_wrapper
706+
else:
707+
708+
async def sync_no_param_wrapper():
709+
def generate():
710+
for chunk in handler():
711+
yield str(chunk)
712+
713+
return StreamingResponse(
714+
generate(),
715+
media_type="text/plain",
716+
)
717+
718+
return sync_no_param_wrapper
688719

689720
# Check if the annotation is a Pydantic model
690721
if isinstance(param_annotation, type) and issubclass(
@@ -693,24 +724,20 @@ def _create_streaming_parameter_wrapper(
693724
):
694725
if is_async_gen:
695726

696-
async def async_stream_pydantic_wrapper(request: Request):
727+
async def async_stream_pydantic_wrapper(
728+
request: Request,
729+
):
697730
try:
698731
body = await request.json()
699732
parsed_param = param_annotation(**body)
700733

701-
# Create async generator and return
702-
# StreamingResponse
703734
async def generate():
704735
async for chunk in handler(parsed_param):
705736
yield str(chunk)
706737

707738
return StreamingResponse(
708739
generate(),
709-
media_type="text/event-stream",
710-
headers={
711-
"Cache-Control": "no-cache",
712-
"Connection": "keep-alive",
713-
},
740+
media_type="text/plain",
714741
)
715742
except Exception as e:
716743
return StreamingResponse(
@@ -721,7 +748,9 @@ async def generate():
721748
return async_stream_pydantic_wrapper
722749
else:
723750

724-
async def sync_stream_pydantic_wrapper(request: Request):
751+
async def sync_stream_pydantic_wrapper(
752+
request: Request,
753+
):
725754
try:
726755
body = await request.json()
727756
parsed_param = param_annotation(**body)
@@ -738,19 +767,16 @@ def generate():
738767
return JSONResponse(
739768
status_code=422,
740769
content={
741-
"detail": f"Request parsing error: "
742-
f"{str(e)}",
770+
"detail": f"Request parsing error:"
771+
f" {str(e)}",
743772
},
744773
)
745774

746775
return sync_stream_pydantic_wrapper
747776

748-
# For other types, fall back to original behavior
749777
return handler
750778

751779
except Exception:
752-
# If anything goes wrong with introspection, fall back to
753-
# original behavior
754780
return handler
755781

756782
@staticmethod

0 commit comments

Comments
 (0)