Skip to content

Commit d41ec9c

Browse files
committed
feat(adk): dream improvements
1 parent 26bc5ca commit d41ec9c

9 files changed

Lines changed: 1942 additions & 444 deletions

File tree

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Copyright 2026 CloudWeGo Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package dream
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"strings"
23+
"time"
24+
25+
"github.com/cloudwego/eino/adk"
26+
)
27+
28+
// This file holds the operations a caller invokes directly on a dream: starting a
29+
// run, querying its status, and canceling it. The scheduled middleware is created
30+
// with New in middleware.go.
31+
32+
// RunRequest parameterizes a manual dream invocation.
33+
type RunRequest struct {
34+
// SessionID identifies the current session.
35+
// Optional. When empty, Config.SessionID is used.
36+
SessionID string
37+
}
38+
39+
// Run executes a dream immediately, without schedule gating, and returns the job id.
40+
//
41+
// The job's lifecycle is observable through GetDreamStatus and cancelable through
42+
// CancelDream using the returned id (the in-process default store only supports
43+
// same-process queries; inject a shared store for cross-process visibility).
44+
//
45+
// Run acquires the per-memory-directory run lock so a manual dream does not write
46+
// concurrently with a scheduled run sharing the same store; if the lock is held, Run
47+
// returns an empty id and a nil error. The input memory directory is never modified:
48+
// the model edits a staged working copy that is promoted to the output directory on
49+
// success.
50+
func Run[M adk.MessageType](ctx context.Context, cfg *Config[M], req *RunRequest) (string, error) {
51+
cfg = cloneConfig(cfg)
52+
if err := applyCoreDefaults(ctx, cfg); err != nil {
53+
return "", err
54+
}
55+
m, err := newMiddleware(cfg, nil)
56+
if err != nil {
57+
return "", err
58+
}
59+
if req == nil {
60+
req = &RunRequest{}
61+
}
62+
sessionID := strings.TrimSpace(req.SessionID)
63+
if sessionID == "" {
64+
sessionID = strings.TrimSpace(cfg.SessionID)
65+
}
66+
67+
if store := m.store(); store != nil {
68+
unlock, ok, lockErr := store.AcquireLock(ctx, runLockKey(m.resolvedMemoryDir), m.lockTTL())
69+
if lockErr != nil || !ok {
70+
return "", lockErr
71+
}
72+
defer func() { _ = unlock(ctx) }()
73+
}
74+
75+
job := m.newJob(sessionID, nil)
76+
m.persistJob(ctx, job)
77+
if err := m.executeJob(ctx, job, sessionID, nil); err != nil {
78+
return job.ID, err
79+
}
80+
return job.ID, nil
81+
}
82+
83+
// GetDreamStatus returns the current DreamJob record for jobID. It returns (nil, nil)
84+
// when no such job exists (for example after the retention TTL elapsed).
85+
func GetDreamStatus(ctx context.Context, store KVStore, jobID string) (*DreamJob, error) {
86+
if store == nil {
87+
return nil, fmt.Errorf("dream: nil store")
88+
}
89+
return getJob(ctx, store, jobID)
90+
}
91+
92+
// CancelDream requests cancellation of a pending or running dream job. It marks the
93+
// job canceled in the store and signals any in-process run to abort. Canceling a job
94+
// that has already reached a terminal state is a no-op. Cross-process runs observe
95+
// the canceled status on their next iteration check and stop best-effort.
96+
func CancelDream(ctx context.Context, store KVStore, jobID string) error {
97+
if store == nil {
98+
return fmt.Errorf("dream: nil store")
99+
}
100+
job, err := getJob(ctx, store, jobID)
101+
if err != nil {
102+
return err
103+
}
104+
if job == nil {
105+
return fmt.Errorf("dream: job not found: %s", jobID)
106+
}
107+
if job.Status.IsTerminal() {
108+
return nil
109+
}
110+
job.Status = DreamStatusCanceled
111+
job.EndedAt = time.Now()
112+
if err := setJob(ctx, store, job, jobTTL); err != nil {
113+
return err
114+
}
115+
signalCancel(jobID)
116+
return nil
117+
}

adk/middlewares/automemory/dream/config.go

Lines changed: 140 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@
1414
* limitations under the License.
1515
*/
1616

