Skip to content

Commit e32e893

Browse files
committed
syz-ci: use context for termination
1. Use context for the goroutines termination. 2. uploadCoverJSONLToGCS reached 8 params, refactor.
1 parent 874a138 commit e32e893

File tree

4 files changed

+73
-64
lines changed

4 files changed

+73
-64
lines changed

syz-ci/jobs.go

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package main
55

66
import (
77
"bytes"
8+
"context"
89
"errors"
910
"fmt"
1011
"io"
@@ -61,24 +62,16 @@ func newJobManager(cfg *Config, managers []*Manager, shutdownPending chan struct
6162
}, nil
6263
}
6364

64-
// startLoop starts a job loop in parallel and returns a blocking function
65-
// to gracefully stop job processing.
66-
func (jm *JobManager) startLoop(wg *sync.WaitGroup) func() {
67-
stop := make(chan struct{})
68-
done := make(chan struct{}, 1)
65+
// startLoop starts a job loop in parallel.
66+
func (jm *JobManager) startLoop(ctx context.Context, wg *sync.WaitGroup) {
6967
wg.Add(1)
7068
go func() {
7169
defer wg.Done()
72-
jm.loop(stop)
73-
done <- struct{}{}
70+
jm.loop(ctx)
7471
}()
75-
return func() {
76-
close(stop)
77-
<-done
78-
}
7972
}
8073

81-
func (jm *JobManager) loop(stop chan struct{}) {
74+
func (jm *JobManager) loop(ctx context.Context) {
8275
if err := jm.resetJobs(); err != nil {
8376
if jm.dash != nil {
8477
jm.dash.LogError("syz-ci", "reset jobs failed: %v", err)
@@ -109,7 +102,7 @@ func (jm *JobManager) loop(stop chan struct{}) {
109102
wg.Add(1)
110103
go func() {
111104
defer wg.Done()
112-
jp.loop(stop)
105+
jp.loop(ctx)
113106
}()
114107
if !main || !jm.needParallelProcessor() {
115108
break
@@ -143,14 +136,14 @@ func (jm *JobManager) resetJobs() error {
143136
return nil
144137
}
145138

146-
func (jp *JobProcessor) loop(stop chan struct{}) {
139+
func (jp *JobProcessor) loop(ctx context.Context) {
147140
jp.Logf(0, "job loop started")
148141
loop:
149142
for {
150143
// Check jp.stop separately first, otherwise if stop signal arrives during a job execution,
151144
// we can still grab the next job with 50% probability.
152145
select {
153-
case <-stop:
146+
case <-ctx.Done():
154147
break loop
155148
default:
156149
}
@@ -166,7 +159,7 @@ loop:
166159
jp.pollJobs()
167160
case <-jp.commitTicker:
168161
jp.pollCommits()
169-
case <-stop:
162+
case <-ctx.Done():
170163
break loop
171164
}
172165
}

syz-ci/manager.go

Lines changed: 42 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ type Manager struct {
8989
dash ManagerDashapi
9090
debugStorage bool
9191
storage *asset.Storage
92-
stop chan struct{}
9392
debug bool
9493
lastBuild *dashapi.Build
9594
buildFailed bool
@@ -105,8 +104,7 @@ type ManagerDashapi interface {
105104
UploadCommits(commits []dashapi.Commit) error
106105
}
107106

108-
func createManager(cfg *Config, mgrcfg *ManagerConfig, stop chan struct{},
109-
debug bool) (*Manager, error) {
107+
func createManager(cfg *Config, mgrcfg *ManagerConfig, debug bool) (*Manager, error) {
110108
dir := osutil.Abs(filepath.Join("managers", mgrcfg.Name))
111109
err := osutil.MkdirAll(dir)
112110
if err != nil {
@@ -157,7 +155,6 @@ func createManager(cfg *Config, mgrcfg *ManagerConfig, stop chan struct{},
157155
managercfg: mgrcfg.managercfg,
158156
storage: assetStorage,
159157
debugStorage: !cfg.AssetStorage.IsEmpty() && cfg.AssetStorage.Debug,
160-
stop: stop,
161158
debug: debug,
162159
}
163160
// Leave the dashboard interface value as nil if it does not wrap a valid dashboard pointer.
@@ -182,7 +179,7 @@ var testSem = instance.NewSemaphore(1)
182179
const fuzzingMinutesBeforeCover = 360
183180
const benchUploadPeriod = 30 * time.Minute
184181

185-
func (mgr *Manager) loop() {
182+
func (mgr *Manager) loop(ctx context.Context) {
186183
lastCommit := ""
187184
nextBuildTime := time.Now()
188185
var managerRestartTime, artifactUploadTime, benchUploadTime time.Time
@@ -210,20 +207,20 @@ loop:
210207
for {
211208
if time.Since(nextBuildTime) >= 0 {
212209
var rebuildAfter time.Duration
213-
lastCommit, latestInfo, rebuildAfter = mgr.pollAndBuild(lastCommit, latestInfo)
210+
lastCommit, latestInfo, rebuildAfter = mgr.pollAndBuild(ctx, lastCommit, latestInfo)
214211
nextBuildTime = time.Now().Add(rebuildAfter)
215212
}
216213
if !artifactUploadTime.IsZero() && time.Now().After(artifactUploadTime) {
217214
artifactUploadTime = time.Time{}
218-
if err := mgr.uploadCoverReport(); err != nil {
215+
if err := mgr.uploadCoverReport(ctx); err != nil {
219216
mgr.Errorf("failed to upload cover report: %v", err)
220217
}
221-
if err := mgr.uploadProgramsWithCoverage(); err != nil {
218+
if err := mgr.uploadProgramsWithCoverage(ctx); err != nil {
222219
mgr.Errorf("failed to upload programs with coverage: %v", err)
223220
}
224221
// Function uploadCoverStat also forces manager to drop the coverage structures to reduce memory usage.
225222
// Should be the last request touching the coverage data.
226-
if err := mgr.uploadCoverStat(fuzzingMinutesBeforeCover); err != nil {
223+
if err := mgr.uploadCoverStat(ctx, fuzzingMinutesBeforeCover); err != nil {
227224
mgr.Errorf("failed to upload coverage stat: %v", err)
228225
}
229226
if err := mgr.uploadCorpus(); err != nil {
@@ -232,13 +229,13 @@ loop:
232229
}
233230
if mgr.cfg.BenchUploadPath != "" && time.Now().After(benchUploadTime) {
234231
benchUploadTime = time.Now().Add(benchUploadPeriod)
235-
if err := mgr.uploadBenchData(); err != nil {
232+
if err := mgr.uploadBenchData(ctx); err != nil {
236233
mgr.Errorf("failed to upload bench: %v", err)
237234
}
238235
}
239236

240237
select {
241-
case <-mgr.stop:
238+
case <-ctx.Done():
242239
break loop
243240
default:
244241
}
@@ -253,7 +250,7 @@ loop:
253250

254251
select {
255252
case <-ticker.C:
256-
case <-mgr.stop:
253+
case <-ctx.Done():
257254
break loop
258255
}
259256
}
@@ -275,7 +272,7 @@ func (mgr *Manager) archiveCommit(commit string) {
275272
}
276273
}
277274

278-
func (mgr *Manager) pollAndBuild(lastCommit string, latestInfo *BuildInfo) (
275+
func (mgr *Manager) pollAndBuild(ctx context.Context, lastCommit string, latestInfo *BuildInfo) (
279276
string, *BuildInfo, time.Duration) {
280277
rebuildAfter := buildRetryPeriod
281278
commit, err := mgr.repo.Poll(mgr.mgrcfg.Repo, mgr.mgrcfg.Branch)
@@ -307,7 +304,7 @@ func (mgr *Manager) pollAndBuild(lastCommit string, latestInfo *BuildInfo) (
307304
}
308305
}
309306
buildSem.Signal()
310-
case <-mgr.stop:
307+
case <-ctx.Done():
311308
}
312309
}
313310
}
@@ -848,7 +845,7 @@ func (mgr *Manager) httpGET(ctx context.Context, path string) (resp *http.Respon
848845
return client.Do(req)
849846
}
850847

851-
func (mgr *Manager) uploadCoverReport() error {
848+
func (mgr *Manager) uploadCoverReport(ctx context.Context) error {
852849
directUpload := mgr.managercfg.Cover && mgr.cfg.CoverUploadPath != ""
853850
if mgr.storage == nil && !directUpload {
854851
// Cover report uploading is disabled.
@@ -860,7 +857,7 @@ func (mgr *Manager) uploadCoverReport() error {
860857
// Report generation can consume lots of memory. Generate one at a time.
861858
select {
862859
case <-buildSem.WaitC():
863-
case <-mgr.stop:
860+
case <-ctx.Done():
864861
return nil
865862
}
866863
defer buildSem.Signal()
@@ -886,8 +883,14 @@ func (mgr *Manager) uploadCoverReport() error {
886883
return nil
887884
}
888885

889-
func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest, nameSuffix string,
890-
publish, compress bool, f func(io.Writer, *json.Decoder) error) error {
886+
type uploadOptions struct {
887+
nameSuffix string
888+
publish bool
889+
compress bool
890+
}
891+
892+
func (mgr *Manager) uploadCoverJSONLToGCS(ctx context.Context, gcsClient gcs.Client, mgrSrc, gcsDest string,
893+
opts uploadOptions, f func(io.Writer, *json.Decoder) error) error {
891894
if !mgr.managercfg.Cover || gcsDest == "" {
892895
return nil
893896
}
@@ -896,12 +899,12 @@ func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest,
896899
// TODO: remove it once #4585 (symbolization tuning) is closed
897900
select {
898901
case <-buildSem.WaitC():
899-
case <-mgr.stop:
902+
case <-ctx.Done():
900903
return nil
901904
}
902905
defer buildSem.Signal()
903906

904-
eg, egCtx := errgroup.WithContext(context.Background())
907+
eg, egCtx := errgroup.WithContext(ctx)
905908
resp, err := mgr.httpGET(egCtx, mgrSrc)
906909
if err != nil {
907910
return fmt.Errorf("failed to httpGet %s: %w", mgrSrc, err)
@@ -919,7 +922,7 @@ func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest,
919922
defer pw.Close()
920923
var w io.Writer
921924
w = pw
922-
if compress {
925+
if opts.compress {
923926
gzw := gzip.NewWriter(pw)
924927
defer gzw.Close()
925928
w = gzw
@@ -934,26 +937,28 @@ func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest,
934937
})
935938
eg.Go(func() error {
936939
defer pr.Close()
937-
fileName := fmt.Sprintf("%s/%s%s.jsonl", mgr.mgrcfg.DashboardClient, mgr.name, nameSuffix)
938-
if err := uploadFile(egCtx, gcsClient, gcsDest, fileName, pr, publish); err != nil {
940+
fileName := fmt.Sprintf("%s/%s%s.jsonl", mgr.mgrcfg.DashboardClient, mgr.name, opts.nameSuffix)
941+
if err := uploadFile(egCtx, gcsClient, gcsDest, fileName, pr, opts.publish); err != nil {
939942
return fmt.Errorf("uploadFile: %w", err)
940943
}
941944
return nil
942945
})
943946
return eg.Wait()
944947
}
945948

946-
func (mgr *Manager) uploadCoverStat(fuzzingMinutes int) error {
949+
func (mgr *Manager) uploadCoverStat(ctx context.Context, fuzzingMinutes int) error {
947950
// Coverage report generation consumes and caches lots of memory.
948951
// In the syz-ci context report generation won't be used after this point,
949952
// so tell manager to flush report generator.
950953
curTime := time.Now()
951-
if err := mgr.uploadCoverJSONLToGCS(nil,
954+
if err := mgr.uploadCoverJSONLToGCS(ctx, nil,
952955
"/cover?jsonl=1&flush=1",
953956
mgr.cfg.CoverPipelinePath,
954-
time.Now().Format("-2006-01-02-15-04"),
955-
false,
956-
false,
957+
uploadOptions{
958+
nameSuffix: time.Now().Format("-2006-01-02-15-04"),
959+
publish: false,
960+
compress: false,
961+
},
957962
func(w io.Writer, dec *json.Decoder) error {
958963
var covInfo cover.CoverageInfo
959964
if err := dec.Decode(&covInfo); err != nil {
@@ -979,13 +984,15 @@ func (mgr *Manager) uploadCoverStat(fuzzingMinutes int) error {
979984
return nil
980985
}
981986

982-
func (mgr *Manager) uploadProgramsWithCoverage() error {
983-
if err := mgr.uploadCoverJSONLToGCS(nil,
987+
func (mgr *Manager) uploadProgramsWithCoverage(ctx context.Context) error {
988+
if err := mgr.uploadCoverJSONLToGCS(ctx, nil,
984989
"/coverprogs?jsonl=1",
985990
mgr.cfg.CoverProgramsPath,
986-
"",
987-
mgr.cfg.PublishGCS,
988-
true,
991+
uploadOptions{
992+
nameSuffix: "",
993+
publish: mgr.cfg.PublishGCS,
994+
compress: true,
995+
},
989996
func(w io.Writer, dec *json.Decoder) error {
990997
var programCoverage cover.ProgramCoverage
991998
if err := dec.Decode(&programCoverage); err != nil {
@@ -1015,7 +1022,7 @@ func (mgr *Manager) uploadCorpus() error {
10151022
return uploadFile(context.Background(), nil, mgr.cfg.CorpusUploadPath, mgr.name+"-corpus.db", f, mgr.cfg.PublishGCS)
10161023
}
10171024

1018-
func (mgr *Manager) uploadBenchData() error {
1025+
func (mgr *Manager) uploadBenchData(ctx context.Context) error {
10191026
if mgr.lastRestarted.IsZero() {
10201027
return nil
10211028
}
@@ -1030,7 +1037,7 @@ func (mgr *Manager) uploadBenchData() error {
10301037
return fmt.Errorf("failed to open bench file: %w", err)
10311038
}
10321039
defer f.Close()
1033-
err = uploadFile(context.Background(), nil, mgr.cfg.BenchUploadPath+"/"+mgr.name,
1040+
err = uploadFile(ctx, nil, mgr.cfg.BenchUploadPath+"/"+mgr.name,
10341041
mgr.lastRestarted.Format("2006-01-02_15h.json"), f, false)
10351042
if err != nil {
10361043
return fmt.Errorf("failed to upload the bench file: %w", err)

syz-ci/manager_test.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package main
66
import (
77
"bytes"
88
"compress/gzip"
9+
"context"
910
"encoding/json"
1011
"fmt"
1112
"io"
@@ -200,11 +201,15 @@ func TestUploadCoverJSONLToGCS(t *testing.T) {
200201
gcsMock.On("Publish", test.wantGCSFileName).
201202
Return(nil).Once()
202203
}
203-
err := mgr.uploadCoverJSONLToGCS(gcsMock,
204+
err := mgr.uploadCoverJSONLToGCS(context.Background(), gcsMock,
204205
"/teststream&jsonl=1",
205206
"gs://test-bucket",
206-
test.inputNameSuffix,
207-
test.inputPublish, test.inputCompress, func(w io.Writer, dec *json.Decoder) error {
207+
uploadOptions{
208+
nameSuffix: test.inputNameSuffix,
209+
publish: test.inputPublish,
210+
compress: test.inputCompress,
211+
},
212+
func(w io.Writer, dec *json.Decoder) error {
208213
var v any
209214
if err := dec.Decode(&v); err != nil {
210215
return fmt.Errorf("cover.ProgramCoverage: %w", err)

0 commit comments

Comments
 (0)