Skip to content

Commit 30f6c75

Browse files
committed
r
1 parent 595bb36 commit 30f6c75

File tree

6 files changed

+245
-1
lines changed

6 files changed

+245
-1
lines changed

logservice/logpuller/region_event_handler.go

Lines changed: 13 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -252,6 +252,9 @@ func (h *regionEventHandler) handleRegionError(state *regionFeedState) {
252252

253253
func handleEventEntries(span *subscribedSpan, state *regionFeedState, entries *cdcpb.Event_Entries_) {
254254
regionID, _, _ := state.getRegionMeta()
255+
if state.region.runtimeKey.isValid() && state.worker != nil && state.worker.client != nil && state.worker.client.regionRuntimeRegistry != nil {
256+
state.worker.client.regionRuntimeRegistry.updateLastEvent(state.region.runtimeKey, time.Now())
257+
}
255258
assembleRowEvent := func(regionID uint64, entry *cdcpb.Event_Row) common.RawKVEntry {
256259
var opType common.OpType
257260
switch entry.GetOpType() {
@@ -277,6 +280,13 @@ func handleEventEntries(span *subscribedSpan, state *regionFeedState, entries *c
277280
switch entry.Type {
278281
case cdcpb.Event_INITIALIZED:
279282
state.setInitialized()
283+
if state.region.runtimeKey.isValid() && state.worker != nil && state.worker.client != nil && state.worker.client.regionRuntimeRegistry != nil {
284+
now := time.Now()
285+
registry := state.worker.client.regionRuntimeRegistry
286+
registry.setInitializedTime(state.region.runtimeKey, now)
287+
registry.setReplicatingTime(state.region.runtimeKey, now)
288+
registry.transition(state.region.runtimeKey, regionPhaseReplicating, now)
289+
}
280290
log.Debug("region is initialized",
281291
zap.Int64("tableID", span.span.TableID),
282292
zap.Uint64("regionID", regionID),
@@ -364,6 +374,9 @@ func handleResolvedTs(span *subscribedSpan, state *regionFeedState, resolvedTs u
364374
}
365375

366376
state.updateResolvedTs(resolvedTs)
377+
if state.region.runtimeKey.isValid() && state.worker != nil && state.worker.client != nil && state.worker.client.regionRuntimeRegistry != nil {
378+
state.worker.client.regionRuntimeRegistry.updateResolvedTs(state.region.runtimeKey, resolvedTs, time.Now())
379+
}
367380

368381
ts := uint64(0)
369382
shouldAdvance := false

logservice/logpuller/region_request_worker.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,11 @@ func (s *regionRequestWorker) processRegionSendTask(
431431
// sentRequests visible in the same order and avoids leaving stale
432432
// requests in cleanup.
433433
s.requestCache.markSent(regionReq)
434+
if s.client != nil && s.client.regionRuntimeRegistry != nil && region.runtimeKey.isValid() {
435+
s.client.regionRuntimeRegistry.updateWorker(region.runtimeKey, s.workerID)
436+
s.client.regionRuntimeRegistry.setRequestSendTime(region.runtimeKey, time.Now())
437+
s.client.regionRuntimeRegistry.transition(region.runtimeKey, regionPhaseWaitInitialized, time.Now())
438+
}
434439
if err := doSend(s.createRegionRequest(region)); err != nil {
435440
state.markStopped(err)
436441
return err
@@ -511,7 +516,11 @@ func (s *regionRequestWorker) clearRegionStates() map[SubscriptionID]regionFeedS
511516
// add adds a region request to the worker's cache
512517
// It blocks if the cache is full until there's space or ctx is cancelled
513518
func (s *regionRequestWorker) add(ctx context.Context, region regionInfo, force bool) (bool, error) {
514-
return s.requestCache.add(ctx, region, force)
519+
ok, err := s.requestCache.add(ctx, region, force)
520+
if ok && err == nil && s.client != nil && s.client.regionRuntimeRegistry != nil && region.runtimeKey.isValid() {
521+
s.client.regionRuntimeRegistry.setRequestEnqueueTime(region.runtimeKey, time.Now())
522+
}
523+
return ok, err
515524
}
516525

517526
func (s *regionRequestWorker) clearPendingRegions() []regionInfo {

logservice/logpuller/region_runtime.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ type regionRuntimeKey struct {
4747
generation uint64
4848
}
4949

50+
func (k regionRuntimeKey) isValid() bool {
51+
return k.subID != InvalidSubscriptionID && k.regionID != 0 && k.generation != 0
52+
}
53+
5054
type regionRuntimeState struct {
5155
key regionRuntimeKey
5256

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
// Copyright 2026 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package logpuller
15+
16+
import (
17+
"context"
18+
"testing"
19+
"time"
20+
21+
"github.com/pingcap/ticdc/heartbeatpb"
22+
"github.com/pingcap/ticdc/logservice/logpuller/regionlock"
23+
"github.com/pingcap/ticdc/pkg/common"
24+
"github.com/pingcap/ticdc/pkg/pdutil"
25+
"github.com/stretchr/testify/require"
26+
"github.com/tikv/client-go/v2/tikv"
27+
)
28+
29+
func TestScheduleRegionRequestUpdatesRuntimeRegistry(t *testing.T) {
30+
client := &subscriptionClient{
31+
regionTaskQueue: NewPriorityQueue(),
32+
regionRuntimeRegistry: newRegionRuntimeRegistry(),
33+
pdClock: pdutil.NewClock4Test(),
34+
}
35+
36+
rawSpan := heartbeatpb.TableSpan{
37+
TableID: 1,
38+
StartKey: []byte{'a'},
39+
EndKey: []byte{'z'},
40+
}
41+
consumeKVEvents := func(_ []common.RawKVEntry, _ func()) bool { return false }
42+
advanceResolvedTs := func(uint64) {}
43+
subSpan := client.newSubscribedSpan(SubscriptionID(1), rawSpan, 100, consumeKVEvents, advanceResolvedTs, 0, false)
44+
45+
regionSpan := heartbeatpb.TableSpan{
46+
TableID: 1,
47+
StartKey: []byte{'b'},
48+
EndKey: []byte{'c'},
49+
}
50+
region := newRegionInfo(tikv.NewRegionVerID(10, 1, 1), regionSpan, nil, subSpan, false)
51+
52+
client.scheduleRegionRequest(context.Background(), region, TaskLowPrior)
53+
54+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
55+
defer cancel()
56+
task, err := client.regionTaskQueue.Pop(ctx)
57+
require.NoError(t, err)
58+
queued := task.GetRegionInfo()
59+
require.True(t, queued.runtimeKey.isValid())
60+
61+
state, ok := client.regionRuntimeRegistry.get(queued.runtimeKey)
62+
require.True(t, ok)
63+
require.Equal(t, regionPhaseQueued, state.phase)
64+
require.Equal(t, uint64(10), state.verID.GetID())
65+
require.False(t, state.rangeLockAcquiredTime.IsZero())
66+
}
67+
68+
func TestOnRegionFailUpdatesRuntimeRegistry(t *testing.T) {
69+
client := &subscriptionClient{
70+
regionRuntimeRegistry: newRegionRuntimeRegistry(),
71+
errCache: newErrCache(),
72+
}
73+
client.ctx, client.cancel = context.WithCancel(context.Background())
74+
defer client.cancel()
75+
76+
rawSpan := heartbeatpb.TableSpan{
77+
TableID: 1,
78+
StartKey: []byte{'a'},
79+
EndKey: []byte{'z'},
80+
}
81+
consumeKVEvents := func(_ []common.RawKVEntry, _ func()) bool { return false }
82+
advanceResolvedTs := func(uint64) {}
83+
subSpan := client.newSubscribedSpan(SubscriptionID(1), rawSpan, 100, consumeKVEvents, advanceResolvedTs, 0, false)
84+
85+
lockRes := subSpan.rangeLock.LockRange(context.Background(), []byte{'b'}, []byte{'c'}, 10, 1)
86+
require.Equal(t, regionlock.LockRangeStatusSuccess, lockRes.Status)
87+
88+
region := newRegionInfo(tikv.NewRegionVerID(10, 1, 1), heartbeatpb.TableSpan{
89+
TableID: 1,
90+
StartKey: []byte{'b'},
91+
EndKey: []byte{'c'},
92+
}, nil, subSpan, false)
93+
region.lockedRangeState = lockRes.LockedRangeState
94+
95+
client.ensureRegionRuntime(&region, time.Now())
96+
require.True(t, region.runtimeKey.isValid())
97+
98+
client.onRegionFail(newRegionErrorInfo(region, &sendRequestToStoreErr{}))
99+
100+
state, ok := client.regionRuntimeRegistry.get(region.runtimeKey)
101+
require.True(t, ok)
102+
require.Equal(t, regionPhaseRetryPending, state.phase)
103+
require.Equal(t, "send request to store error", state.lastError)
104+
require.Equal(t, 1, state.retryCount)
105+
}
106+
107+
func TestHandleResolvedTsUpdatesRuntimeRegistry(t *testing.T) {
108+
client := &subscriptionClient{
109+
regionRuntimeRegistry: newRegionRuntimeRegistry(),
110+
}
111+
worker := &regionRequestWorker{client: client}
112+
113+
rawSpan := heartbeatpb.TableSpan{
114+
TableID: 1,
115+
StartKey: []byte{'a'},
116+
EndKey: []byte{'z'},
117+
}
118+
subSpan := &subscribedSpan{
119+
subID: SubscriptionID(1),
120+
startTs: 100,
121+
span: rawSpan,
122+
rangeLock: regionlock.NewRangeLock(1, rawSpan.StartKey, rawSpan.EndKey, 100),
123+
filterLoop: false,
124+
}
125+
subSpan.resolvedTs.Store(100)
126+
subSpan.resolvedTsUpdated.Store(time.Now().Unix())
127+
subSpan.advanceInterval = 0
128+
129+
lockRes := subSpan.rangeLock.LockRange(context.Background(), rawSpan.StartKey, rawSpan.EndKey, 10, 1)
130+
require.Equal(t, regionlock.LockRangeStatusSuccess, lockRes.Status)
131+
lockRes.LockedRangeState.Initialized.Store(true)
132+
133+
region := newRegionInfo(tikv.NewRegionVerID(10, 1, 1), rawSpan, nil, subSpan, false)
134+
region.lockedRangeState = lockRes.LockedRangeState
135+
region.runtimeKey = client.regionRuntimeRegistry.allocKey(subSpan.subID, region.verID.GetID())
136+
client.regionRuntimeRegistry.updateRegionInfo(region.runtimeKey, region)
137+
138+
state := newRegionFeedState(region, uint64(subSpan.subID), worker)
139+
state.start()
140+
141+
resolvedTs := uint64(200)
142+
handleResolvedTs(subSpan, state, resolvedTs)
143+
144+
stored, ok := client.regionRuntimeRegistry.get(region.runtimeKey)
145+
require.True(t, ok)
146+
require.Equal(t, resolvedTs, stored.lastResolvedTs)
147+
require.False(t, stored.lastEventTime.IsZero())
148+
}

logservice/logpuller/region_state.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ const (
2828
)
2929

3030
type regionInfo struct {
31+
// runtimeKey links this region info to a regionRuntimeRegistry entry.
32+
// It is assigned by subscriptionClient when scheduling the region.
33+
runtimeKey regionRuntimeKey
34+
3135
verID tikv.RegionVerID
3236
// The span of the region.
3337
// Note(dongmen): The span doesn't always represent the whole span of a region.

logservice/logpuller/subscription_client.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,43 @@ type subscriptionClient struct {
226226
errCache *errCache
227227
}
228228

229+
func (s *subscriptionClient) ensureRegionRuntime(region *regionInfo, now time.Time) {
230+
if s.regionRuntimeRegistry == nil {
231+
return
232+
}
233+
if region == nil || region.subscribedSpan == nil {
234+
return
235+
}
236+
if region.verID.GetID() == 0 {
237+
return
238+
}
239+
if !region.runtimeKey.isValid() {
240+
region.runtimeKey = s.regionRuntimeRegistry.allocKey(region.subscribedSpan.subID, region.verID.GetID())
241+
s.regionRuntimeRegistry.updateRegionInfo(region.runtimeKey, *region)
242+
s.regionRuntimeRegistry.transition(region.runtimeKey, regionPhaseDiscovered, now)
243+
}
244+
}
245+
246+
func (s *subscriptionClient) updateRegionRuntimeInfo(region regionInfo) {
247+
if s.regionRuntimeRegistry == nil {
248+
return
249+
}
250+
if !region.runtimeKey.isValid() {
251+
return
252+
}
253+
s.regionRuntimeRegistry.updateRegionInfo(region.runtimeKey, region)
254+
}
255+
256+
func (s *subscriptionClient) transitionRegionRuntime(region regionInfo, phase regionPhase, now time.Time) {
257+
if s.regionRuntimeRegistry == nil {
258+
return
259+
}
260+
if !region.runtimeKey.isValid() {
261+
return
262+
}
263+
s.regionRuntimeRegistry.transition(region.runtimeKey, phase, now)
264+
}
265+
229266
// NewSubscriptionClient creates a client.
230267
func NewSubscriptionClient(
231268
config *SubscriptionClientConfig,
@@ -508,6 +545,10 @@ func (s *subscriptionClient) onTableDrained(rt *subscribedSpan) {
508545
log.Info("subscription client stop span is finished",
509546
zap.Uint64("subscriptionID", uint64(rt.subID)))
510547

548+
if s.regionRuntimeRegistry != nil {
549+
s.regionRuntimeRegistry.removeBySubscription(rt.subID)
550+
}
551+
511552
err := s.ds.RemovePath(rt.subID)
512553
if err != nil {
513554
log.Warn("subscription client remove path failed",
@@ -521,6 +562,11 @@ func (s *subscriptionClient) onTableDrained(rt *subscribedSpan) {
521562

522563
// Note: don't block the caller, otherwise there may be deadlock
523564
func (s *subscriptionClient) onRegionFail(errInfo regionErrorInfo) {
565+
if s.regionRuntimeRegistry != nil && errInfo.runtimeKey.isValid() {
566+
s.regionRuntimeRegistry.recordError(errInfo.runtimeKey, errInfo.err, time.Now())
567+
s.regionRuntimeRegistry.incRetry(errInfo.runtimeKey)
568+
s.regionRuntimeRegistry.transition(errInfo.runtimeKey, regionPhaseRetryPending, time.Now())
569+
}
524570
// unlock the range early to prevent blocking the range.
525571
if errInfo.subscribedSpan.rangeLock.UnlockRange(
526572
errInfo.span.StartKey, errInfo.span.EndKey,
@@ -625,6 +671,11 @@ func (s *subscriptionClient) handleRegions(ctx context.Context, eg *errgroup.Gro
625671
if !ok {
626672
continue
627673
}
674+
s.updateRegionRuntimeInfo(region)
675+
if s.regionRuntimeRegistry != nil && region.runtimeKey.isValid() {
676+
s.regionRuntimeRegistry.setRPCReadyTime(region.runtimeKey, time.Now())
677+
}
678+
s.transitionRegionRuntime(region, regionPhaseRPCReady, time.Now())
628679

629680
store := getStore(region.rpcCtx.Addr)
630681
worker := store.getRequestWorker()
@@ -800,21 +851,36 @@ func (s *subscriptionClient) divideSpanAndScheduleRegionRequests(
800851
// scheduleRegionRequest locks the region's range and send the region to regionTaskQueue,
801852
// which will be handled by handleRegions.
802853
func (s *subscriptionClient) scheduleRegionRequest(ctx context.Context, region regionInfo, priority TaskType) {
854+
s.ensureRegionRuntime(&region, time.Now())
803855
lockRangeResult := region.subscribedSpan.rangeLock.LockRange(
804856
ctx, region.span.StartKey, region.span.EndKey, region.verID.GetID(), region.verID.GetVer())
805857

806858
if lockRangeResult.Status == regionlock.LockRangeStatusWait {
859+
s.transitionRegionRuntime(region, regionPhaseRangeLockWait, time.Now())
807860
lockRangeResult = lockRangeResult.WaitFn()
808861
}
809862

810863
switch lockRangeResult.Status {
811864
case regionlock.LockRangeStatusSuccess:
812865
region.lockedRangeState = lockRangeResult.LockedRangeState
866+
if s.regionRuntimeRegistry != nil && region.runtimeKey.isValid() {
867+
s.regionRuntimeRegistry.setRangeLockAcquiredTime(region.runtimeKey, lockRangeResult.LockedRangeState.Created)
868+
}
869+
s.transitionRegionRuntime(region, regionPhaseQueued, time.Now())
813870
s.regionTaskQueue.Push(NewRegionPriorityTask(priority, region, s.pdClock.CurrentTS()))
814871
case regionlock.LockRangeStatusStale:
872+
s.transitionRegionRuntime(region, regionPhaseRemoved, time.Now())
873+
if s.regionRuntimeRegistry != nil && region.runtimeKey.isValid() {
874+
s.regionRuntimeRegistry.remove(region.runtimeKey)
875+
}
815876
for _, r := range lockRangeResult.RetryRanges {
816877
s.scheduleRangeRequest(ctx, r, region.subscribedSpan, region.filterLoop, priority)
817878
}
879+
case regionlock.LockRangeStatusCancel:
880+
s.transitionRegionRuntime(region, regionPhaseRemoved, time.Now())
881+
if s.regionRuntimeRegistry != nil && region.runtimeKey.isValid() {
882+
s.regionRuntimeRegistry.remove(region.runtimeKey)
883+
}
818884
default:
819885
return
820886
}

0 commit comments

Comments
 (0)