-
Notifications
You must be signed in to change notification settings - Fork 11
Implement the RPCBlockHeaderSubscriber
for indexing finalized results
#728
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
WalkthroughThis change introduces support for indexing unsealed finalized execution results ("soft finality") in the EVM Gateway. It adds new configuration flags and experimental features for soft finality and sealing verification, implements new event subscribers and verification logic, and refactors transaction pool handling to support both single and batched transaction submission modes. Changes
Sequence Diagram(s)sequenceDiagram
participant User
participant Gateway
participant AccessNode
participant Storage
User->>Gateway: Start with soft finality enabled
Gateway->>AccessNode: SubscribeBlockHeadersFromStartHeight
loop For each new finalized block
Gateway->>AccessNode: GetEventsForBlockIDs (EVM events)
Gateway->>Storage: Store unsealed events hash
alt Sealing verification enabled
Gateway->>AccessNode: Get sealed events hash
Gateway->>Storage: Compare and update processed sealed height
end
Gateway->>Gateway: Index events
end
sequenceDiagram
participant User
participant Gateway
participant BatchTxPool
participant FlowNetwork
User->>Gateway: Submit EVM transactions (batch mode)
Gateway->>BatchTxPool: Add(tx)
BatchTxPool-->>BatchTxPool: Group tx by sender
Note over BatchTxPool: On batch interval
BatchTxPool->>FlowNetwork: Submit batch Cadence transaction
FlowNetwork-->>BatchTxPool: Return result
BatchTxPool->>Gateway: Notify on success/error
Assessment against linked issues
Assessment against linked issues: Out-of-scope changes
Possibly related PRs
Suggested labels
Suggested reviewers
Poem
Warning There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure. 🔧 golangci-lint (1.64.8)level=warning msg="[runner] Can't run linter goanalysis_metalinter: buildir: failed to load package crypto: could not load export data: no export data for "github.com/onflow/crypto"" ✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
10a8bdd
to
4fd4a5d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added a couple small comments, but otherwise this looks good. let's get it running against one of the live networks.
@peterargue Thanks for the review 🙌 I've addressed all the comments. |
…ersFromStartHeight
…or function resuse
314e7cc
to
bfe6188
Compare
Co-authored-by: Peter Argue <[email protected]>
…educe blocking time of Add()
Co-authored-by: Leo Zhang <[email protected]>
…ameAccount Co-authored-by: Leo Zhang <[email protected]>
…atch-run-transactions-back-port-soft-finality
…k-port-soft-finality [Backport] Implement `BatchTxPool` to handle nonce mismatch issues
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Nitpick comments (5)
services/requester/tx_pool.go (2)
12-14
: Consider making the regex more robust.The current regex assumes the error message ends with a newline, which might not always be the case.
const ( - evmErrorRegex = `evm_error=(.*)\n` + evmErrorRegex = `evm_error=([^\n]+)` )This pattern will capture everything after "evm_error=" up to but not including a newline, making it more flexible.
26-34
: Consider pre-compiling the regex for better performance.The regex is compiled on every function call. For better performance, especially under high load, consider compiling it once.
+var evmErrorPattern = regexp.MustCompile(evmErrorRegex) + func parseInvalidError(err error) (error, bool) { - r := regexp.MustCompile(evmErrorRegex) - matches := r.FindStringSubmatch(err.Error()) + matches := evmErrorPattern.FindStringSubmatch(err.Error()) if len(matches) != 2 { return nil, false } return errs.NewFailedTransactionError(matches[1]), true }tests/tx_batching_test.go (2)
111-112
: Consider reducing test flakiness from timing dependencies.The conditional sleep based on index could lead to flaky tests if the batch interval timing changes. Consider using a more deterministic approach or waiting for specific events.
Instead of fixed sleep times, consider waiting for transaction pool events or using a mock time source for more deterministic testing.
322-327
: Consider extracting test addresses to constants or helper function.The hardcoded test addresses and private keys could be generated dynamically or moved to test constants for better maintainability.
Consider creating a helper function to generate test EOA addresses and keys:
func generateTestEOAs(count int) map[common.Address]string { // Generate deterministic test addresses and keys }services/requester/batch_tx_pool.go (1)
109-111
: Consider adding batch size limits per sender.The current implementation can accumulate unlimited transactions per sender, which could lead to memory issues or oversized Flow transactions.
Consider adding a check and handling for maximum batch size:
const maxBatchSizePerSender = 100 // configurable userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: tx.Nonce()} if len(t.pooledTxs[from]) >= maxBatchSizePerSender { // Either reject the transaction or trigger immediate batch submission return fmt.Errorf("batch size limit reached for sender %s", from.Hex()) } t.pooledTxs[from] = append(t.pooledTxs[from], userTx)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (2)
go.sum
is excluded by!**/*.sum
tests/go.sum
is excluded by!**/*.sum
📒 Files selected for processing (14)
bootstrap/bootstrap.go
(6 hunks)cmd/run/cmd.go
(3 hunks)config/config.go
(1 hunks)go.mod
(4 hunks)services/requester/batch_tx_pool.go
(1 hunks)services/requester/cadence/batch_run.cdc
(1 hunks)services/requester/pool.go
(0 hunks)services/requester/requester.go
(5 hunks)services/requester/single_tx_pool.go
(1 hunks)services/requester/tx_pool.go
(1 hunks)tests/go.mod
(4 hunks)tests/helpers.go
(4 hunks)tests/integration_test.go
(1 hunks)tests/tx_batching_test.go
(1 hunks)
💤 Files with no reviewable changes (1)
- services/requester/pool.go
✅ Files skipped from review due to trivial changes (1)
- tests/go.mod
🚧 Files skipped from review as they are similar to previous changes (4)
- config/config.go
- bootstrap/bootstrap.go
- go.mod
- cmd/run/cmd.go
🔇 Additional comments (13)
tests/integration_test.go (1)
558-558
: LGTM! Excellent resource management improvement.The addition of explicit cleanup for both boot instances ensures proper resource management and prevents potential leaks. The deferred cleanup with nil-check is well-structured and follows Go best practices.
Also applies to: 565-569
tests/helpers.go (3)
95-96
: Configuration changes look good.Enabling EVM setup while disabling VM bridge aligns with the PR's focus on EVM functionality.
246-258
: Proper COA initialization with existence checks.Good defensive programming - checking for existing resources before creating new ones prevents runtime errors and follows Cadence best practices.
468-481
: Clean implementation of getBalance RPC method.The method follows the existing pattern and properly handles the JSON response with appropriate type conversion.
services/requester/requester.go (2)
38-39
: Proper embedding of batch run script.Good use of go:embed directive for the new batch transaction script.
237-239
: Clean delegation to transaction pool.The simplified implementation properly delegates transaction submission to the pool while maintaining all necessary validation.
tests/tx_batching_test.go (2)
128-164
: Excellent test coverage for nonce validation errors.The test properly validates both "nonce too low" and "nonce too high" scenarios, and correctly asserts that the batch fails with the appropriate error message.
347-382
: Well-structured concurrent testing with proper synchronization.Excellent use of errgroup for managing concurrent goroutines and proper error propagation. The random delays effectively simulate real-world concurrent transaction submissions.
services/requester/single_tx_pool.go (3)
154-198
: Well-implemented transaction building with proper resource management.The method correctly handles keystore locking, metric updates, and error scenarios. The pattern of taking a key and returning it on error is particularly good.
200-226
: Efficient concurrent fetching implementation.Good use of errgroup for parallel fetching of block and account information, which optimizes the transaction preparation time.
117-121
: Fix defer placement to avoid deleting non-existent entries.The
defer t.pool.Delete(tx.Hash())
statement will execute even if an error occurs before the transaction is stored in the pool. This could lead to unnecessary deletion attempts on non-existent entries.Move the defer statement after the Store operation:
if t.config.TxStateValidation == config.TxSealValidation { // add to pool and delete after transaction is sealed or errored out t.pool.Store(tx.Hash(), tx) - defer t.pool.Delete(tx.Hash()) + defer t.pool.Delete(tx.Hash())Actually, the better approach would be to restructure the code to ensure defer is only set after successful storage:
if t.config.TxStateValidation == config.TxSealValidation { - // add to pool and delete after transaction is sealed or errored out - t.pool.Store(tx.Hash(), tx) - defer t.pool.Delete(tx.Hash()) + // add to pool + t.pool.Store(tx.Hash(), tx) + // ensure cleanup after transaction is sealed or errored out + defer func() { + t.pool.Delete(tx.Hash()) + }()Likely an incorrect or invalid review comment.
services/requester/batch_tx_pool.go (2)
131-138
: Excellent concurrent design with atomic map swapping.The pattern of atomically swapping the pooled transactions map allows new transactions to be added without blocking during batch submission. This is a well-designed approach for high-throughput scenarios.
165-169
: Critical nonce sorting ensures transaction order preservation.Excellent implementation of nonce-based sorting to maintain transaction order within a batch. This is essential for preventing nonce-related errors.
for txResult in txResults { | ||
assert( | ||
txResult.status == EVM.Status.failed || txResult.status == EVM.Status.successful, | ||
message: "evm_error=".concat(txResult.errorMessage).concat("\n") | ||
) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
The loop is redundant - it will always fail on the first iteration.
Since this code is only reached when all transactions have invalid status (neither failed nor successful), the assertion txResult.status == EVM.Status.failed || txResult.status == EVM.Status.successful
will always be false on the first iteration, causing the transaction to fail immediately. The loop structure is unnecessary.
Consider simplifying to directly assert on the first transaction result:
- for txResult in txResults {
- assert(
- txResult.status == EVM.Status.failed || txResult.status == EVM.Status.successful,
- message: "evm_error=".concat(txResult.errorMessage).concat("\n")
- )
- }
+ assert(
+ false,
+ message: "evm_error=".concat(txResults[0].errorMessage).concat("\n")
+ )
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
for txResult in txResults { | |
assert( | |
txResult.status == EVM.Status.failed || txResult.status == EVM.Status.successful, | |
message: "evm_error=".concat(txResult.errorMessage).concat("\n") | |
) | |
} | |
assert( | |
false, | |
message: "evm_error=".concat(txResults[0].errorMessage).concat("\n") | |
) |
🤖 Prompt for AI Agents
In services/requester/cadence/batch_run.cdc around lines 28 to 33, the for loop
asserting each txResult's status is redundant because the code path is only
reached when all transactions have invalid statuses, causing the assertion to
fail immediately on the first iteration. Replace the loop with a single
assertion on the first txResult's status to simplify the code and avoid
unnecessary iteration.
// buildTransaction creates a Cadence transaction from the provided script, | ||
// with the given arguments and signs it with the configured COA account. | ||
func (t *BatchTxPool) buildTransaction( | ||
latestBlock *flow.Block, | ||
account *flow.Account, | ||
script []byte, | ||
args ...cadence.Value, | ||
) (*flow.Transaction, error) { | ||
defer func() { | ||
t.collector.AvailableSigningKeys(t.keystore.AvailableKeys()) | ||
}() | ||
|
||
flowTx := flow.NewTransaction(). | ||
SetScript(script). | ||
SetReferenceBlockID(latestBlock.ID). | ||
SetComputeLimit(flowGo.DefaultMaxTransactionGasLimit) | ||
|
||
for _, arg := range args { | ||
if err := flowTx.AddArgument(arg); err != nil { | ||
return nil, fmt.Errorf("failed to add argument: %s, with %w", arg, err) | ||
} | ||
} | ||
|
||
// building and signing transactions should be blocking, | ||
// so we don't have keys conflict | ||
t.mux.Lock() | ||
defer t.mux.Unlock() | ||
|
||
accKey, err := t.keystore.Take() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
if err := accKey.SetProposerPayerAndSign(flowTx, account); err != nil { | ||
accKey.Done() | ||
return nil, err | ||
} | ||
|
||
// now that the transaction is prepared, store the transaction's metadata | ||
accKey.SetLockMetadata(flowTx.ID(), latestBlock.Height) | ||
|
||
t.collector.OperatorBalance(account) | ||
|
||
return flowTx, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Remove duplicate buildTransaction method.
This method is an exact duplicate of the one in SingleTxPool
. Since BatchTxPool
embeds SingleTxPool
, the embedded method can be used directly.
Remove this entire method and use the embedded one from SingleTxPool
. If you need to override behavior, consider calling the parent method:
-// buildTransaction creates a Cadence transaction from the provided script,
-// with the given arguments and signs it with the configured COA account.
-func (t *BatchTxPool) buildTransaction(
- latestBlock *flow.Block,
- account *flow.Account,
- script []byte,
- args ...cadence.Value,
-) (*flow.Transaction, error) {
- // ... entire method body ...
-}
The embedded SingleTxPool.buildTransaction
method will be accessible directly on the BatchTxPool
instance.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
// buildTransaction creates a Cadence transaction from the provided script, | |
// with the given arguments and signs it with the configured COA account. | |
func (t *BatchTxPool) buildTransaction( | |
latestBlock *flow.Block, | |
account *flow.Account, | |
script []byte, | |
args ...cadence.Value, | |
) (*flow.Transaction, error) { | |
defer func() { | |
t.collector.AvailableSigningKeys(t.keystore.AvailableKeys()) | |
}() | |
flowTx := flow.NewTransaction(). | |
SetScript(script). | |
SetReferenceBlockID(latestBlock.ID). | |
SetComputeLimit(flowGo.DefaultMaxTransactionGasLimit) | |
for _, arg := range args { | |
if err := flowTx.AddArgument(arg); err != nil { | |
return nil, fmt.Errorf("failed to add argument: %s, with %w", arg, err) | |
} | |
} | |
// building and signing transactions should be blocking, | |
// so we don't have keys conflict | |
t.mux.Lock() | |
defer t.mux.Unlock() | |
accKey, err := t.keystore.Take() | |
if err != nil { | |
return nil, err | |
} | |
if err := accKey.SetProposerPayerAndSign(flowTx, account); err != nil { | |
accKey.Done() | |
return nil, err | |
} | |
// now that the transaction is prepared, store the transaction's metadata | |
accKey.SetLockMetadata(flowTx.ID(), latestBlock.Height) | |
t.collector.OperatorBalance(account) | |
return flowTx, nil | |
} | |
// (entire buildTransaction method removed; use SingleTxPool.buildTransaction via embedding) |
🤖 Prompt for AI Agents
In services/requester/batch_tx_pool.go from lines 200 to 244, remove the entire
buildTransaction method since it is an exact duplicate of the method in
SingleTxPool, which BatchTxPool already embeds. Use the embedded
SingleTxPool.buildTransaction method directly on BatchTxPool instances. If any
behavior override is needed, call the embedded method instead of duplicating it.
txMux: sync.Mutex{}, | ||
} | ||
|
||
go batchPool.processPooledTransactions(ctx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add panic recovery for background goroutine.
The background goroutine processing transactions could panic and terminate batching functionality. Add panic recovery to ensure resilience.
-go batchPool.processPooledTransactions(ctx)
+go func() {
+ defer func() {
+ if r := recover(); r != nil {
+ logger.Error().Interface("panic", r).Msg("panic in batch transaction processor")
+ // Optionally restart the goroutine or implement a circuit breaker
+ }
+ }()
+ batchPool.processPooledTransactions(ctx)
+}()
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
go batchPool.processPooledTransactions(ctx) | |
go func() { | |
defer func() { | |
if r := recover(); r != nil { | |
logger.Error().Interface("panic", r).Msg("panic in batch transaction processor") | |
// Optionally restart the goroutine or implement a circuit breaker | |
} | |
}() | |
batchPool.processPooledTransactions(ctx) | |
}() |
🤖 Prompt for AI Agents
In services/requester/batch_tx_pool.go at line 76, the background goroutine
started with go batchPool.processPooledTransactions(ctx) lacks panic recovery.
Wrap the call to processPooledTransactions in an anonymous function that defers
a recover function to catch and handle any panics, preventing the goroutine from
crashing and ensuring the batching functionality remains resilient.
Closes: #727
Description
For contributor use:
master
branchFiles changed
in the Github PR explorerSummary by CodeRabbit