Skip to content

Commit a8281b6

Browse files
authored
br: fix table filter nil pointer issue (pingcap#61225)
close pingcap#61226
1 parent 32309b9 commit a8281b6

File tree

3 files changed

+129
-19
lines changed

3 files changed

+129
-19
lines changed

br/pkg/restore/log_client/batch_meta_processor.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ func (rp *RestoreMetaKVProcessor) RestoreAndRewriteMetaKVFiles(
6060
ctx context.Context,
6161
hasExplicitFilter bool,
6262
files []*backuppb.DataFileInfo,
63+
schemasReplace *stream.SchemasReplace,
6364
) error {
6465
// starts gc row collector
6566
rp.client.RunGCRowsLoader(ctx)
@@ -89,7 +90,12 @@ func (rp *RestoreMetaKVProcessor) RestoreAndRewriteMetaKVFiles(
8990
return errors.Trace(err)
9091
}
9192
} else {
92-
log.Info("skip doing full reload for filtered PiTR")
93+
// refreshing metadata syncs data from TiKV into the info schema one table at a time.
94+
// this must succeed to ensure schema consistency
95+
log.Info("refreshing schema meta")
96+
if err := rp.client.RefreshMetaForTables(ctx, schemasReplace); err != nil {
97+
return errors.Trace(err)
98+
}
9399
}
94100
return nil
95101
}

