Skip to content

Commit 9fb9740

Browse files
authored
Merge pull request #2322 from jzelinskie/server-side-loop-repair
internal/datastore/pg: server-side loop for repair
2 parents eb94c2f + 60a3e1b commit 9fb9740

File tree

1 file changed

+20
-13
lines changed

1 file changed

+20
-13
lines changed

internal/datastore/postgres/postgres.go

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,13 @@ const (
6060
gcBatchDeleteSize = 1000
6161

6262
primaryInstanceID = -1
63+
64+
// The number of loop iterations where pg_current_xact_id is called per
65+
// query to the database during a repair.
66+
//
67+
// This value is ideally dependent on the underlying hardware, but 1M seems
68+
// to be a reasonable starting place.
69+
repairBatchSize = 1_000_000
6370
)
6471

6572
var livingTupleConstraints = []string{"uq_relation_tuple_living_xid", "pk_relation_tuple"}
@@ -531,8 +538,6 @@ func (pgd *pgDatastore) Repair(ctx context.Context, operationName string, output
531538
}
532539
}
533540

534-
const batchSize = 10000
535-
536541
func (pgd *pgDatastore) repairTransactionIDs(ctx context.Context, outputProgress bool) error {
537542
conn, err := pgx.Connect(ctx, pgd.dburl)
538543
if err != nil {
@@ -569,17 +574,8 @@ func (pgd *pgDatastore) repairTransactionIDs(ctx context.Context, outputProgress
569574
}
570575

571576
for i := 0; i < counterDelta; i++ {
572-
var batch pgx.Batch
573-
574-
batchCount := min(batchSize, counterDelta-i)
575-
for j := 0; j < batchCount; j++ {
576-
batch.Queue("begin;")
577-
batch.Queue("select pg_current_xact_id();")
578-
batch.Queue("rollback;")
579-
}
580-
581-
br := conn.SendBatch(ctx, &batch)
582-
if err := br.Close(); err != nil {
577+
batchCount := min(repairBatchSize, counterDelta-i)
578+
if _, err := conn.Exec(ctx, queryLoopXactID(batchCount)); err != nil {
583579
return err
584580
}
585581

@@ -601,6 +597,17 @@ func (pgd *pgDatastore) repairTransactionIDs(ctx context.Context, outputProgress
601597
return nil
602598
}
603599

600+
// queryLoopXactID performs pg_current_xact_id() in a server-side loop in the
601+
// database in order to increment the xact_id.
602+
func queryLoopXactID(batchSize int) string {
603+
return fmt.Sprintf(`DO $$
604+
BEGIN
605+
FOR i IN 1..%d LOOP
606+
PERFORM pg_current_xact_id(); ROLLBACK;
607+
END LOOP;
608+
END $$;`, batchSize)
609+
}
610+
604611
// RepairOperations returns the available repair operations for the datastore.
605612
func (pgd *pgDatastore) RepairOperations() []datastore.RepairOperation {
606613
return []datastore.RepairOperation{

0 commit comments

Comments
 (0)