17-
// Package dream provides scheduled consolidation middleware built on top of
18-
// automemory-managed session files.
1917
package dream
2018

2119
import (
@@ -24,41 +22,108 @@ import (
2422
"time"
2523

2624
"github.com/cloudwego/eino/adk"
25+
"github.com/cloudwego/eino/adk/filesystem"
2726
"github.com/cloudwego/eino/adk/middlewares/automemory"
2827
"github.com/cloudwego/eino/components/model"
2928
)
3029

3130
const (
32-
defaultMinInterval = 24 * time.Hour
33-
defaultMinTouchedSession = 5
34-
defaultScanInterval = 10 * time.Minute
35-
defaultLockTTL = time.Hour
31+
defaultMinInterval = 24 * time.Hour
32+
defaultMinTouchedSession = 5
33+
defaultScanInterval = 10 * time.Minute
34+
defaultLockTTL = time.Hour
35+
defaultMaxConsecutiveFailures = 3
36+
defaultMaxIterations = 24
3637
)
3738

38-
// OnError handles non-fatal dream errors.
39+
// errLocalKVStoreSingleProcess is surfaced through OnError when Store falls back to
40+
// the in-process default. It is a warning, not a fatal error: dreams still run, but
41+
// coordination (run lock, job status, scheduler touch counting) is not shared across
42+
// instances.
43+
var errLocalKVStoreSingleProcess = fmt.Errorf(
44+
"dream: Store not set, using in-process KVStore; this is single-process only, so run " +
45+
"locks and job status are not shared across instances and scheduled dreams will not " +
46+
"trigger when the middleware is constructed per session across instances " +
47+
"(inject a shared, durable KVStore in production)")
48+
49+
// errCountGateNeedsSessionID is surfaced through OnError when the scheduled count
50+
// gate is active (MinTouchedSession > 1) but no SessionID is configured. The touched
51+
// set is keyed by SessionID, so without one every run records the same empty member,
52+
// the distinct-session count is stuck at 1, and the gate never opens. It is a
53+
// warning, not a fatal error.
54+
var errCountGateNeedsSessionID = fmt.Errorf(
55+
"dream: MinTouchedSession > 1 but Config.SessionID is empty; touched sessions are " +
56+
"counted by SessionID, so the count stays at 1 and scheduled dreams will never " +
57+
"trigger (set a per-session SessionID, or set MinTouchedSession to 1 to gate on " +
58+
"MinInterval alone)")
59+
60+
// OnError handles non-fatal dream errors. The stage argument is one of the
61+
// OnErrorStage* constants identifying where the failure occurred.
3962
// Optional. Nil means ignore the error.
40-
type OnError func(ctx context.Context, stage string, err error)
63+
type OnError func(ctx context.Context, stage ErrorStage, err error)
4164

4265
// HandleIterator handles the dream sub-agent event stream.
4366
// Optional. Nil means dream drains the iterator itself.
4467
type HandleIterator[M adk.MessageType] func(ctx context.Context, iter *adk.AsyncIterator[*adk.TypedAgentEvent[M]]) error
4568

46-
// Config configures auto dream for both `New(...)` and `Run(...)`.
69+
// Config holds the fields a single dream run needs. It is the input to Run(...) and
70+
// is embedded by MiddlewareConfig for the scheduled path, so a one-shot caller never
71+
// sees the scheduler knobs.
4772
type Config[M adk.MessageType] struct {
4873
// MemoryDirectory is the memory root directory.
4974
// Required. Relative paths are resolved during init.
75+
// The dream run reads from this directory but never modifies it: the model
76+
// edits a working copy under StagingDirectory, which is then promoted to
77+
// OutputDirectory.
5078
MemoryDirectory string
5179

80+
// OutputDirectory is where consolidated memory is written when a run succeeds.
81+
// Optional. Default: MemoryDirectory (in-place consolidation).
82+
//
83+
// When equal to MemoryDirectory, the staged result is promoted over the source
84+
// after the run. The source is left untouched until promotion, so a failed or
85+
// canceled run never leaves the source half-processed. Promotion is a
86+
// best-effort copy (not atomic across files); a warning is reported via OnError.
87+
OutputDirectory string
88+
89+
// StagingDirectory is the root under which per-run working copies are created
90+
// (one subdirectory per job id). The model's writes are bounded to the staging
91+
// subdirectory; the source MemoryDirectory is copied in before the run.
92+
// Optional. Default: a directory under os.TempDir().
93+
StagingDirectory string
94+
5295
// MemoryBackend reads and updates memory files.
5396
// Required.
5497
MemoryBackend automemory.Backend
5598

99+
// Shell, when set, is used only to clean up the staging subdirectory after a
100+
// successful promotion (rm -rf).
101+
// Optional. When nil, the Shell is auto-derived from MemoryBackend if that value
102+
// also implements filesystem.Shell (the common case for sandbox/filesystem
103+
// backends that satisfy both interfaces in one struct), so it need not be
104+
// configured twice. Set this field only to override that, or to supply a Shell
105+
// when MemoryBackend is not one. When neither yields a Shell, staging directories
106+
// are left in place under StagingDirectory.
107+
Shell filesystem.Shell
108+
56109
// Model is the model used by the internal dream agent.
57110
// Required.
58111
Model model.BaseModel[M]
59112

113+
// MaxIterations caps the dream agent's tool-call loop.
114+
// Consolidation reads the index, skims and reads multiple topic files, then
115+
// writes/edits several files plus the index, so this needs headroom on large
116+
// memory directories.
117+
// Optional. Default: 24.
118+
MaxIterations int
119+
60120
// SessionID is the current logical session ID.
61121
// Optional. When empty, dream runs without cross-turn session grouping.
122+
//
123+
// For the scheduled middleware, the trigger counts distinct SessionIDs that have
124+
// touched the memory directory. A per-session SessionID is therefore required for
125+
// MinTouchedSession > 1 to be meaningful; without one the count stays at 1. New
126+
// warns through OnError when this is misconfigured.
62127
SessionID string
63128

64129
// OnError handles non-fatal runtime errors.
@@ -77,86 +142,115 @@ type Config[M adk.MessageType] struct {
77142
// - manual `Run(...)` searches the provided/current session only
78143
SessionStore adk.SessionEventStore[M]
79144

80-
// Schedule controls middleware-triggered runs only.
81-
// Optional. `Run(...)` ignores it.
82-
Schedule *ScheduleConfig
145+
// Store persists job records and the per-memory-directory run lock, enabling
146+
// GetDreamStatus/CancelDream and preventing concurrent writes to the same memory
147+
// directory. The scheduled path additionally uses it for touched sessions and
148+
// schedule state.
149+
// Optional. Default: in-process store from NewLocalKVStore (single-process only;
150+
// a warning is emitted through OnError when this default is used).
151+
//
152+
// See KVStore for why production deployments MUST inject a shared, durable store.
153+
Store KVStore
154+
155+
// LockTTL is the lease for the per-memory-directory run lock.
156+
// It must comfortably exceed the longest expected dream runtime: if a run
157+
// outlives the lease, another process may acquire the lock and write the same
158+
// memory directory concurrently.
159+
// Optional. Default: 1h.
160+
LockTTL time.Duration
83161

84162
// HandleIterator overrides iterator consumption.
85163
// Optional. Default: nil.
86164
HandleIterator HandleIterator[M]
87165
}
88166

89-
// ScheduleConfig controls middleware-triggered runs.
90-
type ScheduleConfig struct {
167+
// MiddlewareConfig configures the scheduled dream middleware created by New(...).
168+
// It embeds Config and adds the trigger knobs that only apply to automatic,
169+
// middleware-driven runs.
170+
type MiddlewareConfig[M adk.MessageType] struct {
171+
Config[M]
172+
91173
// MinInterval is the minimum interval between successful runs.
92174
// Optional. Default: 24h.
93175
MinInterval time.Duration
94176

95-
// MinTouchedSession is the minimum touched-session count before a run.
177+
// MinTouchedSession is the minimum number of distinct sessions (by Config.SessionID)
178+
// that must touch the memory directory before a run. Set it to 1 to gate on
179+
// MinInterval alone. Values > 1 require a per-session Config.SessionID.
96180
// Optional. Default: 5.
97181
MinTouchedSession int
98182

99183
// ScanInterval is the retry delay when the session threshold is not met.
100184
// Optional. Default: 10m.
101185
ScanInterval time.Duration
102186

103-
// LockTTL is the lease for the per-memory-directory run lock.
104-
// Optional. Default: 1h.
105-
LockTTL time.Duration
106-
107-
// Store persists touched sessions, schedule state, and run locks.
108-
// Optional. Default: in-process `LocalStore`.
109-
Store Store
187+
// MaxConsecutiveFailures caps how many times a failing run is retried against
188+
// the same unconsolidated window before the window is advanced to avoid
189+
// replaying the same sessions forever.
190+
// Optional. Default: 3.
191+
MaxConsecutiveFailures int
110192

111193
// RunInline runs triggered dreams in the `AfterAgent` call path.
112194
// Optional. Default: false.
113195
RunInline bool
114196
}
115197

116-
func applyCoreDefaults[M adk.MessageType](cfg *Config[M]) error {
198+
// scheduleParams carries the resolved scheduler knobs onto the middleware. It is nil
199+
// for a Run(...)-only construction, which never triggers on a schedule.
200+
type scheduleParams struct {
201+
minInterval time.Duration
202+
minTouchedSession int
203+
scanInterval time.Duration
204+
maxConsecutiveFailures int
205+
runInline bool
206+
}
207+
208+
func applyCoreDefaults[M adk.MessageType](ctx context.Context, cfg *Config[M]) error {
117209
if cfg == nil {
118210
return fmt.Errorf("auto dream config: nil")
119211
}
120212
if cfg.MemoryDirectory == "" || cfg.MemoryBackend == nil || cfg.Model == nil {
121213
return fmt.Errorf("auto dream config: invalid")
122214
}
215+
if cfg.LockTTL <= 0 {
216+
cfg.LockTTL = defaultLockTTL
217+
}
218+
if cfg.Store == nil {
219+
cfg.Store = NewLocalKVStore()
220+
if cfg.OnError != nil {
221+
cfg.OnError(ctx, OnErrorStageInit, errLocalKVStoreSingleProcess)
222+
}
223+
}
123224
return nil
124225
}
125226

126227
func cloneConfig[M adk.MessageType](cfg *Config[M]) *Config[M] {
127228
if cfg == nil {
128229
return nil
129230
}
130-
131231
cp := *cfg
132-
if cfg.Schedule != nil {
133-
scheduleCopy := *cfg.Schedule
134-
cp.Schedule = &scheduleCopy
135-
}
136232
return &cp
137233
}
138234

139-
func applyScheduleDefaults[M adk.MessageType](cfg *Config[M]) error {
140-
if err := applyCoreDefaults(cfg); err != nil {
141-
return err
142-
}
143-
if cfg.Schedule == nil {
144-
cfg.Schedule = &ScheduleConfig{}
145-
}
146-
if cfg.Schedule.MinInterval <= 0 {
147-
cfg.Schedule.MinInterval = defaultMinInterval
235+
func cloneMiddlewareConfig[M adk.MessageType](cfg *MiddlewareConfig[M]) *MiddlewareConfig[M] {
236+
if cfg == nil {
237+
return nil
148238
}
149-
if cfg.Schedule.MinTouchedSession <= 0 {
150-
cfg.Schedule.MinTouchedSession = defaultMinTouchedSession
239+
cp := *cfg
240+
return &cp
241+
}
242+
243+
func applyScheduleDefaults[M adk.MessageType](cfg *MiddlewareConfig[M]) {
244+
if cfg.MinInterval <= 0 {
245+
cfg.MinInterval = defaultMinInterval
151246
}
152-
if cfg.Schedule.ScanInterval <= 0 {
153-
cfg.Schedule.ScanInterval = defaultScanInterval
247+
if cfg.MinTouchedSession <= 0 {
248+
cfg.MinTouchedSession = defaultMinTouchedSession
154249
}
155-
if cfg.Schedule.LockTTL <= 0 {
156-
cfg.Schedule.LockTTL = defaultLockTTL
250+
if cfg.ScanInterval <= 0 {
251+
cfg.ScanInterval = defaultScanInterval
157252
}
158-
if cfg.Schedule.Store == nil {
159-
cfg.Schedule.Store = NewLocalStore()
253+
if cfg.MaxConsecutiveFailures <= 0 {
254+
cfg.MaxConsecutiveFailures = defaultMaxConsecutiveFailures
160255
}
161-
return nil
162256
}

0 commit comments

Comments
 (0)