diff --git a/src/xagent/web/api/v1/tasks.py b/src/xagent/web/api/v1/tasks.py index 1143e5f90..2970f9f43 100644 --- a/src/xagent/web/api/v1/tasks.py +++ b/src/xagent/web/api/v1/tasks.py @@ -95,8 +95,10 @@ async def create_chat_task( Returns: :class:`CreateTaskResponse` with the new ``task_id``, - ``agent_id``, ``status='pending'``, and ``created_at`` for the - caller to start polling from. + ``agent_id``, ``status='running'`` (the atomic claim inside + the handler flips the row from PENDING to RUNNING before the + response is sent), and ``created_at`` for the caller to + start polling from. Raises: V1ApiError 401: missing/invalid/revoked key (raised inside @@ -160,10 +162,18 @@ async def create_chat_task( except TaskTurnError: raise V1ApiError(V1ErrorCode.TASK_BUSY, 409) + # ``status=task.status.value`` (i.e. 'running'), not 'pending': + # ``begin_turn`` ran an atomic UPDATE that flipped the row to + # RUNNING and ``db.refresh(task)``'d the in-memory object before + # returning. Returning 'pending' would lie to the SDK client -- + # an immediately-following GET would see 'running' and the caller + # would have to reconcile two contradictory values from + # back-to-back calls. This matches the AppendMessageResponse + # contract below. return CreateTaskResponse( task_id=int(task.id), agent_id=int(agent.id), - status="pending", + status=task.status.value, created_at=task.created_at, ) @@ -311,16 +321,18 @@ async def append_message_to_task( # see a value that matches what they'd read from the DB directly # via GET /v1/chat/tasks/{id}, with no clock-skew between the two. # - # ``status='running'`` (not 'pending') because the atomic UPDATE - # above already flipped the row to RUNNING in the same transaction. - # Returning 'pending' here would lie to the SDK client: an - # immediately-following GET would see 'running' and the client - # would have to reconcile two contradictory values from - # back-to-back calls. + # ``status=task.status.value`` (i.e. 
'running'), read from the + # refreshed in-memory row rather than hardcoded, mirrors the + # CreateTaskResponse contract above: the atomic UPDATE inside + # ``begin_turn`` flipped the row to RUNNING in the same + # transaction. Returning 'pending' here would lie to the SDK + # client -- an immediately-following GET would see 'running' and + # the caller would have to reconcile two contradictory values + # from back-to-back calls. return AppendMessageResponse( task_id=int(task.id), agent_id=int(agent.id), - status="running", + status=task.status.value, accepted_at=task.updated_at, ) diff --git a/src/xagent/web/api/websocket.py b/src/xagent/web/api/websocket.py index 7bc402795..b7a72a5a3 100644 --- a/src/xagent/web/api/websocket.py +++ b/src/xagent/web/api/websocket.py @@ -1442,9 +1442,18 @@ async def execute_task_background( else: task_updated.status = TaskStatus.FAILED sync_workforce_run_status(db_new, task_updated, task_updated.status) - db_new.commit() + # Do NOT commit the terminal status here. Leave it + # pending so the assistant-message persistence below + # commits it atomically: the task is marked terminal + # only once the turn is durably complete. If that write + # fails, the status stays RUNNING and the outer except + # surfaces a real failure -- instead of leaving a + # COMPLETED row with no assistant message. Control + # statuses (PAUSED / WAITING_FOR_USER) above commit + # themselves; they have no assistant message to persist. logger.info( - f"Updated task {task_id} status to {task_updated.status.value}" + f"Task {task_id} marked {task_updated.status.value} " + "(pending commit with assistant message)" ) else: logger.info( @@ -1482,6 +1491,17 @@ async def execute_task_background( if isinstance(chat_response, dict) else None, ) + # Commit the pending terminal status. ``persist_assistant_message`` + # commits internally when it writes a row, but it + # early-returns WITHOUT committing when the assistant + # content is empty (a valid empty-reply turn). 
This + # explicit commit lands the terminal status in that + # case too, so an empty successful turn stays COMPLETED + # rather than being left RUNNING (and later flipped to + # FAILED by finish_turn). If persistence raised, control + # never reaches here -- the status stays uncommitted and + # the outer except surfaces a real failure. + db_new.commit() # Materialize broadcast metadata into primitives BEFORE the # ``finally`` block closes ``db_new``. ``task_updated`` is @@ -1596,25 +1616,57 @@ async def execute_task_background( logger.info(f"Background task {task_id} execution completed") except Exception as e: - logger.error(f"Background task {task_id} execution failed: {e}", exc_info=True) - # Send error event + # The outer try also spans the completion / paused broadcasts, + # which run *after* the terminal status was committed (assistant + # message persistence runs *before* that commit, so its failure + # leaves the task RUNNING). ``_terminal_task_error_payload`` + # writes FAILED + the real error_message unconditionally, so gate + # it on the task's current status: only a task still RUNNING is a + # genuine execution failure. Otherwise a failed post-completion + # broadcast would rewrite an already-COMPLETED task as FAILED and + # store the broadcast error as the task's failure cause. + status_db = get_session_local()() try: - message = str(e) - await manager.broadcast_to_task( - { - **_terminal_task_error_payload( - task_id, - message, - event_type="task_error", - ), - "task_id": task_id, - "error": message, - "timestamp": datetime.now(timezone.utc).timestamp(), - }, - task_id, + current = status_db.query(Task).filter(Task.id == task_id).first() + still_running = current is not None and ( + current.status == TaskStatus.RUNNING + ) + finally: + status_db.close() + + if not still_running: + # Terminal state already committed; the exception came from a + # best-effort post-completion step. 
Observe it without touching + # the row or emitting a contradictory task_error. ``finish_turn`` + # still reconciles the terminal fields afterward. + logger.warning( + f"Background task {task_id} post-terminal step failed; " + f"task state left unchanged: {e}", + exc_info=True, + ) + else: + logger.error( + f"Background task {task_id} execution failed: {e}", exc_info=True ) - except Exception as broadcast_error: - logger.error(f"Failed to send error notification: {broadcast_error}") + # Genuine failure: _terminal_task_error_payload persists FAILED + # + the real error_message and builds the notification payload. + try: + message = str(e) + await manager.broadcast_to_task( + { + **_terminal_task_error_payload( + task_id, + message, + event_type="task_error", + ), + "task_id": task_id, + "error": message, + "timestamp": datetime.now(timezone.utc).timestamp(), + }, + task_id, + ) + except Exception as broadcast_error: + logger.error(f"Failed to send error notification: {broadcast_error}") except asyncio.CancelledError: logger.info(f"Background task {task_id} cancelled") raise diff --git a/src/xagent/web/schemas/v1.py b/src/xagent/web/schemas/v1.py index 9b215d808..7b53c6bd9 100644 --- a/src/xagent/web/schemas/v1.py +++ b/src/xagent/web/schemas/v1.py @@ -177,9 +177,10 @@ class V1TemplateDetail(V1TemplateSummary): class CreateTaskResponse(BaseModel): """``POST /v1/chat/tasks`` -> 202 Accepted response. - The task has been persisted and queued for background execution; - callers poll ``GET /v1/chat/tasks/{task_id}`` to observe the - transition pending -> running -> completed/failed. + The task has been persisted, claimed as RUNNING in the same + transaction, and queued for background execution; callers poll + ``GET /v1/chat/tasks/{task_id}`` to observe the transition + running -> completed/failed. 
""" task_id: int = Field(..., description="Newly created task primary key.") @@ -187,8 +188,10 @@ class CreateTaskResponse(BaseModel): status: str = Field( ..., description=( - "Initial status, always 'pending' in the 202 response. " - "Use GET /v1/chat/tasks/{task_id} to observe later transitions." + "Initial status, 'running' in the 202 response (the atomic " + "claim inside POST commits the status flip before the " + "response is sent). Use GET /v1/chat/tasks/{task_id} to " + "observe later transitions." ), ) created_at: datetime = Field(..., description="UTC creation timestamp.") @@ -230,7 +233,11 @@ class AppendMessageResponse(BaseModel): agent_id: int = Field(..., description="Agent the task is bound to.") status: str = Field( ..., - description="Initial status of the new turn, always 'pending'.", + description=( + "Initial status of the new turn, 'running' in the 202 " + "response (the atomic claim inside POST commits the status " + "flip before the response is sent)." + ), ) accepted_at: datetime = Field( ..., diff --git a/tests/web/api/v1/test_tasks.py b/tests/web/api/v1/test_tasks.py index c77f96623..dc8254330 100644 --- a/tests/web/api/v1/test_tasks.py +++ b/tests/web/api/v1/test_tasks.py @@ -113,7 +113,9 @@ def test_create_task_happy_path(mock_start_task): assert resp.status_code == 202, resp.text body = resp.json() assert body["agent_id"] == agent_id - assert body["status"] == "pending" + # POST atomically claims RUNNING before returning 202, so the + # response body reports the post-claim state, not 'pending'. 
+ assert body["status"] == "running" assert "task_id" in body assert "created_at" in body task_id = body["task_id"] diff --git a/tests/web/test_websocket_uploaded_files_context.py b/tests/web/test_websocket_uploaded_files_context.py index 724b5913f..108a91406 100644 --- a/tests/web/test_websocket_uploaded_files_context.py +++ b/tests/web/test_websocket_uploaded_files_context.py @@ -203,6 +203,268 @@ def fake_release_current_runner_task_lease_with_workforce_sync( assert captured["agent_task"] == "重试" +class _NoopAgentService: + def set_conversation_history(self, history): + pass + + def set_execution_context_messages(self, messages): + pass + + def set_recovered_skill_context(self, skill_context): + pass + + +class _NoopBackgroundTaskManager: + async def wait_for_previous(self, task_id): + pass + + def cleanup_task(self, task_id): + pass + + +def _wire_execute_task_background(monkeypatch, db_session, manager): + """Wire execute_task_background's collaborators to the test session. + + ``get_session_local`` is pointed at the same engine as ``db_session`` + so the failure handler's status probe (and the terminal payload + writer, when not stubbed) read the row the test committed. 
+ """ + + def fake_get_db(): + yield db_session + + def fake_release(db, task_id, *, status): + return True + + test_sessionmaker = sessionmaker(bind=db_session.get_bind()) + + monkeypatch.setattr( + websocket_api, "background_task_manager", _NoopBackgroundTaskManager() + ) + monkeypatch.setattr(websocket_api, "manager", manager) + monkeypatch.setattr( + websocket_api, + "release_current_runner_task_lease_with_workforce_sync", + fake_release, + ) + monkeypatch.setattr(websocket_api, "get_session_local", lambda: test_sessionmaker) + monkeypatch.setattr(database_models, "get_db", fake_get_db) + + def fake_persist_assistant_message(db, *args, **kwargs): + # The real helper commits the session; mirror that so the pending + # terminal status set by execute_task_background is durably landed + # (the status commit now rides on the assistant-message write). + db.commit() + + monkeypatch.setattr( + "xagent.web.services.chat_history_service.persist_assistant_message", + fake_persist_assistant_message, + ) + + +@pytest.mark.asyncio +async def test_completion_broadcast_failure_keeps_task_completed( + db_session, monkeypatch +): + """A failure in the post-completion broadcast must not be treated as an + execution failure: the already-COMPLETED row is left untouched, no + task_error is emitted, and the terminal failure writer is not invoked.""" + user = _create_user(db_session, 1, "owner") + _create_task(db_session, task_id=10, user_id=1, status=TaskStatus.RUNNING) + db_session.commit() + + sent_events: list[object] = [] + payload_calls: list[tuple] = [] + + class BroadcastManager: + async def broadcast_to_task(self, event, task_id): + etype = event.get("type") if isinstance(event, dict) else None + sent_events.append(etype) + if etype == "task_completed": + raise RuntimeError("websocket client disconnected") + + class AgentManager: + async def get_agent_for_task(self, task_id, db, **kwargs): + return _NoopAgentService() + + async def execute_task(self, **kwargs): + return 
{"success": True, "output": "ok", "file_outputs": []} + + def fake_payload(task_id, message, *, event_type="agent_error"): + payload_calls.append((task_id, message, event_type)) + return {"type": event_type, "task_id": task_id} + + _wire_execute_task_background(monkeypatch, db_session, BroadcastManager()) + monkeypatch.setattr(websocket_api, "_terminal_task_error_payload", fake_payload) + + await execute_task_background( + task_id=10, + user_message="hi", + context={}, + agent_manager=AgentManager(), + user_id=int(user.id), + llm_user_message="hi", + ) + + db_session.expire_all() + task = db_session.query(Task).filter(Task.id == 10).first() + assert task.status == TaskStatus.COMPLETED + assert task.error_message is None + assert "task_completed" in sent_events + assert "task_error" not in sent_events + assert payload_calls == [] + + +@pytest.mark.asyncio +async def test_execution_failure_routes_real_error_to_terminal_payload( + db_session, monkeypatch +): + """A genuine execution failure (task still RUNNING) routes the real + exception text through _terminal_task_error_payload (which persists + FAILED + error_message) and emits task_error.""" + user = _create_user(db_session, 1, "owner") + _create_task(db_session, task_id=11, user_id=1, status=TaskStatus.RUNNING) + db_session.commit() + + sent_events: list[object] = [] + payload_calls: list[tuple] = [] + + class BroadcastManager: + async def broadcast_to_task(self, event, task_id): + sent_events.append(event.get("type") if isinstance(event, dict) else None) + + class AgentManager: + async def get_agent_for_task(self, task_id, db, **kwargs): + return _NoopAgentService() + + async def execute_task(self, **kwargs): + raise RuntimeError("agent boom xyz") + + def fake_payload(task_id, message, *, event_type="agent_error"): + payload_calls.append((task_id, message, event_type)) + return {"type": event_type, "task_id": task_id} + + _wire_execute_task_background(monkeypatch, db_session, BroadcastManager()) + 
monkeypatch.setattr(websocket_api, "_terminal_task_error_payload", fake_payload) + + await execute_task_background( + task_id=11, + user_message="hi", + context={}, + agent_manager=AgentManager(), + user_id=int(user.id), + llm_user_message="hi", + ) + + assert payload_calls == [(11, "agent boom xyz", "task_error")] + assert "task_error" in sent_events + + +@pytest.mark.asyncio +async def test_assistant_persist_failure_surfaces_as_task_failure( + db_session, monkeypatch +): + """Assistant-message persistence is a durable write, not best-effort. + If it fails after a successful agent result, the terminal status must + not have been committed (it is pending until the message lands), so + the failure is surfaced through _terminal_task_error_payload rather + than leaving a COMPLETED row with no message.""" + user = _create_user(db_session, 1, "owner") + _create_task(db_session, task_id=12, user_id=1, status=TaskStatus.RUNNING) + db_session.commit() + + payload_calls: list[tuple] = [] + + class BroadcastManager: + async def broadcast_to_task(self, event, task_id): + pass + + class AgentManager: + async def get_agent_for_task(self, task_id, db, **kwargs): + return _NoopAgentService() + + async def execute_task(self, **kwargs): + return {"success": True, "output": "ok", "file_outputs": []} + + def boom(*args, **kwargs): + raise RuntimeError("durable persist failed") + + def fake_payload(task_id, message, *, event_type="agent_error"): + payload_calls.append((task_id, event_type)) + return {"type": event_type, "task_id": task_id} + + _wire_execute_task_background(monkeypatch, db_session, BroadcastManager()) + # Override the no-op persist with one that fails (durable write error). 
+ monkeypatch.setattr( + "xagent.web.services.chat_history_service.persist_assistant_message", boom + ) + monkeypatch.setattr(websocket_api, "_terminal_task_error_payload", fake_payload) + + await execute_task_background( + task_id=12, + user_message="hi", + context={}, + agent_manager=AgentManager(), + user_id=int(user.id), + llm_user_message="hi", + ) + + # The COMPLETED status was never committed (pending until the message + # write that failed), so the status probe still sees RUNNING and the + # failure is routed to the terminal payload writer. + assert payload_calls == [(12, "task_error")] + + +@pytest.mark.asyncio +async def test_empty_reply_turn_still_completes(db_session, monkeypatch): + """An empty assistant reply makes persist_assistant_message + early-return WITHOUT committing. The explicit terminal commit must + still land COMPLETED, so a successful empty turn is not left RUNNING + (and later flipped to FAILED by finish_turn).""" + user = _create_user(db_session, 1, "owner") + _create_task(db_session, task_id=13, user_id=1, status=TaskStatus.RUNNING) + db_session.commit() + + payload_calls: list[tuple] = [] + + class BroadcastManager: + async def broadcast_to_task(self, event, task_id): + pass + + class AgentManager: + async def get_agent_for_task(self, task_id, db, **kwargs): + return _NoopAgentService() + + async def execute_task(self, **kwargs): + return {"success": True, "output": "ok", "file_outputs": []} + + def fake_payload(task_id, message, *, event_type="agent_error"): + payload_calls.append((task_id, event_type)) + return {"type": event_type, "task_id": task_id} + + _wire_execute_task_background(monkeypatch, db_session, BroadcastManager()) + # Empty-content path: persist early-returns None without committing. 
+ monkeypatch.setattr( + "xagent.web.services.chat_history_service.persist_assistant_message", + lambda *args, **kwargs: None, + ) + monkeypatch.setattr(websocket_api, "_terminal_task_error_payload", fake_payload) + + await execute_task_background( + task_id=13, + user_message="hi", + context={}, + agent_manager=AgentManager(), + user_id=int(user.id), + llm_user_message="hi", + ) + + db_session.expire_all() + task = db_session.query(Task).filter(Task.id == 13).first() + assert task.status == TaskStatus.COMPLETED + assert payload_calls == [] + + def test_build_uploaded_files_context_includes_agent_builder_kb_instruction(): context = _build_uploaded_files_context( [