Skip to content

Commit e5cb881

Browse files
Wondertan and claude committed
feat(fibre): bucketed row batch pool with mmap-backed allocator
Introduces fibre/internal/row, a bucketed allocator of fixed-shape row batches used by the blob encode path and the rsema1d codec's work buffers. Replaces the per-encode sync.Pool with explicit retention (aged eviction, idle-grace drop) and mmap-backed regions above 1 MiB, keeping steady-state RSS proportional to concurrent in-flight encodes rather than worst-case per-worker reservation. Allocations run without holding the pool lock so a fresh mmap doesn't stall concurrent Gets/Puts behind a multi-ms syscall. row.Assembler layers a K+N row view on top of the pool: original rows alias input data zero-copy where possible, parity+head+tail come from a single pooled batch released as one unit. ProtocolParams.CodecWorkRows() exposes leopard-GF16's work-row count so callers size the pool without pool code needing to know codec internals. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent bb697ec commit e5cb881

21 files changed

Lines changed: 1846 additions & 158 deletions

fibre/blob.go

Lines changed: 105 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"sync"
99
"sync/atomic"
1010

11+
"github.com/celestiaorg/celestia-app/v9/fibre/internal/row"
1112
"github.com/celestiaorg/celestia-app/v9/pkg/rsema1d"
1213
"github.com/celestiaorg/celestia-app/v9/pkg/rsema1d/field"
1314
"github.com/klauspost/reedsolomon"
@@ -18,6 +19,8 @@ var (
1819
ErrBlobTooLarge = errors.New("blob size exceeds maximum allowed size")
1920
// ErrBlobCommitmentMismatch is returned when the reconstructed commitment doesn't match the expected one.
2021
ErrBlobCommitmentMismatch = errors.New("commitment mismatch: reconstructed data doesn't match expected commitment")
22+
// ErrBlobConsumed is returned when a blob is reused after being uploaded.
23+
ErrBlobConsumed = errors.New("blob cannot be reused after upload")
2124
)
2225

2326
// BlobConfig contains configuration parameters for blob encoding and decoding.
@@ -35,11 +38,26 @@ type BlobConfig struct {
3538
MaxDataSize int
3639
// CodingWorkers is the number of workers to use for encoding and decoding rsema1d.
3740
CodingWorkers int
41+
42+
// Coder is a cached Reed-Solomon encoder/decoder for encoding blobs.
43+
Coder *rsema1d.Coder
44+
// Assembler builds pooled row layouts for encoding blobs.
45+
Assembler *row.Assembler
3846
}
3947

48+
// defaultBlobConfigV0 is the shared default config, created at init time.
49+
var defaultBlobConfigV0 = func() BlobConfig {
50+
cfg, err := NewBlobConfigFromParams(0, DefaultProtocolParams)
51+
if err != nil {
52+
panic(fmt.Sprintf("creating default blob config v0: %v", err))
53+
}
54+
return cfg
55+
}()
56+
4057
// DefaultBlobConfigV0 returns a [BlobConfig] with default values for version 0.
58+
// The config is created once at init and shared across all callers.
4159
func DefaultBlobConfigV0() BlobConfig {
42-
return NewBlobConfigFromParams(0, DefaultProtocolParams)
60+
return defaultBlobConfigV0
4361
}
4462

