Skip to content

Commit d0fb962

Browse files
Alexandra Congclaude
andcommitted
fix: apply sink routing to new table ID after TRUNCATE TABLE
For TRUNCATE TABLE, job.TableID is the OLD table ID which no longer exists in the snapshot after the DDL is applied. The snapshot only contains the NEW table with a different ID from job.BinlogInfo.TableInfo.ID. This fix ensures sink routing is applied to the correct (new) table ID so that DML events after TRUNCATE are routed to the correct destination. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent 769181d commit d0fb962

File tree

4 files changed

+95
-4
lines changed

4 files changed

+95
-4
lines changed

cdc/entry/schema_storage.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,13 @@ func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error {
244244
// Apply sink routing to the TableInfo if sinkRouter is configured
245245
// This ensures DML events get the routed schema/table for SQL generation
246246
if s.sinkRouter != nil && job.TableID > 0 {
247-
applySinkRoutingToTable(snap, job.TableID, s.sinkRouter)
247+
tableIDForRouting := job.TableID
248+
// For TRUNCATE TABLE, job.TableID is the OLD table ID which no longer exists
249+
// in the snapshot. We need to use the NEW table ID from BinlogInfo.TableInfo.
250+
if job.Type == timodel.ActionTruncateTable && job.BinlogInfo.TableInfo != nil {
251+
tableIDForRouting = job.BinlogInfo.TableInfo.ID
252+
}
253+
applySinkRoutingToTable(snap, tableIDForRouting, s.sinkRouter)
248254
}
249255

250256
s.snaps = append(s.snaps, snap)

cdc/entry/schema_storage_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1162,4 +1162,41 @@ func TestSchemaStorageWithSinkRouting(t *testing.T) {
11621162
require.True(t, ok)
11631163
require.Equal(t, "unmapped_db", t5.TableName.Schema)
11641164
require.Equal(t, "", t5.TableName.TargetSchema)
1165+
1166+
// Test TRUNCATE TABLE - this creates a new table with a new ID
1167+
// The new table should also have routing applied
1168+
tk.MustExec("truncate table source_db1.t4")
1169+
1170+
jobs, err = getAllHistoryDDLJob(store, f)
1171+
require.Nil(t, err)
1172+
1173+
var truncateT4Job *timodel.Job
1174+
for i := len(jobs) - 1; i >= 0; i-- {
1175+
if jobs[i].Type == timodel.ActionTruncateTable && jobs[i].SchemaName == "source_db1" && jobs[i].TableName == "t4" {
1176+
truncateT4Job = jobs[i]
1177+
break
1178+
}
1179+
}
1180+
require.NotNil(t, truncateT4Job)
1181+
1182+
// Note: For TRUNCATE, job.TableID is the OLD table ID, and job.BinlogInfo.TableInfo.ID is the NEW table ID
1183+
oldTableID := truncateT4Job.TableID
1184+
newTableID := truncateT4Job.BinlogInfo.TableInfo.ID
1185+
require.NotEqual(t, oldTableID, newTableID, "TRUNCATE should create a new table ID")
1186+
1187+
err = schemaStorage.HandleDDLJob(truncateT4Job)
1188+
require.NoError(t, err)
1189+
1190+
newSnap3, err := schemaStorage.GetSnapshot(ctx, truncateT4Job.BinlogInfo.FinishedTS)
1191+
require.NoError(t, err)
1192+
1193+
// The old table ID should not exist anymore
1194+
_, ok = newSnap3.PhysicalTableByID(oldTableID)
1195+
require.False(t, ok, "old table ID should not exist after truncate")
1196+
1197+
// The new table should exist and have routing applied
1198+
t4New, ok := newSnap3.PhysicalTableByID(newTableID)
1199+
require.True(t, ok, "new table ID should exist after truncate")
1200+
require.Equal(t, "source_db1", t4New.TableName.Schema)
1201+
require.Equal(t, "target_db1", t4New.TableName.TargetSchema, "new table after truncate should have routing applied")
11651202
}

tests/integration_tests/sink_routing/data/test.sql

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,9 @@ INSERT INTO truncate_test VALUES (2, 'also truncated');
7979

8080
TRUNCATE TABLE truncate_test;
8181

82-
-- Insert new data after truncate
83-
INSERT INTO truncate_test VALUES (10, 'after truncate');
82+
-- NOTE: INSERT after truncate is done separately in run.sh to ensure
83+
-- the TRUNCATE DDL is fully processed before the INSERT arrives.
84+
-- This is necessary to properly test DML routing after TRUNCATE.
8485

8586
-- ============================================
8687
-- DDL: DROP TABLE

tests/integration_tests/sink_routing/run.sh

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,54 @@ function run() {
8989
# ============================================
9090
echo "Verifying TRUNCATE TABLE routing..."
9191
check_table_exists target_db.truncate_test_routed ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
92-
# After truncate, only 1 row should exist (inserted after truncate)
92+
# Wait for TRUNCATE to complete by checking the table is empty
93+
# (the pre-truncate rows should be gone)
94+
echo "Waiting for TRUNCATE to complete..."
95+
i=0
96+
while [ $i -lt 60 ]; do
97+
run_sql "SELECT COUNT(*) as cnt FROM target_db.truncate_test_routed" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
98+
if [ "$(cat $OUT_DIR/sql_res.$TEST_NAME.txt | grep -c 'cnt: 0')" -eq 1 ]; then
99+
echo "TRUNCATE completed, table is empty"
100+
break
101+
fi
102+
echo "Table not empty yet, current state:"
103+
cat $OUT_DIR/sql_res.$TEST_NAME.txt
104+
sleep 1
105+
i=$((i + 1))
106+
done
107+
if [ $i -ge 60 ]; then
108+
echo "Timeout waiting for TRUNCATE to complete"
109+
echo "Final table state:"
110+
run_sql "SELECT * FROM target_db.truncate_test_routed" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
111+
exit 1
112+
fi
113+
114+
# Now insert data AFTER truncate is confirmed complete
115+
# This ensures the INSERT uses the NEW table ID and tests DML routing after TRUNCATE
116+
echo "Inserting data after TRUNCATE..."
117+
run_sql "INSERT INTO source_db.truncate_test VALUES (10, 'after truncate')" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
118+
119+
# Wait for the INSERT to be replicated and verify it arrived at the routed destination
120+
echo "Waiting for INSERT after TRUNCATE to be replicated..."
121+
i=0
122+
while [ $i -lt 60 ]; do
123+
run_sql "SELECT COUNT(*) as cnt FROM target_db.truncate_test_routed WHERE id = 10" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
124+
if [ "$(cat $OUT_DIR/sql_res.$TEST_NAME.txt | grep -c 'cnt: 1')" -eq 1 ]; then
125+
echo "INSERT after TRUNCATE successfully routed"
126+
break
127+
fi
128+
sleep 1
129+
i=$((i + 1))
130+
done
131+
if [ $i -ge 60 ]; then
132+
echo "ERROR: INSERT after TRUNCATE was not routed to target_db.truncate_test_routed"
133+
echo "This indicates DML routing is broken after TRUNCATE TABLE"
134+
run_sql "SELECT COUNT(*) as cnt FROM target_db.truncate_test_routed" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
135+
cat $OUT_DIR/sql_res.$TEST_NAME.txt
136+
exit 1
137+
fi
138+
139+
# Final verification
93140
run_sql "SELECT COUNT(*) as cnt FROM target_db.truncate_test_routed" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
94141
check_contains "cnt: 1"
95142
run_sql "SELECT id FROM target_db.truncate_test_routed" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

0 commit comments

Comments
 (0)