Skip to content

Commit e2b9cf8

Browse files
committed
fix(review): cancel-aware turn completion and stale-cancel HTTP status
Driver.streamTurn now checks ctx.Err() before completing a turn on ProviderTurnDone, mirroring the closed-channel guard. The ctx.Done() and ProviderTurnDone branches can both be ready in the same select cycle (the provider had already queued its done event while handleToolCall was running, and ctx was canceled in the meantime); without the check, the ProviderTurnDone branch could win the race and journal turn.completed for a canceled turn. Dispatcher.handleTurnCancel now returns the ErrTurnNotRunning / ErrTurnMismatch sentinels (defined in agentserver), and writeResult maps them to 404 / 409 respectively. Stale cancels and mistyped turn IDs no longer surface as 500.
1 parent 5bc6248 commit e2b9cf8

4 files changed

Lines changed: 28 additions & 2 deletions

File tree

internal/agentdriver/dispatcher.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -227,15 +227,15 @@ func (d *Dispatcher) handleTurnCancel(c agentproto.TurnCancelCmd) (agentserver.D
227227
rt, ok := d.running[c.SessionID]
228228
d.mu.Unlock()
229229
if !ok {
230-
return agentserver.DispatchResult{}, fmt.Errorf("no running turn on session %s", c.SessionID)
230+
return agentserver.DispatchResult{}, fmt.Errorf("%w: %s", agentserver.ErrTurnNotRunning, c.SessionID)
231231
}
232232
// If the cancel names a specific turn, only cancel when it matches the
233233
// turn that is actually in flight. A late cancel for a turn that has
234234
// already completed must NOT abort a newer turn that started since.
235235
// An empty TurnID means "cancel whatever is running" — kept for
236236
// pre-turn-id clients; deprecate in M2c.
237237
if c.TurnID != "" && c.TurnID != rt.turnID {
238-
return agentserver.DispatchResult{}, fmt.Errorf("turn %s is not the running turn on session %s (running=%s)", c.TurnID, c.SessionID, rt.turnID)
238+
return agentserver.DispatchResult{}, fmt.Errorf("%w: turn %s on session %s (running=%s)", agentserver.ErrTurnMismatch, c.TurnID, c.SessionID, rt.turnID)
239239
}
240240
rt.cancel()
241241
return agentserver.DispatchResult{SessionID: c.SessionID}, nil

internal/agentdriver/driver.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,15 @@ func (d *Driver) streamTurn(ctx context.Context, session agentcore.Session, turn
180180
return d.failTurn(session, turnID, fmt.Errorf("tool call: %w", err))
181181
}
182182
case ProviderTurnDone:
183+
// Both ctx.Done() and a buffered ProviderTurnDone can be
184+
// ready in the same select cycle (e.g. an operator cancels
185+
// while handleToolCall was running and the provider had
186+
// already queued its done event). If this branch wins the
187+
// race we would journal turn.completed for a canceled
188+
// turn. Mirror the closed-channel guard above.
189+
if cerr := ctx.Err(); cerr != nil {
190+
return d.failTurn(session, turnID, cerr)
191+
}
183192
session, err = flushText(session)
184193
if err != nil {
185194
return d.failTurn(session, turnID, fmt.Errorf("flush text: %w", err))

internal/agentserver/server.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,10 @@ func (s *Server) writeResult(w http.ResponseWriter, res DispatchResult, err erro
281281
status = http.StatusNotFound
282282
case errors.Is(err, agentcore.ErrTurnAlreadyRunning):
283283
status = http.StatusConflict
284+
case errors.Is(err, ErrTurnNotRunning):
285+
status = http.StatusNotFound
286+
case errors.Is(err, ErrTurnMismatch):
287+
status = http.StatusConflict
284288
case errors.Is(err, agentcore.ErrPermissionNotFound):
285289
status = http.StatusNotFound
286290
case errors.Is(err, agentcore.ErrPermissionDecision):

internal/agentserver/store_dispatcher.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,19 @@ type StoreDispatcher struct {
2828
// envelope but cannot handle the operation themselves.
2929
var ErrUnsupportedCommand = errors.New("agentserver: dispatcher does not support this command")
3030

31+
// ErrTurnNotRunning is returned by Dispatchers when a turn.cancel arrives
32+
// for a session that has no in-flight turn. Stale cancels (the turn
33+
// already completed, or a client retry after the cancel succeeded) are an
34+
// expected client-visible state, not a server fault — transport maps this
35+
// to 404.
36+
var ErrTurnNotRunning = errors.New("agentserver: no running turn on session")
37+
38+
// ErrTurnMismatch is returned by Dispatchers when a turn.cancel names a
39+
// turn ID that is not the currently running turn on the session. The
40+
// session exists and a turn is running, but it's a different one — the
41+
// late cancel must not abort it. Mapped to 409.
42+
var ErrTurnMismatch = errors.New("agentserver: turn id does not match running turn")
43+
3144
// Dispatch routes the typed command. Errors are returned as-is; the
3245
// transport maps them to HTTP status codes.
3346
func (d *StoreDispatcher) Dispatch(ctx context.Context, cmd agentproto.RPCCommand) (DispatchResult, error) {

0 commit comments

Comments
 (0)