feat(input): mempool input plugin#609
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughAdds a mempool input plugin that polls a Cardano node via NtC LocalTxMonitor to read unconfirmed transactions and emit mempool.transaction events. Introduces package input/mempool with a Mempool type, constructor, Start/Stop lifecycle, connection setup, polling loop, CBOR parsing, deduplication, and event emission. Adds functional options, command-line plugin registration, a side-effect import to enable the plugin, tests, and two event helpers in event/tx.go: NewMempoolTransactionContext and NewTransactionEventFromTx. Possibly related PRs
🚥 Pre-merge checks | ✅ 4✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 8
🤖 Fix all issues with AI agents
In `@event/tx.go`:
- Around line 62-74: NewMempoolTransactionContext is documented to use the
mempool snapshot slot but callers (e.g., the LocalTxMonitor Acquire call that
returns a slot) are passing 0; propagate the real snapshot slot instead. Update
the call site that currently passes 0 to use the slot value returned by
LocalTxMonitor.Acquire (or equivalent) and ensure
NewMempoolTransactionContext(slotNumber ...) receives that slot; adjust any
upstream signatures if needed so the slot is returned/available and update the
doc comment if semantics change.
- Around line 79-84: The struct literal initializing evt (TransactionEvent) has
misaligned field indentation causing gci/gofmt CI failures; fix by aligning all
fields (Transaction, Inputs, Outputs, Fee, Witnesses) consistently in the
TransactionEvent literal and reformat the file with gofmt (or run `gci write`)
so the spacing/indentation matches gofmt rules and the linter passes.
In `@input/mempool/mempool.go`:
- Around line 43-44: The linter failure is due to import ordering/grouping in
mempool.go; run the project's formatter (the gci-based import ordering tool or
the repository's "make format" / format script) to reorder and group imports per
project style, then re-run linters; after formatting commit the changes so the
file (mempool.go) with declarations like eventChan and errorChan passes the gci
check.
- Around line 247-278: The loop in pollOnce repeatedly emits every transaction
each poll, causing duplicates; add a seen-tx set on the mempool struct (e.g.,
seenTx map[string]struct{}) and use it inside the loop: after parsing the tx
(m.parseTx) compute a deterministic tx identifier (transaction hash/ID/CID on
the parsed tx), skip emitting if that id exists in seenTx, and otherwise add it
to seenTx and send the event to m.eventChan; also ensure you clear or rotate the
seenTx map when the mempool snapshot is reset or the component restarts to avoid
unbounded growth.
- Around line 62-80: Start() currently attempts to restart by manually closing
m.doneChan and replacing m.eventChan/m.errorChan which leaks the old channels
and goroutines; change the restart path to either refuse a second Start() or
call Stop() to fully tear down the prior instance before reinitializing.
Concretely, in Start() check running state (e.g., if m.doneChan != nil) and call
m.Stop() (which must close old eventChan/errorChan, wait on m.wg and stop any
goroutines spawned by setupConnection()) or return an error; then only after
Stop() completes re-create m.eventChan, m.errorChan, m.doneChan, call
setupConnection() and launch pollLoop(), ensuring setupConnection()’s
error-forwarding goroutine is terminated before reusing channels.
- Around line 178-196: Replace the single receive with a loop that drains
m.oConn.ErrorChan() (e.g., for err := range m.oConn.ErrorChan() { ... }) so all
errors are forwarded; inside the loop, avoid the TOCTOU by using a blocking
select that first checks <-m.doneChan to return and otherwise sends the error
into m.errorChan (only if m.errorChan != nil), e.g. if m.errorChan != nil {
select { case <-m.doneChan: return; case m.errorChan <- err: } } else { select {
case <-m.doneChan: return } }; keep the m.wg.Add/Done wrapping the goroutine.
In `@input/mempool/plugin.go`:
- Around line 24-32: The struct definition for cmdlineOptions is mis-formatted
per gci rules; run the project's formatter (or gci formatting command) to
reformat the struct declaration and its fields (cmdlineOptions, network,
address, socketPath, networkMagic, ntcTcp, includeCbor, pollInterval) so it
matches the project's import/format conventions and resolves the linter error.
- Around line 98-102: In NewFromCmdlineOptions the range check for
cmdlineOptions.networkMagic incorrectly excludes math.MaxUint32 so nm falls back
to 0; change the comparison from `< math.MaxUint32` to `<= math.MaxUint32` (keep
the same variables nm and cmdlineOptions.networkMagic) so MaxUint32 is accepted
as a valid uint32 value.
🧹 Nitpick comments (2)
event/tx.go (1)
78-108: Significant code duplication withNewTransactionEvent.
NewTransactionEventFromTxduplicates ~90% ofNewTransactionEvent(lines 110-149). Consider extracting the shared body into a private helper, with both public functions delegating to it.♻️ Suggested refactor to reduce duplication
+func buildTransactionEvent(tx ledger.Transaction, includeCbor bool) TransactionEvent { + evt := TransactionEvent{ + Transaction: tx, + Inputs: tx.Inputs(), + Outputs: tx.Outputs(), + Fee: tx.Fee().Uint64(), + Witnesses: tx.Witnesses(), + } + if includeCbor { + evt.TransactionCbor = tx.Cbor() + } + if tx.Certificates() != nil { + evt.Certificates = tx.Certificates() + } + if tx.Metadata() != nil { + evt.Metadata = tx.Metadata() + } + if tx.ReferenceInputs() != nil { + evt.ReferenceInputs = tx.ReferenceInputs() + } + if tx.TTL() != 0 { + evt.TTL = tx.TTL() + } + if withdrawals := tx.Withdrawals(); len(withdrawals) > 0 { + evt.Withdrawals = make(map[string]uint64) + for addr, amount := range withdrawals { + evt.Withdrawals[addr.String()] = amount.Uint64() + } + } + return evt +} + func NewTransactionEventFromTx(tx ledger.Transaction, includeCbor bool) TransactionEvent { - evt := TransactionEvent{ - Transaction: tx, - Inputs: tx.Inputs(), - Outputs: tx.Outputs(), - Fee: tx.Fee().Uint64(), - Witnesses: tx.Witnesses(), - } - // ... all the optional field population ... - return evt + return buildTransactionEvent(tx, includeCbor) } func NewTransactionEvent( block ledger.Block, tx ledger.Transaction, includeCbor bool, resolvedInputs []ledger.TransactionOutput, ) TransactionEvent { - evt := TransactionEvent{ - Transaction: tx, - BlockHash: block.Hash().String(), - Inputs: tx.Inputs(), - // ... duplicated logic ... - } + evt := buildTransactionEvent(tx, includeCbor) + evt.BlockHash = block.Hash().String() + if len(resolvedInputs) > 0 { + evt.ResolvedInputs = resolvedInputs + } return evt }input/mempool/plugin_test.go (1)
24-45: Tests cover basic construction but miss core logic.The current tests validate interface conformance and option propagation, which is a good start. However, there's no coverage for critical paths:
setupConnectionvalidation errors (missing socket/address, invalid network, bad poll interval),parseTx, or theStart/Stoplifecycle. Consider adding table-driven tests forsetupConnectionerror cases at minimum, as those are pure validation logic with no external dependencies.
There was a problem hiding this comment.
1 issue found across 6 files
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="input/mempool/mempool.go">
<violation number="1" location="input/mempool/mempool.go:86">
P2: Setting `m.doneChan` to nil while goroutines are still selecting on it can cause them to miss the shutdown signal and keep running, which may block `Stop()` indefinitely. Keep the channel reference until goroutines exit (e.g., set to nil after `wg.Wait()` or avoid nil assignment).</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@input/mempool/mempool.go`:
- Around line 82-101: The Stop method on Mempool can panic if called
concurrently because multiple goroutines may close m.doneChan (and other
channels) without synchronization; add a synchronization guard (preferably a new
field stopOnce of type sync.Once on the Mempool struct) and wrap the shutdown
logic in stopOnce.Do(func(){ ... }) so all closes and nil assignments for
doneChan, eventChan, errorChan, and the oConn Close call and wg.Wait() happen
exactly once; ensure the Stop method returns any existing error from oConn.Close
if needed or still returns nil, and remove the current unsynchronized nil checks
inside Stop since the once wrapper enforces single execution.
- Line 75: The call m.oConn.LocalTxMonitor().Client.Start() can panic if
LocalTxMonitor() or its Client is nil; update the code to fetch the monitor into
a variable (e.g., mon := m.oConn.LocalTxMonitor()), check mon != nil and
mon.Client != nil before calling mon.Client.Start(), and handle the nil case
(log an error/warning or return) instead of unguarded dereference so Start() is
only invoked on a valid client.
🧹 Nitpick comments (2)
event/tx.go (1)
78-107: Extract shared event-building logic to reduce duplication withNewTransactionEvent.Lines 78–107 duplicate nearly all of the optional-field population logic from
NewTransactionEvent(lines 110–148). The only differences are the absence ofBlockHashandResolvedInputs. Consider extracting the common body into a private helper (e.g.,buildTransactionEvent) and calling it from both functions to keep these in sync.♻️ Sketch of the refactor
// unexported helper shared by both constructors func buildTransactionEvent(tx ledger.Transaction, includeCbor bool) TransactionEvent { evt := TransactionEvent{ Transaction: tx, Inputs: tx.Inputs(), Outputs: tx.Outputs(), Fee: tx.Fee().Uint64(), Witnesses: tx.Witnesses(), } if includeCbor { evt.TransactionCbor = tx.Cbor() } if tx.Certificates() != nil { evt.Certificates = tx.Certificates() } if tx.Metadata() != nil { evt.Metadata = tx.Metadata() } if tx.ReferenceInputs() != nil { evt.ReferenceInputs = tx.ReferenceInputs() } if tx.TTL() != 0 { evt.TTL = tx.TTL() } if withdrawals := tx.Withdrawals(); len(withdrawals) > 0 { evt.Withdrawals = make(map[string]uint64) for addr, amount := range withdrawals { evt.Withdrawals[addr.String()] = amount.Uint64() } } return evt }Then both public constructors become thin wrappers:
func NewTransactionEventFromTx(tx ledger.Transaction, includeCbor bool) TransactionEvent { return buildTransactionEvent(tx, includeCbor) } func NewTransactionEvent(block ledger.Block, tx ledger.Transaction, includeCbor bool, resolvedInputs []ledger.TransactionOutput) TransactionEvent { evt := buildTransactionEvent(tx, includeCbor) evt.BlockHash = block.Hash().String() if len(resolvedInputs) > 0 { evt.ResolvedInputs = resolvedInputs } return evt }input/mempool/mempool.go (1)
281-287:parseTxis straightforward — consider wrapping errors for context.The function works correctly but wrapping the error from
DetermineTransactionTypewould help distinguish "unknown tx type" from "invalid CBOR" during debugging.♻️ Suggested improvement
func (m *Mempool) parseTx(data []byte) (ledger.Transaction, error) { txType, err := ledger.DetermineTransactionType(data) if err != nil { - return nil, err + return nil, fmt.Errorf("determine tx type: %w", err) } - return ledger.NewTransactionFromCbor(txType, data) + tx, err := ledger.NewTransactionFromCbor(txType, data) + if err != nil { + return nil, fmt.Errorf("decode tx from CBOR: %w", err) + } + return tx, nil }
input/mempool/mempool.go
Outdated
| // Stop shuts down the connection and stops polling | ||
| func (m *Mempool) Stop() error { | ||
| if m.doneChan != nil { | ||
| close(m.doneChan) | ||
| m.doneChan = nil | ||
| } | ||
| if m.oConn != nil { | ||
| _ = m.oConn.Close() | ||
| m.oConn = nil | ||
| } | ||
| m.wg.Wait() | ||
| if m.eventChan != nil { | ||
| close(m.eventChan) | ||
| m.eventChan = nil | ||
| } | ||
| if m.errorChan != nil { | ||
| close(m.errorChan) | ||
| m.errorChan = nil | ||
| } | ||
| return nil |
There was a problem hiding this comment.
Stop() can panic on double-call if doneChan is closed concurrently.
If two goroutines call Stop() concurrently, both could pass the m.doneChan != nil check before either sets it to nil, resulting in a double-close panic. Since there's no mutex protecting these fields, this is a latent race. If concurrent lifecycle calls are possible, protect with a sync.Once or a mutex.
🤖 Prompt for AI Agents
In `@input/mempool/mempool.go` around lines 82 - 101, The Stop method on Mempool
can panic if called concurrently because multiple goroutines may close
m.doneChan (and other channels) without synchronization; add a synchronization
guard (preferably a new field stopOnce of type sync.Once on the Mempool struct)
and wrap the shutdown logic in stopOnce.Do(func(){ ... }) so all closes and nil
assignments for doneChan, eventChan, errorChan, and the oConn Close call and
wg.Wait() happen exactly once; ensure the Stop method returns any existing error
from oConn.Close if needed or still returns nil, and remove the current
unsynchronized nil checks inside Stop since the once wrapper enforces single
execution.
There was a problem hiding this comment.
This is valid. Do what chainsync does using sync.Once.
0411ee4 to
d630d75
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@input/mempool/mempool.go`:
- Around line 66-93: Start() currently replaces m.oConn via setupConnection()
without closing the previous connection, leaking sockets; before calling
setupConnection() add a nil-check and close the existing connection (e.g., call
m.oConn.Close() or the appropriate shutdown method on the oConn type) and handle
any returned error, then proceed to recreate m.oConn via setupConnection();
ensure this happens during the restart path in Start() so the old Ouroboros
connection is properly released.
🧹 Nitpick comments (1)
input/mempool/mempool.go (1)
259-264: Consider evicting confirmed txs instead of full reset for dedup.The hard reset at
maxSeenTxHashes(line 262) means a burst of re-emitted duplicates whenever the threshold is crossed. An alternative is to rebuild the set from the current snapshot's hashes each cycle, which naturally evicts confirmed txs and keeps the set bounded to mempool size. That said, the current approach is simple and pragmatic — just flagging for awareness.
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@input/mempool/mempool.go`:
- Around line 263-268: The code currently replaces m.seenTxHashes wholesale when
len(m.seenTxHashes) > maxSeenTxHashes which causes all current mempool
transactions to be treated as new on the next poll; instead rebuild or trim the
map from the current mempool snapshot: iterate the current mempool transaction
hashes (from whatever method/variable supplies the snapshot in the poll routine)
and create a new map containing only those hashes, or implement a simple
trimming strategy (e.g., remove approx. half of entries at random or
least-recently-seen) from m.seenTxHashes rather than replacing it entirely;
update the logic around m.seenTxHashes and the poll flow so only missing/new
hashes are emitted as "new" events.
- Around line 178-189: The newly created ouroboros connection (oConn from
ouroboros.NewConnection) is not closed if oConn.Dial(m.dialFamily,
m.dialAddress) fails; ensure resources are released by calling oConn.Close()
when Dial returns an error. Modify the code so that after successful
NewConnection you either defer oConn.Close() with a success flag that cancels
the defer on overall success, or explicitly call oConn.Close() immediately
before returning the Dial error (referencing oConn, Dial, Close, and the dial
args m.dialFamily/m.dialAddress).
Signed-off-by: cryptodj413 <shinjirohara2@gmail.com>
Signed-off-by: cryptodj413 <shinjirohara2@gmail.com>
Signed-off-by: cryptodj413 <shinjirohara2@gmail.com>
Signed-off-by: cryptodj413 <shinjirohara2@gmail.com>
…l only Signed-off-by: cryptodj413 <shinjirohara2@gmail.com>
88578ee to
e4a0344
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@input/mempool/mempool.go`:
- Around line 64-96: Start() currently closes and replaces eventChan/errorChan
(and doneChan), causing consumers that kept references from
OutputChan()/ErrorChan() to receive from closed channels; change Start() to
reuse existing channels instead of closing/replacing them: only create
eventChan/errorChan/doneChan if they are nil, do not close them on restart, and
instead signal shutdown via the existing doneChan or a new stop signal flow (or
document that Start() must only be called when no consumers are active). Update
the logic around m.doneChan, m.eventChan, and m.errorChan in Start() and any
shutdown path so channels are not closed/replaced while consumers may hold
references, and ensure consumers detect restarts via the done signal returned by
Done()/OutputChan()/ErrorChan().
🧹 Nitpick comments (3)
event/tx.go (1)
78-108: Significant duplication withNewTransactionEvent(lines 110–148).The conditional field population (CBOR, Certificates, Metadata, ReferenceInputs, TTL, Withdrawals) is copy-pasted from
NewTransactionEvent. Consider extracting a shared helper that populates these optional fields on a*TransactionEvent, then calling it from both constructors.♻️ Sketch
+func populateOptionalFields(evt *TransactionEvent, tx ledger.Transaction, includeCbor bool) { + if includeCbor { + evt.TransactionCbor = tx.Cbor() + } + if tx.Certificates() != nil { + evt.Certificates = tx.Certificates() + } + if tx.Metadata() != nil { + evt.Metadata = tx.Metadata() + } + if tx.ReferenceInputs() != nil { + evt.ReferenceInputs = tx.ReferenceInputs() + } + if tx.TTL() != 0 { + evt.TTL = tx.TTL() + } + if withdrawals := tx.Withdrawals(); len(withdrawals) > 0 { + evt.Withdrawals = make(map[string]uint64) + for addr, amount := range withdrawals { + evt.Withdrawals[addr.String()] = amount.Uint64() + } + } +}input/mempool/mempool.go (2)
320-321: Dedup window only remembers the previous poll — transactions that leave and re-enter the mempool will be re-emitted.
seenTxHashesis replaced entirely withthisPollHasheseach cycle. If a transaction leaves the mempool between two polls and then reappears (e.g., eviction + resubmission), it will be re-emitted. This is a reasonable trade-off for bounded memory, but worth documenting for downstream consumers.
195-213: Error-forwarding goroutine will block indefinitely iferrorChanis full anddoneChannever fires.
errorChanhas a buffer of 1. If the Ouroboros connection produces errors faster than they're consumed, the forwarding goroutine blocks at line 209 until either the error is drained ordoneChancloses. This means subsequent connection errors are not forwarded and the goroutine is effectively stalled. A non-blocking send with a log/drop would keep the goroutine responsive.
input/mempool/mempool.go
Outdated
| // Start connects to the node and starts polling the mempool | ||
| func (m *Mempool) Start() error { | ||
| if m.doneChan != nil { | ||
| close(m.doneChan) | ||
| m.wg.Wait() | ||
| } | ||
| if m.oConn != nil { | ||
| _ = m.oConn.Close() | ||
| m.oConn = nil | ||
| } | ||
| if m.eventChan != nil { | ||
| close(m.eventChan) | ||
| m.eventChan = nil | ||
| } | ||
| if m.errorChan != nil { | ||
| close(m.errorChan) | ||
| m.errorChan = nil | ||
| } | ||
|
|
||
| m.eventChan = make(chan event.Event, 10) | ||
| m.errorChan = make(chan error, 1) | ||
| m.doneChan = make(chan struct{}) | ||
|
|
||
| if err := m.setupConnection(); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| m.oConn.LocalTxMonitor().Client.Start() | ||
|
|
||
| m.wg.Add(1) | ||
| go m.pollLoop() | ||
| return nil | ||
| } |
There was a problem hiding this comment.
Restarting replaces channels that consumers may already hold references to.
OutputChan() and ErrorChan() return the current channel value. If a consumer stores that reference and then Start() is called again (lines 74-84 close and replace the channels), the consumer silently reads from a closed channel. Consider documenting that Start() must not be called while consumers are active, or reuse the same channels across restarts.
🤖 Prompt for AI Agents
In `@input/mempool/mempool.go` around lines 64 - 96, Start() currently closes and
replaces eventChan/errorChan (and doneChan), causing consumers that kept
references from OutputChan()/ErrorChan() to receive from closed channels; change
Start() to reuse existing channels instead of closing/replacing them: only
create eventChan/errorChan/doneChan if they are nil, do not close them on
restart, and instead signal shutdown via the existing doneChan or a new stop
signal flow (or document that Start() must only be called when no consumers are
active). Update the logic around m.doneChan, m.eventChan, and m.errorChan in
Start() and any shutdown path so channels are not closed/replaced while
consumers may hold references, and ensure consumers detect restarts via the done
signal returned by Done()/OutputChan()/ErrorChan().
Signed-off-by: cryptodj413 <shinjirohara2@gmail.com>
|
Sorry in advance for the bot spam. @coderabbitai resume |
@wolf31o2 I have started the AI code review. It will take a few minutes to complete. |
wolf31o2
left a comment
There was a problem hiding this comment.
One comment the bot had pointed out before.
input/mempool/mempool.go
Outdated
| // Stop shuts down the connection and stops polling | ||
| func (m *Mempool) Stop() error { | ||
| if m.doneChan != nil { | ||
| close(m.doneChan) | ||
| m.doneChan = nil | ||
| } | ||
| if m.oConn != nil { | ||
| _ = m.oConn.Close() | ||
| m.oConn = nil | ||
| } | ||
| m.wg.Wait() | ||
| if m.eventChan != nil { | ||
| close(m.eventChan) | ||
| m.eventChan = nil | ||
| } | ||
| if m.errorChan != nil { | ||
| close(m.errorChan) | ||
| m.errorChan = nil | ||
| } | ||
| return nil |
There was a problem hiding this comment.
This is valid. Do what chainsync does using sync.Once.
Signed-off-by: cryptodj413 <shinjirohara2@gmail.com>
673c2c7 to
8fb67cd
Compare
Closes #496
Summary by cubic
Add a mempool input plugin that streams unconfirmed Cardano transactions via LocalTxMonitor and emits mempool.transaction events. Implements unconfirmed tx ingestion from #496 with safe restarts, per‑poll dedupe, and auto‑registration.
New Features
Bug Fixes
Written for commit 8fb67cd. Summary will update on new commits.
Summary by CodeRabbit
New Features
Enhancements
Chores
Tests