From 42856805eece1ff82fa54645824fe42cc91660db Mon Sep 17 00:00:00 2001 From: Agnish Ghosh Date: Mon, 15 Dec 2025 18:35:08 +0530 Subject: [PATCH 1/5] parallel reconstruction changes --- beacon_chain/nimbus_beacon_node.nim | 6 +- beacon_chain/spec/peerdas_helpers.nim | 143 ++++++++++++++++++++++---- 2 files changed, 126 insertions(+), 23 deletions(-) diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index c0e9cdfbcf..596ee54baf 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -1771,9 +1771,9 @@ proc reconstructDataColumns(node: BeaconNode, slot: Slot) = node.dag.cfg.NUMBER_OF_CUSTODY_GROUPS div 2: return - # Currently, this logic is broken - if true: - return + # # Currently, this logic is broken + # if true: + # return logScope: slot = slot diff --git a/beacon_chain/spec/peerdas_helpers.nim b/beacon_chain/spec/peerdas_helpers.nim index eaf3fe1ac2..c3419968cd 100644 --- a/beacon_chain/spec/peerdas_helpers.nim +++ b/beacon_chain/spec/peerdas_helpers.nim @@ -23,6 +23,7 @@ import from std/algorithm import sort from std/sequtils import toSeq from stew/staticfor import staticFor +from system/ansi_c import c_malloc, c_free type CellBytes = array[fulu.CELLS_PER_EXT_BLOB, Cell] @@ -153,56 +154,158 @@ proc recover_cells_and_proofs_parallel*( tp: Taskpool, dataColumns: seq[ref fulu.DataColumnSidecar]): Result[seq[CellsAndProofs], cstring] = - ## This helper recovers blobs from the data column sidecars parallelly + ## Recover blobs from data column sidecars in parallel. + ## - Uses unmanaged C buffers for worker inputs so no Nim GC objects + ## - Bounds in-flight tasks to limit peak memory/alloc pressure. + ## - Ensures all spawned tasks are awaited (drained) on any early return. + if dataColumns.len == 0: return err("DataColumnSidecar: Length should not be 0") + if dataColumns.len > NUMBER_OF_COLUMNS: + return err("DataColumnSidecar: Length exceeds NUMBER_OF_COLUMNS") let columnCount = dataColumns.len blobCount = dataColumns[0].column.len for column in dataColumns: - if not (blobCount == column.column.len): + if blobCount != column.column.len: return err("DataColumns do not have the same length") + # Worker that runs on a taskpool thread. It only sees raw pointers and + # constructs its own worker-local seqs (on the worker's heap) before calling + # the KZG recovery routine. Keeps GC objects thread-local. + proc workerRecover(idxPtr: ptr CellIndex, cellsPtr: ptr Cell, + columnCount: int): Result[CellsAndProofs, void] = + ## Worker runs on a taskpool thread. It receives raw C buffers (ptr) and + ## converts them into worker-local seqs before calling the KZG recovery + ## routine, so no Nim GC objects cross thread-local heaps. + var + localIndices = newSeq[CellIndex](columnCount) + localCells = newSeq[Cell](columnCount) + let + idxArr = cast[ptr UncheckedArray[CellIndex]](idxPtr) + cellsArr = cast[ptr UncheckedArray[Cell]](cellsPtr) + for j in 0 ..< columnCount: + localIndices[j] = idxArr[j] + localCells[j] = cellsArr[j] + # use the task wrapper which maps string errors to void + recoverCellsAndKzgProofsTask(localIndices, localCells) + var - pendingFuts: seq[Flowvar[Result[CellsAndProofs, void]]] + pendingFuts: seq[Flowvar[Result[CellsAndProofs, void]]] = @[] + pendingIdxPtrs: seq[ptr CellIndex] = @[] + pendingCellsPtrs: seq[ptr Cell] = @[] res = newSeq[CellsAndProofs](blobCount) + # pre-size sequences so we can index-assign without reallocs + pendingFuts.setLen(blobCount) + pendingIdxPtrs.setLen(blobCount) + pendingCellsPtrs.setLen(blobCount) + + # track how many we've actually spawned + var spawned = 0 + + # Choose a sane limit for concurrent tasks to reduce peak memory/alloc pressure. + let maxInFlight = min(blobCount, 9) + let startTime = Moment.now() const reconstructionTimeout = 2.seconds - # ---- Spawn phase with time limit ---- + proc freePendingPtrPair(idxPtr: ptr CellIndex, cellsPtr: ptr Cell) = + c_free(idxPtr) + c_free(cellsPtr) + + proc drainPending(startIdx: int) = + for j in startIdx ..< spawned: + if pendingFuts[j].isReady(): + discard sync pendingFuts[j] + # Always free the memory regardless + freePendingPtrPair(pendingIdxPtrs[j], pendingCellsPtrs[j]) + pendingIdxPtrs[j] = nil + pendingCellsPtrs[j] = nil + + var completed = 0 + + # ---- Spawn + bounded-await loop ---- for blobIdx in 0 ..< blobCount: let now = Moment.now() if (now - startTime) > reconstructionTimeout: - debug "PeerDAS reconstruction timed out while preparing columns", - spawned = pendingFuts.len, total = blobCount - break # Stop spawning new tasks + trace "PeerDAS reconstruction timed out while preparing columns", + spawned = spawned, total = blobCount + drainPending(0) + return err("Data column reconstruction timed out") - var - cellIndices = newSeq[CellIndex](columnCount) - cells = newSeq[Cell](columnCount) + # Allocate unmanaged C buffers and copy data into them + let + idxBytes = csize_t(columnCount) * csize_t(sizeof(CellIndex)) + cellsBytes = csize_t(columnCount) * csize_t(sizeof(Cell)) + idxPtr = cast[ptr CellIndex](c_malloc(idxBytes)) + if idxPtr == nil: + drainPending(0) + return err("Failed to allocate memory for cell indices during reconstruction") + let cellsPtr = cast[ptr Cell](c_malloc(cellsBytes)) + if cellsPtr == nil: + c_free(idxPtr) + drainPending(0) + return err("Failed to allocate memory for cell data during reconstruction") + + # populate C buffers via UncheckedArray casts + let + idxArr = cast[ptr UncheckedArray[CellIndex]](idxPtr) + cellsArr = cast[ptr UncheckedArray[Cell]](cellsPtr) for i in 0 ..< dataColumns.len: - cellIndices[i] = dataColumns[i][].index - cells[i] = dataColumns[i][].column[blobIdx] - pendingFuts.add(tp.spawn recoverCellsAndKzgProofsTask(cellIndices, cells)) - - # ---- Sync phase ---- - for i in 0 ..< pendingFuts.len: + idxArr[i] = dataColumns[i][].index + cellsArr[i] = dataColumns[i][].column[blobIdx] + + # store into pre-sized arrays by index and spawn worker + pendingIdxPtrs[spawned] = idxPtr + pendingCellsPtrs[spawned] = cellsPtr + pendingFuts[spawned] = tp.spawn workerRecover(idxPtr, cellsPtr, columnCount) + inc spawned + + # If too many in-flight tasks, await the oldest one + while spawned - completed >= maxInFlight: + let now2 = Moment.now() + if (now2 - startTime) > reconstructionTimeout: + trace "PeerDAS reconstruction timed out while awaiting tasks", + completed = completed, totalSpawned = spawned + drainPending(completed) + return err("Data column reconstruction timed out") + + let futRes = sync pendingFuts[completed] + freePendingPtrPair(pendingIdxPtrs[completed], pendingCellsPtrs[completed]) + pendingIdxPtrs[completed] = nil + pendingCellsPtrs[completed] = nil + + if futRes.isErr: + drainPending(completed + 1) + return err("KZG cells and proofs recovery failed") + res[completed] = futRes.get + inc completed + + # ---- Wait for remaining spawned tasks ---- + for i in completed ..< spawned: let now = Moment.now() if (now - startTime) > reconstructionTimeout: - debug "PeerDAS reconstruction timed out", - completed = i, totalSpawned = pendingFuts.len + trace "PeerDAS reconstruction timed out during final sync", + completed = i, totalSpawned = spawned + drainPending(i) return err("Data column reconstruction timed out") let futRes = sync pendingFuts[i] + freePendingPtrPair(pendingIdxPtrs[i], pendingCellsPtrs[i]) + pendingIdxPtrs[i] = nil + pendingCellsPtrs[i] = nil + if futRes.isErr: + drainPending(i + 1) return err("KZG cells and proofs recovery failed") - res[i] = futRes.get - if pendingFuts.len < blobCount: + # If we spawned fewer than blobCount, spawn-phase timed out earlier + if spawned < blobCount: + drainPending(0) return err("Data column reconstruction timed out") ok(res) From 4d5dc4dd771d14955fe4a0017687bad6e0978f39 Mon Sep 17 00:00:00 2001 From: Agnish Ghosh Date: Mon, 15 Dec 2025 23:35:44 +0530 Subject: [PATCH 2/5] avoid drain pending entirely --- beacon_chain/spec/peerdas_helpers.nim | 52 ++++++++++++++++----------- 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/beacon_chain/spec/peerdas_helpers.nim b/beacon_chain/spec/peerdas_helpers.nim index c3419968cd..1ac36dcf76 100644 --- a/beacon_chain/spec/peerdas_helpers.nim +++ b/beacon_chain/spec/peerdas_helpers.nim @@ -213,14 +213,15 @@ proc recover_cells_and_proofs_parallel*( const reconstructionTimeout = 2.seconds proc freePendingPtrPair(idxPtr: ptr CellIndex, cellsPtr: ptr Cell) = - c_free(idxPtr) - c_free(cellsPtr) - - proc drainPending(startIdx: int) = - for j in startIdx ..< spawned: - if pendingFuts[j].isReady(): - discard sync pendingFuts[j] - # Always free the memory regardless + if not idxPtr.isNil: + c_free(idxPtr) + if not cellsPtr.isNil: + c_free(cellsPtr) + + proc freeAllAllocated() = + ## Free all C memory allocated so far, without syncing futures. + ## Only call this on error paths where we're aborting. + for j in 0 ..< spawned: freePendingPtrPair(pendingIdxPtrs[j], pendingCellsPtrs[j]) pendingIdxPtrs[j] = nil pendingCellsPtrs[j] = nil @@ -233,7 +234,7 @@ proc recover_cells_and_proofs_parallel*( if (now - startTime) > reconstructionTimeout: trace "PeerDAS reconstruction timed out while preparing columns", spawned = spawned, total = blobCount - drainPending(0) + freeAllAllocated() return err("Data column reconstruction timed out") # Allocate unmanaged C buffers and copy data into them @@ -242,12 +243,12 @@ proc recover_cells_and_proofs_parallel*( cellsBytes = csize_t(columnCount) * csize_t(sizeof(Cell)) idxPtr = cast[ptr CellIndex](c_malloc(idxBytes)) if idxPtr == nil: - drainPending(0) + freeAllAllocated() return err("Failed to allocate memory for cell indices during reconstruction") let cellsPtr = cast[ptr Cell](c_malloc(cellsBytes)) if cellsPtr == nil: c_free(idxPtr) - drainPending(0) + freeAllAllocated() return err("Failed to allocate memory for cell data during reconstruction") # populate C buffers via UncheckedArray casts @@ -270,7 +271,11 @@ proc recover_cells_and_proofs_parallel*( if (now2 - startTime) > reconstructionTimeout: trace "PeerDAS reconstruction timed out while awaiting tasks", completed = completed, totalSpawned = spawned - drainPending(completed) + # Free memory for tasks we haven't synced yet + for j in completed ..< spawned: + freePendingPtrPair(pendingIdxPtrs[j], pendingCellsPtrs[j]) + pendingIdxPtrs[j] = nil + pendingCellsPtrs[j] = nil return err("Data column reconstruction timed out") let futRes = sync pendingFuts[completed] @@ -279,7 +284,11 @@ proc recover_cells_and_proofs_parallel*( pendingCellsPtrs[completed] = nil if futRes.isErr: - drainPending(completed + 1) + # Free memory for remaining unsynced tasks + for j in completed + 1 ..< spawned: + freePendingPtrPair(pendingIdxPtrs[j], pendingCellsPtrs[j]) + pendingIdxPtrs[j] = nil + pendingCellsPtrs[j] = nil return err("KZG cells and proofs recovery failed") res[completed] = futRes.get inc completed @@ -290,7 +299,11 @@ proc recover_cells_and_proofs_parallel*( if (now - startTime) > reconstructionTimeout: trace "PeerDAS reconstruction timed out during final sync", completed = i, totalSpawned = spawned - drainPending(i) + # Free memory for tasks we haven't synced yet + for j in i ..< spawned: + freePendingPtrPair(pendingIdxPtrs[j], pendingCellsPtrs[j]) + pendingIdxPtrs[j] = nil + pendingCellsPtrs[j] = nil return err("Data column reconstruction timed out") let futRes = sync pendingFuts[i] @@ -299,15 +312,14 @@ proc recover_cells_and_proofs_parallel*( pendingCellsPtrs[i] = nil if futRes.isErr: - drainPending(i + 1) + # Free memory for remaining unsynced tasks + for j in i + 1 ..< spawned: + freePendingPtrPair(pendingIdxPtrs[j], pendingCellsPtrs[j]) + pendingIdxPtrs[j] = nil + pendingCellsPtrs[j] = nil return err("KZG cells and proofs recovery failed") res[i] = futRes.get - # If we spawned fewer than blobCount, spawn-phase timed out earlier - if spawned < blobCount: - drainPending(0) - return err("Data column reconstruction timed out") - ok(res) proc assemble_data_column_sidecars*( From 526037f4c6fa5ca0f8a61aae8b1f176da4b653bb Mon Sep 17 00:00:00 2001 From: Agnish Ghosh Date: Thu, 25 Dec 2025 02:37:56 +0530 Subject: [PATCH 3/5] remove isNil check here --- beacon_chain/spec/peerdas_helpers.nim | 2 -- 1 file changed, 2 deletions(-) diff --git a/beacon_chain/spec/peerdas_helpers.nim b/beacon_chain/spec/peerdas_helpers.nim index 1ac36dcf76..185439d62b 100644 --- a/beacon_chain/spec/peerdas_helpers.nim +++ b/beacon_chain/spec/peerdas_helpers.nim @@ -213,9 +213,7 @@ proc recover_cells_and_proofs_parallel*( const reconstructionTimeout = 2.seconds proc freePendingPtrPair(idxPtr: ptr CellIndex, cellsPtr: ptr Cell) = - if not idxPtr.isNil: c_free(idxPtr) - if not cellsPtr.isNil: c_free(cellsPtr) proc freeAllAllocated() = From 88efd31db39c9eb7057d72339a022affe06af213 Mon Sep 17 00:00:00 2001 From: Agnish Ghosh Date: Thu, 25 Dec 2025 03:11:50 +0530 Subject: [PATCH 4/5] remove commented code --- beacon_chain/nimbus_beacon_node.nim | 4 ---- 1 file changed, 4 deletions(-) diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 596ee54baf..25f5b122d4 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -1771,10 +1771,6 @@ proc reconstructDataColumns(node: BeaconNode, slot: Slot) = node.dag.cfg.NUMBER_OF_CUSTODY_GROUPS div 2: return - # # Currently, this logic is broken - # if true: - # return - logScope: slot = slot From e5f124304dc90198a954c76d3dc8837a61ee0ab7 Mon Sep 17 00:00:00 2001 From: Agnish Ghosh Date: Thu, 25 Dec 2025 17:47:14 +0530 Subject: [PATCH 5/5] drop mallocs --- beacon_chain/spec/peerdas_helpers.nim | 115 +++++++------------------- 1 file changed, 28 insertions(+), 87 deletions(-) diff --git a/beacon_chain/spec/peerdas_helpers.nim b/beacon_chain/spec/peerdas_helpers.nim index 185439d62b..794fd0a363 100644 --- a/beacon_chain/spec/peerdas_helpers.nim +++ b/beacon_chain/spec/peerdas_helpers.nim @@ -23,7 +23,6 @@ import from std/algorithm import sort from std/sequtils import toSeq from stew/staticfor import staticFor -from system/ansi_c import c_malloc, c_free type CellBytes = array[fulu.CELLS_PER_EXT_BLOB, Cell] @@ -144,8 +143,8 @@ proc recover_matrix*(partial_matrix: seq[MatrixEntry], ok(extended_matrix) -proc recoverCellsAndKzgProofsTask(cellIndices: seq[CellIndex], - cells: seq[Cell]): Result[CellsAndProofs, void] = +proc recoverCellsAndKzgProofsTask(cellIndices: openArray[CellIndex], + cells: openArray[Cell]): Result[CellsAndProofs, void] = recoverCellsAndKzgProofs(cellIndices, cells).mapErr( proc (x: string) = discard) @@ -155,7 +154,7 @@ proc recover_cells_and_proofs_parallel*( dataColumns: seq[ref fulu.DataColumnSidecar]): Result[seq[CellsAndProofs], cstring] = ## Recover blobs from data column sidecars in parallel. - ## - Uses unmanaged C buffers for worker inputs so no Nim GC objects + ## - Uses Nim sequences with pointer passing for worker inputs ## - Bounds in-flight tasks to limit peak memory/alloc pressure. ## - Ensures all spawned tasks are awaited (drained) on any early return. @@ -172,58 +171,37 @@ proc recover_cells_and_proofs_parallel*( if blobCount != column.column.len: return err("DataColumns do not have the same length") - # Worker that runs on a taskpool thread. It only sees raw pointers and - # constructs its own worker-local seqs (on the worker's heap) before calling - # the KZG recovery routine. Keeps GC objects thread-local. proc workerRecover(idxPtr: ptr CellIndex, cellsPtr: ptr Cell, columnCount: int): Result[CellsAndProofs, void] = - ## Worker runs on a taskpool thread. It receives raw C buffers (ptr) and - ## converts them into worker-local seqs before calling the KZG recovery - ## routine, so no Nim GC objects cross thread-local heaps. - var - localIndices = newSeq[CellIndex](columnCount) - localCells = newSeq[Cell](columnCount) let idxArr = cast[ptr UncheckedArray[CellIndex]](idxPtr) cellsArr = cast[ptr UncheckedArray[Cell]](cellsPtr) - for j in 0 ..< columnCount: - localIndices[j] = idxArr[j] - localCells[j] = cellsArr[j] - # use the task wrapper which maps string errors to void - recoverCellsAndKzgProofsTask(localIndices, localCells) + # Use toOpenArray to create views without copying + recoverCellsAndKzgProofsTask( + idxArr.toOpenArray(0, columnCount - 1), + cellsArr.toOpenArray(0, columnCount - 1)) var pendingFuts: seq[Flowvar[Result[CellsAndProofs, void]]] = @[] - pendingIdxPtrs: seq[ptr CellIndex] = @[] - pendingCellsPtrs: seq[ptr Cell] = @[] + # Store actual data sequences instead of C pointers + pendingIndices: seq[seq[CellIndex]] = @[] + pendingCells: seq[seq[Cell]] = @[] res = newSeq[CellsAndProofs](blobCount) # pre-size sequences so we can index-assign without reallocs pendingFuts.setLen(blobCount) - pendingIdxPtrs.setLen(blobCount) - pendingCellsPtrs.setLen(blobCount) + pendingIndices.setLen(blobCount) + pendingCells.setLen(blobCount) # track how many we've actually spawned var spawned = 0 - # Choose a sane limit for concurrent tasks to reduce peak memory/alloc pressure. + # Choose a sane limit for concurrent tasks to reduce peak memory pressure. let maxInFlight = min(blobCount, 9) let startTime = Moment.now() const reconstructionTimeout = 2.seconds - proc freePendingPtrPair(idxPtr: ptr CellIndex, cellsPtr: ptr Cell) = - c_free(idxPtr) - c_free(cellsPtr) - - proc freeAllAllocated() = - ## Free all C memory allocated so far, without syncing futures. - ## Only call this on error paths where we're aborting. - for j in 0 ..< spawned: - freePendingPtrPair(pendingIdxPtrs[j], pendingCellsPtrs[j]) - pendingIdxPtrs[j] = nil - pendingCellsPtrs[j] = nil - var completed = 0 # ---- Spawn + bounded-await loop ---- @@ -232,35 +210,24 @@ proc recover_cells_and_proofs_parallel*( if (now - startTime) > reconstructionTimeout: trace "PeerDAS reconstruction timed out while preparing columns", spawned = spawned, total = blobCount - freeAllAllocated() return err("Data column reconstruction timed out") - # Allocate unmanaged C buffers and copy data into them - let - idxBytes = csize_t(columnCount) * csize_t(sizeof(CellIndex)) - cellsBytes = csize_t(columnCount) * csize_t(sizeof(Cell)) - idxPtr = cast[ptr CellIndex](c_malloc(idxBytes)) - if idxPtr == nil: - freeAllAllocated() - return err("Failed to allocate memory for cell indices during reconstruction") - let cellsPtr = cast[ptr Cell](c_malloc(cellsBytes)) - if cellsPtr == nil: - c_free(idxPtr) - freeAllAllocated() - return err("Failed to allocate memory for cell data during reconstruction") - - # populate C buffers via UncheckedArray casts - let - idxArr = cast[ptr UncheckedArray[CellIndex]](idxPtr) - cellsArr = cast[ptr UncheckedArray[Cell]](cellsPtr) + # Use regular Nim sequences + var + indices = newSeq[CellIndex](columnCount) + cells = newSeq[Cell](columnCount) + for i in 0 ..< dataColumns.len: - idxArr[i] = dataColumns[i][].index - cellsArr[i] = dataColumns[i][].column[blobIdx] - - # store into pre-sized arrays by index and spawn worker - pendingIdxPtrs[spawned] = idxPtr - pendingCellsPtrs[spawned] = cellsPtr - pendingFuts[spawned] = tp.spawn workerRecover(idxPtr, cellsPtr, columnCount) + indices[i] = dataColumns[i][].index + cells[i] = dataColumns[i][].column[blobIdx] + + # Store sequences and spawn worker with pointers to their data + pendingIndices[spawned] = indices + pendingCells[spawned] = cells + pendingFuts[spawned] = tp.spawn workerRecover( + addr pendingIndices[spawned][0], + addr pendingCells[spawned][0], + columnCount) inc spawned # If too many in-flight tasks, await the oldest one @@ -269,24 +236,11 @@ proc recover_cells_and_proofs_parallel*( if (now2 - startTime) > reconstructionTimeout: trace "PeerDAS reconstruction timed out while awaiting tasks", completed = completed, totalSpawned = spawned - # Free memory for tasks we haven't synced yet - for j in completed ..< spawned: - freePendingPtrPair(pendingIdxPtrs[j], pendingCellsPtrs[j]) - pendingIdxPtrs[j] = nil - pendingCellsPtrs[j] = nil return err("Data column reconstruction timed out") let futRes = sync pendingFuts[completed] - freePendingPtrPair(pendingIdxPtrs[completed], pendingCellsPtrs[completed]) - pendingIdxPtrs[completed] = nil - pendingCellsPtrs[completed] = nil if futRes.isErr: - # Free memory for remaining unsynced tasks - for j in completed + 1 ..< spawned: - freePendingPtrPair(pendingIdxPtrs[j], pendingCellsPtrs[j]) - pendingIdxPtrs[j] = nil - pendingCellsPtrs[j] = nil return err("KZG cells and proofs recovery failed") res[completed] = futRes.get inc completed @@ -297,24 +251,11 @@ proc recover_cells_and_proofs_parallel*( if (now - startTime) > reconstructionTimeout: trace "PeerDAS reconstruction timed out during final sync", completed = i, totalSpawned = spawned - # Free memory for tasks we haven't synced yet - for j in i ..< spawned: - freePendingPtrPair(pendingIdxPtrs[j], pendingCellsPtrs[j]) - pendingIdxPtrs[j] = nil - pendingCellsPtrs[j] = nil return err("Data column reconstruction timed out") let futRes = sync pendingFuts[i] - freePendingPtrPair(pendingIdxPtrs[i], pendingCellsPtrs[i]) - pendingIdxPtrs[i] = nil - pendingCellsPtrs[i] = nil if futRes.isErr: - # Free memory for remaining unsynced tasks - for j in i + 1 ..< spawned: - freePendingPtrPair(pendingIdxPtrs[j], pendingCellsPtrs[j]) - pendingIdxPtrs[j] = nil - pendingCellsPtrs[j] = nil return err("KZG cells and proofs recovery failed") res[i] = futRes.get