@@ -7,11 +7,13 @@ import (
77 "context"
88 "fmt"
99 "net/http"
10+ "os"
1011 "strconv"
1112
1213 "cloud.google.com/go/batch/apiv1/batchpb"
1314 "cloud.google.com/go/bigquery"
1415 "cloud.google.com/go/civil"
16+ "cloud.google.com/go/spanner"
1517 "github.com/google/syzkaller/pkg/coveragedb"
1618 "google.golang.org/api/iterator"
1719 "google.golang.org/appengine/v2"
@@ -151,3 +153,71 @@ func nsDataAvailable(ctx context.Context, ns string) ([]coveragedb.TimePeriod, [
151153 }
152154 return periods , recordsCount , nil
153155}
156+
157+ // Abandoned DB cleanup is expensive.
158+ // Every namespace creates min 3 sessions every day (yesterday, today, this month).
159+ // It means every day we deprecate 3 sessions records.
160+ // Every session is ~10k file records. Plus some mutations in the index.
161+ // One transaction allows to delete up to 80k rows.
162+ // To clean up everything once/week we have to garbage 7 * 3 * count(namespaces) at least.
163+ func handleBatchCoverageClean (w http.ResponseWriter , r * http.Request ) {
164+ paramSessions := r .FormValue ("sessions" )
165+ if paramSessions == "" {
166+ paramSessions = "3" // one day, one namespace means 3 records
167+ }
168+ maxSessionsToDel , err := strconv .Atoi (paramSessions )
169+ if err != nil {
170+ w .WriteHeader (http .StatusBadRequest )
171+ w .Write ([]byte ("failed to parse 'sessions', integer expected" ))
172+ return
173+ }
174+ ctx := context .Background ()
175+ for i := 0 ; i < maxSessionsToDel ; i ++ {
176+ deletedRows , err := deleteGarbageSession (ctx )
177+ if err != nil {
178+ errMsg := fmt .Sprintf ("failed to cleanCoverageDB: %s" , err .Error ())
179+ log .Errorf (ctx , errMsg )
180+ w .Write ([]byte (errMsg ))
181+ return
182+ }
183+ w .Write ([]byte (fmt .Sprintf ("deleteGarbageSession -> -%d rows\n " , deletedRows )))
184+ }
185+ }
186+
187+ // deleteGarbageSession generates approximately 20k mutations (10k in files and 10k in index)
188+ // Spanner limit for every transaction is 80k mutations.
189+ // It means we can delete up to 4 sessions data at once.
190+ // Let's keep it simple and delete only 1 session at once.
191+ //
192+ // deleteGarbageSession returns the deleted rows count.
193+ // (0, nil) means there is no data to delete.
194+ func deleteGarbageSession (ctx context.Context ) (int64 , error ) {
195+ client , err := coveragedb .NewClient (ctx , os .Getenv ("GOOGLE_CLOUD_PROJECT" ))
196+ if err != nil {
197+ return 0 , fmt .Errorf ("failed to coveragedb.NewClient: %w" , err )
198+ }
199+ defer client .Close ()
200+ var rowCount int64
201+
202+ _ , err = client .ReadWriteTransaction (ctx , func (ctx context.Context , txn * spanner.ReadWriteTransaction ) error {
203+ stmt := spanner.Statement {
204+ SQL : ` delete from files
205+ where files.session in (
206+ select
207+ distinct(files.session)
208+ from files
209+ left join merge_history
210+ on files.session = merge_history.session
211+ where merge_history.session is NULL
212+ limit 1
213+ );` ,
214+ }
215+ var err error
216+ rowCount , err = txn .Update (ctx , stmt )
217+ if err != nil {
218+ return fmt .Errorf ("txn.Update: %w" , err )
219+ }
220+ return nil
221+ })
222+ return rowCount , err
223+ }
0 commit comments