Skip to content

Commit be34127

Browse files
committed
dashboard/app: test flusher
1 parent 6e50d07 commit be34127

File tree

2 files changed

+34
-6
lines changed

2 files changed

+34
-6
lines changed

dashboard/app/batch_coverage.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"fmt"
99
"net/http"
1010
"strconv"
11+
"time"
1112

1213
"cloud.google.com/go/batch/apiv1/batchpb"
1314
"cloud.google.com/go/bigquery"
@@ -152,9 +153,29 @@ func nsDataAvailable(ctx context.Context, ns string) ([]coveragedb.TimePeriod, [
152153
return periods, recordsCount, nil
153154
}
154155

156+
func autoFlush(ctx context.Context, w http.ResponseWriter) {
157+
go func() {
158+
ticker := time.NewTicker(time.Second)
159+
defer ticker.Stop()
160+
for {
161+
select {
162+
case <-ticker.C:
163+
if f, ok := w.(http.Flusher); ok {
164+
f.Flush()
165+
}
166+
case <-ctx.Done():
167+
return
168+
}
169+
}
170+
}()
171+
}
172+
155173
func handleBatchCoverageClean(w http.ResponseWriter, r *http.Request) {
156-
ctx := context.Background()
157-
totalDeleted, err := coveragedb.DeleteGarbage(ctx)
174+
ctx, cancel := context.WithCancel(appengine.NewContext(r))
175+
defer cancel()
176+
autoFlush(ctx, w)
177+
w.Write([]byte("start\n"))
178+
totalDeleted, err := coveragedb.DeleteGarbage(ctx, w)
158179
if err != nil {
159180
errMsg := fmt.Sprintf("failed to coveragedb.DeleteGarbage: %s", err.Error())
160181
log.Errorf(ctx, "%s", errMsg)

pkg/coveragedb/spanner.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package coveragedb
66
import (
77
"context"
88
"fmt"
9+
"io"
910
"os"
1011
"sync/atomic"
1112
"time"
@@ -260,7 +261,7 @@ func NsDataMerged(ctx context.Context, projectID, ns string) ([]TimePeriod, []in
260261
// Note that in case of an error during batch deletion, some files may be deleted but not counted in the total.
261262
//
262263
// Returns the number of orphaned file entries successfully deleted.
263-
func DeleteGarbage(ctx context.Context) (int64, error) {
264+
func DeleteGarbage(ctx context.Context, w io.Writer) (int64, error) {
264265
batchSize := 10_000
265266
client, err := NewClient(ctx, os.Getenv("GOOGLE_CLOUD_PROJECT"))
266267
if err != nil {
@@ -277,6 +278,7 @@ func DeleteGarbage(ctx context.Context) (int64, error) {
277278
WHERE merge_history.session = files.session
278279
)`})
279280
defer iter.Stop()
281+
w.Write([]byte("reading session+filepath...\n"))
280282

281283
var totalDeleted atomic.Int64
282284
eg, _ := errgroup.WithContext(ctx)
@@ -298,26 +300,31 @@ func DeleteGarbage(ctx context.Context) (int64, error) {
298300
}
299301
batch = append(batch, spanner.Key{r.Session, r.Filepath})
300302
if len(batch) > batchSize {
301-
goSpannerDelete(ctx, batch, eg, client, &totalDeleted)
303+
goSpannerDelete(ctx, batch, eg, client, &totalDeleted, w)
302304
batch = nil
303305
}
304306
}
305-
goSpannerDelete(ctx, batch, eg, client, &totalDeleted)
307+
goSpannerDelete(ctx, batch, eg, client, &totalDeleted, w)
308+
w.Write([]byte("waiting all deletion...\n"))
306309
if err = eg.Wait(); err != nil {
307310
return 0, fmt.Errorf("spanner.Delete: %w", err)
308311
}
312+
w.Write([]byte("deleteions done\n"))
309313
return totalDeleted.Load(), nil
310314
}
311315

312316
func goSpannerDelete(ctx context.Context, batch []spanner.Key, eg *errgroup.Group, client *spanner.Client,
313-
totalDeleted *atomic.Int64) {
317+
totalDeleted *atomic.Int64, w io.Writer) {
314318
ks := spanner.KeySetFromKeys(batch...)
315319
ksSize := len(batch)
320+
w.Write([]byte(fmt.Sprintf("deleting %d records\n", ksSize)))
316321
eg.Go(func() error {
317322
mutation := spanner.Delete("files", ks)
318323
_, err := client.Apply(ctx, []*spanner.Mutation{mutation})
319324
if err == nil {
320325
totalDeleted.Add(int64(ksSize))
326+
} else {
327+
w.Write([]byte(fmt.Sprintf("err deleting records: %v\n", err.Error())))
321328
}
322329
return err
323330
})

0 commit comments

Comments
 (0)