Commit 390ff24
tools/syz-covermerger: upload coverage as jsonl
The previous implementation stored only a summary of the processed records. The summary was under 1GB, and a single processing node was able to manipulate the data. The current implementation stores all the details about the records read, to make post-processing more flexible. This change was needed to get access to the source manager name, and it will help to analyze other details.

The new implementation requires 20GB of memory to process a single day of records. A CSV log interning experiment allowed merging with 10GB. Aggregating a quarter of data would cost roughly 100 times more. The alternative is stream processing: handling the data kernel-file-by-file divides memory consumption by ~15000. That approach is implemented here: we batch coverage signals by file and store the per-file results in a GCS JSONL file. See https://jsonlines.org/ to learn about jsonl.
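For illustration, the uploaded payload's first jsonl line is a coveragedb.HistoryRecord (without session and time), and every following line is a coveragedb.MergedCoverageRecord for one manager/file pair. All values below are invented, and the field encodings assume Go's default JSON marshaling (civil.Date as YYYY-MM-DD):

{"Namespace":"upstream","Repo":"git://example.org/linux.git","Commit":"abc1234","Duration":86400,"DateTo":"2024-05-01","TotalRows":2}
{"Manager":"ci-example-manager","FilePath":"fs/inode.c","FileData":{"Instrumented":3,"Covered":2,"LinesInstrumented":[10,11,12],"HitCounts":[3,0,5]}}
{"Manager":"ci-example-manager","FilePath":"kernel/fork.c","FileData":{"Instrumented":1,"Covered":1,"LinesInstrumented":[7],"HitCounts":[2]}}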
1 parent d232d36 commit 390ff24

File tree

7 files changed: +222 -184 lines changed

dashboard/app/api.go

Lines changed: 13 additions & 23 deletions
@@ -1938,37 +1938,27 @@ func apiCreateUploadURL(c context.Context, payload io.Reader) (interface{}, erro
 	}
 	return fmt.Sprintf("%s/%s.upload", bucket, uuid.New().String()), nil
 }
+
+// apiSaveCoverage reads jsonl data from payload and stores it to coveragedb.
+// First payload jsonl line is a coveragedb.HistoryRecord (w/o session and time).
+// Second+ records are coveragedb.MergedCoverageRecord.
 func apiSaveCoverage(c context.Context, payload io.Reader) (interface{}, error) {
-	req := new(dashapi.SaveCoverageReq)
-	if err := json.NewDecoder(payload).Decode(req); err != nil {
-		return nil, fmt.Errorf("failed to unmarshal request: %w", err)
+	descr := new(coveragedb.HistoryRecord)
+	if err := json.NewDecoder(payload).Decode(descr); err != nil {
+		return 0, fmt.Errorf("json.NewDecoder(dashapi.MergedCoverageDescription).Decode: %w", err)
 	}
-	coverage := req.Coverage
 	var sss []*subsystem.Subsystem
-	if service := getNsConfig(c, coverage.Namespace).Subsystems.Service; service != nil {
+	if service := getNsConfig(c, descr.Namespace).Subsystems.Service; service != nil {
 		sss = service.List()
-		log.Infof(c, "found %d subsystems for %s namespace", len(sss), coverage.Namespace)
-	}
-	rowsCreated, err := coveragedb.SaveMergeResult(
-		context.Background(),
-		appengine.AppID(context.Background()),
-		coverage.FileData,
-		&coveragedb.HistoryRecord{
-			Namespace: coverage.Namespace,
-			Repo:      coverage.Repo,
-			Commit:    coverage.Commit,
-			Duration:  coverage.Duration,
-			DateTo:    coverage.DateTo,
-		},
-		coverage.TotalRows,
-		sss,
-	)
+		log.Infof(c, "found %d subsystems for %s namespace", len(sss), descr.Namespace)
+	}
+	rowsCreated, err := coveragedb.SaveMergeResult(c, appengine.AppID(context.Background()), descr, payload, sss)
 	if err != nil {
 		log.Errorf(c, "error storing coverage for ns %s, date %s: %v",
-			coverage.Namespace, coverage.DateTo.String(), err)
+			descr.Namespace, descr.DateTo.String(), err)
 	} else {
 		log.Infof(c, "updated coverage for ns %s, date %s to %d rows",
-			coverage.Namespace, coverage.DateTo.String(), coverage.TotalRows)
+			descr.Namespace, descr.DateTo.String(), descr.TotalRows)
 	}
 	return &rowsCreated, err
 }

dashboard/dashapi/dashapi.go

Lines changed: 8 additions & 36 deletions
@@ -8,7 +8,6 @@ package dashapi
 import (
 	"bytes"
 	"compress/gzip"
-	"context"
 	"encoding/json"
 	"fmt"
 	"io"
@@ -18,10 +17,7 @@ import (
 	"reflect"
 	"time"

-	"cloud.google.com/go/civil"
 	"github.com/google/syzkaller/pkg/auth"
-	"github.com/google/syzkaller/pkg/coveragedb"
-	"github.com/google/syzkaller/pkg/gcs"
 )

 type Dashboard struct {
@@ -692,42 +688,18 @@ func (dash *Dashboard) SaveDiscussion(req *SaveDiscussionReq) error {
 	return dash.Query("save_discussion", req, nil)
 }

-type MergedCoverage struct {
-	Namespace string
-	Repo      string
-	Commit    string
-	Duration  int64
-	DateTo    civil.Date
-	TotalRows int64
-	FileData  coveragedb.ManagersCoverage
-}
-
-type SaveCoverageReq struct {
-	Coverage *MergedCoverage
-}
-
-// SaveCoverage returns amount of records created in db.
-func (dash *Dashboard) SaveCoverage(req *SaveCoverageReq) (int, error) {
+func (dash *Dashboard) CreateUploadURL() (string, error) {
 	uploadURL := new(string)
-	if err := dash.Query("create_upload_url", req, uploadURL); err != nil {
-		return 0, fmt.Errorf("create_upload_url: %w", err)
-	}
-
-	gcsClient, err := gcs.NewClient(context.Background())
-	if err != nil {
-		return 0, fmt.Errorf("gcs.NewClient: %w", err)
-	}
-	w, err := gcsClient.FileWriter(*uploadURL)
-	if err != nil {
-		return 0, fmt.Errorf("gcsClient.FileWriter: %w", err)
-	}
-	defer w.Close()
-	if err := json.NewEncoder(gzip.NewWriter(w)).Encode(req); err != nil {
-		return 0, fmt.Errorf("json.NewEncoder(gzip.NewWriter(w)).Encode: %w", err)
+	if err := dash.Query("create_upload_url", nil, uploadURL); err != nil {
+		return "", fmt.Errorf("create_upload_url: %w", err)
 	}
+	return *uploadURL, nil
+}

+// SaveCoverage returns amount of records created in db.
+func (dash *Dashboard) SaveCoverage(gcpURL string) (int, error) {
 	rowsWritten := new(int)
-	if err := dash.Query("save_coverage", *uploadURL, rowsWritten); err != nil {
+	if err := dash.Query("save_coverage", gcpURL, rowsWritten); err != nil {
 		return 0, fmt.Errorf("save_coverage: %w", err)
 	}
 	return *rowsWritten, nil
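With SaveCoverageReq gone, writing the payload to GCS becomes the caller's job. Below is a minimal sketch of the new two-step flow, assuming a caller shaped like syz-covermerger; uploadCoverage, descr, and records are illustrative names, and this excerpt does not show whether the real uploader still gzips the stream the way the removed code did:

package covupload // hypothetical package for illustration

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/google/syzkaller/dashboard/dashapi"
	"github.com/google/syzkaller/pkg/coveragedb"
	"github.com/google/syzkaller/pkg/gcs"
)

// uploadCoverage (hypothetical) streams jsonl to GCS, then asks the dashboard to ingest it.
func uploadCoverage(dash *dashapi.Dashboard, descr *coveragedb.HistoryRecord,
	records []*coveragedb.MergedCoverageRecord) (int, error) {
	gcsPath, err := dash.CreateUploadURL() // step 1: get a unique <bucket>/<uuid>.upload path
	if err != nil {
		return 0, fmt.Errorf("CreateUploadURL: %w", err)
	}
	gcsClient, err := gcs.NewClient(context.Background())
	if err != nil {
		return 0, fmt.Errorf("gcs.NewClient: %w", err)
	}
	w, err := gcsClient.FileWriter(gcsPath)
	if err != nil {
		return 0, fmt.Errorf("gcsClient.FileWriter: %w", err)
	}
	enc := json.NewEncoder(w)                 // Encode emits one JSON value plus '\n': one jsonl line
	if err := enc.Encode(descr); err != nil { // first line: the HistoryRecord description
		w.Close()
		return 0, err
	}
	for _, mcr := range records { // second+ lines: per-file MergedCoverageRecord entries
		if err := enc.Encode(mcr); err != nil {
			w.Close()
			return 0, err
		}
	}
	if err := w.Close(); err != nil {
		return 0, err
	}
	return dash.SaveCoverage(gcsPath) // step 2: the dashboard reads the file into coveragedb
}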

pkg/cover/file.go

Lines changed: 14 additions & 4 deletions
@@ -78,14 +78,24 @@ func GetMergeResult(c context.Context, ns, repo, forCommit, sourceCommit, filePa
 		return nil, fmt.Errorf("failed to dbReader.Reader: %w", err)
 	}

-	mergeResult, err := covermerger.MergeCSVData(config, csvReader)
-	if err != nil {
+	ch := make(chan *covermerger.FileMergeResult, 1)
+	if err := covermerger.MergeCSVData(config, csvReader, ch); err != nil {
 		return nil, fmt.Errorf("error merging coverage: %w", err)
 	}
-	if _, exist := mergeResult[filePath]; !exist {
+
+	var mr *covermerger.MergeResult
+	select {
+	case fmr := <-ch:
+		if fmr != nil {
+			mr = fmr.MergeResult
+		}
+	default:
+	}
+
+	if mr == nil {
 		return nil, fmt.Errorf("no merge result for file %s", filePath)
 	}
-	return mergeResult[filePath], nil
+	return mr, nil
 }

 func rendResult(content string, coverage *covermerger.MergeResult, renderConfig *CoverageRenderConfig) string {
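Note: since GetMergeResult asks for a single file, the capacity-1 channel suffices. MergeCSVData returns only after its workers finish, so the lone result (if any) is already buffered when the non-blocking select drains it, and a missing result falls through to the "no merge result" error. (This is a reading of the new code, not a comment from the commit.)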

pkg/coveragedb/spanner.go

Lines changed: 41 additions & 32 deletions
@@ -4,8 +4,11 @@
 package coveragedb

 import (
+	"bufio"
 	"context"
+	"encoding/json"
 	"fmt"
+	"io"
 	"os"
 	"sync/atomic"
 	"time"
@@ -51,11 +54,6 @@ func NewClient(ctx context.Context, projectID string) (*spanner.Client, error) {
 	return spanner.NewClient(ctx, database)
 }

-type ManagersCoverage map[string]ManagerCoverage
-
-// ManagerCoverage is a file to coverage mapping.
-type ManagerCoverage map[string]*Coverage
-
 type Coverage struct {
 	Instrumented int64
 	Covered      int64
@@ -72,8 +70,14 @@ func (c *Coverage) AddLineHitCount(line, hitCount int) {
 	}
 }

-func SaveMergeResult(ctx context.Context, projectID string, manCovMap ManagersCoverage,
-	template *HistoryRecord, totalRows int64, sss []*subsystem.Subsystem) (int, error) {
+type MergedCoverageRecord struct {
+	Manager  string
+	FilePath string
+	FileData *Coverage
+}
+
+func SaveMergeResult(ctx context.Context, projectID string, descr *HistoryRecord, jsonl io.Reader,
+	sss []*subsystem.Subsystem) (int, error) {
 	client, err := NewClient(ctx, projectID)
 	if err != nil {
 		return 0, fmt.Errorf("spanner.NewClient() failed: %s", err.Error())
@@ -87,25 +91,30 @@ func SaveMergeResult(ctx context.Context, projectID string, manCovMap ManagersCo
 	session := uuid.New().String()
 	mutations := []*spanner.Mutation{}

-	for manager, covMap := range manCovMap {
-		for filePath, record := range covMap {
-			mutations = append(mutations, fileRecordMutation(session, manager, filePath, record))
-			subsystems := fileSubsystems(filePath, ssMatcher, ssCache)
-			mutations = append(mutations, fileSubsystemsMutation(template.Namespace, filePath, subsystems))
-			// There is a limit on the number of mutations per transaction (80k) imposed by the DB.
-			// This includes both explicit mutations of the fields (6 fields * 1k records = 6k mutations)
-			// and implicit index mutations.
-			// We keep the number of records low enough for the number of explicit mutations * 10 does not exceed the limit.
-			if len(mutations) > 1000 {
-				if _, err = client.Apply(ctx, mutations); err != nil {
-					return rowsCreated, fmt.Errorf("failed to spanner.Apply(inserts): %s", err.Error())
-				}
-				rowsCreated += len(mutations)
-				mutations = nil
+	jsonlScanner := bufio.NewScanner(jsonl)
+
+	for jsonlScanner.Scan() {
+		var mcr MergedCoverageRecord
+		if err := json.Unmarshal([]byte(jsonlScanner.Text()), &mcr); err != nil {
+			return rowsCreated, fmt.Errorf("json.Unmarshal(MergedCoverageRecord): %w", err)
+		}
+		mutations = append(mutations, fileRecordMutation(session, &mcr))
+		subsystems := fileSubsystems(mcr.FilePath, ssMatcher, ssCache)
+		mutations = append(mutations, fileSubsystemsMutation(descr.Namespace, mcr.FilePath, subsystems))
+		// There is a limit on the number of mutations per transaction (80k) imposed by the DB.
+		// This includes both explicit mutations of the fields (6 fields * 1k records = 6k mutations)
+		// and implicit index mutations.
+		// We keep the number of records low enough for the number of explicit mutations * 10 does not exceed the limit.
+		if len(mutations) > 1000 {
+			if _, err = client.Apply(ctx, mutations); err != nil {
+				return rowsCreated, fmt.Errorf("failed to spanner.Apply(inserts): %s", err.Error())
 			}
+			rowsCreated += len(mutations)
+			mutations = nil
 		}
 	}
-	mutations = append(mutations, historyMutation(session, template, totalRows))
+
+	mutations = append(mutations, historyMutation(session, descr))
 	if _, err = client.Apply(ctx, mutations); err != nil {
 		return rowsCreated, fmt.Errorf("failed to spanner.Apply(inserts): %s", err.Error())
 	}
@@ -170,7 +179,7 @@ func ReadLinesHitCount(ctx context.Context, ns, commit, file string, tp TimePeri
 	return res, nil
 }

-func historyMutation(session string, template *HistoryRecord, totalRows int64) *spanner.Mutation {
+func historyMutation(session string, template *HistoryRecord) *spanner.Mutation {
 	historyInsert, err := spanner.InsertOrUpdateStruct("merge_history", &HistoryRecord{
 		Session:   session,
 		Time:      time.Now(),
@@ -179,23 +188,23 @@ func historyMutation(session string, template *HistoryRecord, totalRows int64) *
 		Commit:    template.Commit,
 		Duration:  template.Duration,
 		DateTo:    template.DateTo,
-		TotalRows: totalRows,
+		TotalRows: template.TotalRows,
 	})
 	if err != nil {
 		panic(fmt.Sprintf("failed to spanner.InsertStruct(): %s", err.Error()))
 	}
 	return historyInsert
 }

-func fileRecordMutation(session, manager, filePath string, record *Coverage) *spanner.Mutation {
+func fileRecordMutation(session string, mcr *MergedCoverageRecord) *spanner.Mutation {
 	insert, err := spanner.InsertOrUpdateStruct("files", &FilesRecord{
 		Session:           session,
-		FilePath:          filePath,
-		Instrumented:      record.Instrumented,
-		Covered:           record.Covered,
-		LinesInstrumented: record.LinesInstrumented,
-		HitCounts:         record.HitCounts,
-		Manager:           manager,
+		FilePath:          mcr.FilePath,
+		Instrumented:      mcr.FileData.Instrumented,
+		Covered:           mcr.FileData.Covered,
+		LinesInstrumented: mcr.FileData.LinesInstrumented,
+		HitCounts:         mcr.FileData.HitCounts,
+		Manager:           mcr.Manager,
 	})
 	if err != nil {
 		panic(fmt.Sprintf("failed to fileRecordMutation: %v", err))

pkg/covermerger/covermerger.go

Lines changed: 18 additions & 23 deletions
@@ -10,7 +10,6 @@ import (
 	"fmt"
 	"io"
 	"strconv"
-	"sync"

 	"github.com/google/syzkaller/pkg/log"
 	"golang.org/x/exp/maps"
@@ -127,13 +126,16 @@ func isSchema(fields, schema []string) bool {
 	return true
 }

-type FilesMergeResults map[string]*MergeResult
+type FileMergeResult struct {
+	FilePath string
+	*MergeResult
+}

-func MergeCSVData(config *Config, reader io.Reader) (FilesMergeResults, error) {
+func MergeCSVData(config *Config, reader io.Reader, results chan<- *FileMergeResult) error {
 	var schema []string
 	csvReader := csv.NewReader(reader)
 	if fields, err := csvReader.Read(); err != nil {
-		return nil, fmt.Errorf("failed to read schema: %w", err)
+		return fmt.Errorf("failed to read schema: %w", err)
 	} else {
 		schema = fields
 	}
@@ -163,47 +165,40 @@ func MergeCSVData(config *Config, reader io.Reader) (FilesMergeResults, error) {
 		}
 		errStreamChan <- nil
 	}()
-	mergeResult, errMerging := mergeChanData(config, recordsChan)
+	errMerging := mergeChanData(config, recordsChan, results)
 	errStream := <-errStreamChan
 	if errMerging != nil || errStream != nil {
-		return nil, fmt.Errorf("errors merging stream data:\nmerger err: %w\nstream reader err: %w",
+		return fmt.Errorf("errors merging stream data:\nmerger err: %w\nstream reader err: %w",
 			errMerging, errStream)
 	}
-	return mergeResult, nil
+	return nil
 }

 type FileRecords struct {
 	fileName string
 	records  []*FileRecord
 }

-func mergeChanData(c *Config, recordChan <-chan *FileRecord) (FilesMergeResults, error) {
+func mergeChanData(c *Config, recordChan <-chan *FileRecord, results chan<- *FileMergeResult) error {
 	g, ctx := errgroup.WithContext(context.Background())
 	frecordChan := groupFileRecords(recordChan, ctx)
-	stat := make(FilesMergeResults)
-	var mu sync.Mutex
+
 	for i := 0; i < c.Jobs; i++ {
 		g.Go(func() error {
 			for frecord := range frecordChan {
-				if mr, err := batchFileData(c, frecord.fileName, frecord.records); err != nil {
+				mr, err := batchFileData(c, frecord.fileName, frecord.records)
+				if err != nil {
 					return fmt.Errorf("failed to batchFileData(%s): %w", frecord.fileName, err)
-				} else {
-					mu.Lock()
-					if _, exist := stat[frecord.fileName]; exist {
-						mu.Unlock()
-						return fmt.Errorf("file %s was already processed", frecord.fileName)
-					}
-					stat[frecord.fileName] = mr
-					mu.Unlock()
+				}
+				results <- &FileMergeResult{
+					FilePath:    frecord.fileName,
+					MergeResult: mr,
 				}
 			}
 			return nil
 		})
 	}
-	if err := g.Wait(); err != nil {
-		return nil, err
-	}
-	return stat, nil
+	return g.Wait()
 }

 func groupFileRecords(recordChan <-chan *FileRecord, ctx context.Context) chan FileRecords {
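MergeCSVData now returns only after every worker has finished sending, so a caller that cannot size the results channel up front (pkg/cover/file.go above gets away with a capacity-1 buffer) has to drain the channel concurrently. A sketch of that pattern follows; mergeStreaming and its jsonl comment are illustrative, not part of the commit:

import (
	"io"

	"github.com/google/syzkaller/pkg/covermerger"
)

// mergeStreaming (hypothetical) consumes per-file results as they arrive,
// keeping memory bounded by roughly one FileMergeResult at a time.
func mergeStreaming(config *covermerger.Config, csvReader io.Reader) error {
	results := make(chan *covermerger.FileMergeResult)
	done := make(chan struct{})
	go func() {
		defer close(done)
		for fmr := range results {
			// e.g. encode fmr as one jsonl line and upload it
			_ = fmr
		}
	}()
	err := covermerger.MergeCSVData(config, csvReader, results)
	close(results) // safe: all worker sends completed before MergeCSVData returned
	<-done
	return err
}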
