Skip to content

Commit 16ac03e

Browse files
committed
syz-agent: add agentic server
Add server for running agentic workflows as part of syzbot. The architecture and use are similar to that of syz-ci.
1 parent 0626e2a commit 16ac03e

File tree

3 files changed

+213
-2
lines changed

3 files changed

+213
-2
lines changed

Makefile

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ ifeq ("$(TARGETOS)", "trusty")
104104
endif
105105

106106
.PHONY: all clean host target \
107-
manager executor kfuzztest ci hub \
107+
manager executor kfuzztest ci hub agent \
108108
execprog mutate prog2c trace2syz repro upgrade db \
109109
usbgen symbolize cover kconf syz-build crush \
110110
bin/syz-extract bin/syz-fmt \
@@ -172,6 +172,9 @@ ci: descriptions
172172
hub: descriptions
173173
GOOS=$(HOSTOS) GOARCH=$(HOSTARCH) $(HOSTGO) build $(GOHOSTFLAGS) -o ./bin/syz-hub github.com/google/syzkaller/syz-hub
174174

175+
agent: descriptions
176+
GOOS=$(HOSTOS) GOARCH=$(HOSTARCH) $(HOSTGO) build $(GOHOSTFLAGS) -o ./bin/syz-agent github.com/google/syzkaller/syz-agent
177+
175178
repro: descriptions
176179
GOOS=$(HOSTOS) GOARCH=$(HOSTARCH) $(HOSTGO) build $(GOHOSTFLAGS) -o ./bin/syz-repro github.com/google/syzkaller/tools/syz-repro
177180

