Skip to content

Commit 45f220b

Browse files
committed
Add Lock + Confirm/Cancel semantics to MessageQueue
Add Confirm and Cancel methods to the MessageQueue interface so that persistent queue implementations can use a transactional dequeue pattern: Dequeue locks a message (in-flight), Confirm permanently removes it after sess.AddMessage succeeds, Cancel releases it back to the queue on failure. This prevents message loss when the process crashes or the context is cancelled between dequeue and session persistence. The in-memory implementation treats Confirm/Cancel as no-ops since the message is already consumed from the channel on Dequeue. The agent loop now calls Confirm after successfully adding a follow-up message to the session. Drain (used for steer messages) auto-confirms all messages in a batch.
1 parent eea7d5b commit 45f220b

File tree

2 files changed

+34
-4
lines changed

2 files changed

+34
-4
lines changed

pkg/runtime/loop.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -427,9 +427,14 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c
427427
// a new turn — the model sees them as fresh input, not a
428428
// mid-stream interruption. Each follow-up gets a full
429429
// undivided agent turn.
430+
//
431+
// Dequeue locks the message; Confirm is called after
432+
// AddMessage succeeds so persistent queue implementations
433+
// can safely re-queue on failure.
430434
if followUp, ok := r.DequeueFollowUp(ctx); ok {
431435
userMsg := session.UserMessage(followUp.Content, followUp.MultiContent...)
432436
sess.AddMessage(userMsg)
437+
_ = r.followUpQueue.Confirm(ctx)
433438
events <- UserMessage(followUp.Content, sess.ID, followUp.MultiContent, len(sess.Messages)-1)
434439
r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeTools, events)
435440
continue // re-enter the loop for a new turn

pkg/runtime/runtime.go

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,18 +93,35 @@ type QueuedMessage struct {
9393
// is called from API handlers while Dequeue/Drain are called from the agent
9494
// loop goroutine.
9595
//
96+
// Dequeue uses a Lock + Confirm/Cancel pattern: Dequeue locks the next
97+
// message (making it invisible to subsequent Dequeue calls), Confirm
98+
// permanently removes it after the message has been successfully added to
99+
// the session, and Cancel releases it back to the queue if processing
100+
// fails. This prevents message loss in persistent queue implementations
101+
// when the process crashes or the context is cancelled between dequeue
102+
// and session persistence.
103+
//
96104
// The default implementation is NewInMemoryMessageQueue. Callers that need
97105
// durable or distributed storage can provide their own implementation
98106
// via the WithSteerQueue or WithFollowUpQueue options.
99107
type MessageQueue interface {
100108
// Enqueue adds a message to the queue. Returns false if the queue is
101109
// full or the context is cancelled.
102110
Enqueue(ctx context.Context, msg QueuedMessage) bool
103-
// Dequeue removes and returns the next message from the queue.
104-
// Returns the message and true, or a zero value and false if the
105-
// queue is empty. Must not block.
111+
// Dequeue locks and returns the next message from the queue. The
112+
// message is invisible to subsequent Dequeue calls until Confirm or
113+
// Cancel is called. Returns the message and true, or a zero value
114+
// and false if the queue is empty. Must not block.
106115
Dequeue(ctx context.Context) (QueuedMessage, bool)
107-
// Drain returns all pending messages and removes them from the queue.
116+
// Confirm permanently removes the most recently dequeued message.
117+
// Must be called after the message has been successfully persisted
118+
// to the session. For in-memory queues this is a no-op.
119+
Confirm(ctx context.Context) error
120+
// Cancel releases the most recently dequeued message back to the
121+
// queue. For in-memory queues this is a no-op (the message was
122+
// already consumed from the channel).
123+
Cancel(ctx context.Context) error
124+
// Drain locks, returns, and auto-confirms all pending messages.
108125
// Must not block — if the queue is empty it returns nil.
109126
Drain(ctx context.Context) []QueuedMessage
110127
// Len returns the current number of messages in the queue.
@@ -148,6 +165,14 @@ func (q *inMemoryMessageQueue) Dequeue(_ context.Context) (QueuedMessage, bool)
148165
}
149166
}
150167

168+
// Confirm is a no-op for in-memory queues — the message was already
169+
// removed from the channel on Dequeue.
170+
func (q *inMemoryMessageQueue) Confirm(_ context.Context) error { return nil }
171+
172+
// Cancel is a no-op for in-memory queues — the message cannot be put
173+
// back into a buffered channel without risking deadlock.
174+
func (q *inMemoryMessageQueue) Cancel(_ context.Context) error { return nil }
175+
151176
func (q *inMemoryMessageQueue) Drain(_ context.Context) []QueuedMessage {
152177
var msgs []QueuedMessage
153178
for {

0 commit comments

Comments
 (0)