9 changes: 9 additions & 0 deletions pkg/runtime/runtime.go
@@ -988,8 +988,17 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c

	if m != nil && r.sessionCompaction {
		if sess.InputTokens+sess.OutputTokens > int64(float64(contextLimit)*0.9) {
			messageCountBefore := len(sess.Messages)
			r.Summarize(ctx, sess, "", events)
			events <- TokenUsage(sess.ID, r.currentAgent, sess.InputTokens, sess.OutputTokens, sess.InputTokens+sess.OutputTokens, contextLimit, sess.Cost)

			// Reset token counters after successful compaction so the check
			// doesn't trigger again on the next iteration. Compact appends
			// items on success, so a changed message count indicates success.
			if len(sess.Messages) > messageCountBefore {
				sess.InputTokens = 0
				sess.OutputTokens = 0
			}
		}
	}

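For context, a minimal sketch of the threshold logic this hunk guards. The helper name is hypothetical and not part of the package; the expression itself is the one used in RunStream above:

```go
// shouldCompact mirrors the 90%-of-context check shown in the diff above.
// If sess.InputTokens and sess.OutputTokens were left untouched after a
// successful compaction, this predicate would still be true on the next
// iteration and compaction would re-trigger every time, which is exactly
// what resetting the counters prevents.
func shouldCompact(inputTokens, outputTokens, contextLimit int64) bool {
	return inputTokens+outputTokens > int64(float64(contextLimit)*0.9)
}
```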
212 changes: 212 additions & 0 deletions pkg/runtime/runtime_test.go
@@ -6,6 +6,7 @@ import (
"fmt"
"io"
"reflect"
"strings"
"sync"
"testing"
"time"
@@ -75,6 +76,18 @@ func (m *mockStream) Recv() (chat.MessageStreamResponse, error) {

func (m *mockStream) Close() { m.closed = true }

// errorStream always returns an error on Recv, simulating a failed model call.
type errorStream struct {
	err    error
	closed bool
}

func (e *errorStream) Recv() (chat.MessageStreamResponse, error) {
	return chat.MessageStreamResponse{}, e.err
}

func (e *errorStream) Close() { e.closed = true }

type streamBuilder struct{ responses []chat.MessageStreamResponse }

func newStreamBuilder() *streamBuilder {
@@ -725,6 +738,205 @@ func TestCompaction(t *testing.T) {
	require.NotEqual(t, -1, compactionStartIdx, "expected a SessionCompaction start event")
}

// capturingQueueProvider extends queueProvider to also capture the messages
// passed to each CreateChatCompletionStream call.
type capturingQueueProvider struct {
	queueProvider
	calls [][]chat.Message // messages sent on each call
}

func (p *capturingQueueProvider) CreateChatCompletionStream(_ context.Context, msgs []chat.Message, _ []tools.Tool) (chat.MessageStream, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.calls = append(p.calls, msgs)
	if len(p.streams) == 0 {
		return &mockStream{}, nil
	}
	s := p.streams[0]
	p.streams = p.streams[1:]
	return s, nil
}

func TestCompaction_ContinuationMessageSent(t *testing.T) {
	// After auto-compaction, the runtime must inject a continuation user
	// message so that the model receives at least one non-system message.
	// This prevents providers (e.g. Anthropic) from rejecting the request
	// with "messages: Field required".

	// Stream 1: initial response that pushes usage above 90% of context.
	mainStream := newStreamBuilder().
		AddContent("Hello there").
		AddStopWithUsage(101, 0). // will exceed 90% of 100
		Build()

	// Stream 2: summary generation (used by the compactor runtime).
	summaryStream := newStreamBuilder().
		AddContent("summary of conversation").
		AddStopWithUsage(1, 1).
		Build()

	// Stream 3: the model call that happens after compaction.
	postCompactionStream := newStreamBuilder().
		AddContent("I'll continue.").
		AddStopWithUsage(5, 3).
		Build()

	prov := &capturingQueueProvider{
		queueProvider: queueProvider{
			id:      "test/mock-model",
			streams: []chat.MessageStream{mainStream, summaryStream, postCompactionStream},
		},
	}

	root := agent.New("root", "You are a test agent", agent.WithModel(prov))
	tm := team.New(team.WithAgents(root))

	rt, err := NewLocalRuntime(tm, WithSessionCompaction(true), WithModelStore(mockModelStoreWithLimit{limit: 100}))
	require.NoError(t, err)

	// First RunStream: establishes baseline usage.
	sess := session.New(session.WithUserMessage("Start"))
	for range rt.RunStream(t.Context(), sess) {
	}

	// Second RunStream: will trigger compaction because usage > 90%.
	sess.AddMessage(session.UserMessage("Again"))
	for range rt.RunStream(t.Context(), sess) {
	}

	// The third model call (stream 3 = index 2 from the provider's perspective,
	// but calls[1] because first RunStream used calls[0]) should contain
	// a user message with the continuation prompt.
	require.GreaterOrEqual(t, len(prov.calls), 2, "expected at least 2 model calls in second RunStream")

	// The last captured call is the post-compaction model invocation.
	lastCallMsgs := prov.calls[len(prov.calls)-1]

	// Find the continuation user message.
	var foundContinuation bool
	for _, msg := range lastCallMsgs {
		if msg.Role == chat.MessageRoleUser && msg.Content == "The conversation was automatically compacted. Please continue where you left off." {
			foundContinuation = true
			break
		}
	}
	require.True(t, foundContinuation, "expected continuation user message after compaction; messages: %v", lastCallMsgs)

	// Also check the summary is in the system messages.
	var foundSummary bool
	for _, msg := range lastCallMsgs {
		if msg.Role == chat.MessageRoleSystem && strings.Contains(msg.Content, "summary of conversation") {
			foundSummary = true
			break
		}
	}
	require.True(t, foundSummary, "expected session summary in system messages after compaction")
}

func TestCompaction_TokenCountersResetAfterSuccess(t *testing.T) {
	// After successful compaction the token counters must be reset to zero
	// to prevent the compaction check from triggering again immediately.

	mainStream := newStreamBuilder().
		AddContent("Hello").
		AddStopWithUsage(101, 0).
		Build()

	summaryStream := newStreamBuilder().
		AddContent("summary").
		AddStopWithUsage(1, 1).
		Build()

	// Post-compaction stream with LOW usage.
	postCompactionStream := newStreamBuilder().
		AddContent("Continuing").
		AddStopWithUsage(10, 5).
		Build()

	prov := &queueProvider{
		id:      "test/mock-model",
		streams: []chat.MessageStream{mainStream, summaryStream, postCompactionStream},
	}

	root := agent.New("root", "You are a test agent", agent.WithModel(prov))
	tm := team.New(team.WithAgents(root))

	rt, err := NewLocalRuntime(tm, WithSessionCompaction(true), WithModelStore(mockModelStoreWithLimit{limit: 100}))
	require.NoError(t, err)

	sess := session.New(session.WithUserMessage("Start"))
	for range rt.RunStream(t.Context(), sess) {
	}

	sess.AddMessage(session.UserMessage("Again"))

	var events []Event
	for ev := range rt.RunStream(t.Context(), sess) {
		events = append(events, ev)
	}

	// Count how many times compaction started — should be exactly once.
	var compactionCount int
	for _, ev := range events {
		if e, ok := ev.(*SessionCompactionEvent); ok && e.Status == "started" {
			compactionCount++
		}
	}
	require.Equal(t, 1, compactionCount, "compaction should trigger exactly once, not loop")

	// Token counters should reflect the post-compaction model call, not the old values.
	assert.Equal(t, int64(10), sess.InputTokens, "InputTokens should be from post-compaction call")
	assert.Equal(t, int64(5), sess.OutputTokens, "OutputTokens should be from post-compaction call")
}

func TestCompaction_FailedCompactionNoStrayMessage(t *testing.T) {
	// When compaction fails (summary runtime errors), no continuation
	// message should be added and the original messages should be preserved.

	// Stream 1: initial response with high usage.
	mainStream := newStreamBuilder().
		AddContent("Hello").
		AddStopWithUsage(101, 0).
		Build()

	// Stream 2: summary generation FAILS — the stream returns an error.
	failingStream := &errorStream{err: fmt.Errorf("simulated API error during summary")}

	prov := &queueProvider{
		id:      "test/mock-model",
		streams: []chat.MessageStream{mainStream, failingStream},
	}

	root := agent.New("root", "You are a test agent", agent.WithModel(prov))
	tm := team.New(team.WithAgents(root))

	rt, err := NewLocalRuntime(tm, WithSessionCompaction(true), WithModelStore(mockModelStoreWithLimit{limit: 100}))
	require.NoError(t, err)

	sess := session.New(session.WithUserMessage("Start"))
	for range rt.RunStream(t.Context(), sess) {
	}

	sess.AddMessage(session.UserMessage("Again"))

	for range rt.RunStream(t.Context(), sess) {
	}

	// No summary was appended, so no continuation message should exist.
	for _, item := range sess.Messages {
		if item.IsMessage() && item.Message.Message.Content == "The conversation was automatically compacted. Please continue where you left off." {
			t.Fatal("found stray continuation message after failed compaction")
		}
	}

	// No summary items should exist.
	for _, item := range sess.Messages {
		if item.Summary != "" {
			t.Fatal("found summary item after failed compaction")
		}
	}
}

func TestSessionWithoutUserMessage(t *testing.T) {
	stream := newStreamBuilder().AddContent("OK").AddStopWithUsage(1, 1).Build()

8 changes: 8 additions & 0 deletions pkg/runtime/session_compaction.go
@@ -88,6 +88,14 @@ func (c *sessionCompactor) Compact(ctx context.Context, sess *session.Session, a
	}

	sess.Messages = append(sess.Messages, session.Item{Summary: summary})

	// After compaction, the summary is the last item. GetMessages starts
	// collecting conversation messages after the last summary, so there
	// would be zero conversation messages. Providers (e.g. Anthropic)
	// reject requests with no non-system messages, so we add a
	// continuation message to bridge the gap.
	sess.AddMessage(session.ImplicitUserMessage("The conversation was automatically compacted. Please continue where you left off."))
Review comment (repository member): I think this is actually the issue: we are putting the session summary as a system prompt, but it should be a user message. See https://github.com/docker/cagent/blob/main/pkg/session/session.go#L602


	_ = c.sessionStore.UpdateSession(ctx, sess)

	slog.Debug("Generated session summary", "session_id", sess.ID, "summary_length", len(summary))
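The reviewer's suggestion above can be sketched roughly as follows. This is an illustrative outline only, not the code behind session.go#L602: the function name, the import paths, and the exact shape of session.Item are assumptions inferred from how the tests in this PR access it (Summary, IsMessage(), Message.Message).

```go
package sketch

import (
	// Paths assumed from the repository layout referenced in this PR.
	"github.com/docker/cagent/pkg/chat"
	"github.com/docker/cagent/pkg/session"
)

// buildProviderMessages is a hypothetical helper showing the reviewer's idea:
// surface the latest compaction summary as a user message rather than folding
// it into the system prompt, so the request always carries at least one
// non-system message even when every prior turn was compacted away.
func buildProviderMessages(systemPrompt string, items []session.Item) []chat.Message {
	msgs := []chat.Message{{Role: chat.MessageRoleSystem, Content: systemPrompt}}

	// Locate the most recent summary; conversation turns before it are dropped.
	lastSummary := -1
	for i, item := range items {
		if item.Summary != "" {
			lastSummary = i
		}
	}

	// Suggested change: send the summary as a user message instead of
	// appending it to the system prompt.
	if lastSummary >= 0 {
		msgs = append(msgs, chat.Message{
			Role:    chat.MessageRoleUser,
			Content: "Summary of the conversation so far:\n" + items[lastSummary].Summary,
		})
	}

	// Keep only the conversation that happened after the last summary.
	for _, item := range items[lastSummary+1:] {
		if item.IsMessage() {
			msgs = append(msgs, item.Message.Message)
		}
	}

	return msgs
}
```

With this shape, the explicit continuation message added in this PR would no longer be strictly required by providers that reject requests without non-system messages, since the summary itself would arrive as a user message.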