Skip to content

Commit 9b47664

Browse files
committed
syz-ci/manager.go: compress programs coverage data
Each fuzzing session costs 2G-13G now. It looks too much. The data is highly redundant (jsonl) thus compression should help.
1 parent 89d30d7 commit 9b47664

File tree

2 files changed

+194
-31
lines changed

2 files changed

+194
-31
lines changed

syz-ci/manager.go

Lines changed: 55 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package main
55

66
import (
7+
"compress/gzip"
78
"context"
89
"crypto/sha256"
910
"encoding/json"
@@ -36,6 +37,7 @@ import (
3637
"github.com/google/syzkaller/prog"
3738
_ "github.com/google/syzkaller/sys"
3839
"github.com/google/syzkaller/sys/targets"
40+
"golang.org/x/sync/errgroup"
3941
)
4042

4143
// This is especially slightly longer than syzkaller rebuild period.
@@ -831,15 +833,19 @@ func (mgr *Manager) uploadBuildAssets(buildInfo *dashapi.Build, assetFolder stri
831833
return ret, nil
832834
}
833835

834-
func (mgr *Manager) httpGET(path string) (resp *http.Response, err error) {
836+
func (mgr *Manager) httpGET(ctx context.Context, path string) (resp *http.Response, err error) {
835837
addr := mgr.managercfg.HTTP
836838
if addr != "" && addr[0] == ':' {
837839
addr = "127.0.0.1" + addr // in case addr is ":port"
838840
}
839841
client := &http.Client{
840842
Timeout: time.Hour,
841843
}
842-
return client.Get(fmt.Sprintf("http://%s%s", addr, path))
844+
req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://%s%s", addr, path), nil)
845+
if err != nil {
846+
return nil, err
847+
}
848+
return client.Do(req)
843849
}
844850

845851
func (mgr *Manager) uploadCoverReport() error {
@@ -859,13 +865,13 @@ func (mgr *Manager) uploadCoverReport() error {
859865
}
860866
defer buildSem.Signal()
861867

862-
resp, err := mgr.httpGET("/cover")
868+
resp, err := mgr.httpGET(context.Background(), "/cover")
863869
if err != nil {
864870
return fmt.Errorf("failed to get report: %w", err)
865871
}
866872
defer resp.Body.Close()
867873
if directUpload {
868-
return uploadFile(mgr.cfg.CoverUploadPath, mgr.name+".html", resp.Body, mgr.cfg.PublishGCS)
874+
return uploadFile(context.Background(), nil, mgr.cfg.CoverUploadPath, mgr.name+".html", resp.Body, mgr.cfg.PublishGCS)
869875
}
870876
// Upload via the asset storage.
871877
newAsset, err := mgr.storage.UploadBuildAsset(resp.Body, mgr.name+".html",
@@ -880,8 +886,8 @@ func (mgr *Manager) uploadCoverReport() error {
880886
return nil
881887
}
882888

883-
func (mgr *Manager) uploadCoverJSONLToGCS(mgrSrc, gcsDest string, curTime time.Time, publish bool,
884-
f func(io.Writer, *json.Decoder) error) error {
889+
func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest string, curTime time.Time,
890+
publish, compress bool, f func(io.Writer, *json.Decoder) error) error {
885891
if !mgr.managercfg.Cover || gcsDest == "" {
886892
return nil
887893
}
@@ -895,7 +901,8 @@ func (mgr *Manager) uploadCoverJSONLToGCS(mgrSrc, gcsDest string, curTime time.T
895901
}
896902
defer buildSem.Signal()
897903

898-
resp, err := mgr.httpGET(mgrSrc)
904+
eg, egCtx := errgroup.WithContext(context.Background())
905+
resp, err := mgr.httpGET(egCtx, mgrSrc)
899906
if err != nil {
900907
return fmt.Errorf("failed to httpGet %s: %w", mgrSrc, err)
901908
}
@@ -908,36 +915,48 @@ func (mgr *Manager) uploadCoverJSONLToGCS(mgrSrc, gcsDest string, curTime time.T
908915
}
909916

910917
pr, pw := io.Pipe()
911-
defer pr.Close()
912-
go func() {
918+
eg.Go(func() error {
919+
defer pw.Close()
920+
var w io.Writer
921+
w = pw
922+
if compress {
923+
gzw := gzip.NewWriter(pw)
924+
defer gzw.Close()
925+
w = gzw
926+
}
913927
decoder := json.NewDecoder(resp.Body)
914928
for decoder.More() {
915-
if err := f(pw, decoder); err != nil {
916-
pw.CloseWithError(fmt.Errorf("callback: %w", err))
917-
return
929+
if err := f(w, decoder); err != nil {
930+
return fmt.Errorf("callback: %w", err)
918931
}
919932
}
920-
pw.Close()
921-
}()
922-
fileName := fmt.Sprintf("%s/%s-%s-%d-%d.jsonl",
923-
mgr.mgrcfg.DashboardClient,
924-
mgr.name, curTime.Format(time.DateOnly),
925-
curTime.Hour(), curTime.Minute())
926-
if err := uploadFile(gcsDest, fileName, pr, publish); err != nil {
927-
return fmt.Errorf("failed to uploadFileGCS(): %w", err)
928-
}
929-
return nil
933+
return nil
934+
})
935+
eg.Go(func() error {
936+
defer pr.Close()
937+
fileName := fmt.Sprintf("%s/%s-%s-%d-%d.jsonl",
938+
mgr.mgrcfg.DashboardClient,
939+
mgr.name, curTime.Format(time.DateOnly),
940+
curTime.Hour(), curTime.Minute())
941+
if err := uploadFile(egCtx, nil, gcsDest, fileName, pr, publish); err != nil {
942+
return fmt.Errorf("uploadFile: %w", err)
943+
}
944+
return nil
945+
})
946+
return eg.Wait()
930947
}
931948

932949
func (mgr *Manager) uploadCoverStat(fuzzingMinutes int) error {
933950
// Coverage report generation consumes and caches lots of memory.
934951
// In the syz-ci context report generation won't be used after this point,
935952
// so tell manager to flush report generator.
936953
curTime := time.Now()
937-
if err := mgr.uploadCoverJSONLToGCS("/cover?jsonl=1&flush=1",
954+
if err := mgr.uploadCoverJSONLToGCS(nil,
955+
"/cover?jsonl=1&flush=1",
938956
mgr.cfg.CoverPipelinePath,
939957
curTime,
940958
false,
959+
false,
941960
func(w io.Writer, dec *json.Decoder) error {
942961
var covInfo cover.CoverageInfo
943962
if err := dec.Decode(&covInfo); err != nil {
@@ -964,10 +983,12 @@ func (mgr *Manager) uploadCoverStat(fuzzingMinutes int) error {
964983
}
965984

966985
func (mgr *Manager) uploadProgramsWithCoverage() error {
967-
if err := mgr.uploadCoverJSONLToGCS("/coverprogs?jsonl=1",
986+
if err := mgr.uploadCoverJSONLToGCS(nil,
987+
"/coverprogs?jsonl=1",
968988
mgr.cfg.CoverProgramsPath,
969989
time.Now(),
970990
mgr.cfg.PublishGCS,
991+
true,
971992
func(w io.Writer, dec *json.Decoder) error {
972993
var programCoverage cover.ProgramCoverage
973994
if err := dec.Decode(&programCoverage); err != nil {
@@ -994,7 +1015,7 @@ func (mgr *Manager) uploadCorpus() error {
9941015
return err
9951016
}
9961017
defer f.Close()
997-
return uploadFile(mgr.cfg.CorpusUploadPath, mgr.name+"-corpus.db", f, mgr.cfg.PublishGCS)
1018+
return uploadFile(context.Background(), nil, mgr.cfg.CorpusUploadPath, mgr.name+"-corpus.db", f, mgr.cfg.PublishGCS)
9981019
}
9991020

10001021
func (mgr *Manager) uploadBenchData() error {
@@ -1012,15 +1033,15 @@ func (mgr *Manager) uploadBenchData() error {
10121033
return fmt.Errorf("failed to open bench file: %w", err)
10131034
}
10141035
defer f.Close()
1015-
err = uploadFile(mgr.cfg.BenchUploadPath+"/"+mgr.name,
1036+
err = uploadFile(context.Background(), nil, mgr.cfg.BenchUploadPath+"/"+mgr.name,
10161037
mgr.lastRestarted.Format("2006-01-02_15h.json"), f, false)
10171038
if err != nil {
10181039
return fmt.Errorf("failed to upload the bench file: %w", err)
10191040
}
10201041
return nil
10211042
}
10221043

1023-
func uploadFile(dstPath, name string, file io.Reader, publish bool) error {
1044+
func uploadFile(ctx context.Context, gcsClient gcs.Client, dstPath, name string, file io.Reader, publish bool) error {
10241045
URL, err := url.Parse(dstPath)
10251046
if err != nil {
10261047
return fmt.Errorf("failed to parse upload path: %w", err)
@@ -1030,13 +1051,16 @@ func uploadFile(dstPath, name string, file io.Reader, publish bool) error {
10301051
log.Logf(0, "uploading %v to %v", name, URLStr)
10311052
if strings.HasPrefix(URLStr, "http://") ||
10321053
strings.HasPrefix(URLStr, "https://") {
1033-
return uploadFileHTTPPut(URLStr, file)
1054+
if gcsClient != nil {
1055+
return fmt.Errorf("gcsClient is expected to be nil for the http* requests")
1056+
}
1057+
return uploadFileHTTPPut(ctx, URLStr, file)
10341058
}
1035-
return gcs.UploadFile(context.Background(), file, URLStr, gcs.UploadOptions{Publish: publish})
1059+
return gcs.UploadFile(ctx, file, URLStr, gcs.UploadOptions{Publish: publish, GCSClientMock: gcsClient})
10361060
}
10371061

1038-
func uploadFileHTTPPut(URL string, file io.Reader) error {
1039-
req, err := http.NewRequest(http.MethodPut, URL, file)
1062+
func uploadFileHTTPPut(ctx context.Context, URL string, file io.Reader) error {
1063+
req, err := http.NewRequestWithContext(ctx, http.MethodPut, URL, file)
10401064
if err != nil {
10411065
return fmt.Errorf("failed to create HTTP PUT request: %w", err)
10421066
}

syz-ci/manager_test.go

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,23 @@
44
package main
55

66
import (
7+
"bytes"
8+
"compress/gzip"
9+
"context"
10+
"encoding/json"
711
"fmt"
12+
"io"
13+
"net/http"
14+
"net/http/httptest"
15+
"strings"
816
"testing"
17+
"time"
918

1019
"github.com/google/syzkaller/dashboard/dashapi"
20+
"github.com/google/syzkaller/pkg/cover"
21+
"github.com/google/syzkaller/pkg/gcs"
22+
gcsmocks "github.com/google/syzkaller/pkg/gcs/mocks"
23+
"github.com/google/syzkaller/pkg/mgrconfig"
1124
"github.com/google/syzkaller/pkg/vcs"
1225
"github.com/google/syzkaller/sys/targets"
1326
"github.com/stretchr/testify/assert"
@@ -106,3 +119,129 @@ Reported-by: foo+abcd000@bar.com`,
106119
assert.Equal(t, commit.Title, "title with fix")
107120
assert.ElementsMatch(t, commit.BugIDs, []string{"abcd000"})
108121
}
122+
123+
func TestUploadCoverJSONLToGCS(t *testing.T) {
124+
tests := []struct {
125+
name string
126+
127+
inputJSONL string
128+
inputTime time.Time
129+
130+
inputCompress bool
131+
inputPublish bool
132+
133+
wantGCSFileName string
134+
wantGCSFileContent string
135+
wantCompressed bool
136+
wantPublish bool
137+
wantError string
138+
}{
139+
{
140+
name: "upload single object",
141+
inputJSONL: "{}",
142+
inputTime: time.Time{},
143+
wantGCSFileName: "test-bucket/test-namespace/mgr-name-0001-01-01-0-0.jsonl",
144+
wantGCSFileContent: "{}\n",
145+
},
146+
{
147+
name: "upload single object, compress",
148+
inputJSONL: "{}",
149+
inputTime: time.Time{},
150+
inputCompress: true,
151+
wantGCSFileName: "test-bucket/test-namespace/mgr-name-0001-01-01-0-0.jsonl",
152+
wantGCSFileContent: "{}\n",
153+
wantCompressed: true,
154+
},
155+
{
156+
name: "upload single object, publish",
157+
inputJSONL: "{}",
158+
inputTime: time.Time{},
159+
inputPublish: true,
160+
wantGCSFileName: "test-bucket/test-namespace/mgr-name-0001-01-01-0-0.jsonl",
161+
wantGCSFileContent: "{}\n",
162+
wantPublish: true,
163+
},
164+
{
165+
name: "upload single object, error",
166+
inputJSONL: "{",
167+
inputTime: time.Time{},
168+
wantGCSFileName: "test-bucket/test-namespace/mgr-name-0001-01-01-0-0.jsonl",
169+
wantError: "callback: cover.ProgramCoverage: unexpected EOF",
170+
},
171+
}
172+
173+
for _, test := range tests {
174+
t.Run(test.name, func(t *testing.T) {
175+
httpServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
176+
w.Write([]byte(test.inputJSONL))
177+
}))
178+
defer httpServer.Close()
179+
180+
testSetverAddrPort, _ := strings.CutPrefix(httpServer.URL, "http://")
181+
mgr := Manager{
182+
name: "mgr-name",
183+
managercfg: &mgrconfig.Config{
184+
HTTP: testSetverAddrPort,
185+
Cover: true,
186+
},
187+
mgrcfg: &ManagerConfig{
188+
DashboardClient: "test-namespace",
189+
},
190+
}
191+
192+
gcsMock := gcsmocks.NewClient(t)
193+
gotBytes := mockWriteCloser{}
194+
195+
gcsMock.On("FileWriter", test.wantGCSFileName).
196+
Return(&gotBytes, nil).Once()
197+
gcsMock.On("Close").Return().Once()
198+
if test.wantPublish {
199+
gcsMock.On("Publish", test.wantGCSFileName).
200+
Return(nil).Once()
201+
}
202+
err := mgr.uploadCoverJSONLToGCS(gcsMock,
203+
"/teststream&jsonl=1",
204+
"gs://test-bucket",
205+
time.Time{}, test.inputPublish, test.inputCompress, func(w io.Writer, dec *json.Decoder) error {
206+
var v any
207+
if err := dec.Decode(&v); err != nil {
208+
return fmt.Errorf("cover.ProgramCoverage: %w", err)
209+
}
210+
if err := cover.WriteJSLine(w, &v); err != nil {
211+
return fmt.Errorf("cover.WriteJSLine: %w", err)
212+
}
213+
return nil
214+
})
215+
if test.wantError != "" {
216+
assert.Equal(t, test.wantError, err.Error())
217+
} else {
218+
assert.NoError(t, err)
219+
}
220+
assert.Equal(t, 1, gotBytes.closedTimes)
221+
if test.wantCompressed {
222+
gzReader, err := gzip.NewReader(&gotBytes.buf)
223+
assert.NoError(t, err)
224+
defer gzReader.Close()
225+
plainBytes := mockWriteCloser{}
226+
_, err = io.Copy(&plainBytes, gzReader)
227+
assert.NoError(t, err)
228+
gotBytes = plainBytes
229+
}
230+
assert.Equal(t, test.wantGCSFileContent, gotBytes.buf.String())
231+
})
232+
}
233+
}
234+
235+
type mockWriteCloser struct {
236+
buf bytes.Buffer
237+
closedTimes int
238+
}
239+
240+
func (m *mockWriteCloser) Write(p []byte) (n int, err error) {
241+
return m.buf.Write(p)
242+
}
243+
244+
func (m *mockWriteCloser) Close() error {
245+
m.closedTimes++
246+
return nil
247+
}

0 commit comments

Comments
 (0)