diff --git a/downstreamadapter/dispatchermanager/dispatcher_manager.go b/downstreamadapter/dispatchermanager/dispatcher_manager.go index 128f8cbe1e..883d271cba 100644 --- a/downstreamadapter/dispatchermanager/dispatcher_manager.go +++ b/downstreamadapter/dispatchermanager/dispatcher_manager.go @@ -97,6 +97,8 @@ type DispatcherManager struct { // Entries must be deleted on completion (create -> after creation; remove -> on cleanup), otherwise // future maintainer requests for the same dispatcherID will be ignored. currentOperatorMap sync.Map // map[common.DispatcherID]SchedulerDispatcherRequest (in dispatcher manager, not heartbeatpb) + // mergeOperatorMap keeps in-flight merge requests so bootstrap can reconstruct merge operators after maintainer failover. + mergeOperatorMap sync.Map // map[mergedDispatcherID.String()]*heartbeatpb.MergeDispatcherRequest // schemaIDToDispatchers is shared in the DispatcherManager, // it store all the infos about schemaID->Dispatchers // Dispatchers may change the schemaID when meets some special events, such as rename ddl @@ -191,6 +193,7 @@ func NewDispatcherManager( ctx: ctx, dispatcherMap: newDispatcherMap[*dispatcher.EventDispatcher](), currentOperatorMap: sync.Map{}, + mergeOperatorMap: sync.Map{}, changefeedID: changefeedID, keyspaceID: keyspaceID, pdClock: pdClock, diff --git a/downstreamadapter/dispatchermanager/dispatcher_manager_merge.go b/downstreamadapter/dispatchermanager/dispatcher_manager_merge.go new file mode 100644 index 0000000000..f801e684c5 --- /dev/null +++ b/downstreamadapter/dispatchermanager/dispatcher_manager_merge.go @@ -0,0 +1,105 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package dispatchermanager + +import ( + "github.com/pingcap/log" + "github.com/pingcap/ticdc/heartbeatpb" + "github.com/pingcap/ticdc/pkg/common" + "go.uber.org/zap" +) + +// TrackMergeOperator records an in-flight merge request so bootstrap can restore it after maintainer failover. +func (e *DispatcherManager) TrackMergeOperator(req *heartbeatpb.MergeDispatcherRequest) { + if req == nil || req.MergedDispatcherID == nil { + return + } + mergedID := common.NewDispatcherIDFromPB(req.MergedDispatcherID) + e.mergeOperatorMap.Store(mergedID.String(), cloneMergeDispatcherRequest(req)) +} + +// RemoveMergeOperator drops the persisted merge request once the merged dispatcher has converged. +func (e *DispatcherManager) RemoveMergeOperator(mergedDispatcherID common.DispatcherID) { + e.mergeOperatorMap.Delete(mergedDispatcherID.String()) +} + +// MaybeCleanupMergeOperator removes a persisted merge request when the merged dispatcher is already complete or gone. +func (e *DispatcherManager) MaybeCleanupMergeOperator(req *heartbeatpb.MergeDispatcherRequest) { + if req == nil || req.MergedDispatcherID == nil { + return + } + mergedID := common.NewDispatcherIDFromPB(req.MergedDispatcherID) + if common.IsRedoMode(req.Mode) { + if dispatcherItem, ok := e.redoDispatcherMap.Get(mergedID); ok { + if dispatcherItem.GetComponentStatus() == heartbeatpb.ComponentState_Working { + e.RemoveMergeOperator(mergedID) + } + return + } + } else { + if dispatcherItem, ok := e.dispatcherMap.Get(mergedID); ok { + if dispatcherItem.GetComponentStatus() == heartbeatpb.ComponentState_Working { + e.RemoveMergeOperator(mergedID) + } + return + } + } + log.Info("cleanup merge operator because merged dispatcher not found", + zap.Stringer("changefeedID", e.changefeedID), + zap.String("dispatcherID", mergedID.String()), + zap.Int64("mode", req.Mode)) + e.RemoveMergeOperator(mergedID) +} + +// GetMergeOperators returns cloned in-flight merge requests for maintainer bootstrap. +func (e *DispatcherManager) GetMergeOperators() []*heartbeatpb.MergeDispatcherRequest { + operators := make([]*heartbeatpb.MergeDispatcherRequest, 0) + e.mergeOperatorMap.Range(func(_, value any) bool { + req, ok := value.(*heartbeatpb.MergeDispatcherRequest) + if !ok || req == nil { + return true + } + operators = append(operators, cloneMergeDispatcherRequest(req)) + return true + }) + return operators +} + +func cloneMergeDispatcherRequest(req *heartbeatpb.MergeDispatcherRequest) *heartbeatpb.MergeDispatcherRequest { + if req == nil { + return nil + } + clone := &heartbeatpb.MergeDispatcherRequest{ + Mode: req.Mode, + } + if req.ChangefeedID != nil { + id := *req.ChangefeedID + clone.ChangefeedID = &id + } + if req.MergedDispatcherID != nil { + mergedID := *req.MergedDispatcherID + clone.MergedDispatcherID = &mergedID + } + if len(req.DispatcherIDs) > 0 { + clone.DispatcherIDs = make([]*heartbeatpb.DispatcherID, 0, len(req.DispatcherIDs)) + for _, dispatcherID := range req.DispatcherIDs { + if dispatcherID == nil { + continue + } + id := *dispatcherID + clone.DispatcherIDs = append(clone.DispatcherIDs, &id) + } + } + return clone +} diff --git a/downstreamadapter/dispatchermanager/heartbeat_collector.go b/downstreamadapter/dispatchermanager/heartbeat_collector.go index 99ef678b14..a12b6d77da 100644 --- a/downstreamadapter/dispatchermanager/heartbeat_collector.go +++ b/downstreamadapter/dispatchermanager/heartbeat_collector.go @@ -60,6 +60,8 @@ type HeartBeatCollector struct { mergeDispatcherRequestDynamicStream dynstream.DynamicStream[int, common.GID, MergeDispatcherRequest, *DispatcherManager, *MergeDispatcherRequestHandler] mc messaging.MessageCenter + dispatcherManagers sync.Map // map[common.GID]*DispatcherManager + wg sync.WaitGroup cancel context.CancelFunc isClosed atomic.Bool @@ -128,6 +130,7 @@ func (c *HeartBeatCollector) RegisterDispatcherManager(m *DispatcherManager) err if err != nil { return errors.Trace(err) } + c.dispatcherManagers.Store(m.changefeedID.Id, m) return nil } @@ -170,6 +173,7 @@ func (c *HeartBeatCollector) RemoveDispatcherManager(id common.ChangeFeedID) err if err != nil { return errors.Trace(err) } + c.dispatcherManagers.Delete(id.Id) return nil } @@ -287,6 +291,9 @@ func (c *HeartBeatCollector) RecvMessages(_ context.Context, msg *messaging.Targ NewRedoMetaMessage(redoMessage)) case messaging.TypeMergeDispatcherRequest: mergeDispatcherRequest := msg.Message[0].(*heartbeatpb.MergeDispatcherRequest) + if manager, ok := c.dispatcherManagers.Load(common.NewChangefeedGIDFromPB(mergeDispatcherRequest.ChangefeedID)); ok { + manager.(*DispatcherManager).TrackMergeOperator(mergeDispatcherRequest) + } c.mergeDispatcherRequestDynamicStream.Push( common.NewChangefeedGIDFromPB(mergeDispatcherRequest.ChangefeedID), NewMergeDispatcherRequest(mergeDispatcherRequest)) diff --git a/downstreamadapter/dispatchermanager/helper.go b/downstreamadapter/dispatchermanager/helper.go index 35e07a4765..b72504c43d 100644 --- a/downstreamadapter/dispatchermanager/helper.go +++ b/downstreamadapter/dispatchermanager/helper.go @@ -785,11 +785,19 @@ func (h *MergeDispatcherRequestHandler) Handle(dispatcherManager *DispatcherMana } mergeDispatcherRequest := reqs[0] + dispatcherManager.TrackMergeOperator(mergeDispatcherRequest.MergeDispatcherRequest) dispatcherIDs := make([]common.DispatcherID, 0, len(mergeDispatcherRequest.DispatcherIDs)) for _, id := range mergeDispatcherRequest.DispatcherIDs { dispatcherIDs = append(dispatcherIDs, common.NewDispatcherIDFromPB(id)) } - dispatcherManager.MergeDispatcher(dispatcherIDs, common.NewDispatcherIDFromPB(mergeDispatcherRequest.MergedDispatcherID), mergeDispatcherRequest.Mode) + task := dispatcherManager.MergeDispatcher( + dispatcherIDs, + common.NewDispatcherIDFromPB(mergeDispatcherRequest.MergedDispatcherID), + mergeDispatcherRequest.Mode, + ) + if task == nil { + dispatcherManager.MaybeCleanupMergeOperator(mergeDispatcherRequest.MergeDispatcherRequest) + } return false } diff --git a/downstreamadapter/dispatchermanager/task.go b/downstreamadapter/dispatchermanager/task.go index c129c8a29a..0af11bfc5e 100644 --- a/downstreamadapter/dispatchermanager/task.go +++ b/downstreamadapter/dispatchermanager/task.go @@ -232,6 +232,7 @@ func abortMerge[T dispatcher.Dispatcher](t *MergeCheckTask, dispatcherMap *Dispa } removeDispatcher(t.manager, t.mergedDispatcher.GetId(), dispatcherMap, sinkType) + t.manager.RemoveMergeOperator(t.mergedDispatcher.GetId()) } func doMerge[T dispatcher.Dispatcher](t *MergeCheckTask, dispatcherMap *DispatcherMap[T]) { @@ -330,6 +331,7 @@ func doMerge[T dispatcher.Dispatcher](t *MergeCheckTask, dispatcherMap *Dispatch } dispatcher.Remove() } + t.manager.RemoveMergeOperator(t.mergedDispatcher.GetId()) } type mergedStartTsCandidate struct { diff --git a/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go b/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go index f91e5222f2..a25988078f 100644 --- a/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go +++ b/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go @@ -526,4 +526,6 @@ func retrieveOperatorsForBootstrapResponse( }) return true }) + + response.MergeOperators = append(response.MergeOperators, manager.GetMergeOperators()...) } diff --git a/heartbeatpb/heartbeat.pb.go b/heartbeatpb/heartbeat.pb.go index 4cd9f689cc..af01b739b6 100644 --- a/heartbeatpb/heartbeat.pb.go +++ b/heartbeatpb/heartbeat.pb.go @@ -1884,6 +1884,8 @@ type MaintainerBootstrapResponse struct { // It will be used when redo enable. RedoCheckpointTs uint64 `protobuf:"varint,5,opt,name=redo_checkpoint_ts,json=redoCheckpointTs,proto3" json:"redo_checkpoint_ts,omitempty"` Operators []*ScheduleDispatcherRequest `protobuf:"bytes,6,rep,name=operators,proto3" json:"operators,omitempty"` + // merge_operators contains in-flight merge dispatcher requests for bootstrap recovery. + MergeOperators []*MergeDispatcherRequest `protobuf:"bytes,7,rep,name=merge_operators,json=mergeOperators,proto3" json:"merge_operators,omitempty"` } func (m *MaintainerBootstrapResponse) Reset() { *m = MaintainerBootstrapResponse{} } @@ -1961,6 +1963,13 @@ func (m *MaintainerBootstrapResponse) GetOperators() []*ScheduleDispatcherReques return nil } +func (m *MaintainerBootstrapResponse) GetMergeOperators() []*MergeDispatcherRequest { + if m != nil { + return m.MergeOperators + } + return nil +} + type MaintainerPostBootstrapRequest struct { ChangefeedID *ChangefeedID `protobuf:"bytes,1,opt,name=changefeedID,proto3" json:"changefeedID,omitempty"` TableTriggerEventDispatcherId *DispatcherID `protobuf:"bytes,2,opt,name=table_trigger_event_dispatcher_id,json=tableTriggerEventDispatcherId,proto3" json:"table_trigger_event_dispatcher_id,omitempty"` @@ -3501,168 +3510,169 @@ func init() { func init() { proto.RegisterFile("heartbeatpb/heartbeat.proto", fileDescriptor_6d584080fdadb670) } var fileDescriptor_6d584080fdadb670 = []byte{ - // 2569 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x3a, 0xcd, 0x6f, 0x1c, 0x59, - 0xf1, 0xee, 0x9e, 0xef, 0x1a, 0x8f, 0xdd, 0x79, 0x49, 0x1c, 0x6f, 0xe2, 0x38, 0xde, 0xfe, 0xfd, - 0x40, 0x66, 0x76, 0x49, 0x48, 0x76, 0x23, 0x60, 0x59, 0x76, 0x19, 0xcf, 0x78, 0x37, 0x23, 0xdb, - 0x63, 0xeb, 0x8d, 0x97, 0xc0, 0x72, 0x18, 0xda, 0xdd, 0x2f, 0xe3, 0xc6, 0x33, 0xdd, 0x9d, 0xee, - 0x9e, 0x38, 0x8e, 0xb4, 0x20, 0x84, 0xb8, 0x71, 0x00, 0x09, 0x0e, 0x1c, 0x38, 0x20, 0xfe, 0x01, - 0x24, 0xc4, 0x11, 0x89, 0x1b, 0x48, 0x5c, 0xf6, 0x84, 0xe0, 0x04, 0xda, 0xfc, 0x03, 0x08, 0x0e, - 0x5c, 0xd1, 0xfb, 0xea, 0xaf, 0x69, 0xdb, 0x89, 0x3c, 0xca, 0xad, 0xeb, 0xbd, 0xfa, 0x7a, 0x55, - 0xf5, 0xaa, 0xea, 0xd5, 0x0c, 0xdc, 0x38, 0x24, 0x86, 0x1f, 0x1e, 0x10, 0x23, 0xf4, 0x0e, 0xee, - 0x44, 0xdf, 0xb7, 0x3d, 0xdf, 0x0d, 0x5d, 0x54, 0x4f, 0x6c, 0xea, 0x27, 0x50, 0xdb, 0x37, 0x0e, - 0x46, 0xa4, 0xef, 0x19, 0x0e, 0x5a, 0x86, 0x0a, 0x03, 0xba, 0x9d, 0x65, 0x65, 0x4d, 0x59, 0x2f, - 0x60, 0x09, 0xa2, 0xeb, 0x50, 0xed, 0x87, 0x86, 0x1f, 0x6e, 0x91, 0x93, 0x65, 0x75, 0x4d, 0x59, - 0x9f, 0xc7, 0x11, 0x8c, 0x96, 0xa0, 0xbc, 0xe9, 0x58, 0x74, 0xa7, 0xc0, 0x76, 0x04, 0x84, 0x56, - 0x01, 0xb6, 0xc8, 0x49, 0xe0, 0x19, 0x26, 0x65, 0x58, 0x5c, 0x53, 0xd6, 0x1b, 0x38, 0xb1, 0xa2, - 0xff, 0x55, 0x05, 0xed, 0x01, 0x55, 0x65, 0x83, 0x18, 0x21, 0x26, 0x8f, 0x27, 0x24, 0x08, 0xd1, - 0xd7, 0x61, 0xde, 0x3c, 0x34, 0x9c, 0x21, 0x79, 0x44, 0x88, 0x25, 0xf4, 0xa8, 0xdf, 0x7b, 0xed, - 0x76, 0x42, 0xe7, 0xdb, 0xed, 0x04, 0x02, 0x4e, 0xa1, 0xa3, 0xb7, 0xa1, 0x76, 0x6c, 0x84, 0xc4, - 0x1f, 0x1b, 0xfe, 0x11, 0x53, 0xb4, 0x7e, 0x6f, 0x29, 0x45, 0xfb, 0x50, 0xee, 0xe2, 0x18, 0x11, - 0xbd, 0x0b, 0x0d, 0x9f, 0x58, 0x6e, 0xb4, 0xc7, 0x0e, 0x72, 0x3a, 0x65, 0x1a, 0x19, 0x7d, 0x05, - 0xaa, 0x41, 0x68, 0x84, 0x93, 0x80, 0x04, 0xcb, 0xc5, 0xb5, 0xc2, 0x7a, 0xfd, 0xde, 0x4a, 0x8a, - 0x30, 0xb2, 0x6f, 0x9f, 0x61, 0xe1, 0x08, 0x1b, 0xad, 0xc3, 0xa2, 0xe9, 0x8e, 0x3d, 0x32, 0x22, - 0x21, 0xe1, 0x9b, 0xcb, 0xa5, 0x35, 0x65, 0xbd, 0x8a, 0xb3, 0xcb, 0xe8, 0x0d, 0x28, 0x10, 0xdf, - 0x5f, 0x2e, 0xe7, 0x58, 0x03, 0x4f, 0x1c, 0xc7, 0x76, 0x86, 0x9b, 0xbe, 0xef, 0xfa, 0x98, 0x62, - 0xe9, 0x3f, 0x56, 0xa0, 0x16, 0xab, 0xa7, 0x53, 0x8b, 0x12, 0xf3, 0xc8, 0x73, 0x6d, 0x27, 0xdc, - 0x0f, 0x98, 0x45, 0x8b, 0x38, 0xb5, 0x46, 0x5d, 0xe5, 0x93, 0xc0, 0x1d, 0x3d, 0x21, 0xd6, 0x7e, - 0xc0, 0xec, 0x56, 0xc4, 0x89, 0x15, 0xa4, 0x41, 0x21, 0x20, 0x8f, 0x99, 0x59, 0x8a, 0x98, 0x7e, - 0x52, 0xae, 0x23, 0x23, 0x08, 0xfb, 0x27, 0x8e, 0xc9, 0x68, 0x8a, 0x9c, 0x6b, 0x72, 0x4d, 0xff, - 0x04, 0xb4, 0x8e, 0x1d, 0x78, 0x46, 0x68, 0x1e, 0x12, 0xbf, 0x65, 0x86, 0xb6, 0xeb, 0xa0, 0x37, - 0xa0, 0x6c, 0xb0, 0x2f, 0xa6, 0xc7, 0xc2, 0xbd, 0xcb, 0xa9, 0xb3, 0x70, 0x24, 0x2c, 0x50, 0x68, - 0xd4, 0xb5, 0xdd, 0xf1, 0xd8, 0x0e, 0x23, 0xa5, 0x22, 0x18, 0xad, 0x41, 0xbd, 0x1b, 0x50, 0x51, - 0x7b, 0xf4, 0x0c, 0x4c, 0xb5, 0x2a, 0x4e, 0x2e, 0xe9, 0x6d, 0x28, 0xb4, 0xda, 0x5b, 0x29, 0x26, - 0xca, 0xd9, 0x4c, 0xd4, 0x69, 0x26, 0x3f, 0x52, 0xe1, 0x6a, 0xd7, 0x79, 0x34, 0x9a, 0x10, 0x7a, - 0xa8, 0xf8, 0x38, 0x01, 0xfa, 0x06, 0x34, 0xa2, 0x8d, 0xfd, 0x13, 0x8f, 0x88, 0x03, 0x5d, 0x4f, - 0x1d, 0x28, 0x85, 0x81, 0xd3, 0x04, 0xe8, 0x7d, 0x68, 0xc4, 0x0c, 0xbb, 0x1d, 0x7a, 0xc6, 0xc2, - 0x94, 0x7b, 0x93, 0x18, 0x38, 0x8d, 0xcf, 0x6e, 0xa5, 0x79, 0x48, 0xc6, 0x46, 0xb7, 0xc3, 0x0c, - 0x50, 0xc0, 0x11, 0x8c, 0xb6, 0xe0, 0x32, 0x79, 0x6a, 0x8e, 0x26, 0x16, 0x49, 0xd0, 0x58, 0xcc, - 0x4f, 0x67, 0x8a, 0xc8, 0xa3, 0xd2, 0xff, 0xa4, 0x24, 0x5d, 0x29, 0x62, 0xf2, 0x5b, 0x70, 0xd5, - 0xce, 0xb3, 0x8c, 0xb8, 0xb3, 0x7a, 0xbe, 0x21, 0x92, 0x98, 0x38, 0x9f, 0x01, 0xba, 0x1f, 0x05, - 0x09, 0xbf, 0xc2, 0x37, 0x4f, 0x51, 0x37, 0x13, 0x2e, 0x3a, 0x14, 0x0c, 0x53, 0x5e, 0x5e, 0x2d, - 0x1d, 0x58, 0xed, 0x2d, 0x4c, 0x37, 0xf5, 0xdf, 0x2b, 0x70, 0x29, 0x91, 0x74, 0x02, 0xcf, 0x75, - 0x02, 0x72, 0xd1, 0xac, 0xb3, 0x03, 0xc8, 0xca, 0x58, 0x87, 0x48, 0x6f, 0x9e, 0xa6, 0xbb, 0x48, - 0x06, 0x39, 0x84, 0x08, 0x41, 0x71, 0xec, 0x5a, 0x44, 0xb8, 0x94, 0x7d, 0xeb, 0x4f, 0xe1, 0x72, - 0x3b, 0x71, 0x63, 0x77, 0x48, 0x10, 0x18, 0xc3, 0x0b, 0x2b, 0x9e, 0xcd, 0x0d, 0xea, 0x74, 0x6e, - 0xd0, 0x7f, 0xae, 0xc0, 0x22, 0x26, 0x96, 0xbb, 0x43, 0x42, 0x63, 0x46, 0x62, 0xcf, 0x4b, 0x37, - 0x59, 0xb5, 0x0a, 0x39, 0x6a, 0x7d, 0x1f, 0x6e, 0x52, 0xad, 0x70, 0x44, 0xb5, 0xe7, 0xbb, 0x43, - 0x9f, 0x04, 0xc1, 0xab, 0xd1, 0x51, 0xff, 0x04, 0x56, 0xd2, 0xf2, 0x3f, 0x70, 0xfd, 0x63, 0xc3, - 0xb7, 0x5e, 0x91, 0xf8, 0xff, 0xa4, 0x6e, 0x64, 0xdb, 0x75, 0x1e, 0xd9, 0x43, 0xd4, 0x84, 0x62, - 0xe0, 0x19, 0x8e, 0x90, 0xb5, 0x94, 0x5f, 0x85, 0x30, 0xc3, 0xa1, 0xb5, 0x3e, 0xa0, 0x15, 0x3c, - 0xe2, 0x2e, 0x41, 0xaa, 0xb9, 0x95, 0xc8, 0x08, 0xe2, 0x3e, 0x9d, 0x91, 0x32, 0x52, 0xe8, 0x34, - 0x29, 0x05, 0x32, 0x29, 0x15, 0x79, 0x52, 0x92, 0x70, 0x14, 0xd9, 0xa5, 0x38, 0xb2, 0x51, 0x13, - 0xb4, 0xe0, 0xc8, 0xf6, 0x3a, 0x3b, 0xdb, 0xad, 0xa0, 0x2f, 0x34, 0x2a, 0xb3, 0x44, 0x3c, 0xb5, - 0xae, 0xff, 0x42, 0x85, 0xd7, 0x68, 0x86, 0xb3, 0x26, 0xa3, 0x44, 0x82, 0x9a, 0x51, 0xef, 0x70, - 0x1f, 0xca, 0x26, 0xb3, 0xe3, 0x39, 0x59, 0x87, 0x1b, 0x1b, 0x0b, 0x64, 0xd4, 0x86, 0x85, 0x40, - 0xa8, 0xc4, 0xf3, 0x11, 0x33, 0xd8, 0xc2, 0xbd, 0x1b, 0x29, 0xf2, 0x7e, 0x0a, 0x05, 0x67, 0x48, - 0xa8, 0xea, 0xae, 0x47, 0x7c, 0x23, 0x74, 0x7d, 0x56, 0x4b, 0x8a, 0x8c, 0x45, 0x5a, 0xf5, 0xdd, - 0x04, 0x02, 0x4e, 0xa1, 0xeb, 0xff, 0x55, 0x60, 0x69, 0x87, 0xf8, 0xc3, 0xd9, 0x1b, 0xe5, 0x7d, - 0x68, 0x58, 0x2f, 0x59, 0xa3, 0x52, 0xf8, 0xa8, 0x0b, 0x68, 0x4c, 0x35, 0xb3, 0x3a, 0x2f, 0x15, - 0x53, 0x39, 0x44, 0x51, 0xf4, 0x14, 0x13, 0x79, 0x71, 0x0f, 0x2e, 0xef, 0x18, 0xb6, 0x13, 0x1a, - 0xb6, 0x43, 0xfc, 0x07, 0x92, 0x1b, 0xfa, 0x6a, 0xa2, 0x27, 0x53, 0x72, 0xf2, 0x70, 0x4c, 0x93, - 0x6d, 0xca, 0xf4, 0x3f, 0xaa, 0xa0, 0x65, 0xb7, 0x2f, 0x6a, 0xc5, 0x9b, 0x00, 0xf4, 0x6b, 0x40, - 0x85, 0x10, 0x16, 0x5e, 0x35, 0x5c, 0xa3, 0x2b, 0x94, 0x3d, 0x41, 0x77, 0xa1, 0xc4, 0x77, 0xf2, - 0x22, 0xa7, 0xed, 0x8e, 0x3d, 0xd7, 0x21, 0x4e, 0xc8, 0x70, 0x31, 0xc7, 0x44, 0xff, 0x07, 0x8d, - 0x38, 0x1d, 0x0e, 0xc2, 0xa8, 0x01, 0x4b, 0xb5, 0x75, 0xa2, 0x6b, 0x2c, 0xe5, 0xb8, 0x6c, 0xaa, - 0x6b, 0x44, 0x9f, 0x83, 0x85, 0x03, 0xd7, 0x0d, 0x83, 0xd0, 0x37, 0xbc, 0x81, 0xe5, 0x3a, 0x44, - 0xdc, 0xc2, 0x46, 0xb4, 0xda, 0x71, 0x1d, 0x32, 0xd5, 0xf8, 0x55, 0x72, 0x1a, 0x3f, 0x02, 0x8d, - 0x9e, 0x6b, 0x91, 0xd8, 0x1d, 0xf7, 0xa1, 0x3a, 0xb2, 0x9f, 0x10, 0x87, 0x04, 0x81, 0x68, 0x93, - 0xd2, 0xda, 0x50, 0xec, 0x6d, 0x81, 0x80, 0x23, 0x54, 0x6a, 0x36, 0xc7, 0xb5, 0xc8, 0x80, 0x78, - 0xae, 0x79, 0x28, 0xd2, 0x54, 0x8d, 0xae, 0x6c, 0xd2, 0x05, 0xfd, 0x7b, 0xb0, 0xd4, 0x27, 0x61, - 0x8a, 0x56, 0x04, 0xfd, 0x5d, 0x28, 0x87, 0x86, 0x3f, 0x24, 0xe1, 0xf9, 0xd2, 0x04, 0xe2, 0x79, - 0xb2, 0xc6, 0x70, 0x6d, 0x4a, 0x96, 0x68, 0x1e, 0xde, 0x82, 0x8a, 0xe1, 0x79, 0x23, 0x9b, 0x58, - 0xe7, 0x4b, 0x93, 0x98, 0xe7, 0x89, 0xfb, 0x32, 0xdc, 0x68, 0xbb, 0xae, 0x6f, 0xd9, 0x0e, 0xbd, - 0xe3, 0x1b, 0xd2, 0x03, 0xf2, 0x7c, 0xcb, 0x50, 0x79, 0x42, 0xfc, 0x40, 0xb6, 0xd1, 0x05, 0x2c, - 0x41, 0xfd, 0xdb, 0xb0, 0x92, 0x4f, 0x28, 0x94, 0xbd, 0xc0, 0xc5, 0xf8, 0x8b, 0x02, 0x57, 0x5a, - 0x96, 0x15, 0x63, 0x48, 0x6d, 0xbe, 0x00, 0xaa, 0x6d, 0x9d, 0x7f, 0x25, 0x54, 0xdb, 0xa2, 0x6f, - 0xc5, 0x44, 0x8e, 0x9d, 0x8f, 0x92, 0xe8, 0x54, 0x38, 0xe7, 0x94, 0x7c, 0xd4, 0x84, 0x4b, 0x76, - 0x30, 0x70, 0xc8, 0xf1, 0x20, 0xbe, 0x5c, 0x2c, 0xee, 0xab, 0x78, 0xd1, 0x0e, 0x7a, 0xe4, 0x38, - 0x16, 0x87, 0x6e, 0x41, 0xfd, 0x48, 0x3c, 0x35, 0x07, 0xb6, 0xc5, 0x0a, 0x4e, 0x03, 0x83, 0x5c, - 0xea, 0x5a, 0xfa, 0x2f, 0x15, 0xb8, 0x86, 0xc9, 0xd8, 0x7d, 0x42, 0x2e, 0x74, 0xa0, 0x65, 0xa8, - 0x98, 0x46, 0x60, 0x1a, 0x16, 0x11, 0xaf, 0x07, 0x09, 0xd2, 0x1d, 0x9f, 0xf1, 0xb7, 0xc4, 0xe3, - 0x44, 0x82, 0x59, 0xdd, 0x8a, 0x53, 0xba, 0xfd, 0xa6, 0x00, 0xd7, 0x63, 0xad, 0xa6, 0xbc, 0x7f, - 0xc1, 0x64, 0x74, 0x9a, 0x0f, 0x5e, 0x63, 0xa1, 0xe1, 0x27, 0xcc, 0x1f, 0xb5, 0x04, 0x26, 0xbc, - 0x1e, 0xd2, 0xfe, 0x61, 0x10, 0xfa, 0xf6, 0x70, 0x48, 0xfc, 0x01, 0x79, 0x42, 0x9c, 0x70, 0x10, - 0x67, 0x7a, 0x79, 0x8e, 0x33, 0x73, 0xfa, 0x4d, 0xc6, 0x63, 0x9f, 0xb3, 0xd8, 0xa4, 0x1c, 0x92, - 0x8f, 0x8c, 0x7c, 0xf7, 0x96, 0xf2, 0xdd, 0x6b, 0xc0, 0x5a, 0x5a, 0x21, 0xfa, 0x24, 0xcf, 0xe8, - 0x53, 0x3e, 0x4f, 0x9f, 0x95, 0xa4, 0x3e, 0xb4, 0x99, 0x4b, 0xa9, 0x93, 0xf1, 0x52, 0x65, 0xca, - 0x4b, 0x7f, 0x57, 0xe1, 0x46, 0xae, 0x97, 0x66, 0xf3, 0xa8, 0xb8, 0x0f, 0x25, 0xda, 0xa8, 0xc9, - 0x8a, 0x7b, 0x2b, 0x45, 0x17, 0x49, 0x8b, 0xdb, 0x3a, 0x8e, 0x2d, 0x73, 0x7e, 0xe1, 0x45, 0x26, - 0x05, 0x2f, 0x56, 0x45, 0xde, 0x04, 0xc4, 0xac, 0x9b, 0xc6, 0x2c, 0x31, 0x4c, 0x8d, 0xee, 0x24, - 0x1f, 0x26, 0xa8, 0x03, 0x35, 0xd9, 0x9a, 0xd0, 0x3e, 0x8e, 0xaa, 0xfe, 0xf9, 0xdc, 0x4e, 0x68, - 0xaa, 0x55, 0xc1, 0x31, 0xa1, 0xfe, 0x3b, 0x15, 0x56, 0x63, 0xdb, 0xee, 0xb9, 0x41, 0x38, 0xeb, - 0x5b, 0xf0, 0x42, 0x21, 0xad, 0x5e, 0x30, 0xa4, 0xef, 0x42, 0x85, 0xf7, 0xbe, 0xf4, 0x46, 0x51, - 0x53, 0x5c, 0x9b, 0x32, 0xc5, 0xd8, 0xe8, 0x3a, 0x8f, 0x5c, 0x2c, 0xf1, 0xd0, 0x3b, 0x30, 0xcf, - 0xac, 0x2d, 0xe9, 0x8a, 0x67, 0xd3, 0xd5, 0x29, 0x32, 0x87, 0x03, 0xfd, 0xdf, 0x0a, 0xdc, 0x3a, - 0xd5, 0x6a, 0xb3, 0x89, 0xca, 0x57, 0x62, 0xb6, 0x97, 0x89, 0x61, 0xfd, 0x29, 0x40, 0x6c, 0x8f, - 0xd4, 0x48, 0x44, 0xc9, 0x8c, 0x44, 0x56, 0x25, 0x66, 0xcf, 0x18, 0xcb, 0x2e, 0x2c, 0xb1, 0x82, - 0x6e, 0x43, 0x99, 0x5d, 0x27, 0xe9, 0xac, 0x9c, 0x07, 0x14, 0xb3, 0xb9, 0xc0, 0xd2, 0xdb, 0x62, - 0x76, 0xca, 0x04, 0x9f, 0x3e, 0x3b, 0x5d, 0x11, 0x68, 0x09, 0xa9, 0xf1, 0x82, 0xfe, 0x07, 0x15, - 0xd0, 0xf4, 0x6d, 0xa6, 0x25, 0xe8, 0x14, 0xe7, 0xa4, 0x0c, 0xa9, 0x8a, 0xd9, 0xac, 0x3c, 0xb2, - 0x9a, 0x39, 0xb2, 0x7c, 0x11, 0x16, 0x5e, 0xe0, 0x45, 0xf8, 0x01, 0x68, 0xa6, 0xec, 0x35, 0x07, - 0xbc, 0xea, 0x8b, 0x77, 0xc8, 0x99, 0x0d, 0xe9, 0xa2, 0x99, 0x84, 0x27, 0xc1, 0x74, 0x52, 0x29, - 0xe5, 0x24, 0x95, 0xb7, 0xa0, 0x7e, 0x30, 0x72, 0xcd, 0x23, 0xd1, 0x12, 0xf3, 0x5c, 0x8d, 0xd2, - 0x51, 0xce, 0xd8, 0x03, 0x43, 0xe3, 0x7d, 0xb2, 0x7c, 0x00, 0x54, 0x12, 0x0f, 0x80, 0xc7, 0xb0, - 0x14, 0x87, 0x7c, 0x7b, 0xe4, 0x06, 0x64, 0x46, 0x09, 0x22, 0x51, 0xbf, 0xd5, 0x54, 0xfd, 0xd6, - 0x7d, 0xb8, 0x36, 0x25, 0x72, 0x36, 0xb7, 0x8b, 0x3e, 0xca, 0x27, 0xa6, 0x49, 0xdb, 0x64, 0x21, - 0x53, 0x80, 0xfa, 0x4f, 0x14, 0xd0, 0xe2, 0x19, 0x1a, 0x0f, 0xc0, 0x19, 0x8c, 0x20, 0xaf, 0x43, - 0x55, 0x84, 0x29, 0xaf, 0x33, 0x05, 0x1c, 0xc1, 0x67, 0x4d, 0x17, 0xf5, 0xef, 0x40, 0x89, 0xe1, - 0x9d, 0xf3, 0x93, 0xc1, 0x69, 0x61, 0xb9, 0x02, 0xb5, 0xbe, 0x37, 0xb2, 0x59, 0x16, 0x10, 0xdd, - 0x51, 0xbc, 0xa0, 0x3b, 0xb0, 0x20, 0x31, 0xb9, 0xad, 0xce, 0x90, 0xb2, 0x06, 0xf5, 0xdd, 0x91, - 0x95, 0x11, 0x94, 0x5c, 0xa2, 0x18, 0x3d, 0x72, 0x9c, 0x39, 0x49, 0x72, 0x49, 0xff, 0x75, 0x01, - 0x4a, 0x3c, 0xc0, 0x56, 0xa0, 0xd6, 0x0d, 0x36, 0x68, 0xc0, 0x89, 0x66, 0xbe, 0x8a, 0xe3, 0x05, - 0xaa, 0x05, 0xfb, 0x8c, 0x47, 0x26, 0x02, 0x44, 0xef, 0x43, 0x9d, 0x7f, 0xca, 0xf4, 0x31, 0x3d, - 0x3f, 0xc8, 0x3a, 0x0f, 0x27, 0x29, 0xd0, 0x16, 0x5c, 0xea, 0x11, 0x62, 0x75, 0x7c, 0xd7, 0xf3, - 0x24, 0x86, 0x68, 0xa8, 0xce, 0x61, 0x33, 0x4d, 0x87, 0xde, 0x85, 0x45, 0xba, 0xd8, 0xb2, 0xac, - 0x88, 0x15, 0x7f, 0x02, 0xa2, 0xe9, 0xfb, 0x8f, 0xb3, 0xa8, 0xa8, 0x0d, 0x0b, 0x1f, 0x79, 0x96, - 0x11, 0x12, 0x61, 0x42, 0x59, 0xc5, 0x6f, 0xe4, 0x95, 0x20, 0xe1, 0x20, 0x9c, 0x21, 0xc9, 0x0e, - 0xd6, 0x2b, 0x53, 0x83, 0x75, 0xf4, 0x45, 0xf6, 0xe6, 0x1d, 0x92, 0xe5, 0x2a, 0x8b, 0xd9, 0x74, - 0x81, 0xdb, 0x10, 0x77, 0x7e, 0xc8, 0xdf, 0xbb, 0x43, 0xa2, 0xff, 0x00, 0xae, 0x44, 0xf9, 0x4a, - 0xee, 0xd2, 0x64, 0xf3, 0x12, 0x79, 0x72, 0x5d, 0xbe, 0xb2, 0xd5, 0x53, 0x93, 0x8d, 0x78, 0x5c, - 0xe7, 0x0d, 0x60, 0xff, 0xa5, 0xc0, 0x62, 0xe6, 0x97, 0x9c, 0x97, 0x11, 0x9e, 0x97, 0x5c, 0xd5, - 0x59, 0x24, 0xd7, 0xbc, 0x87, 0xd2, 0x5d, 0xb8, 0xca, 0xcb, 0x72, 0x60, 0x3f, 0x23, 0x03, 0x8f, - 0xf8, 0x83, 0x80, 0x98, 0xae, 0xc3, 0x5b, 0x74, 0x15, 0x23, 0xb6, 0xd9, 0xb7, 0x9f, 0x91, 0x3d, - 0xe2, 0xf7, 0xd9, 0x4e, 0xde, 0x64, 0x4e, 0xff, 0xad, 0x02, 0x28, 0x61, 0xeb, 0x19, 0xe5, 0xd5, - 0x0f, 0xa1, 0x71, 0x10, 0x33, 0x8d, 0xe6, 0xe4, 0xaf, 0xe7, 0xd7, 0xa6, 0xa4, 0xfc, 0x34, 0x5d, - 0xae, 0x97, 0x2c, 0x98, 0x4f, 0x76, 0x08, 0x14, 0x27, 0xb4, 0xc7, 0x3c, 0x31, 0xd6, 0x30, 0xfb, - 0xa6, 0x6b, 0xf4, 0xa1, 0x2d, 0x4a, 0x31, 0xfb, 0xa6, 0x6b, 0xa6, 0xe4, 0x55, 0xc3, 0xec, 0x9b, - 0x5e, 0xf7, 0x31, 0x1f, 0xe6, 0x32, 0xbb, 0xd5, 0xb0, 0x04, 0xf5, 0xb7, 0x61, 0x3e, 0x3b, 0x98, - 0x3a, 0xb4, 0x87, 0x87, 0xe2, 0xe7, 0x25, 0xf6, 0x8d, 0x34, 0x28, 0x8c, 0xdc, 0x63, 0x91, 0x28, - 0xe8, 0x27, 0xd5, 0x2d, 0x69, 0x96, 0x17, 0xa3, 0x62, 0xda, 0xd2, 0xc6, 0x41, 0x68, 0x46, 0xbf, - 0x69, 0x6a, 0x95, 0xef, 0x10, 0xa1, 0x5a, 0x04, 0xeb, 0xdf, 0x85, 0x5b, 0xdb, 0xee, 0x30, 0x31, - 0x03, 0x88, 0x27, 0xd4, 0xb3, 0x71, 0xa0, 0xfe, 0x43, 0x05, 0xd6, 0x4e, 0x17, 0x31, 0x9b, 0x42, - 0x78, 0xde, 0xf8, 0x7b, 0x44, 0x6d, 0x49, 0xcc, 0xa3, 0x60, 0x32, 0xde, 0x21, 0xa1, 0x81, 0xbe, - 0x24, 0xef, 0x76, 0x5e, 0x05, 0x94, 0x98, 0xa9, 0x3b, 0xde, 0x04, 0xcd, 0x4c, 0xae, 0xf7, 0xc9, - 0x63, 0x21, 0x67, 0x6a, 0x5d, 0xff, 0x99, 0x02, 0x57, 0x13, 0xbf, 0xdc, 0x90, 0x50, 0x72, 0x44, - 0x57, 0xa0, 0x64, 0xba, 0x13, 0x27, 0x14, 0x4e, 0xe4, 0x00, 0x8d, 0x9c, 0xa7, 0xae, 0xff, 0x80, - 0x3a, 0x57, 0x14, 0x0a, 0x01, 0xd2, 0xb7, 0xf7, 0x53, 0xd7, 0xdf, 0x76, 0x8f, 0xc5, 0xbd, 0x15, - 0x10, 0x2f, 0xfc, 0x63, 0x46, 0x51, 0x14, 0x4f, 0x6f, 0x0e, 0x52, 0x8a, 0x60, 0x32, 0xa6, 0x14, - 0xbc, 0x8d, 0x12, 0x90, 0xfe, 0x2b, 0x05, 0xd6, 0x72, 0x75, 0x6a, 0x99, 0x47, 0xb3, 0xf2, 0xc2, - 0x15, 0x28, 0x25, 0xe7, 0x53, 0x1c, 0xc8, 0xbb, 0x77, 0xf2, 0x07, 0xe2, 0x62, 0xf4, 0x03, 0xb1, - 0xfe, 0x0f, 0x05, 0xf4, 0x5c, 0xfd, 0x78, 0xa5, 0x98, 0x51, 0x32, 0xb9, 0x80, 0x86, 0xe8, 0x3d, - 0xa8, 0x4a, 0x4f, 0x33, 0xdb, 0x66, 0x7f, 0xb2, 0xcc, 0xd5, 0x1e, 0x47, 0x34, 0xcd, 0x9b, 0x50, - 0x16, 0xd3, 0xfb, 0x1a, 0x94, 0x1e, 0xfa, 0x76, 0x48, 0xb4, 0x39, 0x54, 0x85, 0xe2, 0x9e, 0x11, - 0x04, 0x9a, 0xd2, 0x5c, 0xe7, 0x5d, 0x4c, 0x62, 0xc8, 0x0f, 0x50, 0x6e, 0xfb, 0xc4, 0x60, 0x78, - 0x00, 0x65, 0x3e, 0x7d, 0xd2, 0x94, 0xe6, 0x0e, 0xcc, 0x27, 0x67, 0xfb, 0x94, 0xdd, 0xee, 0xa0, - 0x65, 0x59, 0xda, 0x1c, 0x9a, 0x87, 0xea, 0xee, 0x40, 0x22, 0x52, 0xa2, 0xdd, 0xc1, 0x0e, 0xfd, - 0x56, 0x51, 0x1d, 0x2a, 0xbb, 0x03, 0xd6, 0x33, 0x69, 0x05, 0x0e, 0xb0, 0x1f, 0x00, 0xb4, 0x62, - 0xf3, 0x3e, 0xcc, 0x27, 0x67, 0x8e, 0x94, 0x5d, 0x6b, 0xbb, 0xfb, 0xcd, 0x4d, 0xce, 0xae, 0x83, - 0x5b, 0xdd, 0x5e, 0xb7, 0xf7, 0xa1, 0xa6, 0x50, 0xa8, 0xbf, 0xbf, 0xbb, 0xb7, 0x47, 0x21, 0xb5, - 0xf9, 0x0e, 0x40, 0x5c, 0x76, 0xe9, 0x39, 0x7a, 0xbb, 0x3d, 0x4a, 0x53, 0x87, 0xca, 0xc3, 0x56, - 0x77, 0x9f, 0x93, 0x50, 0x00, 0x73, 0x40, 0xa5, 0x38, 0x1d, 0x8a, 0x53, 0x68, 0xbe, 0x99, 0x69, - 0x44, 0x51, 0x05, 0x0a, 0xad, 0xd1, 0x48, 0x9b, 0x43, 0x65, 0x50, 0x3b, 0x1b, 0x5c, 0xf5, 0x9e, - 0xeb, 0x8f, 0x8d, 0x91, 0xa6, 0x36, 0x9f, 0xc1, 0x42, 0xba, 0xcc, 0x31, 0xb6, 0xae, 0x7f, 0x64, - 0x3b, 0x43, 0x2e, 0xb0, 0x1f, 0xb2, 0x7e, 0x86, 0x0b, 0xe4, 0xc7, 0xb7, 0x34, 0x15, 0x69, 0x30, - 0xdf, 0x75, 0xec, 0xd0, 0x36, 0x46, 0xf6, 0x33, 0x8a, 0x5b, 0x40, 0x0d, 0xa8, 0xed, 0xf9, 0xc4, - 0x33, 0x7c, 0x0a, 0x16, 0xd1, 0x02, 0x00, 0xb3, 0x02, 0x26, 0x86, 0x75, 0xa2, 0x95, 0x28, 0xc1, - 0x43, 0xc3, 0x0e, 0x6d, 0x67, 0xc8, 0x8d, 0x53, 0x6e, 0x7e, 0x0d, 0x1a, 0xa9, 0x74, 0x80, 0x2e, - 0x41, 0xe3, 0xa3, 0x5e, 0xb7, 0xd7, 0xdd, 0xef, 0xb6, 0xb6, 0xbb, 0x1f, 0x6f, 0x76, 0xb8, 0x95, - 0x76, 0xba, 0xfd, 0x9d, 0xd6, 0x7e, 0xfb, 0x81, 0xa6, 0x50, 0xf3, 0xf1, 0x4f, 0x75, 0xe3, 0xbd, - 0x3f, 0x7f, 0xb6, 0xaa, 0x7c, 0xfa, 0xd9, 0xaa, 0xf2, 0xcf, 0xcf, 0x56, 0x95, 0x9f, 0x3e, 0x5f, - 0x9d, 0xfb, 0xf4, 0xf9, 0xea, 0xdc, 0xdf, 0x9e, 0xaf, 0xce, 0x7d, 0xfc, 0xff, 0x43, 0x3b, 0x3c, - 0x9c, 0x1c, 0xdc, 0x36, 0xdd, 0xf1, 0x1d, 0xcf, 0x76, 0x86, 0xa6, 0xe1, 0xdd, 0x09, 0x6d, 0xd3, - 0x32, 0xef, 0x24, 0x22, 0xea, 0xa0, 0xcc, 0xfe, 0x80, 0xf3, 0xd6, 0xff, 0x02, 0x00, 0x00, 0xff, - 0xff, 0x92, 0xfa, 0xeb, 0x02, 0x9f, 0x23, 0x00, 0x00, + // 2591 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x1a, 0x4d, 0x6f, 0x1c, 0x49, + 0xd5, 0xdd, 0xf3, 0xfd, 0xc6, 0x63, 0x77, 0x2a, 0x89, 0xe3, 0x4d, 0x1c, 0xc7, 0xdb, 0x0b, 0xc8, + 0xcc, 0x2e, 0x09, 0xc9, 0x6e, 0x04, 0x2c, 0xcb, 0x2e, 0xe3, 0x19, 0xef, 0x66, 0x64, 0x7b, 0x6c, + 0xd5, 0x78, 0x09, 0x2c, 0x87, 0xa1, 0xdd, 0x5d, 0x19, 0x37, 0x9e, 0xe9, 0xee, 0x74, 0xf7, 0xc4, + 0x71, 0xa4, 0x05, 0x21, 0xc4, 0x8d, 0x03, 0x48, 0x70, 0xe0, 0xc0, 0x01, 0x71, 0xe0, 0x8a, 0x84, + 0x38, 0x22, 0x71, 0x03, 0x89, 0xcb, 0x9e, 0x10, 0x37, 0xd0, 0xee, 0x1f, 0x40, 0x70, 0xe0, 0x8a, + 0xea, 0xab, 0xbf, 0xa6, 0x6d, 0x27, 0xf2, 0x28, 0xb7, 0x7e, 0x55, 0xef, 0xab, 0xde, 0x57, 0xbd, + 0x57, 0x33, 0x70, 0xe3, 0x90, 0x18, 0x7e, 0x78, 0x40, 0x8c, 0xd0, 0x3b, 0xb8, 0x13, 0x7d, 0xdf, + 0xf6, 0x7c, 0x37, 0x74, 0x51, 0x3d, 0xb1, 0xa9, 0x9f, 0x40, 0x6d, 0xdf, 0x38, 0x18, 0x91, 0xbe, + 0x67, 0x38, 0x68, 0x19, 0x2a, 0x0c, 0xe8, 0x76, 0x96, 0x95, 0x35, 0x65, 0xbd, 0x80, 0x25, 0x88, + 0xae, 0x43, 0xb5, 0x1f, 0x1a, 0x7e, 0xb8, 0x45, 0x4e, 0x96, 0xd5, 0x35, 0x65, 0x7d, 0x1e, 0x47, + 0x30, 0x5a, 0x82, 0xf2, 0xa6, 0x63, 0xd1, 0x9d, 0x02, 0xdb, 0x11, 0x10, 0x5a, 0x05, 0xd8, 0x22, + 0x27, 0x81, 0x67, 0x98, 0x94, 0x61, 0x71, 0x4d, 0x59, 0x6f, 0xe0, 0xc4, 0x8a, 0xfe, 0x77, 0x15, + 0xb4, 0x07, 0x54, 0x95, 0x0d, 0x62, 0x84, 0x98, 0x3c, 0x9e, 0x90, 0x20, 0x44, 0xdf, 0x80, 0x79, + 0xf3, 0xd0, 0x70, 0x86, 0xe4, 0x11, 0x21, 0x96, 0xd0, 0xa3, 0x7e, 0xef, 0x95, 0xdb, 0x09, 0x9d, + 0x6f, 0xb7, 0x13, 0x08, 0x38, 0x85, 0x8e, 0xde, 0x82, 0xda, 0xb1, 0x11, 0x12, 0x7f, 0x6c, 0xf8, + 0x47, 0x4c, 0xd1, 0xfa, 0xbd, 0xa5, 0x14, 0xed, 0x43, 0xb9, 0x8b, 0x63, 0x44, 0xf4, 0x0e, 0x34, + 0x7c, 0x62, 0xb9, 0xd1, 0x1e, 0x3b, 0xc8, 0xe9, 0x94, 0x69, 0x64, 0xf4, 0x55, 0xa8, 0x06, 0xa1, + 0x11, 0x4e, 0x02, 0x12, 0x2c, 0x17, 0xd7, 0x0a, 0xeb, 0xf5, 0x7b, 0x2b, 0x29, 0xc2, 0xc8, 0xbe, + 0x7d, 0x86, 0x85, 0x23, 0x6c, 0xb4, 0x0e, 0x8b, 0xa6, 0x3b, 0xf6, 0xc8, 0x88, 0x84, 0x84, 0x6f, + 0x2e, 0x97, 0xd6, 0x94, 0xf5, 0x2a, 0xce, 0x2e, 0xa3, 0xd7, 0xa1, 0x40, 0x7c, 0x7f, 0xb9, 0x9c, + 0x63, 0x0d, 0x3c, 0x71, 0x1c, 0xdb, 0x19, 0x6e, 0xfa, 0xbe, 0xeb, 0x63, 0x8a, 0xa5, 0xff, 0x44, + 0x81, 0x5a, 0xac, 0x9e, 0x4e, 0x2d, 0x4a, 0xcc, 0x23, 0xcf, 0xb5, 0x9d, 0x70, 0x3f, 0x60, 0x16, + 0x2d, 0xe2, 0xd4, 0x1a, 0x75, 0x95, 0x4f, 0x02, 0x77, 0xf4, 0x84, 0x58, 0xfb, 0x01, 0xb3, 0x5b, + 0x11, 0x27, 0x56, 0x90, 0x06, 0x85, 0x80, 0x3c, 0x66, 0x66, 0x29, 0x62, 0xfa, 0x49, 0xb9, 0x8e, + 0x8c, 0x20, 0xec, 0x9f, 0x38, 0x26, 0xa3, 0x29, 0x72, 0xae, 0xc9, 0x35, 0xfd, 0x63, 0xd0, 0x3a, + 0x76, 0xe0, 0x19, 0xa1, 0x79, 0x48, 0xfc, 0x96, 0x19, 0xda, 0xae, 0x83, 0x5e, 0x87, 0xb2, 0xc1, + 0xbe, 0x98, 0x1e, 0x0b, 0xf7, 0x2e, 0xa7, 0xce, 0xc2, 0x91, 0xb0, 0x40, 0xa1, 0x51, 0xd7, 0x76, + 0xc7, 0x63, 0x3b, 0x8c, 0x94, 0x8a, 0x60, 0xb4, 0x06, 0xf5, 0x6e, 0x40, 0x45, 0xed, 0xd1, 0x33, + 0x30, 0xd5, 0xaa, 0x38, 0xb9, 0xa4, 0xb7, 0xa1, 0xd0, 0x6a, 0x6f, 0xa5, 0x98, 0x28, 0x67, 0x33, + 0x51, 0xa7, 0x99, 0xfc, 0x58, 0x85, 0xab, 0x5d, 0xe7, 0xd1, 0x68, 0x42, 0xe8, 0xa1, 0xe2, 0xe3, + 0x04, 0xe8, 0x9b, 0xd0, 0x88, 0x36, 0xf6, 0x4f, 0x3c, 0x22, 0x0e, 0x74, 0x3d, 0x75, 0xa0, 0x14, + 0x06, 0x4e, 0x13, 0xa0, 0xf7, 0xa0, 0x11, 0x33, 0xec, 0x76, 0xe8, 0x19, 0x0b, 0x53, 0xee, 0x4d, + 0x62, 0xe0, 0x34, 0x3e, 0xcb, 0x4a, 0xf3, 0x90, 0x8c, 0x8d, 0x6e, 0x87, 0x19, 0xa0, 0x80, 0x23, + 0x18, 0x6d, 0xc1, 0x65, 0xf2, 0xd4, 0x1c, 0x4d, 0x2c, 0x92, 0xa0, 0xb1, 0x98, 0x9f, 0xce, 0x14, + 0x91, 0x47, 0xa5, 0xff, 0x45, 0x49, 0xba, 0x52, 0xc4, 0xe4, 0xb7, 0xe1, 0xaa, 0x9d, 0x67, 0x19, + 0x91, 0xb3, 0x7a, 0xbe, 0x21, 0x92, 0x98, 0x38, 0x9f, 0x01, 0xba, 0x1f, 0x05, 0x09, 0x4f, 0xe1, + 0x9b, 0xa7, 0xa8, 0x9b, 0x09, 0x17, 0x1d, 0x0a, 0x86, 0x29, 0x93, 0x57, 0x4b, 0x07, 0x56, 0x7b, + 0x0b, 0xd3, 0x4d, 0xfd, 0x8f, 0x0a, 0x5c, 0x4a, 0x14, 0x9d, 0xc0, 0x73, 0x9d, 0x80, 0x5c, 0xb4, + 0xea, 0xec, 0x00, 0xb2, 0x32, 0xd6, 0x21, 0xd2, 0x9b, 0xa7, 0xe9, 0x2e, 0x8a, 0x41, 0x0e, 0x21, + 0x42, 0x50, 0x1c, 0xbb, 0x16, 0x11, 0x2e, 0x65, 0xdf, 0xfa, 0x53, 0xb8, 0xdc, 0x4e, 0x64, 0xec, + 0x0e, 0x09, 0x02, 0x63, 0x78, 0x61, 0xc5, 0xb3, 0xb5, 0x41, 0x9d, 0xae, 0x0d, 0xfa, 0x2f, 0x14, + 0x58, 0xc4, 0xc4, 0x72, 0x77, 0x48, 0x68, 0xcc, 0x48, 0xec, 0x79, 0xe5, 0x26, 0xab, 0x56, 0x21, + 0x47, 0xad, 0x1f, 0xc0, 0x4d, 0xaa, 0x15, 0x8e, 0xa8, 0xf6, 0x7c, 0x77, 0xe8, 0x93, 0x20, 0x78, + 0x39, 0x3a, 0xea, 0x1f, 0xc3, 0x4a, 0x5a, 0xfe, 0xfb, 0xae, 0x7f, 0x6c, 0xf8, 0xd6, 0x4b, 0x12, + 0xff, 0xdf, 0x54, 0x46, 0xb6, 0x5d, 0xe7, 0x91, 0x3d, 0x44, 0x4d, 0x28, 0x06, 0x9e, 0xe1, 0x08, + 0x59, 0x4b, 0xf9, 0xb7, 0x10, 0x66, 0x38, 0xf4, 0xae, 0x0f, 0xe8, 0x0d, 0x1e, 0x71, 0x97, 0x20, + 0xd5, 0xdc, 0x4a, 0x54, 0x04, 0x91, 0x4f, 0x67, 0x94, 0x8c, 0x14, 0x3a, 0x2d, 0x4a, 0x81, 0x2c, + 0x4a, 0x45, 0x5e, 0x94, 0x24, 0x1c, 0x45, 0x76, 0x29, 0x8e, 0x6c, 0xd4, 0x04, 0x2d, 0x38, 0xb2, + 0xbd, 0xce, 0xce, 0x76, 0x2b, 0xe8, 0x0b, 0x8d, 0xca, 0xac, 0x10, 0x4f, 0xad, 0xeb, 0xbf, 0x54, + 0xe1, 0x15, 0x5a, 0xe1, 0xac, 0xc9, 0x28, 0x51, 0xa0, 0x66, 0xd4, 0x3b, 0xdc, 0x87, 0xb2, 0xc9, + 0xec, 0x78, 0x4e, 0xd5, 0xe1, 0xc6, 0xc6, 0x02, 0x19, 0xb5, 0x61, 0x21, 0x10, 0x2a, 0xf1, 0x7a, + 0xc4, 0x0c, 0xb6, 0x70, 0xef, 0x46, 0x8a, 0xbc, 0x9f, 0x42, 0xc1, 0x19, 0x12, 0xaa, 0xba, 0xeb, + 0x11, 0xdf, 0x08, 0x5d, 0x9f, 0xdd, 0x25, 0x45, 0xc6, 0x22, 0xad, 0xfa, 0x6e, 0x02, 0x01, 0xa7, + 0xd0, 0xf5, 0xff, 0x29, 0xb0, 0xb4, 0x43, 0xfc, 0xe1, 0xec, 0x8d, 0xf2, 0x1e, 0x34, 0xac, 0x17, + 0xbc, 0xa3, 0x52, 0xf8, 0xa8, 0x0b, 0x68, 0x4c, 0x35, 0xb3, 0x3a, 0x2f, 0x14, 0x53, 0x39, 0x44, + 0x51, 0xf4, 0x14, 0x13, 0x75, 0x71, 0x0f, 0x2e, 0xef, 0x18, 0xb6, 0x13, 0x1a, 0xb6, 0x43, 0xfc, + 0x07, 0x92, 0x1b, 0xfa, 0x5a, 0xa2, 0x27, 0x53, 0x72, 0xea, 0x70, 0x4c, 0x93, 0x6d, 0xca, 0xf4, + 0x3f, 0xab, 0xa0, 0x65, 0xb7, 0x2f, 0x6a, 0xc5, 0x9b, 0x00, 0xf4, 0x6b, 0x40, 0x85, 0x10, 0x16, + 0x5e, 0x35, 0x5c, 0xa3, 0x2b, 0x94, 0x3d, 0x41, 0x77, 0xa1, 0xc4, 0x77, 0xf2, 0x22, 0xa7, 0xed, + 0x8e, 0x3d, 0xd7, 0x21, 0x4e, 0xc8, 0x70, 0x31, 0xc7, 0x44, 0xaf, 0x41, 0x23, 0x2e, 0x87, 0x83, + 0x30, 0x6a, 0xc0, 0x52, 0x6d, 0x9d, 0xe8, 0x1a, 0x4b, 0x39, 0x2e, 0x9b, 0xea, 0x1a, 0xd1, 0xe7, + 0x61, 0xe1, 0xc0, 0x75, 0xc3, 0x20, 0xf4, 0x0d, 0x6f, 0x60, 0xb9, 0x0e, 0x11, 0x59, 0xd8, 0x88, + 0x56, 0x3b, 0xae, 0x43, 0xa6, 0x1a, 0xbf, 0x4a, 0x4e, 0xe3, 0x47, 0xa0, 0xd1, 0x73, 0x2d, 0x12, + 0xbb, 0xe3, 0x3e, 0x54, 0x47, 0xf6, 0x13, 0xe2, 0x90, 0x20, 0x10, 0x6d, 0x52, 0x5a, 0x1b, 0x8a, + 0xbd, 0x2d, 0x10, 0x70, 0x84, 0x4a, 0xcd, 0xe6, 0xb8, 0x16, 0x19, 0x10, 0xcf, 0x35, 0x0f, 0x45, + 0x99, 0xaa, 0xd1, 0x95, 0x4d, 0xba, 0xa0, 0x7f, 0x1f, 0x96, 0xfa, 0x24, 0x4c, 0xd1, 0x8a, 0xa0, + 0xbf, 0x0b, 0xe5, 0xd0, 0xf0, 0x87, 0x24, 0x3c, 0x5f, 0x9a, 0x40, 0x3c, 0x4f, 0xd6, 0x18, 0xae, + 0x4d, 0xc9, 0x12, 0xcd, 0xc3, 0x9b, 0x50, 0x31, 0x3c, 0x6f, 0x64, 0x13, 0xeb, 0x7c, 0x69, 0x12, + 0xf3, 0x3c, 0x71, 0x5f, 0x81, 0x1b, 0x6d, 0xd7, 0xf5, 0x2d, 0xdb, 0xa1, 0x39, 0xbe, 0x21, 0x3d, + 0x20, 0xcf, 0xb7, 0x0c, 0x95, 0x27, 0xc4, 0x0f, 0x64, 0x1b, 0x5d, 0xc0, 0x12, 0xd4, 0xbf, 0x03, + 0x2b, 0xf9, 0x84, 0x42, 0xd9, 0x0b, 0x24, 0xc6, 0xdf, 0x14, 0xb8, 0xd2, 0xb2, 0xac, 0x18, 0x43, + 0x6a, 0xf3, 0x45, 0x50, 0x6d, 0xeb, 0xfc, 0x94, 0x50, 0x6d, 0x8b, 0xce, 0x8a, 0x89, 0x1a, 0x3b, + 0x1f, 0x15, 0xd1, 0xa9, 0x70, 0xce, 0xb9, 0xf2, 0x51, 0x13, 0x2e, 0xd9, 0xc1, 0xc0, 0x21, 0xc7, + 0x83, 0x38, 0xb9, 0x58, 0xdc, 0x57, 0xf1, 0xa2, 0x1d, 0xf4, 0xc8, 0x71, 0x2c, 0x0e, 0xdd, 0x82, + 0xfa, 0x91, 0x18, 0x35, 0x07, 0xb6, 0xc5, 0x2e, 0x9c, 0x06, 0x06, 0xb9, 0xd4, 0xb5, 0xf4, 0x5f, + 0x29, 0x70, 0x0d, 0x93, 0xb1, 0xfb, 0x84, 0x5c, 0xe8, 0x40, 0xcb, 0x50, 0x31, 0x8d, 0xc0, 0x34, + 0x2c, 0x22, 0xa6, 0x07, 0x09, 0xd2, 0x1d, 0x9f, 0xf1, 0xb7, 0xc4, 0x70, 0x22, 0xc1, 0xac, 0x6e, + 0xc5, 0x29, 0xdd, 0x7e, 0x5b, 0x80, 0xeb, 0xb1, 0x56, 0x53, 0xde, 0xbf, 0x60, 0x31, 0x3a, 0xcd, + 0x07, 0xaf, 0xb0, 0xd0, 0xf0, 0x13, 0xe6, 0x8f, 0x5a, 0x02, 0x13, 0x5e, 0x0d, 0x69, 0xff, 0x30, + 0x08, 0x7d, 0x7b, 0x38, 0x24, 0xfe, 0x80, 0x3c, 0x21, 0x4e, 0x38, 0x88, 0x2b, 0xbd, 0x3c, 0xc7, + 0x99, 0x35, 0xfd, 0x26, 0xe3, 0xb1, 0xcf, 0x59, 0x6c, 0x52, 0x0e, 0xc9, 0x21, 0x23, 0xdf, 0xbd, + 0xa5, 0x7c, 0xf7, 0x1a, 0xb0, 0x96, 0x56, 0x88, 0x8e, 0xe4, 0x19, 0x7d, 0xca, 0xe7, 0xe9, 0xb3, + 0x92, 0xd4, 0x87, 0x36, 0x73, 0x29, 0x75, 0x32, 0x5e, 0xaa, 0x4c, 0x79, 0xe9, 0x77, 0x05, 0xb8, + 0x91, 0xeb, 0xa5, 0xd9, 0x0c, 0x15, 0xf7, 0xa1, 0x44, 0x1b, 0x35, 0x79, 0xe3, 0xde, 0x4a, 0xd1, + 0x45, 0xd2, 0xe2, 0xb6, 0x8e, 0x63, 0xcb, 0x9a, 0x5f, 0x78, 0x9e, 0x97, 0x82, 0xe7, 0xbb, 0x45, + 0xde, 0x00, 0xc4, 0xac, 0x9b, 0xc6, 0x2c, 0x31, 0x4c, 0x8d, 0xee, 0x24, 0x07, 0x13, 0xd4, 0x81, + 0x9a, 0x6c, 0x4d, 0x68, 0x1f, 0x47, 0x55, 0xff, 0x42, 0x6e, 0x27, 0x34, 0xd5, 0xaa, 0xe0, 0x98, + 0x10, 0x6d, 0xc3, 0x22, 0x6b, 0x00, 0x06, 0x31, 0xaf, 0x0a, 0xe3, 0xf5, 0x5a, 0xba, 0x5a, 0xe5, + 0xf6, 0x3c, 0x78, 0x81, 0xd1, 0xca, 0x7e, 0x29, 0xd0, 0xff, 0xa0, 0xc2, 0x6a, 0xec, 0xa9, 0x3d, + 0x37, 0x08, 0x67, 0x9d, 0x53, 0xcf, 0x95, 0x20, 0xea, 0x05, 0x13, 0xe4, 0x2e, 0x54, 0x78, 0x27, + 0x4d, 0xf3, 0x93, 0x1a, 0xe3, 0xda, 0x94, 0x61, 0xc7, 0x46, 0xd7, 0x79, 0xe4, 0x62, 0x89, 0x87, + 0xde, 0x86, 0x79, 0xe6, 0x3b, 0x49, 0x57, 0x3c, 0x9b, 0xae, 0x4e, 0x91, 0x39, 0x1c, 0xe8, 0xff, + 0x51, 0xe0, 0xd6, 0xa9, 0x56, 0x9b, 0x4d, 0x8c, 0xbf, 0x14, 0xb3, 0xbd, 0x48, 0x46, 0xe8, 0x4f, + 0x01, 0x62, 0x7b, 0xa4, 0x1e, 0x58, 0x94, 0xcc, 0x03, 0xcb, 0xaa, 0xc4, 0xec, 0x19, 0x63, 0xd9, + 0xd3, 0x25, 0x56, 0xd0, 0x6d, 0x28, 0xb3, 0xe4, 0x94, 0xce, 0xca, 0x19, 0xc7, 0x98, 0xcd, 0x05, + 0x96, 0xde, 0x16, 0x2f, 0xb1, 0x4c, 0xf0, 0xe9, 0x2f, 0xb1, 0x2b, 0x02, 0x2d, 0x21, 0x35, 0x5e, + 0xd0, 0xff, 0xa4, 0x02, 0x9a, 0xae, 0x0d, 0xf4, 0x42, 0x3b, 0xc5, 0x39, 0x29, 0x43, 0xaa, 0xe2, + 0xa5, 0x57, 0x1e, 0x59, 0xcd, 0x1c, 0x59, 0xce, 0x97, 0x85, 0xe7, 0x98, 0x2f, 0xdf, 0x07, 0xcd, + 0x94, 0x9d, 0xeb, 0x80, 0xf7, 0x10, 0x62, 0xaa, 0x39, 0xb3, 0xbd, 0x5d, 0x34, 0x93, 0xf0, 0x24, + 0x98, 0x2e, 0x51, 0xa5, 0x9c, 0x12, 0xf5, 0x26, 0xd4, 0x0f, 0x46, 0xae, 0x79, 0x24, 0x1a, 0x6c, + 0x5e, 0xf9, 0x51, 0x3a, 0xca, 0x19, 0x7b, 0x60, 0x68, 0xbc, 0xeb, 0x96, 0xe3, 0x44, 0x25, 0x31, + 0x4e, 0x3c, 0x86, 0xa5, 0x38, 0xe4, 0xdb, 0x23, 0x37, 0x20, 0x33, 0x2a, 0x10, 0x89, 0x6e, 0x40, + 0x4d, 0x75, 0x03, 0xba, 0x0f, 0xd7, 0xa6, 0x44, 0xce, 0x26, 0xbb, 0xe8, 0x88, 0x3f, 0x31, 0x4d, + 0xda, 0x74, 0x0b, 0x99, 0x02, 0xd4, 0x7f, 0xaa, 0x80, 0x16, 0xbf, 0xc8, 0xf1, 0x00, 0x9c, 0xc1, + 0x83, 0xe6, 0x75, 0xa8, 0x8a, 0x30, 0xe5, 0xb7, 0x56, 0x01, 0x47, 0xf0, 0x59, 0x6f, 0x95, 0xfa, + 0x77, 0xa1, 0xc4, 0xf0, 0xce, 0xf9, 0x01, 0xe2, 0xb4, 0xb0, 0x5c, 0x81, 0x5a, 0xdf, 0x1b, 0xd9, + 0xac, 0x0a, 0x88, 0x5e, 0x2b, 0x5e, 0xd0, 0x1d, 0x58, 0x90, 0x98, 0xdc, 0x56, 0x67, 0x48, 0x59, + 0x83, 0xfa, 0xee, 0xc8, 0xca, 0x08, 0x4a, 0x2e, 0x51, 0x8c, 0x1e, 0x39, 0xce, 0x9c, 0x24, 0xb9, + 0xa4, 0xff, 0xa6, 0x00, 0x25, 0x1e, 0x60, 0x2b, 0x50, 0xeb, 0x06, 0x1b, 0x34, 0xe0, 0xc4, 0x68, + 0x50, 0xc5, 0xf1, 0x02, 0xd5, 0x82, 0x7d, 0xc6, 0x0f, 0x30, 0x02, 0x44, 0xef, 0x41, 0x9d, 0x7f, + 0xca, 0xf2, 0x31, 0xfd, 0x1a, 0x91, 0x75, 0x1e, 0x4e, 0x52, 0xa0, 0x2d, 0xb8, 0xd4, 0x23, 0xc4, + 0xea, 0xf8, 0xae, 0xe7, 0x49, 0x0c, 0xd1, 0x9e, 0x9d, 0xc3, 0x66, 0x9a, 0x0e, 0xbd, 0x03, 0x8b, + 0x74, 0xb1, 0x65, 0x59, 0x11, 0x2b, 0x3e, 0x50, 0xa2, 0xe9, 0xfc, 0xc7, 0x59, 0x54, 0xd4, 0x86, + 0x85, 0x0f, 0x3d, 0xcb, 0x08, 0x89, 0x30, 0xa1, 0xec, 0x09, 0x6e, 0xe4, 0x5d, 0x41, 0xc2, 0x41, + 0x38, 0x43, 0x92, 0x7d, 0xa6, 0xaf, 0x4c, 0x3d, 0xd3, 0xa3, 0x2f, 0xb1, 0x09, 0x7a, 0x48, 0x96, + 0xab, 0x2c, 0x66, 0xd3, 0x17, 0xdc, 0x86, 0xc8, 0xf9, 0x21, 0x9f, 0x9e, 0x87, 0x44, 0xff, 0x21, + 0x5c, 0x89, 0xea, 0x95, 0xdc, 0xa5, 0xc5, 0xe6, 0x05, 0xea, 0xe4, 0xba, 0x9c, 0xd9, 0xd5, 0x53, + 0x8b, 0x8d, 0x18, 0xd5, 0xf3, 0x9e, 0x73, 0xff, 0xad, 0xc0, 0x62, 0xe6, 0x77, 0xa1, 0x17, 0x11, + 0x9e, 0x57, 0x5c, 0xd5, 0x59, 0x14, 0xd7, 0xbc, 0xb1, 0xeb, 0x2e, 0x5c, 0xe5, 0xd7, 0x72, 0x60, + 0x3f, 0x23, 0x03, 0x8f, 0xf8, 0x83, 0x80, 0x98, 0xae, 0xc3, 0x1b, 0x7e, 0x15, 0x23, 0xb6, 0xd9, + 0xb7, 0x9f, 0x91, 0x3d, 0xe2, 0xf7, 0xd9, 0x4e, 0xde, 0x3b, 0x9f, 0xfe, 0x7b, 0x05, 0x50, 0xc2, + 0xd6, 0x33, 0xaa, 0xab, 0x1f, 0x40, 0xe3, 0x20, 0x66, 0x1a, 0xbd, 0xba, 0xbf, 0x9a, 0x7f, 0x37, + 0x25, 0xe5, 0xa7, 0xe9, 0x72, 0xbd, 0x64, 0xc1, 0x7c, 0xb2, 0x43, 0xa0, 0x38, 0xa1, 0x3d, 0xe6, + 0x85, 0xb1, 0x86, 0xd9, 0x37, 0x5d, 0xa3, 0x63, 0xbb, 0xb8, 0x8a, 0xd9, 0x37, 0x5d, 0x33, 0x25, + 0xaf, 0x1a, 0x66, 0xdf, 0x34, 0xdd, 0xc7, 0xfc, 0x69, 0x98, 0xd9, 0xad, 0x86, 0x25, 0xa8, 0xbf, + 0x05, 0xf3, 0xd9, 0x67, 0xae, 0x43, 0x7b, 0x78, 0x28, 0x7e, 0xac, 0x62, 0xdf, 0x48, 0x83, 0xc2, + 0xc8, 0x3d, 0x16, 0x85, 0x82, 0x7e, 0x52, 0xdd, 0x92, 0x66, 0x79, 0x3e, 0x2a, 0xa6, 0x2d, 0x6d, + 0x1c, 0x84, 0x66, 0xf4, 0x9b, 0x96, 0x56, 0x39, 0xd5, 0x08, 0xd5, 0x22, 0x58, 0xff, 0x1e, 0xdc, + 0xda, 0x76, 0x87, 0x89, 0x17, 0x85, 0xf8, 0xbd, 0x7b, 0x36, 0x0e, 0xd4, 0x7f, 0xa4, 0xc0, 0xda, + 0xe9, 0x22, 0x66, 0x73, 0x11, 0x9e, 0xf7, 0x98, 0x3e, 0xa2, 0xb6, 0x24, 0xe6, 0x51, 0x30, 0x19, + 0xef, 0x90, 0xd0, 0x40, 0x5f, 0x96, 0xb9, 0x9d, 0x77, 0x03, 0x4a, 0xcc, 0x54, 0x8e, 0x37, 0x41, + 0x33, 0x93, 0xeb, 0x7d, 0xf2, 0x58, 0xc8, 0x99, 0x5a, 0xd7, 0x7f, 0xae, 0xc0, 0xd5, 0xc4, 0xef, + 0x40, 0x24, 0x94, 0x1c, 0xd1, 0x15, 0x28, 0x99, 0xee, 0xc4, 0x09, 0x85, 0x13, 0x39, 0x40, 0x23, + 0xe7, 0xa9, 0xeb, 0x3f, 0xa0, 0xce, 0x15, 0x17, 0x85, 0x00, 0xe9, 0x24, 0xff, 0xd4, 0xf5, 0xb7, + 0xdd, 0x63, 0x91, 0xb7, 0x02, 0xe2, 0x17, 0xff, 0x98, 0x51, 0x14, 0xc5, 0x20, 0xcf, 0x41, 0x4a, + 0x11, 0x4c, 0xc6, 0x94, 0x82, 0xb7, 0x51, 0x02, 0xd2, 0x7f, 0xad, 0xc0, 0x5a, 0xae, 0x4e, 0x2d, + 0xf3, 0x68, 0x56, 0x5e, 0xb8, 0x02, 0xa5, 0xe4, 0x6b, 0x17, 0x07, 0xf2, 0xf2, 0x4e, 0xfe, 0xdc, + 0x5c, 0x8c, 0x7e, 0x6e, 0xd6, 0xff, 0xa9, 0x80, 0x9e, 0xab, 0x1f, 0xbf, 0x29, 0x66, 0x54, 0x4c, + 0x2e, 0xa0, 0x21, 0x7a, 0x17, 0xaa, 0xd2, 0xd3, 0xcc, 0xb6, 0xd9, 0x1f, 0x40, 0x73, 0xb5, 0xc7, + 0x11, 0x4d, 0xf3, 0x26, 0x94, 0xc5, 0x6f, 0x01, 0x35, 0x28, 0x3d, 0xf4, 0xed, 0x90, 0x68, 0x73, + 0xa8, 0x0a, 0xc5, 0x3d, 0x23, 0x08, 0x34, 0xa5, 0xb9, 0xce, 0xbb, 0x98, 0xc4, 0x4f, 0x06, 0x00, + 0xe5, 0xb6, 0x4f, 0x0c, 0x86, 0x07, 0x50, 0xe6, 0x6f, 0x59, 0x9a, 0xd2, 0xdc, 0x81, 0xf9, 0xe4, + 0x2f, 0x05, 0x94, 0xdd, 0xee, 0xa0, 0x65, 0x59, 0xda, 0x1c, 0x9a, 0x87, 0xea, 0xee, 0x40, 0x22, + 0x52, 0xa2, 0xdd, 0xc1, 0x0e, 0xfd, 0x56, 0x51, 0x1d, 0x2a, 0xbb, 0x03, 0xd6, 0x33, 0x69, 0x05, + 0x0e, 0xb0, 0xd1, 0x5a, 0x2b, 0x36, 0xef, 0xc3, 0x7c, 0xf2, 0x05, 0x93, 0xb2, 0x6b, 0x6d, 0x77, + 0xbf, 0xb5, 0xc9, 0xd9, 0x75, 0x70, 0xab, 0xdb, 0xeb, 0xf6, 0x3e, 0xd0, 0x14, 0x0a, 0xf5, 0xf7, + 0x77, 0xf7, 0xf6, 0x28, 0xa4, 0x36, 0xdf, 0x06, 0x88, 0xaf, 0x5d, 0x7a, 0x8e, 0xde, 0x6e, 0x8f, + 0xd2, 0xd4, 0xa1, 0xf2, 0xb0, 0xd5, 0xdd, 0xe7, 0x24, 0x14, 0xc0, 0x1c, 0x50, 0x29, 0x4e, 0x87, + 0xe2, 0x14, 0x9a, 0x6f, 0x64, 0x1a, 0x51, 0x54, 0x81, 0x42, 0x6b, 0x34, 0xd2, 0xe6, 0x50, 0x19, + 0xd4, 0xce, 0x06, 0x57, 0xbd, 0xe7, 0xfa, 0x63, 0x63, 0xa4, 0xa9, 0xcd, 0x67, 0xb0, 0x90, 0xbe, + 0xe6, 0x18, 0x5b, 0xd7, 0x3f, 0xb2, 0x9d, 0x21, 0x17, 0xd8, 0x0f, 0x59, 0x3f, 0xc3, 0x05, 0xf2, + 0xe3, 0x5b, 0x9a, 0x8a, 0x34, 0x98, 0xef, 0x3a, 0x76, 0x68, 0x1b, 0x23, 0xfb, 0x19, 0xc5, 0x2d, + 0xa0, 0x06, 0xd4, 0xf6, 0x7c, 0xe2, 0x19, 0x3e, 0x05, 0x8b, 0x68, 0x01, 0x80, 0x59, 0x01, 0x13, + 0xc3, 0x3a, 0xd1, 0x4a, 0x94, 0xe0, 0xa1, 0x61, 0x87, 0xb6, 0x33, 0xe4, 0xc6, 0x29, 0x37, 0xbf, + 0x0e, 0x8d, 0x54, 0x39, 0x40, 0x97, 0xa0, 0xf1, 0x61, 0xaf, 0xdb, 0xeb, 0xee, 0x77, 0x5b, 0xdb, + 0xdd, 0x8f, 0x36, 0x3b, 0xdc, 0x4a, 0x3b, 0xdd, 0xfe, 0x4e, 0x6b, 0xbf, 0xfd, 0x40, 0x53, 0xa8, + 0xf9, 0xf8, 0xa7, 0xba, 0xf1, 0xee, 0x5f, 0x3f, 0x5d, 0x55, 0x3e, 0xf9, 0x74, 0x55, 0xf9, 0xd7, + 0xa7, 0xab, 0xca, 0xcf, 0x3e, 0x5b, 0x9d, 0xfb, 0xe4, 0xb3, 0xd5, 0xb9, 0x7f, 0x7c, 0xb6, 0x3a, + 0xf7, 0xd1, 0xe7, 0x86, 0x76, 0x78, 0x38, 0x39, 0xb8, 0x6d, 0xba, 0xe3, 0x3b, 0x9e, 0xed, 0x0c, + 0x4d, 0xc3, 0xbb, 0x13, 0xda, 0xa6, 0x65, 0xde, 0x49, 0x44, 0xd4, 0x41, 0x99, 0xfd, 0x9d, 0xe7, + 0xcd, 0xff, 0x07, 0x00, 0x00, 0xff, 0xff, 0x5f, 0xf2, 0x66, 0x62, 0xed, 0x23, 0x00, 0x00, } func (m *TableSpan) Marshal() (dAtA []byte, err error) { @@ -4973,6 +4983,20 @@ func (m *MaintainerBootstrapResponse) MarshalToSizedBuffer(dAtA []byte) (int, er _ = i var l int _ = l + if len(m.MergeOperators) > 0 { + for iNdEx := len(m.MergeOperators) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.MergeOperators[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintHeartbeat(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x3a + } + } if len(m.Operators) > 0 { for iNdEx := len(m.Operators) - 1; iNdEx >= 0; iNdEx-- { { @@ -6795,6 +6819,12 @@ func (m *MaintainerBootstrapResponse) Size() (n int) { n += 1 + l + sovHeartbeat(uint64(l)) } } + if len(m.MergeOperators) > 0 { + for _, e := range m.MergeOperators { + l = e.Size() + n += 1 + l + sovHeartbeat(uint64(l)) + } + } return n } @@ -10967,6 +10997,40 @@ func (m *MaintainerBootstrapResponse) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 7: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field MergeOperators", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHeartbeat + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthHeartbeat + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthHeartbeat + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.MergeOperators = append(m.MergeOperators, &MergeDispatcherRequest{}) + if err := m.MergeOperators[len(m.MergeOperators)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipHeartbeat(dAtA[iNdEx:]) diff --git a/heartbeatpb/heartbeat.proto b/heartbeatpb/heartbeat.proto index 4c021a1315..65995642a1 100644 --- a/heartbeatpb/heartbeat.proto +++ b/heartbeatpb/heartbeat.proto @@ -232,6 +232,8 @@ message MaintainerBootstrapResponse { // It will be used when redo enable. uint64 redo_checkpoint_ts = 5; repeated ScheduleDispatcherRequest operators = 6; + // merge_operators contains in-flight merge dispatcher requests for bootstrap recovery. + repeated MergeDispatcherRequest merge_operators = 7; } message MaintainerPostBootstrapRequest { @@ -393,7 +395,7 @@ enum ChecksumState { message ChecksumMeta { ChecksumState state = 1; - // checksumStateSeq is a monotonic version for checksumState, + // checksumStateSeq is a monotonic version for checksumState, // used to discard out-of-order meta info. uint64 checksumStateSeq = 2; } diff --git a/maintainer/maintainer_controller_bootstrap.go b/maintainer/maintainer_controller_bootstrap.go index f7fc9e7ab1..58765f525a 100644 --- a/maintainer/maintainer_controller_bootstrap.go +++ b/maintainer/maintainer_controller_bootstrap.go @@ -16,6 +16,7 @@ package maintainer import ( "bytes" "context" + "sort" "time" "github.com/pingcap/log" @@ -127,6 +128,13 @@ func (c *Controller) FinishBootstrap( } } + // Restore merge operators after task state is rebuilt from bootstrap spans/operators. + // Merge restoration needs the per-dispatcher task map from buildTaskInfo, but must run + // before we discard any leftover working tasks as dropped-table artifacts. + if err := c.restoreCurrentMergeOperators(allNodesResp, buildTableSplitMap(tables)); err != nil { + return nil, errors.Trace(err) + } + // Step 4: Handle any remaining working tasks (likely dropped tables) c.handleRemainingWorkingTasks(workingTaskMap, redoWorkingTaskMap) @@ -320,10 +328,7 @@ func (c *Controller) buildTaskInfo( error, ) { // Build table splitability map for later use - tableSplitMap := make(map[int64]bool, len(tables)) - for _, tbl := range tables { - tableSplitMap[tbl.TableID] = tbl.Splitable - } + tableSplitMap := buildTableSplitMap(tables) workingTaskMap := c.buildWorkingTaskMap(allNodesResp, tableSplitMap, mode) // Restore current working operators first so spanController reflects "in-flight scheduling intent" // captured by dispatcher managers. This avoids bootstrap creating duplicate tasks for a dispatcherID @@ -335,6 +340,14 @@ func (c *Controller) buildTaskInfo( return workingTaskMap, schemaInfos, nil } +func buildTableSplitMap(tables []commonEvent.Table) map[int64]bool { + tableSplitMap := make(map[int64]bool, len(tables)) + for _, tbl := range tables { + tableSplitMap[tbl.TableID] = tbl.Splitable + } + return tableSplitMap +} + func (c *Controller) handleRemainingWorkingTasks( workingTaskMap, redoWorkingTaskMap map[int64]utils.Map[*heartbeatpb.TableSpan, *replica.SpanReplication], ) { @@ -443,40 +456,40 @@ func addToWorkingTaskMap( tableSpans.ReplaceOrInsert(span, spanReplication) } -// findHoles returns an array of Span that are not covered in the range +// findHoles returns the uncovered sub-spans in totalSpan. +// +// Bootstrap snapshots can temporarily contain overlapping spans during in-flight merge recovery: +// for example, source dispatchers in WaitingMerge can coexist with a merged dispatcher in +// Preparing/Initializing. We therefore compute holes from the union of reported coverage instead +// of assuming the input spans are strictly non-overlapping. func findHoles(currentSpan utils.Map[*heartbeatpb.TableSpan, *replica.SpanReplication], totalSpan *heartbeatpb.TableSpan) []*heartbeatpb.TableSpan { - lastSpan := &heartbeatpb.TableSpan{ - TableID: totalSpan.TableID, - StartKey: totalSpan.StartKey, - EndKey: totalSpan.StartKey, - KeyspaceID: totalSpan.KeyspaceID, - } + coveredEnd := totalSpan.StartKey var holes []*heartbeatpb.TableSpan // table span is sorted currentSpan.Ascend(func(current *heartbeatpb.TableSpan, _ *replica.SpanReplication) bool { - ord := bytes.Compare(lastSpan.EndKey, current.StartKey) - if ord < 0 { + // Skip spans that are fully covered by earlier overlaps. This preserves complete table + // coverage without manufacturing holes for legitimate bootstrap overlap. + if bytes.Compare(current.EndKey, coveredEnd) <= 0 { + return true + } + if bytes.Compare(coveredEnd, current.StartKey) < 0 { // Find a hole. holes = append(holes, &heartbeatpb.TableSpan{ TableID: totalSpan.TableID, - StartKey: lastSpan.EndKey, + StartKey: coveredEnd, EndKey: current.StartKey, KeyspaceID: totalSpan.KeyspaceID, }) - } else if ord > 0 { - log.Panic("map is out of order", - zap.String("lastSpan", lastSpan.String()), - zap.String("current", current.String())) } - lastSpan = current + coveredEnd = current.EndKey return true }) // Check if there is a hole in the end. - // the lastSpan not reach the totalSpan end - if !bytes.Equal(lastSpan.EndKey, totalSpan.EndKey) { + // coveredEnd not reach the totalSpan end + if !bytes.Equal(coveredEnd, totalSpan.EndKey) { holes = append(holes, &heartbeatpb.TableSpan{ TableID: totalSpan.TableID, - StartKey: lastSpan.EndKey, + StartKey: coveredEnd, EndKey: totalSpan.EndKey, KeyspaceID: totalSpan.KeyspaceID, }) @@ -903,3 +916,237 @@ func (c *Controller) handleCurrentWorkingRemove( } return nil } + +// restoreCurrentMergeOperators rebuilds maintainer-side merge operators from bootstrap snapshots. +// +// Dispatcher managers persist in-flight merge requests independently from create/remove scheduling requests. +// After a maintainer failover, we restore those merge requests so source spans keep converging instead of +// remaining stuck in scheduling state or leaking an incomplete merged dispatcher. +func (c *Controller) restoreCurrentMergeOperators( + allNodesResp map[node.ID]*heartbeatpb.MaintainerBootstrapResponse, + tableSplitMap map[int64]bool, +) error { + for nodeID, resp := range allNodesResp { + if len(resp.MergeOperators) == 0 { + continue + } + + for _, mergeReq := range resp.MergeOperators { + if mergeReq == nil || mergeReq.MergedDispatcherID == nil { + continue + } + if len(mergeReq.DispatcherIDs) < 2 { + log.Warn("merge operator has insufficient dispatcher IDs", + zap.String("nodeID", nodeID.String()), + zap.String("changefeed", resp.ChangefeedID.String())) + continue + } + + spanController := c.getSpanController(mergeReq.Mode) + operatorController := c.getOperatorController(mergeReq.Mode) + spanInfoByID := indexBootstrapSpans(resp.Spans, mergeReq.Mode) + mergedDispatcherID := common.NewDispatcherIDFromPB(mergeReq.MergedDispatcherID) + + sourceReplicaSets := make([]*replica.SpanReplication, 0, len(mergeReq.DispatcherIDs)) + sourceComplete := true + seenSources := make(map[common.DispatcherID]struct{}, len(mergeReq.DispatcherIDs)) + for _, idPB := range mergeReq.DispatcherIDs { + if idPB == nil { + sourceComplete = false + break + } + dispatcherID := common.NewDispatcherIDFromPB(idPB) + if _, ok := seenSources[dispatcherID]; ok { + continue + } + seenSources[dispatcherID] = struct{}{} + + replicaSet := spanController.GetTaskByID(dispatcherID) + if replicaSet == nil { + spanInfo := spanInfoByID[dispatcherID] + if spanInfo == nil { + sourceComplete = false + break + } + splitEnabled := spanController.ShouldEnableSplit(tableSplitMap[spanInfo.Span.TableID]) + replicaSet = c.createSpanReplication(spanInfo, nodeID, splitEnabled) + spanController.AddReplicatingSpan(replicaSet) + } + sourceReplicaSets = append(sourceReplicaSets, replicaSet) + } + + mergedSpanInfo := spanInfoByID[mergedDispatcherID] + mergedReplicaSet := spanController.GetTaskByID(mergedDispatcherID) + if mergedSpanInfo != nil && mergedSpanInfo.ComponentStatus == heartbeatpb.ComponentState_Working { + if mergedReplicaSet == nil { + splitEnabled := spanController.ShouldEnableSplit(tableSplitMap[mergedSpanInfo.Span.TableID]) + mergedReplicaSet = c.createSpanReplication(mergedSpanInfo, nodeID, splitEnabled) + spanController.AddReplicatingSpan(mergedReplicaSet) + } + for _, replicaSet := range sourceReplicaSets { + if mergedReplicaSet != nil && replicaSet.ID == mergedReplicaSet.ID { + continue + } + spanController.RemoveReplicatingSpan(replicaSet) + } + if mergedReplicaSet != nil { + spanController.MarkSpanReplicating(mergedReplicaSet) + } + log.Info("merge already finished during bootstrap", + zap.String("nodeID", nodeID.String()), + zap.String("changefeed", resp.ChangefeedID.String()), + zap.String("dispatcher", mergedDispatcherID.String())) + continue + } + + if !sourceComplete || len(sourceReplicaSets) < 2 { + log.Warn("skip restoring merge operator due to missing source dispatchers", + zap.String("nodeID", nodeID.String()), + zap.String("changefeed", resp.ChangefeedID.String()), + zap.String("dispatcher", mergedDispatcherID.String())) + for _, replicaSet := range sourceReplicaSets { + spanController.MarkSpanScheduling(replicaSet) + } + if mergedSpanInfo != nil { + if mergedReplicaSet == nil { + splitEnabled := spanController.ShouldEnableSplit(tableSplitMap[mergedSpanInfo.Span.TableID]) + mergedReplicaSet = c.createSpanReplication(mergedSpanInfo, nodeID, splitEnabled) + if mergedSpanInfo.ComponentStatus == heartbeatpb.ComponentState_Working { + spanController.AddReplicatingSpan(mergedReplicaSet) + } else { + spanController.AddSchedulingReplicaSet(mergedReplicaSet, nodeID) + } + } else if mergedSpanInfo.ComponentStatus == heartbeatpb.ComponentState_Working { + spanController.MarkSpanReplicating(mergedReplicaSet) + } else { + spanController.MarkSpanScheduling(mergedReplicaSet) + } + } + continue + } + + sort.Slice(sourceReplicaSets, func(i, j int) bool { + return bytes.Compare(sourceReplicaSets[i].Span.StartKey, sourceReplicaSets[j].Span.StartKey) < 0 + }) + + createdMerged := false + if mergedReplicaSet == nil { + if mergedSpanInfo != nil { + splitEnabled := spanController.ShouldEnableSplit(tableSplitMap[mergedSpanInfo.Span.TableID]) + mergedReplicaSet = c.createSpanReplication(mergedSpanInfo, nodeID, splitEnabled) + spanController.AddSchedulingReplicaSet(mergedReplicaSet, nodeID) + } else { + mergedSpan, schemaID, checkpointTs, ok := buildMergedSpanFromReplicas(sourceReplicaSets) + if !ok { + log.Warn("skip restoring merge operator due to invalid merge spans", + zap.String("nodeID", nodeID.String()), + zap.String("changefeed", resp.ChangefeedID.String()), + zap.String("dispatcher", mergedDispatcherID.String())) + continue + } + splitEnabled := spanController.ShouldEnableSplit(tableSplitMap[mergedSpan.TableID]) + status := &heartbeatpb.TableSpanStatus{ + ID: mergedDispatcherID.ToPB(), + CheckpointTs: checkpointTs, + Mode: mergeReq.Mode, + ComponentStatus: heartbeatpb.ComponentState_Preparing, + } + mergedReplicaSet = replica.NewWorkingSpanReplication( + c.changefeedID, + mergedDispatcherID, + schemaID, + mergedSpan, + status, + nodeID, + splitEnabled, + ) + spanController.AddSchedulingReplicaSet(mergedReplicaSet, nodeID) + createdMerged = true + } + } + + if mergedReplicaSet == nil { + log.Warn("merge replica set not found when restoring merge", + zap.String("nodeID", nodeID.String()), + zap.String("changefeed", resp.ChangefeedID.String()), + zap.String("dispatcher", mergedDispatcherID.String())) + continue + } + + spanController.MarkSpanScheduling(mergedReplicaSet) + + mergeOp := operatorController.AddRestoredMergeOperator(sourceReplicaSets, mergedReplicaSet) + if mergeOp == nil { + log.Error("restore merge operator failed", + zap.String("nodeID", nodeID.String()), + zap.String("changefeed", resp.ChangefeedID.String()), + zap.String("dispatcher", mergedDispatcherID.String()), + zap.Any("mergedReplicaSet", mergedReplicaSet), + zap.Any("toMergedReplicaSets", sourceReplicaSets)) + if createdMerged { + spanController.RemoveReplicatingSpan(mergedReplicaSet) + } + return errors.ErrOperatorIsNil.GenWithStack("restore merge operator failed when bootstrap") + } + mergeOp.Start() + } + } + return nil +} + +// buildMergedSpanFromReplicas synthesizes the merged span shape from consecutive source replica sets. +func buildMergedSpanFromReplicas( + replicaSets []*replica.SpanReplication, +) (*heartbeatpb.TableSpan, int64, uint64, bool) { + if len(replicaSets) < 2 { + return nil, 0, 0, false + } + first := replicaSets[0] + if first == nil || first.Span == nil { + return nil, 0, 0, false + } + + tableID := first.Span.TableID + keyspaceID := first.Span.KeyspaceID + schemaID := first.GetSchemaID() + nodeID := first.GetNodeID() + startKey := first.Span.StartKey + endKey := first.Span.EndKey + firstStatus := first.GetStatus() + if firstStatus == nil { + return nil, 0, 0, false + } + minCheckpoint := firstStatus.CheckpointTs + prevSpan := first.Span + for idx := 1; idx < len(replicaSets); idx++ { + replicaSet := replicaSets[idx] + if replicaSet == nil || replicaSet.Span == nil { + return nil, 0, 0, false + } + if replicaSet.Span.TableID != tableID || + replicaSet.Span.KeyspaceID != keyspaceID || + replicaSet.GetSchemaID() != schemaID || + replicaSet.GetNodeID() != nodeID { + return nil, 0, 0, false + } + if !common.IsTableSpanConsecutive(prevSpan, replicaSet.Span) { + return nil, 0, 0, false + } + status := replicaSet.GetStatus() + if status == nil { + return nil, 0, 0, false + } + if checkpoint := status.CheckpointTs; checkpoint < minCheckpoint { + minCheckpoint = checkpoint + } + prevSpan = replicaSet.Span + endKey = replicaSet.Span.EndKey + } + + return &heartbeatpb.TableSpan{ + TableID: tableID, + StartKey: startKey, + EndKey: endKey, + KeyspaceID: keyspaceID, + }, schemaID, minCheckpoint, true +} diff --git a/maintainer/maintainer_controller_test.go b/maintainer/maintainer_controller_test.go index 31a75342f5..f46c456a15 100644 --- a/maintainer/maintainer_controller_test.go +++ b/maintainer/maintainer_controller_test.go @@ -1537,6 +1537,242 @@ func TestFinishBootstrapSkipsStaleCreateOperatorForDroppedTable(t *testing.T) { } } +// TestFinishBootstrapRestoresInFlightMergeWithoutDuplicateCoverage covers maintainer failover in +// the middle of a merge. The bootstrap snapshot contains two source spans in WaitingMerge and the +// overlapping merged target span in Preparing. FinishBootstrap must not panic in findHoles or +// create a fresh absent table task; instead it should rebuild the merge-related tasks/operators +// from the bootstrap snapshot. +func TestFinishBootstrapRestoresInFlightMergeWithoutDuplicateCoverage(t *testing.T) { + testutil.SetUpTestServices(t) + nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) + nodeManager.GetAliveNodes()["node1"] = &node.Info{ID: "node1"} + + tableTriggerEventDispatcherID := common.NewDispatcherID() + cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName) + ddlSpan := replica.NewWorkingSpanReplication(cfID, tableTriggerEventDispatcherID, + common.DDLSpanSchemaID, + common.KeyspaceDDLSpan(common.DefaultKeyspaceID), &heartbeatpb.TableSpanStatus{ + ID: tableTriggerEventDispatcherID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 1, + }, "node1", false) + + cfg := config.GetDefaultReplicaConfig().Clone() + cfg.Scheduler = &config.ChangefeedSchedulerConfig{ + EnableTableAcrossNodes: util.AddressOf(true), + BalanceScoreThreshold: util.AddressOf(1), + MinTrafficPercentage: util.AddressOf(0.8), + MaxTrafficPercentage: util.AddressOf(1.2), + } + refresher := replica.NewRegionCountRefresher(cfID, time.Minute) + s := NewController(cfID, 1, &mockThreadPool{}, cfg, ddlSpan, nil, 1000, 0, refresher, common.DefaultKeyspace, false) + + schemaStore := eventservice.NewMockSchemaStore() + schemaStore.SetTables([]commonEvent.Table{ + { + TableID: 1, + SchemaID: 1, + Splitable: true, + SchemaTableName: &commonEvent.SchemaTableName{SchemaName: "test", TableName: "t1"}, + }, + }) + appcontext.SetService(appcontext.SchemaStore, schemaStore) + + totalSpan := common.TableIDToComparableSpan(common.DefaultKeyspaceID, 1) + midKey := appendNew(totalSpan.StartKey, 'a') + sourceSpan1 := &heartbeatpb.TableSpan{ + TableID: 1, + StartKey: totalSpan.StartKey, + EndKey: midKey, + KeyspaceID: common.DefaultKeyspaceID, + } + sourceSpan2 := &heartbeatpb.TableSpan{ + TableID: 1, + StartKey: midKey, + EndKey: totalSpan.EndKey, + KeyspaceID: common.DefaultKeyspaceID, + } + mergedSpan := &heartbeatpb.TableSpan{ + TableID: 1, + StartKey: totalSpan.StartKey, + EndKey: totalSpan.EndKey, + KeyspaceID: common.DefaultKeyspaceID, + } + + sourceDispatcherID1 := common.NewDispatcherID() + sourceDispatcherID2 := common.NewDispatcherID() + mergedDispatcherID := common.NewDispatcherID() + + _, err := s.FinishBootstrap(map[node.ID]*heartbeatpb.MaintainerBootstrapResponse{ + "node1": { + ChangefeedID: cfID.ToPB(), + Spans: []*heartbeatpb.BootstrapTableSpan{ + { + ID: sourceDispatcherID1.ToPB(), + SchemaID: 1, + Span: sourceSpan1, + ComponentStatus: heartbeatpb.ComponentState_WaitingMerge, + CheckpointTs: 10, + Mode: common.DefaultMode, + }, + { + ID: sourceDispatcherID2.ToPB(), + SchemaID: 1, + Span: sourceSpan2, + ComponentStatus: heartbeatpb.ComponentState_WaitingMerge, + CheckpointTs: 10, + Mode: common.DefaultMode, + }, + { + ID: mergedDispatcherID.ToPB(), + SchemaID: 1, + Span: mergedSpan, + ComponentStatus: heartbeatpb.ComponentState_Preparing, + CheckpointTs: 10, + Mode: common.DefaultMode, + }, + }, + MergeOperators: []*heartbeatpb.MergeDispatcherRequest{ + { + ChangefeedID: cfID.ToPB(), + DispatcherIDs: []*heartbeatpb.DispatcherID{sourceDispatcherID1.ToPB(), sourceDispatcherID2.ToPB()}, + MergedDispatcherID: mergedDispatcherID.ToPB(), + Mode: common.DefaultMode, + }, + }, + CheckpointTs: 10, + }, + }, false) + require.NoError(t, err) + require.True(t, s.bootstrapped) + + // No extra absent table span should be created during bootstrap; only the two source spans and + // the restored merged target should exist. + require.Zero(t, s.spanController.GetAbsentSize()) + require.Equal(t, 3, len(s.spanController.GetTasksByTableID(1))) + require.Equal(t, 3, s.spanController.GetSchedulingSize()) + require.Zero(t, s.spanController.GetReplicatingSize()) + require.NotNil(t, s.spanController.GetTaskByID(sourceDispatcherID1)) + require.NotNil(t, s.spanController.GetTaskByID(sourceDispatcherID2)) + require.NotNil(t, s.spanController.GetTaskByID(mergedDispatcherID)) + + mergeOp := s.operatorController.GetOperator(mergedDispatcherID) + require.NotNil(t, mergeOp) + _, ok := mergeOp.(*operator.MergeDispatcherOperator) + require.True(t, ok) +} + +// TestFinishBootstrapKeepsOverlapCoveredAfterMergeJournalCleanup covers a later merge-recovery +// window where dispatcher manager has already committed the merged dispatcher and dropped the merge +// journal, but the old source dispatchers are still waiting to be cleaned up. Bootstrap must still +// treat the overlapping spans as complete coverage and avoid creating absent hole tasks. +func TestFinishBootstrapKeepsOverlapCoveredAfterMergeJournalCleanup(t *testing.T) { + testutil.SetUpTestServices(t) + nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) + nodeManager.GetAliveNodes()["node1"] = &node.Info{ID: "node1"} + + tableTriggerEventDispatcherID := common.NewDispatcherID() + cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName) + ddlSpan := replica.NewWorkingSpanReplication(cfID, tableTriggerEventDispatcherID, + common.DDLSpanSchemaID, + common.KeyspaceDDLSpan(common.DefaultKeyspaceID), &heartbeatpb.TableSpanStatus{ + ID: tableTriggerEventDispatcherID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 1, + }, "node1", false) + + cfg := config.GetDefaultReplicaConfig().Clone() + cfg.Scheduler = &config.ChangefeedSchedulerConfig{ + EnableTableAcrossNodes: util.AddressOf(true), + BalanceScoreThreshold: util.AddressOf(1), + MinTrafficPercentage: util.AddressOf(0.8), + MaxTrafficPercentage: util.AddressOf(1.2), + } + refresher := replica.NewRegionCountRefresher(cfID, time.Minute) + s := NewController(cfID, 1, &mockThreadPool{}, cfg, ddlSpan, nil, 1000, 0, refresher, common.DefaultKeyspace, false) + + schemaStore := eventservice.NewMockSchemaStore() + schemaStore.SetTables([]commonEvent.Table{ + { + TableID: 1, + SchemaID: 1, + Splitable: true, + SchemaTableName: &commonEvent.SchemaTableName{SchemaName: "test", TableName: "t1"}, + }, + }) + appcontext.SetService(appcontext.SchemaStore, schemaStore) + + totalSpan := common.TableIDToComparableSpan(common.DefaultKeyspaceID, 1) + midKey := appendNew(totalSpan.StartKey, 'a') + sourceSpan1 := &heartbeatpb.TableSpan{ + TableID: 1, + StartKey: totalSpan.StartKey, + EndKey: midKey, + KeyspaceID: common.DefaultKeyspaceID, + } + sourceSpan2 := &heartbeatpb.TableSpan{ + TableID: 1, + StartKey: midKey, + EndKey: totalSpan.EndKey, + KeyspaceID: common.DefaultKeyspaceID, + } + mergedSpan := &heartbeatpb.TableSpan{ + TableID: 1, + StartKey: totalSpan.StartKey, + EndKey: totalSpan.EndKey, + KeyspaceID: common.DefaultKeyspaceID, + } + + sourceDispatcherID1 := common.NewDispatcherID() + sourceDispatcherID2 := common.NewDispatcherID() + mergedDispatcherID := common.NewDispatcherID() + + // Scenario: + // 1. Source dispatchers are already in WaitingMerge and still present in bootstrap spans. + // 2. The merged dispatcher is committed and visible as Initializing. + // 3. The persisted merge operator is already gone, so bootstrap only has overlapping coverage. + // Expectation: FinishBootstrap succeeds without adding absent spans for fake holes. + _, err := s.FinishBootstrap(map[node.ID]*heartbeatpb.MaintainerBootstrapResponse{ + "node1": { + ChangefeedID: cfID.ToPB(), + Spans: []*heartbeatpb.BootstrapTableSpan{ + { + ID: sourceDispatcherID1.ToPB(), + SchemaID: 1, + Span: sourceSpan1, + ComponentStatus: heartbeatpb.ComponentState_WaitingMerge, + CheckpointTs: 10, + Mode: common.DefaultMode, + }, + { + ID: sourceDispatcherID2.ToPB(), + SchemaID: 1, + Span: sourceSpan2, + ComponentStatus: heartbeatpb.ComponentState_WaitingMerge, + CheckpointTs: 10, + Mode: common.DefaultMode, + }, + { + ID: mergedDispatcherID.ToPB(), + SchemaID: 1, + Span: mergedSpan, + ComponentStatus: heartbeatpb.ComponentState_Initializing, + CheckpointTs: 10, + Mode: common.DefaultMode, + }, + }, + CheckpointTs: 10, + }, + }, false) + require.NoError(t, err) + require.True(t, s.bootstrapped) + require.Zero(t, s.spanController.GetAbsentSize()) + require.GreaterOrEqual(t, len(s.spanController.GetTasksByTableID(1)), 2) + require.GreaterOrEqual(t, s.spanController.GetReplicatingSize(), 2) + require.Zero(t, s.spanController.GetSchedulingSize()) + require.NotNil(t, s.spanController.GetTaskByID(mergedDispatcherID)) +} + func TestSplitTableWhenBootstrapFinished(t *testing.T) { testutil.SetUpTestServices(t) nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) @@ -1682,6 +1918,14 @@ func TestMapFindHole(t *testing.T) { {StartKey: []byte("t1_1"), EndKey: []byte("t2_0")}, }, }, + { // 6. overlapping spans still cover the whole range. + spans: []*heartbeatpb.TableSpan{ + {StartKey: []byte("t1_0"), EndKey: []byte("t1_1")}, + {StartKey: []byte("t1_0"), EndKey: []byte("t2_0")}, + {StartKey: []byte("t1_1"), EndKey: []byte("t2_0")}, + }, + rang: &heartbeatpb.TableSpan{StartKey: []byte("t1_0"), EndKey: []byte("t2_0")}, + }, } for i, cs := range cases { diff --git a/maintainer/operator/operator_controller.go b/maintainer/operator/operator_controller.go index 5684611aea..2ea318428b 100644 --- a/maintainer/operator/operator_controller.go +++ b/maintainer/operator/operator_controller.go @@ -461,6 +461,57 @@ func (oc *Controller) AddMergeOperator( return mergeOperator } +// AddRestoredMergeOperator rebuilds a merge operator from bootstrap state after maintainer failover. +func (oc *Controller) AddRestoredMergeOperator( + affectedReplicaSets []*replica.SpanReplication, + mergedReplicaSet *replica.SpanReplication, +) operator.Operator[common.DispatcherID, *heartbeatpb.TableSpanStatus] { + if mergedReplicaSet == nil { + return nil + } + if !checkMergeOperator(affectedReplicaSets) { + return nil + } + + operators := make([]operator.Operator[common.DispatcherID, *heartbeatpb.TableSpanStatus], 0, len(affectedReplicaSets)) + for _, replicaSet := range affectedReplicaSets { + operator := NewOccupyDispatcherOperator(oc.spanController, replicaSet) + ret := oc.AddOperator(operator) + if ret { + operators = append(operators, operator) + } else { + log.Error("failed to add occupy dispatcher operator when restoring merge", + zap.Stringer("changefeedID", oc.changefeedID), + zap.Int64("group", replicaSet.GetGroupID()), + zap.String("span", common.FormatTableSpan(replicaSet.Span)), + zap.String("operator", operator.String())) + for _, op := range operators { + oc.cancelOperator(op.ID()) + } + return nil + } + } + + mergeOperator := NewRestoredMergeDispatcherOperator(oc.spanController, affectedReplicaSets, mergedReplicaSet, operators) + ret := oc.AddOperator(mergeOperator) + if !ret { + log.Error("failed to add merge dispatcher operator when restoring merge", + zap.Stringer("changefeedID", oc.changefeedID), + zap.Any("mergeSpans", affectedReplicaSets), + zap.String("operator", mergeOperator.String())) + for _, op := range operators { + oc.cancelOperator(op.ID()) + } + return nil + } + log.Info("restore merge operator", + zap.String("role", oc.role), + zap.Stringer("changefeedID", oc.changefeedID), + zap.Int("affectedReplicaSets", len(affectedReplicaSets)), + ) + return mergeOperator +} + func (oc *Controller) GetAllOperators() []operator.Operator[common.DispatcherID, *heartbeatpb.TableSpanStatus] { oc.mu.RLock() defer oc.mu.RUnlock() diff --git a/maintainer/operator/operator_merge.go b/maintainer/operator/operator_merge.go index beee376717..0c1be154d5 100644 --- a/maintainer/operator/operator_merge.go +++ b/maintainer/operator/operator_merge.go @@ -150,6 +150,35 @@ func NewMergeDispatcherOperator( return op } +// NewRestoredMergeDispatcherOperator builds a merge operator whose occupy sub-operators were restored from bootstrap. +func NewRestoredMergeDispatcherOperator( + spanController *span.Controller, + toMergedReplicaSets []*replica.SpanReplication, + mergedReplicaSet *replica.SpanReplication, + occupyOperators []operator.Operator[common.DispatcherID, *heartbeatpb.TableSpanStatus], +) *MergeDispatcherOperator { + toMergedSpans := make([]*heartbeatpb.TableSpan, 0, len(toMergedReplicaSets)) + for _, replicaSet := range toMergedReplicaSets { + toMergedSpans = append(toMergedSpans, replicaSet.Span) + } + + dispatcherIDs := buildDispatcherIDs(toMergedReplicaSets) + spansInfo := buildMergedSpanInfo(toMergedSpans) + + return &MergeDispatcherOperator{ + spanController: spanController, + originNode: toMergedReplicaSets[0].GetNodeID(), + id: mergedReplicaSet.ID, + dispatcherIDs: dispatcherIDs, + toMergedReplicaSets: toMergedReplicaSets, + checkpointTs: 0, + mergedSpanInfo: spansInfo, + occupyOperators: occupyOperators, + newReplicaSet: mergedReplicaSet, + sendThrottler: newSendThrottler(), + } +} + func setOccupyOperatorsFinished(occupyOperators []operator.Operator[common.DispatcherID, *heartbeatpb.TableSpanStatus]) { for _, occupyOperator := range occupyOperators { // occupyOperators are created by AddMergeOperator and are guaranteed to be OccupyDispatcherOperator. diff --git a/tests/integration_tests/_utils/get_table_id b/tests/integration_tests/_utils/get_table_id index cf9a1a0363..a3177b367d 100755 --- a/tests/integration_tests/_utils/get_table_id +++ b/tests/integration_tests/_utils/get_table_id @@ -15,7 +15,8 @@ MAX_RETRIES=5 retries=0 while [ $retries -lt $MAX_RETRIES ]; do - id=$(curl http://127.0.0.1:10080/schema/"${dbName}"/"${tableName}" | jq .id) + # Follow the configured upstream status endpoint so tests can run against non-default local ports. + id=$(curl "http://${UP_TIDB_HOST:-127.0.0.1}:${UP_TIDB_STATUS:-10080}/schema/${dbName}/${tableName}" | jq .id) if [ -n "$id" ]; then echo $id exit 0 diff --git a/tests/integration_tests/maintainer_failover_when_operator/conf/diff_config.toml b/tests/integration_tests/maintainer_failover_when_operator/conf/diff_config.toml index 27d2d57567..7899ec9e3d 100644 --- a/tests/integration_tests/maintainer_failover_when_operator/conf/diff_config.toml +++ b/tests/integration_tests/maintainer_failover_when_operator/conf/diff_config.toml @@ -13,7 +13,7 @@ check-struct-only = false target-instance = "tidb0" - target-check-tables = ["maintainer_failover_when_operator.t1", "maintainer_failover_when_operator.t2", "maintainer_failover_when_operator.t4", "maintainer_failover_when_operator.t5"] + target-check-tables = ["maintainer_failover_when_operator.t1", "maintainer_failover_when_operator.t2", "maintainer_failover_when_operator.t4", "maintainer_failover_when_operator.t5", "maintainer_failover_when_operator.t6"] [data-sources] [data-sources.mysql1] diff --git a/tests/integration_tests/maintainer_failover_when_operator/run.sh b/tests/integration_tests/maintainer_failover_when_operator/run.sh index 06e57003c5..d759d64243 100755 --- a/tests/integration_tests/maintainer_failover_when_operator/run.sh +++ b/tests/integration_tests/maintainer_failover_when_operator/run.sh @@ -5,13 +5,14 @@ set -eu # This integration test verifies operator consistency across maintainer failover. # # What it validates: -# - Dispatcher managers persist "in-flight" scheduling requests (Create/Remove with an OperatorType). +# - Dispatcher managers persist "in-flight" scheduling requests (Create/Remove with an OperatorType) +# and merge requests. # - After the maintainer fails over, the new maintainer can restore these unfinished operators from # bootstrap responses and keep table scheduling converging instead of leaking or duplicating dispatchers. # # Main steps (per subcase): # 1) Start a 3-capture CDC cluster and create a changefeed. -# 2) Trigger Move / Split / Remove / Create operators and keep them in-progress using failpoints. +# 2) Trigger Move / Split / Remove / Create / Merge operators and keep them in-progress using failpoints. # 3) Kill the current maintainer capture to force a maintainer move. # 4) Disable failpoints and verify tables eventually converge (scheduled on a live capture, dropped table # stays dropped, and downstream data matches upstream via sync-diff). @@ -25,7 +26,25 @@ ROOT_WORK_DIR=$OUT_DIR/$TEST_NAME CDC_BINARY=cdc.test SINK_TYPE=$1 -CDC_ADDRS=("127.0.0.1:8300" "127.0.0.1:8301" "127.0.0.1:8302") +CDC_ADDRS_OVERRIDE=${CDC_ADDRS_OVERRIDE:-"127.0.0.1:8300 127.0.0.1:8301 127.0.0.1:8302"} +read -r -a CDC_ADDRS <<<"$CDC_ADDRS_OVERRIDE" +if [ "${#CDC_ADDRS[@]}" -ne 3 ]; then + echo "CDC_ADDRS_OVERRIDE must contain exactly 3 addresses" >&2 + exit 1 +fi +# Keep CLI-based helpers (changefeed create/query and move/split/merge operations) pointed at the same +# first capture when the case overrides the CDC addresses for isolated local reruns. +export CDC_HOST=${CDC_ADDRS[0]%:*} +export CDC_PORT=${CDC_ADDRS[0]#*:} +CDC_API_SERVER="http://${CDC_ADDRS[0]}" + +function cdc_cli_changefeed() { + command cdc_cli_changefeed "$@" --server "$CDC_API_SERVER" +} + +function run_cdc_cli() { + command run_cdc_cli "$@" --server "$CDC_API_SERVER" +} FAILPOINT_NOT_READY_TO_CLOSE_DISPATCHER="github.com/pingcap/ticdc/downstreamadapter/dispatcher/NotReadyToCloseDispatcher" FAILPOINT_BLOCK_CREATE_DISPATCHER="github.com/pingcap/ticdc/downstreamadapter/dispatchermanager/BlockCreateDispatcher" @@ -53,6 +72,83 @@ function get_table_replication_count() { jq -r --argjson tid "$table_id" '[.items[].table_ids[] | select(. == $tid)] | length' } +function wait_for_table_replication_count() { + local api_addr=$1 + local changefeed_id=$2 + local table_id=$3 + local target_count=$4 + local comparison_mode=$5 + local mode=$6 + local retry=${7:-30} + + for ((i = 0; i < retry; i++)); do + local count + count=$(get_table_replication_count "$api_addr" "$changefeed_id" "$table_id" "$mode") + if [ -n "$count" ] && [ "$count" != "null" ]; then + case "$comparison_mode" in + eq) + if [ "$count" -eq "$target_count" ]; then + echo "$count" + return 0 + fi + ;; + ge) + if [ "$count" -ge "$target_count" ]; then + echo "$count" + return 0 + fi + ;; + le) + if [ "$count" -le "$target_count" ]; then + echo "$count" + return 0 + fi + ;; + *) + echo "unknown comparison mode: $comparison_mode" >&2 + return 1 + ;; + esac + fi + sleep 2 + done + echo "table $table_id replication count did not satisfy $comparison_mode $target_count" >&2 + return 1 +} + +function wait_for_all_table_replications_on_addr() { + local api_addr=$1 + local changefeed_id=$2 + local table_id=$3 + local target_addr=$4 + local mode=$5 + local retry=${6:-30} + + for ((i = 0; i < retry; i++)); do + local target_id + target_id=$(get_capture_id_by_addr "$api_addr" "$target_addr") + if [ -z "$target_id" ] || [ "$target_id" == "null" ]; then + sleep 2 + continue + fi + + local resp + resp=$(curl -s "http://${api_addr}/api/v2/changefeeds/${changefeed_id}/tables?keyspace=$KEYSPACE_NAME&mode=$mode") + local total_count + total_count=$(echo "$resp" | jq -r --argjson tid "$table_id" '[.items[] | select(.table_ids | index($tid))] | length') + local target_count + target_count=$(echo "$resp" | jq -r --argjson tid "$table_id" --arg target "$target_id" '[.items[] | select((.table_ids | index($tid)) and .node_id == $target)] | length') + if [ "$total_count" != "null" ] && [ "$target_count" != "null" ] && + [ "$total_count" -gt 0 ] && [ "$total_count" -eq "$target_count" ]; then + echo "$target_count" + return 0 + fi + sleep 2 + done + echo "table $table_id still has replicas outside $target_addr" >&2 + return 1 +} + function get_maintainer_addr() { local api_addr=$1 local changefeed_id=$2 @@ -125,6 +221,20 @@ function wait_for_add_operator_inflight_in_logs() { return 1 } +function wait_for_restored_merge_operator_in_logs() { + local work_dir=$1 + local retry=$2 + + for ((i = 0; i < retry; i++)); do + if grep -qs "restore merge operator" "$work_dir"/cdc*.log 2>/dev/null; then + return 0 + fi + sleep 2 + done + echo "restored merge operator not observed in logs" >&2 + return 1 +} + function wait_for_maintainer_move() { local api_addr=$1 local changefeed_id=$2 @@ -223,8 +333,13 @@ function create_diff_config() { local work_dir=$1 local target_config_path=$2 - # update output-dir to point to current work directory to avoid collision between subcases. - sed "s|output-dir = \".*\"|output-dir = \"$work_dir/sync_diff/output\"|" "$CUR/conf/diff_config.toml" >"$target_config_path" + # Keep sync-diff aligned with the TiDB endpoints selected for this run, so isolated local reruns do not + # accidentally compare against another default-port cluster on the machine. + sed \ + -e "s|output-dir = \".*\"|output-dir = \"$work_dir/sync_diff/output\"|" \ + -e "/\\[data-sources.mysql1\\]/,/^$/{s/host = \".*\"/host = \"$UP_TIDB_HOST\"/; s/port = [0-9][0-9]*/port = $UP_TIDB_PORT/;}" \ + -e "/\\[data-sources.tidb0\\]/,/^$/{s/host = \".*\"/host = \"$DOWN_TIDB_HOST\"/; s/port = [0-9][0-9]*/port = $DOWN_TIDB_PORT/;}" \ + "$CUR/conf/diff_config.toml" >"$target_config_path" } function run_impl() { @@ -239,9 +354,11 @@ function run_impl() { # Disable balance scheduler to avoid unexpected auto split/move interfering with this test. export GO_FAILPOINTS='github.com/pingcap/ticdc/maintainer/scheduler/StopBalanceScheduler=return(true)' - run_cdc_server --workdir "$work_dir" --binary $CDC_BINARY --logsuffix 1 --addr "127.0.0.1:8300" - run_cdc_server --workdir "$work_dir" --binary $CDC_BINARY --logsuffix 2 --addr "127.0.0.1:8301" - run_cdc_server --workdir "$work_dir" --binary $CDC_BINARY --logsuffix 3 --addr "127.0.0.1:8302" + # Always pass the freshly started upstream PD explicitly so every capture joins the test cluster + # instead of silently falling back to the default 2379 endpoint during isolated local reruns. + run_cdc_server --workdir "$work_dir" --binary $CDC_BINARY --logsuffix 1 --addr "${CDC_ADDRS[0]}" --pd "$pd_addr" + run_cdc_server --workdir "$work_dir" --binary $CDC_BINARY --logsuffix 2 --addr "${CDC_ADDRS[1]}" --pd "$pd_addr" + run_cdc_server --workdir "$work_dir" --binary $CDC_BINARY --logsuffix 3 --addr "${CDC_ADDRS[2]}" --pd "$pd_addr" export GO_FAILPOINTS='' TOPIC_NAME="ticdc-move-table-maintainer-failover-$RANDOM" @@ -252,7 +369,7 @@ function run_impl() { run_pulsar_cluster "$work_dir" normal SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; - *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1" ;; + *) SINK_URI="mysql://normal:123456@127.0.0.1:${DOWN_TIDB_PORT}/?max-txn-row=1" ;; esac changefeed_config="$work_dir/changefeed.toml" create_changefeed_config "$mode" "$work_dir" "$changefeed_config" @@ -267,17 +384,22 @@ function run_impl() { run_sql "CREATE TABLE maintainer_failover_when_operator.t1(id INT PRIMARY KEY, val VARCHAR(20));" ${UP_TIDB_HOST} ${UP_TIDB_PORT} # move run_sql "CREATE TABLE maintainer_failover_when_operator.t2(id INT PRIMARY KEY, val VARCHAR(20));" ${UP_TIDB_HOST} ${UP_TIDB_PORT} # split run_sql "CREATE TABLE maintainer_failover_when_operator.t3(id INT PRIMARY KEY, val VARCHAR(20));" ${UP_TIDB_HOST} ${UP_TIDB_PORT} # remove + run_sql "CREATE TABLE maintainer_failover_when_operator.t6(id INT PRIMARY KEY, val VARCHAR(20));" ${UP_TIDB_HOST} ${UP_TIDB_PORT} # merge run_sql "split table maintainer_failover_when_operator.t2 between (1) and (100000) regions 20;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "split table maintainer_failover_when_operator.t6 between (1) and (100000) regions 20;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "INSERT INTO maintainer_failover_when_operator.t1 VALUES (1, 'a'), (2, 'b');" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "INSERT INTO maintainer_failover_when_operator.t2 VALUES (1, 'a'), (2, 'b');" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "INSERT INTO maintainer_failover_when_operator.t3 VALUES (1, 'a'), (2, 'b');" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "INSERT INTO maintainer_failover_when_operator.t6 VALUES (1, 'a'), (2, 'b');" ${UP_TIDB_HOST} ${UP_TIDB_PORT} check_table_exists "maintainer_failover_when_operator.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300 check_table_exists "maintainer_failover_when_operator.t2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300 check_table_exists "maintainer_failover_when_operator.t3" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300 + check_table_exists "maintainer_failover_when_operator.t6" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300 table_id_1=$(get_table_id "maintainer_failover_when_operator" "t1") table_id_2=$(get_table_id "maintainer_failover_when_operator" "t2") table_id_3=$(get_table_id "maintainer_failover_when_operator" "t3") + table_id_6=$(get_table_id "maintainer_failover_when_operator" "t6") api_addr=${CDC_ADDRS[0]} maintainer_addr=$(get_maintainer_addr "$api_addr" "$changefeed_id") @@ -297,9 +419,28 @@ function run_impl() { ensure_table_single_replication "$api_addr" "$changefeed_id" "$table_id_1" "$mode" ensure_table_single_replication "$api_addr" "$changefeed_id" "$table_id_2" "$mode" ensure_table_single_replication "$api_addr" "$changefeed_id" "$table_id_3" "$mode" + ensure_table_single_replication "$api_addr" "$changefeed_id" "$table_id_6" "$mode" ensure_table_on_addr "$api_addr" "$changefeed_id" "$table_id_1" "$origin_addr" "$mode" ensure_table_on_addr "$api_addr" "$changefeed_id" "$table_id_2" "$origin_addr" "$mode" ensure_table_on_addr "$api_addr" "$changefeed_id" "$table_id_3" "$origin_addr" "$mode" + ensure_table_on_addr "$api_addr" "$changefeed_id" "$table_id_6" "$origin_addr" "$mode" + + # Prepare a dedicated split table for the merge failover scenario. + # Steps: + # 1) Split the table into multiple replications. + # 2) Move all split dispatchers onto the surviving origin capture, so the merge request is persisted on a + # node that stays alive after the maintainer moves. + # 3) Record the replication count before issuing merge; a blocked merge should temporarily add one more + # merged dispatcher, and a successful restored merge should later reduce the count by one. + split_table_with_retry "$table_id_6" "$changefeed_id" 10 "$mode" + wait_for_table_replication_count "$api_addr" "$changefeed_id" "$table_id_6" 2 ge "$mode" 60 + move_split_table_with_retry "$origin_addr" "$table_id_6" "$changefeed_id" 10 "$mode" + wait_for_all_table_replications_on_addr "$api_addr" "$changefeed_id" "$table_id_6" "$origin_addr" "$mode" 60 + merge_replication_count_before=$(get_table_replication_count "$api_addr" "$changefeed_id" "$table_id_6" "$mode") + if [ -z "$merge_replication_count_before" ] || [ "$merge_replication_count_before" == "null" ] || [ "$merge_replication_count_before" -lt 2 ]; then + echo "table $table_id_6 does not have enough replications for merge, count=$merge_replication_count_before" >&2 + exit 1 + fi enable_failpoint --addr "$origin_addr" --name "$FAILPOINT_NOT_READY_TO_CLOSE_DISPATCHER" --expr "return(true)" enable_failpoint_on_all_addrs "$FAILPOINT_BLOCK_CREATE_DISPATCHER" "pause" @@ -337,6 +478,15 @@ function run_impl() { # failpoint is enabled on origin, so the table should not move to target wait_for_table_on_addr "$api_addr" "$changefeed_id" "$table_id_1" "$origin_addr" "$mode" + # Merge operator: issue merge against the dedicated split table while source dispatcher close is blocked. + # Once the merged dispatcher appears, the maintainer can be killed safely because the merge request has + # already reached dispatcher manager state and must be restored from bootstrap on failover. + set +e + merge_table_with_retry "$table_id_6" "$changefeed_id" 1 "$mode" & + merge_pid=$! + set -e + wait_for_table_replication_count "$api_addr" "$changefeed_id" "$table_id_6" "$((merge_replication_count_before + 1))" ge "$mode" 60 + # Remove operator: drop one table while dispatcher close is blocked. run_sql "DROP TABLE maintainer_failover_when_operator.t3;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} @@ -353,8 +503,12 @@ function run_impl() { disable_failpoint --addr "$origin_addr" --name "$FAILPOINT_NOT_READY_TO_CLOSE_DISPATCHER" disable_failpoint_on_all_addrs_best_effort "$FAILPOINT_BLOCK_CREATE_DISPATCHER" + wait_for_restored_merge_operator_in_logs "$work_dir" 60 + wait_for_table_replication_count "$api_addr" "$changefeed_id" "$table_id_6" "$((merge_replication_count_before - 1))" eq "$mode" 60 + set +e wait "$split_pid" + wait "$merge_pid" set -e # After removing failpoints, the table should be scheduled eventually. @@ -364,6 +518,7 @@ function run_impl() { check_table_exists "maintainer_failover_when_operator.t4" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300 check_table_exists "maintainer_failover_when_operator.t5" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300 + check_table_exists "maintainer_failover_when_operator.t6" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300 check_table_not_exists "maintainer_failover_when_operator.t3" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300 run_sql "ALTER TABLE maintainer_failover_when_operator.t1 ADD COLUMN c2 INT DEFAULT 0;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} @@ -372,6 +527,7 @@ function run_impl() { run_sql "INSERT INTO maintainer_failover_when_operator.t2 VALUES (3, 'c');" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "INSERT INTO maintainer_failover_when_operator.t4 VALUES (3, 'c');" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "INSERT INTO maintainer_failover_when_operator.t5 VALUES (3, 'c');" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "INSERT INTO maintainer_failover_when_operator.t6 VALUES (3, 'c');" ${UP_TIDB_HOST} ${UP_TIDB_PORT} diff_config="$work_dir/diff_config.toml" create_diff_config "$work_dir" "$diff_config"