11package vocone
22
33import (
4+ "bytes"
45 "context"
56 "encoding/binary"
67 "errors"
@@ -24,20 +25,38 @@ func (vc *Vocone) loadMempool() error {
2425 if err != nil && ! errors .Is (err , db .ErrKeyNotFound ) {
2526 return fmt .Errorf ("could not read mempool sequence: %w" , err )
2627 }
28+ var persistedSeq uint64
2729 if len (seqBytes ) == 8 {
28- vc . mempoolSeq = binary .BigEndian .Uint64 (seqBytes )
30+ persistedSeq = binary .BigEndian .Uint64 (seqBytes )
2931 }
3032
3133 // Iterate over all pending transactions and rebuild the key list.
3234 vc .mempoolKeys = nil
35+ var maxObservedSeq uint64
3336 if err := vc .mempoolDB .Iterate ([]byte (prefixMempool ), func (key , _ []byte ) bool {
37+ switch {
38+ case len (key ) == 8 :
39+ // Some db wrappers return prefix-stripped keys.
40+ if seq := binary .BigEndian .Uint64 (key ); seq > maxObservedSeq {
41+ maxObservedSeq = seq
42+ }
43+ case len (key ) >= len (prefixMempool )+ 8 && bytes .HasPrefix (key , []byte (prefixMempool )):
44+ if seq := binary .BigEndian .Uint64 (key [len (key )- 8 :]); seq > maxObservedSeq {
45+ maxObservedSeq = seq
46+ }
47+ }
3448 keyCopy := make ([]byte , len (key ))
3549 copy (keyCopy , key )
3650 vc .mempoolKeys = append (vc .mempoolKeys , keyCopy )
3751 return true
3852 }); err != nil {
3953 return fmt .Errorf ("could not iterate mempool: %w" , err )
4054 }
55+ if persistedSeq >= maxObservedSeq {
56+ vc .mempoolSeq = persistedSeq
57+ } else {
58+ vc .mempoolSeq = maxObservedSeq
59+ }
4160
4261 if len (vc .mempoolKeys ) > 0 {
4362 log .Infow ("recovered pending transactions from mempool" , "count" , len (vc .mempoolKeys ))
@@ -139,10 +158,10 @@ func mempoolKey(seq uint64) []byte {
139158}
140159
141160// prepareBlock drains transactions from the mempool, re-validates them,
142- // stores valid ones in the block store, and returns the raw tx list .
143- func ( vc * Vocone ) prepareBlock () [][] byte {
144- height := vc . height . Load ()
145-
161+ // and returns the raw tx list along with the mempool keys that were consumed .
162+ // It does NOT modify the mempool or blockstore — the caller is responsible
163+ // for persisting writes and cleaning up the mempool after a successful commit.
164+ func ( vc * Vocone ) prepareBlock () ( txs [][] byte , consumedKeys [][] byte ) {
146165 vc .mempoolMtx .Lock ()
147166 // Take up to txsPerBlock keys from the front of the queue.
148167 count := min (vc .txsPerBlock , len (vc .mempoolKeys ))
@@ -151,20 +170,9 @@ func (vc *Vocone) prepareBlock() [][]byte {
151170 vc .mempoolMtx .Unlock ()
152171
153172 if len (pendingKeys ) == 0 {
154- return nil
173+ return nil , nil
155174 }
156175
157- blockStoreTx := vc .blockStore .WriteTx ()
158- defer blockStoreTx .Discard ()
159- mempoolWTx := vc .mempoolDB .WriteTx ()
160- defer mempoolWTx .Discard ()
161-
162- var (
163- transactions [][]byte
164- consumedKeys [][]byte
165- txCount int32
166- )
167-
168176 for _ , key := range pendingKeys {
169177 txData , err := vc .mempoolDB .Get (key )
170178 if err != nil {
@@ -187,42 +195,53 @@ func (vc *Vocone) prepareBlock() [][]byte {
187195 continue
188196 }
189197
190- // Store in the block store.
191- if err := blockStoreTx .Set (txKey (height , txCount ), txData ); err != nil {
192- log .Errorw (err , "error storing tx in blockstore" )
193- break
194- }
195- transactions = append (transactions , txData )
198+ txs = append (txs , txData )
196199 consumedKeys = append (consumedKeys , key )
197- txCount ++
198200 }
199201
200- // Commit block store writes.
201- if len (transactions ) > 0 {
202- if err := blockStoreTx .Commit (); err != nil {
203- log .Errorw (err , "could not commit block store transaction" )
204- return nil
205- }
202+ if len (txs ) > 0 {
203+ log .Infow ("prepared block transactions" ,
204+ "count" , len (txs ), "height" , vc .height .Load ())
206205 }
206+ return txs , consumedKeys
207+ }
207208
208- // Remove consumed transactions from the mempool db.
209+ // commitMempoolCleanup removes consumed keys from the persistent mempool
210+ // and the in-memory index. Must be called after a successful block commit.
211+ func (vc * Vocone ) commitMempoolCleanup (consumedKeys [][]byte ) {
212+ if len (consumedKeys ) == 0 {
213+ return
214+ }
215+ wTx := vc .mempoolDB .WriteTx ()
216+ defer wTx .Discard ()
209217 for _ , key := range consumedKeys {
210- if err := mempoolWTx .Delete (key ); err != nil {
218+ if err := wTx .Delete (key ); err != nil {
211219 log .Errorw (err , "could not delete mempool entry" )
212220 }
213221 }
214- if err := mempoolWTx .Commit (); err != nil {
222+ if err := wTx .Commit (); err != nil {
215223 log .Errorw (err , "could not commit mempool cleanup" )
216224 }
217225
218- // Update the in-memory index.
226+ // Update the in-memory index by removing exactly the consumed keys .
219227 vc .mempoolMtx .Lock ()
220- vc .mempoolKeys = vc .mempoolKeys [len (consumedKeys ):]
221- vc .mempoolMtx .Unlock ()
222-
223- if len (transactions ) > 0 {
224- log .Infow ("prepared block transactions" ,
225- "count" , len (transactions ), "height" , height )
228+ defer vc .mempoolMtx .Unlock ()
229+ consumed := make (map [string ]struct {}, len (consumedKeys ))
230+ for _ , key := range consumedKeys {
231+ consumed [string (key )] = struct {}{}
232+ }
233+ filtered := make ([][]byte , 0 , len (vc .mempoolKeys ))
234+ removed := 0
235+ for _ , key := range vc .mempoolKeys {
236+ if _ , ok := consumed [string (key )]; ok {
237+ removed ++
238+ continue
239+ }
240+ filtered = append (filtered , key )
241+ }
242+ if removed < len (consumed ) {
243+ log .Warnw ("mempool cleanup removed fewer keys than expected" ,
244+ "expectedUnique" , len (consumed ), "removed" , removed )
226245 }
227- return transactions
246+ vc . mempoolKeys = filtered
228247}
0 commit comments