Skip to content

Commit c53ea9c

Browse files
committed
pkg/covermerger: read gcs files sequentially
Quarter-long aggregation means thousands of gzip files. Opening all the files in parallel caused two problems: 1. Memory overhead. 2. GCS API errors — Attrs can't be read for 1500+ files at once.
1 parent 1c4febd commit c53ea9c

File tree

4 files changed

+194
-63
lines changed

4 files changed

+194
-63
lines changed

pkg/cover/file.go

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -63,21 +63,11 @@ func GetMergeResult(c context.Context, ns, repo, forCommit, sourceCommit, filePa
6363
}
6464

6565
fromDate, toDate := tp.DatesFromTo()
66-
dbReader := covermerger.MakeBQCSVReader()
67-
if err := dbReader.InitNsRecords(c,
68-
ns,
69-
filePath,
70-
sourceCommit,
71-
fromDate,
72-
toDate,
73-
); err != nil {
74-
return nil, fmt.Errorf("failed to dbReader.InitNsRecords: %w", err)
75-
}
76-
defer dbReader.Close()
77-
csvReader, err := dbReader.Reader()
66+
csvReader, err := covermerger.InitNsRecords(c, ns, filePath, sourceCommit, fromDate, toDate)
7867
if err != nil {
79-
return nil, fmt.Errorf("failed to dbReader.Reader: %w", err)
68+
return nil, fmt.Errorf("failed to covermerger.InitNsRecords: %w", err)
8069
}
70+
defer csvReader.Close()
8171