br/pkg/task/stream.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1579,7 +1579,7 @@ func restoreStream(
15791579
var rp *logclient.RestoreMetaKVProcessor
15801580
if err = glue.WithProgress(ctx, g, "Restore Meta Files", int64(len(ddlFiles)), !cfg.LogProgress, func(p glue.Progress) error {
15811581
rp = logclient.NewRestoreMetaKVProcessor(client, schemasReplace, updateStats, p.Inc)
1582-
return rp.RestoreAndRewriteMetaKVFiles(ctx, cfg.ExplicitFilter, ddlFiles)
1582+
return rp.RestoreAndRewriteMetaKVFiles(ctx, cfg.ExplicitFilter, ddlFiles, schemasReplace)
15831583
}); err != nil {
15841584
return errors.Annotate(err, "failed to restore meta files")
15851585
}
@@ -1663,12 +1663,6 @@ func restoreStream(
16631663
}
16641664

16651665
if cfg.ExplicitFilter {
1666-
// refresh metadata will sync data from TiKV to info schema one table at a time.
1667-
// this must succeed to ensure schema consistency
1668-
if err = client.RefreshMetaForTables(ctx, schemasReplace); err != nil {
1669-
return errors.Trace(err)
1670-
}
1671-
16721666
failpoint.Inject("before-set-table-mode-to-normal", func(_ failpoint.Value) {
16731667
failpoint.Return(errors.New("fail before setting table mode to normal"))
16741668
})
@@ -2131,7 +2125,9 @@ func buildAndSaveIDMapIfNeeded(ctx context.Context, client *logclient.LogClient,
21312125

21322126
// either get the base id map from a previous PiTR run, or this is a new task and the base map comes from the snapshot restore phase
21332127
// do filter
2134-
cfg.tableMappingManager.ApplyFilterToDBReplaceMap(cfg.PiTRTableTracker)
2128+
if cfg.PiTRTableTracker != nil {
2129+
cfg.tableMappingManager.ApplyFilterToDBReplaceMap(cfg.PiTRTableTracker)
2130+
}
21352131
// replace temp id with read global id
21362132
err = cfg.tableMappingManager.ReplaceTemporaryIDs(ctx, client.GenGlobalIDs)
21372133
if err != nil {

br/tests/br_pitr_table_filter/run.sh

Lines changed: 118 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1553,16 +1553,124 @@ test_log_compaction() {
15531553
echo "log compaction with filter test passed"
15541554
}
15551555

1556-
#test_basic_filter
1557-
#test_with_full_backup_filter
1556+
test_pitr_chaining() {
1557+
restart_services || { echo "Failed to restart services"; exit 1; }
1558+
1559+
echo "case 21: start testing PITR chaining (sequential restores without cleaning up)"
1560+
1561+
run_sql "create schema $DB;"
1562+
1563+
echo "creating tables for initial state..."
1564+
run_sql "CREATE TABLE $DB.table_a (
1565+
id INT PRIMARY KEY,
1566+
value VARCHAR(50)
1567+
);"
1568+
run_sql "CREATE TABLE $DB.table_b (
1569+
id INT PRIMARY KEY,
1570+
value VARCHAR(50)
1571+
);"
1572+
1573+
run_br --pd $PD_ADDR log start --task-name $TASK_NAME -s "local://$TEST_DIR/$TASK_NAME/log"
1574+
1575+
run_sql "INSERT INTO $DB.table_a VALUES (1, 'initial data 1'), (2, 'initial data 2');"
1576+
run_sql "INSERT INTO $DB.table_b VALUES (1, 'initial data 1'), (2, 'initial data 2');"
1577+
1578+
run_br backup full -s "local://$TEST_DIR/$TASK_NAME/full" --pd $PD_ADDR
1579+
1580+
run_sql "INSERT INTO $DB.table_a VALUES (3, 'post-backup data 1');"
1581+
run_sql "INSERT INTO $DB.table_b VALUES (3, 'post-backup data 1');"
1582+
1583+
. "$CUR/../br_test_utils.sh" && wait_log_checkpoint_advance "$TASK_NAME"
1584+
first_restore_ts=$(python3 -c "import time; print(int(time.time() * 1000) << 18)")
1585+
echo "Captured first checkpoint timestamp: $first_restore_ts"
1586+
1587+
run_sql "INSERT INTO $DB.table_a VALUES (4, 'post-first-checkpoint data');"
1588+
run_sql "INSERT INTO $DB.table_b VALUES (4, 'post-first-checkpoint data');"
1589+
1590+
run_sql "CREATE TABLE $DB.table_c (
1591+
id INT PRIMARY KEY,
1592+
value VARCHAR(50)
1593+
);"
1594+
run_sql "INSERT INTO $DB.table_c VALUES (1, 'created after first checkpoint');"
1595+
1596+
. "$CUR/../br_test_utils.sh" && wait_log_checkpoint_advance "$TASK_NAME"
1597+
1598+
run_br --pd $PD_ADDR log stop --task-name $TASK_NAME
1599+
1600+
run_sql "drop schema if exists $DB;"
1601+
1602+
echo "Step 1: First restore with full backup to first checkpoint timestamp"
1603+
run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" \
1604+
--full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" \
1605+
--restored-ts $first_restore_ts \
1606+
-f "$DB.*"
1607+
1608+
run_sql "SELECT COUNT(*) = 3 FROM $DB.table_a" || {
1609+
echo "table_a doesn't have expected row count after first restore"
1610+
exit 1
1611+
}
1612+
1613+
run_sql "SELECT COUNT(*) = 3 FROM $DB.table_b" || {
1614+
echo "table_b doesn't have expected row count after first restore"
1615+
exit 1
1616+
}
1617+
1618+
if run_sql "SELECT * FROM $DB.table_c" 2>/dev/null; then
1619+
echo "table_c exists after first restore but shouldn't"
1620+
exit 1
1621+
fi
1622+
1623+
echo "Step 2: Second restore with log only using first checkpoint timestamp as startTS"
1624+
run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" \
1625+
--start-ts $first_restore_ts \
1626+
-f "$DB.*"
1627+
1628+
# Verify data after second restore
1629+
run_sql "SELECT COUNT(*) = 4 FROM $DB.table_a" || {
1630+
echo "table_a doesn't have expected row count after second restore"
1631+
exit 1
1632+
}
1633+
1634+
run_sql "SELECT COUNT(*) = 4 FROM $DB.table_b" || {
1635+
echo "table_b doesn't have expected row count after second restore"
1636+
exit 1
1637+
}
1638+
1639+
run_sql "SELECT COUNT(*) = 1 FROM $DB.table_c" || {
1640+
echo "table_c doesn't have expected row count after second restore"
1641+
exit 1
1642+
}
1643+
1644+
# make sure we are still able to write data after the restore
1645+
run_sql "INSERT INTO $DB.table_a VALUES (5, 'post-second-checkpoint data');"
1646+
run_sql "CREATE TABLE $DB.table_d (
1647+
id INT PRIMARY KEY,
1648+
value VARCHAR(50)
1649+
);"
1650+
run_sql "INSERT INTO $DB.table_d VALUES (1, 'created after second checkpoint');"
1651+
1652+
verify_no_unexpected_tables 4 "$DB" || {
1653+
echo "Wrong number of tables after all restores"
1654+
exit 1
1655+
}
1656+
1657+
run_sql "drop schema if exists $DB;"
1658+
rm -rf "$TEST_DIR/$TASK_NAME"
1659+
1660+
echo "PITR sequential restore test passed"
1661+
}
1662+
1663+
test_basic_filter
1664+
test_with_full_backup_filter
15581665
test_table_rename
1559-
#test_with_checkpoint
1560-
#test_partition_exchange
1561-
#test_system_tables
1562-
#test_foreign_keys
1563-
#test_index_filter
1564-
#test_table_truncation
1565-
#test_sequential_restore
1566-
#test_log_compaction
1666+
test_with_checkpoint
1667+
test_partition_exchange
1668+
test_system_tables
1669+
test_foreign_keys
1670+
test_index_filter
1671+
test_table_truncation
1672+
test_sequential_restore
1673+
test_log_compaction
1674+
test_pitr_chaining
15671675

15681676
echo "br pitr table filter all tests passed"

0 commit comments

Comments
 (0)