Skip to content

Commit 7241f6a

Browse files
committed
logpuller: add region runtime registry scaffold
1 parent 165df37 commit 7241f6a

File tree

3 files changed

+457
-0
lines changed

3 files changed

+457
-0
lines changed
Lines changed: 334 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,334 @@
1+
// Copyright 2026 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package logpuller
15+
16+
import (
17+
"sort"
18+
"sync"
19+
"time"
20+
21+
"github.com/pingcap/ticdc/heartbeatpb"
22+
"github.com/tikv/client-go/v2/tikv"
23+
)
24+
25+
type regionPhase string
26+
27+
const (
28+
regionPhaseUnknown regionPhase = "unknown"
29+
regionPhaseDiscovered regionPhase = "discovered"
30+
regionPhaseRangeLockWait regionPhase = "range_lock_wait"
31+
regionPhaseRangeLocked regionPhase = "range_locked"
32+
regionPhaseQueued regionPhase = "queued"
33+
regionPhaseRPCReady regionPhase = "rpc_ready"
34+
regionPhaseSent regionPhase = "sent"
35+
regionPhaseWaitInitialized regionPhase = "wait_initialized"
36+
regionPhaseReplicating regionPhase = "replicating"
37+
regionPhaseStuck regionPhase = "stuck"
38+
regionPhaseRetryPending regionPhase = "retry_pending"
39+
regionPhaseRemoved regionPhase = "removed"
40+
regionPhaseDeregistering regionPhase = "deregistering"
41+
)
42+
43+
type regionRuntimeIdentity struct {
44+
subID SubscriptionID
45+
regionID uint64
46+
}
47+
48+
type regionRuntimeKey struct {
49+
subID SubscriptionID
50+
regionID uint64
51+
generation uint64
52+
}
53+
54+
type regionRuntimeState struct {
55+
key regionRuntimeKey
56+
57+
tableID int64
58+
span heartbeatpb.TableSpan
59+
verID tikv.RegionVerID
60+
61+
leaderStoreID uint64
62+
leaderPeerID uint64
63+
storeAddr string
64+
workerID uint64
65+
66+
phase regionPhase
67+
phaseEnterTime time.Time
68+
69+
lastEventTime time.Time
70+
lastResolvedTs uint64
71+
72+
lastError string
73+
lastErrorTime time.Time
74+
retryCount int
75+
76+
requestEnqueueTime time.Time
77+
requestRPCReadyTime time.Time
78+
requestSendTime time.Time
79+
initializedTime time.Time
80+
replicatingTime time.Time
81+
}
82+
83+
func (s regionRuntimeState) clone() regionRuntimeState {
84+
s.span = cloneTableSpan(s.span)
85+
return s
86+
}
87+
88+
func (s *regionRuntimeState) applyRegionInfo(region regionInfo) {
89+
if region.subscribedSpan != nil {
90+
s.tableID = region.subscribedSpan.span.TableID
91+
}
92+
s.span = cloneTableSpan(region.span)
93+
s.verID = region.verID
94+
if region.rpcCtx == nil {
95+
return
96+
}
97+
98+
s.storeAddr = region.rpcCtx.Addr
99+
if region.rpcCtx.Peer != nil {
100+
s.leaderPeerID = region.rpcCtx.Peer.Id
101+
s.leaderStoreID = region.rpcCtx.Peer.StoreId
102+
}
103+
}
104+
105+
func cloneTableSpan(span heartbeatpb.TableSpan) heartbeatpb.TableSpan {
106+
cloned := span
107+
if len(span.StartKey) > 0 {
108+
cloned.StartKey = append([]byte(nil), span.StartKey...)
109+
}
110+
if len(span.EndKey) > 0 {
111+
cloned.EndKey = append([]byte(nil), span.EndKey...)
112+
}
113+
return cloned
114+
}
115+
116+
type regionRuntimeRegistry struct {
117+
mu sync.RWMutex
118+
119+
states map[regionRuntimeKey]*regionRuntimeState
120+
generations map[regionRuntimeIdentity]uint64
121+
}
122+
123+
func newRegionRuntimeRegistry() *regionRuntimeRegistry {
124+
return &regionRuntimeRegistry{
125+
states: make(map[regionRuntimeKey]*regionRuntimeState),
126+
generations: make(map[regionRuntimeIdentity]uint64),
127+
}
128+
}
129+
130+
func (r *regionRuntimeRegistry) allocKey(subID SubscriptionID, regionID uint64) regionRuntimeKey {
131+
r.mu.Lock()
132+
defer r.mu.Unlock()
133+
134+
identity := regionRuntimeIdentity{subID: subID, regionID: regionID}
135+
r.generations[identity]++
136+
return regionRuntimeKey{
137+
subID: subID,
138+
regionID: regionID,
139+
generation: r.generations[identity],
140+
}
141+
}
142+
143+
func (r *regionRuntimeRegistry) upsert(
144+
key regionRuntimeKey,
145+
update func(*regionRuntimeState),
146+
) regionRuntimeState {
147+
r.mu.Lock()
148+
defer r.mu.Unlock()
149+
150+
state, ok := r.states[key]
151+
if !ok {
152+
state = &regionRuntimeState{
153+
key: key,
154+
phase: regionPhaseUnknown,
155+
}
156+
r.states[key] = state
157+
}
158+
if update != nil {
159+
update(state)
160+
}
161+
return state.clone()
162+
}
163+
164+
func (r *regionRuntimeRegistry) transition(
165+
key regionRuntimeKey,
166+
phase regionPhase,
167+
phaseEnterTime time.Time,
168+
) regionRuntimeState {
169+
return r.upsert(key, func(state *regionRuntimeState) {
170+
state.phase = phase
171+
state.phaseEnterTime = phaseEnterTime
172+
})
173+
}
174+
175+
func (r *regionRuntimeRegistry) updateRegionInfo(
176+
key regionRuntimeKey,
177+
region regionInfo,
178+
) regionRuntimeState {
179+
return r.upsert(key, func(state *regionRuntimeState) {
180+
state.applyRegionInfo(region)
181+
})
182+
}
183+
184+
func (r *regionRuntimeRegistry) updateWorker(key regionRuntimeKey, workerID uint64) regionRuntimeState {
185+
return r.upsert(key, func(state *regionRuntimeState) {
186+
state.workerID = workerID
187+
})
188+
}
189+
190+
func (r *regionRuntimeRegistry) updateLastEvent(
191+
key regionRuntimeKey,
192+
lastEventTime time.Time,
193+
) regionRuntimeState {
194+
return r.upsert(key, func(state *regionRuntimeState) {
195+
state.lastEventTime = lastEventTime
196+
})
197+
}
198+
199+
func (r *regionRuntimeRegistry) updateResolvedTs(
200+
key regionRuntimeKey,
201+
resolvedTs uint64,
202+
lastEventTime time.Time,
203+
) regionRuntimeState {
204+
return r.upsert(key, func(state *regionRuntimeState) {
205+
state.lastResolvedTs = resolvedTs
206+
if !lastEventTime.IsZero() {
207+
state.lastEventTime = lastEventTime
208+
}
209+
})
210+
}
211+
212+
func (r *regionRuntimeRegistry) recordError(
213+
key regionRuntimeKey,
214+
err error,
215+
errTime time.Time,
216+
) regionRuntimeState {
217+
return r.upsert(key, func(state *regionRuntimeState) {
218+
if err == nil {
219+
state.lastError = ""
220+
} else {
221+
state.lastError = err.Error()
222+
}
223+
state.lastErrorTime = errTime
224+
})
225+
}
226+
227+
func (r *regionRuntimeRegistry) incRetry(key regionRuntimeKey) regionRuntimeState {
228+
return r.upsert(key, func(state *regionRuntimeState) {
229+
state.retryCount++
230+
})
231+
}
232+
233+
func (r *regionRuntimeRegistry) setRequestEnqueueTime(
234+
key regionRuntimeKey,
235+
enqueueTime time.Time,
236+
) regionRuntimeState {
237+
return r.upsert(key, func(state *regionRuntimeState) {
238+
state.requestEnqueueTime = enqueueTime
239+
})
240+
}
241+
242+
func (r *regionRuntimeRegistry) setRPCReadyTime(
243+
key regionRuntimeKey,
244+
rpcReadyTime time.Time,
245+
) regionRuntimeState {
246+
return r.upsert(key, func(state *regionRuntimeState) {
247+
state.requestRPCReadyTime = rpcReadyTime
248+
})
249+
}
250+
251+
func (r *regionRuntimeRegistry) setRequestSendTime(
252+
key regionRuntimeKey,
253+
sendTime time.Time,
254+
) regionRuntimeState {
255+
return r.upsert(key, func(state *regionRuntimeState) {
256+
state.requestSendTime = sendTime
257+
})
258+
}
259+
260+
func (r *regionRuntimeRegistry) setInitializedTime(
261+
key regionRuntimeKey,
262+
initializedTime time.Time,
263+
) regionRuntimeState {
264+
return r.upsert(key, func(state *regionRuntimeState) {
265+
state.initializedTime = initializedTime
266+
})
267+
}
268+
269+
func (r *regionRuntimeRegistry) setReplicatingTime(
270+
key regionRuntimeKey,
271+
replicatingTime time.Time,
272+
) regionRuntimeState {
273+
return r.upsert(key, func(state *regionRuntimeState) {
274+
state.replicatingTime = replicatingTime
275+
})
276+
}
277+
278+
func (r *regionRuntimeRegistry) get(key regionRuntimeKey) (regionRuntimeState, bool) {
279+
r.mu.RLock()
280+
defer r.mu.RUnlock()
281+
282+
state, ok := r.states[key]
283+
if !ok {
284+
return regionRuntimeState{}, false
285+
}
286+
return state.clone(), true
287+
}
288+
289+
func (r *regionRuntimeRegistry) snapshot() []regionRuntimeState {
290+
r.mu.RLock()
291+
defer r.mu.RUnlock()
292+
293+
snapshots := make([]regionRuntimeState, 0, len(r.states))
294+
for _, state := range r.states {
295+
snapshots = append(snapshots, state.clone())
296+
}
297+
sort.Slice(snapshots, func(i, j int) bool {
298+
left, right := snapshots[i].key, snapshots[j].key
299+
if left.subID != right.subID {
300+
return left.subID < right.subID
301+
}
302+
if left.regionID != right.regionID {
303+
return left.regionID < right.regionID
304+
}
305+
return left.generation < right.generation
306+
})
307+
return snapshots
308+
}
309+
310+
func (r *regionRuntimeRegistry) remove(key regionRuntimeKey) bool {
311+
r.mu.Lock()
312+
defer r.mu.Unlock()
313+
314+
if _, ok := r.states[key]; !ok {
315+
return false
316+
}
317+
delete(r.states, key)
318+
return true
319+
}
320+
321+
func (r *regionRuntimeRegistry) removeBySubscription(subID SubscriptionID) int {
322+
r.mu.Lock()
323+
defer r.mu.Unlock()
324+
325+
removed := 0
326+
for key := range r.states {
327+
if key.subID != subID {
328+
continue
329+
}
330+
delete(r.states, key)
331+
removed++
332+
}
333+
return removed
334+
}

0 commit comments

Comments
 (0)