Skip to content

Commit 6fef696

Browse files
committed
syz-ci: use context for termination
1 parent 874a138 commit 6fef696

File tree

4 files changed

+47
-52
lines changed

4 files changed

+47
-52
lines changed

syz-ci/jobs.go

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package main
55

66
import (
77
"bytes"
8+
"context"
89
"errors"
910
"fmt"
1011
"io"
@@ -61,24 +62,16 @@ func newJobManager(cfg *Config, managers []*Manager, shutdownPending chan struct
6162
}, nil
6263
}
6364

64-
// startLoop starts a job loop in parallel and returns a blocking function
65-
// to gracefully stop job processing.
66-
func (jm *JobManager) startLoop(wg *sync.WaitGroup) func() {
67-
stop := make(chan struct{})
68-
done := make(chan struct{}, 1)
65+
// startLoop starts a job loop in parallel.
66+
func (jm *JobManager) startLoop(ctx context.Context, wg *sync.WaitGroup) {
6967
wg.Add(1)
7068
go func() {
7169
defer wg.Done()
72-
jm.loop(stop)
73-
done <- struct{}{}
70+
jm.loop(ctx)
7471
}()
75-
return func() {
76-
close(stop)
77-
<-done
78-
}
7972
}
8073

81-
func (jm *JobManager) loop(stop chan struct{}) {
74+
func (jm *JobManager) loop(ctx context.Context) {
8275
if err := jm.resetJobs(); err != nil {
8376
if jm.dash != nil {
8477
jm.dash.LogError("syz-ci", "reset jobs failed: %v", err)
@@ -109,7 +102,7 @@ func (jm *JobManager) loop(stop chan struct{}) {
109102
wg.Add(1)
110103
go func() {
111104
defer wg.Done()
112-
jp.loop(stop)
105+
jp.loop(ctx)
113106
}()
114107
if !main || !jm.needParallelProcessor() {
115108
break
@@ -143,14 +136,14 @@ func (jm *JobManager) resetJobs() error {
143136
return nil
144137
}
145138

146-
func (jp *JobProcessor) loop(stop chan struct{}) {
139+
func (jp *JobProcessor) loop(ctx context.Context) {
147140
jp.Logf(0, "job loop started")
148141
loop:
149142
for {
150143
// Check jp.stop separately first, otherwise if stop signal arrives during a job execution,
151144
// we can still grab the next job with 50% probability.
152145
select {
153-
case <-stop:
146+
case <-ctx.Done():
154147
break loop
155148
default:
156149
}
@@ -166,7 +159,7 @@ loop:
166159
jp.pollJobs()
167160
case <-jp.commitTicker:
168161
jp.pollCommits()
169-
case <-stop:
162+
case <-ctx.Done():
170163
break loop
171164
}
172165
}

syz-ci/manager.go

Lines changed: 22 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ type Manager struct {
8989
dash ManagerDashapi
9090
debugStorage bool
9191
storage *asset.Storage
92-
stop chan struct{}
9392
debug bool
9493
lastBuild *dashapi.Build
9594
buildFailed bool
@@ -105,8 +104,7 @@ type ManagerDashapi interface {
105104
UploadCommits(commits []dashapi.Commit) error
106105
}
107106

108-
func createManager(cfg *Config, mgrcfg *ManagerConfig, stop chan struct{},
109-
debug bool) (*Manager, error) {
107+
func createManager(cfg *Config, mgrcfg *ManagerConfig, debug bool) (*Manager, error) {
110108
dir := osutil.Abs(filepath.Join("managers", mgrcfg.Name))
111109
err := osutil.MkdirAll(dir)
112110
if err != nil {
@@ -157,7 +155,6 @@ func createManager(cfg *Config, mgrcfg *ManagerConfig, stop chan struct{},
157155
managercfg: mgrcfg.managercfg,
158156
storage: assetStorage,
159157
debugStorage: !cfg.AssetStorage.IsEmpty() && cfg.AssetStorage.Debug,
160-
stop: stop,
161158
debug: debug,
162159
}
163160
// Leave the dashboard interface value as nil if it does not wrap a valid dashboard pointer.
@@ -182,7 +179,7 @@ var testSem = instance.NewSemaphore(1)
182179
const fuzzingMinutesBeforeCover = 360
183180
const benchUploadPeriod = 30 * time.Minute
184181

185-
func (mgr *Manager) loop() {
182+
func (mgr *Manager) loop(ctx context.Context) {
186183
lastCommit := ""
187184
nextBuildTime := time.Now()
188185
var managerRestartTime, artifactUploadTime, benchUploadTime time.Time
@@ -210,20 +207,20 @@ loop:
210207
for {
211208
if time.Since(nextBuildTime) >= 0 {
212209
var rebuildAfter time.Duration
213-
lastCommit, latestInfo, rebuildAfter = mgr.pollAndBuild(lastCommit, latestInfo)
210+
lastCommit, latestInfo, rebuildAfter = mgr.pollAndBuild(ctx, lastCommit, latestInfo)
214211
nextBuildTime = time.Now().Add(rebuildAfter)
215212
}
216213
if !artifactUploadTime.IsZero() && time.Now().After(artifactUploadTime) {
217214
artifactUploadTime = time.Time{}
218-
if err := mgr.uploadCoverReport(); err != nil {
215+
if err := mgr.uploadCoverReport(ctx); err != nil {
219216
mgr.Errorf("failed to upload cover report: %v", err)
220217
}
221-
if err := mgr.uploadProgramsWithCoverage(); err != nil {
218+
if err := mgr.uploadProgramsWithCoverage(ctx); err != nil {
222219
mgr.Errorf("failed to upload programs with coverage: %v", err)
223220
}
224221
// Function uploadCoverStat also forces manager to drop the coverage structures to reduce memory usage.
225222
// Should be the last request touching the coverage data.
226-
if err := mgr.uploadCoverStat(fuzzingMinutesBeforeCover); err != nil {
223+
if err := mgr.uploadCoverStat(ctx, fuzzingMinutesBeforeCover); err != nil {
227224
mgr.Errorf("failed to upload coverage stat: %v", err)
228225
}
229226
if err := mgr.uploadCorpus(); err != nil {
@@ -232,13 +229,13 @@ loop:
232229
}
233230
if mgr.cfg.BenchUploadPath != "" && time.Now().After(benchUploadTime) {
234231
benchUploadTime = time.Now().Add(benchUploadPeriod)
235-
if err := mgr.uploadBenchData(); err != nil {
232+
if err := mgr.uploadBenchData(ctx); err != nil {
236233
mgr.Errorf("failed to upload bench: %v", err)
237234
}
238235
}
239236

240237
select {
241-
case <-mgr.stop:
238+
case <-ctx.Done():
242239
break loop
243240
default:
244241
}
@@ -253,7 +250,7 @@ loop:
253250

254251
select {
255252
case <-ticker.C:
256-
case <-mgr.stop:
253+
case <-ctx.Done():
257254
break loop
258255
}
259256
}
@@ -275,7 +272,7 @@ func (mgr *Manager) archiveCommit(commit string) {
275272
}
276273
}
277274

278-
func (mgr *Manager) pollAndBuild(lastCommit string, latestInfo *BuildInfo) (
275+
func (mgr *Manager) pollAndBuild(ctx context.Context, lastCommit string, latestInfo *BuildInfo) (
279276
string, *BuildInfo, time.Duration) {
280277
rebuildAfter := buildRetryPeriod
281278
commit, err := mgr.repo.Poll(mgr.mgrcfg.Repo, mgr.mgrcfg.Branch)
@@ -307,7 +304,7 @@ func (mgr *Manager) pollAndBuild(lastCommit string, latestInfo *BuildInfo) (
307304
}
308305
}
309306
buildSem.Signal()
310-
case <-mgr.stop:
307+
case <-ctx.Done():
311308
}
312309
}
313310
}
@@ -848,7 +845,7 @@ func (mgr *Manager) httpGET(ctx context.Context, path string) (resp *http.Respon
848845
return client.Do(req)
849846
}
850847

851-
func (mgr *Manager) uploadCoverReport() error {
848+
func (mgr *Manager) uploadCoverReport(ctx context.Context) error {
852849
directUpload := mgr.managercfg.Cover && mgr.cfg.CoverUploadPath != ""
853850
if mgr.storage == nil && !directUpload {
854851
// Cover report uploading is disabled.
@@ -860,7 +857,7 @@ func (mgr *Manager) uploadCoverReport() error {
860857
// Report generation can consume lots of memory. Generate one at a time.
861858
select {
862859
case <-buildSem.WaitC():
863-
case <-mgr.stop:
860+
case <-ctx.Done():
864861
return nil
865862
}
866863
defer buildSem.Signal()
@@ -886,7 +883,7 @@ func (mgr *Manager) uploadCoverReport() error {
886883
return nil
887884
}
888885

889-
func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest, nameSuffix string,
886+
func (mgr *Manager) uploadCoverJSONLToGCS(ctx context.Context, gcsClient gcs.Client, mgrSrc, gcsDest, nameSuffix string,
890887
publish, compress bool, f func(io.Writer, *json.Decoder) error) error {
891888
if !mgr.managercfg.Cover || gcsDest == "" {
892889
return nil
@@ -896,12 +893,12 @@ func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest,
896893
// TODO: remove it once #4585 (symbolization tuning) is closed
897894
select {
898895
case <-buildSem.WaitC():
899-
case <-mgr.stop:
896+
case <-ctx.Done():
900897
return nil
901898
}
902899
defer buildSem.Signal()
903900

904-
eg, egCtx := errgroup.WithContext(context.Background())
901+
eg, egCtx := errgroup.WithContext(ctx)
905902
resp, err := mgr.httpGET(egCtx, mgrSrc)
906903
if err != nil {
907904
return fmt.Errorf("failed to httpGet %s: %w", mgrSrc, err)
@@ -943,12 +940,12 @@ func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest,
943940
return eg.Wait()
944941
}
945942

946-
func (mgr *Manager) uploadCoverStat(fuzzingMinutes int) error {
943+
func (mgr *Manager) uploadCoverStat(ctx context.Context, fuzzingMinutes int) error {
947944
// Coverage report generation consumes and caches lots of memory.
948945
// In the syz-ci context report generation won't be used after this point,
949946
// so tell manager to flush report generator.
950947
curTime := time.Now()
951-
if err := mgr.uploadCoverJSONLToGCS(nil,
948+
if err := mgr.uploadCoverJSONLToGCS(ctx, nil,
952949
"/cover?jsonl=1&flush=1",
953950
mgr.cfg.CoverPipelinePath,
954951
time.Now().Format("-2006-01-02-15-04"),
@@ -979,8 +976,8 @@ func (mgr *Manager) uploadCoverStat(fuzzingMinutes int) error {
979976
return nil
980977
}
981978

982-
func (mgr *Manager) uploadProgramsWithCoverage() error {
983-
if err := mgr.uploadCoverJSONLToGCS(nil,
979+
func (mgr *Manager) uploadProgramsWithCoverage(ctx context.Context) error {
980+
if err := mgr.uploadCoverJSONLToGCS(ctx, nil,
984981
"/coverprogs?jsonl=1",
985982
mgr.cfg.CoverProgramsPath,
986983
"",
@@ -1015,7 +1012,7 @@ func (mgr *Manager) uploadCorpus() error {
10151012
return uploadFile(context.Background(), nil, mgr.cfg.CorpusUploadPath, mgr.name+"-corpus.db", f, mgr.cfg.PublishGCS)
10161013
}
10171014

1018-
func (mgr *Manager) uploadBenchData() error {
1015+
func (mgr *Manager) uploadBenchData(ctx context.Context) error {
10191016
if mgr.lastRestarted.IsZero() {
10201017
return nil
10211018
}
@@ -1030,7 +1027,7 @@ func (mgr *Manager) uploadBenchData() error {
10301027
return fmt.Errorf("failed to open bench file: %w", err)
10311028
}
10321029
defer f.Close()
1033-
err = uploadFile(context.Background(), nil, mgr.cfg.BenchUploadPath+"/"+mgr.name,
1030+
err = uploadFile(ctx, nil, mgr.cfg.BenchUploadPath+"/"+mgr.name,
10341031
mgr.lastRestarted.Format("2006-01-02_15h.json"), f, false)
10351032
if err != nil {
10361033
return fmt.Errorf("failed to upload the bench file: %w", err)

syz-ci/manager_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package main
66
import (
77
"bytes"
88
"compress/gzip"
9+
"context"
910
"encoding/json"
1011
"fmt"
1112
"io"
@@ -200,7 +201,7 @@ func TestUploadCoverJSONLToGCS(t *testing.T) {
200201
gcsMock.On("Publish", test.wantGCSFileName).
201202
Return(nil).Once()
202203
}
203-
err := mgr.uploadCoverJSONLToGCS(gcsMock,
204+
err := mgr.uploadCoverJSONLToGCS(context.Background(), gcsMock,
204205
"/teststream&jsonl=1",
205206
"gs://test-bucket",
206207
test.inputNameSuffix,

syz-ci/syz-ci.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ package main
5252
// modification time allows us to understand if we need to rebuild after a restart.
5353

5454
import (
55+
"context"
5556
"encoding/json"
5657
"flag"
5758
"fmt"
@@ -281,10 +282,10 @@ func main() {
281282
}()
282283
}
283284

284-
stop := make(chan struct{})
285+
ctx, stop := context.WithCancel(context.Background())
285286
var managers []*Manager
286287
for _, mgrcfg := range cfg.Managers {
287-
mgr, err := createManager(cfg, mgrcfg, stop, *flagDebug)
288+
mgr, err := createManager(cfg, mgrcfg, *flagDebug)
288289
if err != nil {
289290
log.Errorf("failed to create manager %v: %v", mgrcfg.Name, err)
290291
continue
@@ -300,20 +301,22 @@ func main() {
300301
wg.Add(1)
301302
go func() {
302303
defer wg.Done()
303-
mgr.loop()
304+
mgr.loop(ctx)
304305
}()
305306
}
306307
}
307308
jp, err := newJobManager(cfg, managers, shutdownPending)
308309
if err != nil {
309310
log.Fatalf("failed to create dashapi connection %v", err)
310311
}
311-
stopJobs := jp.startLoop(&wg)
312+
ctxJobs, stopJobs := context.WithCancel(ctx)
313+
wgJobs := sync.WaitGroup{}
314+
jp.startLoop(ctxJobs, &wgJobs)
312315

313316
// For testing. Racy. Use with care.
314317
http.HandleFunc("/upload_cover", func(w http.ResponseWriter, r *http.Request) {
315318
for _, mgr := range managers {
316-
if err := mgr.uploadCoverReport(); err != nil {
319+
if err := mgr.uploadCoverReport(ctx); err != nil {
317320
w.Write([]byte(fmt.Sprintf("failed for %v: %v <br>\n", mgr.name, err)))
318321
return
319322
}
@@ -322,14 +325,15 @@ func main() {
322325
})
323326

324327
wg.Add(1)
325-
go deprecateAssets(cfg, stop, &wg)
328+
go deprecateAssets(ctx, cfg, &wg)
326329

327330
select {
328331
case <-shutdownPending:
329332
case <-updatePending:
330333
}
331-
stopJobs() // Gracefully wait for the running jobs to finish.
332-
close(stop)
334+
stopJobs()
335+
wgJobs.Wait()
336+
stop()
333337
wg.Wait()
334338

335339
select {
@@ -339,7 +343,7 @@ func main() {
339343
}
340344
}
341345

342-
func deprecateAssets(cfg *Config, stop chan struct{}, wg *sync.WaitGroup) {
346+
func deprecateAssets(ctx context.Context, cfg *Config, wg *sync.WaitGroup) {
343347
defer wg.Done()
344348
if cfg.DashboardAddr == "" || cfg.AssetStorage.IsEmpty() ||
345349
!cfg.AssetStorage.DoDeprecation {
@@ -359,7 +363,7 @@ loop:
359363
for {
360364
const sleepDuration = 6 * time.Hour
361365
select {
362-
case <-stop:
366+
case <-ctx.Done():
363367
break loop
364368
case <-time.After(sleepDuration):
365369
}

0 commit comments

Comments
 (0)