Skip to content

Commit 26bc5ca

Browse files
hi-penderclaude
authored andcommitted
feat(adk): background-task manager with subagent/filesystem/deep wiring (#1107)
* feat(adk): background-task manager with subagent/filesystem/deep wiring Introduce a shared, domain-agnostic background-task engine and wire it into the subagent and filesystem middlewares and the deep prebuilt agent. adk/backgroundtask (engine): - Manager tracks foreground/background/auto-background runs under one task-ID space; Run blocks with an optional foreground budget, then either completes, auto-backgrounds (Config.ShouldAutoBackground), or times out. - RunStream + StreamWorkFunc forward a run's output to the caller in real time during the foreground phase, then on auto-background inject a generic notice and drain the rest into the task result. - Cancellation records a reason on Task.Error and the foreground caller reports StatusCanceled (not a StatusFailed ctx error). - Task ids are TaskType_base62(int64), where the int64 packs a ms timestamp, a per-ms sequence (spinning to the next ms on overflow), and random low bits — unique within a process and self-describing by type. - Optional OutputStore persists completed results (filesystem.Backend satisfies it directly); WaitForTask/WaitAllDone for lifecycle waits. adk/middlewares/backgroundtask (control tools): - Injects task_output/task_stop once, bound to a Config{Manager}. task_output supports CC-aligned block/timeout inputs. adk/middlewares/subagent + filesystem: - subagent agent tool and filesystem execute tool route through a shared Manager when configured, gaining run_in_background; the streaming execute tool streams its foreground output via RunStream. filesystem.Shell now documents the ctx-cancellation contract. adk/prebuilt/deep: - deep.New accepts a Manager, wiring it into the top-level subagent + filesystem middlewares and injecting the control tools once; sub-agents stay foreground-only. Replaces the old task_tool with the subagent middleware. Co-Authored-By: Claude Opus 4 <noreply@anthropic.com> * feat: adjust background.Manager * refactor(adk): clarify background task events * feat: simplify done check * feat: reduce one goroutine for direct run_in_background * refactor: enchance background task with direct run_in_background * refactor(adk): make background task output file worker-owned The background-task Manager previously declared an output file globally and wrote it once at completion, which broke its "interim output" promise and made the file redundant with Task.Result. Move output-file ownership to the launching adapters (execute / agent tools): the Manager only records RunInput.OutputFile, while the worker writes — shell runs tee interim output as it streams, sub-agent runs append their final result. - backgroundtask: drop Config.OutputStore/OutputDir and persistOutput; add RunInput.OutputFile (path only, Manager never writes) - filesystem: add Appender optional interface + AppendRequest (InMemoryBackend implements it); output files require an Appender, no rewrite fallback - bundle Manager + output config into a nested BackgroundConfig across the filesystem, subagent, and deep configs - name output files after the launching tool-call id (matching Task.ToolUseID), with a uuid fallback when absent - task_output's formatTask points at the file when present instead of inlining the result Co-Authored-By: Claude Opus 4 <noreply@anthropic.com> * feat: support mark output file * fix: golangci-lint * refactor(adk): hand WorkFunc a TaskInfo and key output-file failures by id The launcher needs the Manager-assigned task id at write time to report an output-file write failure, but the id is generated inside createTask, after the work closure is already built. Pass a TaskInfo (read-only snapshot of creation-time identity) as an explicit WorkFunc/StreamWorkFunc parameter so the work receives the id directly; MarkOutputFileUnreliable then keys by id (O(1) map lookup) instead of scanning all tasks by output-file path. Also make the failed-write reporting honest: when a write fails, neither the partial file nor the in-memory Result is the authoritative complete output (Result may be empty while the task runs, or a partial projection of the file for sub-agent runs). formatTask and the OutputFileErr doc no longer claim Result is always the full copy. Co-Authored-By: Claude Opus 4 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4 <noreply@anthropic.com>
1 parent a77de0d commit 26bc5ca

25 files changed

Lines changed: 5507 additions & 766 deletions

adk/backgroundtask/id.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Copyright 2026 CloudWeGo Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package backgroundtask
18+
19+
import (
20+
"math/rand"
21+
"time"
22+
)
23+
24+
// Task-id layout: a positive int64 (63 usable bits) packed as
25+
//
26+
// [ 41 bits ms timestamp ][ 12 bits sequence ][ 10 bits random ]
27+
//
28+
// Uniqueness within a process is guaranteed by (timestamp, sequence): the
29+
// sequence resets each millisecond and increments for every id minted within the
30+
// same millisecond, all under the Manager lock. If more than 2^12 ids are minted
31+
// in a single millisecond the generator spins to the next millisecond rather than
32+
// wrapping the sequence, so (timestamp, sequence) never repeats. The random low
33+
// bits only make ids look unordered/unpredictable; they are not relied upon for
34+
// uniqueness.
35+
//
36+
// 41 bits of milliseconds covers ~69 years; 12 bits allows 4096 ids per
37+
// millisecond before the generator advances to the next millisecond.
38+
const (
39+
idSeqBits = 12
40+
idRandomBits = 10
41+
idSeqLimit = 1 << idSeqBits
42+
idRandomMask = (1 << idRandomBits) - 1
43+
)
44+
45+
// nextRawID packs the next task id integer. Must be called with m.mu held, as it
46+
// reads and advances m.seq / m.lastMs.
47+
func (m *Manager) nextRawID() int64 {
48+
ms := time.Now().UnixMilli()
49+
switch {
50+
case ms > m.lastMs:
51+
m.lastMs = ms
52+
m.seq = 0
53+
default:
54+
// Same millisecond (or a backward clock step): keep the id monotonic by
55+
// staying on lastMs and advancing the sequence. On sequence overflow, move
56+
// to the next millisecond so (timestamp, sequence) stays unique.
57+
ms = m.lastMs
58+
m.seq++
59+
if m.seq >= idSeqLimit {
60+
ms = m.waitNextMs(m.lastMs)
61+
m.lastMs = ms
62+
m.seq = 0
63+
}
64+
}
65+
66+
//nolint:gosec // non-cryptographic: random bits only diffuse the id's look.
67+
r := int64(rand.Intn(idRandomMask + 1))
68+
return (ms << (idSeqBits + idRandomBits)) | (m.seq << idRandomBits) | r
69+
}
70+
71+
// waitNextMs busy-waits until the wall clock advances past prevMs. Reached only
72+
// when more than 2^12 ids are minted within one millisecond.
73+
func (m *Manager) waitNextMs(prevMs int64) int64 {
74+
ms := time.Now().UnixMilli()
75+
for ms <= prevMs {
76+
ms = time.Now().UnixMilli()
77+
}
78+
return ms
79+
}
80+
81+
// base62 encodes a non-negative int64 using [0-9A-Za-z]. It is the compact,
82+
// URL-safe textual form of a task id's integer.
83+
func base62(n int64) string {
84+
const alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
85+
if n == 0 {
86+
return "0"
87+
}
88+
var buf [11]byte // ceil(63 / log2(62)) = 11
89+
i := len(buf)
90+
for n > 0 {
91+
i--
92+
buf[i] = alphabet[n%62]
93+
n /= 62
94+
}
95+
return string(buf[i:])
96+
}
97+
98+
// defaultTaskIDPrefix is used when a task has no Type tag.
99+
const defaultTaskIDPrefix = "task"
100+
101+
// taskIDPrefix returns the id prefix for a task type, falling back to a generic
102+
// prefix when the type is empty. The type tag (e.g. "bash", "subagent") makes ids
103+
// self-describing: "bash_3Fa9...".
104+
func taskIDPrefix(taskType string) string {
105+
if taskType == "" {
106+
return defaultTaskIDPrefix
107+
}
108+
return taskType
109+
}

adk/backgroundtask/id_test.go

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/*
2+
* Copyright 2026 CloudWeGo Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package backgroundtask
18+
19+
import (
20+
"context"
21+
"errors"
22+
"strings"
23+
"testing"
24+
25+
"github.com/stretchr/testify/assert"
26+
"github.com/stretchr/testify/require"
27+
)
28+
29+
func TestBase62(t *testing.T) {
30+
assert.Equal(t, "0", base62(0))
31+
assert.Equal(t, "A", base62(10))
32+
assert.Equal(t, "10", base62(62))
33+
// Round-trippable shape: only alphabet chars, non-empty.
34+
const alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
35+
for _, n := range []int64{1, 61, 100, 1 << 40, (1 << 63) - 1} {
36+
s := base62(n)
37+
assert.NotEmpty(t, s)
38+
for _, c := range s {
39+
assert.True(t, strings.ContainsRune(alphabet, c), "char %q not in alphabet", c)
40+
}
41+
}
42+
}
43+
44+
func TestTaskIDPrefix(t *testing.T) {
45+
assert.Equal(t, "bash", taskIDPrefix("bash"))
46+
assert.Equal(t, "subagent", taskIDPrefix("subagent"))
47+
assert.Equal(t, defaultTaskIDPrefix, taskIDPrefix(""))
48+
}
49+
50+
// IDs minted in a tight loop within one process must never collide, and must
51+
// carry the task-type prefix.
52+
func TestCreateTask_IDsUniqueAndPrefixed(t *testing.T) {
53+
m := New(context.Background(), &Config{})
54+
defer closeWithTimeout(m)
55+
56+
const n = 20000
57+
seen := make(map[string]struct{}, n)
58+
for i := 0; i < n; i++ {
59+
id, err := m.createTask(context.Background(), &RunInput{Type: "bash", Description: "x"})
60+
if err != nil {
61+
t.Fatalf("createTask: %v", err)
62+
}
63+
assert.True(t, strings.HasPrefix(id, "bash_"), "id %q missing type prefix", id)
64+
if _, dup := seen[id]; dup {
65+
t.Fatalf("duplicate id generated: %q", id)
66+
}
67+
seen[id] = struct{}{}
68+
}
69+
assert.Len(t, seen, n)
70+
}
71+
72+
// An empty task type falls back to the generic prefix.
73+
func TestCreateTask_EmptyTypePrefix(t *testing.T) {
74+
m := New(context.Background(), &Config{})
75+
defer closeWithTimeout(m)
76+
77+
id, err := m.createTask(context.Background(), &RunInput{Description: "x"})
78+
assert.NoError(t, err)
79+
assert.True(t, strings.HasPrefix(id, defaultTaskIDPrefix+"_"), "id %q", id)
80+
}
81+
82+
type taskIDContextKey struct{}
83+
84+
func TestManager_IDGenOverridesDefaultID(t *testing.T) {
85+
const wantID = "short_000001"
86+
ctx := context.WithValue(context.Background(), taskIDContextKey{}, "trace-1")
87+
called := false
88+
m := New(context.Background(), &Config{
89+
IDGen: func(ctx context.Context, input *RunInput) (string, error) {
90+
called = true
91+
assert.Equal(t, "bash", input.Type)
92+
assert.Equal(t, "call_1", input.ToolUseID)
93+
assert.Equal(t, "trace-1", ctx.Value(taskIDContextKey{}))
94+
return wantID, nil
95+
},
96+
})
97+
defer closeWithTimeout(m)
98+
99+
result, err := m.Run(ctx, &RunInput{
100+
Description: "x",
101+
Type: "bash",
102+
ToolUseID: "call_1",
103+
}, workReturning("ok", nil))
104+
require.NoError(t, err)
105+
assert.True(t, called)
106+
assert.Equal(t, wantID, result.ID)
107+
108+
task, ok := m.Get(wantID)
109+
require.True(t, ok)
110+
assert.Equal(t, wantID, task.ID)
111+
assert.Equal(t, "bash", task.Type)
112+
}
113+
114+
func TestCreateTask_IDGenEmptyIDFails(t *testing.T) {
115+
m := New(context.Background(), &Config{
116+
IDGen: func(context.Context, *RunInput) (string, error) {
117+
return "", nil
118+
},
119+
})
120+
defer closeWithTimeout(m)
121+
122+
_, err := m.createTask(context.Background(), &RunInput{Description: "x"})
123+
require.Error(t, err)
124+
assert.Contains(t, err.Error(), "empty id")
125+
assert.Empty(t, m.List())
126+
}
127+
128+
func TestCreateTask_IDGenDuplicateIDFails(t *testing.T) {
129+
m := New(context.Background(), &Config{
130+
IDGen: func(context.Context, *RunInput) (string, error) {
131+
return "fixed", nil
132+
},
133+
})
134+
defer closeWithTimeout(m)
135+
136+
id, err := m.createTask(context.Background(), &RunInput{Description: "first"})
137+
require.NoError(t, err)
138+
assert.Equal(t, "fixed", id)
139+
140+
_, err = m.createTask(context.Background(), &RunInput{Description: "second"})
141+
require.Error(t, err)
142+
assert.Contains(t, err.Error(), `task id "fixed" already exists`)
143+
assert.Len(t, m.List(), 1)
144+
}
145+
146+
func TestManager_IDGenErrorFailsRun(t *testing.T) {
147+
wantErr := errors.New("allocate id")
148+
m := New(context.Background(), &Config{
149+
IDGen: func(context.Context, *RunInput) (string, error) {
150+
return "", wantErr
151+
},
152+
})
153+
defer closeWithTimeout(m)
154+
155+
_, err := m.Run(context.Background(), &RunInput{Description: "x"}, workReturning("ok", nil))
156+
require.Error(t, err)
157+
assert.ErrorIs(t, err, wantErr)
158+
assert.Contains(t, err.Error(), "task id generator")
159+
assert.Empty(t, m.List())
160+
}

0 commit comments

Comments
 (0)