Skip to content

Commit 706e191

Browse files
committed
feat(fibre/validator): add Select for stake-weighted validator selection
Add Select function that returns validators shuffled by stake for load balancing during downloads. Higher-stake validators are more likely to appear earlier, distributing load proportionally.
1 parent bc30a08 commit 706e191

File tree

3 files changed

+358
-233
lines changed

3 files changed

+358
-233
lines changed

fibre/validator/set.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,78 @@ func (s Set) Assign(commitment rsema1d.Commitment, totalRows, originalRows, minR
9696
return shardMap
9797
}
9898

99+
// Select returns validators to download shards from, shuffled by stake for load balancing.
100+
// Returns all validators and the minimum count needed for reconstruction (covering livenessThreshold stake).
101+
func (s Set) Select(originalRows, minRows int, livenessThreshold cmtmath.Fraction) (validators []*core.Validator, minRequired int) {
102+
if len(s.Validators) == 0 {
103+
return nil, 0
104+
}
105+
106+
validators = make([]*core.Validator, len(s.Validators))
107+
copy(validators, s.Validators)
108+
109+
// minStakeFraction is the minimum contribution per validator for unique decodability
110+
// e.g., 148 / (4096 * 3) ≈ 1.2% for livenessThreshold=1/3
111+
totalDistributedRows := originalRows * int(livenessThreshold.Denominator) / int(livenessThreshold.Numerator)
112+
minStakeFraction := float64(minRows) / float64(totalDistributedRows)
113+
114+
// find last non-overlapping validator using effective stake (actual stake floored by minStakeFraction)
115+
totalStake := float64(s.TotalVotingPower())
116+
accumulated := 0.0
117+
splitIdx := len(validators)
118+
for i, v := range validators {
119+
accumulated += max(float64(v.VotingPower)/totalStake, minStakeFraction)
120+
if accumulated >= 1.0 {
121+
splitIdx = i + 1
122+
break
123+
}
124+
}
125+
126+
// shuffle each group by stake for load balancing
127+
// NOTE: doesn't require cryptographic randomness
128+
rng := rand.New(rand.NewPCG(rand.Uint64(), rand.Uint64()))
129+
shuffleByStake(validators[:splitIdx], rng)
130+
shuffleByStake(validators[splitIdx:], rng)
131+
132+
// count validators needed to cover livenessThreshold stake
133+
livenessStake := s.TotalVotingPower() * int64(livenessThreshold.Numerator) / int64(livenessThreshold.Denominator)
134+
coveredStake := int64(0)
135+
for _, v := range validators[:splitIdx] {
136+
minRequired++
137+
coveredStake += v.VotingPower
138+
if coveredStake >= livenessStake {
139+
break
140+
}
141+
}
142+
143+
return validators, minRequired
144+
}
145+
146+
// shuffleByStake shuffles validators in-place using stake-weighted random selection.
147+
// Validators with higher voting power are more likely to appear earlier.
148+
func shuffleByStake(validators []*core.Validator, rng *rand.Rand) {
149+
for i := range len(validators) - 1 {
150+
// calculate total weight of remaining validators
151+
var totalWeight int64
152+
for j := i; j < len(validators); j++ {
153+
totalWeight += validators[j].VotingPower
154+
}
155+
156+
// pick random point in weight space
157+
point := rng.Int64N(totalWeight)
158+
159+
// find and swap the selected validator
160+
var cumul int64
161+
for j := i; j < len(validators); j++ {
162+
cumul += validators[j].VotingPower
163+
if point < cumul {
164+
validators[i], validators[j] = validators[j], validators[i]
165+
break
166+
}
167+
}
168+
}
169+
}
170+
99171
// Verify checks if all given row indices are assigned to [core.Validator].
100172
// Returns error if validator is not in the map, count doesn't match, or any row is not assigned.
101173
// This method builds a temporary set for O(r + n) complexity instead of O(n × r).

fibre/validator/set_bench_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package validator_test
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
7+
"github.com/celestiaorg/celestia-app/v6/fibre/validator"
8+
"github.com/cometbft/cometbft/crypto/ed25519"
9+
core "github.com/cometbft/cometbft/types"
10+
)
11+
12+
// BenchmarkSet_Assign measures row assignment performance.
13+
// Dominated by Fisher-Yates shuffle of 16384 rows, constant across validator counts.
14+
//
15+
// Results (AMD Ryzen 9 7940HS):
16+
//
17+
// 10_validators 135 µs/op 235 kB/op 18 allocs/op
18+
// 50_validators 136 µs/op 239 kB/op 62 allocs/op
19+
// 100_validators 135 µs/op 244 kB/op 114 allocs/op
20+
func BenchmarkSet_Assign(b *testing.B) {
21+
for _, n := range []int{10, 50, 100} {
22+
b.Run(fmt.Sprintf("%d_validators", n), func(b *testing.B) {
23+
valSet := makeBenchValidatorSet(n)
24+
for b.Loop() {
25+
_ = valSet.Assign(testCommitment, 16384, 4096, 16, testLivenessThreshold)
26+
}
27+
})
28+
}
29+
}
30+
31+
// BenchmarkSet_Select measures validator selection performance.
32+
// Scales O(n²) due to stake-weighted shuffle but remains negligible for typical validator counts.
33+
//
34+
// Results (AMD Ryzen 9 7940HS):
35+
//
36+
// 10_validators 196 ns/op 96 B/op 2 allocs/op
37+
// 50_validators 1.5 µs/op 432 B/op 2 allocs/op
38+
// 100_validators 3.7 µs/op 912 B/op 2 allocs/op
39+
func BenchmarkSet_Select(b *testing.B) {
40+
for _, n := range []int{10, 50, 100} {
41+
b.Run(fmt.Sprintf("%d_validators", n), func(b *testing.B) {
42+
valSet := makeBenchValidatorSet(n)
43+
for b.Loop() {
44+
_, _ = valSet.Select(testOriginalRows, testMinRows, testLivenessThreshold)
45+
}
46+
})
47+
}
48+
}
49+
50+
func makeBenchValidatorSet(n int) validator.Set {
51+
validators := make([]*core.Validator, n)
52+
for i := range n {
53+
validators[i] = core.NewValidator(ed25519.GenPrivKey().PubKey(), 1)
54+
}
55+
return validator.Set{
56+
ValidatorSet: core.NewValidatorSet(validators),
57+
Height: 100,
58+
}
59+
}

0 commit comments

Comments
 (0)