Skip to content

Commit d675a7d

Browse files
committed
perf(pruning): adaptive proportional sub-sharding + 2s wait
Phase A2 deploy showed skip-empty cut block_indexer prune from 11min to 33s (249/256 shards skipped) but txIndex (256/0) and block-store BH: prefix (256/0) had no empty shards. Per-shard byte counters showed each populated shard wrote 700MB-2GB; the cgroup peak only dropped from 42GB to 40GB because the 200ms pause is below Linux's 5s vm.dirty_writeback_centisecs default — kernel can't drain dirty pages between bursts that close together. Two changes here: 1. WaitTimeBetweenCompactions 200ms -> 2s. Crosses the writeback boundary so the kernel actually flushes between shards. Wall-time cost: ~256 * 1.8s = ~8min extra per sweep, fits the 3h budget. 2. Adaptive proportional sub-sharding driven by persisted prior-cycle dWrite. compactShardAdaptive reads the previous cycle's measured write_bytes for shard b from the metadata DB and, if above SubshardSplitThresholdBytes (500 MiB), divides that single byte shard into ceil(prior/500MiB) sub-ranges (capped at 8) by interpolating an extra byte. Each sub-range goes through the same skip-empty probe + measured compact, so a hot shard whose data clusters in a sub-range only touches the populated sub-ranges next cycle. Sub-shard pacing = parent shard pacing (each sub gets its own 2s wait), spreading I/O over more writeback windows. Why proportional rather than static: txIndex shards 0x00-0x48 each write ~1GB; static depth-2 sharding would either over-split rarely-hot shards or under-split the hot zone. Driving from prior dWrite adapts to whatever distribution the workload actually has. History keys live under "compact_history_<label>_<NNN>" and are only written when totalDW > 0, so empty-DB and empty-shard cases don't pollute the keyspace with metadata that would force spurious future compactions.
Tests: - TestCompactShardAdaptive_NoHistory_SinglePass — first cycle / cold - TestCompactShardAdaptive_HotHistory_SubdividesProportionally — 1 GiB prior -> 3 sub-shards, contiguous coverage - TestSplitFactorFromHistory — threshold + cap math - Pre-existing skip-empty tests updated to swapCompactAndMeasure.
1 parent b4d0479 commit d675a7d

2 files changed

Lines changed: 244 additions & 30 deletions

File tree

internal/db/db_utils.go

Lines changed: 163 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,25 @@ const (
2424
// SSTs per shard once the DB grew past steady-state.
2525
MaxCompactionInterval = int64(50000)
2626

27-
// WaitTimeBetweenCompactions throttles successive shard compactions to
28-
// (a) yield to the scheduler so consensus goroutines aren't starved and
29-
// (b) give the kernel time to drain dirty page-cache pages before the
30-
// next shard re-fills it. 200ms gives writeback well over Linux's 5s
31-
// vm.dirty_writeback_centisecs default a real window between bursts.
32-
// Combined with skip-empty-shard, sweeps now spend most time on the
33-
// ~30 populated shards (out of 256) so total wall-time cost is small.
34-
WaitTimeBetweenCompactions = 200 * time.Millisecond
27+
// WaitTimeBetweenCompactions throttles successive shard compactions.
28+
// Linux's vm.dirty_writeback_centisecs defaults to 5s; pauses shorter
29+
// than that don't cross a writeback boundary so the kernel can't reliably
30+
// drain dirty pages between bursts. 2s gets us past the writeback flush
31+
// without idling forever and is the dominant lever for keeping page
32+
// cache from accumulating across shards.
33+
WaitTimeBetweenCompactions = 2 * time.Second
34+
35+
// SubshardSplitThresholdBytes is the per-shard write-byte threshold above
36+
// which the next prune cycle subdivides that shard. We cap subshard count
37+
// at SubshardMaxSplit so a 4 GB shard becomes 8 sub-shards of ~500 MB,
38+
// not ~80 as an uncapped ~40 GB shard would be. Below this threshold a shard runs un-split.
39+
SubshardSplitThresholdBytes = uint64(500 * 1024 * 1024) // 500 MiB
40+
SubshardMaxSplit = 8
41+
42+
// compactHistoryPrefix scopes adaptive-sharding metadata (per-shard prior
43+
// dWrite). Stored alongside CompactPrefix in the same DB so resume +
44+
// adaptive-split state share a key namespace.
45+
compactHistoryPrefix = "compact_history_"
3546
)
3647

