Skip to content

Commit d0441d0

Browse files
Giulio2002 and Giulio authored
TxPool: map reduce and batch load (#18526)
Batches and deduplicates incoming transaction messages to reduce redundant computation and MDBX transaction overhead. ### Architecture & Rationale Two problems were identified in profiling: 1. **Blob verification allocations**: Duplicate transactions arriving from multiple peers caused the same expensive blob verification to run multiple times, leading to excessive allocations. 2. **MDBX transaction overhead**: Each incoming message opened its own read transaction. Profiling showed this accounted for ~15% of CPU time. The fix introduces: - **LRU-based deduplication**: An LRU cache (4096 entries) filters duplicate messages before they enter the processing batch. Uses `maphash` on the raw message data to avoid string allocations (we don't need to care about collisions, as the seed is random for each instance and this is not a consensus component anyway). - **1-second batching window**: Instead of processing messages immediately, they accumulate in a batch and are flushed every second (or on stream close). A single MDBX read transaction is opened per batch and reused for all messages in that batch. This ensures each unique transaction is processed exactly once per batch window, and the MDBX transaction cost is amortized across potentially hundreds of messages. --------- Co-authored-by: Giulio <monkeair@MacBook-Air-di-Giulio.local>
1 parent 18f1bff commit d0441d0

File tree

1 file changed

+86
-14
lines changed

1 file changed

+86
-14
lines changed

txnprovider/txpool/fetch.go

Lines changed: 86 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -20,9 +20,11 @@ import (
2020
"context"
2121
"errors"
2222
"fmt"
23+
"hash/maphash"
2324
"sync"
2425
"time"
2526

27+
"github.com/hashicorp/golang-lru/v2/simplelru"
2628
"github.com/holiman/uint256"
2729
"google.golang.org/grpc"
2830
"google.golang.org/protobuf/types/known/emptypb"
@@ -176,6 +178,74 @@ func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentryproto.Sen
176178
}
177179
return err
178180
}
181+
182+
var (
183+
batch = make([]*sentryproto.InboundMessage, 0, 256)
184+
batchLock sync.Mutex
185+
)
186+
187+
// LRU cache for deduplication - filters duplicates before they enter the batch
188+
// Uses maphash to avoid string allocations
189+
var hasher maphash.Hash
190+
hasher.Reset() // just to be sure the seed is randomly set.
191+
seenLRU, _ := simplelru.NewLRU[uint64, struct{}](4096, nil)
192+
193+
flushBatch := func() {
194+
batchLock.Lock()
195+
if len(batch) == 0 {
196+
batchLock.Unlock()
197+
return
198+
}
199+
toProcess := batch
200+
batch = make([]*sentryproto.InboundMessage, 0, 256)
201+
batchLock.Unlock()
202+
203+
if !f.pool.Started() {
204+
return
205+
}
206+
207+
if f.db == nil {
208+
for range toProcess {
209+
if f.wg != nil {
210+
f.wg.Done()
211+
}
212+
}
213+
return
214+
}
215+
216+
tx, txErr := f.db.BeginRo(streamCtx)
217+
if txErr != nil {
218+
f.logger.Warn("[txpool.fetch] failed to begin batch transaction", "err", txErr)
219+
return
220+
}
221+
defer tx.Rollback()
222+
223+
for _, req := range toProcess {
224+
if handleErr := f.handleInboundMessageWithTx(streamCtx, tx, req, sentryClient); handleErr != nil {
225+
if !grpcutil.IsRetryLater(handleErr) && !grpcutil.IsEndOfStream(handleErr) {
226+
f.logger.Trace("[txpool.fetch] Handling batched message", "reqID", req.Id.String(), "err", handleErr)
227+
}
228+
}
229+
if f.wg != nil {
230+
f.wg.Done()
231+
}
232+
}
233+
}
234+
235+
// Start ticker goroutine to flush batch every second
236+
ticker := time.NewTicker(1 * time.Second)
237+
defer ticker.Stop()
238+
go func() {
239+
for {
240+
select {
241+
case <-streamCtx.Done():
242+
return
243+
case <-ticker.C:
244+
flushBatch()
245+
}
246+
}
247+
}()
248+
179249
var req *sentryproto.InboundMessage
180250
for req, err = stream.Recv(); ; req, err = stream.Recv() {
181251
if err != nil {
@@ -187,22 +257,29 @@ func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentryproto.Sen
187257
return fmt.Errorf("txpool.receiveMessage: %w", err)
188258
}
189259
if req == nil {
260+
flushBatch()
190261
return nil
191262
}
192-
if err = f.handleInboundMessage(streamCtx, req, sentryClient); err != nil {
193-
if grpcutil.IsRetryLater(err) || grpcutil.IsEndOfStream(err) {
194-
time.Sleep(3 * time.Second)
195-
continue
263+
264+
// Skip duplicates using LRU cache with maphash to avoid string allocs
265+
hasher.Reset()
266+
hasher.Write(req.Data)
267+
hash := hasher.Sum64()
268+
if _, seen := seenLRU.Get(hash); seen {
269+
if f.wg != nil {
270+
f.wg.Done()
196271
}
197-
f.logger.Trace("[txpool.fetch] Handling incoming message", "reqID", req.Id.String(), "err", err)
198-
}
199-
if f.wg != nil {
200-
f.wg.Done()
272+
continue
201273
}
274+
seenLRU.Add(hash, struct{}{})
275+
276+
batchLock.Lock()
277+
batch = append(batch, req)
278+
batchLock.Unlock()
202279
}
203280
}
204281

205-
func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentryproto.InboundMessage, sentryClient sentryproto.SentryClient) (err error) {
282+
func (f *Fetch) handleInboundMessageWithTx(ctx context.Context, tx kv.Tx, req *sentryproto.InboundMessage, sentryClient sentryproto.SentryClient) (err error) {
206283
defer func() {
207284
if rec := recover(); rec != nil {
208285
err = fmt.Errorf("%+v, trace: %s, rlp: %x", rec, dbg.Stack(), req.Data)
@@ -212,11 +289,6 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentryproto.Inbou
212289
if !f.pool.Started() {
213290
return nil
214291
}
215-
tx, err := f.db.BeginRo(ctx)
216-
if err != nil {
217-
return err
218-
}
219-
defer tx.Rollback()
220292

221293
switch req.Id {
222294
case sentryproto.MessageId_NEW_POOLED_TRANSACTION_HASHES_66:

0 commit comments

Comments (0)