diff --git a/syz-ci/jobs.go b/syz-ci/jobs.go
index 0c2df74d432b..a9bb23abcb2e 100644
--- a/syz-ci/jobs.go
+++ b/syz-ci/jobs.go
@@ -5,6 +5,7 @@ package main
 
 import (
 	"bytes"
+	"context"
 	"errors"
 	"fmt"
 	"io"
@@ -61,24 +62,16 @@ func newJobManager(cfg *Config, managers []*Manager, shutdownPending chan struct
 	}, nil
 }
 
-// startLoop starts a job loop in parallel and returns a blocking function
-// to gracefully stop job processing.
-func (jm *JobManager) startLoop(wg *sync.WaitGroup) func() {
-	stop := make(chan struct{})
-	done := make(chan struct{}, 1)
+// startLoop starts a job loop in parallel.
+func (jm *JobManager) startLoop(ctx context.Context, wg *sync.WaitGroup) {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		jm.loop(stop)
-		done <- struct{}{}
+		jm.loop(ctx)
 	}()
-	return func() {
-		close(stop)
-		<-done
-	}
 }
 
-func (jm *JobManager) loop(stop chan struct{}) {
+func (jm *JobManager) loop(ctx context.Context) {
 	if err := jm.resetJobs(); err != nil {
 		if jm.dash != nil {
 			jm.dash.LogError("syz-ci", "reset jobs failed: %v", err)
@@ -109,7 +102,7 @@ func (jm *JobManager) loop(stop chan struct{}) {
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
-			jp.loop(stop)
+			jp.loop(ctx)
 		}()
 		if !main || !jm.needParallelProcessor() {
 			break
@@ -143,14 +136,14 @@ func (jm *JobManager) resetJobs() error {
 	return nil
 }
 
-func (jp *JobProcessor) loop(stop chan struct{}) {
+func (jp *JobProcessor) loop(ctx context.Context) {
 	jp.Logf(0, "job loop started")
 loop:
 	for {
 		// Check jp.stop separately first, otherwise if stop signal arrives during a job execution,
 		// we can still grab the next job with 50% probability.
 		select {
-		case <-stop:
+		case <-ctx.Done():
 			break loop
 		default:
 		}
@@ -166,7 +159,7 @@ loop:
 			jp.pollJobs()
 		case <-jp.commitTicker:
 			jp.pollCommits()
-		case <-stop:
+		case <-ctx.Done():
 			break loop
 		}
 	}
diff --git a/syz-ci/manager.go b/syz-ci/manager.go
index 297828d79176..2036805c92d7 100644
--- a/syz-ci/manager.go
+++ b/syz-ci/manager.go
@@ -89,7 +89,6 @@ type Manager struct {
 	dash          ManagerDashapi
 	debugStorage  bool
 	storage       *asset.Storage
-	stop          chan struct{}
 	debug         bool
 	lastBuild     *dashapi.Build
 	buildFailed   bool
@@ -105,8 +104,7 @@ type ManagerDashapi interface {
 	UploadCommits(commits []dashapi.Commit) error
 }
 
-func createManager(cfg *Config, mgrcfg *ManagerConfig, stop chan struct{},
-	debug bool) (*Manager, error) {
+func createManager(cfg *Config, mgrcfg *ManagerConfig, debug bool) (*Manager, error) {
 	dir := osutil.Abs(filepath.Join("managers", mgrcfg.Name))
 	err := osutil.MkdirAll(dir)
 	if err != nil {
@@ -157,7 +155,6 @@ func createManager(cfg *Config, mgrcfg *ManagerConfig, stop chan struct{},
 		managercfg:   mgrcfg.managercfg,
 		storage:      assetStorage,
 		debugStorage: !cfg.AssetStorage.IsEmpty() && cfg.AssetStorage.Debug,
-		stop:         stop,
 		debug:        debug,
 	}
 	// Leave the dashboard interface value as nil if it does not wrap a valid dashboard pointer.
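Aside (not part of the patch): the jobs.go and manager.go hunks above replace a hand-rolled stop channel, plus the separate done channel that startLoop drained, with the standard context.Context cancellation idiom. A minimal, self-contained sketch of that idiom follows; the runWorker name and the ticker body are hypothetical and only illustrate the shape of the change.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// runWorker launches a loop that does periodic work until ctx is cancelled.
func runWorker(ctx context.Context, wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-ctx.Done():
				return // stop() was called: exit gracefully
			case <-time.After(time.Second):
				fmt.Println("tick") // placeholder for the real work
			}
		}
	}()
}

func main() {
	ctx, stop := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	runWorker(ctx, &wg)
	time.Sleep(3 * time.Second)
	stop()    // replaces close(stop) on a chan struct{}
	wg.Wait() // replaces the ad-hoc done channel
}

Cancelling the context broadcasts to every loop that selects on ctx.Done(), and the WaitGroup gives the caller the same "block until the loop has actually exited" guarantee that the removed done channel provided.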
@@ -182,7 +179,7 @@ var testSem = instance.NewSemaphore(1)
 const fuzzingMinutesBeforeCover = 360
 const benchUploadPeriod = 30 * time.Minute
 
-func (mgr *Manager) loop() {
+func (mgr *Manager) loop(ctx context.Context) {
 	lastCommit := ""
 	nextBuildTime := time.Now()
 	var managerRestartTime, artifactUploadTime, benchUploadTime time.Time
@@ -210,20 +207,20 @@ loop:
 	for {
 		if time.Since(nextBuildTime) >= 0 {
 			var rebuildAfter time.Duration
-			lastCommit, latestInfo, rebuildAfter = mgr.pollAndBuild(lastCommit, latestInfo)
+			lastCommit, latestInfo, rebuildAfter = mgr.pollAndBuild(ctx, lastCommit, latestInfo)
 			nextBuildTime = time.Now().Add(rebuildAfter)
 		}
 		if !artifactUploadTime.IsZero() && time.Now().After(artifactUploadTime) {
 			artifactUploadTime = time.Time{}
-			if err := mgr.uploadCoverReport(); err != nil {
+			if err := mgr.uploadCoverReport(ctx); err != nil {
 				mgr.Errorf("failed to upload cover report: %v", err)
 			}
-			if err := mgr.uploadProgramsWithCoverage(); err != nil {
+			if err := mgr.uploadProgramsWithCoverage(ctx); err != nil {
 				mgr.Errorf("failed to upload programs with coverage: %v", err)
 			}
 			// Function uploadCoverStat also forces manager to drop the coverage structures to reduce memory usage.
 			// Should be the last request touching the coverage data.
-			if err := mgr.uploadCoverStat(fuzzingMinutesBeforeCover); err != nil {
+			if err := mgr.uploadCoverStat(ctx, fuzzingMinutesBeforeCover); err != nil {
 				mgr.Errorf("failed to upload coverage stat: %v", err)
 			}
 			if err := mgr.uploadCorpus(); err != nil {
@@ -232,13 +229,13 @@ loop:
 		}
 		if mgr.cfg.BenchUploadPath != "" && time.Now().After(benchUploadTime) {
 			benchUploadTime = time.Now().Add(benchUploadPeriod)
-			if err := mgr.uploadBenchData(); err != nil {
+			if err := mgr.uploadBenchData(ctx); err != nil {
 				mgr.Errorf("failed to upload bench: %v", err)
 			}
 		}
 
 		select {
-		case <-mgr.stop:
+		case <-ctx.Done():
 			break loop
 		default:
 		}
@@ -253,7 +250,7 @@ loop:
 
 		select {
 		case <-ticker.C:
-		case <-mgr.stop:
+		case <-ctx.Done():
 			break loop
 		}
 	}
@@ -275,7 +272,7 @@ func (mgr *Manager) archiveCommit(commit string) {
 	}
 }
 
-func (mgr *Manager) pollAndBuild(lastCommit string, latestInfo *BuildInfo) (
+func (mgr *Manager) pollAndBuild(ctx context.Context, lastCommit string, latestInfo *BuildInfo) (
 	string, *BuildInfo, time.Duration) {
 	rebuildAfter := buildRetryPeriod
 	commit, err := mgr.repo.Poll(mgr.mgrcfg.Repo, mgr.mgrcfg.Branch)
@@ -307,7 +304,7 @@ func (mgr *Manager) pollAndBuild(lastCommit string, latestInfo *BuildInfo) (
 				}
 			}
 			buildSem.Signal()
-		case <-mgr.stop:
+		case <-ctx.Done():
 		}
 	}
 }
@@ -848,7 +845,7 @@ func (mgr *Manager) httpGET(ctx context.Context, path string) (resp *http.Respon
 	return client.Do(req)
 }
 
-func (mgr *Manager) uploadCoverReport() error {
+func (mgr *Manager) uploadCoverReport(ctx context.Context) error {
 	directUpload := mgr.managercfg.Cover && mgr.cfg.CoverUploadPath != ""
 	if mgr.storage == nil && !directUpload {
 		// Cover report uploading is disabled.
@@ -860,7 +857,7 @@ func (mgr *Manager) uploadCoverReport() error {
 	// Report generation can consume lots of memory. Generate one at a time.
 	select {
 	case <-buildSem.WaitC():
-	case <-mgr.stop:
+	case <-ctx.Done():
 		return nil
 	}
 	defer buildSem.Signal()
@@ -886,8 +883,14 @@ func (mgr *Manager) uploadCoverReport() error {
 	return nil
 }
 
-func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest, nameSuffix string,
-	publish, compress bool, f func(io.Writer, *json.Decoder) error) error {
+type uploadOptions struct {
+	nameSuffix string
+	publish    bool
+	compress   bool
+}
+
+func (mgr *Manager) uploadCoverJSONLToGCS(ctx context.Context, gcsClient gcs.Client, mgrSrc, gcsDest string,
+	opts uploadOptions, f func(io.Writer, *json.Decoder) error) error {
 	if !mgr.managercfg.Cover || gcsDest == "" {
 		return nil
 	}
@@ -896,12 +899,12 @@ func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest,
 	// TODO: remove it once #4585 (symbolization tuning) is closed
 	select {
 	case <-buildSem.WaitC():
-	case <-mgr.stop:
+	case <-ctx.Done():
 		return nil
 	}
 	defer buildSem.Signal()
 
-	eg, egCtx := errgroup.WithContext(context.Background())
+	eg, egCtx := errgroup.WithContext(ctx)
 	resp, err := mgr.httpGET(egCtx, mgrSrc)
 	if err != nil {
 		return fmt.Errorf("failed to httpGet %s: %w", mgrSrc, err)
@@ -919,7 +922,7 @@ func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest,
 		defer pw.Close()
 		var w io.Writer
 		w = pw
-		if compress {
+		if opts.compress {
 			gzw := gzip.NewWriter(pw)
 			defer gzw.Close()
 			w = gzw
@@ -934,8 +937,8 @@ func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest,
 	})
 	eg.Go(func() error {
 		defer pr.Close()
-		fileName := fmt.Sprintf("%s/%s%s.jsonl", mgr.mgrcfg.DashboardClient, mgr.name, nameSuffix)
-		if err := uploadFile(egCtx, gcsClient, gcsDest, fileName, pr, publish); err != nil {
+		fileName := fmt.Sprintf("%s/%s%s.jsonl", mgr.mgrcfg.DashboardClient, mgr.name, opts.nameSuffix)
+		if err := uploadFile(egCtx, gcsClient, gcsDest, fileName, pr, opts.publish); err != nil {
 			return fmt.Errorf("uploadFile: %w", err)
 		}
 		return nil
@@ -943,17 +946,19 @@ func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest,
 	})
 	return eg.Wait()
 }
 
-func (mgr *Manager) uploadCoverStat(fuzzingMinutes int) error {
+func (mgr *Manager) uploadCoverStat(ctx context.Context, fuzzingMinutes int) error {
 	// Coverage report generation consumes and caches lots of memory.
 	// In the syz-ci context report generation won't be used after this point,
 	// so tell manager to flush report generator.
 	curTime := time.Now()
-	if err := mgr.uploadCoverJSONLToGCS(nil,
+	if err := mgr.uploadCoverJSONLToGCS(ctx, nil,
 		"/cover?jsonl=1&flush=1",
 		mgr.cfg.CoverPipelinePath,
-		time.Now().Format("-2006-01-02-15-04"),
-		false,
-		false,
+		uploadOptions{
+			nameSuffix: time.Now().Format("-2006-01-02-15-04"),
+			publish:    false,
+			compress:   false,
+		},
 		func(w io.Writer, dec *json.Decoder) error {
 			var covInfo cover.CoverageInfo
 			if err := dec.Decode(&covInfo); err != nil {
@@ -979,13 +984,15 @@ func (mgr *Manager) uploadCoverStat(fuzzingMinutes int) error {
 	return nil
 }
 
-func (mgr *Manager) uploadProgramsWithCoverage() error {
-	if err := mgr.uploadCoverJSONLToGCS(nil,
+func (mgr *Manager) uploadProgramsWithCoverage(ctx context.Context) error {
+	if err := mgr.uploadCoverJSONLToGCS(ctx, nil,
 		"/coverprogs?jsonl=1",
 		mgr.cfg.CoverProgramsPath,
-		"",
-		mgr.cfg.PublishGCS,
-		true,
+		uploadOptions{
+			nameSuffix: "",
+			publish:    mgr.cfg.PublishGCS,
+			compress:   true,
+		},
 		func(w io.Writer, dec *json.Decoder) error {
 			var programCoverage cover.ProgramCoverage
 			if err := dec.Decode(&programCoverage); err != nil {
@@ -1015,7 +1022,7 @@ func (mgr *Manager) uploadCorpus() error {
 	return uploadFile(context.Background(), nil, mgr.cfg.CorpusUploadPath, mgr.name+"-corpus.db", f, mgr.cfg.PublishGCS)
 }
 
-func (mgr *Manager) uploadBenchData() error {
+func (mgr *Manager) uploadBenchData(ctx context.Context) error {
 	if mgr.lastRestarted.IsZero() {
 		return nil
 	}
@@ -1030,7 +1037,7 @@ func (mgr *Manager) uploadBenchData() error {
 		return fmt.Errorf("failed to open bench file: %w", err)
 	}
 	defer f.Close()
-	err = uploadFile(context.Background(), nil, mgr.cfg.BenchUploadPath+"/"+mgr.name,
+	err = uploadFile(ctx, nil, mgr.cfg.BenchUploadPath+"/"+mgr.name,
 		mgr.lastRestarted.Format("2006-01-02_15h.json"), f, false)
 	if err != nil {
 		return fmt.Errorf("failed to upload the bench file: %w", err)
diff --git a/syz-ci/manager_test.go b/syz-ci/manager_test.go
index 9fd980ed2c97..e9ca7071dd64 100644
--- a/syz-ci/manager_test.go
+++ b/syz-ci/manager_test.go
@@ -6,6 +6,7 @@ package main
 import (
 	"bytes"
 	"compress/gzip"
+	"context"
 	"encoding/json"
 	"fmt"
 	"io"
@@ -200,11 +201,15 @@ func TestUploadCoverJSONLToGCS(t *testing.T) {
 				gcsMock.On("Publish", test.wantGCSFileName).
 					Return(nil).Once()
 			}
-			err := mgr.uploadCoverJSONLToGCS(gcsMock,
+			err := mgr.uploadCoverJSONLToGCS(context.Background(), gcsMock,
 				"/teststream&jsonl=1",
 				"gs://test-bucket",
-				test.inputNameSuffix,
-				test.inputPublish, test.inputCompress, func(w io.Writer, dec *json.Decoder) error {
+				uploadOptions{
+					nameSuffix: test.inputNameSuffix,
+					publish:    test.inputPublish,
+					compress:   test.inputCompress,
+				},
+				func(w io.Writer, dec *json.Decoder) error {
 					var v any
 					if err := dec.Decode(&v); err != nil {
 						return fmt.Errorf("cover.ProgramCoverage: %w", err)
diff --git a/syz-ci/syz-ci.go b/syz-ci/syz-ci.go
index 74afa2b89010..3cefab1bc8a8 100644
--- a/syz-ci/syz-ci.go
+++ b/syz-ci/syz-ci.go
@@ -52,6 +52,7 @@ package main
 // modification time allows us to understand if we need to rebuild after a restart.
 
 import (
+	"context"
 	"encoding/json"
 	"flag"
 	"fmt"
@@ -281,10 +282,10 @@ func main() {
 		}()
 	}
 
-	stop := make(chan struct{})
+	ctx, stop := context.WithCancel(context.Background())
 	var managers []*Manager
 	for _, mgrcfg := range cfg.Managers {
-		mgr, err := createManager(cfg, mgrcfg, stop, *flagDebug)
+		mgr, err := createManager(cfg, mgrcfg, *flagDebug)
 		if err != nil {
 			log.Errorf("failed to create manager %v: %v", mgrcfg.Name, err)
 			continue
@@ -300,7 +301,7 @@ func main() {
 			wg.Add(1)
 			go func() {
 				defer wg.Done()
-				mgr.loop()
+				mgr.loop(ctx)
 			}()
 		}
 	}
@@ -308,12 +309,14 @@ func main() {
 	if err != nil {
 		log.Fatalf("failed to create dashapi connection %v", err)
 	}
-	stopJobs := jp.startLoop(&wg)
+	ctxJobs, stopJobs := context.WithCancel(ctx)
+	wgJobs := sync.WaitGroup{}
+	jp.startLoop(ctxJobs, &wgJobs)
 
 	// For testing. Racy. Use with care.
 	http.HandleFunc("/upload_cover", func(w http.ResponseWriter, r *http.Request) {
 		for _, mgr := range managers {
-			if err := mgr.uploadCoverReport(); err != nil {
+			if err := mgr.uploadCoverReport(ctx); err != nil {
 				w.Write([]byte(fmt.Sprintf("failed for %v: %v\n", mgr.name, err)))
 				return
 			}
 		}
 	})
 
 	wg.Add(1)
-	go deprecateAssets(cfg, stop, &wg)
+	go deprecateAssets(ctx, cfg, &wg)
 
 	select {
 	case <-shutdownPending:
 	case <-updatePending:
 	}
 
-	stopJobs() // Gracefully wait for the running jobs to finish.
-	close(stop)
+	stopJobs()
+	wgJobs.Wait()
+	stop()
 	wg.Wait()
 
 	select {
@@ -339,7 +343,7 @@ func main() {
 	}
 }
 
-func deprecateAssets(cfg *Config, stop chan struct{}, wg *sync.WaitGroup) {
+func deprecateAssets(ctx context.Context, cfg *Config, wg *sync.WaitGroup) {
 	defer wg.Done()
 	if cfg.DashboardAddr == "" || cfg.AssetStorage.IsEmpty() ||
 		!cfg.AssetStorage.DoDeprecation {
@@ -359,7 +363,7 @@ loop:
 	for {
 		const sleepDuration = 6 * time.Hour
 		select {
-		case <-stop:
+		case <-ctx.Done():
 			break loop
 		case <-time.After(sleepDuration):
 		}
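Aside (not part of the patch): uploadCoverJSONLToGCS keeps its producer/consumer structure — one errgroup goroutine re-encodes the manager's JSONL response into the pw writer (optionally through gzip), while a second goroutine uploads from the matching pr reader — and only the root context plus the new uploadOptions struct change. A rough, self-contained sketch of that pipe-plus-errgroup shape follows, under the assumption that pr/pw come from io.Pipe (the pipe creation is outside the hunks shown), with a hypothetical consume callback standing in for uploadFile and a fixed string standing in for the JSONL stream.

package main

import (
	"compress/gzip"
	"context"
	"fmt"
	"io"

	"golang.org/x/sync/errgroup"
)

// streamCompressed pipes data from a producer goroutine to a consumer goroutine,
// optionally gzip-compressing it on the way, and returns the first error from either side.
func streamCompressed(ctx context.Context, compress bool, consume func(io.Reader) error) error {
	eg, _ := errgroup.WithContext(ctx)
	pr, pw := io.Pipe()
	eg.Go(func() error {
		defer pw.Close()
		var w io.Writer = pw
		if compress {
			gzw := gzip.NewWriter(pw)
			defer gzw.Close() // runs before pw.Close, flushing the gzip trailer
			w = gzw
		}
		_, err := io.WriteString(w, "payload\n") // stand-in for re-encoding the JSONL stream
		return err
	})
	eg.Go(func() error {
		defer pr.Close()
		return consume(pr) // stand-in for uploadFile(...)
	})
	return eg.Wait()
}

func main() {
	err := streamCompressed(context.Background(), true, func(r io.Reader) error {
		gz, err := gzip.NewReader(r)
		if err != nil {
			return err
		}
		_, err = io.Copy(io.Discard, gz)
		return err
	})
	fmt.Println("err:", err)
}

Streaming through the pipe lets the upload begin before the whole coverage payload has been produced, so it never has to be buffered in memory at once; closing either end early propagates an error to the other goroutine via eg.Wait.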