Skip to content

Commit f0db5e3

Browse files
authored
feat(input): mempool input plugin (#609)
* feat(input): mempool input plugin Signed-off-by: cryptodj413 <shinjirohara2@gmail.com> * fix: resolve golangci-lint issues Signed-off-by: cryptodj413 <shinjirohara2@gmail.com> * fix(mempool): address code review Signed-off-by: cryptodj413 <shinjirohara2@gmail.com> * fix(mempool): close previous connection on Start() restart Signed-off-by: cryptodj413 <shinjirohara2@gmail.com> * fix(mempool): close connection on Dial failure and dedupe by last poll only Signed-off-by: cryptodj413 <shinjirohara2@gmail.com> * fix(mempool): channel reuse on restart Signed-off-by: cryptodj413 <shinjirohara2@gmail.com> * fix(mempool): idempotent Stop Signed-off-by: cryptodj413 <shinjirohara2@gmail.com> --------- Signed-off-by: cryptodj413 <shinjirohara2@gmail.com>
1 parent b2aec67 commit f0db5e3

File tree

6 files changed

+623
-0
lines changed

6 files changed

+623
-0
lines changed

event/tx.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,54 @@ func NewTransactionContext(
5959
return ctx
6060
}
6161

62+
// NewMempoolTransactionContext creates a context for a mempool (unconfirmed) transaction.
63+
// SlotNumber is the mempool snapshot slot from the node; BlockNumber and TransactionIdx are zero.
64+
func NewMempoolTransactionContext(
65+
tx ledger.Transaction,
66+
slotNumber uint64,
67+
networkMagic uint32,
68+
) TransactionContext {
69+
return TransactionContext{
70+
TransactionHash: tx.Hash().String(),
71+
SlotNumber: slotNumber,
72+
NetworkMagic: networkMagic,
73+
}
74+
}
75+
76+
// NewTransactionEventFromTx builds a TransactionEvent from a transaction only (no block).
77+
// Used for mempool transactions; BlockHash is left empty.
78+
func NewTransactionEventFromTx(tx ledger.Transaction, includeCbor bool) TransactionEvent {
79+
evt := TransactionEvent{
80+
Transaction: tx,
81+
Inputs: tx.Inputs(),
82+
Outputs: tx.Outputs(),
83+
Fee: tx.Fee().Uint64(),
84+
Witnesses: tx.Witnesses(),
85+
}
86+
if includeCbor {
87+
evt.TransactionCbor = tx.Cbor()
88+
}
89+
if tx.Certificates() != nil {
90+
evt.Certificates = tx.Certificates()
91+
}
92+
if tx.Metadata() != nil {
93+
evt.Metadata = tx.Metadata()
94+
}
95+
if tx.ReferenceInputs() != nil {
96+
evt.ReferenceInputs = tx.ReferenceInputs()
97+
}
98+
if tx.TTL() != 0 {
99+
evt.TTL = tx.TTL()
100+
}
101+
if withdrawals := tx.Withdrawals(); len(withdrawals) > 0 {
102+
evt.Withdrawals = make(map[string]uint64)
103+
for addr, amount := range withdrawals {
104+
evt.Withdrawals[addr.String()] = amount.Uint64()
105+
}
106+
}
107+
return evt
108+
}
109+
62110
func NewTransactionEvent(
63111
block ledger.Block,
64112
tx ledger.Transaction,

input/input.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,5 @@ package input
1717
// We import the various plugins that we want to be auto-registered
1818
import (
1919
_ "github.com/blinklabs-io/adder/input/chainsync"
20+
_ "github.com/blinklabs-io/adder/input/mempool"
2021
)

input/mempool/mempool.go

Lines changed: 337 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,337 @@
1+
// Copyright 2025 Blink Labs Software
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package mempool
16+
17+
import (
18+
"errors"
19+
"fmt"
20+
"sync"
21+
"time"
22+
23+
"github.com/blinklabs-io/adder/event"
24+
"github.com/blinklabs-io/adder/plugin"
25+
ouroboros "github.com/blinklabs-io/gouroboros"
26+
"github.com/blinklabs-io/gouroboros/ledger"
27+
localtxmonitor "github.com/blinklabs-io/gouroboros/protocol/localtxmonitor"
28+
)
29+
30+
const (
31+
defaultPollInterval = 5 * time.Second
32+
)
33+
34+
type Mempool struct {
35+
logger plugin.Logger
36+
network string
37+
networkMagic uint32
38+
socketPath string
39+
address string
40+
ntcTcp bool
41+
includeCbor bool
42+
pollIntervalStr string
43+
pollInterval time.Duration
44+
45+
eventChan chan event.Event
46+
errorChan chan error
47+
doneChan chan struct{}
48+
wg sync.WaitGroup
49+
stopOnce sync.Once // idempotent Stop (same pattern as pipeline.Pipeline)
50+
oConn *ouroboros.Connection
51+
dialFamily string
52+
dialAddress string
53+
seenTxHashes map[string]struct{}
54+
}
55+
56+
// New returns a new Mempool input plugin
57+
func New(opts ...MempoolOptionFunc) *Mempool {
58+
m := &Mempool{}
59+
for _, opt := range opts {
60+
opt(m)
61+
}
62+
return m
63+
}
64+
65+
// Start connects to the node and starts polling the mempool.
66+
// Safe to call again to restart (e.g. when the pipeline is restarted via
67+
// Stop() then Start()). Event and error channels are reused when non-nil so
68+
// that the pipeline's goroutines reading from OutputChan()/ErrorChan() never
69+
// see a closed channel; after Stop() they are nil so the next Start() creates
70+
// new channels and the pipeline obtains fresh references.
71+
func (m *Mempool) Start() error {
72+
m.stopOnce = sync.Once{} // reset so next Stop() runs (Pipeline resets on restart too)
73+
if m.doneChan != nil {
74+
close(m.doneChan)
75+
m.wg.Wait()
76+
}
77+
if m.oConn != nil {
78+
_ = m.oConn.Close()
79+
m.oConn = nil
80+
}
81+
if m.eventChan == nil {
82+
m.eventChan = make(chan event.Event, 10)
83+
}
84+
if m.errorChan == nil {
85+
m.errorChan = make(chan error, 1)
86+
}
87+
m.doneChan = make(chan struct{})
88+
89+
if err := m.setupConnection(); err != nil {
90+
return err
91+
}
92+
93+
m.oConn.LocalTxMonitor().Client.Start()
94+
95+
m.wg.Add(1)
96+
go m.pollLoop()
97+
return nil
98+
}
99+
100+
// Stop shuts down the connection and stops polling.
101+
// Idempotent and safe to call multiple times, following the Pipeline's
102+
// pattern (pipeline/pipeline.go): shutdown logic runs inside sync.Once so
103+
// multiple Stop() calls never double-close channels.
104+
func (m *Mempool) Stop() error {
105+
m.stopOnce.Do(func() {
106+
if m.doneChan != nil {
107+
close(m.doneChan)
108+
m.doneChan = nil
109+
}
110+
if m.oConn != nil {
111+
_ = m.oConn.Close()
112+
m.oConn = nil
113+
}
114+
m.wg.Wait()
115+
if m.eventChan != nil {
116+
close(m.eventChan)
117+
m.eventChan = nil
118+
}
119+
if m.errorChan != nil {
120+
close(m.errorChan)
121+
m.errorChan = nil
122+
}
123+
})
124+
return nil
125+
}
126+
127+
// ErrorChan returns the plugin's error channel
128+
func (m *Mempool) ErrorChan() <-chan error {
129+
return m.errorChan
130+
}
131+
132+
// InputChan returns nil (mempool is an input-only plugin)
133+
func (m *Mempool) InputChan() chan<- event.Event {
134+
return nil
135+
}
136+
137+
// OutputChan returns the channel of mempool transaction events
138+
func (m *Mempool) OutputChan() <-chan event.Event {
139+
return m.eventChan
140+
}
141+
142+
func (m *Mempool) setupConnection() error {
143+
if m.network != "" {
144+
network, ok := ouroboros.NetworkByName(m.network)
145+
if !ok {
146+
return fmt.Errorf("unknown network: %s", m.network)
147+
}
148+
if m.networkMagic == 0 {
149+
m.networkMagic = network.NetworkMagic
150+
}
151+
}
152+
if m.address != "" {
153+
m.dialFamily = "tcp"
154+
m.dialAddress = m.address
155+
if !m.ntcTcp {
156+
return errors.New("address requires input-mempool-ntc-tcp=true for NtC over TCP")
157+
}
158+
} else if m.socketPath != "" {
159+
m.dialFamily = "unix"
160+
m.dialAddress = m.socketPath
161+
} else {
162+
return errors.New("must specify input-mempool-socket-path or input-mempool-address")
163+
}
164+
if m.networkMagic == 0 {
165+
return errors.New("must specify input-mempool-network or input-mempool-network-magic")
166+
}
167+
168+
m.pollInterval = defaultPollInterval
169+
if m.pollIntervalStr != "" {
170+
d, err := time.ParseDuration(m.pollIntervalStr)
171+
if err != nil {
172+
return fmt.Errorf("invalid poll interval: %w", err)
173+
}
174+
if d <= 0 {
175+
return errors.New("poll interval must be positive")
176+
}
177+
m.pollInterval = d
178+
}
179+
180+
cfg := localtxmonitor.NewConfig(
181+
localtxmonitor.WithAcquireTimeout(10*time.Second),
182+
localtxmonitor.WithQueryTimeout(30*time.Second),
183+
)
184+
oConn, err := ouroboros.NewConnection(
185+
ouroboros.WithNetworkMagic(m.networkMagic),
186+
ouroboros.WithNodeToNode(false),
187+
ouroboros.WithKeepAlive(true),
188+
ouroboros.WithLocalTxMonitorConfig(cfg),
189+
)
190+
if err != nil {
191+
return err
192+
}
193+
if err := oConn.Dial(m.dialFamily, m.dialAddress); err != nil {
194+
_ = oConn.Close()
195+
return err
196+
}
197+
m.oConn = oConn
198+
if m.logger != nil {
199+
m.logger.Info("connected to node for mempool", "address", m.dialAddress)
200+
}
201+
202+
m.wg.Add(1)
203+
go func() {
204+
defer m.wg.Done()
205+
for {
206+
select {
207+
case <-m.doneChan:
208+
return
209+
case err, ok := <-m.oConn.ErrorChan():
210+
if !ok {
211+
return
212+
}
213+
select {
214+
case <-m.doneChan:
215+
return
216+
case m.errorChan <- err:
217+
}
218+
}
219+
}
220+
}()
221+
return nil
222+
}
223+
224+
func (m *Mempool) pollLoop() {
225+
defer m.wg.Done()
226+
if m.pollInterval <= 0 {
227+
m.pollInterval = defaultPollInterval
228+
}
229+
ticker := time.NewTicker(m.pollInterval)
230+
defer ticker.Stop()
231+
232+
for {
233+
select {
234+
case <-m.doneChan:
235+
return
236+
case <-ticker.C:
237+
m.pollOnce()
238+
}
239+
}
240+
}
241+
242+
func (m *Mempool) pollOnce() {
243+
if m.oConn == nil {
244+
return
245+
}
246+
client := m.oConn.LocalTxMonitor().Client
247+
if client == nil {
248+
return
249+
}
250+
if err := client.Acquire(); err != nil {
251+
if m.logger != nil {
252+
m.logger.Warn("mempool acquire failed", "error", err)
253+
}
254+
return
255+
}
256+
defer func() {
257+
_ = client.Release()
258+
}()
259+
260+
_, _, numTxs, err := client.GetSizes()
261+
if err != nil {
262+
if m.logger != nil {
263+
m.logger.Warn("mempool GetSizes failed", "error", err)
264+
}
265+
return
266+
}
267+
if numTxs == 0 {
268+
return
269+
}
270+
if m.seenTxHashes == nil {
271+
m.seenTxHashes = make(map[string]struct{})
272+
}
273+
274+
// Collect all txs this poll. We only need to remember last poll's hashes
275+
// to emit events only for newly seen transactions.
276+
type pollTx struct {
277+
hash string
278+
tx ledger.Transaction
279+
}
280+
var pollTxs []pollTx
281+
for {
282+
select {
283+
case <-m.doneChan:
284+
return
285+
default:
286+
}
287+
txCbor, err := client.NextTx()
288+
if err != nil {
289+
if m.logger != nil {
290+
m.logger.Warn("mempool NextTx failed", "error", err)
291+
}
292+
return
293+
}
294+
if len(txCbor) == 0 {
295+
break
296+
}
297+
tx, err := m.parseTx(txCbor)
298+
if err != nil {
299+
if m.logger != nil {
300+
m.logger.Debug("mempool skip tx parse error", "error", err, "cbor_len", len(txCbor))
301+
}
302+
continue
303+
}
304+
txHash := tx.Hash().String()
305+
pollTxs = append(pollTxs, pollTx{hash: txHash, tx: tx})
306+
}
307+
308+
thisPollHashes := make(map[string]struct{}, len(pollTxs))
309+
for _, p := range pollTxs {
310+
thisPollHashes[p.hash] = struct{}{}
311+
}
312+
313+
for _, p := range pollTxs {
314+
if _, seen := m.seenTxHashes[p.hash]; seen {
315+
continue
316+
}
317+
ctx := event.NewMempoolTransactionContext(p.tx, 0, m.networkMagic)
318+
payload := event.NewTransactionEventFromTx(p.tx, m.includeCbor)
319+
evt := event.New("mempool.transaction", time.Now(), ctx, payload)
320+
select {
321+
case <-m.doneChan:
322+
return
323+
case m.eventChan <- evt:
324+
}
325+
}
326+
327+
// Remember only this poll's hashes for next time (no unbounded growth).
328+
m.seenTxHashes = thisPollHashes
329+
}
330+
331+
func (m *Mempool) parseTx(data []byte) (ledger.Transaction, error) {
332+
txType, err := ledger.DetermineTransactionType(data)
333+
if err != nil {
334+
return nil, err
335+
}
336+
return ledger.NewTransactionFromCbor(txType, data)
337+
}

0 commit comments

Comments
 (0)