-
Notifications
You must be signed in to change notification settings - Fork 14
Implement the new transactionReceipts subscription API
#917
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
WalkthroughAdds a TransactionReceipts streaming subscription, threads a receipts publisher through bootstrap → ingestion → API, publishes receipts in batches during event processing, validates tx-hash filters (limit 200), marshals receipts for notifications, and adds WebSocket integration tests. Changes
Sequence Diagram(s)sequenceDiagram
participant EventSource as Event Source
participant Ingestion as Ingestion Engine
participant Pub as Receipts Publisher
participant API as Stream API
participant Client as Subscriber (WS)
EventSource->>Ingestion: emit block, txs, receipts
Ingestion->>Pub: publish([]*Receipt) %% publish batch
Pub->>API: notify(receipt batch)
Client->>API: TransactionReceipts(ctx, filter)
API->>API: validate txHashes count (<=200)
API->>Pub: subscribe
alt on batch notification
Pub->>API: batch notification
API->>API: optionally filter by txHashes
API->>API: fetch txs, marshal receipts
API->>Client: send batch notification
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes
Possibly related PRs
Suggested labels
Suggested reviewers
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
transactionReceipts subscription API
ee75cc9 to
f9e2865
Compare
4925450 to
0bd1756
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
api/stream.go (1)
158-176: Consider precomputing hash lookups.Right now each receipt does an
O(len(txHashes))slices.Contains. With the 200-hash cap that’s fine, but a tiny helper that builds amap[common.Hash]struct{}up front would make the loop constant time and future-proof us if the cap ever grows.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
api/stream.go(6 hunks)bootstrap/bootstrap.go(4 hunks)models/errors/errors.go(1 hunks)models/events.go(1 hunks)services/ingestion/engine.go(4 hunks)services/ingestion/engine_test.go(5 hunks)tests/web3js/eth_streaming_test.js(3 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-06-19T11:36:25.478Z
Learnt from: m-Peter
Repo: onflow/flow-evm-gateway PR: 831
File: services/requester/batch_tx_pool.go:0-0
Timestamp: 2025-06-19T11:36:25.478Z
Learning: In Go, when copying maps that contain slices (like `map[gethCommon.Address][]pooledEvmTx`), perform deep copies by iterating over the map and copying each slice individually using `make()` and `copy()` to avoid shared references that could lead to race conditions and data corruption.
Applied to files:
api/stream.go
🧬 Code graph analysis (5)
tests/web3js/eth_streaming_test.js (2)
tests/web3js/config.js (2)
require(1-1)web3(2-2)tests/web3js/helpers.js (4)
signedTx(66-66)signedTx(79-79)conf(2-2)web3(5-5)
services/ingestion/engine_test.go (2)
models/stream.go (1)
NewPublisher(14-19)models/receipt.go (1)
Receipt(18-36)
api/stream.go (4)
models/stream.go (2)
Publisher(9-12)Subscription(49-53)models/receipt.go (1)
Receipt(18-36)models/errors/errors.go (1)
ErrExceedMaxTxHashes(20-20)eth/types/types.go (1)
MarshalReceipt(514-578)
services/ingestion/engine.go (4)
storage/index.go (1)
ReceiptIndexer(61-82)models/stream.go (1)
Publisher(9-12)models/receipt.go (1)
Receipt(18-36)storage/pebble/receipts.go (1)
Receipts(20-22)
bootstrap/bootstrap.go (3)
storage/pebble/receipts.go (1)
Receipts(20-22)models/stream.go (2)
Publisher(9-12)NewPublisher(14-19)models/receipt.go (1)
Receipt(18-36)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Test
🔇 Additional comments (6)
models/events.go (1)
80-83: Nice alignment with upstream log schema.Adding
BlockTimestampto each log keeps our receipts consistent with go-ethereum’s v1.16.x payloads that already shipblockTimestamp, so downstream consumers don’t need an extra block lookup. (chainrelease.info)models/errors/errors.go (1)
19-21: Error surface matches the new subscription contract.Exporting
ErrExceedMaxTxHashesgives the API layer a precise failure mode when enforcing the hash cap.services/ingestion/engine_test.go (1)
74-83: Tests updated for the extra publisher.Thanks for threading the receipts publisher through the test harness—this guards the constructor signature change right away.
services/ingestion/engine.go (1)
208-214: Receipts published post-commit.Emitting through
receiptsPublisheronly afterindexEvents(and the batch commit) keeps subscribers from racing partially persisted data—good call.tests/web3js/eth_streaming_test.js (1)
166-204: Coverage for hash-filtered subscriptions.Great to see the second leg asserting that only the requested hashes surface—this will catch regressions in the filtering logic quickly.
bootstrap/bootstrap.go (1)
58-58: LGTM! Clean wiring of receipts publisher throughout the bootstrap.The receipts support is correctly threaded through all layers:
- Storage and publisher infrastructure added to core structs
- Publisher initialized in the constructor
- Properly wired to both ingestion engine (for publishing) and stream API (for subscriptions)
The implementation follows the established pattern for blocks, transactions, and logs, ensuring consistency across the codebase.
Also applies to: 65-65, 109-109, 204-204, 344-344
0bd1756 to
db4c441
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
tests/web3js/eth_streaming_test.js (1)
73-165: Strengthen receipt validation and reduce flakiness in the firsttransactionReceiptsscenarioThe review comment is correct. Issues verified:
- Missing receipt match assertion (lines 167–180): The inner loop iterates over
receiptsbut never asserts that a matching receipt was found for eachtxHash. If the stream omits or mislabels receipts but still returns 10 items, the test silently passes. Add afoundflag and assertion:for (let txHash of sentHashes) { let txReceipt = await helpers.callRPCMethod( 'eth_getTransactionReceipt', [txHash] ) + let found = false for (let rcp of receipts) { if (rcp.transactionHash == txHash) { assert.deepEqual(rcp, txReceipt.body['result']) + found = true + break } } + assert.isTrue(found, `expected streamed receipt for tx ${txHash}`) }
Fixed
setTimeouttiming (line 116): Relying on a hardcoded 1-second delay before asserting onreceiptscan be brittle. Replace with a polling approach that waits untilreceipts.lengthreaches the expected count (with timeout protection).Magic number 10 (line 164): Express as
2 * testValues.lengthto clarify the "two receipts per transaction" invariant.
🧹 Nitpick comments (1)
tests/web3js/eth_streaming_test.js (1)
166-220: FilteredtransactionReceiptssubscription scenario looks solid, minor clean‑up optionalUsing a second subscription with an explicit
transactionHashesset and asserting a single streamed receipt that matcheseth_getTransactionReceiptexercises the filter path well.If you want to tighten things further, you could:
- Unsubscribe or otherwise retire the first
transactionReceiptssubscription to avoid extra unseen traffic on the same socket.- Optionally wait for
subID(the subscription ack) before sending the signed transaction, to rule out any theoretical race between subscription activation and first notification.Functionally, though, this block is correct as-is.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
api/stream.go(6 hunks)bootstrap/bootstrap.go(4 hunks)models/errors/errors.go(1 hunks)models/events.go(1 hunks)services/ingestion/engine.go(4 hunks)services/ingestion/engine_test.go(5 hunks)tests/web3js/eth_streaming_test.js(3 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- services/ingestion/engine.go
- services/ingestion/engine_test.go
- models/events.go
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-06-19T11:36:25.478Z
Learnt from: m-Peter
Repo: onflow/flow-evm-gateway PR: 831
File: services/requester/batch_tx_pool.go:0-0
Timestamp: 2025-06-19T11:36:25.478Z
Learning: In Go, when copying maps that contain slices (like `map[gethCommon.Address][]pooledEvmTx`), perform deep copies by iterating over the map and copying each slice individually using `make()` and `copy()` to avoid shared references that could lead to race conditions and data corruption.
Applied to files:
api/stream.go
🧬 Code graph analysis (3)
models/errors/errors.go (2)
bootstrap/bootstrap.go (1)
New(84-119)storage/pebble/storage.go (1)
New(19-24)
api/stream.go (4)
models/stream.go (2)
Publisher(9-12)Subscription(49-53)models/receipt.go (1)
Receipt(18-36)models/errors/errors.go (1)
ErrExceedMaxTxHashes(20-20)eth/types/types.go (1)
MarshalReceipt(514-578)
bootstrap/bootstrap.go (3)
storage/pebble/receipts.go (1)
Receipts(20-22)models/stream.go (2)
Publisher(9-12)NewPublisher(14-19)models/receipt.go (1)
Receipt(18-36)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Test
🔇 Additional comments (5)
models/errors/errors.go (1)
16-21: NewErrExceedMaxTxHashesAPI error is consistent and well-scopedName, visibility, and message align with the existing API-specific error set and cleanly support the new subscription limit check in
StreamAPI.TransactionReceipts.api/stream.go (2)
24-61: Receipts publisher wiring intoStreamAPIlooks correctAdding
receiptsPublisher *models.Publisher[[]*models.Receipt]toStreamAPIand threading it throughNewStreamAPIcleanly mirrors the existing block/tx/log publishers and matches the bootstrap wiring. No issues here.
132-183: TightenTransactionReceiptsimplementation: avoid empty notifications and improve hash filtering structureThe overall shape looks good (limit check, optional hash filter, reuse of
ethTypes.MarshalReceipt), but a couple of refinements would make it more robust and efficient:
- Skip notifications when no receipts match the filter. Right now a batch is sent even when
marshaledReceiptsis empty. To reduce noise and better align with typical subscription semantics, guard the notify call:- // Send a batch of tx receipts in one notification - return notifier.Notify(sub.ID, marshaledReceipts) + // Send a batch of tx receipts in one notification (only if non-empty) + if len(marshaledReceipts) == 0 { + return nil + } + return notifier.Notify(sub.ID, marshaledReceipts)
- Precompute a hash set for filtering instead of using
slices.Containsin the hot loop. WithmaxTxHashes = 200this is not critical, but using amap[gethCommon.Hash]struct{}avoids repeated O(m) scans and is more idiomatic for membership tests:func (s *StreamAPI) TransactionReceipts( ctx context.Context, filter *filters.TransactionReceiptsQuery, ) (*rpc.Subscription, error) { @@ - var txHashes []gethCommon.Hash - - if filter != nil { - txHashes = filter.TransactionHashes - } + var txHashes []gethCommon.Hash + var txHashSet map[gethCommon.Hash]struct{} + if filter != nil && len(filter.TransactionHashes) > 0 { + txHashes = filter.TransactionHashes + txHashSet = make(map[gethCommon.Hash]struct{}, len(txHashes)) + for _, h := range txHashes { + txHashSet[h] = struct{}{} + } + } @@ - for _, receipt := range receipts { + for _, receipt := range receipts { // Check if the subscription is only interested for a given // set of tx receipts. - if len(txHashes) > 0 && !slices.Contains(txHashes, receipt.TxHash) { + if txHashSet != nil { + if _, ok := txHashSet[receipt.TxHash]; !ok { + continue + } + } continue }
- Go version note: importing
slicesassumes the repo is on Go ≥ 1.21. Please double‑check the toolchain andgo.modto ensure this is intentional and supported.[ suggest_recommended_refactor ]
bootstrap/bootstrap.go (2)
62-112: Receipts publisher addition toPublishersandNewis consistent with existing patternsDefining
Receipts *models.Publisher[[]*models.Receipt]alongsideBlock/Transaction/Logsand initializing it inNewviamodels.NewPublisher[[]*models.Receipt]()fits the existing publisher pattern and keeps bootstrap responsibilities clear.
192-208: Ingestion and API wiring for receipts publisher is coherent end‑to‑endPassing
b.publishers.Receiptsintoingestion.NewEventIngestionEngineand then intoapi.NewStreamAPIensures that newly indexed receipts are streamed out via thetransactionReceiptssubscription without introducing extra coupling. The argument ordering remains readable and consistent with other publishers.Also applies to: 336-346
db4c441 to
a281519
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
tests/web3js/eth_streaming_test.js (1)
1-1: Strengthen receipt assertions and avoid magic10constantThe end‑to‑end flow looks good and exercises both unfiltered and hash‑filtered
transactionReceiptssubscriptions, but the checks are a bit looser than they could be:
assert.equal(10, receipts.length)bakes in a magic number that’s not obviously derived fromtestValuesor the number of transactions this test sends. If the background receipt volume or indexing behavior changes, this may become brittle.- For each
txHashyou scanreceiptsand assert equality when a match is found, but you don’t assert that every sent hash appears at least once, nor that it appears exactly once.Consider tightening this by:
- Deriving the expected count from the test itself (e.g. number of transactions you send, possibly times an expected multiplier if you know each triggers multiple receipts), and
- Grouping receipts by
transactionHashand asserting that eachsentHashesentry has exactly one corresponding streamed receipt and that there are no unexpected ones, e.g.:const receiptsByHash = receipts.reduce((m, r) => { (m[r.transactionHash] ||= []).push(r) return m }, {}) for (const txHash of sentHashes) { assert.property(receiptsByHash, txHash) assert.lengthOf(receiptsByHash[txHash], 1) assert.deepEqual(receiptsByHash[txHash][0], /* expected RPC receipt */) }This would make the test more robust to unrelated receipts on the node and avoid coupling to the hard‑coded
10.Also applies to: 73-99, 152-221
api/stream.go (1)
132-183:TransactionReceiptsimplementation is sound; consider a few small refinementsThe subscription logic looks correct overall: it enforces the max‑hash limit, treats a nil/empty filter as “no filtering”, filters by
TransactionHasheswhen provided, resolves the backing transaction, and marshals to aneth_getTransactionReceipt‑compatible shape before batching notifications.A few optional refinements you might want to consider:
Skip notifications when no receipts match the filter
Right now, even if
marshaledReceiptsis empty (e.g., all receipts in the batch are filtered out bytxHashes), you still send[]:// Send a batch of tx receipts in one notification return notifier.Notify(sub.ID, marshaledReceipts)To better match typical subscription semantics and avoid extra empty notifications, you could early‑return when there’s nothing to emit:
if len(marshaledReceipts) == 0 { return nil } return notifier.Notify(sub.ID, marshaledReceipts)Pre‑allocate the result slice
Since you already know the upper bound, a tiny perf win:
marshaledReceipts := make([]map[string]any, 0, len(receipts))Optional: use a set for hash membership if this ever grows
With
maxTxHashes = 200,slices.Containsis absolutely fine. If the limit is ever raised substantially, convertingtxHashesonce to amap[gethCommon.Hash]struct{}would make membership checks O(1) instead of O(n) per receipt.None of these are correctness issues; the current implementation is functionally solid.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
api/stream.go(6 hunks)bootstrap/bootstrap.go(4 hunks)models/errors/errors.go(1 hunks)models/events.go(1 hunks)services/ingestion/engine.go(4 hunks)services/ingestion/engine_test.go(5 hunks)tests/web3js/eth_streaming_test.js(3 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- models/errors/errors.go
- models/events.go
- services/ingestion/engine.go
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-06-19T11:36:25.478Z
Learnt from: m-Peter
Repo: onflow/flow-evm-gateway PR: 831
File: services/requester/batch_tx_pool.go:0-0
Timestamp: 2025-06-19T11:36:25.478Z
Learning: In Go, when copying maps that contain slices (like `map[gethCommon.Address][]pooledEvmTx`), perform deep copies by iterating over the map and copying each slice individually using `make()` and `copy()` to avoid shared references that could lead to race conditions and data corruption.
Applied to files:
api/stream.go
🧬 Code graph analysis (4)
api/stream.go (4)
models/stream.go (2)
Publisher(9-12)Subscription(49-53)models/receipt.go (1)
Receipt(18-36)models/errors/errors.go (1)
ErrExceedMaxTxHashes(20-20)eth/types/types.go (1)
MarshalReceipt(514-578)
tests/web3js/eth_streaming_test.js (4)
tests/web3js/eth_streaming_filters_test.js (5)
require(2-2)require(4-4)helpers(1-1)conf(3-3)web3(6-6)tests/web3js/config.js (2)
require(1-1)web3(2-2)tests/web3js/setup_test.js (2)
require(1-1)conf(2-2)tests/web3js/helpers.js (4)
signedTx(66-66)signedTx(79-79)conf(2-2)web3(5-5)
services/ingestion/engine_test.go (2)
models/stream.go (1)
NewPublisher(14-19)models/receipt.go (1)
Receipt(18-36)
bootstrap/bootstrap.go (3)
storage/pebble/receipts.go (1)
Receipts(20-22)models/stream.go (2)
Publisher(9-12)NewPublisher(14-19)models/receipt.go (1)
Receipt(18-36)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Test
🔇 Additional comments (3)
services/ingestion/engine_test.go (1)
67-82: Receipts publisher wiring into the ingestion engine looks consistentAll
NewEventIngestionEnginecall sites now pass a*models.Publisher[[]*models.Receipt]after the logs publisher, matching the updated constructor shape and keeping publishers and indexers aligned across tests and production wiring. I don’t see any ordering or type mismatches here.Also applies to: 147-162, 269-284, 378-393, 473-488
api/stream.go (1)
6-9: New receipts publisher plumbing inStreamAPIis coherentThe additions of
maxTxHashes, thereceiptsPublisherfield, and the correspondingreceiptsPublisherparameter inNewStreamAPIare all consistent with the rest of the type’s design and with how publishers are used for heads, pending txs, and logs. No ordering or type issues stand out.Also applies to: 24-36, 39-61
bootstrap/bootstrap.go (1)
62-67: Receipts publisher wiring through bootstrap → ingestion → API is consistentThe new
Receiptspublisher is initialized once inNew, passed into the ingestion engine inStartEventIngestion, and then intoNewStreamAPIinStartAPIServer, mirroring how Block/Transaction/Logs publishers are handled. The ordering of arguments matches the updated constructor signatures, so the receipts stream should now be available end‑to‑end without any mis‑plumbing.Also applies to: 105-112, 192-208, 336-346
Closes: #906
Description
When
eth_subscribeis given thetransactionReceiptstype, it creates a subscription that fires transaction receipts when transactions are included in blocks. Optional tx hashes can be passed in.For contributor use:
masterbranchFiles changedin the Github PR explorerSummary by CodeRabbit
New Features
Behavioral
Tests
✏️ Tip: You can customize this high-level summary in your review settings.