1414 * limitations under the License.
1515 */
1616
17- // Package dream provides scheduled consolidation middleware built on top of
18- // automemory-managed session files.
1917package dream
2018
2119import (
@@ -24,41 +22,108 @@ import (
2422 "time"
2523
2624 "github.com/cloudwego/eino/adk"
25+ "github.com/cloudwego/eino/adk/filesystem"
2726 "github.com/cloudwego/eino/adk/middlewares/automemory"
2827 "github.com/cloudwego/eino/components/model"
2928)
3029
3130const (
32- defaultMinInterval = 24 * time .Hour
33- defaultMinTouchedSession = 5
34- defaultScanInterval = 10 * time .Minute
35- defaultLockTTL = time .Hour
31+ defaultMinInterval = 24 * time .Hour
32+ defaultMinTouchedSession = 5
33+ defaultScanInterval = 10 * time .Minute
34+ defaultLockTTL = time .Hour
35+ defaultMaxConsecutiveFailures = 3
36+ defaultMaxIterations = 24
3637)
3738
38- // OnError handles non-fatal dream errors.
39+ // errLocalKVStoreSingleProcess is surfaced through OnError when Store falls back to
40+ // the in-process default. It is a warning, not a fatal error: dreams still run, but
41+ // coordination (run lock, job status, scheduler touch counting) is not shared across
42+ // instances.
43+ var errLocalKVStoreSingleProcess = fmt .Errorf (
44+ "dream: Store not set, using in-process KVStore; this is single-process only, so run " +
45+ "locks and job status are not shared across instances and scheduled dreams will not " +
46+ "trigger when the middleware is constructed per session across instances " +
47+ "(inject a shared, durable KVStore in production)" )
48+
49+ // errCountGateNeedsSessionID is surfaced through OnError when the scheduled count
50+ // gate is active (MinTouchedSession > 1) but no SessionID is configured. The touched
51+ // set is keyed by SessionID, so without one every run records the same empty member,
52+ // the distinct-session count is stuck at 1, and the gate never opens. It is a
53+ // warning, not a fatal error.
54+ var errCountGateNeedsSessionID = fmt .Errorf (
55+ "dream: MinTouchedSession > 1 but Config.SessionID is empty; touched sessions are " +
56+ "counted by SessionID, so the count stays at 1 and scheduled dreams will never " +
57+ "trigger (set a per-session SessionID, or set MinTouchedSession to 1 to gate on " +
58+ "MinInterval alone)" )
59+
60+ // OnError handles non-fatal dream errors. The stage argument is one of the
61+ // OnErrorStage* constants identifying where the failure occurred.
3962// Optional. Nil means ignore the error.
40- type OnError func (ctx context.Context , stage string , err error )
63+ type OnError func (ctx context.Context , stage ErrorStage , err error )
4164
4265// HandleIterator handles the dream sub-agent event stream.
4366// Optional. Nil means dream drains the iterator itself.
4467type HandleIterator [M adk.MessageType ] func (ctx context.Context , iter * adk.AsyncIterator [* adk.TypedAgentEvent [M ]]) error
4568
46- // Config configures auto dream for both `New(...)` and `Run(...)`.
69+ // Config holds the fields a single dream run needs. It is the input to Run(...) and
70+ // is embedded by MiddlewareConfig for the scheduled path, so a one-shot caller never
71+ // sees the scheduler knobs.
4772type Config [M adk.MessageType ] struct {
4873 // MemoryDirectory is the memory root directory.
4974 // Required. Relative paths are resolved during init.
75+ // The dream run reads from this directory but never modifies it: the model
76+ // edits a working copy under StagingDirectory, which is then promoted to
77+ // OutputDirectory.
5078 MemoryDirectory string
5179
80+ // OutputDirectory is where consolidated memory is written when a run succeeds.
81+ // Optional. Default: MemoryDirectory (in-place consolidation).
82+ //
83+ // When equal to MemoryDirectory, the staged result is promoted over the source
84+ // after the run. The source is left untouched until promotion, so a failed or
85+ // canceled run never leaves the source half-processed. Promotion is a
86+ // best-effort copy (not atomic across files); a warning is reported via OnError.
87+ OutputDirectory string
88+
89+ // StagingDirectory is the root under which per-run working copies are created
90+ // (one subdirectory per job id). The model's writes are bounded to the staging
91+ // subdirectory; the source MemoryDirectory is copied in before the run.
92+ // Optional. Default: a directory under os.TempDir().
93+ StagingDirectory string
94+
5295 // MemoryBackend reads and updates memory files.
5396 // Required.
5497 MemoryBackend automemory.Backend
5598
99+ // Shell, when set, is used only to clean up the staging subdirectory after a
100+ // successful promotion (rm -rf).
101+ // Optional. When nil, the Shell is auto-derived from MemoryBackend if that value
102+ // also implements filesystem.Shell (the common case for sandbox/filesystem
103+ // backends that satisfy both interfaces in one struct), so it need not be
104+ // configured twice. Set this field only to override that, or to supply a Shell
105+ // when MemoryBackend is not one. When neither yields a Shell, staging directories
106+ // are left in place under StagingDirectory.
107+ Shell filesystem.Shell
108+
56109 // Model is the model used by the internal dream agent.
57110 // Required.
58111 Model model.BaseModel [M ]
59112
113+ // MaxIterations caps the dream agent's tool-call loop.
114+ // Consolidation reads the index, skims and reads multiple topic files, then
115+ // writes/edits several files plus the index, so this needs headroom on large
116+ // memory directories.
117+ // Optional. Default: 24.
118+ MaxIterations int
119+
60120 // SessionID is the current logical session ID.
61121 // Optional. When empty, dream runs without cross-turn session grouping.
122+ //
123+ // For the scheduled middleware, the trigger counts distinct SessionIDs that have
124+ // touched the memory directory. A per-session SessionID is therefore required for
125+ // MinTouchedSession > 1 to be meaningful; without one the count stays at 1. New
126+ // warns through OnError when this is misconfigured.
62127 SessionID string
63128
64129 // OnError handles non-fatal runtime errors.
@@ -77,86 +142,115 @@ type Config[M adk.MessageType] struct {
77142 // - manual `Run(...)` searches the provided/current session only
78143 SessionStore adk.SessionEventStore [M ]
79144
80- // Schedule controls middleware-triggered runs only.
81- // Optional. `Run(...)` ignores it.
82- Schedule * ScheduleConfig
145+ // Store persists job records and the per-memory-directory run lock, enabling
146+ // GetDreamStatus/CancelDream and preventing concurrent writes to the same memory
147+ // directory. The scheduled path additionally uses it for touched sessions and
148+ // schedule state.
149+ // Optional. Default: in-process store from NewLocalKVStore (single-process only;
150+ // a warning is emitted through OnError when this default is used).
151+ //
152+ // See KVStore for why production deployments MUST inject a shared, durable store.
153+ Store KVStore
154+
155+ // LockTTL is the lease for the per-memory-directory run lock.
156+ // It must comfortably exceed the longest expected dream runtime: if a run
157+ // outlives the lease, another process may acquire the lock and write the same
158+ // memory directory concurrently.
159+ // Optional. Default: 1h.
160+ LockTTL time.Duration
83161
84162 // HandleIterator overrides iterator consumption.
85163 // Optional. Default: nil.
86164 HandleIterator HandleIterator [M ]
87165}
88166
89- // ScheduleConfig controls middleware-triggered runs.
90- type ScheduleConfig struct {
167+ // MiddlewareConfig configures the scheduled dream middleware created by New(...).
168+ // It embeds Config and adds the trigger knobs that only apply to automatic,
169+ // middleware-driven runs.
170+ type MiddlewareConfig [M adk.MessageType ] struct {
171+ Config [M ]
172+
91173 // MinInterval is the minimum interval between successful runs.
92174 // Optional. Default: 24h.
93175 MinInterval time.Duration
94176
95- // MinTouchedSession is the minimum touched-session count before a run.
177+ // MinTouchedSession is the minimum number of distinct sessions (by Config.SessionID)
178+ // that must touch the memory directory before a run. Set it to 1 to gate on
179+ // MinInterval alone. Values > 1 require a per-session Config.SessionID.
96180 // Optional. Default: 5.
97181 MinTouchedSession int
98182
99183 // ScanInterval is the retry delay when the session threshold is not met.
100184 // Optional. Default: 10m.
101185 ScanInterval time.Duration
102186
103- // LockTTL is the lease for the per-memory-directory run lock.
104- // Optional. Default: 1h.
105- LockTTL time.Duration
106-
107- // Store persists touched sessions, schedule state, and run locks.
108- // Optional. Default: in-process `LocalStore`.
109- Store Store
187+ // MaxConsecutiveFailures caps how many times a failing run is retried against
188+ // the same unconsolidated window before the window is advanced to avoid
189+ // replaying the same sessions forever.
190+ // Optional. Default: 3.
191+ MaxConsecutiveFailures int
110192
111193 // RunInline runs triggered dreams in the `AfterAgent` call path.
112194 // Optional. Default: false.
113195 RunInline bool
114196}
115197
116- func applyCoreDefaults [M adk.MessageType ](cfg * Config [M ]) error {
198+ // scheduleParams carries the resolved scheduler knobs onto the middleware. It is nil
199+ // for a Run(...)-only construction, which never triggers on a schedule.
200+ type scheduleParams struct {
201+ minInterval time.Duration
202+ minTouchedSession int
203+ scanInterval time.Duration
204+ maxConsecutiveFailures int
205+ runInline bool
206+ }
207+
208+ func applyCoreDefaults [M adk.MessageType ](ctx context.Context , cfg * Config [M ]) error {
117209 if cfg == nil {
118210 return fmt .Errorf ("auto dream config: nil" )
119211 }
120212 if cfg .MemoryDirectory == "" || cfg .MemoryBackend == nil || cfg .Model == nil {
121213 return fmt .Errorf ("auto dream config: invalid" )
122214 }
215+ if cfg .LockTTL <= 0 {
216+ cfg .LockTTL = defaultLockTTL
217+ }
218+ if cfg .Store == nil {
219+ cfg .Store = NewLocalKVStore ()
220+ if cfg .OnError != nil {
221+ cfg .OnError (ctx , OnErrorStageInit , errLocalKVStoreSingleProcess )
222+ }
223+ }
123224 return nil
124225}
125226
126227func cloneConfig [M adk.MessageType ](cfg * Config [M ]) * Config [M ] {
127228 if cfg == nil {
128229 return nil
129230 }
130-
131231 cp := * cfg
132- if cfg .Schedule != nil {
133- scheduleCopy := * cfg .Schedule
134- cp .Schedule = & scheduleCopy
135- }
136232 return & cp
137233}
138234
139- func applyScheduleDefaults [M adk.MessageType ](cfg * Config [M ]) error {
140- if err := applyCoreDefaults (cfg ); err != nil {
141- return err
142- }
143- if cfg .Schedule == nil {
144- cfg .Schedule = & ScheduleConfig {}
145- }
146- if cfg .Schedule .MinInterval <= 0 {
147- cfg .Schedule .MinInterval = defaultMinInterval
235+ func cloneMiddlewareConfig [M adk.MessageType ](cfg * MiddlewareConfig [M ]) * MiddlewareConfig [M ] {
236+ if cfg == nil {
237+ return nil
148238 }
149- if cfg .Schedule .MinTouchedSession <= 0 {
150- cfg .Schedule .MinTouchedSession = defaultMinTouchedSession
239+ cp := * cfg
240+ return & cp
241+ }
242+
243+ func applyScheduleDefaults [M adk.MessageType ](cfg * MiddlewareConfig [M ]) {
244+ if cfg .MinInterval <= 0 {
245+ cfg .MinInterval = defaultMinInterval
151246 }
152- if cfg .Schedule . ScanInterval <= 0 {
153- cfg .Schedule . ScanInterval = defaultScanInterval
247+ if cfg .MinTouchedSession <= 0 {
248+ cfg .MinTouchedSession = defaultMinTouchedSession
154249 }
155- if cfg .Schedule . LockTTL <= 0 {
156- cfg .Schedule . LockTTL = defaultLockTTL
250+ if cfg .ScanInterval <= 0 {
251+ cfg .ScanInterval = defaultScanInterval
157252 }
158- if cfg .Schedule . Store == nil {
159- cfg .Schedule . Store = NewLocalStore ()
253+ if cfg .MaxConsecutiveFailures <= 0 {
254+ cfg .MaxConsecutiveFailures = defaultMaxConsecutiveFailures
160255 }
161- return nil
162256}
0 commit comments