Skip to content

Commit 0f61b41

Browse files
committed
pkg/covermerger: propagate context cancellation
The problem is the deadlock happening on GCS storage error. GCS client establishes the connection when it has enough data to write. It is approximately 16M. The error happens on io.Writer access in the middle of the merge. This error terminates errgroup goroutine. Other goroutines remain blocked. This commit propagates termination signal to other goroutines.
1 parent b8e81ee commit 0f61b41

File tree

3 files changed

+33
-19
lines changed

3 files changed

+33
-19
lines changed

pkg/cover/file.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func GetMergeResult(c context.Context, ns, repo, forCommit, sourceCommit, filePa
7979
}
8080

8181
ch := make(chan *covermerger.FileMergeResult, 1)
82-
if err := covermerger.MergeCSVData(config, csvReader, ch); err != nil {
82+
if err := covermerger.MergeCSVData(c, config, csvReader, ch); err != nil {
8383
return nil, fmt.Errorf("error merging coverage: %w", err)
8484
}
8585

pkg/covermerger/covermerger.go

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,11 @@ type FileCoverageMerger interface {
5959
// Returns (totalInstrumentedLines, totalCoveredLines, error).
6060
func MergeCSVWriteJSONL(config *Config, descr *coveragedb.HistoryRecord, csvReader io.Reader, w io.Writer,
6161
) (int, int, error) {
62-
eg := errgroup.Group{}
62+
eg, c := errgroup.WithContext(context.Background())
6363
mergeResults := make(chan *FileMergeResult)
6464
eg.Go(func() error {
6565
defer close(mergeResults)
66-
if err := MergeCSVData(config, csvReader, mergeResults); err != nil {
66+
if err := MergeCSVData(c, config, csvReader, mergeResults); err != nil {
6767
return fmt.Errorf("covermerger.MergeCSVData: %w", err)
6868
}
6969
return nil
@@ -223,18 +223,19 @@ type FileMergeResult struct {
223223
*MergeResult
224224
}
225225

226-
func MergeCSVData(config *Config, reader io.Reader, results chan<- *FileMergeResult) error {
226+
func MergeCSVData(c context.Context, config *Config, reader io.Reader, results chan<- *FileMergeResult) error {
227227
var schema []string
228228
csvReader := csv.NewReader(reader)
229229
if fields, err := csvReader.Read(); err != nil {
230230
return fmt.Errorf("failed to read schema: %w", err)
231231
} else {
232232
schema = fields
233233
}
234-
errStreamChan := make(chan error, 1)
234+
errStreamChan := make(chan error, 2)
235235
recordsChan := make(chan *FileRecord)
236236
go func() {
237237
defer close(recordsChan)
238+
defer func() { errStreamChan <- nil }()
238239
for {
239240
fields, err := csvReader.Read()
240241
if err == io.EOF {
@@ -253,11 +254,14 @@ func MergeCSVData(config *Config, reader io.Reader, results chan<- *FileMergeRes
253254
errStreamChan <- fmt.Errorf("makeRecord: %w", err)
254255
return
255256
}
256-
recordsChan <- record
257+
select {
258+
case <-c.Done():
259+
return
260+
case recordsChan <- record:
261+
}
257262
}
258-
errStreamChan <- nil
259263
}()
260-
errMerging := mergeChanData(config, recordsChan, results)
264+
errMerging := mergeChanData(c, config, recordsChan, results)
261265
errStream := <-errStreamChan
262266
if errMerging != nil || errStream != nil {
263267
return fmt.Errorf("errors merging stream data:\nmerger err: %w\nstream reader err: %w",
@@ -271,20 +275,24 @@ type FileRecords struct {
271275
records []*FileRecord
272276
}
273277

274-
func mergeChanData(c *Config, recordChan <-chan *FileRecord, results chan<- *FileMergeResult) error {
275-
g, ctx := errgroup.WithContext(context.Background())
276-
frecordChan := groupFileRecords(recordChan, ctx)
278+
func mergeChanData(c context.Context, cfg *Config, recordChan <-chan *FileRecord, results chan<- *FileMergeResult,
279+
) error {
280+
g := errgroup.Group{}
281+
frecordChan := groupFileRecords(recordChan, c)
277282

278-
for i := 0; i < c.Jobs; i++ {
283+
for i := 0; i < cfg.Jobs; i++ {
279284
g.Go(func() error {
280285
for frecord := range frecordChan {
281-
mr, err := batchFileData(c, frecord.fileName, frecord.records)
286+
mr, err := batchFileData(cfg, frecord.fileName, frecord.records)
282287
if err != nil {
283288
return fmt.Errorf("failed to batchFileData(%s): %w", frecord.fileName, err)
284289
}
285-
results <- &FileMergeResult{
290+
select {
291+
case <-c.Done():
292+
return nil
293+
case results <- &FileMergeResult{
286294
FilePath: frecord.fileName,
287-
MergeResult: mr,
295+
MergeResult: mr}:
288296
}
289297
}
290298
return nil
@@ -310,18 +318,23 @@ func groupFileRecords(recordChan <-chan *FileRecord, ctx context.Context) chan F
310318
targetFile = curTargetFile
311319
}
312320
if curTargetFile != targetFile {
313-
frecordChan <- FileRecords{
321+
select {
322+
case <-ctx.Done():
323+
return
324+
case frecordChan <- FileRecords{
314325
fileName: targetFile,
315-
records: records,
326+
records: records}:
316327
}
317328
records = nil
318329
targetFile = curTargetFile
319330
}
320331
records = append(records, record)
321332
}
322-
frecordChan <- FileRecords{
333+
select {
334+
case <-ctx.Done():
335+
case frecordChan <- FileRecords{
323336
fileName: targetFile,
324-
records: records,
337+
records: records}:
325338
}
326339
}()
327340
return frecordChan

pkg/covermerger/covermerger_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4,
246246
doneCh <- true
247247
}()
248248
assert.NoError(t, MergeCSVData(
249+
context.Background(),
249250
testConfig(test.baseRepo, test.baseCommit, test.workdir),
250251
strings.NewReader(test.bqTable),
251252
mergeResultsCh))

0 commit comments

Comments
 (0)