Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions cdc/entry/schema/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,15 +816,31 @@ func (s *snapshot) doDropTable(tbInfo *model.TableInfo, currentTs uint64) {
// truncateTable truncate the table with the given ID, and replace it with a new `tbInfo`.
// NOTE: after a table is truncated:
// - physicalTableByID(id) will return nil;
// - IsTruncateTableID(id) should return true.
// - IsTruncateTableID(physicalTableID) should return true.
func (s *snapshot) truncateTable(id int64, tbInfo *model.TableInfo, currentTs uint64) (err error) {
old, ok := s.physicalTableByID(id)
if !ok {
return cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(id)
}
s.doDropTable(old, currentTs)
s.doCreateTable(tbInfo, currentTs)
s.truncatedTables.ReplaceOrInsert(newVersionedID(id, negative(currentTs)))
tag := negative(currentTs)
// when the table is a partition table, we have to record all partition ids
if old.IsPartitionTable() {
newPi := tbInfo.GetPartitionInfo()
oldPi := old.GetPartitionInfo()
newPartitionIDMap := make(map[int64]struct{}, len(newPi.NewPartitionIDs))
for _, partition := range newPi.Definitions {
newPartitionIDMap[partition.ID] = struct{}{}
}
Comment on lines +832 to +835

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This block has a potential nil pointer dereference if newPi is nil. Although TRUNCATE TABLE on a partitioned table should result in a new partitioned table, it's safer to handle the case where newPi could be nil for robustness.

Additionally, the map is sized using len(newPi.NewPartitionIDs), but it's populated from newPi.Definitions in the following loop. It would be more correct and robust to size it with len(newPi.Definitions).

newPartitionIDMap := make(map[int64]struct{})
		if newPi != nil {
			newPartitionIDMap = make(map[int64]struct{}, len(newPi.Definitions))
			for _, partition := range newPi.Definitions {
				newPartitionIDMap[partition.ID] = struct{}{}
			}
		}

for _, partition := range oldPi.Definitions {
if _, ok := newPartitionIDMap[partition.ID]; !ok {
s.truncatedTables.ReplaceOrInsert(newVersionedID(partition.ID, tag))
}
}
} else {
s.truncatedTables.ReplaceOrInsert(newVersionedID(id, tag))
}
s.currentTs = currentTs
log.Debug("truncate table success",
zap.String("schema", tbInfo.TableName.Schema),
Expand Down
4 changes: 2 additions & 2 deletions cdc/entry/schema/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func TestTable(t *testing.T) {
require.False(t, ok)
_, ok = snap.PhysicalTableByID(11 + 65536)
require.False(t, ok)
require.True(t, snap.IsTruncateTableID(11))
require.True(t, snap.IsTruncateTableID(11+65536))
_, ok = snap.PhysicalTableByID(12)
require.True(t, ok)
_, ok = snap.PhysicalTableByID(12 + 65536)
Expand Down Expand Up @@ -347,7 +347,7 @@ func TestDrop(t *testing.T) {
require.Equal(t, 1, snap.inner.schemaNameToID.Len())
require.Equal(t, 1, snap.inner.tableNameToID.Len())
require.Equal(t, 1, snap.inner.partitions.Len())
require.Equal(t, 0, snap.inner.truncatedTables.Len())
require.Equal(t, 1, snap.inner.truncatedTables.Len())
require.Equal(t, 2, snap.inner.ineligibleTables.Len())
}

Expand Down
Loading