Skip to content
32 changes: 22 additions & 10 deletions src/xagent/web/api/v1/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,10 @@ async def create_chat_task(

Returns:
:class:`CreateTaskResponse` with the new ``task_id``,
``agent_id``, ``status='pending'``, and ``created_at`` for the
caller to start polling from.
``agent_id``, ``status='running'`` (the atomic claim inside
the handler flips the row from PENDING to RUNNING before the
response is sent), and ``created_at`` for the caller to
start polling from.

Raises:
V1ApiError 401: missing/invalid/revoked key (raised inside
Expand Down Expand Up @@ -160,10 +162,18 @@ async def create_chat_task(
except TaskTurnError:
raise V1ApiError(V1ErrorCode.TASK_BUSY, 409)

# ``status=task.status.value`` (i.e. 'running'), not 'pending':
# ``begin_turn`` ran an atomic UPDATE that flipped the row to
# RUNNING and ``db.refresh(task)``'d the in-memory object before
# returning. Returning 'pending' would lie to the SDK client --
# an immediately-following GET would see 'running' and the caller
# would have to reconcile two contradictory values from
# back-to-back calls. This matches the AppendMessageResponse
# contract below.
return CreateTaskResponse(
task_id=int(task.id),
agent_id=int(agent.id),
status="pending",
status=task.status.value,
Comment thread
qinxuye marked this conversation as resolved.
created_at=task.created_at,
)

Expand Down Expand Up @@ -311,16 +321,18 @@ async def append_message_to_task(
# see a value that matches what they'd read from the DB directly
# via GET /v1/chat/tasks/{id}, with no clock-skew between the two.
#
# ``status='running'`` (not 'pending') because the atomic UPDATE
# above already flipped the row to RUNNING in the same transaction.
# Returning 'pending' here would lie to the SDK client: an
# immediately-following GET would see 'running' and the client
# would have to reconcile two contradictory values from
# back-to-back calls.
# ``status=task.status.value`` (i.e. 'running'), read from the
# refreshed in-memory row rather than hardcoded, mirrors the
# CreateTaskResponse contract above: the atomic UPDATE inside
# ``begin_turn`` flipped the row to RUNNING in the same
# transaction. Returning 'pending' here would lie to the SDK
# client -- an immediately-following GET would see 'running' and
# the caller would have to reconcile two contradictory values
# from back-to-back calls.
return AppendMessageResponse(
task_id=int(task.id),
agent_id=int(agent.id),
status="running",
status=task.status.value,
accepted_at=task.updated_at,
)

Expand Down
90 changes: 71 additions & 19 deletions src/xagent/web/api/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -1442,9 +1442,18 @@ async def execute_task_background(
else:
task_updated.status = TaskStatus.FAILED
sync_workforce_run_status(db_new, task_updated, task_updated.status)
db_new.commit()
# Do NOT commit the terminal status here. Leave it
# pending so the assistant-message persistence below
# commits it atomically: the task is marked terminal
# only once the turn is durably complete. If that write
# fails, the status stays RUNNING and the outer except
# surfaces a real failure -- instead of leaving a
# COMPLETED row with no assistant message. Control
# statuses (PAUSED / WAITING_FOR_USER) above commit
# themselves; they have no assistant message to persist.
logger.info(
f"Updated task {task_id} status to {task_updated.status.value}"
f"Task {task_id} marked {task_updated.status.value} "
"(pending commit with assistant message)"
)
else:
logger.info(
Expand Down Expand Up @@ -1482,6 +1491,17 @@ async def execute_task_background(
if isinstance(chat_response, dict)
else None,
)
# Commit the pending terminal status. ``persist_assistant_message``
# commits internally when it writes a row, but it
# early-returns WITHOUT committing when the assistant
# content is empty (a valid empty-reply turn). This
# explicit commit lands the terminal status in that
# case too, so an empty successful turn stays COMPLETED
# rather than being left RUNNING (and later flipped to
# FAILED by finish_turn). If persistence raised, control
# never reaches here -- the status stays uncommitted and
# the outer except surfaces a real failure.
db_new.commit()

# Materialize broadcast metadata into primitives BEFORE the
# ``finally`` block closes ``db_new``. ``task_updated`` is
Expand Down Expand Up @@ -1596,25 +1616,57 @@ async def execute_task_background(
logger.info(f"Background task {task_id} execution completed")

except Exception as e:
logger.error(f"Background task {task_id} execution failed: {e}", exc_info=True)
# Send error event
# The outer try also spans the post-terminal steps -- the
# completion / paused broadcasts -- that run *after* the task
# status was already committed. Assistant-message persistence is
# NOT one of them: the terminal status is committed together with
# (or immediately after) that write, so a persistence failure
# leaves the row RUNNING and is handled as a genuine failure here.
# ``_terminal_task_error_payload`` writes FAILED + the real
# error_message unconditionally, so gate it on the task's current
# status: only a task still RUNNING is a genuine execution
# failure. Otherwise a failed post-completion broadcast would
# rewrite an already-COMPLETED task as FAILED and store the
# broadcast error as the task's failure cause.
status_db = get_session_local()()
try:
message = str(e)
await manager.broadcast_to_task(
{
**_terminal_task_error_payload(
task_id,
message,
event_type="task_error",
),
"task_id": task_id,
"error": message,
"timestamp": datetime.now(timezone.utc).timestamp(),
},
task_id,
current = status_db.query(Task).filter(Task.id == task_id).first()
still_running = current is not None and (
current.status == TaskStatus.RUNNING
)
finally:
status_db.close()

if not still_running:
Comment thread
qinxuye marked this conversation as resolved.
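# The task is no longer RUNNING: a terminal or control status
# (e.g. COMPLETED, PAUSED, WAITING_FOR_USER) was already
# committed, or the row could not be found. The exception
# therefore came from a best-effort step that ran after that
# commit. Observe it without touching the row or emitting a
# contradictory task_error. ``finish_turn`` still reconciles the
# terminal fields afterward.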
logger.warning(
f"Background task {task_id} post-terminal step failed; "
f"task state left unchanged: {e}",
exc_info=True,
)
else:
logger.error(
f"Background task {task_id} execution failed: {e}", exc_info=True
)
except Exception as broadcast_error:
logger.error(f"Failed to send error notification: {broadcast_error}")
# Genuine failure: _terminal_task_error_payload persists FAILED
# + the real error_message and builds the notification payload.
try:
message = str(e)
await manager.broadcast_to_task(
{
**_terminal_task_error_payload(
task_id,
message,
event_type="task_error",
),
"task_id": task_id,
"error": message,
"timestamp": datetime.now(timezone.utc).timestamp(),
},
task_id,
)
except Exception as broadcast_error:
logger.error(f"Failed to send error notification: {broadcast_error}")
except asyncio.CancelledError:
logger.info(f"Background task {task_id} cancelled")
raise
Expand Down
19 changes: 13 additions & 6 deletions src/xagent/web/schemas/v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,18 +177,21 @@ class V1TemplateDetail(V1TemplateSummary):
class CreateTaskResponse(BaseModel):
"""``POST /v1/chat/tasks`` -> 202 Accepted response.

The task has been persisted and queued for background execution;
callers poll ``GET /v1/chat/tasks/{task_id}`` to observe the
transition pending -> running -> completed/failed.
The task has been persisted, claimed as RUNNING in the same
transaction, and queued for background execution; callers poll
``GET /v1/chat/tasks/{task_id}`` to observe the transition
running -> completed/failed.
"""

task_id: int = Field(..., description="Newly created task primary key.")
agent_id: int = Field(..., description="Agent the task is bound to.")
status: str = Field(
...,
description=(
"Initial status, always 'pending' in the 202 response. "
"Use GET /v1/chat/tasks/{task_id} to observe later transitions."
"Initial status, 'running' in the 202 response (the atomic "
"claim inside POST commits the status flip before the "
"response is sent). Use GET /v1/chat/tasks/{task_id} to "
"observe later transitions."
),
)
created_at: datetime = Field(..., description="UTC creation timestamp.")
Expand Down Expand Up @@ -230,7 +233,11 @@ class AppendMessageResponse(BaseModel):
agent_id: int = Field(..., description="Agent the task is bound to.")
status: str = Field(
...,
description="Initial status of the new turn, always 'pending'.",
description=(
"Initial status of the new turn, 'running' in the 202 "
"response (the atomic claim inside POST commits the status "
"flip before the response is sent)."
),
)
accepted_at: datetime = Field(
...,
Expand Down
4 changes: 3 additions & 1 deletion tests/web/api/v1/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ def test_create_task_happy_path(mock_start_task):
assert resp.status_code == 202, resp.text
body = resp.json()
assert body["agent_id"] == agent_id
assert body["status"] == "pending"
# POST atomically claims RUNNING before returning 202, so the
# response body reports the post-claim state, not 'pending'.
assert body["status"] == "running"
assert "task_id" in body
assert "created_at" in body
task_id = body["task_id"]
Expand Down
Loading
Loading