Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/serve_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1481,7 +1481,7 @@ func (s *serveServer) handleSessionInterrupt(w http.ResponseWriter, r *http.Requ
if fastErr != nil {
log.Printf("[serve] fast provider unavailable for interrupt: %v", fastErr)
}
action, interruptErr := rt.InterruptMessage(r.Context(), msg, displayText, strings.TrimSpace(req.InterjectionID), fastProvider)
action, interruptErr := rt.InterruptMessage(r.Context(), msg, displayText, strings.TrimSpace(req.InterjectionID), fastProvider, false)
if interruptErr != nil {
writeOpenAIError(w, http.StatusConflict, "conflict_error", interruptErr.Error())
return
Expand Down
35 changes: 34 additions & 1 deletion cmd/serve_jobs_v2_notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"io"
"log"
"net"
"net/http"
"strconv"
Expand Down Expand Up @@ -172,10 +173,13 @@ func (s *serveServer) notifyQueuedAgentWeb(ctx context.Context, runID, sessionID
// originating session is already generating. Pass no classifier
// provider so this best-effort notice cannot be upgraded into a
// cancel/interrupt decision for the user's active run.
if _, err := rt.InterruptMessage(ctx, llm.UserText(message), message, "job_notify_"+strings.TrimSpace(runID), nil); err == nil {
if _, err := rt.InterruptMessage(ctx, llm.UserText(message), message, "job_notify_"+strings.TrimSpace(runID), nil, true); err == nil {
return nil
}
}
if s.startQueuedAgentNotificationContinuation(ctx, rt, sessionID, message) {
return nil
}
if err := rt.appendNotificationMessage(ctx, sessionID, message); err == nil {
return nil
}
Expand All @@ -187,6 +191,35 @@ func (s *serveServer) notifyQueuedAgentWeb(ctx context.Context, runID, sessionID
return appendQueuedAgentNotificationToStore(ctx, s.store, sessionID, message)
}

// startQueuedAgentNotificationContinuation tries to deliver a queued-agent
// notification to an idle session by starting a new response run whose only
// input is the notification, sent as a user message.
//
// It reports true only when startResponseRun returned a non-nil run without
// error. It reports false, leaving the caller to pick another delivery path,
// when the server or runtime is nil, when sessionID or message is blank, when
// the runtime already has an active run, or when the run could not be started
// (the start error is logged here, not returned).
func (s *serveServer) startQueuedAgentNotificationContinuation(ctx context.Context, rt *serveRuntime, sessionID, message string) bool {
	switch {
	case s == nil, rt == nil:
		return false
	case strings.TrimSpace(sessionID) == "", strings.TrimSpace(message) == "":
		return false
	case rt.hasActiveRun():
		// A run is already in flight; this path only handles idle sessions.
		return false
	}

	// Chain onto the runtime's last response ID when it has one; otherwise
	// fall back to the latest durable response ID recorded for the session.
	prevID := strings.TrimSpace(rt.getLastResponseID())
	if prevID == "" {
		prevID = s.latestDurableResponseIDForSession(ctx, sessionID)
	}

	request := llm.Request{
		SessionID: sessionID,
		Model:     strings.TrimSpace(rt.defaultModel),
	}
	opts := startResponseRunOptions{
		previousResponseID: prevID,
		uiSession:          true,
	}
	// NOTE(review): the meaning of the positional true/false flags is defined
	// by startResponseRun, which is not visible here — confirm against it.
	run, err := s.startResponseRun(rt, true, false, []llm.Message{llm.UserText(message)}, request, sessionID, opts)
	if err != nil {
		log.Printf("[jobs-v2] queued agent notification continuation failed for session %s: %v", sessionID, err)
		return false
	}
	return run != nil
}

func (rt *serveRuntime) appendNotificationMessage(ctx context.Context, sessionID, message string) error {
if rt == nil || strings.TrimSpace(message) == "" {
return nil
Expand Down
54 changes: 54 additions & 0 deletions cmd/serve_jobs_v2_notify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http/httptest"
"strings"
"testing"
"time"

"github.com/samsaffron/term-llm/internal/llm"
"github.com/samsaffron/term-llm/internal/session"
Expand Down Expand Up @@ -209,6 +210,59 @@ func TestJobsV2NotifyWhenDoneAppendsWebNotificationToIdleSession(t *testing.T) {
}
}

// TestJobsV2NotifyWhenDoneContinuesLoadedIdleWebSession checks that notifying
// a loaded web session with no active run starts a continuation: the store
// ends up with the notification as a user message followed by the mock
// provider's assistant reply, and the provider is consulted exactly once.
func TestJobsV2NotifyWhenDoneContinuesLoadedIdleWebSession(t *testing.T) {
	const sessionID = "sess-origin"
	ctx := context.Background()

	store := newServeRuntimeTestStore()
	provider := llm.NewMockProvider("mock").AddTextResponse("I saw the queued job finish.")
	rt := &serveRuntime{
		provider:     provider,
		providerKey:  "mock",
		engine:       llm.NewEngine(provider, nil),
		defaultModel: "mock-model",
		store:        store,
		platform:     "web",
	}

	// The factory always hands back the same runtime so the session is
	// already loaded when the notification arrives.
	factory := func(context.Context) (*serveRuntime, error) {
		return rt, nil
	}
	mgr := newServeSessionManager(time.Minute, 10, factory)
	defer mgr.Close()
	if _, err := mgr.GetOrCreate(ctx, sessionID); err != nil {
		t.Fatalf("GetOrCreate: %v", err)
	}

	srv := &serveServer{
		store:        store,
		sessionMgr:   mgr,
		responseRuns: newServeResponseRunManager(),
	}
	defer srv.responseRuns.Close()

	message := "Queued job job_123 (developer) succeeded: hello world"
	if err := srv.notifyQueuedAgentWeb(ctx, "run_123", sessionID, message); err != nil {
		t.Fatalf("notifyQueuedAgentWeb: %v", err)
	}

	// The continuation runs asynchronously; wait until both messages land.
	persisted := func() bool {
		store.mu.Lock()
		defer store.mu.Unlock()
		return len(store.messages[sessionID]) >= 2
	}
	waitForServeCondition(t, 2*time.Second, persisted, "notification continuation to persist user notice and assistant response")

	// Snapshot under the lock so the assertions do not race with the store.
	store.mu.Lock()
	msgs := append([]session.Message(nil), store.messages[sessionID]...)
	store.mu.Unlock()

	if len(msgs) != 2 {
		t.Fatalf("messages = %d, want 2: %#v", len(msgs), msgs)
	}
	notice, reply := msgs[0], msgs[1]
	if notice.Role != llm.RoleUser || !strings.Contains(notice.TextContent, message) {
		t.Fatalf("first message = (%s, %q), want user notification", notice.Role, notice.TextContent)
	}
	if reply.Role != llm.RoleAssistant || !strings.Contains(reply.TextContent, "I saw the queued job finish") {
		t.Fatalf("second message = (%s, %q), want assistant continuation", reply.Role, reply.TextContent)
	}
	if turns := provider.CurrentTurn(); turns != 1 {
		t.Fatalf("provider turns = %d, want 1", turns)
	}
}

func TestJobsV2NotifyFailureDoesNotChangeRunStatus(t *testing.T) {
mgr, err := newJobsV2ManagerWithNotifier(":memory:", 0, nil, func(context.Context, jobsV2Run, jobsV2Job, jobsV2RunStatus, jobsV2RunResult, string, bool, string) error {
return errors.New("notify failed")
Expand Down
6 changes: 3 additions & 3 deletions cmd/serve_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,10 @@ func (rt *serveRuntime) updateInterruptFromEvent(ev llm.Event) {
}

// Interrupt submits a plain-text interruption for the runtime's active run.
//
// It is a convenience wrapper around InterruptMessage: msg is wrapped with
// llm.UserText and also used as the display text, no interjection ID is
// supplied, and autoContinue is false. fastProvider is passed through
// unchanged. The action and error are returned as InterruptMessage reports
// them.
func (rt *serveRuntime) Interrupt(ctx context.Context, msg string, fastProvider llm.Provider) (llm.InterruptAction, error) {
	return rt.InterruptMessage(ctx, llm.UserText(msg), msg, "", fastProvider, false)
}

func (rt *serveRuntime) InterruptMessage(ctx context.Context, msg llm.Message, displayText string, interjectionID string, fastProvider llm.Provider) (llm.InterruptAction, error) {
func (rt *serveRuntime) InterruptMessage(ctx context.Context, msg llm.Message, displayText string, interjectionID string, fastProvider llm.Provider, autoContinue bool) (llm.InterruptAction, error) {
rt.interruptMu.Lock()
state := rt.activeInterrupt
if state == nil {
Expand Down Expand Up @@ -205,7 +205,7 @@ func (rt *serveRuntime) InterruptMessage(ctx context.Context, msg llm.Message, d
cancel()
}
case llm.InterruptInterject:
rt.engine.QueueInterjection(llm.QueuedInterjection{ID: interjectionID, Message: msg, DisplayText: displayText})
rt.engine.QueueInterjection(llm.QueuedInterjection{ID: interjectionID, Message: msg, DisplayText: displayText, AutoContinue: autoContinue})
}
return action, nil
}
Expand Down
89 changes: 84 additions & 5 deletions internal/llm/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,11 @@ const (
// QueuedInterjection is a structured user message submitted while a run is active.
// Queued entries are cancellable until the engine drains them into a provider turn.
type QueuedInterjection struct {
	// ID identifies the interjection; it is echoed back as the
	// InterjectionID of the event emitted when the entry is committed.
	ID string

	// Message is the message appended to the request when the entry is drained.
	Message Message

	// DisplayText is the text reported in the interjection event. When it is
	// empty the engine falls back to the text of Message.
	DisplayText string

	// Status tracks the entry's lifecycle; draining marks it InterjectionCommitted.
	Status InterjectionStatus

	// AutoContinue, if true, makes the engine drain the entry at a text-only
	// turn boundary and continue the run.
	AutoContinue bool
}

type queuedInterjection = QueuedInterjection
Expand Down Expand Up @@ -711,6 +712,66 @@ func (e *Engine) drainInterjections() []queuedInterjection {
return out
}

// drainAutoContinueInterjections removes and returns the leading run of
// pending interjections flagged AutoContinue, all while holding callbackMu.
// The returned copies carry Status InterjectionCommitted.
//
// Only a prefix is taken, so FIFO order is preserved: an ordinary pending
// user interjection at the head of the queue stops any later auto-continue
// notification from being injected ahead of it. It returns nil when the queue
// is empty or its head is not an auto-continue entry.
func (e *Engine) drainAutoContinueInterjections() []queuedInterjection {
	e.callbackMu.Lock()
	defer e.callbackMu.Unlock()

	pending := e.pendingInterjections

	// Measure the auto-continue prefix.
	prefixLen := 0
	for _, item := range pending {
		if !item.AutoContinue {
			break
		}
		prefixLen++
	}
	if prefixLen == 0 {
		return nil
	}

	// Hand back committed copies; the queue's own entries are not restamped.
	drained := make([]queuedInterjection, 0, prefixLen)
	for _, item := range pending[:prefixLen] {
		item.Status = InterjectionCommitted
		drained = append(drained, item)
	}

	// Shift the survivors to the front of the same backing array, then zero
	// the vacated tail so it no longer references the drained messages.
	kept := copy(pending, pending[prefixLen:])
	for i := kept; i < len(pending); i++ {
		pending[i] = queuedInterjection{}
	}
	e.pendingInterjections = pending[:kept]
	return drained
}

// continueWithAutoInterjections drains any leading auto-continue
// interjections and, if there are some, prepares req for another provider
// turn.
//
// When interjections are drained it appends finalMsg to req.Messages (only if
// finalMsg has parts), then each interjection message with its role forced to
// RoleUser. It reports those user messages to turnCallback, when one is set,
// with empty TurnMetrics, and finally emits one committed EventInterjection
// per drained entry.
//
// It returns (false, nil) when nothing was drained, (true, nil) when the
// caller should continue the run, and (false, err) if sending an event fails.
// In the failure case req.Messages has already been extended.
func (e *Engine) continueWithAutoInterjections(ctx context.Context, send eventSender, req *Request, turnCallback TurnCompletedCallback, attempt int, finalMsg Message) (bool, error) {
	drained := e.drainAutoContinueInterjections()
	if len(drained) == 0 {
		return false, nil
	}

	if len(finalMsg.Parts) > 0 {
		req.Messages = append(req.Messages, finalMsg)
	}

	// Build the user-role copies once and append them to the request in order.
	userMsgs := make([]Message, len(drained))
	for i := range drained {
		userMsgs[i] = drained[i].Message
		userMsgs[i].Role = RoleUser
	}
	req.Messages = append(req.Messages, userMsgs...)

	if turnCallback != nil {
		cbCtx, cancel := callbackContext(ctx)
		// The callback's error is ignored here, exactly as before.
		_ = turnCallback(cbCtx, attempt, userMsgs, TurnMetrics{})
		cancel()
	}

	for i := range drained {
		item := &drained[i]
		display := item.DisplayText
		if display == "" {
			display = MessageText(item.Message)
		}
		ev := Event{
			Type:               EventInterjection,
			Text:               display,
			InterjectionID:     item.ID,
			Message:            item.Message,
			InterjectionStatus: InterjectionCommitted,
		}
		if err := send.Send(ev); err != nil {
			return false, err
		}
	}
	return true, nil
}

// applyToolOutputTruncation applies global and compaction truncation limits
// to tool output content. Global limit fires first (typically stricter),
// then compaction limit as a secondary safety net.
Expand Down Expand Up @@ -2131,8 +2192,9 @@ turnLoop:
// Call turnCallback with final text-only response (no tools)
// Note: responseCallback is NOT called here because no tool execution follows.
// responseCallback is only for persisting assistant messages before tool execution.
var finalMsg Message
if textBuilder.Len() > 0 || reasoningBuilder.Len() > 0 || len(reasoningSummaryParts) > 0 || reasoningItemID != "" || reasoningEncryptedContent != "" {
finalMsg := buildAssistantMessageWithReasoningMetadata(
finalMsg = buildAssistantMessageWithReasoningMetadata(
textBuilder.String(),
nil,
reasoningBuilder.String(),
Expand Down Expand Up @@ -2188,6 +2250,19 @@ turnLoop:
attempt--
continue
}
// Auto-continue interjections are completion notices from background work.
// Unlike ordinary user interjections during prose, they should be handed to
// the provider immediately so the active web run can acknowledge them rather
// than leaving the session stopped with a pending notice.
if attempt < maxTurns-1 {
continued, err := e.continueWithAutoInterjections(ctx, send, &req, turnCallback, attempt, finalMsg)
if err != nil {
return err
}
if continued {
continue
}
}
if err := send.Send(Event{Type: EventDone}); err != nil {
return err
}
Expand Down Expand Up @@ -2310,6 +2385,10 @@ turnLoop:
cancel()
}
}
// Auto-continue interjections are completion notices from background work.
// Unlike ordinary user interjections during prose, they should be handed to
// the provider immediately so the active web run can acknowledge them rather
// than leaving the session stopped with a pending notice.
if err := send.Send(Event{Type: EventDone}); err != nil {
return err
}
Expand Down
Loading
Loading