forked from agentscope-ai/agentscope-runtime
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_agent_app_stream_task.py
More file actions
276 lines (232 loc) · 8.08 KB
/
test_agent_app_stream_task.py
File metadata and controls
276 lines (232 loc) · 8.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
# -*- coding: utf-8 -*-
# pylint:disable=redefined-outer-name, unused-argument
"""
Integration tests for stream_query background task functionality.
"""
import asyncio
import multiprocessing
import time
import aiohttp
import pytest
from agentscope_runtime.engine import AgentApp
from agentscope_runtime.engine.schemas.agent_schemas import (
AgentRequest,
RunStatus,
)
# TCP port used both by the AgentApp server started in run_app() and by the
# HTTP URLs the tests in this module request.
PORT = 8095
async def wait_for_task_completion(
    session: aiohttp.ClientSession,
    task_id: str,
    timeout: float = 10.0,
    poll_interval: float = 0.2,
) -> dict:
    """
    Poll the task status endpoint until the task reaches a terminal state.

    Args:
        session: aiohttp client session used to issue the status requests
        task_id: Task ID to poll
        timeout: Maximum wait time in seconds
        poll_interval: Time between polls in seconds

    Returns:
        The parsed JSON status payload of the first poll whose ``status``
        field is ``"finished"``, ``"error"`` or ``"failed"``

    Raises:
        TimeoutError: If task does not complete within timeout
        AssertionError: If a status request does not return HTTP 200
    """
    terminal_statuses = ("finished", "error", "failed")

    # The URL is identical for every poll, so build it once.
    status_url = f"http://localhost:{PORT}/process/task/{task_id}"

    # Measure the timeout on the monotonic clock: wall-clock adjustments
    # (NTP sync, manual changes) must not shorten or stretch the window.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        async with session.get(status_url) as resp:
            assert resp.status == 200
            data = await resp.json()

        if data["status"] in terminal_statuses:
            return data

        await asyncio.sleep(poll_interval)

    raise TimeoutError(
        f"Task {task_id} did not complete within {timeout}s",
    )
def run_app():
    """
    Build an AgentApp with stream tasks enabled and serve it on PORT.

    Registers a mock "agentscope" query handler and then calls
    ``app.run`` bound to 127.0.0.1.
    """
    agent_app = AgentApp(
        app_name="TestAgent",
        app_description="Test agent for background tasks",
        enable_stream_task=True,
        stream_task_queue="test_queue",
        stream_task_timeout=30,
    )

    @agent_app.query(framework="agentscope")
    async def query_func(self, msgs, request: AgentRequest, **kwargs):
        """
        Mock handler producing ``(msg, last)`` tuples.

        Emits three intermediate "thinking" messages (each preceded by a
        0.1s pause) followed by one final message flagged as last.
        """
        # Imported lazily, exactly as the handler is invoked.
        from agentscope.message import Msg

        def assistant_msg(text):
            """Create an assistant-authored message carrying ``text``."""
            return Msg(name="assistant", content=text, role="assistant")

        for step in range(3):
            await asyncio.sleep(0.1)
            yield assistant_msg(f"Thinking step {step}"), False

        yield assistant_msg("Final answer"), True

    agent_app.run(host="127.0.0.1", port=PORT)
@pytest.fixture(scope="module")
def start_app():
    """
    Launch AgentApp in a separate process before the async tests.

    Starts ``run_app`` in a child process, then polls the server port
    (up to 50 attempts, 0.1s apart) until a TCP connection succeeds.
    The test run is failed if the child exits during startup or the port
    never becomes reachable. On teardown the child is terminated and
    joined.
    """
    import socket

    proc = multiprocessing.Process(target=run_app)
    proc.start()

    server_ready = False
    for _ in range(50):
        if not proc.is_alive():
            # The child died during startup; further polling cannot succeed.
            break
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            try:
                s.connect(("localhost", PORT))
            except OSError:
                time.sleep(0.1)
            else:
                server_ready = True
                break

    if not server_ready:
        crashed = not proc.is_alive()
        proc.terminate()
        # Reap the child so a failed startup does not leave a zombie behind.
        proc.join()
        if crashed:
            pytest.fail(
                "Server process exited during startup "
                f"(exit code {proc.exitcode})",
            )
        pytest.fail("Server did not start within timeout")

    yield

    proc.terminate()
    proc.join()
@pytest.mark.asyncio
async def test_root_endpoint_shows_task_endpoints(start_app):
    """
    The root endpoint must advertise the task endpoints
    when enable_stream_task is True.
    """
    url = f"http://localhost:{PORT}/"
    async with aiohttp.ClientSession() as client, client.get(url) as reply:
        assert reply.status == 200
        payload = await reply.json()

        assert "endpoints" in payload
        endpoints = payload["endpoints"]

        # Both the submit endpoint and the status endpoint must be listed.
        assert "task" in endpoints
        assert endpoints["task"] == "/process/task"
        assert "task_status" in endpoints