pkg/updater/updater.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ func New(cfg *Config) (*Updater, error) {
8888
"tag": true, // contains syzkaller repo git hash
8989
"bin/syz-ci": true, // these are just copied from syzkaller dir
9090
"bin/syz-manager": true,
91+
"bin/syz-agent": true,
9192
"sys/*/test/*": true,
9293
}
9394
for target := range cfg.Targets {
@@ -273,7 +274,7 @@ func (upd *Updater) build(commit *vcs.Commit) error {
273274
}
274275
}
275276
// This will also generate descriptions and should go before the 'go test' below.
276-
cmd := osutil.Command(instance.MakeBin, "host", "ci")
277+
cmd := osutil.Command(instance.MakeBin, "host", "ci", "agent")
277278
cmd.Dir = upd.syzkallerDir
278279
cmd.Env = append([]string{"GOPATH=" + upd.gopathDir}, os.Environ()...)
279280
if _, err := osutil.Run(time.Hour, cmd); err != nil {

syz-agent/agent.go

Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
// Copyright 2025 syzkaller project authors. All rights reserved.
2+
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
3+
4+
package main
5+
6+
import (
7+
"context"
8+
"encoding/json"
9+
"flag"
10+
"fmt"
11+
"maps"
12+
_ "net/http/pprof"
13+
"path/filepath"
14+
"sync"
15+
"time"
16+
17+
"github.com/google/syzkaller/dashboard/dashapi"
18+
"github.com/google/syzkaller/pkg/aflow"
19+
_ "github.com/google/syzkaller/pkg/aflow/flow"
20+
"github.com/google/syzkaller/pkg/aflow/trajectory"
21+
"github.com/google/syzkaller/pkg/config"
22+
"github.com/google/syzkaller/pkg/log"
23+
"github.com/google/syzkaller/pkg/mgrconfig"
24+
"github.com/google/syzkaller/pkg/osutil"
25+
"github.com/google/syzkaller/pkg/tool"
26+
"github.com/google/syzkaller/pkg/updater"
27+
"github.com/google/syzkaller/prog"
28+
)
29+
30+
type Config struct {
31+
// Currently serves only net/http/pprof handlers.
32+
HTTP string `json:"http"`
33+
DashboardAddr string `json:"dashboard_addr"`
34+
DashboardClient string `json:"dashboard_client"` // Global non-namespace client.
35+
DashboardKey string `json:"dashboard_key"`
36+
SyzkallerRepo string `json:"syzkaller_repo"`
37+
SyzkallerBranch string `json:"syzkaller_branch"`
38+
CodesearchToolBin string `json:"codesearch_tool_bin"`
39+
KernelConfig string `json:"kernel_config"`
40+
Target string `json:"target"`
41+
Image string `json:"image"`
42+
Type string `json:"type"`
43+
VM json.RawMessage `json:"vm"`
44+
// Use fixed base commit for patching jobs (for testing).
45+
FixedBaseCommit string `json:"fixed_base_commit"`
46+
// Use this LLM model (for testing, if empty use a default model).
47+
Model string `json:"model"`
48+
}
49+
50+
func main() {
51+
var (
52+
flagConfig = flag.String("config", "", "config file")
53+
flagExitOnUpgrade = flag.Bool("exit-on-upgrade", false,
54+
"exit after a syz-ci upgrade is applied; otherwise syz-ci restarts")
55+
flagAutoUpdate = flag.Bool("autoupdate", true, "auto-update the binary (for testing)")
56+
)
57+
defer tool.Init()()
58+
log.SetName("syz-agent")
59+
if err := run(*flagConfig, *flagExitOnUpgrade, *flagAutoUpdate); err != nil {
60+
log.Fatal(err)
61+
}
62+
}
63+
64+
func run(configFile string, exitOnUpgrade, autoUpdate bool) error {
65+
cfg := &Config{
66+
SyzkallerRepo: "https://github.com/google/syzkaller.git",
67+
SyzkallerBranch: "master",
68+
Model: aflow.DefaultModel,
69+
}
70+
if err := config.LoadFile(configFile, cfg); err != nil {
71+
return fmt.Errorf("failed to load config: %w", err)
72+
}
73+
tool.ServeHTTP(cfg.HTTP)
74+
os, vmarch, arch, _, _, err := mgrconfig.SplitTarget(cfg.Target)
75+
if err != nil {
76+
return err
77+
}
78+
dash, err := dashapi.New(cfg.DashboardClient, cfg.DashboardAddr, cfg.DashboardKey)
79+
if err != nil {
80+
return err
81+
}
82+
buildSem := osutil.NewSemaphore(1)
83+
updater, err := updater.New(&updater.Config{
84+
ExitOnUpdate: exitOnUpgrade,
85+
BuildSem: buildSem,
86+
SyzkallerRepo: cfg.SyzkallerRepo,
87+
SyzkallerBranch: cfg.SyzkallerBranch,
88+
Targets: map[updater.Target]bool{
89+
{
90+
OS: os,
91+
VMArch: vmarch,
92+
Arch: arch,
93+
}: true,
94+
},
95+
})
96+
if err != nil {
97+
return err
98+
}
99+
updatePending := make(chan struct{})
100+
shutdownPending := make(chan struct{})
101+
osutil.HandleInterrupts(shutdownPending)
102+
updater.UpdateOnStart(autoUpdate, updatePending, shutdownPending)
103+
104+
cache, err := aflow.NewCache(osutil.Abs("workdir"))
105+
if err != nil {
106+
return err
107+
}
108+
109+
ctx, stop := context.WithCancel(context.Background())
110+
var wg sync.WaitGroup
111+
wg.Add(1)
112+
go func() {
113+
defer wg.Done()
114+
for {
115+
ok, err := poll(ctx, cfg, dash, cache)
116+
if err != nil {
117+
log.Error(err)
118+
dash.LogError("syz-agent", "%v", err)
119+
}
120+
var delay time.Duration
121+
if !ok {
122+
// Don't poll dashboard too often, if there are no jobs,
123+
// or errors are happenning.
124+
delay = 10 * time.Second
125+
}
126+
select {
127+
case <-ctx.Done():
128+
return
129+
case <-time.After(delay):
130+
}
131+
}
132+
}()
133+
134+
select {
135+
case <-shutdownPending:
136+
case <-updatePending:
137+
}
138+
stop()
139+
wg.Wait()
140+
141+
select {
142+
case <-shutdownPending:
143+
default:
144+
updater.UpdateAndRestart()
145+
}
146+
return nil
147+
}
148+
149+
func poll(ctx context.Context, cfg *Config, dash *dashapi.Dashboard, cache *aflow.Cache) (bool, error) {
150+
req := &dashapi.AIJobPollReq{
151+
LLMModel: cfg.Model,
152+
CodeRevision: prog.GitRevision,
153+
}
154+
for _, flow := range aflow.Flows {
155+
req.Workflows = append(req.Workflows, dashapi.AIWorkflow{
156+
Type: flow.Type,
157+
Name: flow.Name,
158+
})
159+
}
160+
resp, err := dash.AIJobPoll(req)
161+
if err != nil {
162+
return false, err
163+
}
164+
if resp.ID == "" {
165+
return false, nil
166+
}
167+
flow := aflow.Flows[resp.Workflow]
168+
if flow == nil {
169+
return false, fmt.Errorf("unsupported flow %q", resp.Workflow)
170+
}
171+
doneReq := &dashapi.AIJobDoneReq{
172+
ID: resp.ID,
173+
}
174+
results, jobErr := executeJob(ctx, cfg, dash, cache, flow, resp)
175+
doneReq.Results = results
176+
if jobErr != nil {
177+
doneReq.Error = jobErr.Error()
178+
}
179+
if err := dash.AIJobDone(doneReq); err != nil {
180+
return false, err
181+
}
182+
if jobErr != nil {
183+
return false, jobErr
184+
}
185+
return true, nil
186+
}
187+
188+
func executeJob(ctx context.Context, cfg *Config, dash *dashapi.Dashboard, cache *aflow.Cache,
189+
flow *aflow.Flow, req *dashapi.AIJobPollResp) (map[string]any, error) {
190+
inputs := map[string]any{
191+
"CodesearchToolBin": cfg.CodesearchToolBin,
192+
"Syzkaller": osutil.Abs(filepath.FromSlash("syzkaller/current")),
193+
"Image": cfg.Image,
194+
"Type": cfg.Type,
195+
"VM": cfg.VM,
196+
"FixedBaseCommit": cfg.FixedBaseCommit,
197+
}
198+
maps.Insert(inputs, maps.All(req.Args))
199+
onEvent := func(span *trajectory.Span) error {
200+
log.Logf(0, "%v", span)
201+
return dash.AITrajectoryLog(&dashapi.AITrajectoryReq{
202+
JobID: req.ID,
203+
Span: span,
204+
})
205+
}
206+
return flow.Execute(ctx, cfg.Model, inputs, cache, onEvent)
207+
}

0 commit comments

Comments
 (0)