Skip to content

Commit e667191

Browse files
Merge pull request #4 from randomizedcoder/readStartShard
readStartShard for more even consumption
2 parents 4b112c7 + 8a56c02 commit e667191

File tree

14 files changed

+428
-113
lines changed

14 files changed

+428
-113
lines changed

Makefile

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,20 +42,21 @@ test-datagen-short:
4242
go test -v -short -count=1 ./data-generator/
4343

4444
# Run integration tests (quick set - ~20s)
45+
# Note: ^TestIntegration$$ anchors to exact match, avoiding TestIntegrationSmoke etc.
4546
test-integration:
46-
go test -v -count=1 -run TestIntegration ./integration-tests/ -args -testset=quick
47+
go test -v -count=1 -run "^TestIntegration$$" ./integration-tests/ -args -testset=quick
4748

4849
# Run integration smoke test (~3s)
4950
test-integration-smoke:
50-
go test -v -count=1 -run TestIntegrationSmoke ./integration-tests/
51+
go test -v -count=1 -run "^TestIntegrationSmoke$$" ./integration-tests/
5152

5253
# Run integration tests (standard set - ~100s)
5354
test-integration-standard:
54-
go test -v -count=1 -run TestIntegration ./integration-tests/ -args -testset=standard
55+
go test -v -count=1 -run "^TestIntegration$$" ./integration-tests/ -args -testset=standard
5556

5657
# Run integration tests (full matrix - use with caution)
5758
test-integration-full:
58-
go test -v -timeout=60m -count=1 -run TestIntegration ./integration-tests/ -args -testset=full
59+
go test -v -timeout=60m -count=1 -run "^TestIntegration$$" ./integration-tests/ -args -testset=full
5960

6061
# Run integration config unit tests only
6162
test-integration-unit:
@@ -90,11 +91,11 @@ test-integration-profile-1gbps:
9091
# Quick 1 Gbps throughput demo (no profiling, just shows throughput)
9192
# Use this for quick validation of high-throughput performance
9293
test-integration-1gbps:
93-
go test -v -timeout=30m -count=1 -run "TestIntegration/T001" ./integration-tests/ -args -testset=gbps
94+
go test -v -timeout=30m -count=1 -run "^TestIntegration$$/T001" ./integration-tests/ -args -testset=gbps
9495

9596
# Generate integration test report from quick tests
9697
test-integration-report:
97-
go test -v -timeout=30m -count=1 -run TestIntegration ./integration-tests/ -args -testset=quick -report
98+
go test -v -timeout=30m -count=1 -run "^TestIntegration$$" ./integration-tests/ -args -testset=quick -report
9899

99100
# =============================================================================
100101
# Strategy Comparison Tests

