@@ -27,7 +27,10 @@ import (
2727 localtxmonitor "github.com/blinklabs-io/gouroboros/protocol/localtxmonitor"
2828)
2929
30- const defaultPollInterval = 5 * time .Second
30+ const (
31+ defaultPollInterval = 5 * time .Second
32+ maxSeenTxHashes = 100_000
33+ )
3134
3235type Mempool struct {
3336 logger plugin.Logger
@@ -40,13 +43,14 @@ type Mempool struct {
4043 pollIntervalStr string
4144 pollInterval time.Duration
4245
43- eventChan chan event.Event
44- errorChan chan error
45- doneChan chan struct {}
46- wg sync.WaitGroup
47- oConn * ouroboros.Connection
48- dialFamily string
49- dialAddress string
46+ eventChan chan event.Event
47+ errorChan chan error
48+ doneChan chan struct {}
49+ wg sync.WaitGroup
50+ oConn * ouroboros.Connection
51+ dialFamily string
52+ dialAddress string
53+ seenTxHashes map [string ]struct {}
5054}
5155
5256// New returns a new Mempool input plugin
@@ -64,6 +68,15 @@ func (m *Mempool) Start() error {
6468 close (m .doneChan )
6569 m .wg .Wait ()
6670 }
71+ if m .eventChan != nil {
72+ close (m .eventChan )
73+ m .eventChan = nil
74+ }
75+ if m .errorChan != nil {
76+ close (m .errorChan )
77+ m .errorChan = nil
78+ }
79+
6780 m .eventChan = make (chan event.Event , 10 )
6881 m .errorChan = make (chan error , 1 )
6982 m .doneChan = make (chan struct {})
@@ -178,18 +191,18 @@ func (m *Mempool) setupConnection() error {
178191 m .wg .Add (1 )
179192 go func () {
180193 defer m .wg .Done ()
181- err , ok := <- m .oConn .ErrorChan ()
182- if ! ok {
183- return
184- }
185- select {
186- case <- m .doneChan :
187- return
188- default :
189- if m .errorChan != nil {
194+ for {
195+ select {
196+ case <- m .doneChan :
197+ return
198+ case err , ok := <- m .oConn .ErrorChan ():
199+ if ! ok {
200+ return
201+ }
190202 select {
203+ case <- m .doneChan :
204+ return
191205 case m .errorChan <- err :
192- default :
193206 }
194207 }
195208 }
@@ -243,6 +256,12 @@ func (m *Mempool) pollOnce() {
243256 if numTxs == 0 {
244257 return
245258 }
259+ if m .seenTxHashes == nil {
260+ m .seenTxHashes = make (map [string ]struct {})
261+ }
262+ if len (m .seenTxHashes ) > maxSeenTxHashes {
263+ m .seenTxHashes = make (map [string ]struct {})
264+ }
246265
247266 for {
248267 select {
@@ -267,6 +286,11 @@ func (m *Mempool) pollOnce() {
267286 }
268287 continue
269288 }
289+ txHash := tx .Hash ().String ()
290+ if _ , seen := m .seenTxHashes [txHash ]; seen {
291+ continue
292+ }
293+ m .seenTxHashes [txHash ] = struct {}{}
270294 ctx := event .NewMempoolTransactionContext (tx , 0 , m .networkMagic )
271295 payload := event .NewTransactionEventFromTx (tx , m .includeCbor )
272296 evt := event .New ("mempool.transaction" , time .Now (), ctx , payload )
0 commit comments