@@ -7,13 +7,15 @@ import (
77 "context"
88 "fmt"
99 "os"
10+ "sync/atomic"
1011 "time"
1112
1213 "cloud.google.com/go/civil"
1314 "cloud.google.com/go/spanner"
1415 "github.com/google/syzkaller/pkg/subsystem"
1516 _ "github.com/google/syzkaller/pkg/subsystem/lists"
1617 "github.com/google/uuid"
18+ "golang.org/x/sync/errgroup"
1719 "google.golang.org/api/iterator"
1820)
1921
@@ -248,3 +250,75 @@ func NsDataMerged(ctx context.Context, projectID, ns string) ([]TimePeriod, []in
248250 }
249251 return periods , totalRows , nil
250252}
253+
254+ // DeleteGarbage removes orphaned file entries from the database.
255+ //
256+ // It identifies files in the "files" table that are not referenced by any entries in the "merge_history" table,
257+ // indicating they are no longer associated with an active merge session.
258+ //
259+ // To avoid exceeding Spanner transaction limits, orphaned files are deleted in batches of 10,000.
260+ // Note that in case of an error during batch deletion, some files may be deleted but not counted in the total.
261+ //
262+ // Returns the number of orphaned file entries successfully deleted.
263+ func DeleteGarbage (ctx context.Context ) (int64 , error ) {
264+ batchSize := 10_000
265+ client , err := NewClient (ctx , os .Getenv ("GOOGLE_CLOUD_PROJECT" ))
266+ if err != nil {
267+ return 0 , fmt .Errorf ("coveragedb.NewClient: %w" , err )
268+ }
269+ defer client .Close ()
270+
271+ iter := client .Single ().Query (ctx , spanner.Statement {
272+ SQL : `SELECT session, filepath
273+ FROM files
274+ WHERE NOT EXISTS (
275+ SELECT 1
276+ FROM merge_history
277+ WHERE merge_history.session = files.session
278+ )` })
279+ defer iter .Stop ()
280+
281+ var totalDeleted atomic.Int64
282+ eg , _ := errgroup .WithContext (ctx )
283+ var batch []spanner.Key
284+ for {
285+ row , err := iter .Next ()
286+ if err == iterator .Done {
287+ break
288+ }
289+ if err != nil {
290+ return 0 , fmt .Errorf ("iter.Next: %w" , err )
291+ }
292+ var r struct {
293+ Session string
294+ Filepath string
295+ }
296+ if err = row .ToStruct (& r ); err != nil {
297+ return 0 , fmt .Errorf ("row.ToStruct: %w" , err )
298+ }
299+ batch = append (batch , spanner.Key {r .Session , r .Filepath })
300+ if len (batch ) > batchSize {
301+ goSpannerDelete (ctx , batch , eg , client , & totalDeleted )
302+ batch = nil
303+ }
304+ }
305+ goSpannerDelete (ctx , batch , eg , client , & totalDeleted )
306+ if err = eg .Wait (); err != nil {
307+ return 0 , fmt .Errorf ("spanner.Delete: %w" , err )
308+ }
309+ return totalDeleted .Load (), nil
310+ }
311+
312+ func goSpannerDelete (ctx context.Context , batch []spanner.Key , eg * errgroup.Group , client * spanner.Client ,
313+ totalDeleted * atomic.Int64 ) {
314+ ks := spanner .KeySetFromKeys (batch ... )
315+ ksSize := len (batch )
316+ eg .Go (func () error {
317+ mutation := spanner .Delete ("files" , ks )
318+ _ , err := client .Apply (ctx , []* spanner.Mutation {mutation })
319+ if err == nil {
320+ totalDeleted .Add (int64 (ksSize ))
321+ }
322+ return err
323+ })
324+ }
0 commit comments