8272
ch := make(chan *covermerger.FileMergeResult, 1)
8373
if err := covermerger.MergeCSVData(c, config, csvReader, ch); err != nil {

pkg/covermerger/bq_csv_reader.go

Lines changed: 70 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -16,39 +16,24 @@ import (
1616
"github.com/google/uuid"
1717
)
1818

19-
type bqCSVReader struct {
20-
closers []io.Closer
21-
gcsFiles []io.Reader
22-
}
23-
24-
func (r *bqCSVReader) Close() {
25-
for _, c := range r.closers {
26-
c.Close()
27-
}
28-
}
29-
30-
func MakeBQCSVReader() *bqCSVReader {
31-
return &bqCSVReader{}
32-
}
33-
34-
func (r *bqCSVReader) InitNsRecords(ctx context.Context, ns, filePath, commit string, from, to civil.Date) error {
19+
func InitNsRecords(ctx context.Context, ns, filePath, commit string, from, to civil.Date) (io.ReadCloser, error) {
3520
if err := validator.AnyError("input validation failed",
3621
validator.NamespaceName(ns),
3722
validator.AnyOk(validator.EmptyStr(filePath), validator.KernelFilePath(filePath)),
3823
validator.AnyOk(validator.EmptyStr(commit), validator.CommitHash(commit)),
3924
); err != nil {
40-
return err
25+
return nil, err
4126
}
4227
sessionUUID := uuid.New().String()
4328
gsBucket := "syzbot-temp"
4429
gsPath := fmt.Sprintf("bq-exports/%s", sessionUUID)
4530
gsURI := "gs://" + gsBucket + "/" + gsPath + "/*.csv.gz"
4631
client, err := bigquery.NewClient(ctx, "syzkaller")
4732
if err != nil {
48-
return fmt.Errorf("failed to initialize bigquery client: %w", err)
33+
return nil, fmt.Errorf("failed to initialize bigquery client: %w", err)
4934
}
5035
if err := client.EnableStorageReadClient(ctx); err != nil {
51-
return fmt.Errorf("failed to client.EnableStorageReadClient: %w", err)
36+
return nil, fmt.Errorf("failed to client.EnableStorageReadClient: %w", err)
5237
}
5338
selectCommit := ""
5439
if commit != "" {
@@ -77,48 +62,91 @@ func (r *bqCSVReader) InitNsRecords(ctx context.Context, ns, filePath, commit st
7762
`, gsURI, ns, from.String(), to.String(), filePath, selectCommit))
7863
job, err := q.Run(ctx)
7964
if err != nil {
80-
return fmt.Errorf("err during bigquery.Run: %w", err)
65+
return nil, fmt.Errorf("err during bigquery.Run: %w", err)
8166
}
8267
status, err := job.Wait(ctx)
8368
if err != nil {
84-
return fmt.Errorf("err waiting for the bigquery.Job: %w", err)
69+
return nil, fmt.Errorf("err waiting for the bigquery.Job: %w", err)
8570
}
8671
if status.Err() != nil {
87-
return fmt.Errorf("bigquery job failed with status %w", status.Err())
72+
return nil, fmt.Errorf("bigquery job failed with status %w", status.Err())
8873
}
89-
return r.initGCSFileReaders(ctx, gsBucket, gsPath)
74+
return initGCSMultiReader(ctx, gsBucket, gsPath)
9075
}
9176

92-
func (r *bqCSVReader) initGCSFileReaders(ctx context.Context, bucket, path string) error {
77+
func initGCSMultiReader(ctx context.Context, bucket, path string) (io.ReadCloser, error) {
9378
var gcsClient gcs.Client
9479
var err error
9580
if gcsClient, err = gcs.NewClient(ctx); err != nil {
96-
return fmt.Errorf("err creating gcs client: %w", err)
81+
return nil, fmt.Errorf("err creating gcs client: %w", err)
9782
}
9883
var gcsFiles []*gcs.Object
9984
if gcsFiles, err = gcsClient.ListObjects(bucket + "/" + path); err != nil {
100-
return fmt.Errorf("err enumerating gcs files: %w", err)
85+
return nil, fmt.Errorf("err enumerating gcs files: %w", err)
10186
}
87+
paths := []string{}
10288
for _, obj := range gcsFiles {
103-
var readCloser io.ReadCloser
104-
if readCloser, err = gcsClient.FileReader(bucket + "/" + obj.Path); err != nil {
105-
return fmt.Errorf("failed to get %s reader: %w", obj.Path, err)
106-
}
107-
r.closers = append(r.closers, readCloser)
108-
r.gcsFiles = append(r.gcsFiles, readCloser)
89+
paths = append(paths, bucket+"/"+obj.Path)
10990
}
110-
return nil
91+
return &gcsGZIPMultiReader{
92+
gcsClient: gcsClient,
93+
gcsFiles: paths,
94+
}, nil
11195
}
11296

113-
func (r *bqCSVReader) Reader() (io.Reader, error) {
114-
var readers []io.Reader
115-
for _, file := range r.gcsFiles {
116-
gzipReaderCloser, err := gzip.NewReader(file)
117-
if err != nil {
118-
return nil, fmt.Errorf("err calling gzip.NewReader: %w", err)
97+
// gcsGZIPMultiReader presents a list of gzip-compressed GCS objects as one
// continuous decompressed stream. Objects are opened sequentially, at most one
// at a time (per the commit rationale: parallel opens caused memory overhead
// and GCS API errors on 1500+ files).
type gcsGZIPMultiReader struct {
	gcsClient gcs.Client // used to lazily open each object on first Read
	gcsFiles  []string   // remaining "bucket/path" names; the head is the object being read

	// Readers for the currently open object; both are nil when none is open.
	curFileReader   io.ReadCloser // raw (compressed) GCS object reader
	curGZReadCloser io.ReadCloser // gzip decompressor layered over curFileReader
}
104+
105+
func (mr *gcsGZIPMultiReader) Read(p []byte) (int, error) {
106+
for len(mr.gcsFiles) > 0 {
107+
if mr.curFileReader == nil {
108+
var err error
109+
if mr.curFileReader, err = mr.gcsClient.FileReader(mr.gcsFiles[0]); err != nil {
110+
return 0, fmt.Errorf("failed to get %s reader: %w", mr.gcsFiles[0], err)
111+
}
112+
if mr.curGZReadCloser, err = gzip.NewReader(mr.curFileReader); err != nil {
113+
mr.curGZReadCloser = nil // gzip.NewReader returns *Reader(nil) on corrupted header
114+
return 0, fmt.Errorf("err calling gzip.NewReader: %w", err)
115+
}
116+
}
117+
n, err := mr.curGZReadCloser.Read(p)
118+
if err == io.EOF {
119+
mr.gcsFiles = mr.gcsFiles[1:]
120+
if err := mr.Close(); err != nil {
121+
return 0, fmt.Errorf("mr.Close: %w", err)
122+
}
123+
}
124+
if n > 0 || err != io.EOF {
125+
if err == io.EOF && len(mr.gcsFiles) > 0 {
126+
// Don't return EOF yet. More readers remain.
127+
err = nil
128+
}
129+
return n, err
119130
}
120-
r.closers = append(r.closers, gzipReaderCloser)
121-
readers = append(readers, gzipReaderCloser)
122131
}
123-
return io.MultiReader(readers...), nil
132+
return 0, io.EOF
133+
}
134+
135+
func (mr *gcsGZIPMultiReader) Close() error {
136+
var err1, err2 error
137+
if mr.curGZReadCloser != nil {
138+
err1 = mr.curGZReadCloser.Close()
139+
}
140+
if mr.curFileReader != nil {
141+
err2 = mr.curFileReader.Close()
142+
}
143+
mr.curFileReader = nil
144+
mr.curGZReadCloser = nil
145+
if err1 != nil {
146+
return fmt.Errorf("mr.curGZReadCloser.Close: %w", err1)
147+
}
148+
if err2 != nil {
149+
return fmt.Errorf("mr.curFileReader.Close: %w", err2)
150+
}
151+
return nil
124152
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
// Copyright 2025 syzkaller project authors. All rights reserved.
2+
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
3+
4+
package covermerger
5+
6+
import (
7+
"bytes"
8+
"compress/gzip"
9+
"errors"
10+
"fmt"
11+
"io"
12+
"testing"
13+
14+
"github.com/google/syzkaller/pkg/gcs"
15+
gcsmocks "github.com/google/syzkaller/pkg/gcs/mocks"
16+
"github.com/stretchr/testify/assert"
17+
"github.com/stretchr/testify/mock"
18+
)
19+
20+
func TestGCSGZIPMultiReader_Read(t *testing.T) {
21+
tests := []struct {
22+
name string
23+
inputFiles []string
24+
inputBytes [][]byte
25+
26+
wantBytes []byte
27+
wantErr error
28+
}{
29+
{
30+
name: "single file, single read",
31+
inputFiles: []string{"file1"},
32+
inputBytes: [][]byte{gzBytes("1")},
33+
wantBytes: []byte("1"),
34+
wantErr: nil,
35+
},
36+
{
37+
name: "single file, multiple reads",
38+
inputFiles: []string{"file1"},
39+
inputBytes: [][]byte{gzBytes("123")},
40+
wantBytes: []byte("123"),
41+
wantErr: nil,
42+
},
43+
{
44+
name: "multiple files, multiple reads",
45+
inputFiles: []string{"file1", "file2", "file3"},
46+
inputBytes: [][]byte{gzBytes("123"), gzBytes("456"), gzBytes("789")},
47+
wantBytes: []byte("123456789"),
48+
wantErr: nil,
49+
},
50+
{
51+
name: "multiple files, badbytes",
52+
inputFiles: []string{"file1", "file2", "file3"},
53+
inputBytes: [][]byte{gzBytes("123"), gzBytes("456"), []byte("789")},
54+
wantBytes: []byte("123456"),
55+
wantErr: fmt.Errorf("err calling gzip.NewReader: %w", errors.New("unexpected EOF")),
56+
},
57+
}
58+
59+
for _, test := range tests {
60+
t.Run(test.name, func(t *testing.T) {
61+
mr := &gcsGZIPMultiReader{
62+
gcsClient: makeGCSClientMock(t, test.inputFiles, test.inputBytes),
63+
gcsFiles: test.inputFiles,
64+
}
65+
gotBytes, gotErr := io.ReadAll(mr)
66+
assert.NoError(t, mr.Close())
67+
assert.Equal(t, test.wantErr, gotErr)
68+
assert.Equal(t, test.wantBytes, gotBytes)
69+
})
70+
}
71+
}
72+
73+
func makeGCSClientMock(t *testing.T, files []string, bytes [][]byte) gcs.Client {
74+
gcsClientMock := gcsmocks.NewClient(t)
75+
for i, file := range files {
76+
rcMock := &readCloserMock{}
77+
for _, b := range bytes[i] {
78+
rcMock.On("Read", mock.Anything).
79+
Run(func(args mock.Arguments) {
80+
arg := args.Get(0).([]byte)
81+
arg[0] = b
82+
}).
83+
Return(1, nil).Once()
84+
}
85+
rcMock.On("Read", mock.Anything).
86+
Return(0, io.EOF).
87+
On("Close").
88+
Return(nil).Once()
89+
90+
gcsClientMock.EXPECT().
91+
FileReader(file).
92+
Return(rcMock, nil)
93+
}
94+
return gcsClientMock
95+
}
96+
97+
type readCloserMock struct {
98+
mock.Mock
99+
}
100+
101+
func (m *readCloserMock) Read(p []byte) (n int, err error) {
102+
args := m.Called(p)
103+
return args.Int(0), args.Error(1)
104+
}
105+
106+
func (m *readCloserMock) Close() (err error) {
107+
args := m.Called()
108+
return args.Error(0)
109+
}
110+
111+
// gzBytes gzip-compresses str and returns the resulting bytes (test helper).
func gzBytes(str string) []byte {
	var compressed bytes.Buffer
	zw := gzip.NewWriter(&compressed)
	// Writes to a bytes.Buffer cannot fail, so errors are intentionally ignored
	// here, matching the helper's best-effort nature.
	zw.Write([]byte(str))
	zw.Close()
	return compressed.Bytes()
}

tools/syz-covermerger/syz_covermerger.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -71,21 +71,17 @@ func do() error {
7171
panic(fmt.Sprintf("failed to parse time_to: %s", err.Error()))
7272
}
7373
dateFrom = dateTo.AddDays(-int(*flagDuration))
74-
dbReader := covermerger.MakeBQCSVReader()
75-
if err = dbReader.InitNsRecords(context.Background(),
74+
csvReader, err := covermerger.InitNsRecords(context.Background(),
7675
*flagNamespace,
7776
*flagFilePathPrefix,
7877
"",
7978
dateFrom,
8079
dateTo,
81-
); err != nil {
80+
)
81+
if err != nil {
8282
panic(fmt.Sprintf("failed to dbReader.InitNsRecords: %v", err.Error()))
8383
}
84-
defer dbReader.Close()
85-
csvReader, errReader := dbReader.Reader()
86-
if errReader != nil {
87-
panic(fmt.Sprintf("failed to dbReader.Reader: %v", errReader.Error()))
88-
}
84+
defer csvReader.Close()
8985
var wc io.WriteCloser
9086
url := *flagToGCS
9187
if *flagToDashAPI != "" {

0 commit comments

Comments
 (0)