-
Notifications
You must be signed in to change notification settings - Fork 213
Improve BFT resilience of BlockBuffer stores #8196
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 23 commits
e6b4e8c
880557b
09eb86c
46000ac
d54e974
9530e1e
7235f7a
e3acbf0
56be6f0
760434b
99b181f
3ab8dd5
a4529e8
b281bc0
5dbfe91
39f010a
49a196b
b3b1460
beec64c
e813f5c
e1bed68
718a820
846a5f8
292a175
acccf49
7f89681
e21fa82
37c9569
8fb4c54
5108c97
d350dbe
df61acd
5279647
b001e9b
3326e63
cb267a5
cffc38d
ad038a3
ac0cdb2
bee053b
9e7ec7e
923770b
b84bd5e
5cf0bf0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -100,7 +100,10 @@ func NewCore( | |
| if err != nil { | ||
| return nil, fmt.Errorf("could not initialized finalized boundary cache: %w", err) | ||
| } | ||
| c.ProcessFinalizedBlock(final) | ||
| err = c.ProcessFinalizedBlock(final) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("could not process finalized block: %w", err) | ||
| } | ||
|
|
||
| c.mempoolMetrics.MempoolEntries(metrics.ResourceProposal, c.pending.Size()) | ||
|
|
||
|
|
@@ -200,7 +203,15 @@ func (c *Core) OnBlockProposal(proposal flow.Slashable[*flow.Proposal]) error { | |
| _, found := c.pending.ByID(header.ParentID) | ||
| if found { | ||
| // add the block to the cache | ||
| _ = c.pending.Add(proposal) | ||
| if err := c.pending.Add(proposal); err != nil { | ||
| if mempool.IsBeyondActiveRangeError(err) { | ||
| // In general we expect the block buffer to use SkipNewProposalsThreshold, | ||
| // however since it is instantiated outside this component, we allow the thresholds to differ | ||
| log.Debug().Err(err).Msg("dropping block beyond block buffer active range") | ||
| return nil | ||
| } | ||
| return fmt.Errorf("could not add proposal to pending buffer: %w", err) | ||
| } | ||
| c.mempoolMetrics.MempoolEntries(metrics.ResourceProposal, c.pending.Size()) | ||
|
|
||
| return nil | ||
|
|
@@ -214,7 +225,15 @@ func (c *Core) OnBlockProposal(proposal flow.Slashable[*flow.Proposal]) error { | |
| return fmt.Errorf("could not check parent exists: %w", err) | ||
| } | ||
| if !exists { | ||
| _ = c.pending.Add(proposal) | ||
| if err := c.pending.Add(proposal); err != nil { | ||
| if mempool.IsBeyondActiveRangeError(err) { | ||
| // In general we expect the block buffer to use SkipNewProposalsThreshold, | ||
| // however since it is instantiated outside this component, we allow the thresholds to differ | ||
| log.Debug().Err(err).Msg("dropping block beyond block buffer active range") | ||
| return nil | ||
| } | ||
| return fmt.Errorf("could not add proposal to pending buffer: %w", err) | ||
| } | ||
|
Comment on lines
+299
to
+307
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
# Find PendingBlockBuffer interface definition
echo "=== Searching for PendingBlockBuffer interface ==="
rg -n --type=go "type PendingBlockBuffer" -A 20
echo -e "\n=== Searching for mempool error types ==="
rg -n --type=go "(IsBeyondActiveRangeError|IsOutOfBoundsError|IsInvalidError)" -A 2 -B 2
echo -e "\n=== Checking PendingBlockBuffer.Add implementations ==="
rg -n --type=go "func.*\(.*\).*Add.*Slashable.*Proposal.*error" -A 15
echo -e "\n=== Searching for mempool package errors ==="
fd -e go "mempool" | head -20 | xargs grep -l "error\|Error" 2>/dev/nullRepository: onflow/flow-go Length of output: 8667 🏁 Script executed: #!/bin/bash
echo "=== Checking irrecoverable package usage in compliance components ==="
rg -n --type=go "irrecoverable" engine/consensus/compliance/ -B 2 -A 2
echo -e "\n=== Checking error handling patterns in core.go ==="
rg -n --type=go "return fmt.Errorf|return irrecoverable" engine/consensus/compliance/core.go -B 1 -A 1
echo -e "\n=== Checking how BeyondActiveRangeError should be handled ==="
rg -n --type=go "log.*BeyondActiveRange|return nil" module/buffer/pending_blocks_test.go -B 2 -A 2
echo -e "\n=== Check mempool errors package ==="
cat -n module/mempool/errors.go | head -100Repository: onflow/flow-go Length of output: 6612 Verify error handling aligns with coding guidelines. All error types from
Per guidelines, either escalate the error through the |
||
| c.mempoolMetrics.MempoolEntries(metrics.ResourceProposal, c.pending.Size()) | ||
|
|
||
| c.sync.RequestBlock(header.ParentID, header.Height-1) | ||
|
|
@@ -300,9 +319,6 @@ func (c *Core) processBlockAndDescendants(proposal flow.Slashable[*flow.Proposal | |
| } | ||
| } | ||
|
|
||
| // drop all the children that should have been processed now | ||
| c.pending.DropForParent(blockID) | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
|
|
@@ -392,14 +408,19 @@ func (c *Core) processBlockProposal(proposal *flow.Proposal) error { | |
|
|
||
| // ProcessFinalizedBlock performs pruning of stale data based on finalization event | ||
| // removes pending blocks below the finalized view | ||
| func (c *Core) ProcessFinalizedBlock(finalized *flow.Header) { | ||
| // No errors are expected during normal operation. | ||
| func (c *Core) ProcessFinalizedBlock(finalized *flow.Header) error { | ||
| // remove all pending blocks at or below the finalized view | ||
| c.pending.PruneByView(finalized.View) | ||
| err := c.pending.PruneByView(finalized.View) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| c.finalizedHeight.Set(finalized.Height) | ||
| c.finalizedView.Set(finalized.View) | ||
|
|
||
| // always record the metric | ||
| c.mempoolMetrics.MempoolEntries(metrics.ResourceProposal, c.pending.Size()) | ||
| return nil | ||
| } | ||
|
|
||
| // checkForAndLogOutdatedInputError checks whether error is an `engine.OutdatedInputError`. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -200,7 +200,7 @@ func (cs *CommonSuite) SetupTest() { | |
|
|
||
| // set up pending module mock | ||
| cs.pending = &module.PendingBlockBuffer{} | ||
| cs.pending.On("Add", mock.Anything, mock.Anything).Return(true) | ||
| cs.pending.On("Add", mock.Anything, mock.Anything) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing mock return value for The 🔎 Proposed fix- cs.pending.On("Add", mock.Anything, mock.Anything)
+ cs.pending.On("Add", mock.Anything).Return(nil)Note: The method signature also appears to have changed to accept a single argument ( 🤖 Prompt for AI Agents |
||
| cs.pending.On("ByID", mock.Anything).Return( | ||
| func(blockID flow.Identifier) flow.Slashable[*flow.Proposal] { | ||
| return cs.pendingDB[blockID] | ||
|
|
@@ -219,9 +219,8 @@ func (cs *CommonSuite) SetupTest() { | |
| return ok | ||
| }, | ||
| ) | ||
| cs.pending.On("DropForParent", mock.Anything).Return() | ||
| cs.pending.On("Size").Return(uint(0)) | ||
| cs.pending.On("PruneByView", mock.Anything).Return() | ||
| cs.pending.On("PruneByView", mock.Anything).Return(nil) | ||
|
|
||
| // set up hotstuff module mock | ||
| cs.hotstuff = module.NewHotStuff(cs.T()) | ||
|
|
@@ -565,9 +564,6 @@ func (cs *CoreSuite) TestProcessBlockAndDescendants() { | |
| Message: proposal0, | ||
| }) | ||
| require.NoError(cs.T(), err, "should pass handling children") | ||
|
|
||
| // make sure we drop the cache after trying to process | ||
| cs.pending.AssertCalled(cs.T(), "DropForParent", parent.ID()) | ||
| } | ||
|
|
||
| func (cs *CoreSuite) TestProposalBufferingOrder() { | ||
|
|
@@ -588,7 +584,7 @@ func (cs *CoreSuite) TestProposalBufferingOrder() { | |
| } | ||
|
|
||
| // replace the engine buffer with the real one | ||
| cs.core.pending = real.NewPendingBlocks() | ||
| cs.core.pending = real.NewPendingBlocks(cs.head.View, 100_000) | ||
|
|
||
| // check that we request the ancestor block each time | ||
| cs.sync.On("RequestBlock", missingBlock.ID(), missingBlock.Height).Times(len(proposals)) | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removing this line is causing the integration test failure. We assume that
c.pendingonly includes blocks that have not been processed and added to our local state. However, with the change in this PR, we retain blocks until their view is finalized.Specific problematic case:
There are blocks like
A<-B<-C.flow-go/engine/consensus/compliance/core.go
Line 203 in e813f5c
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
37c9569