Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pkg/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Client interface {
FileExists(path string) (bool, error)
ListObjects(path string) ([]*Object, error)

publish(path string) error
Publish(path string) error
}

type UploadOptions struct {
Expand Down Expand Up @@ -63,7 +63,7 @@ func UploadFile(ctx context.Context, srcFile io.Reader, destURL string, opts Upl
return fmt.Errorf("gcsWriter.Close: %w", err)
}
if opts.Publish {
return gcsClient.publish(destURL)
return gcsClient.Publish(destURL)
}
return nil
}
Expand Down Expand Up @@ -143,8 +143,8 @@ func (c *client) FileWriter(gcsFile, contentType, contentEncoding string) (io.Wr
return w, nil
}

// publish lets any user read gcsFile.
func (c *client) publish(gcsFile string) error {
// Publish lets any user read gcsFile.
func (c *client) Publish(gcsFile string) error {
bucket, filename, err := split(gcsFile)
if err != nil {
return err
Expand Down
36 changes: 18 additions & 18 deletions pkg/gcs/mocks/Client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

86 changes: 55 additions & 31 deletions syz-ci/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package main

import (
"compress/gzip"
"context"
"crypto/sha256"
"encoding/json"
Expand Down Expand Up @@ -36,6 +37,7 @@ import (
"github.com/google/syzkaller/prog"
_ "github.com/google/syzkaller/sys"
"github.com/google/syzkaller/sys/targets"
"golang.org/x/sync/errgroup"
)

// This is especially slightly longer than syzkaller rebuild period.
Expand Down Expand Up @@ -831,15 +833,19 @@ func (mgr *Manager) uploadBuildAssets(buildInfo *dashapi.Build, assetFolder stri
return ret, nil
}

func (mgr *Manager) httpGET(path string) (resp *http.Response, err error) {
func (mgr *Manager) httpGET(ctx context.Context, path string) (resp *http.Response, err error) {
addr := mgr.managercfg.HTTP
if addr != "" && addr[0] == ':' {
addr = "127.0.0.1" + addr // in case addr is ":port"
}
client := &http.Client{
Timeout: time.Hour,
}
return client.Get(fmt.Sprintf("http://%s%s", addr, path))
req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://%s%s", addr, path), nil)
if err != nil {
return nil, err
}
return client.Do(req)
}

func (mgr *Manager) uploadCoverReport() error {
Expand All @@ -859,13 +865,13 @@ func (mgr *Manager) uploadCoverReport() error {
}
defer buildSem.Signal()

resp, err := mgr.httpGET("/cover")
resp, err := mgr.httpGET(context.Background(), "/cover")
if err != nil {
return fmt.Errorf("failed to get report: %w", err)
}
defer resp.Body.Close()
if directUpload {
return uploadFile(mgr.cfg.CoverUploadPath, mgr.name+".html", resp.Body, mgr.cfg.PublishGCS)
return uploadFile(context.Background(), nil, mgr.cfg.CoverUploadPath, mgr.name+".html", resp.Body, mgr.cfg.PublishGCS)
}
// Upload via the asset storage.
newAsset, err := mgr.storage.UploadBuildAsset(resp.Body, mgr.name+".html",
Expand All @@ -880,8 +886,8 @@ func (mgr *Manager) uploadCoverReport() error {
return nil
}

func (mgr *Manager) uploadCoverJSONLToGCS(mgrSrc, gcsDest string, curTime time.Time, publish bool,
f func(io.Writer, *json.Decoder) error) error {
func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest string, curTime time.Time,
publish, compress bool, f func(io.Writer, *json.Decoder) error) error {
if !mgr.managercfg.Cover || gcsDest == "" {
return nil
}
Expand All @@ -895,7 +901,8 @@ func (mgr *Manager) uploadCoverJSONLToGCS(mgrSrc, gcsDest string, curTime time.T
}
defer buildSem.Signal()

resp, err := mgr.httpGET(mgrSrc)
eg, egCtx := errgroup.WithContext(context.Background())
resp, err := mgr.httpGET(egCtx, mgrSrc)
if err != nil {
return fmt.Errorf("failed to httpGet %s: %w", mgrSrc, err)
}
Expand All @@ -908,36 +915,48 @@ func (mgr *Manager) uploadCoverJSONLToGCS(mgrSrc, gcsDest string, curTime time.T
}

pr, pw := io.Pipe()
defer pr.Close()
go func() {
eg.Go(func() error {
defer pw.Close()
var w io.Writer
w = pw
if compress {
gzw := gzip.NewWriter(pw)
defer gzw.Close()
w = gzw
}
decoder := json.NewDecoder(resp.Body)
for decoder.More() {
if err := f(pw, decoder); err != nil {
pw.CloseWithError(fmt.Errorf("callback: %w", err))
return
if err := f(w, decoder); err != nil {
return fmt.Errorf("callback: %w", err)
}
}
pw.Close()
}()
fileName := fmt.Sprintf("%s/%s-%s-%d-%d.jsonl",
mgr.mgrcfg.DashboardClient,
mgr.name, curTime.Format(time.DateOnly),
curTime.Hour(), curTime.Minute())
if err := uploadFile(gcsDest, fileName, pr, publish); err != nil {
return fmt.Errorf("failed to uploadFileGCS(): %w", err)
}
return nil
return nil
})
eg.Go(func() error {
defer pr.Close()
fileName := fmt.Sprintf("%s/%s-%s-%d-%d.jsonl",
mgr.mgrcfg.DashboardClient,
mgr.name, curTime.Format(time.DateOnly),
curTime.Hour(), curTime.Minute())
if err := uploadFile(egCtx, gcsClient, gcsDest, fileName, pr, publish); err != nil {
return fmt.Errorf("uploadFile: %w", err)
}
return nil
})
return eg.Wait()
}

func (mgr *Manager) uploadCoverStat(fuzzingMinutes int) error {
// Coverage report generation consumes and caches lots of memory.
// In the syz-ci context report generation won't be used after this point,
// so tell manager to flush report generator.
curTime := time.Now()
if err := mgr.uploadCoverJSONLToGCS("/cover?jsonl=1&flush=1",
if err := mgr.uploadCoverJSONLToGCS(nil,
"/cover?jsonl=1&flush=1",
mgr.cfg.CoverPipelinePath,
curTime,
false,
false,
func(w io.Writer, dec *json.Decoder) error {
var covInfo cover.CoverageInfo
if err := dec.Decode(&covInfo); err != nil {
Expand All @@ -964,10 +983,12 @@ func (mgr *Manager) uploadCoverStat(fuzzingMinutes int) error {
}

func (mgr *Manager) uploadProgramsWithCoverage() error {
if err := mgr.uploadCoverJSONLToGCS("/coverprogs?jsonl=1",
if err := mgr.uploadCoverJSONLToGCS(nil,
"/coverprogs?jsonl=1",
mgr.cfg.CoverProgramsPath,
time.Now(),
mgr.cfg.PublishGCS,
true,
func(w io.Writer, dec *json.Decoder) error {
var programCoverage cover.ProgramCoverage
if err := dec.Decode(&programCoverage); err != nil {
Expand All @@ -994,7 +1015,7 @@ func (mgr *Manager) uploadCorpus() error {
return err
}
defer f.Close()
return uploadFile(mgr.cfg.CorpusUploadPath, mgr.name+"-corpus.db", f, mgr.cfg.PublishGCS)
return uploadFile(context.Background(), nil, mgr.cfg.CorpusUploadPath, mgr.name+"-corpus.db", f, mgr.cfg.PublishGCS)
}

func (mgr *Manager) uploadBenchData() error {
Expand All @@ -1012,15 +1033,15 @@ func (mgr *Manager) uploadBenchData() error {
return fmt.Errorf("failed to open bench file: %w", err)
}
defer f.Close()
err = uploadFile(mgr.cfg.BenchUploadPath+"/"+mgr.name,
err = uploadFile(context.Background(), nil, mgr.cfg.BenchUploadPath+"/"+mgr.name,
mgr.lastRestarted.Format("2006-01-02_15h.json"), f, false)
if err != nil {
return fmt.Errorf("failed to upload the bench file: %w", err)
}
return nil
}

func uploadFile(dstPath, name string, file io.Reader, publish bool) error {
func uploadFile(ctx context.Context, gcsClient gcs.Client, dstPath, name string, file io.Reader, publish bool) error {
URL, err := url.Parse(dstPath)
if err != nil {
return fmt.Errorf("failed to parse upload path: %w", err)
Expand All @@ -1030,13 +1051,16 @@ func uploadFile(dstPath, name string, file io.Reader, publish bool) error {
log.Logf(0, "uploading %v to %v", name, URLStr)
if strings.HasPrefix(URLStr, "http://") ||
strings.HasPrefix(URLStr, "https://") {
return uploadFileHTTPPut(URLStr, file)
if gcsClient != nil {
return fmt.Errorf("gcsClient is expected to be nil for the http* requests")
}
return uploadFileHTTPPut(ctx, URLStr, file)
}
return gcs.UploadFile(context.Background(), file, URLStr, gcs.UploadOptions{Publish: publish})
return gcs.UploadFile(ctx, file, URLStr, gcs.UploadOptions{Publish: publish, GCSClientMock: gcsClient})
}

func uploadFileHTTPPut(URL string, file io.Reader) error {
req, err := http.NewRequest(http.MethodPut, URL, file)
func uploadFileHTTPPut(ctx context.Context, URL string, file io.Reader) error {
req, err := http.NewRequestWithContext(ctx, http.MethodPut, URL, file)
if err != nil {
return fmt.Errorf("failed to create HTTP PUT request: %w", err)
}
Expand Down
Loading