Skip to content

Commit e7e8d4e

Browse files
committed
feat(fibre/validator): stake-based assign
1 parent 3ec7eee commit e7e8d4e

File tree

9 files changed

+216
-47
lines changed

9 files changed

+216
-47
lines changed

fibre/blob.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@ func NewBlobConfigFromParams(blobVersion uint8, p ProtocolParams) BlobConfig {
9191
}
9292
}
9393

94+
// TotalRows returns the total number of rows (OriginalRows + ParityRows).
95+
func (c BlobConfig) TotalRows() int {
96+
return c.OriginalRows + c.ParityRows
97+
}
98+
9499
// UploadSize calculates size of blob data with padding and w/o parity.
95100
// This is the size included in the [PaymentPromise] and the one actually paid for.
96101
func (c BlobConfig) UploadSize(dataLen int) int {

fibre/client.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,11 @@ type ClientConfig struct {
3535
// ChainID is the chain identifier for domain separation in [PaymentPromise] signatures.
3636
ChainID string
3737

38-
// RowsPerShard computes the number of rows per shard given the total number of shards.
39-
RowsPerShard func(totalShards int) int
38+
// MinRowsPerValidator computes the minimum number of rows each validator must receive
39+
// given the total number of validators, for unique decodability security.
40+
MinRowsPerValidator func(totalValidators int) int
41+
// LivenessThreshold is the fraction of stake needed to cause a liveness failure (typically 1/3).
42+
LivenessThreshold cmtmath.Fraction
4043
// MaxMessageSize is the maximum gRPC message size for upload requests.
4144
MaxMessageSize int
4245

@@ -76,7 +79,8 @@ func NewClientConfigFromParams(p ProtocolParams) ClientConfig {
7679
return ClientConfig{
7780
DefaultKeyName: DefaultKeyName,
7881
ChainID: "celestia",
79-
RowsPerShard: p.RowsPerShard,
82+
MinRowsPerValidator: p.MinRowsPerValidator,
83+
LivenessThreshold: p.LivenessThreshold,
8084
MaxMessageSize: p.MaxMessageSize(p.MaxValidatorCount),
8185
UploadTargetVotingPower: p.SafetyThreshold,
8286
UploadTargetSignaturesCount: p.SafetyThreshold,

fibre/client_upload.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func (c *Client) Upload(ctx context.Context, ns share.Namespace, blob *Blob) (re
6767
))
6868

6969
// 2) assign shards to validators
70-
shardMap := valSet.Assign(rsema1d.Commitment(blob.Commitment()), c.cfg.RowsPerShard(len(valSet.Validators)))
70+
shardMap := valSet.Assign(rsema1d.Commitment(blob.Commitment()), blob.Config().TotalRows(), blob.Config().OriginalRows, c.cfg.MinRowsPerValidator(len(valSet.Validators)), c.cfg.LivenessThreshold)
7171
span.AddEvent("shards_assigned")
7272

7373
validatorSignBytes, err := promise.SignBytesValidator()

fibre/protocol_params.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,13 @@ func (p ProtocolParams) RowsPerShard(totalShards int) int {
113113
return max(uniqueDecodeSamples, reconstructionSamples)
114114
}
115115

116+
// MinRowsPerValidator returns the minimum number of rows each validator must receive
117+
// for unique decodability security, regardless of their stake percentage.
118+
// This is equivalent to RowsPerShard and serves as the floor in stake-aware assignment.
119+
func (p ProtocolParams) MinRowsPerValidator(totalValidators int) int {
120+
return p.RowsPerShard(totalValidators)
121+
}
122+
116123
// ShardsForReconstruction returns the minimum number of shards
117124
// needed to successfully reconstruct the original data.
118125
// Returns at least 1.

fibre/server.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/celestiaorg/celestia-app/v6/fibre/validator"
1010
"github.com/celestiaorg/celestia-app/v6/x/fibre/types"
1111
"github.com/cometbft/cometbft/crypto"
12+
cmtmath "github.com/cometbft/cometbft/libs/math"
1213
coregrpc "github.com/cometbft/cometbft/rpc/grpc"
1314
core "github.com/cometbft/cometbft/types"
1415
"github.com/cosmos/gogoproto/grpc"
@@ -24,8 +25,11 @@ type ServerConfig struct {
2425
BlobConfig
2526
StoreConfig
2627

27-
// RowsPerShard computes the number of rows per shard given the total number of shards.
28-
RowsPerShard func(totalShards int) int
28+
// MinRowsPerValidator computes the minimum number of rows each validator must receive
29+
// given the total number of validators, for unique decodability security.
30+
MinRowsPerValidator func(totalValidators int) int
31+
// LivenessThreshold is the fraction of stake needed to cause a liveness failure (typically 1/3).
32+
LivenessThreshold cmtmath.Fraction
2933
// MaxMessageSize is the maximum gRPC message size for upload requests.
3034
MaxMessageSize int
3135

@@ -46,11 +50,12 @@ func DefaultServerConfig() ServerConfig {
4650
// Use this when you need a config with non-default protocol parameters (e.g., for testing).
4751
func NewServerConfigFromParams(p ProtocolParams) ServerConfig {
4852
return ServerConfig{
49-
ChainID: "celestia",
50-
BlobConfig: DefaultBlobConfigV0(), // currently hardcode support for version zero only
51-
StoreConfig: DefaultStoreConfig(),
52-
RowsPerShard: p.RowsPerShard,
53-
MaxMessageSize: p.MaxMessageSize(p.MaxValidatorCount),
53+
ChainID: "celestia",
54+
BlobConfig: DefaultBlobConfigV0(), // currently hardcode support for version zero only
55+
StoreConfig: DefaultStoreConfig(),
56+
MinRowsPerValidator: p.MinRowsPerValidator,
57+
LivenessThreshold: p.LivenessThreshold,
58+
MaxMessageSize: p.MaxMessageSize(p.MaxValidatorCount),
5459
}
5560
}
5661

fibre/server_upload.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ func (s *Server) verifyAssignment(ctx context.Context, promise *PaymentPromise,
157157
for i, row := range shard.GetRows() {
158158
rowIndices[i] = row.Index
159159
}
160-
shardMap := valSet.Assign(rsema1d.Commitment(promise.Commitment), s.cfg.RowsPerShard(len(valSet.Validators)))
160+
shardMap := valSet.Assign(rsema1d.Commitment(promise.Commitment), s.cfg.TotalRows(), s.cfg.OriginalRows, s.cfg.MinRowsPerValidator(len(valSet.Validators)), s.cfg.LivenessThreshold)
161161
if err := shardMap.Verify(ourValidator, rowIndices); err != nil {
162162
return err
163163
}

fibre/server_upload_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func TestServerUploadShard(t *testing.T) {
8484
// get commitment from the request (it's already a byte slice)
8585
var commitment rsema1d.Commitment
8686
copy(commitment[:], req.Promise.Commitment)
87-
shardMap := valSet.Assign(commitment, cfg.RowsPerShard(len(valSet.Validators)))
87+
shardMap := valSet.Assign(commitment, cfg.TotalRows(), cfg.OriginalRows, cfg.MinRowsPerValidator(len(valSet.Validators)), cfg.LivenessThreshold)
8888
for val, indices := range shardMap {
8989
if val.Address.String() != serverValidator.Address.String() && len(indices) > 0 {
9090
req.Shard.Rows[0].Index = uint32(indices[0])
@@ -227,7 +227,8 @@ func makeTestRequest(
227227
signPromise(promisePb)
228228

229229
// get row assignment for server validator
230-
shardMap := valSet.Assign(rsema1d.Commitment(blob.Commitment()), fibre.DefaultProtocolParams.RowsPerShard(len(valSet.Validators)))
230+
cfg := fibre.DefaultServerConfig()
231+
shardMap := valSet.Assign(rsema1d.Commitment(blob.Commitment()), cfg.TotalRows(), cfg.OriginalRows, cfg.MinRowsPerValidator(len(valSet.Validators)), cfg.LivenessThreshold)
231232
rowIndices := shardMap[serverValidator]
232233
require.NotEmpty(t, rowIndices, "server validator has no rows assigned")
233234

fibre/validator/set.go

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/celestiaorg/rsema1d"
88
"github.com/cometbft/cometbft/crypto"
9+
cmtmath "github.com/cometbft/cometbft/libs/math"
910
core "github.com/cometbft/cometbft/types"
1011
)
1112

@@ -33,25 +34,42 @@ type ShardMap map[*core.Validator][]int
3334
// Assign returns a ShardMap containing all validators and their assigned row indices
3435
// for the given commitment.
3536
//
36-
// The rowsPerValidator parameter specifies how many rows each validator receives.
37-
// Total rows distributed = rowsPerValidator * len(validators).
37+
// Rows are distributed based on the relation: originalRows / rows = livenessThreshold / stake%
38+
// This means a validator whose stake equals the liveness threshold (e.g. 33%) receives exactly originalRows rows (4096 with default params), so each validator gets:
39+
// rows = ceil(originalRows * stake% / livenessThreshold)
3840
//
39-
// It uses a chacha8 RNG with the commitment as the seed to shuffle the row indices
40-
// using the Fisher-Yates algorithm.
41-
func (s Set) Assign(commitment rsema1d.Commitment, rowsPerValidator int) ShardMap {
42-
if len(s.Validators) == 0 || rowsPerValidator == 0 {
41+
// The minRows parameter ensures every validator receives at least that many rows
42+
// for unique decodability security, even if their proportional share would be less.
43+
//
44+
// When the sum of assigned rows exceeds totalRows (due to minRows floor guarantees),
45+
// row indices wrap around using modulo arithmetic. This means the same row may be
46+
// assigned to multiple validators, ensuring all validators receive their required
47+
// minimum while maintaining deterministic assignment.
48+
//
49+
// It uses a ChaCha8 RNG seeded with the commitment to shuffle the row indices
50+
// using the Fisher-Yates algorithm, ensuring deterministic and uniform distribution.
51+
func (s Set) Assign(commitment rsema1d.Commitment, totalRows, originalRows, minRows int, livenessThreshold cmtmath.Fraction) ShardMap {
52+
if len(s.Validators) == 0 || totalRows == 0 || minRows == 0 {
4353
return make(ShardMap)
4454
}
4555

46-
totalRows := rowsPerValidator * len(s.Validators)
56+
// rows = ceil(originalRows * stake% / livenessThreshold)
57+
// = ceil(originalRows * votingPower * denominator / (totalVotingPower * numerator))
58+
rowsPerValidator := make([]int, len(s.Validators))
59+
for i, v := range s.Validators {
60+
num := int64(originalRows) * v.VotingPower * int64(livenessThreshold.Denominator)
61+
den := s.TotalVotingPower() * int64(livenessThreshold.Numerator)
62+
rows := int((num + den - 1) / den) // ceil division
63+
rowsPerValidator[i] = max(rows, minRows)
64+
}
4765

4866
var seed [32]byte
4967
copy(seed[:], commitment[:])
5068

5169
// chacha8 RNG with seed being the commitment
5270
rng := rand.New(rand.NewChaCha8(seed))
5371

54-
// shuffle row indices with Fisher-Yates algorithm
72+
// shuffle all totalRows indices with Fisher-Yates algorithm
5573
// NOTE: std library Shuffle implements Fisher-Yates algorithm
5674
rowsIndicies := make([]int, totalRows)
5775
for i := range totalRows {
@@ -61,11 +79,17 @@ func (s Set) Assign(commitment rsema1d.Commitment, rowsPerValidator int) ShardMa
6179
rowsIndicies[i], rowsIndicies[j] = rowsIndicies[j], rowsIndicies[i]
6280
})
6381

64-
// assign rows to validators in a ShardMap
82+
// assign rows to validators, wrapping around with modulo if total assigned exceeds totalRows
6583
shardMap := make(ShardMap)
84+
offset := 0
6685
for i, validator := range s.Validators {
67-
offset := i * rowsPerValidator
68-
shardMap[validator] = rowsIndicies[offset : offset+rowsPerValidator]
86+
rows := make([]int, rowsPerValidator[i])
87+
for j := range rows {
88+
// modulo ensures wrap-around when minRows causes over-assignment
89+
rows[j] = rowsIndicies[(offset+j)%totalRows]
90+
}
91+
shardMap[validator] = rows
92+
offset += rowsPerValidator[i]
6993
}
7094

7195
return shardMap

0 commit comments

Comments
 (0)