Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 9 additions & 16 deletions syz-ci/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package main

import (
"bytes"
"context"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -61,24 +62,16 @@ func newJobManager(cfg *Config, managers []*Manager, shutdownPending chan struct
}, nil
}

// startLoop starts a job loop in parallel and returns a blocking function
// to gracefully stop job processing.
func (jm *JobManager) startLoop(wg *sync.WaitGroup) func() {
stop := make(chan struct{})
done := make(chan struct{}, 1)
// startLoop starts a job loop in parallel.
func (jm *JobManager) startLoop(ctx context.Context, wg *sync.WaitGroup) {
wg.Add(1)
go func() {
defer wg.Done()
jm.loop(stop)
done <- struct{}{}
jm.loop(ctx)
}()
return func() {
close(stop)
<-done
}
}

func (jm *JobManager) loop(stop chan struct{}) {
func (jm *JobManager) loop(ctx context.Context) {
if err := jm.resetJobs(); err != nil {
if jm.dash != nil {
jm.dash.LogError("syz-ci", "reset jobs failed: %v", err)
Expand Down Expand Up @@ -109,7 +102,7 @@ func (jm *JobManager) loop(stop chan struct{}) {
wg.Add(1)
go func() {
defer wg.Done()
jp.loop(stop)
jp.loop(ctx)
}()
if !main || !jm.needParallelProcessor() {
break
Expand Down Expand Up @@ -143,14 +136,14 @@ func (jm *JobManager) resetJobs() error {
return nil
}

func (jp *JobProcessor) loop(stop chan struct{}) {
func (jp *JobProcessor) loop(ctx context.Context) {
jp.Logf(0, "job loop started")
loop:
for {
// Check jp.stop separately first, otherwise if stop signal arrives during a job execution,
// we can still grab the next job with 50% probability.
select {
case <-stop:
case <-ctx.Done():
break loop
default:
}
Expand All @@ -166,7 +159,7 @@ loop:
jp.pollJobs()
case <-jp.commitTicker:
jp.pollCommits()
case <-stop:
case <-ctx.Done():
break loop
}
}
Expand Down
77 changes: 42 additions & 35 deletions syz-ci/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ type Manager struct {
dash ManagerDashapi
debugStorage bool
storage *asset.Storage
stop chan struct{}
debug bool
lastBuild *dashapi.Build
buildFailed bool
Expand All @@ -105,8 +104,7 @@ type ManagerDashapi interface {
UploadCommits(commits []dashapi.Commit) error
}

func createManager(cfg *Config, mgrcfg *ManagerConfig, stop chan struct{},
debug bool) (*Manager, error) {
func createManager(cfg *Config, mgrcfg *ManagerConfig, debug bool) (*Manager, error) {
dir := osutil.Abs(filepath.Join("managers", mgrcfg.Name))
err := osutil.MkdirAll(dir)
if err != nil {
Expand Down Expand Up @@ -157,7 +155,6 @@ func createManager(cfg *Config, mgrcfg *ManagerConfig, stop chan struct{},
managercfg: mgrcfg.managercfg,
storage: assetStorage,
debugStorage: !cfg.AssetStorage.IsEmpty() && cfg.AssetStorage.Debug,
stop: stop,
debug: debug,
}
// Leave the dashboard interface value as nil if it does not wrap a valid dashboard pointer.
Expand All @@ -182,7 +179,7 @@ var testSem = instance.NewSemaphore(1)
const fuzzingMinutesBeforeCover = 360
const benchUploadPeriod = 30 * time.Minute

func (mgr *Manager) loop() {
func (mgr *Manager) loop(ctx context.Context) {
lastCommit := ""
nextBuildTime := time.Now()
var managerRestartTime, artifactUploadTime, benchUploadTime time.Time
Expand Down Expand Up @@ -210,20 +207,20 @@ loop:
for {
if time.Since(nextBuildTime) >= 0 {
var rebuildAfter time.Duration
lastCommit, latestInfo, rebuildAfter = mgr.pollAndBuild(lastCommit, latestInfo)
lastCommit, latestInfo, rebuildAfter = mgr.pollAndBuild(ctx, lastCommit, latestInfo)
nextBuildTime = time.Now().Add(rebuildAfter)
}
if !artifactUploadTime.IsZero() && time.Now().After(artifactUploadTime) {
artifactUploadTime = time.Time{}
if err := mgr.uploadCoverReport(); err != nil {
if err := mgr.uploadCoverReport(ctx); err != nil {
mgr.Errorf("failed to upload cover report: %v", err)
}
if err := mgr.uploadProgramsWithCoverage(); err != nil {
if err := mgr.uploadProgramsWithCoverage(ctx); err != nil {
mgr.Errorf("failed to upload programs with coverage: %v", err)
}
// Function uploadCoverStat also forces manager to drop the coverage structures to reduce memory usage.
// Should be the last request touching the coverage data.
if err := mgr.uploadCoverStat(fuzzingMinutesBeforeCover); err != nil {
if err := mgr.uploadCoverStat(ctx, fuzzingMinutesBeforeCover); err != nil {
mgr.Errorf("failed to upload coverage stat: %v", err)
}
if err := mgr.uploadCorpus(); err != nil {
Expand All @@ -232,13 +229,13 @@ loop:
}
if mgr.cfg.BenchUploadPath != "" && time.Now().After(benchUploadTime) {
benchUploadTime = time.Now().Add(benchUploadPeriod)
if err := mgr.uploadBenchData(); err != nil {
if err := mgr.uploadBenchData(ctx); err != nil {
mgr.Errorf("failed to upload bench: %v", err)
}
}

select {
case <-mgr.stop:
case <-ctx.Done():
break loop
default:
}
Expand All @@ -253,7 +250,7 @@ loop:

select {
case <-ticker.C:
case <-mgr.stop:
case <-ctx.Done():
break loop
}
}
Expand All @@ -275,7 +272,7 @@ func (mgr *Manager) archiveCommit(commit string) {
}
}

func (mgr *Manager) pollAndBuild(lastCommit string, latestInfo *BuildInfo) (
func (mgr *Manager) pollAndBuild(ctx context.Context, lastCommit string, latestInfo *BuildInfo) (
string, *BuildInfo, time.Duration) {
rebuildAfter := buildRetryPeriod
commit, err := mgr.repo.Poll(mgr.mgrcfg.Repo, mgr.mgrcfg.Branch)
Expand Down Expand Up @@ -307,7 +304,7 @@ func (mgr *Manager) pollAndBuild(lastCommit string, latestInfo *BuildInfo) (
}
}
buildSem.Signal()
case <-mgr.stop:
case <-ctx.Done():
}
}
}
Expand Down Expand Up @@ -848,7 +845,7 @@ func (mgr *Manager) httpGET(ctx context.Context, path string) (resp *http.Respon
return client.Do(req)
}

func (mgr *Manager) uploadCoverReport() error {
func (mgr *Manager) uploadCoverReport(ctx context.Context) error {
directUpload := mgr.managercfg.Cover && mgr.cfg.CoverUploadPath != ""
if mgr.storage == nil && !directUpload {
// Cover report uploading is disabled.
Expand All @@ -860,7 +857,7 @@ func (mgr *Manager) uploadCoverReport() error {
// Report generation can consume lots of memory. Generate one at a time.
select {
case <-buildSem.WaitC():
case <-mgr.stop:
case <-ctx.Done():
return nil
}
defer buildSem.Signal()
Expand All @@ -886,8 +883,14 @@ func (mgr *Manager) uploadCoverReport() error {
return nil
}

func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest, nameSuffix string,
publish, compress bool, f func(io.Writer, *json.Decoder) error) error {
type uploadOptions struct {
nameSuffix string
publish bool
compress bool
}

func (mgr *Manager) uploadCoverJSONLToGCS(ctx context.Context, gcsClient gcs.Client, mgrSrc, gcsDest string,
opts uploadOptions, f func(io.Writer, *json.Decoder) error) error {
if !mgr.managercfg.Cover || gcsDest == "" {
return nil
}
Expand All @@ -896,12 +899,12 @@ func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest,
// TODO: remove it once #4585 (symbolization tuning) is closed
select {
case <-buildSem.WaitC():
case <-mgr.stop:
case <-ctx.Done():
return nil
}
defer buildSem.Signal()

eg, egCtx := errgroup.WithContext(context.Background())
eg, egCtx := errgroup.WithContext(ctx)
resp, err := mgr.httpGET(egCtx, mgrSrc)
if err != nil {
return fmt.Errorf("failed to httpGet %s: %w", mgrSrc, err)
Expand All @@ -919,7 +922,7 @@ func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest,
defer pw.Close()
var w io.Writer
w = pw
if compress {
if opts.compress {
gzw := gzip.NewWriter(pw)
defer gzw.Close()
w = gzw
Expand All @@ -934,26 +937,28 @@ func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest,
})
eg.Go(func() error {
defer pr.Close()
fileName := fmt.Sprintf("%s/%s%s.jsonl", mgr.mgrcfg.DashboardClient, mgr.name, nameSuffix)
if err := uploadFile(egCtx, gcsClient, gcsDest, fileName, pr, publish); err != nil {
fileName := fmt.Sprintf("%s/%s%s.jsonl", mgr.mgrcfg.DashboardClient, mgr.name, opts.nameSuffix)
if err := uploadFile(egCtx, gcsClient, gcsDest, fileName, pr, opts.publish); err != nil {
return fmt.Errorf("uploadFile: %w", err)
}
return nil
})
return eg.Wait()
}

func (mgr *Manager) uploadCoverStat(fuzzingMinutes int) error {
func (mgr *Manager) uploadCoverStat(ctx context.Context, fuzzingMinutes int) error {
// Coverage report generation consumes and caches lots of memory.
// In the syz-ci context report generation won't be used after this point,
// so tell manager to flush report generator.
curTime := time.Now()
if err := mgr.uploadCoverJSONLToGCS(nil,
if err := mgr.uploadCoverJSONLToGCS(ctx, nil,
"/cover?jsonl=1&flush=1",
mgr.cfg.CoverPipelinePath,
time.Now().Format("-2006-01-02-15-04"),
false,
false,
uploadOptions{
nameSuffix: time.Now().Format("-2006-01-02-15-04"),
publish: false,
compress: false,
},
func(w io.Writer, dec *json.Decoder) error {
var covInfo cover.CoverageInfo
if err := dec.Decode(&covInfo); err != nil {
Expand All @@ -979,13 +984,15 @@ func (mgr *Manager) uploadCoverStat(fuzzingMinutes int) error {
return nil
}

func (mgr *Manager) uploadProgramsWithCoverage() error {
if err := mgr.uploadCoverJSONLToGCS(nil,
func (mgr *Manager) uploadProgramsWithCoverage(ctx context.Context) error {
if err := mgr.uploadCoverJSONLToGCS(ctx, nil,
"/coverprogs?jsonl=1",
mgr.cfg.CoverProgramsPath,
"",
mgr.cfg.PublishGCS,
true,
uploadOptions{
nameSuffix: "",
publish: mgr.cfg.PublishGCS,
compress: true,
},
func(w io.Writer, dec *json.Decoder) error {
var programCoverage cover.ProgramCoverage
if err := dec.Decode(&programCoverage); err != nil {
Expand Down Expand Up @@ -1015,7 +1022,7 @@ func (mgr *Manager) uploadCorpus() error {
return uploadFile(context.Background(), nil, mgr.cfg.CorpusUploadPath, mgr.name+"-corpus.db", f, mgr.cfg.PublishGCS)
}

func (mgr *Manager) uploadBenchData() error {
func (mgr *Manager) uploadBenchData(ctx context.Context) error {
if mgr.lastRestarted.IsZero() {
return nil
}
Expand All @@ -1030,7 +1037,7 @@ func (mgr *Manager) uploadBenchData() error {
return fmt.Errorf("failed to open bench file: %w", err)
}
defer f.Close()
err = uploadFile(context.Background(), nil, mgr.cfg.BenchUploadPath+"/"+mgr.name,
err = uploadFile(ctx, nil, mgr.cfg.BenchUploadPath+"/"+mgr.name,
mgr.lastRestarted.Format("2006-01-02_15h.json"), f, false)
if err != nil {
return fmt.Errorf("failed to upload the bench file: %w", err)
Expand Down
11 changes: 8 additions & 3 deletions syz-ci/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package main
import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -200,11 +201,15 @@ func TestUploadCoverJSONLToGCS(t *testing.T) {
gcsMock.On("Publish", test.wantGCSFileName).
Return(nil).Once()
}
err := mgr.uploadCoverJSONLToGCS(gcsMock,
err := mgr.uploadCoverJSONLToGCS(context.Background(), gcsMock,
"/teststream&jsonl=1",
"gs://test-bucket",
test.inputNameSuffix,
test.inputPublish, test.inputCompress, func(w io.Writer, dec *json.Decoder) error {
uploadOptions{
nameSuffix: test.inputNameSuffix,
publish: test.inputPublish,
compress: test.inputCompress,
},
func(w io.Writer, dec *json.Decoder) error {
var v any
if err := dec.Decode(&v); err != nil {
return fmt.Errorf("cover.ProgramCoverage: %w", err)
Expand Down
Loading