forked from docker/docker-agent
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsession_compaction.go
More file actions
112 lines (92 loc) · 3.29 KB
/
session_compaction.go
File metadata and controls
112 lines (92 loc) · 3.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package runtime
import (
"context"
_ "embed"
"log/slog"
"time"
"github.com/docker/cagent/pkg/agent"
"github.com/docker/cagent/pkg/chat"
"github.com/docker/cagent/pkg/model/provider"
"github.com/docker/cagent/pkg/model/provider/options"
"github.com/docker/cagent/pkg/session"
"github.com/docker/cagent/pkg/team"
)
// compactionSystemPrompt holds the contents of prompts/compaction-system.txt,
// embedded into the binary at build time. Compact uses it as the instruction
// text of the temporary agent that writes the summary.
//
//go:embed prompts/compaction-system.txt
var compactionSystemPrompt string
// compactionUserPrompt holds the contents of prompts/compaction-user.txt,
// embedded into the binary at build time. Compact appends it (plus any extra
// user instructions) as the final user message of the summary session.
//
//go:embed prompts/compaction-user.txt
var compactionUserPrompt string
// sessionCompactor produces a summary of a session's conversation and records
// that summary on the session.
type sessionCompactor struct {
// model is the provider that Compact clones to generate the summary.
model provider.Provider
// sessionStore is used to persist the session once the summary has been
// appended to it.
sessionStore session.Store
}
// newSessionCompactor builds a sessionCompactor that summarizes with model
// and saves compacted sessions through sessionStore.
func newSessionCompactor(model provider.Provider, sessionStore session.Store) *sessionCompactor {
	compactor := new(sessionCompactor)
	compactor.model = model
	compactor.sessionStore = sessionStore
	return compactor
}
// Compact summarizes the conversation held in sess and appends the summary to
// the session, followed by an implicit user message asking the agent to
// continue.
//
// Progress is reported on events: a "started" SessionCompaction event is sent
// on entry and a "completed" one on every return path, including failures.
// Depending on the outcome, a Warning (nothing to compact), an Error (runtime
// creation or summary generation failed) or a SessionSummary (success) event
// is sent in between. Every send is a plain blocking channel send, so the
// caller must keep receiving from events.
//
// additionalPrompt, when non-empty, is appended to the built-in compaction
// prompt as extra instructions from the user. agentName is passed through to
// the emitted events.
//
// Compact returns nothing; failures are surfaced only through logs and events.
// sess is modified only when a non-empty summary was produced.
func (c *sessionCompactor) Compact(ctx context.Context, sess *session.Session, additionalPrompt string, events chan Event, agentName string) {
	slog.Debug("Generating summary for session", "session_id", sess.ID)

	events <- SessionCompaction(sess.ID, "started", agentName)
	defer func() {
		events <- SessionCompaction(sess.ID, "completed", agentName)
	}()

	// The summary is produced by a throwaway single-agent team built around a
	// clone of the configured model with the structured-output option set to
	// nil (presumably so the summary comes back as free text — confirm against
	// the options package).
	summaryModel := provider.CloneWithOptions(ctx, c.model, options.WithStructuredOutput(nil))
	root := agent.New("root", compactionSystemPrompt, agent.WithModel(summaryModel))
	newTeam := team.New(team.WithAgents(root))

	messages := sess.GetMessages(root)
	if !hasConversationMessages(messages) {
		events <- Warning("Session is empty. Start a conversation before compacting.", agentName)
		return
	}

	// Replay the existing messages into a scratch session so that generating
	// the summary does not touch sess itself.
	summarySession := session.New()
	summarySession.Title = "Generating summary..."
	for _, msg := range messages {
		summarySession.AddMessage(&session.Message{Message: msg})
	}

	prompt := compactionUserPrompt
	if additionalPrompt != "" {
		prompt += "\n\nAdditional instructions from user: " + additionalPrompt
	}
	summarySession.AddMessage(&session.Message{
		Message: chat.Message{
			Role:      chat.MessageRoleUser,
			Content:   prompt,
			CreatedAt: time.Now().Format(time.RFC3339),
		},
	})

	// Session compaction is switched off for the nested runtime that
	// generates the summary.
	summaryRuntime, err := New(newTeam, WithSessionCompaction(false))
	if err != nil {
		slog.Error("Failed to create summary generator runtime", "error", err)
		events <- Error(err.Error())
		return
	}

	if _, err := summaryRuntime.Run(ctx, summarySession); err != nil {
		slog.Error("Failed to generate session summary", "error", err)
		events <- Error(err.Error())
		return
	}

	summary := summarySession.GetLastAssistantMessageContent()
	if summary == "" {
		// Nothing usable came back; leave sess untouched, but make the no-op
		// visible in the logs instead of returning silently.
		slog.Warn("Session summary is empty, session left unchanged", "session_id", sess.ID)
		return
	}

	sess.Messages = append(sess.Messages, session.Item{Summary: summary})

	// After compaction, the summary is the last item. GetMessages starts
	// collecting conversation messages after the last summary, so there
	// would be zero conversation messages. Providers (e.g. Anthropic)
	// reject requests with no non-system messages, so we add a
	// continuation message to bridge the gap.
	sess.AddMessage(session.ImplicitUserMessage("The conversation was automatically compacted. Please continue where you left off."))

	// Persisting is best-effort: the in-memory session already carries the
	// summary, so a store failure is logged rather than aborting compaction.
	if err := c.sessionStore.UpdateSession(ctx, sess); err != nil {
		slog.Error("Failed to persist compacted session", "session_id", sess.ID, "error", err)
	}

	slog.Debug("Generated session summary", "session_id", sess.ID, "summary_length", len(summary))
	events <- SessionSummary(sess.ID, summary, agentName)
}
// hasConversationMessages reports whether messages contains at least one
// message whose role is not the system role. A nil or empty slice yields
// false.
func hasConversationMessages(messages []chat.Message) bool {
	for i := range messages {
		if messages[i].Role == chat.MessageRoleSystem {
			// System messages do not count as conversation; keep looking.
			continue
		}
		return true
	}
	return false
}