Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions beacon_chain/nimbus_beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1771,10 +1771,6 @@ proc reconstructDataColumns(node: BeaconNode, slot: Slot) =
node.dag.cfg.NUMBER_OF_CUSTODY_GROUPS div 2:
return

# Currently, this logic is broken
if true:
return

logScope:
slot = slot

Expand Down
94 changes: 74 additions & 20 deletions beacon_chain/spec/peerdas_helpers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ proc recover_matrix*(partial_matrix: seq[MatrixEntry],

ok(extended_matrix)

proc recoverCellsAndKzgProofsTask(cellIndices: seq[CellIndex],
cells: seq[Cell]): Result[CellsAndProofs, void] =
proc recoverCellsAndKzgProofsTask(cellIndices: openArray[CellIndex],
cells: openArray[Cell]): Result[CellsAndProofs, void] =
recoverCellsAndKzgProofs(cellIndices, cells).mapErr(
proc (x: string) =
discard)
Expand All @@ -153,58 +153,112 @@ proc recover_cells_and_proofs_parallel*(
tp: Taskpool,
dataColumns: seq[ref fulu.DataColumnSidecar]):
Result[seq[CellsAndProofs], cstring] =
## This helper recovers blobs from the data column sidecars parallelly
## Recover blobs from data column sidecars in parallel.
## - Uses Nim sequences with pointer passing for worker inputs
## - Bounds in-flight tasks to limit peak memory/alloc pressure.
## - Ensures all spawned tasks are awaited (drained) on any early return.

if dataColumns.len == 0:
return err("DataColumnSidecar: Length should not be 0")
if dataColumns.len > NUMBER_OF_COLUMNS:
return err("DataColumnSidecar: Length exceeds NUMBER_OF_COLUMNS")

let
columnCount = dataColumns.len
blobCount = dataColumns[0].column.len

for column in dataColumns:
if not (blobCount == column.column.len):
if blobCount != column.column.len:
return err("DataColumns do not have the same length")

proc workerRecover(idxPtr: ptr CellIndex, cellsPtr: ptr Cell,
columnCount: int): Result[CellsAndProofs, void] =
let
idxArr = cast[ptr UncheckedArray[CellIndex]](idxPtr)
cellsArr = cast[ptr UncheckedArray[Cell]](cellsPtr)
# Use toOpenArray to create views without copying
recoverCellsAndKzgProofsTask(
idxArr.toOpenArray(0, columnCount - 1),
cellsArr.toOpenArray(0, columnCount - 1))

var
pendingFuts: seq[Flowvar[Result[CellsAndProofs, void]]]
pendingFuts: seq[Flowvar[Result[CellsAndProofs, void]]] = @[]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The = @[] doesn't do anything here.

# Store actual data sequences instead of C pointers
pendingIndices: seq[seq[CellIndex]] = @[]
pendingCells: seq[seq[Cell]] = @[]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise either of these places. https://nim-lang.org/docs/manual.html#statements-and-expressions-var-statement

Variables are always initialized with a default value if there is no initializing expression. The default
value depends on the type and is always a zero in binary.

where for a sequence, this default is documented to be @[].

res = newSeq[CellsAndProofs](blobCount)

# pre-size sequences so we can index-assign without reallocs
pendingFuts.setLen(blobCount)
pendingIndices.setLen(blobCount)
pendingCells.setLen(blobCount)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If these are going to be setLen'd immediately, why not newSeq[Flowvar[Result[CellsAndProofs, void]]](blobCount) (respectively, each seq type) them to begin with?


# track how many we've actually spawned
var spawned = 0

# Choose a sane limit for concurrent tasks to reduce peak memory pressure.
let maxInFlight = min(blobCount, 9)

let startTime = Moment.now()
const reconstructionTimeout = 2.seconds

# ---- Spawn phase with time limit ----
var completed = 0

# ---- Spawn + bounded-await loop ----
for blobIdx in 0 ..< blobCount:
let now = Moment.now()
if (now - startTime) > reconstructionTimeout:
debug "PeerDAS reconstruction timed out while preparing columns",
spawned = pendingFuts.len, total = blobCount
break # Stop spawning new tasks
trace "PeerDAS reconstruction timed out while preparing columns",
spawned = spawned, total = blobCount
return err("Data column reconstruction timed out")

# Use regular Nim sequences
var
cellIndices = newSeq[CellIndex](columnCount)
indices = newSeq[CellIndex](columnCount)
cells = newSeq[Cell](columnCount)

for i in 0 ..< dataColumns.len:
cellIndices[i] = dataColumns[i][].index
indices[i] = dataColumns[i][].index
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The redundant/indirect assignment/copying via indices and cells is fairly expensive. Can directly ensure that pendingIndices[spawned] and pendingCells[spawned] are the right side, the have this loop assign into pendingIndices[spawned] item by item directly (respectively, pendingCells[spawned])

Can also use addr pendingIndices[spawned]/addr pendingCells[spawned] can avoid dataColumns.len lookups/bound checks/etc and allow only dereferencing two pointers each loop iteration to do so, basically caching the address of each seq item.

cells[i] = dataColumns[i][].column[blobIdx]
pendingFuts.add(tp.spawn recoverCellsAndKzgProofsTask(cellIndices, cells))

# ---- Sync phase ----
for i in 0 ..< pendingFuts.len:
# Store sequences and spawn worker with pointers to their data
pendingIndices[spawned] = indices
pendingCells[spawned] = cells
pendingFuts[spawned] = tp.spawn workerRecover(
addr pendingIndices[spawned][0],
addr pendingCells[spawned][0],
columnCount)
inc spawned

# If too many in-flight tasks, await the oldest one
while spawned - completed >= maxInFlight:
let now2 = Moment.now()
if (now2 - startTime) > reconstructionTimeout:
trace "PeerDAS reconstruction timed out while awaiting tasks",
completed = completed, totalSpawned = spawned
return err("Data column reconstruction timed out")

let futRes = sync pendingFuts[completed]

if futRes.isErr:
return err("KZG cells and proofs recovery failed")
res[completed] = futRes.get
inc completed

# ---- Wait for remaining spawned tasks ----
for i in completed ..< spawned:
let now = Moment.now()
if (now - startTime) > reconstructionTimeout:
debug "PeerDAS reconstruction timed out",
completed = i, totalSpawned = pendingFuts.len
trace "PeerDAS reconstruction timed out during final sync",
completed = i, totalSpawned = spawned
return err("Data column reconstruction timed out")

let futRes = sync pendingFuts[i]

if futRes.isErr:
return err("KZG cells and proofs recovery failed")

res[i] = futRes.get

if pendingFuts.len < blobCount:
return err("Data column reconstruction timed out")

ok(res)

proc assemble_data_column_sidecars*(
Expand Down
Loading