Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 106 additions & 75 deletions fibre/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ var (
ErrBlobTooLarge = errors.New("blob size exceeds maximum allowed size")
// ErrBlobCommitmentMismatch is returned when the reconstructed commitment doesn't match the expected one.
ErrBlobCommitmentMismatch = errors.New("commitment mismatch: reconstructed data doesn't match expected commitment")
// ErrBlobConsumed is returned when a blob is reused after being uploaded.
ErrBlobConsumed = errors.New("blob cannot be reused after upload")
)

// BlobConfig contains configuration parameters for blob encoding and decoding.
Expand All @@ -35,11 +37,26 @@ type BlobConfig struct {
MaxDataSize int
// CodingWorkers is the number of workers to use for encoding and decoding rsema1d.
CodingWorkers int

// Coder is a cached Reed-Solomon encoder/decoder for encoding blobs.
Coder *rsema1d.Coder
// Assembler builds pooled row layouts for encoding blobs.
Assembler *RowAssembler
}

// defaultBlobConfigV0 is the shared default config, created at init time.
var defaultBlobConfigV0 = mustDefaultBlobConfigV0()

// mustDefaultBlobConfigV0 builds the version-0 [BlobConfig] from
// [DefaultProtocolParams]. Construction failure here is a programmer error
// (the defaults are known-good), so it panics rather than returning an error.
func mustDefaultBlobConfigV0() BlobConfig {
	cfg, err := NewBlobConfigFromParams(0, DefaultProtocolParams)
	if err != nil {
		panic(fmt.Sprintf("creating default blob config v0: %v", err))
	}
	return cfg
}

// DefaultBlobConfigV0 returns a [BlobConfig] with default values for version 0.
// The config is created once at init and shared across all callers.
func DefaultBlobConfigV0() BlobConfig {
return NewBlobConfigFromParams(0, DefaultProtocolParams)
return defaultBlobConfigV0
}

