Skip to content

Commit d7c57a8

Browse files
committed
fix: batch sqlite store IO to better handle large deletes in migrations
1 parent 1775862 commit d7c57a8

File tree

4 files changed

+175
-39
lines changed

4 files changed

+175
-39
lines changed

internal/ioutil/iobatching.go

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package ioutil
2+
3+
const DefaultBatchSize = 512
4+
5+
func Batchify[T any](items []T, batchSize int) [][]T {
6+
var batches [][]T
7+
for i := 0; i < len(items); i += batchSize {
8+
end := i + batchSize
9+
if end > len(items) {
10+
end = len(items)
11+
}
12+
batches = append(batches, items[i:end])
13+
}
14+
return batches
15+
}

internal/oplog/migrations.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55

66
v1 "github.com/garethgeorge/backrest/gen/go/v1"
7+
"github.com/garethgeorge/backrest/internal/ioutil"
78
"go.uber.org/zap"
89
"google.golang.org/protobuf/proto"
910
)
@@ -120,8 +121,13 @@ func migration003DeduplicateIndexedSnapshots(oplog *OpLog) error {
120121
if len(deleteIDs) == 0 {
121122
return nil
122123
}
123-
_, err := oplog.store.Delete(deleteIDs...)
124-
return err
124+
125+
for _, batch := range ioutil.Batchify(deleteIDs, ioutil.DefaultBatchSize) {
126+
if _, err := oplog.store.Delete(batch...); err != nil {
127+
return err
128+
}
129+
}
130+
return nil
125131
}
126132

127133
// migrationNoop is a migration that does nothing; replaces deprecated migrations.

internal/oplog/sqlitestore/sqlitestore.go