3748
var (
@@ -181,18 +192,15 @@ func CompactPrefixHex256(db dbm.DB, prefix string, label string) error {
181192
shardLabel = fmt.Sprintf("%s %s ff-fg", label, prefix)
182193
}
183194

184-
hasAny, err := shardHasKeys(db, start, end)
195+
dw, err := compactShardAdaptive(db, start, end, label, b)
185196
if err != nil {
186-
return fmt.Errorf("compaction %s probe failed: %w", shardLabel, err)
197+
return fmt.Errorf("compaction %s: %w", shardLabel, err)
187198
}
188-
if !hasAny {
199+
if dw == 0 {
189200
skipped++
190201
continue
191202
}
192203
compacted++
193-
if err := compactAndLog(db, start, end, shardLabel); err != nil {
194-
return err
195-
}
196204
}
197205

198206
log.Printf("compaction %s prefix %q DONE in %s (compacted=%d skipped_empty=%d)",
@@ -227,18 +235,15 @@ func CompactSharded256(db dbm.DB, label string) error {
227235
shardLabel = fmt.Sprintf("%s shard %02x-%02x", label, b, b+1)
228236
}
229237

230-
hasAny, err := shardHasKeys(db, start, end)
238+
dw, err := compactShardAdaptive(db, start, end, label, b)
231239
if err != nil {
232-
return fmt.Errorf("compaction %s probe failed: %w", shardLabel, err)
240+
return fmt.Errorf("compaction %s: %w", shardLabel, err)
233241
}
234-
if !hasAny {
242+
if dw == 0 {
235243
skipped++
236244
continue
237245
}
238246
compacted++
239-
if err := compactAndLog(db, start, end, shardLabel); err != nil {
240-
return err
241-
}
242247
}
243248

244249
log.Printf("compaction %s DONE in %s (compacted=%d skipped_empty=%d)",
@@ -259,12 +264,146 @@ func shardHasKeys(db dbm.DB, start, end []byte) (bool, error) {
259264
return it.Valid(), nil
260265
}
261266

267+
// historyKey returns the metadata key under which we persist the prior-cycle
268+
// dWrite for a (label, shardIdx) pair. The shard index is the byte b in
269+
// CompactSharded256 / CompactPrefixHex256.
270+
func historyKey(label string, shardIdx int) []byte {
271+
return []byte(fmt.Sprintf("%s%s_%03d", compactHistoryPrefix, label, shardIdx))
272+
}
273+
274+
func readHistoryDWrite(db dbm.DB, label string, shardIdx int) uint64 {
275+
bz, err := db.Get(historyKey(label, shardIdx))
276+
if err != nil || len(bz) != 8 {
277+
return 0
278+
}
279+
return binary.BigEndian.Uint64(bz)
280+
}
281+
282+
func writeHistoryDWrite(db dbm.DB, label string, shardIdx int, dWrite uint64) {
283+
var b [8]byte
284+
binary.BigEndian.PutUint64(b[:], dWrite)
285+
_ = db.Set(historyKey(label, shardIdx), b[:])
286+
}
287+
288+
// splitFactorFromHistory maps prior-cycle dWrite to a sub-shard count.
289+
// Below threshold: 1 (no split). Above: ceil(prior / threshold), capped.
290+
func splitFactorFromHistory(priorDWrite uint64) int {
291+
if priorDWrite < SubshardSplitThresholdBytes {
292+
return 1
293+
}
294+
n := int((priorDWrite + SubshardSplitThresholdBytes - 1) / SubshardSplitThresholdBytes)
295+
if n > SubshardMaxSplit {
296+
n = SubshardMaxSplit
297+
}
298+
if n < 1 {
299+
n = 1
300+
}
301+
return n
302+
}
303+
304+
// byteRangeSplit divides [start, end) into n contiguous sub-ranges that
// together cover exactly [start, end), by interpolating one extra byte
// after `start`. When end == nil the parent range is open-ended and the
// last sub-range stays open-ended.
//
// Examples (n=4):
//
// [0x00, 0x01) -> [0x00, 0x0040), [0x0040, 0x0080), [0x0080, 0x00c0), [0x00c0, 0x01)
// [0x42, 0x43) -> [0x42, 0x4240), ..., [0x42c0, 0x43)
func byteRangeSplit(start, end []byte, n int) [][2][]byte {
	if n <= 1 {
		return [][2][]byte{{start, end}}
	}
	// Compute the n+1 boundary keys first; sub-range i is then
	// [bounds[i], bounds[i+1]), which makes contiguity self-evident.
	bounds := make([][]byte, n+1)
	bounds[0] = start
	bounds[n] = end
	for i := 1; i < n; i++ {
		bounds[i] = appendInterpolant(start, i, n)
	}
	ranges := make([][2][]byte, n)
	for i := range ranges {
		ranges[i] = [2][]byte{bounds[i], bounds[i+1]}
	}
	return ranges
}

// appendInterpolant returns start || byte(i*256/n): the i-th of n evenly
// spaced one-byte boundaries inside a single-byte shard range.
func appendInterpolant(start []byte, i, n int) []byte {
	boundary := make([]byte, len(start), len(start)+1)
	copy(boundary, start)
	return append(boundary, byte((i*256)/n))
}
345+
346+
// compactShardAdaptive runs a single shard, optionally subdividing based on
347+
// the prior-cycle write history persisted in the DB. Returns the cumulative
348+
// dWrite observed for this shard (which becomes the next cycle's history).
349+
//
350+
// The skip-empty probe runs at the *sub*-shard level too: a shard whose
351+
// data clusters in part of its range will only re-compact the populated
352+
// sub-ranges next cycle.
353+
func compactShardAdaptive(db dbm.DB, start, end []byte, label string, shardIdx int) (uint64, error) {
354+
prior := readHistoryDWrite(db, label, shardIdx)
355+
splitN := splitFactorFromHistory(prior)
356+
subs := byteRangeSplit(start, end, splitN)
357+
358+
var totalDW uint64
359+
for i, sub := range subs {
360+
var subLabel string
361+
if splitN == 1 {
362+
subLabel = fmt.Sprintf("%s shard %02x", label, shardIdx)
363+
} else {
364+
subLabel = fmt.Sprintf("%s shard %02x sub %d/%d", label, shardIdx, i+1, splitN)
365+
}
366+
hasAny, err := shardHasKeys(db, sub[0], sub[1])
367+
if err != nil {
368+
return totalDW, fmt.Errorf("%s probe failed: %w", subLabel, err)
369+
}
370+
if !hasAny {
371+
continue
372+
}
373+
dw, err := compactAndMeasure(db, sub[0], sub[1], subLabel)
374+
if err != nil {
375+
return totalDW, err
376+
}
377+
totalDW += dw
378+
}
379+
if totalDW > 0 {
380+
// Only persist when we actually wrote something. Writing zeros for
381+
// every empty shard would pollute the keyspace with metadata keys
382+
// inside the same byte-shard ranges we're sweeping (history keys
383+
// start with 'c' = 0x63, which would otherwise fall into shard 0x63
384+
// on the next sweep and force a spurious compaction).
385+
writeHistoryDWrite(db, label, shardIdx, totalDW)
386+
}
387+
return totalDW, nil
388+
}
389+
262390
// CompactAndLog compacts [start, limit) and logs the range, duration, and
263391
// the process-level read/write byte deltas observed during the call. The
264392
// byte deltas come from /proc/self/io and capture *all* I/O issued by the
265393
// process during the compact (including background goroutines), so they're
266394
// upper bounds — but they pin per-shard amplification cost in production.
267395
func CompactAndLog(db dbm.DB, start, limit []byte, label string) error {
396+
_, err := compactAndMeasure(db, start, limit, label)
397+
return err
398+
}
399+
400+
// compactAndMeasure is the shared implementation for both the dWrite-returning
401+
// adaptive path and the legacy CompactAndLog signature. Returns the observed
402+
// /proc/self/io write_bytes delta so callers can persist it for adaptive
403+
// sub-sharding decisions on the next cycle.
404+
var compactAndMeasure = realCompactAndMeasure
405+
406+
func realCompactAndMeasure(db dbm.DB, start, limit []byte, label string) (uint64, error) {
268407
time.Sleep(WaitTimeBetweenCompactions)
269408

270409
rng := fmt.Sprintf("[%s, %s)", prettyKey(start), prettyKey(limit))
@@ -277,11 +416,12 @@ func CompactAndLog(db dbm.DB, start, limit []byte, label string) error {
277416

278417
if err != nil {
279418
log.Printf("compaction %s range %s FAILED after %s: %v", label, rng, elapsed, err)
280-
return err
419+
return 0, err
281420
}
421+
dWrite := wb1 - wb0
282422
log.Printf("compaction %s range %s done in %s dRead=%dB dWrite=%dB",
283-
label, rng, elapsed, rb1-rb0, wb1-wb0)
284-
return nil
423+
label, rng, elapsed, rb1-rb0, dWrite)
424+
return dWrite, nil
285425
}
286426

287427
// procIOBytes reads /proc/self/io and returns (read_bytes, write_bytes).

internal/db/db_utils_test.go

Lines changed: 81 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,16 @@ func swapCompactAndLog(f func(dbm.DB, []byte, []byte, string) error) (restore fu
2525
return func() { compactAndLog = prev }
2626
}
2727

28+
// swapCompactAndMeasure intercepts the byte-measuring compaction primitive
29+
// used by CompactSharded256 / CompactPrefixHex256 (via compactShardAdaptive).
30+
// Returning a nonzero dWrite makes the shard count as "compacted"; returning
31+
// 0 makes it count as "skipped_empty" from the caller's perspective.
32+
func swapCompactAndMeasure(f func(dbm.DB, []byte, []byte, string) (uint64, error)) (restore func()) {
33+
prev := compactAndMeasure
34+
compactAndMeasure = f
35+
return func() { compactAndMeasure = prev }
36+
}
37+
2838
func TestCompactIntSharded_DiscoveryStartsAtHugeFirstKey_NoGaps(t *testing.T) {
2939
var intervals [][2][]byte
3040
restore := swapCompactAndLog(func(db dbm.DB, start, end []byte, lbl string) error {
@@ -165,13 +175,13 @@ func TestCompactIntSharded_ResumeFromStoredMeta_NoGaps(t *testing.T) {
165175
}
166176

167177
// TestCompactSharded256_SkipsEmptyShards seeds keys only under the lowercase
168-
// 't' prefix and asserts that CompactSharded256 invokes compactAndLog only
178+
// 't' prefix and asserts that CompactSharded256 invokes the compactor only
169179
// for that single shard out of 256.
170180
func TestCompactSharded256_SkipsEmptyShards(t *testing.T) {
171181
var calls [][2][]byte
172-
restore := swapCompactAndLog(func(db dbm.DB, start, end []byte, lbl string) error {
182+
restore := swapCompactAndMeasure(func(db dbm.DB, start, end []byte, lbl string) (uint64, error) {
173183
calls = append(calls, [2][]byte{append([]byte(nil), start...), append([]byte(nil), end...)})
174-
return nil
184+
return 1, nil // nonzero so the caller counts it as "compacted"
175185
})
176186
defer restore()
177187

@@ -189,9 +199,9 @@ func TestCompactSharded256_SkipsEmptyShards(t *testing.T) {
189199
// DB no shard is compacted at all.
190200
func TestCompactSharded256_EmptyDB_NoShardsCompacted(t *testing.T) {
191201
var calls int
192-
restore := swapCompactAndLog(func(db dbm.DB, start, end []byte, lbl string) error {
202+
restore := swapCompactAndMeasure(func(db dbm.DB, start, end []byte, lbl string) (uint64, error) {
193203
calls++
194-
return nil
204+
return 1, nil
195205
})
196206
defer restore()
197207

@@ -204,9 +214,9 @@ func TestCompactSharded256_EmptyDB_NoShardsCompacted(t *testing.T) {
204214
// — both '1' first byte). One shard should fire.
205215
func TestCompactPrefixHex256_SkipsEmptyShards(t *testing.T) {
206216
var calls int
207-
restore := swapCompactAndLog(func(db dbm.DB, start, end []byte, lbl string) error {
217+
restore := swapCompactAndMeasure(func(db dbm.DB, start, end []byte, lbl string) (uint64, error) {
208218
calls++
209-
return nil
219+
return 1, nil
210220
})
211221
defer restore()
212222

@@ -218,3 +228,67 @@ func TestCompactPrefixHex256_SkipsEmptyShards(t *testing.T) {
218228

219229
require.Equal(t, 1, calls, "only BH:31-32 shard should fire")
220230
}
231+
232+
// TestCompactShardAdaptive_NoHistory_SinglePass: with no prior dWrite stored
233+
// the shard runs as one undivided pass.
234+
func TestCompactShardAdaptive_NoHistory_SinglePass(t *testing.T) {
235+
var calls int
236+
restore := swapCompactAndMeasure(func(db dbm.DB, start, end []byte, lbl string) (uint64, error) {
237+
calls++
238+
return 100, nil
239+
})
240+
defer restore()
241+
242+
memdb := dbm.NewMemDB()
243+
require.NoError(t, memdb.Set([]byte{0x42, 0x00}, []byte{1}))
244+
245+
dw, err := compactShardAdaptive(memdb, []byte{0x42}, []byte{0x43}, "test", 0x42)
246+
require.NoError(t, err)
247+
require.Equal(t, uint64(100), dw)
248+
require.Equal(t, 1, calls, "no history -> single pass")
249+
}
250+
251+
// TestCompactShardAdaptive_HotHistory_SubdividesProportionally: prior write
252+
// of 1 GiB triggers ceil(1GiB/500MiB)=3 sub-shards; ranges interpolate over
253+
// the parent shard.
254+
func TestCompactShardAdaptive_HotHistory_SubdividesProportionally(t *testing.T) {
255+
var subs [][2][]byte
256+
restore := swapCompactAndMeasure(func(db dbm.DB, start, end []byte, lbl string) (uint64, error) {
257+
subs = append(subs, [2][]byte{append([]byte(nil), start...), append([]byte(nil), end...)})
258+
return 100, nil
259+
})
260+
defer restore()
261+
262+
memdb := dbm.NewMemDB()
263+
// Seed keys spanning the entire [0x42, 0x43) range so all sub-shards are populated.
264+
for b := 0; b < 256; b += 16 {
265+
require.NoError(t, memdb.Set([]byte{0x42, byte(b)}, []byte{1}))
266+
}
267+
// Prime history: 1 GiB prior write -> ceil(1024MiB / 500MiB) = 3 sub-shards.
268+
writeHistoryDWrite(memdb, "test", 0x42, 1024*1024*1024)
269+
270+
_, err := compactShardAdaptive(memdb, []byte{0x42}, []byte{0x43}, "test", 0x42)
271+
require.NoError(t, err)
272+
require.Equal(t, 3, len(subs), "1GiB prior should split into 3 sub-shards")
273+
274+
// Verify ranges are contiguous and cover [0x42, 0x43).
275+
require.Equal(t, []byte{0x42}, subs[0][0])
276+
require.Equal(t, []byte{0x43}, subs[len(subs)-1][1])
277+
for i := 1; i < len(subs); i++ {
278+
require.Equal(t, subs[i-1][1], subs[i][0], "sub-shards must be contiguous")
279+
}
280+
}
281+
282+
// TestSplitFactorFromHistory verifies the threshold + cap behaviour.
283+
func TestSplitFactorFromHistory(t *testing.T) {
284+
const MiB = uint64(1024 * 1024)
285+
require.Equal(t, 1, splitFactorFromHistory(0))
286+
require.Equal(t, 1, splitFactorFromHistory(499*MiB))
287+
require.Equal(t, 1, splitFactorFromHistory(500*MiB-1))
288+
require.Equal(t, 1, splitFactorFromHistory(500*MiB)) // exactly threshold = single
289+
require.Equal(t, 2, splitFactorFromHistory(500*MiB+1)) // just above
290+
require.Equal(t, 3, splitFactorFromHistory(1024*MiB)) // 1 GiB -> ceil(1024/500) = 3
291+
require.Equal(t, 5, splitFactorFromHistory(2*1024*MiB)) // 2 GiB -> ceil(2048/500) = 5
292+
require.Equal(t, 8, splitFactorFromHistory(4*1024*MiB)) // 4 GiB -> 8 (just hits cap)
293+
require.Equal(t, 8, splitFactorFromHistory(99*1024*MiB)) // capped
294+
}

0 commit comments

Comments
 (0)