Skip to content

Commit e124c26

Browse files
committed
maintainer: tolerate bootstrap overlap during merge
1 parent 927581d commit e124c26

File tree

2 files changed

+262
-18
lines changed

2 files changed

+262
-18
lines changed

maintainer/maintainer_controller_bootstrap.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -456,40 +456,40 @@ func addToWorkingTaskMap(
456456
tableSpans.ReplaceOrInsert(span, spanReplication)
457457
}
458458

459-
// findHoles returns an array of Span that are not covered in the range
459+
// findHoles returns the uncovered sub-spans in totalSpan.
460+
//
461+
// Bootstrap snapshots can temporarily contain overlapping spans during in-flight merge recovery:
462+
// for example, source dispatchers in WaitingMerge can coexist with a merged dispatcher in
463+
// Preparing/Initializing. We therefore compute holes from the union of reported coverage instead
464+
// of assuming the input spans are strictly non-overlapping.
460465
func findHoles(currentSpan utils.Map[*heartbeatpb.TableSpan, *replica.SpanReplication], totalSpan *heartbeatpb.TableSpan) []*heartbeatpb.TableSpan {
461-
lastSpan := &heartbeatpb.TableSpan{
462-
TableID: totalSpan.TableID,
463-
StartKey: totalSpan.StartKey,
464-
EndKey: totalSpan.StartKey,
465-
KeyspaceID: totalSpan.KeyspaceID,
466-
}
466+
coveredEnd := totalSpan.StartKey
467467
var holes []*heartbeatpb.TableSpan
468468
// table span is sorted
469469
currentSpan.Ascend(func(current *heartbeatpb.TableSpan, _ *replica.SpanReplication) bool {
470-
ord := bytes.Compare(lastSpan.EndKey, current.StartKey)
471-
if ord < 0 {
470+
// Skip spans that are fully covered by earlier overlaps. This preserves complete table
471+
// coverage without manufacturing holes for legitimate bootstrap overlap.
472+
if bytes.Compare(current.EndKey, coveredEnd) <= 0 {
473+
return true
474+
}
475+
if bytes.Compare(coveredEnd, current.StartKey) < 0 {
472476
// Find a hole.
473477
holes = append(holes, &heartbeatpb.TableSpan{
474478
TableID: totalSpan.TableID,
475-
StartKey: lastSpan.EndKey,
479+
StartKey: coveredEnd,
476480
EndKey: current.StartKey,
477481
KeyspaceID: totalSpan.KeyspaceID,
478482
})
479-
} else if ord > 0 {
480-
log.Panic("map is out of order",
481-
zap.String("lastSpan", lastSpan.String()),
482-
zap.String("current", current.String()))
483483
}
484-
lastSpan = current
484+
coveredEnd = current.EndKey
485485
return true
486486
})
487487
// Check if there is a hole in the end.
488-
// the lastSpan not reach the totalSpan end
489-
if !bytes.Equal(lastSpan.EndKey, totalSpan.EndKey) {
488+
// coveredEnd not reach the totalSpan end
489+
if !bytes.Equal(coveredEnd, totalSpan.EndKey) {
490490
holes = append(holes, &heartbeatpb.TableSpan{
491491
TableID: totalSpan.TableID,
492-
StartKey: lastSpan.EndKey,
492+
StartKey: coveredEnd,
493493
EndKey: totalSpan.EndKey,
494494
KeyspaceID: totalSpan.KeyspaceID,
495495
})

maintainer/maintainer_controller_test.go

Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1537,6 +1537,242 @@ func TestFinishBootstrapSkipsStaleCreateOperatorForDroppedTable(t *testing.T) {
15371537
}
15381538
}
15391539

1540+
// TestFinishBootstrapRestoresInFlightMergeWithoutDuplicateCoverage covers maintainer failover in
1541+
// the middle of a merge. The bootstrap snapshot contains two source spans in WaitingMerge and the
1542+
// overlapping merged target span in Preparing. FinishBootstrap must not panic in findHoles or
1543+
// create a fresh absent table task; instead it should rebuild the merge-related tasks/operators
1544+
// from the bootstrap snapshot.
1545+
func TestFinishBootstrapRestoresInFlightMergeWithoutDuplicateCoverage(t *testing.T) {
1546+
testutil.SetUpTestServices(t)
1547+
nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName)
1548+
nodeManager.GetAliveNodes()["node1"] = &node.Info{ID: "node1"}
1549+
1550+
tableTriggerEventDispatcherID := common.NewDispatcherID()
1551+
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
1552+
ddlSpan := replica.NewWorkingSpanReplication(cfID, tableTriggerEventDispatcherID,
1553+
common.DDLSpanSchemaID,
1554+
common.KeyspaceDDLSpan(common.DefaultKeyspaceID), &heartbeatpb.TableSpanStatus{
1555+
ID: tableTriggerEventDispatcherID.ToPB(),
1556+
ComponentStatus: heartbeatpb.ComponentState_Working,
1557+
CheckpointTs: 1,
1558+
}, "node1", false)
1559+
1560+
cfg := config.GetDefaultReplicaConfig().Clone()
1561+
cfg.Scheduler = &config.ChangefeedSchedulerConfig{
1562+
EnableTableAcrossNodes: util.AddressOf(true),
1563+
BalanceScoreThreshold: util.AddressOf(1),
1564+
MinTrafficPercentage: util.AddressOf(0.8),
1565+
MaxTrafficPercentage: util.AddressOf(1.2),
1566+
}
1567+
refresher := replica.NewRegionCountRefresher(cfID, time.Minute)
1568+
s := NewController(cfID, 1, &mockThreadPool{}, cfg, ddlSpan, nil, 1000, 0, refresher, common.DefaultKeyspace, false)
1569+
1570+
schemaStore := eventservice.NewMockSchemaStore()
1571+
schemaStore.SetTables([]commonEvent.Table{
1572+
{
1573+
TableID: 1,
1574+
SchemaID: 1,
1575+
Splitable: true,
1576+
SchemaTableName: &commonEvent.SchemaTableName{SchemaName: "test", TableName: "t1"},
1577+
},
1578+
})
1579+
appcontext.SetService(appcontext.SchemaStore, schemaStore)
1580+
1581+
totalSpan := common.TableIDToComparableSpan(common.DefaultKeyspaceID, 1)
1582+
midKey := appendNew(totalSpan.StartKey, 'a')
1583+
sourceSpan1 := &heartbeatpb.TableSpan{
1584+
TableID: 1,
1585+
StartKey: totalSpan.StartKey,
1586+
EndKey: midKey,
1587+
KeyspaceID: common.DefaultKeyspaceID,
1588+
}
1589+
sourceSpan2 := &heartbeatpb.TableSpan{
1590+
TableID: 1,
1591+
StartKey: midKey,
1592+
EndKey: totalSpan.EndKey,
1593+
KeyspaceID: common.DefaultKeyspaceID,
1594+
}
1595+
mergedSpan := &heartbeatpb.TableSpan{
1596+
TableID: 1,
1597+
StartKey: totalSpan.StartKey,
1598+
EndKey: totalSpan.EndKey,
1599+
KeyspaceID: common.DefaultKeyspaceID,
1600+
}
1601+
1602+
sourceDispatcherID1 := common.NewDispatcherID()
1603+
sourceDispatcherID2 := common.NewDispatcherID()
1604+
mergedDispatcherID := common.NewDispatcherID()
1605+
1606+
_, err := s.FinishBootstrap(map[node.ID]*heartbeatpb.MaintainerBootstrapResponse{
1607+
"node1": {
1608+
ChangefeedID: cfID.ToPB(),
1609+
Spans: []*heartbeatpb.BootstrapTableSpan{
1610+
{
1611+
ID: sourceDispatcherID1.ToPB(),
1612+
SchemaID: 1,
1613+
Span: sourceSpan1,
1614+
ComponentStatus: heartbeatpb.ComponentState_WaitingMerge,
1615+
CheckpointTs: 10,
1616+
Mode: common.DefaultMode,
1617+
},
1618+
{
1619+
ID: sourceDispatcherID2.ToPB(),
1620+
SchemaID: 1,
1621+
Span: sourceSpan2,
1622+
ComponentStatus: heartbeatpb.ComponentState_WaitingMerge,
1623+
CheckpointTs: 10,
1624+
Mode: common.DefaultMode,
1625+
},
1626+
{
1627+
ID: mergedDispatcherID.ToPB(),
1628+
SchemaID: 1,
1629+
Span: mergedSpan,
1630+
ComponentStatus: heartbeatpb.ComponentState_Preparing,
1631+
CheckpointTs: 10,
1632+
Mode: common.DefaultMode,
1633+
},
1634+
},
1635+
MergeOperators: []*heartbeatpb.MergeDispatcherRequest{
1636+
{
1637+
ChangefeedID: cfID.ToPB(),
1638+
DispatcherIDs: []*heartbeatpb.DispatcherID{sourceDispatcherID1.ToPB(), sourceDispatcherID2.ToPB()},
1639+
MergedDispatcherID: mergedDispatcherID.ToPB(),
1640+
Mode: common.DefaultMode,
1641+
},
1642+
},
1643+
CheckpointTs: 10,
1644+
},
1645+
}, false)
1646+
require.NoError(t, err)
1647+
require.True(t, s.bootstrapped)
1648+
1649+
// No extra absent table span should be created during bootstrap; only the two source spans and
1650+
// the restored merged target should exist.
1651+
require.Zero(t, s.spanController.GetAbsentSize())
1652+
require.Equal(t, 3, len(s.spanController.GetTasksByTableID(1)))
1653+
require.Equal(t, 3, s.spanController.GetSchedulingSize())
1654+
require.Zero(t, s.spanController.GetReplicatingSize())
1655+
require.NotNil(t, s.spanController.GetTaskByID(sourceDispatcherID1))
1656+
require.NotNil(t, s.spanController.GetTaskByID(sourceDispatcherID2))
1657+
require.NotNil(t, s.spanController.GetTaskByID(mergedDispatcherID))
1658+
1659+
mergeOp := s.operatorController.GetOperator(mergedDispatcherID)
1660+
require.NotNil(t, mergeOp)
1661+
_, ok := mergeOp.(*operator.MergeDispatcherOperator)
1662+
require.True(t, ok)
1663+
}
1664+
1665+
// TestFinishBootstrapKeepsOverlapCoveredAfterMergeJournalCleanup covers a later merge-recovery
1666+
// window where dispatcher manager has already committed the merged dispatcher and dropped the merge
1667+
// journal, but the old source dispatchers are still waiting to be cleaned up. Bootstrap must still
1668+
// treat the overlapping spans as complete coverage and avoid creating absent hole tasks.
1669+
func TestFinishBootstrapKeepsOverlapCoveredAfterMergeJournalCleanup(t *testing.T) {
1670+
testutil.SetUpTestServices(t)
1671+
nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName)
1672+
nodeManager.GetAliveNodes()["node1"] = &node.Info{ID: "node1"}
1673+
1674+
tableTriggerEventDispatcherID := common.NewDispatcherID()
1675+
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
1676+
ddlSpan := replica.NewWorkingSpanReplication(cfID, tableTriggerEventDispatcherID,
1677+
common.DDLSpanSchemaID,
1678+
common.KeyspaceDDLSpan(common.DefaultKeyspaceID), &heartbeatpb.TableSpanStatus{
1679+
ID: tableTriggerEventDispatcherID.ToPB(),
1680+
ComponentStatus: heartbeatpb.ComponentState_Working,
1681+
CheckpointTs: 1,
1682+
}, "node1", false)
1683+
1684+
cfg := config.GetDefaultReplicaConfig().Clone()
1685+
cfg.Scheduler = &config.ChangefeedSchedulerConfig{
1686+
EnableTableAcrossNodes: util.AddressOf(true),
1687+
BalanceScoreThreshold: util.AddressOf(1),
1688+
MinTrafficPercentage: util.AddressOf(0.8),
1689+
MaxTrafficPercentage: util.AddressOf(1.2),
1690+
}
1691+
refresher := replica.NewRegionCountRefresher(cfID, time.Minute)
1692+
s := NewController(cfID, 1, &mockThreadPool{}, cfg, ddlSpan, nil, 1000, 0, refresher, common.DefaultKeyspace, false)
1693+
1694+
schemaStore := eventservice.NewMockSchemaStore()
1695+
schemaStore.SetTables([]commonEvent.Table{
1696+
{
1697+
TableID: 1,
1698+
SchemaID: 1,
1699+
Splitable: true,
1700+
SchemaTableName: &commonEvent.SchemaTableName{SchemaName: "test", TableName: "t1"},
1701+
},
1702+
})
1703+
appcontext.SetService(appcontext.SchemaStore, schemaStore)
1704+
1705+
totalSpan := common.TableIDToComparableSpan(common.DefaultKeyspaceID, 1)
1706+
midKey := appendNew(totalSpan.StartKey, 'a')
1707+
sourceSpan1 := &heartbeatpb.TableSpan{
1708+
TableID: 1,
1709+
StartKey: totalSpan.StartKey,
1710+
EndKey: midKey,
1711+
KeyspaceID: common.DefaultKeyspaceID,
1712+
}
1713+
sourceSpan2 := &heartbeatpb.TableSpan{
1714+
TableID: 1,
1715+
StartKey: midKey,
1716+
EndKey: totalSpan.EndKey,
1717+
KeyspaceID: common.DefaultKeyspaceID,
1718+
}
1719+
mergedSpan := &heartbeatpb.TableSpan{
1720+
TableID: 1,
1721+
StartKey: totalSpan.StartKey,
1722+
EndKey: totalSpan.EndKey,
1723+
KeyspaceID: common.DefaultKeyspaceID,
1724+
}
1725+
1726+
sourceDispatcherID1 := common.NewDispatcherID()
1727+
sourceDispatcherID2 := common.NewDispatcherID()
1728+
mergedDispatcherID := common.NewDispatcherID()
1729+
1730+
// Scenario:
1731+
// 1. Source dispatchers are already in WaitingMerge and still present in bootstrap spans.
1732+
// 2. The merged dispatcher is committed and visible as Initializing.
1733+
// 3. The persisted merge operator is already gone, so bootstrap only has overlapping coverage.
1734+
// Expectation: FinishBootstrap succeeds without adding absent spans for fake holes.
1735+
_, err := s.FinishBootstrap(map[node.ID]*heartbeatpb.MaintainerBootstrapResponse{
1736+
"node1": {
1737+
ChangefeedID: cfID.ToPB(),
1738+
Spans: []*heartbeatpb.BootstrapTableSpan{
1739+
{
1740+
ID: sourceDispatcherID1.ToPB(),
1741+
SchemaID: 1,
1742+
Span: sourceSpan1,
1743+
ComponentStatus: heartbeatpb.ComponentState_WaitingMerge,
1744+
CheckpointTs: 10,
1745+
Mode: common.DefaultMode,
1746+
},
1747+
{
1748+
ID: sourceDispatcherID2.ToPB(),
1749+
SchemaID: 1,
1750+
Span: sourceSpan2,
1751+
ComponentStatus: heartbeatpb.ComponentState_WaitingMerge,
1752+
CheckpointTs: 10,
1753+
Mode: common.DefaultMode,
1754+
},
1755+
{
1756+
ID: mergedDispatcherID.ToPB(),
1757+
SchemaID: 1,
1758+
Span: mergedSpan,
1759+
ComponentStatus: heartbeatpb.ComponentState_Initializing,
1760+
CheckpointTs: 10,
1761+
Mode: common.DefaultMode,
1762+
},
1763+
},
1764+
CheckpointTs: 10,
1765+
},
1766+
}, false)
1767+
require.NoError(t, err)
1768+
require.True(t, s.bootstrapped)
1769+
require.Zero(t, s.spanController.GetAbsentSize())
1770+
require.GreaterOrEqual(t, len(s.spanController.GetTasksByTableID(1)), 2)
1771+
require.GreaterOrEqual(t, s.spanController.GetReplicatingSize(), 2)
1772+
require.Zero(t, s.spanController.GetSchedulingSize())
1773+
require.NotNil(t, s.spanController.GetTaskByID(mergedDispatcherID))
1774+
}
1775+
15401776
func TestSplitTableWhenBootstrapFinished(t *testing.T) {
15411777
testutil.SetUpTestServices(t)
15421778
nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName)
@@ -1682,6 +1918,14 @@ func TestMapFindHole(t *testing.T) {
16821918
{StartKey: []byte("t1_1"), EndKey: []byte("t2_0")},
16831919
},
16841920
},
1921+
{ // 6. overlapping spans still cover the whole range.
1922+
spans: []*heartbeatpb.TableSpan{
1923+
{StartKey: []byte("t1_0"), EndKey: []byte("t1_1")},
1924+
{StartKey: []byte("t1_0"), EndKey: []byte("t2_0")},
1925+
{StartKey: []byte("t1_1"), EndKey: []byte("t2_0")},
1926+
},
1927+
rang: &heartbeatpb.TableSpan{StartKey: []byte("t1_0"), EndKey: []byte("t2_0")},
1928+
},
16851929
}
16861930

16871931
for i, cs := range cases {

0 commit comments

Comments
 (0)