Skip to content

Commit 24c09a0

Browse files
gupadhyayaGanesha Upadhyaya
andauthored
only remove successfully submitted blocks from the pending blocks instead of reset (#1376) (#1394)
cherry-picking <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Enhanced block submission process with updated logic for handling and caching submitted blocks. - Improved block management with a new method for removing specific submitted blocks. - **Refactor** - Transitioned `PendingBlocks` storage from a slice to a map for optimized block handling. - Streamlined the retrieval of pending blocks with a new sorting mechanism based on block height. - **Tests** - Added comprehensive test cases for `PendingBlocks` to ensure correct functionality when adding and removing blocks. - Introduced a new integration test to verify the successful submission of blocks in a simulated environment. <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Co-authored-by: Ganesha Upadhyaya <[email protected]>
1 parent 028074c commit 24c09a0

File tree

4 files changed

+134
-11
lines changed

4 files changed

+134
-11
lines changed

block/manager.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -746,7 +746,11 @@ func (m *Manager) submitBlocksToDA(ctx context.Context) error {
746746
if int(res.SubmittedCount) == len(blocks) {
747747
submitted = true
748748
}
749-
m.pendingBlocks.resetPendingBlocks(res.SubmittedCount)
749+
submittedBlocks := blocks[:res.SubmittedCount]
750+
for _, block := range submittedBlocks {
751+
m.blockCache.setDAIncluded(block.Hash().String())
752+
}
753+
m.pendingBlocks.removeSubmittedBlocks(submittedBlocks)
750754
case da.StatusError, da.StatusNotFound, da.StatusUnknown:
751755
m.logger.Error("DA layer submission failed", "error", res.Message, "attempt", attempt)
752756
time.Sleep(backoff)

block/pending_blocks.go

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,57 @@
11
package block
22

33
import (
4+
"sort"
45
"sync"
56

67
"github.com/rollkit/rollkit/types"
78
)
89

910
// PendingBlocks maintains blocks that need to be published to DA layer
1011
type PendingBlocks struct {
11-
pendingBlocks []*types.Block
12+
pendingBlocks map[uint64]*types.Block
1213
mtx *sync.RWMutex
1314
}
1415

1516
// NewPendingBlocks returns a new PendingBlocks struct
1617
func NewPendingBlocks() *PendingBlocks {
1718
return &PendingBlocks{
18-
pendingBlocks: make([]*types.Block, 0),
19+
pendingBlocks: make(map[uint64]*types.Block),
1920
mtx: new(sync.RWMutex),
2021
}
2122
}
2223

24+
// getPendingBlocks returns a sorted slice of pending blocks
25+
// that need to be published to DA layer in order of block height
2326
func (pb *PendingBlocks) getPendingBlocks() []*types.Block {
2427
pb.mtx.RLock()
2528
defer pb.mtx.RUnlock()
26-
return pb.pendingBlocks
29+
blocks := make([]*types.Block, 0, len(pb.pendingBlocks))
30+
for _, block := range pb.pendingBlocks {
31+
blocks = append(blocks, block)
32+
}
33+
sort.Slice(blocks, func(i, j int) bool {
34+
return blocks[i].Height() < blocks[j].Height()
35+
})
36+
return blocks
2737
}
2838

2939
func (pb *PendingBlocks) isEmpty() bool {
30-
pendingBlocks := pb.getPendingBlocks()
31-
return len(pendingBlocks) == 0
40+
pb.mtx.RLock()
41+
defer pb.mtx.RUnlock()
42+
return len(pb.pendingBlocks) == 0
3243
}
3344

3445
func (pb *PendingBlocks) addPendingBlock(block *types.Block) {
3546
pb.mtx.Lock()
3647
defer pb.mtx.Unlock()
37-
pb.pendingBlocks = append(pb.pendingBlocks, block)
48+
pb.pendingBlocks[block.Height()] = block
3849
}
3950

40-
func (pb *PendingBlocks) resetPendingBlocks(submitted uint64) {
51+
func (pb *PendingBlocks) removeSubmittedBlocks(blocks []*types.Block) {
4152
pb.mtx.Lock()
4253
defer pb.mtx.Unlock()
43-
if submitted > uint64(len(pb.pendingBlocks)) {
44-
submitted = uint64(len(pb.pendingBlocks))
54+
for _, block := range blocks {
55+
delete(pb.pendingBlocks, block.Height())
4556
}
46-
pb.pendingBlocks = pb.pendingBlocks[submitted:]
4757
}

block/pending_blocks_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package block
2+
3+
import (
4+
"sort"
5+
"testing"
6+
7+
"github.com/stretchr/testify/require"
8+
9+
"github.com/rollkit/rollkit/types"
10+
)
11+
12+
func TestGetPendingBlocks(t *testing.T) {
13+
require := require.New(t)
14+
pb := NewPendingBlocks()
15+
for i := uint64(0); i < 5; i++ {
16+
pb.addPendingBlock(types.GetRandomBlock(i, 0))
17+
}
18+
blocks := pb.getPendingBlocks()
19+
require.True(sort.SliceIsSorted(blocks, func(i, j int) bool {
20+
return blocks[i].Height() < blocks[j].Height()
21+
}))
22+
}
23+
24+
func TestRemoveSubmittedBlocks(t *testing.T) {
25+
require := require.New(t)
26+
pb := NewPendingBlocks()
27+
for i := uint64(0); i < 5; i++ {
28+
pb.addPendingBlock(types.GetRandomBlock(i, 0))
29+
}
30+
blocks := pb.getPendingBlocks()
31+
pb.removeSubmittedBlocks(blocks)
32+
require.True(pb.isEmpty())
33+
}
34+
35+
func TestRemoveSubsetOfBlocks(t *testing.T) {
36+
require := require.New(t)
37+
pb := NewPendingBlocks()
38+
for i := uint64(0); i < 5; i++ {
39+
pb.addPendingBlock(types.GetRandomBlock(i, 0))
40+
}
41+
// Remove blocks with height 1 and 2
42+
pb.removeSubmittedBlocks([]*types.Block{
43+
types.GetRandomBlock(1, 0),
44+
types.GetRandomBlock(2, 0),
45+
})
46+
remainingBlocks := pb.getPendingBlocks()
47+
require.Len(remainingBlocks, 3, "There should be 3 blocks remaining")
48+
for _, block := range remainingBlocks {
49+
require.Contains([]uint64{0, 3, 4}, block.Height(), "Only blocks with height 0, 3, and 4 should remain")
50+
}
51+
}
52+
53+
func TestRemoveAllBlocksAndVerifyEmpty(t *testing.T) {
54+
require := require.New(t)
55+
pb := NewPendingBlocks()
56+
for i := uint64(0); i < 5; i++ {
57+
pb.addPendingBlock(types.GetRandomBlock(i, 0))
58+
}
59+
// Remove all blocks
60+
pb.removeSubmittedBlocks(pb.getPendingBlocks())
61+
require.True(pb.isEmpty(), "PendingBlocks should be empty after removing all blocks")
62+
}
63+
64+
func TestRemoveBlocksFromEmptyPendingBlocks(t *testing.T) {
65+
require := require.New(t)
66+
pb := NewPendingBlocks()
67+
// Attempt to remove blocks from an empty PendingBlocks
68+
require.NotPanics(func() {
69+
pb.removeSubmittedBlocks([]*types.Block{
70+
types.GetRandomBlock(1, 0),
71+
types.GetRandomBlock(2, 0),
72+
})
73+
}, "Removing blocks from an empty PendingBlocks should not cause a panic")
74+
}

node/full_node_integration_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,41 @@ func TestHeaderExchange(t *testing.T) {
370370
t.Run("SingleAggregatorSingleFullNodeSingleLightNode", testSingleAggregatorSingleFullNodeSingleLightNode)
371371
}
372372

373+
func TestSubmitBlocksToDA(t *testing.T) {
374+
require := require.New(t)
375+
376+
clientNodes := 1
377+
ctx, cancel := context.WithCancel(context.Background())
378+
defer cancel()
379+
nodes, _ := createNodes(
380+
ctx,
381+
context.Background(),
382+
clientNodes,
383+
config.BlockManagerConfig{
384+
DABlockTime: 20 * time.Millisecond,
385+
BlockTime: 10 * time.Millisecond,
386+
},
387+
t,
388+
)
389+
seq := nodes[0]
390+
require.NoError(seq.Start())
391+
defer func() {
392+
require.NoError(seq.Stop())
393+
}()
394+
395+
timer := time.NewTimer(5 * seq.nodeConfig.DABlockTime)
396+
<-timer.C
397+
398+
numberOfBlocksToSyncTill := seq.Store.Height()
399+
400+
//Make sure all produced blocks made it to DA
401+
for i := uint64(1); i <= numberOfBlocksToSyncTill; i++ {
402+
block, err := seq.Store.GetBlock(i)
403+
require.NoError(err)
404+
require.True(seq.blockManager.IsDAIncluded(block.Hash()), block.Height())
405+
}
406+
}
407+
373408
func testSingleAggregatorSingleFullNode(t *testing.T, source Source) {
374409
require := require.New(t)
375410

0 commit comments

Comments
 (0)