@pytest.mark.asyncio
async def test_submit_stream_query_task(start_app):
    """Submitting a stream query as a background task is acknowledged."""
    user_message = {
        "role": "user",
        "type": "message",
        "content": [{"type": "text", "text": "Hello"}],
    }
    payload = {
        "input": [user_message],
        "session_id": "test-session",
    }
    url = f"http://localhost:{PORT}/process/task"

    async with aiohttp.ClientSession() as client:
        async with client.post(url, json=payload) as reply:
            assert reply.status == 200
            body = await reply.json()

            # The acknowledgement carries the task id, its state and queue.
            assert "task_id" in body
            assert body["status"] == "submitted"
            assert body["queue"] == "test_queue"
            assert "message" in body
@pytest.mark.asyncio
async def test_get_task_status_pending(start_app):
    """
    Test getting task status while task is running.

    Submits a task, waits briefly, then checks that the status endpoint
    answers with one of the expected lifecycle states. "finished" is
    accepted as well because the task may already have completed by the
    time it is polled.
    """
    submit_url = f"http://localhost:{PORT}/process/task"

    async with aiohttp.ClientSession() as session:
        async with session.post(
            submit_url,
            json={
                "input": [
                    {
                        "role": "user",
                        "type": "message",
                        "content": [{"type": "text", "text": "Hello"}],
                    },
                ],
                "session_id": "test-session",
            },
        ) as resp:
            # Fail clearly on a rejected submission instead of with a
            # KeyError on the missing "task_id" below.
            assert resp.status == 200
            data = await resp.json()
            task_id = data["task_id"]

        await asyncio.sleep(0.05)

        status_url = f"http://localhost:{PORT}/process/task/{task_id}"
        async with session.get(status_url) as resp:
            assert resp.status == 200
            status_data = await resp.json()

            assert "status" in status_data
            assert status_data["status"] in [
                "pending",
                "running",
                "finished",
            ]
@pytest.mark.asyncio
async def test_get_task_status_finished(start_app):
    """
    Test getting task status after task completes.

    Submits a task, polls until it reaches a terminal state, and checks
    that it finished with a non-empty result describing a completed
    response.
    """
    submit_url = f"http://localhost:{PORT}/process/task"

    async with aiohttp.ClientSession() as session:
        async with session.post(
            submit_url,
            json={
                "input": [
                    {
                        "role": "user",
                        "type": "message",
                        "content": [{"type": "text", "text": "Test"}],
                    },
                ],
                "session_id": "test-session-2",
            },
        ) as resp:
            # Fail clearly on a rejected submission instead of with a
            # KeyError on the missing "task_id" below.
            assert resp.status == 200
            data = await resp.json()
            task_id = data["task_id"]

        status_data = await wait_for_task_completion(session, task_id)

        assert status_data["status"] == "finished"
        assert "result" in status_data
        assert status_data["result"] is not None

        result = status_data["result"]
        assert result["object"] == "response"
        assert result["status"] == RunStatus.Completed
@pytest.mark.asyncio
async def test_task_only_stores_final_response(start_app):
    """
    Test that task only stores the final response,
    not intermediate events.

    The stored result must be a single dict describing a completed
    response rather than a collection of streamed events.
    """
    submit_url = f"http://localhost:{PORT}/process/task"

    async with aiohttp.ClientSession() as session:
        async with session.post(
            submit_url,
            json={
                "input": [
                    {
                        "role": "user",
                        "type": "message",
                        "content": [{"type": "text", "text": "Final test"}],
                    },
                ],
                "session_id": "test-session-final",
            },
        ) as resp:
            # Fail clearly on a rejected submission instead of with a
            # KeyError on the missing "task_id" below.
            assert resp.status == 200
            data = await resp.json()
            task_id = data["task_id"]

        status_data = await wait_for_task_completion(session, task_id)

        assert status_data["status"] == "finished"

        result = status_data["result"]
        assert isinstance(result, dict)
        assert result["object"] == "response"
        assert result["status"] == RunStatus.Completed
@pytest.mark.asyncio
async def test_get_nonexistent_task(start_app):
    """Querying an unknown task id yields a 200 reply with an error body."""
    url = f"http://localhost:{PORT}/process/task/nonexistent-task-id"
    async with aiohttp.ClientSession() as client, client.get(url) as reply:
        assert reply.status == 200
        payload = await reply.json()

        # The failure is reported in the JSON body, not via the HTTP status.
        assert "error" in payload
        assert "not found" in payload["error"].lower()