Skip to content

Commit 4254485

Browse files
authored
locate: clamp bucket boundaries to region boundaries (#1822)
Signed-off-by: ekexium <[email protected]>
1 parent 1bf6abc commit 4254485

File tree

3 files changed

+229
-1
lines changed

3 files changed

+229
-1
lines changed

internal/locate/region_cache.go

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1124,7 +1124,7 @@ func (l *KeyLocation) LocateBucket(key []byte) *Bucket {
11241124
bucket := l.locateBucket(key)
11251125
// Return the bucket when locateBucket can locate the key
11261126
if bucket != nil {
1127-
return bucket
1127+
return l.clampBucketToRegion(bucket)
11281128
}
11291129
// Case one returns nil too.
11301130
if !l.Contains(key) {
@@ -1187,6 +1187,46 @@ func (b *Bucket) Contains(key []byte) bool {
11871187
return contains(b.StartKey, b.EndKey, key)
11881188
}
11891189

1190+
// clampBucketToRegion ensures bucket boundaries don't exceed region boundaries.
1191+
// This is a defensive measure against stale bucket metadata that can have keys
1192+
// outside the current region boundary.
1193+
func (l *KeyLocation) clampBucketToRegion(bucket *Bucket) *Bucket {
1194+
startKey := bucket.StartKey
1195+
endKey := bucket.EndKey
1196+
1197+
// Clamp start: max(bucket.StartKey, region.StartKey)
1198+
if bytes.Compare(startKey, l.StartKey) < 0 {
1199+
startKey = l.StartKey
1200+
}
1201+
1202+
// Clamp end: min(bucket.EndKey, region.EndKey)
1203+
// Note: empty endKey means infinity, so clamp if bucket is unbounded but region is not
1204+
if len(l.EndKey) > 0 && (len(endKey) == 0 || bytes.Compare(endKey, l.EndKey) > 0) {
1205+
endKey = l.EndKey
1206+
}
1207+
1208+
// Ensure clamped bucket is valid (start < end)
1209+
if len(endKey) > 0 && bytes.Compare(startKey, endKey) >= 0 {
1210+
logutil.BgLogger().Warn("Clamped bucket invalid, using region boundaries",
1211+
zap.Uint64("regionID", l.Region.GetID()),
1212+
zap.String("bucketStart", redact.Key(bucket.StartKey)),
1213+
zap.String("bucketEnd", redact.Key(bucket.EndKey)),
1214+
zap.String("regionStart", redact.Key(l.StartKey)),
1215+
zap.String("regionEnd", redact.Key(l.EndKey)))
1216+
startKey = l.StartKey
1217+
endKey = l.EndKey
1218+
}
1219+
1220+
// If no clamping needed, return original bucket
1221+
if bytes.Equal(startKey, bucket.StartKey) && bytes.Equal(endKey, bucket.EndKey) {
1222+
return bucket
1223+
}
1224+
1225+
metrics.TiKVBucketClampedCounter.Inc()
1226+
1227+
return &Bucket{StartKey: startKey, EndKey: endKey}
1228+
}
1229+
11901230
// LocateKeyRange lists region and range that key in [start_key,end_key).
11911231
// Regions without leader won't be returned.
11921232
func (c *RegionCache) LocateKeyRange(bo *retry.Backoffer, startKey, endKey []byte) ([]*KeyLocation, error) {

internal/locate/region_cache_test.go

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1918,6 +1918,184 @@ func (s *testRegionCacheSuite) TestLocateBucket() {
19181918
}
19191919
}
19201920

1921+
// TestBucketClampingToRegion tests that bucket boundaries exceeding region
1922+
// boundaries are clamped correctly.
1923+
func TestBucketClampingToRegion(t *testing.T) {
1924+
tests := []struct {
1925+
name string
1926+
regionStart []byte
1927+
regionEnd []byte
1928+
bucketStart []byte
1929+
bucketEnd []byte
1930+
wantStart []byte
1931+
wantEnd []byte
1932+
shouldBeClamped bool
1933+
}{
1934+
{
1935+
name: "bucket within region - no clamping",
1936+
regionStart: []byte("a"),
1937+
regionEnd: []byte("z"),
1938+
bucketStart: []byte("f"),
1939+
bucketEnd: []byte("m"),
1940+
wantStart: []byte("f"),
1941+
wantEnd: []byte("m"),
1942+
shouldBeClamped: false,
1943+
},
1944+
{
1945+
name: "bucket start before region - clamp start",
1946+
regionStart: []byte("f"),
1947+
regionEnd: []byte("z"),
1948+
bucketStart: []byte("a"), // Before region start
1949+
bucketEnd: []byte("m"),
1950+
wantStart: []byte("f"), // Clamped to region start
1951+
wantEnd: []byte("m"),
1952+
shouldBeClamped: true,
1953+
},
1954+
{
1955+
name: "bucket end after region - clamp end",
1956+
regionStart: []byte("a"),
1957+
regionEnd: []byte("m"),
1958+
bucketStart: []byte("f"),
1959+
bucketEnd: []byte("z"), // After region end
1960+
wantStart: []byte("f"),
1961+
wantEnd: []byte("m"), // Clamped to region end
1962+
shouldBeClamped: true,
1963+
},
1964+
{
1965+
name: "bucket exceeds both boundaries - clamp both",
1966+
regionStart: []byte("f"),
1967+
regionEnd: []byte("m"),
1968+
bucketStart: []byte("a"), // Before region start
1969+
bucketEnd: []byte("z"), // After region end
1970+
wantStart: []byte("f"), // Clamped to region start
1971+
wantEnd: []byte("m"), // Clamped to region end
1972+
shouldBeClamped: true,
1973+
},
1974+
{
1975+
name: "empty region start - bucket start not clamped",
1976+
regionStart: []byte{}, // Beginning of keyspace
1977+
regionEnd: []byte("m"),
1978+
bucketStart: []byte("a"),
1979+
bucketEnd: []byte("z"),
1980+
wantStart: []byte("a"),
1981+
wantEnd: []byte("m"), // Only end clamped
1982+
shouldBeClamped: true,
1983+
},
1984+
{
1985+
name: "empty region end - bucket end not clamped",
1986+
regionStart: []byte("f"),
1987+
regionEnd: []byte{}, // End of keyspace (infinity)
1988+
bucketStart: []byte("a"),
1989+
bucketEnd: []byte("z"),
1990+
wantStart: []byte("f"), // Only start clamped
1991+
wantEnd: []byte("z"),
1992+
shouldBeClamped: true,
1993+
},
1994+
{
1995+
name: "bucket completely before region - fallback to region",
1996+
regionStart: []byte("m"),
1997+
regionEnd: []byte("z"),
1998+
bucketStart: []byte("a"),
1999+
bucketEnd: []byte("f"), // Ends before region starts
2000+
wantStart: []byte("m"), // Falls back to region boundaries
2001+
wantEnd: []byte("z"),
2002+
shouldBeClamped: true,
2003+
},
2004+
{
2005+
name: "bucket with empty startKey - clamp to region start",
2006+
regionStart: []byte("f"),
2007+
regionEnd: []byte("z"),
2008+
bucketStart: []byte{}, // Beginning of keyspace
2009+
bucketEnd: []byte("m"),
2010+
wantStart: []byte("f"), // Clamped to region start
2011+
wantEnd: []byte("m"),
2012+
shouldBeClamped: true,
2013+
},
2014+
{
2015+
name: "bucket with empty endKey - clamp to region end",
2016+
regionStart: []byte("a"),
2017+
regionEnd: []byte("m"),
2018+
bucketStart: []byte("f"),
2019+
bucketEnd: []byte{}, // End of keyspace (infinity)
2020+
wantStart: []byte("f"),
2021+
wantEnd: []byte("m"), // Clamped to region end
2022+
shouldBeClamped: true,
2023+
},
2024+
{
2025+
name: "bucket with empty startKey and region with empty startKey - no start clamping",
2026+
regionStart: []byte{}, // Beginning of keyspace
2027+
regionEnd: []byte("z"),
2028+
bucketStart: []byte{}, // Beginning of keyspace
2029+
bucketEnd: []byte("m"),
2030+
wantStart: []byte{}, // No clamping needed
2031+
wantEnd: []byte("m"),
2032+
shouldBeClamped: false,
2033+
},
2034+
{
2035+
name: "bucket with empty endKey and region with empty endKey - no end clamping",
2036+
regionStart: []byte("a"),
2037+
regionEnd: []byte{}, // End of keyspace (infinity)
2038+
bucketStart: []byte("f"),
2039+
bucketEnd: []byte{}, // End of keyspace (infinity)
2040+
wantStart: []byte("f"),
2041+
wantEnd: []byte{}, // No clamping needed
2042+
shouldBeClamped: false,
2043+
},
2044+
{
2045+
name: "bucket spanning full keyspace - clamp to region",
2046+
regionStart: []byte("f"),
2047+
regionEnd: []byte("m"),
2048+
bucketStart: []byte{}, // Beginning of keyspace
2049+
bucketEnd: []byte{}, // End of keyspace (infinity)
2050+
wantStart: []byte("f"), // Clamped to region start
2051+
wantEnd: []byte("m"), // Clamped to region end
2052+
shouldBeClamped: true,
2053+
},
2054+
{
2055+
name: "infinity region - no clamping needed",
2056+
regionStart: []byte{}, // Beginning of keyspace
2057+
regionEnd: []byte{}, // End of keyspace (infinity)
2058+
bucketStart: []byte("a"),
2059+
bucketEnd: []byte("z"),
2060+
wantStart: []byte("a"),
2061+
wantEnd: []byte("z"),
2062+
shouldBeClamped: false,
2063+
},
2064+
}
2065+
2066+
for _, tt := range tests {
2067+
t.Run(tt.name, func(t *testing.T) {
2068+
loc := &KeyLocation{
2069+
Region: RegionVerID{id: 1},
2070+
StartKey: tt.regionStart,
2071+
EndKey: tt.regionEnd,
2072+
}
2073+
bucket := &Bucket{
2074+
StartKey: tt.bucketStart,
2075+
EndKey: tt.bucketEnd,
2076+
}
2077+
2078+
result := loc.clampBucketToRegion(bucket)
2079+
2080+
if !bytes.Equal(result.StartKey, tt.wantStart) {
2081+
t.Errorf("StartKey = %q, want %q", result.StartKey, tt.wantStart)
2082+
}
2083+
if !bytes.Equal(result.EndKey, tt.wantEnd) {
2084+
t.Errorf("EndKey = %q, want %q", result.EndKey, tt.wantEnd)
2085+
}
2086+
2087+
// Verify that clamped bucket is different from original if clamping was expected
2088+
isSameBucket := result == bucket
2089+
if tt.shouldBeClamped && isSameBucket {
2090+
t.Error("Expected bucket to be clamped (new object), but got same object")
2091+
}
2092+
if !tt.shouldBeClamped && !isSameBucket {
2093+
t.Error("Expected no clamping (same object), but got new object")
2094+
}
2095+
})
2096+
}
2097+
}
2098+
19212099
func (s *testRegionCacheSuite) TestRemoveIntersectingRegions() {
19222100
// Split at "b", "c", "d", "e"
19232101
regions := s.cluster.AllocIDs(4)

metrics/metrics.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ var (
121121
TiKVValidateReadTSFromPDCount prometheus.Counter
122122
TiKVLowResolutionTSOUpdateIntervalSecondsGauge prometheus.Gauge
123123
TiKVStaleRegionFromPDCounter prometheus.Counter
124+
TiKVBucketClampedCounter prometheus.Counter
124125
TiKVPipelinedFlushThrottleSecondsHistogram prometheus.Histogram
125126
TiKVTxnWriteConflictCounter prometheus.Counter
126127
TiKVAsyncSendReqCounter *prometheus.CounterVec
@@ -893,6 +894,14 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) {
893894
Help: "Counter of stale region from PD",
894895
ConstLabels: constLabels,
895896
})
897+
TiKVBucketClampedCounter = prometheus.NewCounter(
898+
prometheus.CounterOpts{
899+
Namespace: namespace,
900+
Subsystem: subsystem,
901+
Name: "bucket_clamped",
902+
Help: "Counter of bucket boundaries clamped to region boundaries",
903+
ConstLabels: constLabels,
904+
})
896905
TiKVPipelinedFlushThrottleSecondsHistogram = prometheus.NewHistogram(
897906
prometheus.HistogramOpts{
898907
Namespace: namespace,
@@ -1035,6 +1044,7 @@ func RegisterMetrics() {
10351044
prometheus.MustRegister(TiKVValidateReadTSFromPDCount)
10361045
prometheus.MustRegister(TiKVLowResolutionTSOUpdateIntervalSecondsGauge)
10371046
prometheus.MustRegister(TiKVStaleRegionFromPDCounter)
1047+
prometheus.MustRegister(TiKVBucketClampedCounter)
10381048
prometheus.MustRegister(TiKVPipelinedFlushThrottleSecondsHistogram)
10391049
prometheus.MustRegister(TiKVTxnWriteConflictCounter)
10401050
prometheus.MustRegister(TiKVAsyncSendReqCounter)

0 commit comments

Comments
 (0)