+56-37
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
v1 "github.com/garethgeorge/backrest/gen/go/v1"
1515
"github.com/garethgeorge/backrest/internal/cryptoutil"
16+
"github.com/garethgeorge/backrest/internal/ioutil"
1617
"github.com/garethgeorge/backrest/internal/oplog"
1718
"github.com/garethgeorge/backrest/internal/protoutil"
1819
lru "github.com/hashicorp/golang-lru/v2"
@@ -490,50 +491,68 @@ func (m *SqliteStore) Delete(opID ...int64) ([]*v1.Operation, error) {
490491

491492
ops := make([]*v1.Operation, 0, len(opID))
492493
return ops, withImmediateSqliteTransaction(conn, func() error {
493-
// fetch all the operations we're about to delete
494-
predicate := []string{"operations.id IN ("}
495-
args := []any{}
496-
for i, id := range opID {
497-
if i > 0 {
498-
predicate = append(predicate, ",")
494+
for _, batch := range ioutil.Batchify(opID, ioutil.DefaultBatchSize) {
495+
// Optimize for the case of 1 element or batch size elements (which will be common)
496+
useTransient := len(batch) != ioutil.DefaultBatchSize || len(batch) == 1
497+
batchOps, err := m.deleteHelper(conn, useTransient, batch...)
498+
if err != nil {
499+
return err
499500
}
500-
predicate = append(predicate, "?")
501-
args = append(args, id)
501+
ops = append(ops, batchOps...)
502502
}
503-
predicate = append(predicate, ")")
504-
predicateStr := strings.Join(predicate, "")
505-
506-
if err := sqlitex.ExecuteTransient(conn, "SELECT operations.operation FROM operations JOIN operation_groups ON operations.ogid = operation_groups.ogid WHERE "+predicateStr, &sqlitex.ExecOptions{
507-
Args: args,
508-
ResultFunc: func(stmt *sqlite.Stmt) error {
509-
opBytes := make([]byte, stmt.ColumnLen(0))
510-
n := stmt.GetBytes("operation", opBytes)
511-
opBytes = opBytes[:n]
503+
return nil
504+
})
505+
}
512506

513-
var op v1.Operation
514-
if err := proto.Unmarshal(opBytes, &op); err != nil {
515-
return fmt.Errorf("unmarshal operation bytes: %v", err)
516-
}
517-
ops = append(ops, &op)
518-
return nil
519-
},
520-
}); err != nil {
521-
return fmt.Errorf("load operations for delete: %v", err)
507+
func (m *SqliteStore) deleteHelper(conn *sqlite.Conn, transient bool, opID ...int64) ([]*v1.Operation, error) {
508+
// fetch all the operations we're about to delete
509+
predicate := []string{"operations.id IN ("}
510+
args := []any{}
511+
for i, id := range opID {
512+
if i > 0 {
513+
predicate = append(predicate, ",")
522514
}
515+
predicate = append(predicate, "?")
516+
args = append(args, id)
517+
}
518+
predicate = append(predicate, ")")
519+
predicateStr := strings.Join(predicate, "")
523520

524-
if len(ops) != len(opID) {
525-
return fmt.Errorf("couldn't find all operations to delete: %w", oplog.ErrNotExist)
526-
}
521+
var ops []*v1.Operation
522+
if err := sqlitex.ExecuteTransient(conn, "SELECT operations.operation FROM operations JOIN operation_groups ON operations.ogid = operation_groups.ogid WHERE "+predicateStr, &sqlitex.ExecOptions{
523+
Args: args,
524+
ResultFunc: func(stmt *sqlite.Stmt) error {
525+
opBytes := make([]byte, stmt.ColumnLen(0))
526+
n := stmt.GetBytes("operation", opBytes)
527+
opBytes = opBytes[:n]
527528

528-
// Delete the operations
529-
if err := sqlitex.ExecuteTransient(conn, "DELETE FROM operations WHERE "+predicateStr, &sqlitex.ExecOptions{
530-
Args: args,
531-
}); err != nil {
532-
return fmt.Errorf("delete operations: %v", err)
533-
}
529+
var op v1.Operation
530+
if err := proto.Unmarshal(opBytes, &op); err != nil {
531+
return fmt.Errorf("unmarshal operation bytes: %v", err)
532+
}
533+
ops = append(ops, &op)
534+
return nil
535+
},
536+
}); err != nil {
537+
return nil, fmt.Errorf("load operations for delete: %v", err)
538+
}
534539

535-
return nil
536-
})
540+
if len(ops) != len(opID) {
541+
return nil, fmt.Errorf("couldn't find all operations to delete: %w", oplog.ErrNotExist)
542+
}
543+
544+
// Delete the operations
545+
execFunc := sqlitex.Execute
546+
if transient {
547+
execFunc = sqlitex.ExecuteTransient
548+
}
549+
if err := execFunc(conn, "DELETE FROM operations WHERE "+predicateStr, &sqlitex.ExecOptions{
550+
Args: args,
551+
}); err != nil {
552+
return nil, fmt.Errorf("delete operations: %v", err)
553+
}
554+
555+
return ops, nil
537556
}
538557

539558
func (m *SqliteStore) ResetForTest(t *testing.T) error {

internal/oplog/storetests/storecontract_test.go

+96
Original file line numberDiff line numberDiff line change
@@ -651,6 +651,102 @@ func TestTransform(t *testing.T) {
651651
}
652652
}
653653

654+
func TestDelete(t *testing.T) {
655+
t.Parallel()
656+
for name, store := range StoresForTest(t) {
657+
t.Run(name, func(t *testing.T) {
658+
log, err := oplog.NewOpLog(store)
659+
if err != nil {
660+
t.Fatalf("error creating oplog: %v", err)
661+
}
662+
663+
op := &v1.Operation{
664+
UnixTimeStartMs: 1234,
665+
PlanId: "plan1",
666+
RepoId: "repo1",
667+
RepoGuid: "repo1",
668+
InstanceId: "instance1",
669+
Op: &v1.Operation_OperationBackup{},
670+
}
671+
672+
if err := log.Add(op); err != nil {
673+
t.Fatalf("error adding operation: %s", err)
674+
}
675+
676+
if err := log.Delete(op.Id); err != nil {
677+
t.Fatalf("error deleting operation: %s", err)
678+
}
679+
680+
var ops []*v1.Operation
681+
if err := log.Query(oplog.Query{}, func(op *v1.Operation) error {
682+
ops = append(ops, op)
683+
return nil
684+
}); err != nil {
685+
t.Fatalf("error querying operations: %s", err)
686+
}
687+
688+
if len(ops) != 0 {
689+
t.Errorf("expected 0 operations after deletion, got %d", len(ops))
690+
}
691+
})
692+
}
693+
}
694+
695+
func TestBulkDelete(t *testing.T) {
696+
t.Parallel()
697+
for name, store := range StoresForTest(t) {
698+
t.Run(name, func(t *testing.T) {
699+
log, err := oplog.NewOpLog(store)
700+
if err != nil {
701+
t.Fatalf("error creating oplog: %v", err)
702+
}
703+
704+
// Add 2000 operations
705+
var ops []*v1.Operation
706+
for i := 0; i < 2000; i++ {
707+
op := &v1.Operation{
708+
UnixTimeStartMs: 1234,
709+
PlanId: fmt.Sprintf("plan%d", i),
710+
RepoId: fmt.Sprintf("repo%d", i),
711+
RepoGuid: fmt.Sprintf("repo%d", i),
712+
InstanceId: fmt.Sprintf("instance%d", i),
713+
Op: &v1.Operation_OperationBackup{},
714+
}
715+
ops = append(ops, op)
716+
}
717+
718+
var ids []int64
719+
if err := log.Add(ops...); err != nil {
720+
t.Fatalf("error adding operations: %s", err)
721+
}
722+
for _, op := range ops {
723+
ids = append(ids, op.Id)
724+
}
725+
726+
// Delete all operations
727+
err = log.Delete(ids...)
728+
if err != nil {
729+
t.Fatalf("error deleting operations: %s", err)
730+
}
731+
if len(ids) != 2000 {
732+
t.Errorf("expected 2000 deleted operations, got %d", len(ids))
733+
}
734+
735+
// Verify deletion
736+
var count int
737+
if err := log.Query(oplog.Query{}, func(op *v1.Operation) error {
738+
count++
739+
return nil
740+
}); err != nil {
741+
t.Fatalf("error querying operations: %s", err)
742+
}
743+
if count != 0 {
744+
t.Errorf("expected 0 operations after deletion, got %d", count)
745+
}
746+
})
747+
}
748+
}
749+
654750
func TestQueryMetadata(t *testing.T) {
655751
t.Parallel()
656752
for name, store := range StoresForTest(t) {

0 commit comments

Comments
 (0)