Skip to content

Commit aee3dd5

Browse files
committed
checker: refine split scatter pending collection
Signed-off-by: lhy1024 <admin@liudos.us>
1 parent d45e5c3 commit aee3dd5

3 files changed

Lines changed: 49 additions & 15 deletions

File tree

pkg/schedule/checker/split_scatter.go

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ import (
3434

3535
const (
3636
splitScatterPendingLimit = 1024
37+
// The actual retry cadence is also bounded by the checker dispatch loop. This
38+
// is only the minimum interval to avoid retrying the same pending item too
39+
// frequently when checker ticks are fast.
3740
splitScatterRetryBackoff = time.Second
3841
splitScatterPendingTTL = 3 * time.Minute
3942
)
@@ -53,7 +56,10 @@ type splitScatterController struct {
5356
regionScatterer *scatter.RegionScatterer
5457

5558
pendingMu syncutil.RWMutex
56-
pending map[uint64]splitScatterPendingItem
59+
// pending maps a pending region ID to its latest split-scatter batch item.
60+
// The item keeps its batch group so stale snapshots cannot mutate a newer
61+
// pending entry for the same region.
62+
pending map[uint64]splitScatterPendingItem
5763
}
5864

5965
func newSplitScatterController(
@@ -90,10 +96,10 @@ func (c *splitScatterController) collectTopPendingSplitScatter(limit int) []spli
9096
// to block pending updates. Stale snapshots are safe because delay/delete
9197
// recheck regionID + group before mutating pending.
9298
pendingSnapshot := make([]splitScatterPendingItem, 0, len(c.pending))
93-
expiredRegionIDs := make([]uint64, 0)
94-
for regionID, pending := range c.pending {
99+
expiredSnapshot := make([]splitScatterPendingItem, 0)
100+
for _, pending := range c.pending {
95101
if !pending.expireAt.IsZero() && !now.Before(pending.expireAt) {
96-
expiredRegionIDs = append(expiredRegionIDs, regionID)
102+
expiredSnapshot = append(expiredSnapshot, pending)
97103
continue
98104
}
99105
pendingSnapshot = append(pendingSnapshot, pending)
@@ -124,6 +130,9 @@ func (c *splitScatterController) collectTopPendingSplitScatter(limit int) []spli
124130
candidates = append(candidates, pending)
125131
}
126132
sort.Slice(candidates, func(i, j int) bool {
133+
if !candidates[i].expireAt.Equal(candidates[j].expireAt) {
134+
return candidates[i].expireAt.Before(candidates[j].expireAt)
135+
}
127136
if candidates[i].group != candidates[j].group {
128137
return candidates[i].group < candidates[j].group
129138
}
@@ -133,16 +142,13 @@ func (c *splitScatterController) collectTopPendingSplitScatter(limit int) []spli
133142
candidates = candidates[:limit]
134143
}
135144

136-
if len(expiredRegionIDs) > 0 {
145+
if len(expiredSnapshot) > 0 {
137146
expiredCount := 0
138147
c.pendingMu.Lock()
139-
for _, regionID := range expiredRegionIDs {
140-
pending, ok := c.pending[regionID]
141-
if !ok {
142-
continue
143-
}
144-
if !pending.expireAt.IsZero() && !now.Before(pending.expireAt) {
145-
delete(c.pending, regionID)
148+
for _, expired := range expiredSnapshot {
149+
pending, ok := c.pending[expired.regionID]
150+
if ok && pending.group == expired.group && !pending.expireAt.IsZero() && !now.Before(pending.expireAt) {
151+
delete(c.pending, expired.regionID)
146152
expiredCount++
147153
}
148154
}
@@ -175,7 +181,8 @@ func (c *splitScatterController) deletePendingSplitScatter(expected splitScatter
175181
delete(c.pending, expected.regionID)
176182
}
177183

178-
func (c *splitScatterController) removeExpiredPendingSplitScatterLocked(now time.Time) int {
184+
func (c *splitScatterController) removeExpiredPendingSplitScatterLocked() int {
185+
now := time.Now()
179186
expiredCount := 0
180187
for regionID, pending := range c.pending {
181188
if !pending.expireAt.IsZero() && !now.Before(pending.expireAt) {
@@ -207,7 +214,7 @@ func (c *splitScatterController) recordSplitScatterBatch(sourceRegionID uint64,
207214
}
208215
c.pendingMu.Lock()
209216
defer c.pendingMu.Unlock()
210-
if expiredCount := c.removeExpiredPendingSplitScatterLocked(time.Now()); expiredCount > 0 {
217+
if expiredCount := c.removeExpiredPendingSplitScatterLocked(); expiredCount > 0 {
211218
splitScatterPendingExpiredCounter.Add(float64(expiredCount))
212219
}
213220
newPendingCount := 0
@@ -220,6 +227,9 @@ func (c *splitScatterController) recordSplitScatterBatch(sourceRegionID uint64,
220227
}
221228
}
222229
if len(c.pending)+newPendingCount > splitScatterPendingLimit {
230+
// Keep each split batch atomic. Recording only part of a source/child
231+
// batch would make the scatter group incomplete, so skip the whole batch
232+
// when the remaining capacity cannot fit it.
223233
splitScatterPendingDroppedCounter.Add(float64(newPendingCount))
224234
log.Info("skip recording split scatter batch due to pending limit",
225235
zap.Uint64("source-region-id", sourceRegionID),

pkg/schedule/checker/split_scatter_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,30 @@ func TestCollectTopPendingSortsBeforeLimit(t *testing.T) {
229229
re.Equal(uint64(100), pending[0].regionID)
230230
}
231231

232+
func TestCollectTopPendingPrioritizesNearExpiration(t *testing.T) {
233+
re := require.New(t)
234+
controller, tc, _, cleanup := newTestSplitScatterController(t)
235+
defer cleanup()
236+
237+
controller.RecordSplitScatterBatch(100, []uint64{101})
238+
controller.RecordSplitScatterBatch(200, []uint64{201})
239+
putSplitScatterRegion(tc, 100, "m", "n", 0)
240+
putSplitScatterRegion(tc, 101, "n", "o", 120)
241+
putSplitScatterRegion(tc, 200, "o", "p", 0)
242+
putSplitScatterRegion(tc, 201, "p", "q", 120)
243+
advanceSplitScatterSourceVersion(t, tc)
244+
advanceSplitScatterRegionVersion(t, tc, 200)
245+
246+
expireSplitScatterPendingAt(t, controller, 100, time.Now().Add(2*time.Minute))
247+
expireSplitScatterPendingAt(t, controller, 101, time.Now().Add(2*time.Minute))
248+
expireSplitScatterPendingAt(t, controller, 200, time.Now().Add(time.Minute))
249+
expireSplitScatterPendingAt(t, controller, 201, time.Now().Add(time.Minute))
250+
251+
pending := controller.collectTopPendingSplitScatter(1)
252+
re.Len(pending, 1)
253+
re.Equal(uint64(200), pending[0].regionID)
254+
}
255+
232256
func TestCollectTopPendingDefersBatchUntilSourceVersionAdvances(t *testing.T) {
233257
re := require.New(t)
234258
controller, tc, _, cleanup := newTestSplitScatterController(t)

pkg/schedule/scatter/region_scatterer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ const (
6969
// AdminScatterOperatorDesc is used by external admin/API scatter requests.
7070
AdminScatterOperatorDesc = "scatter-region"
7171
// InternalScatterOperatorDesc is used by PD-internal split-scatter dispatch.
72-
InternalScatterOperatorDesc = "internal-scatter-region"
72+
InternalScatterOperatorDesc = "split-scatter-region"
7373
)
7474

7575
type selectedStoreCounter interface {

0 commit comments

Comments (0)