Skip to content

Commit c1d3392

Browse files
committed
Add server support
1 parent 643a2e0 commit c1d3392

14 files changed

+551
-22
lines changed

src/a2a/server/request_handlers/default_request_handler.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
TaskState,
4646
UnsupportedOperationError,
4747
)
48+
from a2a.utils.constants import DEFAULT_LIST_TASKS_PAGE_SIZE
4849
from a2a.utils.errors import ServerError
4950
from a2a.utils.task import apply_history_length
5051
from a2a.utils.telemetry import SpanKind, trace_class
@@ -129,8 +130,13 @@ async def on_list_tasks(
129130
context: ServerCallContext | None = None,
130131
) -> ListTasksResult:
131132
"""Default handler for 'tasks/list'."""
132-
# TODO: #515 - Implement method
133-
raise NotImplementedError('tasks/list not implemented')
133+
page = await self.task_store.list(params, context)
134+
return ListTasksResult(
135+
next_page_token=page.next_page_token,
136+
page_size=params.page_size or DEFAULT_LIST_TASKS_PAGE_SIZE,
137+
tasks=page.tasks,
138+
total_size=page.total_size,
139+
)
134140

135141
async def on_cancel_task(
136142
self, params: TaskIdParams, context: ServerCallContext | None = None
@@ -590,3 +596,9 @@ async def on_delete_task_push_notification_config(
590596
await self._push_config_store.delete_info(
591597
params.id, params.push_notification_config_id
592598
)
599+
600+
601+
def _next_page_token(current_page_token: str) -> str:
602+
if not current_page_token:
603+
return '1'
604+
return str(int(current_page_token) + 1)

src/a2a/server/request_handlers/grpc_handler.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -345,8 +345,15 @@ async def ListTasks(
345345
Returns:
346346
A `ListTasksResponse` object.
347347
"""
348-
# TODO: #515 - Implement method
349-
raise NotImplementedError('tasks/list not implemented')
348+
try:
349+
server_context = self.context_builder.build(context)
350+
result = await self.request_handler.on_list_tasks(
351+
proto_utils.FromProto.list_tasks_params(request), server_context
352+
)
353+
return proto_utils.ToProto.list_tasks_response(result)
354+
except ServerError as e:
355+
await self.abort_context(e, context)
356+
return a2a_pb2.ListTasksResponse()
350357

351358
async def GetAgentCard(
352359
self,

src/a2a/server/request_handlers/jsonrpc_handler.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,11 @@
2828
ListTaskPushNotificationConfigRequest,
2929
ListTaskPushNotificationConfigResponse,
3030
ListTaskPushNotificationConfigSuccessResponse,
31+
ListTasksParams,
3132
ListTasksRequest,
3233
ListTasksResponse,
34+
ListTasksResult,
35+
ListTasksSuccessResponse,
3336
Message,
3437
SendMessageRequest,
3538
SendMessageResponse,
@@ -375,8 +378,23 @@ async def list_tasks(
375378
Returns:
376379
A `ListTasksResponse` object containing the Task or a JSON-RPC error.
377380
"""
378-
# TODO: #515 - Implement method
379-
raise NotImplementedError('tasks/list not implemented')
381+
try:
382+
result = await self.request_handler.on_list_tasks(
383+
request.params or ListTasksParams(), context
384+
)
385+
except ServerError as e:
386+
return ListTasksResponse(
387+
root=JSONRPCErrorResponse(
388+
id=request.id, error=e.error if e.error else InternalError()
389+
)
390+
)
391+
return prepare_response_object(
392+
request.id,
393+
result,
394+
(ListTasksResult,),
395+
ListTasksSuccessResponse,
396+
ListTasksResponse,
397+
)
380398

381399
async def list_push_notification_config(
382400
self,

src/a2a/server/request_handlers/response_helpers.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
JSONRPCErrorResponse,
1919
ListTaskPushNotificationConfigResponse,
2020
ListTaskPushNotificationConfigSuccessResponse,
21+
ListTasksResponse,
22+
ListTasksResult,
23+
ListTasksSuccessResponse,
2124
Message,
2225
SendMessageResponse,
2326
SendMessageSuccessResponse,
@@ -42,6 +45,7 @@
4245
SendStreamingMessageResponse,
4346
ListTaskPushNotificationConfigResponse,
4447
DeleteTaskPushNotificationConfigResponse,
48+
ListTasksResponse,
4549
)
4650
"""Type variable for RootModel response types."""
4751

@@ -56,6 +60,7 @@
5660
SendStreamingMessageSuccessResponse,
5761
ListTaskPushNotificationConfigSuccessResponse,
5862
DeleteTaskPushNotificationConfigSuccessResponse,
63+
ListTasksSuccessResponse,
5964
)
6065
"""Type variable for SuccessResponse types."""
6166

@@ -69,6 +74,7 @@
6974
| A2AError
7075
| JSONRPCError
7176
| list[TaskPushNotificationConfig]
77+
| ListTasksResult
7278
)
7379
"""Type alias for possible event types produced by handlers."""
7480

src/a2a/server/request_handlers/rest_handler.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from a2a.types import (
2222
AgentCard,
2323
GetTaskPushNotificationConfigParams,
24+
ListTasksParams,
2425
TaskIdParams,
2526
TaskNotFoundError,
2627
TaskQueryParams,
@@ -264,12 +265,12 @@ async def on_get_task(
264265
return MessageToDict(proto_utils.ToProto.task(task))
265266
raise ServerError(error=TaskNotFoundError())
266267

267-
async def list_push_notifications(
268+
async def list_tasks(
268269
self,
269270
request: Request,
270271
context: ServerCallContext,
271272
) -> dict[str, Any]:
272-
"""Handles the 'tasks/pushNotificationConfig/list' REST method.
273+
"""Handles the 'tasks/list' REST method.
273274
274275
This method is currently not implemented.
275276
@@ -278,19 +279,21 @@ async def list_push_notifications(
278279
context: Context provided by the server.
279280
280281
Returns:
281-
A list of `dict` representing the `TaskPushNotificationConfig` objects.
282+
A list of dict representing the`Task` objects.
282283
283284
Raises:
284285
NotImplementedError: This method is not yet implemented.
285286
"""
286-
raise NotImplementedError('list notifications not implemented')
287+
params = ListTasksParams.model_validate(request.query_params)
288+
result = await self.request_handler.on_list_tasks(params, context)
289+
return MessageToDict(proto_utils.ToProto.list_tasks_response(result))
287290

288-
async def list_tasks(
291+
async def list_push_notifications(
289292
self,
290293
request: Request,
291294
context: ServerCallContext,
292295
) -> dict[str, Any]:
293-
"""Handles the 'tasks/list' REST method.
296+
"""Handles the 'tasks/pushNotificationConfig/list' REST method.
294297
295298
This method is currently not implemented.
296299
@@ -299,10 +302,9 @@ async def list_tasks(
299302
context: Context provided by the server.
300303
301304
Returns:
302-
A list of dict representing the`Task` objects.
305+
A list of `dict` representing the `TaskPushNotificationConfig` objects.
303306
304307
Raises:
305308
NotImplementedError: This method is not yet implemented.
306309
"""
307-
# TODO: #515 - Implement method
308-
raise NotImplementedError('list tasks not implemented')
310+
raise NotImplementedError('list notifications not implemented')

src/a2a/server/tasks/database_task_store.py

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33

44
try:
5-
from sqlalchemy import Table, delete, select
5+
from sqlalchemy import Table, delete, func, select
66
from sqlalchemy.ext.asyncio import (
77
AsyncEngine,
88
AsyncSession,
@@ -21,8 +21,9 @@
2121

2222
from a2a.server.context import ServerCallContext
2323
from a2a.server.models import Base, TaskModel, create_task_model
24-
from a2a.server.tasks.task_store import TaskStore
25-
from a2a.types import Task # Task is the Pydantic model
24+
from a2a.server.tasks.task_store import TaskStore, TasksPage
25+
from a2a.types import ListTasksParams, Task
26+
from a2a.utils.constants import DEFAULT_LIST_TASKS_PAGE_SIZE
2627

2728

2829
logger = logging.getLogger(__name__)
@@ -147,6 +148,54 @@ async def get(
147148
logger.debug('Task %s not found in store.', task_id)
148149
return None
149150

151+
async def list(
152+
self, params: ListTasksParams, context: ServerCallContext | None = None
153+
) -> TasksPage:
154+
"""Retrieves all tasks from the database."""
155+
await self._ensure_initialized()
156+
async with self.async_session_maker() as session:
157+
page_number = int(params.page_token) if params.page_token else 0
158+
page_size = params.page_size or DEFAULT_LIST_TASKS_PAGE_SIZE
159+
offset = page_number * page_size
160+
161+
# Base query for filtering
162+
base_stmt = select(self.task_model)
163+
if params.context_id:
164+
base_stmt = base_stmt.where(
165+
self.task_model.context_id == params.context_id
166+
)
167+
if params.status is not None:
168+
base_stmt = base_stmt.where(
169+
self.task_model.status['state'].as_string()
170+
== params.status.value
171+
)
172+
173+
# Get total count
174+
count_stmt = select(func.count()).select_from(base_stmt.alias())
175+
total_count = (await session.execute(count_stmt)).scalar_one()
176+
177+
# Get paginated results
178+
stmt = (
179+
base_stmt.order_by(self.task_model.id.desc())
180+
.limit(page_size)
181+
.offset(offset)
182+
)
183+
result = await session.execute(stmt)
184+
tasks_models = result.scalars().all()
185+
tasks = [self._from_orm(task_model) for task_model in tasks_models]
186+
187+
next_page_token = (
188+
str(page_number + 1)
189+
if total_count > (page_number + 1) * page_size
190+
else ''
191+
)
192+
193+
return TasksPage(
194+
tasks=tasks,
195+
total_size=total_count,
196+
next_page_token=next_page_token,
197+
)
198+
150199
async def delete(
151200
self, task_id: str, context: ServerCallContext | None = None
152201
) -> None:

src/a2a/server/tasks/inmemory_task_store.py

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22
import logging
33

44
from a2a.server.context import ServerCallContext
5-
from a2a.server.tasks.task_store import TaskStore
6-
from a2a.types import Task
5+
from a2a.server.tasks.task_store import TaskStore, TasksPage
6+
from a2a.types import ListTasksParams, Task
7+
from a2a.utils.constants import DEFAULT_LIST_TASKS_PAGE_SIZE
78

89

910
logger = logging.getLogger(__name__)
@@ -43,6 +44,59 @@ async def get(
4344
logger.debug('Task %s not found in store.', task_id)
4445
return task
4546

47+
async def list(
48+
self,
49+
params: ListTasksParams,
50+
context: ServerCallContext | None = None,
51+
) -> TasksPage:
52+
"""Retrieves a list of tasks from the store."""
53+
async with self.lock:
54+
tasks = list(self.tasks.values())
55+
56+
# Apply filtering
57+
if params.context_id:
58+
tasks = [
59+
task for task in tasks if task.context_id == params.context_id
60+
]
61+
if params.status is not None:
62+
tasks = [
63+
task for task in tasks if task.status.state == params.status
64+
]
65+
66+
# Reduce payload
67+
base_updates = {}
68+
if not params.include_artifacts:
69+
base_updates = {'artifacts': []}
70+
for i in range(len(tasks)):
71+
updates = dict(base_updates)
72+
history = tasks[i].history
73+
if params.history_length is not None and history:
74+
limited_history = (
75+
history[-params.history_length :]
76+
if params.history_length > 0
77+
else []
78+
)
79+
updates['history'] = limited_history
80+
tasks[i] = tasks[i].model_copy(update=updates)
81+
82+
# Apply pagination
83+
total_size = len(tasks)
84+
page_token = int(params.page_token) if params.page_token else 0
85+
page_size = params.page_size or DEFAULT_LIST_TASKS_PAGE_SIZE
86+
tasks = tasks[page_token * page_size : (page_token + 1) * page_size]
87+
88+
next_page_token = (
89+
str(page_token + 1)
90+
if (page_token + 1) * page_size < total_size
91+
else ''
92+
)
93+
94+
return TasksPage(
95+
next_page_token=next_page_token,
96+
tasks=tasks,
97+
total_size=total_size,
98+
)
99+
46100
async def delete(
47101
self, task_id: str, context: ServerCallContext | None = None
48102
) -> None:

src/a2a/server/tasks/task_store.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,17 @@
11
from abc import ABC, abstractmethod
22

3+
from pydantic import BaseModel
4+
35
from a2a.server.context import ServerCallContext
4-
from a2a.types import Task
6+
from a2a.types import ListTasksParams, Task
7+
8+
9+
class TasksPage(BaseModel):
10+
"""Page with tasks."""
11+
12+
next_page_token: str = ''
13+
tasks: list[Task]
14+
total_size: int
515

616

717
class TaskStore(ABC):
@@ -22,6 +32,14 @@ async def get(
2232
) -> Task | None:
2333
"""Retrieves a task from the store by ID."""
2434

35+
@abstractmethod
36+
async def list(
37+
self,
38+
params: ListTasksParams,
39+
context: ServerCallContext | None = None,
40+
) -> TasksPage:
41+
"""Retrieves a list of tasks from the store."""
42+
2543
@abstractmethod
2644
async def delete(
2745
self, task_id: str, context: ServerCallContext | None = None

0 commit comments

Comments
 (0)