Skip to content

Commit 0169a21

Browse files
committed
dashboard/app: test flusher
1 parent 6e50d07 commit 0169a21

File tree

2 files changed

+29
-7
lines changed

2 files changed

+29
-7
lines changed

dashboard/app/batch_coverage.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,9 +152,24 @@ func nsDataAvailable(ctx context.Context, ns string) ([]coveragedb.TimePeriod, [
152152
return periods, recordsCount, nil
153153
}
154154

155-
func handleBatchCoverageClean(w http.ResponseWriter, r *http.Request) {
155+
type writerFlusher struct {
156+
w http.ResponseWriter
157+
}
158+
159+
func (wf writerFlusher) Write(b []byte) (int, error) {
160+
n, err := wf.w.Write(b)
161+
if f, ok := wf.w.(http.Flusher); ok {
162+
f.Flush()
163+
}
164+
return n, err
165+
}
166+
167+
func handleBatchCoverageClean(rw http.ResponseWriter, r *http.Request) {
156168
ctx := context.Background()
157-
totalDeleted, err := coveragedb.DeleteGarbage(ctx)
169+
w := writerFlusher{w: rw}
170+
171+
w.Write([]byte("start\n"))
172+
totalDeleted, err := coveragedb.DeleteGarbage(ctx, w)
158173
if err != nil {
159174
errMsg := fmt.Sprintf("failed to coveragedb.DeleteGarbage: %s", err.Error())
160175
log.Errorf(ctx, "%s", errMsg)

pkg/coveragedb/spanner.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package coveragedb
66
import (
77
"context"
88
"fmt"
9+
"io"
910
"os"
1011
"sync/atomic"
1112
"time"
@@ -260,8 +261,8 @@ func NsDataMerged(ctx context.Context, projectID, ns string) ([]TimePeriod, []in
260261
// Note that in case of an error during batch deletion, some files may be deleted but not counted in the total.
261262
//
262263
// Returns the number of orphaned file entries successfully deleted.
263-
func DeleteGarbage(ctx context.Context) (int64, error) {
264-
batchSize := 10_000
264+
func DeleteGarbage(ctx context.Context, w io.Writer) (int64, error) {
265+
batchSize := 1_000
265266
client, err := NewClient(ctx, os.Getenv("GOOGLE_CLOUD_PROJECT"))
266267
if err != nil {
267268
return 0, fmt.Errorf("coveragedb.NewClient: %w", err)
@@ -277,6 +278,7 @@ func DeleteGarbage(ctx context.Context) (int64, error) {
277278
WHERE merge_history.session = files.session
278279
)`})
279280
defer iter.Stop()
281+
w.Write([]byte("reading session+filepath...\n"))
280282

281283
var totalDeleted atomic.Int64
282284
eg, _ := errgroup.WithContext(ctx)
@@ -298,26 +300,31 @@ func DeleteGarbage(ctx context.Context) (int64, error) {
298300
}
299301
batch = append(batch, spanner.Key{r.Session, r.Filepath})
300302
if len(batch) > batchSize {
301-
goSpannerDelete(ctx, batch, eg, client, &totalDeleted)
303+
goSpannerDelete(ctx, batch, eg, client, &totalDeleted, w)
302304
batch = nil
303305
}
304306
}
305-
goSpannerDelete(ctx, batch, eg, client, &totalDeleted)
307+
goSpannerDelete(ctx, batch, eg, client, &totalDeleted, w)
308+
w.Write([]byte("waiting all deletion...\n"))
306309
if err = eg.Wait(); err != nil {
307310
return 0, fmt.Errorf("spanner.Delete: %w", err)
308311
}
312+
w.Write([]byte("deleteions done\n"))
309313
return totalDeleted.Load(), nil
310314
}
311315

312316
func goSpannerDelete(ctx context.Context, batch []spanner.Key, eg *errgroup.Group, client *spanner.Client,
313-
totalDeleted *atomic.Int64) {
317+
totalDeleted *atomic.Int64, w io.Writer) {
314318
ks := spanner.KeySetFromKeys(batch...)
315319
ksSize := len(batch)
320+
w.Write([]byte(fmt.Sprintf("deleting %d records\n", ksSize)))
316321
eg.Go(func() error {
317322
mutation := spanner.Delete("files", ks)
318323
_, err := client.Apply(ctx, []*spanner.Mutation{mutation})
319324
if err == nil {
320325
totalDeleted.Add(int64(ksSize))
326+
} else {
327+
w.Write([]byte(fmt.Sprintf("err deleting records: %v\n", err.Error())))
321328
}
322329
return err
323330
})

0 commit comments

Comments
 (0)