// BlobConfigForVersion returns the [BlobConfig] for the given blob version.
Expand All @@ -55,21 +72,40 @@ func BlobConfigForVersion(version uint8) (BlobConfig, error) {

// NewBlobConfigFromParams creates a [BlobConfig] with values derived from the given [ProtocolParams].
// Use this when you need a config with non-default protocol parameters (e.g., for testing).
func NewBlobConfigFromParams(blobVersion uint8, p ProtocolParams) BlobConfig {
func NewBlobConfigFromParams(blobVersion uint8, params ProtocolParams) (BlobConfig, error) {
if blobVersion != 0 {
panic(fmt.Sprintf("unsupported blob version: %d", blobVersion))
return BlobConfig{}, fmt.Errorf("unsupported blob version: %d", blobVersion)
}

workers := runtime.GOMAXPROCS(0)
codecCfg := &rsema1d.Config{
K: params.Rows,
N: params.ParityRows(),
WorkerCount: workers,
}

assembler, err := NewRowAssembler(codecCfg, params.MaxRowSize(blobVersion))
if err != nil {
return BlobConfig{}, fmt.Errorf("creating row assembler: %w", err)
}

coder, err := rsema1d.NewCoder(codecCfg, reedsolomon.WithWorkAllocator(assembler.WorkAllocator()))
if err != nil {
return BlobConfig{}, fmt.Errorf("creating rsema1d coder: %w", err)
}

return BlobConfig{
BlobVersion: blobVersion,
OriginalRows: p.Rows,
ParityRows: p.ParityRows(),
OriginalRows: params.Rows,
ParityRows: params.ParityRows(),
RowSize: func(dataLen int) int {
return p.RowSize(blobVersion, dataLen+blobHeaderLen)
return params.RowSize(blobVersion, dataLen+blobHeaderLen)
},
MaxDataSize: p.MaxBlobSize - blobHeaderLen, // subtract the header overhead
CodingWorkers: runtime.GOMAXPROCS(0),
}
MaxDataSize: params.MaxBlobSize - blobHeaderLen,
CodingWorkers: workers,
Coder: coder,
Assembler: assembler,
}, nil
}

// TotalRows returns the total number of rows (OriginalRows + ParityRows).
Expand All @@ -93,7 +129,6 @@ type Blob struct {
extendedData *rsema1d.ExtendedData
id BlobID
originalID BlobID
rlcCoeffs []field.GF128

// holds meta fields about the blob
header blobHeaderV0
Expand All @@ -102,40 +137,46 @@ type Blob struct {

// fields for reconstruction
rows [][]byte

consumed atomic.Bool

// asm owns pooled row storage from the assembler and supports per-validator
// release of parity slots. Nil for reconstructed blobs.
asm *Assembly
}

// NewBlob creates a new [Blob] instance by encoding the data.
// It takes the data and a [BlobConfig].
// It takes ownership of data and may reuse it directly as backing storage for
// original rows. Callers must not modify data after calling NewBlob.
//
// The data is prefixed with a header containing the blob version and data size.
// Returns [ErrBlobTooLarge] if the data size exceeds BlobConfig.MaxDataSize.
func NewBlob(data []byte, cfg BlobConfig) (d *Blob, err error) {
//
// The returned blob holds pooled row buffers from the [RowAssembler].
// Pooled storage is released automatically when [Client.Upload] completes.
// Data must not be modified after ownership is transferred to the blob.
func NewBlob(data []byte, cfg BlobConfig) (*Blob, error) {
if len(data) == 0 {
return nil, fmt.Errorf("data cannot be empty")
}
if len(data) > cfg.MaxDataSize {
return nil, fmt.Errorf("%w: data size %d exceeds maximum %d", ErrBlobTooLarge, len(data), cfg.MaxDataSize)
}

d = &Blob{
cfg: cfg,
header: newBlobHeaderV0(len(data)),
data: data,
}

rows := d.header.encodeToRows(data, cfg)
var rsemaCommitment rsema1d.Commitment
d.extendedData, rsemaCommitment, d.rlcCoeffs, err = rsema1d.Encode(rows, &rsema1d.Config{
K: cfg.OriginalRows,
N: cfg.ParityRows,
RowSize: len(rows[0]),
WorkerCount: cfg.CodingWorkers,
})
header := newBlobHeaderV0(len(data))
extendedData, asm, err := header.encode(data, cfg)
if err != nil {
return nil, fmt.Errorf("encoding data: %w", err)
return nil, err
}
d.id = NewBlobID(cfg.BlobVersion, rsemaCommitment)

return d, nil
return &Blob{
cfg: cfg,
extendedData: extendedData,
id: NewBlobID(cfg.BlobVersion, extendedData.Commitment()),
header: header,
data: data,
asm: asm,
}, nil
}

// NewEmptyBlob creates a new [Blob] instance for receiving and reconstructing data.
Expand Down Expand Up @@ -191,9 +232,12 @@ func (d *Blob) Config() BlobConfig {
return d.cfg
}

// RLCCoeffs returns RLC coefficients of the original data.
func (d *Blob) RLCCoeffs() []field.GF128 {
return d.rlcCoeffs
// RLC returns the computed random linear combination values for the original rows.
// Blobs that carry no extended data (e.g. ones awaiting reconstruction) yield nil.
func (d *Blob) RLC() []field.GF128 {
	ed := d.extendedData
	if ed == nil {
		return nil
	}
	return ed.RLC()
}

// RowSize returns the size of each row in bytes.
Expand All @@ -220,12 +264,17 @@ func (d *Blob) Data() []byte {
return d.data
}

// Row returns the [rsema1d.RowInclusionProof] for the given index from the extended data.
// Row returns the [rsema1d.RowInclusionProof] for the given index. Live parity
// rows are served even after other validators have released theirs; only the
// specific released rows return an error. After terminal release, all calls
// return an error.
func (d *Blob) Row(index int) (*rsema1d.RowInclusionProof, error) {
	if d.extendedData == nil {
		// Static message: errors.New instead of fmt.Errorf with no verbs
		// (staticcheck S1039).
		return nil, errors.New("no extended data available")
	}

	// NOTE(review): asm is documented as nil for reconstructed blobs; guard
	// the Freed lookup so such blobs don't nil-dereference here. A nil asm
	// means no pooled storage was ever attached, so nothing can be released.
	if d.asm != nil && d.asm.Freed(index) {
		return nil, fmt.Errorf("row %d: storage has been released", index)
	}
	return d.extendedData.GenerateRowInclusionProof(index)
}

Expand Down Expand Up @@ -389,7 +438,7 @@ func (d *Blob) Reconstruct(opts ...ReconstructOption) error {
RowSize: len(d.rows[0]), // NOTE: successful reconstruct must fill all rows, so if this ever panics something is really wrong
WorkerCount: d.cfg.CodingWorkers,
}
extendedData, reconstructedCommitment, rlcCoeffs, err := rsema1d.EncodeParity(d.rows, config)
extendedData, reconstructedCommitment, _, err := rsema1d.EncodeParity(d.rows, config)
if err != nil {
return fmt.Errorf("encoding parity: %w", err)
}
Expand All @@ -408,14 +457,18 @@ func (d *Blob) Reconstruct(opts ...ReconstructOption) error {

d.data = originalData
d.extendedData = extendedData
d.rlcCoeffs = rlcCoeffs
if opt.skipCommitmentCheck {
d.originalID = d.id
d.id = NewBlobID(d.cfg.BlobVersion, Commitment(reconstructedCommitment))
}
return nil
}

// consume atomically claims the blob for a single upload. The first caller
// flips the flag and gets true; every subsequent caller sees it already set
// and gets false.
func (d *Blob) consume() bool {
	firstClaim := d.consumed.CompareAndSwap(false, true)
	return firstClaim
}

const (
// blobVersionLen is the length of the version field in bytes.
blobVersionLen = 1
Expand All @@ -440,42 +493,20 @@ func newBlobHeaderV0(dataSize int) blobHeaderV0 {
}
}

// encodeToRows encodes the data into rows with version 0 header format.
// Returns OriginalRows rows of calculated rowSize bytes each, padding with zeros as needed.
// The first row contains the header followed by data.
func (h blobHeaderV0) encodeToRows(data []byte, cfg BlobConfig) [][]byte {
// encode assembles rows from data, writes the blob header, and produces the
// commitment via the [rsema1d.Coder]. Returns the extended data and an
// [Assembly] that owns the pooled row storage for the blob's lifetime.
func (h blobHeaderV0) encode(data []byte, cfg BlobConfig) (*rsema1d.ExtendedData, *Assembly, error) {
rowSize := cfg.RowSize(len(data))
rows := make([][]byte, cfg.OriginalRows)

// First row: allocate and write header + beginning of data
rows[0] = make([]byte, rowSize)
h.encode(rows[0])

// Copy as much data as fits in the first row after the header
firstRowDataSize := min(rowSize-blobHeaderLen, len(data))
copy(rows[0][blobHeaderLen:], data[:firstRowDataSize])

// Remaining rows: use slices from data (offset by what we already used)
dataOffset := firstRowDataSize
for i := 1; i < cfg.OriginalRows; i++ {
start := dataOffset
end := start + rowSize
dataOffset += rowSize

if end <= len(data) {
// Full row available in data - use slice directly
rows[i] = data[start:end:end]
continue
}
// Some or no data left - allocate zero-filled padded row
rows[i] = make([]byte, rowSize)
if start < len(data) {
// Partial row - insert the remaining data into the row
copy(rows[i], data[start:])
}
}
rows, asm := cfg.Assembler.Assemble(data, rowSize, blobHeaderLen)
h.marshalTo(rows[0])

return rows
extData, err := cfg.Coder.Encode(rows)
if err != nil {
asm.Free(nil)
return nil, nil, fmt.Errorf("encoding data: %w", err)
}
return extData, asm, nil
}

// decodeFromRows decodes the data from rows with version 0 header format.
Expand All @@ -491,7 +522,7 @@ func (h *blobHeaderV0) decodeFromRows(rows [][]byte, cfg BlobConfig) ([]byte, er
}

// decode header from first row
if err := h.decode(rows[0]); err != nil {
if err := h.unmarshalFrom(rows[0]); err != nil {
return nil, fmt.Errorf("decoding header: %w", err)
}

Expand Down Expand Up @@ -525,18 +556,18 @@ func (h *blobHeaderV0) decodeFromRows(rows [][]byte, cfg BlobConfig) ([]byte, er
return data, nil
}

// encode writes the version 0 blob header into the provided buffer.
// marshalTo writes the version 0 blob header into the provided buffer.
// The buffer must be at least blobHeaderLen bytes long.
// Always writes version byte as 0.
func (h blobHeaderV0) encode(buf []byte) {
func (h blobHeaderV0) marshalTo(buf []byte) {
buf[0] = 0 // version 0
binary.BigEndian.PutUint32(buf[blobVersionLen:blobHeaderLen], h.dataSize)
}

// decode reads the blob header from the provided buffer.
// unmarshalFrom reads the blob header from the provided buffer.
// The buffer must be at least blobHeaderLen bytes long.
// Returns an error if the version byte is not 0.
func (h *blobHeaderV0) decode(buf []byte) error {
func (h *blobHeaderV0) unmarshalFrom(buf []byte) error {
if buf[0] != 0 {
return fmt.Errorf("invalid blob version: expected 0, got %d", buf[0])
}
Expand Down
Loading
Loading