4563
// BlobConfigForVersion returns the [BlobConfig] for the given blob version.
@@ -55,21 +73,39 @@ func BlobConfigForVersion(version uint8) (BlobConfig, error) {
5573

5674
// NewBlobConfigFromParams creates a [BlobConfig] with values derived from the given [ProtocolParams].
5775
// Use this when you need a config with non-default protocol parameters (e.g., for testing).
58-
func NewBlobConfigFromParams(blobVersion uint8, p ProtocolParams) BlobConfig {
76+
func NewBlobConfigFromParams(blobVersion uint8, params ProtocolParams) (BlobConfig, error) {
5977
if blobVersion != 0 {
60-
panic(fmt.Sprintf("unsupported blob version: %d", blobVersion))
78+
return BlobConfig{}, fmt.Errorf("unsupported blob version: %d", blobVersion)
79+
}
80+
81+
maxRowSize := params.MaxRowSize(blobVersion)
82+
pool := row.New(maxRowSize, row.AssemblerBatchRows(params.ParityRows()), params.CodecWorkRows())
83+
assembler, err := row.NewAssembler(params.Rows, params.ParityRows(), pool)
84+
if err != nil {
85+
return BlobConfig{}, fmt.Errorf("creating row assembler: %w", err)
86+
}
87+
88+
coder, err := rsema1d.NewCoder(&rsema1d.Config{
89+
K: params.Rows,
90+
N: params.ParityRows(),
91+
WorkerCount: runtime.GOMAXPROCS(0),
92+
}, reedsolomon.WithWorkAllocator(pool))
93+
if err != nil {
94+
return BlobConfig{}, fmt.Errorf("creating rsema1d coder: %w", err)
6195
}
6296

6397
return BlobConfig{
6498
BlobVersion: blobVersion,
65-
OriginalRows: p.Rows,
66-
ParityRows: p.ParityRows(),
99+
OriginalRows: params.Rows,
100+
ParityRows: params.ParityRows(),
67101
RowSize: func(dataLen int) int {
68-
return p.RowSize(blobVersion, dataLen+blobHeaderLen)
102+
return params.RowSize(blobVersion, dataLen+blobHeaderLen)
69103
},
70-
MaxDataSize: p.MaxBlobSize - blobHeaderLen, // subtract the header overhead
104+
MaxDataSize: params.MaxBlobSize - blobHeaderLen,
71105
CodingWorkers: runtime.GOMAXPROCS(0),
72-
}
106+
Coder: coder,
107+
Assembler: assembler,
108+
}, nil
73109
}
74110

75111
// TotalRows returns the total number of rows (OriginalRows + ParityRows).
@@ -93,7 +129,6 @@ type Blob struct {
93129
extendedData *rsema1d.ExtendedData
94130
id BlobID
95131
originalID BlobID
96-
rlcCoeffs []field.GF128
97132

98133
// holds meta fields about the blob
99134
header blobHeaderV0
@@ -102,40 +137,46 @@ type Blob struct {
102137

103138
// fields for reconstruction
104139
rows [][]byte
140+
141+
consumed atomic.Bool
142+
143+
// asm owns pooled row storage from the assembler. Nil for reconstructed
144+
// blobs.
145+
asm *row.Assembly
105146
}
106147

107148
// NewBlob creates a new [Blob] instance by encoding the data.
108-
// It takes the data and a [BlobConfig].
149+
// It takes ownership of data and may reuse it directly as backing storage for
150+
// original rows. Callers must not modify data after calling NewBlob.
151+
//
109152
// The data is prefixed with a header containing the blob version and data size.
110153
// Returns [ErrBlobTooLarge] if the data size exceeds BlobConfig.MaxDataSize.
111-
func NewBlob(data []byte, cfg BlobConfig) (d *Blob, err error) {
154+
//
155+
// The returned blob holds pooled row buffers from the [row.Assembler].
156+
// Pooled storage is released automatically when [Client.Upload] completes.
157+
// Data must not be modified after ownership is transferred to the blob.
158+
func NewBlob(data []byte, cfg BlobConfig) (*Blob, error) {
112159
if len(data) == 0 {
113160
return nil, fmt.Errorf("data cannot be empty")
114161
}
115162
if len(data) > cfg.MaxDataSize {
116163
return nil, fmt.Errorf("%w: data size %d exceeds maximum %d", ErrBlobTooLarge, len(data), cfg.MaxDataSize)
117164
}
118165

119-
d = &Blob{
120-
cfg: cfg,
121-
header: newBlobHeaderV0(len(data)),
122-
data: data,
123-
}
124-
125-
rows := d.header.encodeToRows(data, cfg)
126-
var rsemaCommitment rsema1d.Commitment
127-
d.extendedData, rsemaCommitment, d.rlcCoeffs, err = rsema1d.Encode(rows, &rsema1d.Config{
128-
K: cfg.OriginalRows,
129-
N: cfg.ParityRows,
130-
RowSize: len(rows[0]),
131-
WorkerCount: cfg.CodingWorkers,
132-
})
166+
header := newBlobHeaderV0(len(data))
167+
extendedData, asm, err := header.encode(data, cfg)
133168
if err != nil {
134-
return nil, fmt.Errorf("encoding data: %w", err)
169+
return nil, err
135170
}
136-
d.id = NewBlobID(cfg.BlobVersion, rsemaCommitment)
137171

138-
return d, nil
172+
return &Blob{
173+
cfg: cfg,
174+
extendedData: extendedData,
175+
id: NewBlobID(cfg.BlobVersion, extendedData.Commitment()),
176+
header: header,
177+
data: data,
178+
asm: asm,
179+
}, nil
139180
}
140181

141182
// NewEmptyBlob creates a new [Blob] instance for receiving and reconstructing data.
@@ -191,9 +232,12 @@ func (d *Blob) Config() BlobConfig {
191232
return d.cfg
192233
}
193234

194-
// RLCCoeffs returns RLC coefficients of the original data.
195-
func (d *Blob) RLCCoeffs() []field.GF128 {
196-
return d.rlcCoeffs
235+
// RLC returns the computed random linear combination values for the original rows.
236+
func (d *Blob) RLC() []field.GF128 {
237+
if d.extendedData == nil {
238+
return nil
239+
}
240+
return d.extendedData.RLC()
197241
}
198242

199243
// RowSize returns the size of each row in bytes.
@@ -220,12 +264,16 @@ func (d *Blob) Data() []byte {
220264
return d.data
221265
}
222266

223-
// Row returns the [rsema1d.RowInclusionProof] for the given index from the extended data.
267+
// Row returns the [rsema1d.RowInclusionProof] for the given index. Rows are
268+
// served until the blob's pooled storage is released; after release, every
269+
// call returns an error.
224270
func (d *Blob) Row(index int) (*rsema1d.RowInclusionProof, error) {
225271
if d.extendedData == nil {
226272
return nil, fmt.Errorf("no extended data available")
227273
}
228-
274+
if d.asm.Released() {
275+
return nil, fmt.Errorf("row %d: storage has been released", index)
276+
}
229277
return d.extendedData.GenerateRowInclusionProof(index)
230278
}
231279

@@ -389,7 +437,7 @@ func (d *Blob) Reconstruct(opts ...ReconstructOption) error {
389437
RowSize: len(d.rows[0]), // NOTE: successful reconstruct must fill all rows, so if this ever panics something is really wrong
390438
WorkerCount: d.cfg.CodingWorkers,
391439
}
392-
extendedData, reconstructedCommitment, rlcCoeffs, err := rsema1d.EncodeParity(d.rows, config)
440+
extendedData, reconstructedCommitment, _, err := rsema1d.EncodeParity(d.rows, config)
393441
if err != nil {
394442
return fmt.Errorf("encoding parity: %w", err)
395443
}
@@ -408,14 +456,18 @@ func (d *Blob) Reconstruct(opts ...ReconstructOption) error {
408456

409457
d.data = originalData
410458
d.extendedData = extendedData
411-
d.rlcCoeffs = rlcCoeffs
412459
if opt.skipCommitmentCheck {
413460
d.originalID = d.id
414461
d.id = NewBlobID(d.cfg.BlobVersion, Commitment(reconstructedCommitment))
415462
}
416463
return nil
417464
}
418465

466+
// consume marks the blob as owned by an upload. Returns false if already consumed.
467+
func (d *Blob) consume() bool {
468+
return d.consumed.CompareAndSwap(false, true)
469+
}
470+
419471
const (
420472
// blobVersionLen is the length of the version field in bytes.
421473
blobVersionLen = 1
@@ -440,42 +492,21 @@ func newBlobHeaderV0(dataSize int) blobHeaderV0 {
440492
}
441493
}
442494

443-
// encodeToRows encodes the data into rows with version 0 header format.
444-
// Returns OriginalRows rows of calculated rowSize bytes each, padding with zeros as needed.
445-
// The first row contains the header followed by data.
446-
func (h blobHeaderV0) encodeToRows(data []byte, cfg BlobConfig) [][]byte {
495+
// encode assembles rows from data, writes the blob header, and produces the
496+
// commitment via the [rsema1d.Coder]. Returns the extended data and an
497+
// [Assembly] that owns the pooled row storage for the blob's lifetime.
498+
func (h blobHeaderV0) encode(data []byte, cfg BlobConfig) (*rsema1d.ExtendedData, *row.Assembly, error) {
447499
rowSize := cfg.RowSize(len(data))
448-
rows := make([][]byte, cfg.OriginalRows)
449-
450-
// First row: allocate and write header + beginning of data
451-
rows[0] = make([]byte, rowSize)
452-
h.encode(rows[0])
453-
454-
// Copy as much data as fits in the first row after the header
455-
firstRowDataSize := min(rowSize-blobHeaderLen, len(data))
456-
copy(rows[0][blobHeaderLen:], data[:firstRowDataSize])
457-
458-
// Remaining rows: use slices from data (offset by what we already used)
459-
dataOffset := firstRowDataSize
460-
for i := 1; i < cfg.OriginalRows; i++ {
461-
start := dataOffset
462-
end := start + rowSize
463-
dataOffset += rowSize
464-
465-
if end <= len(data) {
466-
// Full row available in data - use slice directly
467-
rows[i] = data[start:end:end]
468-
continue
469-
}
470-
// Some or no data left - allocate zero-filled padded row
471-
rows[i] = make([]byte, rowSize)
472-
if start < len(data) {
473-
// Partial row - insert the remaining data into the row
474-
copy(rows[i], data[start:])
475-
}
476-
}
500+
asm := cfg.Assembler.Assemble(data, rowSize, blobHeaderLen)
501+
rows := asm.Rows()
502+
h.marshalTo(rows[0])
477503

478-
return rows
504+
extData, err := cfg.Coder.Encode(rows)
505+
if err != nil {
506+
asm.Free()
507+
return nil, nil, fmt.Errorf("encoding data: %w", err)
508+
}
509+
return extData, asm, nil
479510
}
480511

481512
// decodeFromRows decodes the data from rows with version 0 header format.
@@ -491,7 +522,7 @@ func (h *blobHeaderV0) decodeFromRows(rows [][]byte, cfg BlobConfig) ([]byte, er
491522
}
492523

493524
// decode header from first row
494-
if err := h.decode(rows[0]); err != nil {
525+
if err := h.unmarshalFrom(rows[0]); err != nil {
495526
return nil, fmt.Errorf("decoding header: %w", err)
496527
}
497528

@@ -525,18 +556,18 @@ func (h *blobHeaderV0) decodeFromRows(rows [][]byte, cfg BlobConfig) ([]byte, er
525556
return data, nil
526557
}
527558

528-
// encode writes the version 0 blob header into the provided buffer.
559+
// marshalTo writes the version 0 blob header into the provided buffer.
529560
// The buffer must be at least blobHeaderLen bytes long.
530561
// Always writes version byte as 0.
531-
func (h blobHeaderV0) encode(buf []byte) {
562+
func (h blobHeaderV0) marshalTo(buf []byte) {
532563
buf[0] = 0 // version 0
533564
binary.BigEndian.PutUint32(buf[blobVersionLen:blobHeaderLen], h.dataSize)
534565
}
535566

536-
// decode reads the blob header from the provided buffer.
567+
// unmarshalFrom reads the blob header from the provided buffer.
537568
// The buffer must be at least blobHeaderLen bytes long.
538569
// Returns an error if the version byte is not 0.
539-
func (h *blobHeaderV0) decode(buf []byte) error {
570+
func (h *blobHeaderV0) unmarshalFrom(buf []byte) error {
540571
if buf[0] != 0 {
541572
return fmt.Errorf("invalid blob version: expected 0, got %d", buf[0])
542573
}

0 commit comments

Comments (0)