README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,22 @@ func (r *ShardedRing) ReadBatch(maxItems int) []any {
428428

429429
The ring library is designed to work with `sync.Pool` for efficient memory reuse:
430430

431+
> **⚠️ Important: Use Pointer Types for Zero-Allocation**
432+
>
433+
> Always store **pointer types** (e.g., `*Packet`) in the ring, not value types (e.g., `int`, `string`).
434+
> Storing value types in an `any` interface causes **boxing allocations** on every write.
435+
> With pointer types, the ring achieves true zero-allocation steady-state operation.
436+
>
437+
> ```go
438+
> // ❌ Bad: causes boxing allocation on every Write
439+
> ring.Write(id, 42)
440+
> ring.Write(id, "hello")
441+
>
442+
> // ✅ Good: no allocation (pointer already on heap)
443+
> ring.Write(id, &myPacket)
444+
> ring.Write(id, pooledBuffer)
445+
> ```
446+
431447
#### Producer Side
432448
433449
```go

boxing_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,3 +55,4 @@ func BenchmarkWriterWithBoxing(b *testing.B) {
5555
}
5656
}
5757

58+

falsesharing_bench_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,3 +249,4 @@ func BenchmarkShardPadding(b *testing.B) {
249249
})
250250
}
251251

252+
Lines changed: 23 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,29 @@
11
=== Test: T001 ===
2-
Config: 8p×125Mb/1450b/5ms
3-
Command: /home/das/Downloads/go-lock-free-ring/bin/ring [-producers=8 -rate=125.00 -packetSize=1450 -frequency=5 -duration=10s -btreeSize=4000 -debugLevel=3 -statsInterval=1 -ringShards=8 -profile=block -profilepath=/home/das/Downloads/go-lock-free-ring/integration-tests/output/profiles]
4-
Start: 2025-12-27T13:06:41-08:00
5-
End: 2025-12-27T13:06:51-08:00
6-
Duration: 10.006993714s
2+
Config: 1p×10Mb/1450b/10ms
3+
Command: /home/das/Downloads/go-lock-free-ring/bin/ring [-producers=1 -rate=10.00 -packetSize=1450 -frequency=10 -duration=5s -btreeSize=2000 -debugLevel=3 -statsInterval=1]
4+
Start: 2025-12-27T17:47:41-08:00
5+
End: 2025-12-27T17:47:46-08:00
6+
Duration: 5.006437014s
77
Exit Code: 0
88

99
=== STDOUT ===
1010

1111
=== STDERR ===
12-
2025/12/27 13:06:41 GOMAXPROCS: 24 (default)
13-
2025/12/27 13:06:41 Using retry strategy: SleepBackoff
14-
2025/12/27 13:06:41 profile: block profiling enabled, /home/das/Downloads/go-lock-free-ring/integration-tests/output/profiles/block.pprof
15-
2025/12/27 13:06:41 Ring size auto-calculated: 1024 (8 producers × 125.0 Mb/s, 5ms interval, 2.0x multiplier)
16-
2025/12/27 13:06:41 Starting: 8 producers @ 125.0 Mb/s, ring=1024/8 shards, strategy=SleepBackoff, btree max=4000, consumer every 5ms
17-
2025/12/27 13:06:42 [stats] inserted=86218 trimmed=82218 rate=999.69 Mb/s btree=4000 ring=16/1024 produced=86234 dropped=0
18-
2025/12/27 13:06:43 [stats] inserted=86215 trimmed=86215 rate=1000.05 Mb/s btree=4000 ring=13/1024 produced=172446 dropped=0
19-
2025/12/27 13:06:44 [stats] inserted=86230 trimmed=86230 rate=999.99 Mb/s btree=4000 ring=15/1024 produced=258678 dropped=0
20-
2025/12/27 13:06:45 [stats] inserted=86195 trimmed=86195 rate=999.98 Mb/s btree=4000 ring=16/1024 produced=344874 dropped=0
21-
2025/12/27 13:06:46 [stats] inserted=86175 trimmed=86175 rate=1000.06 Mb/s btree=4000 ring=13/1024 produced=431046 dropped=0
22-
2025/12/27 13:06:47 [stats] inserted=86237 trimmed=86237 rate=1000.01 Mb/s btree=4000 ring=12/1024 produced=517282 dropped=0
23-
2025/12/27 13:06:48 [stats] inserted=86210 trimmed=86210 rate=999.98 Mb/s btree=4000 ring=15/1024 produced=603495 dropped=0
24-
2025/12/27 13:06:49 [stats] inserted=86211 trimmed=86211 rate=1000.00 Mb/s btree=4000 ring=15/1024 produced=689706 dropped=0
25-
2025/12/27 13:06:50 [stats] inserted=86231 trimmed=86231 rate=1000.01 Mb/s btree=4000 ring=16/1024 produced=775938 dropped=0
26-
2025/12/27 13:06:51 Duration 10s reached, shutting down...
27-
2025/12/27 13:06:51 Shutdown signal received, waiting for goroutines...
28-
2025/12/27 13:06:51 [stats] inserted=86152 trimmed=86152 rate=1000.01 Mb/s btree=4000 ring=5/1024 produced=862079 dropped=0
29-
2025/12/27 13:06:51 Clean shutdown completed in 80.05µs
30-
2025/12/27 13:06:51 === Final Statistics ===
31-
2025/12/27 13:06:51 Produced: 862079
32-
2025/12/27 13:06:51 Dropped: 0 (0.00%)
33-
2025/12/27 13:06:51 Consumed: 862079
34-
2025/12/27 13:06:51 Trimmed: 858074
35-
2025/12/27 13:06:51 BTree final size: 4005
36-
2025/12/27 13:06:51 profile: block profiling disabled, /home/das/Downloads/go-lock-free-ring/integration-tests/output/profiles/block.pprof
12+
2025/12/27 17:47:41 GOMAXPROCS: 24 (default)
13+
2025/12/27 17:47:41 Using retry strategy: SleepBackoff
14+
2025/12/27 17:47:41 Ring size auto-calculated: 64 (1 producers × 10.0 Mb/s, 10ms interval, 2.0x multiplier)
15+
2025/12/27 17:47:41 Starting: 1 producers @ 10.0 Mb/s, ring=64/4 shards, strategy=SleepBackoff, btree max=2000, consumer every 10ms
16+
2025/12/27 17:47:42 [stats] inserted=863 trimmed=0 rate=10.01 Mb/s btree=863 ring=0/64 produced=863 dropped=0
17+
2025/12/27 17:47:43 [stats] inserted=862 trimmed=0 rate=10.00 Mb/s btree=1725 ring=0/64 produced=1725 dropped=0
18+
2025/12/27 17:47:44 [stats] inserted=862 trimmed=587 rate=10.00 Mb/s btree=2000 ring=0/64 produced=2587 dropped=0
19+
2025/12/27 17:47:45 [stats] inserted=862 trimmed=862 rate=10.00 Mb/s btree=2000 ring=0/64 produced=3449 dropped=0
20+
2025/12/27 17:47:46 [stats] inserted=863 trimmed=863 rate=10.01 Mb/s btree=2000 ring=0/64 produced=4312 dropped=0
21+
2025/12/27 17:47:46 Duration 5s reached, shutting down...
22+
2025/12/27 17:47:46 Shutdown signal received, waiting for goroutines...
23+
2025/12/27 17:47:46 Clean shutdown completed in 905.804µs
24+
2025/12/27 17:47:46 === Final Statistics ===
25+
2025/12/27 17:47:46 Produced: 4312
26+
2025/12/27 17:47:46 Dropped: 0 (0.00%)
27+
2025/12/27 17:47:46 Consumed: 4312
28+
2025/12/27 17:47:46 Trimmed: 2312
29+
2025/12/27 17:47:46 BTree final size: 2000
Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,29 @@
11
=== Test: T002 ===
22
Config: 4p×10Mb/1450b/10ms
33
Command: /home/das/Downloads/go-lock-free-ring/bin/ring [-producers=4 -rate=10.00 -packetSize=1450 -frequency=10 -duration=5s -btreeSize=2000 -debugLevel=3 -statsInterval=1]
4-
Start: 2025-12-27T13:04:17-08:00
5-
End: 2025-12-27T13:04:22-08:00
6-
Duration: 5.005786817s
4+
Start: 2025-12-27T17:47:46-08:00
5+
End: 2025-12-27T17:47:51-08:00
6+
Duration: 5.006317019s
77
Exit Code: 0
88

99
=== STDOUT ===
1010

1111
=== STDERR ===
12-
2025/12/27 13:04:17 GOMAXPROCS: 24 (default)
13-
2025/12/27 13:04:17 Using retry strategy: SleepBackoff
14-
2025/12/27 13:04:17 Ring size auto-calculated: 128 (4 producers × 10.0 Mb/s, 10ms interval, 2.0x multiplier)
15-
2025/12/27 13:04:17 Starting: 4 producers @ 10.0 Mb/s, ring=128/4 shards, strategy=SleepBackoff, btree max=2000, consumer every 10ms
16-
2025/12/27 13:04:18 [stats] inserted=3454 trimmed=1454 rate=40.03 Mb/s btree=2000 ring=0/128 produced=3454 dropped=0
17-
2025/12/27 13:04:19 [stats] inserted=3446 trimmed=3446 rate=40.01 Mb/s btree=2000 ring=0/128 produced=6900 dropped=0
18-
2025/12/27 13:04:20 [stats] inserted=3448 trimmed=3448 rate=39.98 Mb/s btree=2000 ring=0/128 produced=10348 dropped=0
19-
2025/12/27 13:04:21 [stats] inserted=3448 trimmed=3448 rate=39.99 Mb/s btree=2000 ring=0/128 produced=13796 dropped=0
20-
2025/12/27 13:04:22 Duration 5s reached, shutting down...
21-
2025/12/27 13:04:22 [stats] inserted=3448 trimmed=3448 rate=40.02 Mb/s btree=2000 ring=0/128 produced=17244 dropped=0
22-
2025/12/27 13:04:22 Shutdown signal received, waiting for goroutines...
23-
2025/12/27 13:04:22 Clean shutdown completed in 645.1µs
24-
2025/12/27 13:04:22 === Final Statistics ===
25-
2025/12/27 13:04:22 Produced: 17244
26-
2025/12/27 13:04:22 Dropped: 0 (0.00%)
27-
2025/12/27 13:04:22 Consumed: 17244
28-
2025/12/27 13:04:22 Trimmed: 15244
29-
2025/12/27 13:04:22 BTree final size: 2000
12+
2025/12/27 17:47:46 GOMAXPROCS: 24 (default)
13+
2025/12/27 17:47:46 Using retry strategy: SleepBackoff
14+
2025/12/27 17:47:46 Ring size auto-calculated: 128 (4 producers × 10.0 Mb/s, 10ms interval, 2.0x multiplier)
15+
2025/12/27 17:47:46 Starting: 4 producers @ 10.0 Mb/s, ring=128/4 shards, strategy=SleepBackoff, btree max=2000, consumer every 10ms
16+
2025/12/27 17:47:47 [stats] inserted=3452 trimmed=1452 rate=40.01 Mb/s btree=2000 ring=0/128 produced=3452 dropped=0
17+
2025/12/27 17:47:48 [stats] inserted=3448 trimmed=3448 rate=40.02 Mb/s btree=2000 ring=0/128 produced=6900 dropped=0
18+
2025/12/27 17:47:49 [stats] inserted=3452 trimmed=3452 rate=40.02 Mb/s btree=2000 ring=0/128 produced=10352 dropped=0
19+
2025/12/27 17:47:50 [stats] inserted=3448 trimmed=3448 rate=40.00 Mb/s btree=2000 ring=0/128 produced=13800 dropped=0
20+
2025/12/27 17:47:51 Duration 5s reached, shutting down...
21+
2025/12/27 17:47:51 [stats] inserted=3447 trimmed=3447 rate=39.99 Mb/s btree=2000 ring=0/128 produced=17247 dropped=0
22+
2025/12/27 17:47:51 Shutdown signal received, waiting for goroutines...
23+
2025/12/27 17:47:51 Clean shutdown completed in 1.067408ms
24+
2025/12/27 17:47:51 === Final Statistics ===
25+
2025/12/27 17:47:51 Produced: 17248
26+
2025/12/27 17:47:51 Dropped: 0 (0.00%)
27+
2025/12/27 17:47:51 Consumed: 17248
28+
2025/12/27 17:47:51 Trimmed: 15247
29+
2025/12/27 17:47:51 BTree final size: 2001
Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,29 @@
11
=== Test: T003 ===
22
Config: 4p×50Mb/1450b/10ms
33
Command: /home/das/Downloads/go-lock-free-ring/bin/ring [-producers=4 -rate=50.00 -packetSize=1450 -frequency=10 -duration=5s -btreeSize=2000 -debugLevel=3 -statsInterval=1]
4-
Start: 2025-12-27T13:04:22-08:00
5-
End: 2025-12-27T13:04:27-08:00
6-
Duration: 5.005086656s
4+
Start: 2025-12-27T17:47:51-08:00
5+
End: 2025-12-27T17:47:56-08:00
6+
Duration: 5.005870118s
77
Exit Code: 0
88

99
=== STDOUT ===
1010

1111
=== STDERR ===
12-
2025/12/27 13:04:22 GOMAXPROCS: 24 (default)
13-
2025/12/27 13:04:22 Using retry strategy: SleepBackoff
14-
2025/12/27 13:04:22 Ring size auto-calculated: 512 (4 producers × 50.0 Mb/s, 10ms interval, 2.0x multiplier)
15-
2025/12/27 13:04:22 Starting: 4 producers @ 50.0 Mb/s, ring=512/4 shards, strategy=SleepBackoff, btree max=2000, consumer every 10ms
16-
2025/12/27 13:04:23 [stats] inserted=17247 trimmed=15247 rate=200.02 Mb/s btree=2000 ring=0/512 produced=17247 dropped=0
17-
2025/12/27 13:04:24 [stats] inserted=17250 trimmed=17250 rate=199.99 Mb/s btree=2000 ring=1/512 produced=34498 dropped=0
18-
2025/12/27 13:04:25 [stats] inserted=17234 trimmed=17234 rate=199.99 Mb/s btree=2000 ring=1/512 produced=51732 dropped=0
19-
2025/12/27 13:04:26 [stats] inserted=17236 trimmed=17236 rate=199.99 Mb/s btree=2000 ring=2/512 produced=68969 dropped=0
20-
2025/12/27 13:04:27 Duration 5s reached, shutting down...
21-
2025/12/27 13:04:27 Shutdown signal received, waiting for goroutines...
22-
2025/12/27 13:04:27 [stats] inserted=17248 trimmed=17248 rate=200.02 Mb/s btree=2000 ring=0/512 produced=86215 dropped=0
23-
2025/12/27 13:04:27 Clean shutdown completed in 155.712µs
24-
2025/12/27 13:04:27 === Final Statistics ===
25-
2025/12/27 13:04:27 Produced: 86215
26-
2025/12/27 13:04:27 Dropped: 0 (0.00%)
27-
2025/12/27 13:04:27 Consumed: 86215
28-
2025/12/27 13:04:27 Trimmed: 84215
29-
2025/12/27 13:04:27 BTree final size: 2000
12+
2025/12/27 17:47:51 GOMAXPROCS: 24 (default)
13+
2025/12/27 17:47:51 Using retry strategy: SleepBackoff
14+
2025/12/27 17:47:51 Ring size auto-calculated: 512 (4 producers × 50.0 Mb/s, 10ms interval, 2.0x multiplier)
15+
2025/12/27 17:47:51 Starting: 4 producers @ 50.0 Mb/s, ring=512/4 shards, strategy=SleepBackoff, btree max=2000, consumer every 10ms
16+
2025/12/27 17:47:52 [stats] inserted=17260 trimmed=15260 rate=200.01 Mb/s btree=2000 ring=3/512 produced=17263 dropped=0
17+
2025/12/27 17:47:53 [stats] inserted=17232 trimmed=17232 rate=200.02 Mb/s btree=2000 ring=0/512 produced=34492 dropped=0
18+
2025/12/27 17:47:54 [stats] inserted=17252 trimmed=17252 rate=200.00 Mb/s btree=2000 ring=0/512 produced=51744 dropped=0
19+
2025/12/27 17:47:55 [stats] inserted=17244 trimmed=17244 rate=200.01 Mb/s btree=2000 ring=0/512 produced=68988 dropped=0
20+
2025/12/27 17:47:56 Duration 5s reached, shutting down...
21+
2025/12/27 17:47:56 [stats] inserted=17235 trimmed=17235 rate=200.00 Mb/s btree=2000 ring=0/512 produced=86223 dropped=0
22+
2025/12/27 17:47:56 Shutdown signal received, waiting for goroutines...
23+
2025/12/27 17:47:56 Clean shutdown completed in 174.438µs
24+
2025/12/27 17:47:56 === Final Statistics ===
25+
2025/12/27 17:47:56 Produced: 86224
26+
2025/12/27 17:47:56 Dropped: 0 (0.00%)
27+
2025/12/27 17:47:56 Consumed: 86224
28+
2025/12/27 17:47:56 Trimmed: 84223
29+
2025/12/27 17:47:56 BTree final size: 2001
Lines changed: 21 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,28 @@
11
=== Test: T004 ===
22
Config: 8p×50Mb/1450b/10ms
3-
Command: /home/das/Downloads/go-lock-free-ring/bin/ring [-producers=8 -rate=50.00 -packetSize=1450 -frequency=10 -duration=10s -btreeSize=2000 -debugLevel=3 -statsInterval=1 -profile=block -profilepath=/home/das/Downloads/go-lock-free-ring/integration-tests/output/profiles]
4-
Start: 2025-12-27T13:05:22-08:00
5-
End: 2025-12-27T13:05:32-08:00
6-
Duration: 10.007199441s
3+
Command: /home/das/Downloads/go-lock-free-ring/bin/ring [-producers=8 -rate=50.00 -packetSize=1450 -frequency=10 -duration=5s -btreeSize=2000 -debugLevel=3 -statsInterval=1]
4+
Start: 2025-12-27T17:47:56-08:00
5+
End: 2025-12-27T17:48:01-08:00
6+
Duration: 5.005612546s
77
Exit Code: 0
88

99
=== STDOUT ===
1010

1111
=== STDERR ===
12-
2025/12/27 13:05:22 GOMAXPROCS: 24 (default)
13-
2025/12/27 13:05:22 Using retry strategy: SleepBackoff
14-
2025/12/27 13:05:22 profile: block profiling enabled, /home/das/Downloads/go-lock-free-ring/integration-tests/output/profiles/block.pprof
15-
2025/12/27 13:05:22 Ring size auto-calculated: 1024 (8 producers × 50.0 Mb/s, 10ms interval, 2.0x multiplier)
16-
2025/12/27 13:05:22 Starting: 8 producers @ 50.0 Mb/s, ring=1024/4 shards, strategy=SleepBackoff, btree max=2000, consumer every 10ms
17-
2025/12/27 13:05:23 [stats] inserted=34519 trimmed=32519 rate=399.94 Mb/s btree=2000 ring=5/1024 produced=34524 dropped=0
18-
2025/12/27 13:05:24 [stats] inserted=34447 trimmed=34447 rate=400.02 Mb/s btree=2000 ring=2/1024 produced=68968 dropped=0
19-
2025/12/27 13:05:25 [stats] inserted=34492 trimmed=34492 rate=399.99 Mb/s btree=2000 ring=4/1024 produced=103462 dropped=0
20-
2025/12/27 13:05:26 [stats] inserted=34490 trimmed=34490 rate=400.01 Mb/s btree=2000 ring=3/1024 produced=137951 dropped=0
21-
2025/12/27 13:05:27 [stats] inserted=34491 trimmed=34491 rate=399.97 Mb/s btree=2000 ring=5/1024 produced=172444 dropped=0
22-
2025/12/27 13:05:28 [stats] inserted=34480 trimmed=34480 rate=400.00 Mb/s btree=2000 ring=5/1024 produced=206924 dropped=0
23-
2025/12/27 13:05:29 [stats] inserted=34489 trimmed=34489 rate=400.00 Mb/s btree=2000 ring=6/1024 produced=241414 dropped=0
24-
2025/12/27 13:05:30 [stats] inserted=34476 trimmed=34476 rate=400.02 Mb/s btree=2000 ring=3/1024 produced=275887 dropped=0
25-
2025/12/27 13:05:31 [stats] inserted=34482 trimmed=34482 rate=399.98 Mb/s btree=2000 ring=6/1024 produced=310372 dropped=0
26-
2025/12/27 13:05:32 Duration 10s reached, shutting down...
27-
2025/12/27 13:05:32 Shutdown signal received, waiting for goroutines...
28-
2025/12/27 13:05:32 [stats] inserted=34489 trimmed=34489 rate=400.00 Mb/s btree=2000 ring=3/1024 produced=344858 dropped=0
29-
2025/12/27 13:05:32 Clean shutdown completed in 183.064µs
30-
2025/12/27 13:05:32 === Final Statistics ===
31-
2025/12/27 13:05:32 Produced: 344858
32-
2025/12/27 13:05:32 Dropped: 0 (0.00%)
33-
2025/12/27 13:05:32 Consumed: 344858
34-
2025/12/27 13:05:32 Trimmed: 342855
35-
2025/12/27 13:05:32 BTree final size: 2003
36-
2025/12/27 13:05:32 profile: block profiling disabled, /home/das/Downloads/go-lock-free-ring/integration-tests/output/profiles/block.pprof
12+
2025/12/27 17:47:56 GOMAXPROCS: 24 (default)
13+
2025/12/27 17:47:56 Using retry strategy: SleepBackoff
14+
2025/12/27 17:47:56 Ring size auto-calculated: 1024 (8 producers × 50.0 Mb/s, 10ms interval, 2.0x multiplier)
15+
2025/12/27 17:47:56 Starting: 8 producers @ 50.0 Mb/s, ring=1024/4 shards, strategy=SleepBackoff, btree max=2000, consumer every 10ms
16+
2025/12/27 17:47:57 [stats] inserted=34494 trimmed=32494 rate=400.03 Mb/s btree=2000 ring=2/1024 produced=34496 dropped=0
17+
2025/12/27 17:47:58 [stats] inserted=34512 trimmed=34512 rate=400.00 Mb/s btree=2000 ring=2/1024 produced=69008 dropped=0
18+
2025/12/27 17:47:59 [stats] inserted=34479 trimmed=34479 rate=399.99 Mb/s btree=2000 ring=3/1024 produced=103488 dropped=0
19+
2025/12/27 17:48:00 [stats] inserted=34457 trimmed=34457 rate=400.02 Mb/s btree=2000 ring=2/1024 produced=137944 dropped=0
20+
2025/12/27 17:48:01 Duration 5s reached, shutting down...
21+
2025/12/27 17:48:01 Shutdown signal received, waiting for goroutines...
22+
2025/12/27 17:48:01 Clean shutdown completed in 98.836µs
23+
2025/12/27 17:48:01 === Final Statistics ===
24+
2025/12/27 17:48:01 Produced: 172440
25+
2025/12/27 17:48:01 Dropped: 0 (0.00%)
26+
2025/12/27 17:48:01 Consumed: 172440
27+
2025/12/27 17:48:01 Trimmed: 170440
28+
2025/12/27 17:48:01 BTree final size: 2000

ring.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,10 @@ type Shard struct {
3636

3737
// ShardedRing is a sharded lock-free MPSC ring buffer
3838
type ShardedRing struct {
39-
shards []*Shard
40-
numShards uint64
41-
mask uint64
39+
shards []*Shard
40+
numShards uint64
41+
mask uint64
42+
readStartShard uint64 // Rotating start position for fair shard reading (single consumer, no atomic needed)
4243
}
4344

4445
// NewShardedRing creates a new sharded ring buffer
@@ -123,9 +124,13 @@ func (s *Shard) write(value any) bool {
123124

124125
// TryRead attempts to read one item from any shard
125126
// Returns the value and true if an item was read, nil and false if all shards are empty
127+
// Uses a rotating start position to ensure fair distribution across shards
126128
func (r *ShardedRing) TryRead() (any, bool) {
129+
start := r.readStartShard
130+
r.readStartShard++
127131
for i := uint64(0); i < r.numShards; i++ {
128-
if val, ok := r.shards[i].tryRead(); ok {
132+
idx := (start + i) & r.mask
133+
if val, ok := r.shards[idx].tryRead(); ok {
129134
return val, true
130135
}
131136
}
@@ -170,6 +175,7 @@ func (r *ShardedRing) ReadBatch(maxItems int) []any {
170175
// ReadBatchInto reads up to maxItems into the provided slice (for zero-alloc operation)
171176
// The slice is reset to length 0, then items are appended up to maxItems
172177
// Returns the slice with items read (may be empty if ring is empty)
178+
// Uses a rotating start position to ensure fair distribution across shards
173179
// Usage with sync.Pool:
174180
//
175181
// buf := pool.Get().([]any)[:0]
@@ -178,10 +184,13 @@ func (r *ShardedRing) ReadBatch(maxItems int) []any {
178184
// pool.Put(buf)
179185
func (r *ShardedRing) ReadBatchInto(buf []any, maxItems int) []any {
180186
result := buf[:0]
187+
start := r.readStartShard
188+
r.readStartShard++
181189

182-
// Round-robin through all shards
190+
// Round-robin through all shards starting from rotating position
183191
for i := uint64(0); i < r.numShards && len(result) < maxItems; i++ {
184-
shard := r.shards[i]
192+
idx := (start + i) & r.mask
193+
shard := r.shards[idx]
185194
for len(result) < maxItems {
186195
if val, ok := shard.tryRead(); ok {
187196
result = append(result, val)

0 commit comments

Comments
 (0)