diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim
index c0e9cdfbcf..25f5b122d4 100644
--- a/beacon_chain/nimbus_beacon_node.nim
+++ b/beacon_chain/nimbus_beacon_node.nim
@@ -1771,10 +1771,6 @@ proc reconstructDataColumns(node: BeaconNode, slot: Slot) =
       node.dag.cfg.NUMBER_OF_CUSTODY_GROUPS div 2:
     return

-  # Currently, this logic is broken
-  if true:
-    return
-
   logScope:
     slot = slot

diff --git a/beacon_chain/spec/peerdas_helpers.nim b/beacon_chain/spec/peerdas_helpers.nim
index eaf3fe1ac2..794fd0a363 100644
--- a/beacon_chain/spec/peerdas_helpers.nim
+++ b/beacon_chain/spec/peerdas_helpers.nim
@@ -143,8 +143,8 @@ proc recover_matrix*(partial_matrix: seq[MatrixEntry],

   ok(extended_matrix)

-proc recoverCellsAndKzgProofsTask(cellIndices: seq[CellIndex],
-                                  cells: seq[Cell]): Result[CellsAndProofs, void] =
+proc recoverCellsAndKzgProofsTask(cellIndices: openArray[CellIndex],
+                                  cells: openArray[Cell]): Result[CellsAndProofs, void] =
   recoverCellsAndKzgProofs(cellIndices, cells).mapErr(
     proc (x: string) = discard)

@@ -153,58 +153,112 @@ proc recover_cells_and_proofs_parallel*(
     tp: Taskpool,
     dataColumns: seq[ref fulu.DataColumnSidecar]):
     Result[seq[CellsAndProofs], cstring] =
-  ## This helper recovers blobs from the data column sidecars parallelly
+  ## Recover blobs from data column sidecars in parallel.
+  ## - Uses Nim sequences with pointer passing for worker inputs
+  ## - Bounds in-flight tasks to limit peak memory/alloc pressure.
+  ## - Ensures all spawned tasks are awaited (drained) on any early return.
+
   if dataColumns.len == 0:
     return err("DataColumnSidecar: Length should not be 0")

+  if dataColumns.len > NUMBER_OF_COLUMNS:
+    return err("DataColumnSidecar: Length exceeds NUMBER_OF_COLUMNS")
+
   let
     columnCount = dataColumns.len
     blobCount = dataColumns[0].column.len

   for column in dataColumns:
-    if not (blobCount == column.column.len):
+    if blobCount != column.column.len:
       return err("DataColumns do not have the same length")

+  proc workerRecover(idxPtr: ptr CellIndex, cellsPtr: ptr Cell,
+                     columnCount: int): Result[CellsAndProofs, void] =
+    let
+      idxArr = cast[ptr UncheckedArray[CellIndex]](idxPtr)
+      cellsArr = cast[ptr UncheckedArray[Cell]](cellsPtr)
+    # Use toOpenArray to create views without copying
+    recoverCellsAndKzgProofsTask(
+      idxArr.toOpenArray(0, columnCount - 1),
+      cellsArr.toOpenArray(0, columnCount - 1))
+
   var
-    pendingFuts: seq[Flowvar[Result[CellsAndProofs, void]]]
+    pendingFuts: seq[Flowvar[Result[CellsAndProofs, void]]] = @[]
+    # Store actual data sequences instead of C pointers
+    pendingIndices: seq[seq[CellIndex]] = @[]
+    pendingCells: seq[seq[Cell]] = @[]
     res = newSeq[CellsAndProofs](blobCount)

+  # pre-size sequences so we can index-assign without reallocs
+  pendingFuts.setLen(blobCount)
+  pendingIndices.setLen(blobCount)
+  pendingCells.setLen(blobCount)
+
+  # track how many we've actually spawned
+  var spawned = 0
+
+  # Choose a sane limit for concurrent tasks to reduce peak memory pressure.
+  let maxInFlight = min(blobCount, 9)
+
   let startTime = Moment.now()
   const reconstructionTimeout = 2.seconds

-  # ---- Spawn phase with time limit ----
+  var completed = 0
+
+  # ---- Spawn + bounded-await loop ----
   for blobIdx in 0 ..< blobCount:
     let now = Moment.now()
     if (now - startTime) > reconstructionTimeout:
-      debug "PeerDAS reconstruction timed out while preparing columns",
-        spawned = pendingFuts.len, total = blobCount
-      break # Stop spawning new tasks
+      trace "PeerDAS reconstruction timed out while preparing columns",
+        spawned = spawned, total = blobCount
+      return err("Data column reconstruction timed out")

+    # Use regular Nim sequences
     var
-      cellIndices = newSeq[CellIndex](columnCount)
+      indices = newSeq[CellIndex](columnCount)
       cells = newSeq[Cell](columnCount)
+
     for i in 0 ..< dataColumns.len:
-      cellIndices[i] = dataColumns[i][].index
+      indices[i] = dataColumns[i][].index
       cells[i] = dataColumns[i][].column[blobIdx]

-    pendingFuts.add(tp.spawn recoverCellsAndKzgProofsTask(cellIndices, cells))
-
-  # ---- Sync phase ----
-  for i in 0 ..< pendingFuts.len:
+    # Store sequences and spawn worker with pointers to their data
+    pendingIndices[spawned] = indices
+    pendingCells[spawned] = cells
+    pendingFuts[spawned] = tp.spawn workerRecover(
+      addr pendingIndices[spawned][0],
+      addr pendingCells[spawned][0],
+      columnCount)
+    inc spawned
+
+    # If too many in-flight tasks, await the oldest one
+    while spawned - completed >= maxInFlight:
+      let now2 = Moment.now()
+      if (now2 - startTime) > reconstructionTimeout:
+        trace "PeerDAS reconstruction timed out while awaiting tasks",
+          completed = completed, totalSpawned = spawned
+        return err("Data column reconstruction timed out")
+
+      let futRes = sync pendingFuts[completed]
+
+      if futRes.isErr:
+        return err("KZG cells and proofs recovery failed")
+      res[completed] = futRes.get
+      inc completed
+
+  # ---- Wait for remaining spawned tasks ----
+  for i in completed ..< spawned:
     let now = Moment.now()
     if (now - startTime) > reconstructionTimeout:
-      debug "PeerDAS reconstruction timed out",
-        completed = i, totalSpawned = pendingFuts.len
+      trace "PeerDAS reconstruction timed out during final sync",
+        completed = i, totalSpawned = spawned
       return err("Data column reconstruction timed out")

     let futRes = sync pendingFuts[i]
+
     if futRes.isErr:
       return err("KZG cells and proofs recovery failed")
-    res[i] = futRes.get

-  if pendingFuts.len < blobCount:
-    return err("Data column reconstruction timed out")
-
+    res[i] = futRes.get
+
   ok(res)

 proc assemble_data_column_sidecars*(
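Note on the pointer-passing pattern introduced above: workerRecover receives only a raw base pointer and a length, reinterprets the pointer via cast[ptr UncheckedArray[...]], and slices it with toOpenArray, so the worker reads the caller's sequences without copying them. A minimal standalone sketch of that idiom (the sumView proc below is hypothetical and not part of the patch):

proc sumView(basePtr: ptr int, len: int): int =
  # Reinterpret the base pointer as an unchecked array so it can be sliced.
  let arr = cast[ptr UncheckedArray[int]](basePtr)
  # toOpenArray yields a zero-copy view over the underlying memory.
  for x in arr.toOpenArray(0, len - 1):
    result += x

var data = @[1, 2, 3, 4]
echo sumView(addr data[0], data.len)  # prints 10; `data` must stay alive during the call

The same lifetime caveat is why the patch stores each task's sequences in pendingIndices/pendingCells: the memory behind the pointers has to outlive the spawned task, i.e. remain owned until the corresponding Flowvar is synced.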
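The spawn loop bounds concurrency by never letting more than maxInFlight Flowvars be outstanding: after each spawn it drains the oldest pending task until the window has room again, and a final loop drains whatever is still in flight. A hedged standalone sketch of that scheduling idea, assuming the status-im taskpools API this proc already uses (square and boundedMap are illustrative names, not from the patch):

import taskpools  # status-im/nim-taskpools; threads are enabled by default in Nim 2.x

proc square(x: int): int = x * x

proc boundedMap(tp: Taskpool, inputs: seq[int], maxInFlight: int): seq[int] =
  var
    futs = newSeq[Flowvar[int]](inputs.len)
    res = newSeq[int](inputs.len)
    spawned = 0
    completed = 0
  for i in 0 ..< inputs.len:
    # Spawn the next task and remember its Flowvar so results are drained in order.
    futs[spawned] = tp.spawn square(inputs[i])
    inc spawned
    # Window full: block on the oldest outstanding task before spawning more.
    while spawned - completed >= maxInFlight:
      res[completed] = sync futs[completed]
      inc completed
  # Drain the tasks that are still in flight.
  for i in completed ..< spawned:
    res[i] = sync futs[i]
  res

when isMainModule:
  var tp = Taskpool.new(numThreads = 4)
  echo boundedMap(tp, @[1, 2, 3, 4, 5], maxInFlight = 2)  # @[1, 4, 9, 16, 25]
  tp.shutdown()

Keeping the window small trades a little parallelism for a hard cap on how many per-task input buffers exist at once, which is the same reason the patch picks maxInFlight = min(blobCount, 9).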