Skip to content

Commit ed48d15

Browse files
committed
feat(adk): dream improvements
1 parent 26bc5ca commit ed48d15

9 files changed

Lines changed: 2325 additions & 460 deletions

File tree

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
* Copyright 2026 CloudWeGo Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package dream
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"strings"
23+
"time"
24+
25+
"github.com/cloudwego/eino/adk"
26+
)
27+
28+
// This file holds the operations a caller invokes directly on a dream: starting a
29+
// run, querying its status, and canceling it. The scheduled middleware is created
30+
// with New in middleware.go.
31+
32+
// Run starts a one-shot dream and returns the job id.
33+
//
34+
// By default (RunConfig.Sync == false) Run is asynchronous: it persists a pending
35+
// job, starts the consolidation in a background goroutine, and returns the job id
36+
// immediately with a nil error. Track it with GetDreamStatus and stop it with
37+
// CancelDream using that id. Set RunConfig.Sync to block until the run reaches a
38+
// terminal state and return its error instead.
39+
//
40+
// The job's lifecycle is observable through GetDreamStatus and cancelable through
41+
// CancelDream (the in-process default store only supports same-process queries and
42+
// requires the process to outlive the run; inject a shared store for cross-process
43+
// visibility).
44+
//
45+
// Run acquires the per-memory-directory run lock so a manual dream does not write
46+
// concurrently with a scheduled run sharing the same store; if the lock is held, Run
47+
// returns an empty id and a nil error. The input memory directory is never modified:
48+
// the model edits a staged working copy that is promoted to the output directory on
49+
// success.
50+
func Run[M adk.MessageType](ctx context.Context, cfg *RunConfig[M]) (string, error) {
51+
cfg = cloneRunConfig(cfg)
52+
if err := applyCoreDefaults(ctx, &cfg.BaseConfig); err != nil {
53+
return "", err
54+
}
55+
m, err := newMiddleware(&cfg.BaseConfig, cfg.SessionID, nil)
56+
if err != nil {
57+
return "", err
58+
}
59+
sessionID := strings.TrimSpace(cfg.SessionID)
60+
61+
var unlock func(context.Context) error
62+
if store := m.store(); store != nil {
63+
u, ok, lockErr := store.AcquireLock(ctx, runLockKey(m.resolvedMemoryDir), m.lockTTL())
64+
if lockErr != nil || !ok {
65+
return "", lockErr
66+
}
67+
unlock = u
68+
}
69+
70+
job := m.newJob(sessionID, nil)
71+
m.persistJob(ctx, job)
72+
73+
if cfg.Sync {
74+
if unlock != nil {
75+
defer func() { _ = unlock(ctx) }()
76+
}
77+
if err := m.executeJob(ctx, job, sessionID, nil); err != nil {
78+
return job.ID, err
79+
}
80+
return job.ID, nil
81+
}
82+
83+
// Asynchronous: detach from the request lifecycle (so the run can outlive ctx)
84+
// while preserving its values, and hand lock ownership to the goroutine. The
85+
// run's outcome is reported through the job record and OnError, not the return.
86+
runCtx := withoutCancel(ctx)
87+
go func() {
88+
if unlock != nil {
89+
defer func() { _ = unlock(runCtx) }()
90+
}
91+
if err := m.executeJob(runCtx, job, sessionID, nil); err != nil {
92+
m.onErr(runCtx, OnErrorStageRunDream, err)
93+
}
94+
}()
95+
return job.ID, nil
96+
}
97+
98+
// GetDreamStatus returns the current Job record for jobID. It returns (nil, nil)
99+
// when no such job exists (for example after the retention TTL elapsed).
100+
func GetDreamStatus(ctx context.Context, store KVStore, jobID string) (*Job, error) {
101+
if store == nil {
102+
return nil, fmt.Errorf("dream: nil store")
103+
}
104+
return getJob(ctx, store, jobID)
105+
}
106+
107+
// CancelDream requests cancellation of a pending or running dream job. It marks the
108+
// job canceled in the store and signals any in-process run to abort. Canceling a job
109+
// that has already reached a terminal state is a no-op. Cross-process runs observe
110+
// the canceled status on their next iteration check and stop best-effort.
111+
func CancelDream(ctx context.Context, store KVStore, jobID string) error {
112+
if store == nil {
113+
return fmt.Errorf("dream: nil store")
114+
}
115+
job, err := getJob(ctx, store, jobID)
116+
if err != nil {
117+
return err
118+
}
119+
if job == nil {
120+
return fmt.Errorf("dream: job not found: %s", jobID)
121+
}
122+
if job.Status.IsTerminal() {
123+
return nil
124+
}
125+
job.Status = StatusCanceled
126+
job.EndedAt = time.Now()
127+
if err := setJob(ctx, store, job, jobTTL); err != nil {
128+
return err
129+
}
130+
signalCancel(jobID)
131+
return nil
132+
}

0 commit comments

Comments
 (0)