4
4
//! [`Emitter`] is the main structure which sources blockchain data from [`bitcoincore_rpc::Client`].
5
5
//!
6
6
//! To only get block updates (exclude mempool transactions), the caller can use
7
- //! [`Emitter::next_block`] or/and [`Emitter::next_header`] until it returns `Ok(None)` (which means
8
- //! the chain tip is reached). A separate method, [`Emitter::mempool`] can be used to emit the whole
9
- //! mempool.
7
+ //! [`Emitter::next_block`] until it returns `Ok(None)` (which means the chain tip is reached). A
8
+ //! separate method, [`Emitter::mempool`] can be used to emit the whole mempool.
10
9
#![ warn( missing_docs) ]
11
10
12
11
use bdk_core:: { BlockId , CheckPoint } ;
13
- use bitcoin:: { block:: Header , Block , BlockHash , Transaction } ;
14
- use bitcoincore_rpc:: bitcoincore_rpc_json;
12
+ use bitcoin:: { Block , BlockHash , Transaction , Txid } ;
13
+ use bitcoincore_rpc:: { bitcoincore_rpc_json, RpcApi } ;
14
+ use std:: { collections:: HashSet , ops:: Deref } ;
15
15
16
16
pub mod bip158;
17
17
@@ -22,8 +22,8 @@ pub use bitcoincore_rpc;
22
22
/// Refer to [module-level documentation] for more.
23
23
///
24
24
/// [module-level documentation]: crate
25
- pub struct Emitter < ' c , C > {
26
- client : & ' c C ,
25
+ pub struct Emitter < C > {
26
+ client : C ,
27
27
start_height : u32 ,
28
28
29
29
/// The checkpoint of the last-emitted block that is in the best chain. If it is later found
@@ -43,28 +43,65 @@ pub struct Emitter<'c, C> {
43
43
/// The last emitted block during our last mempool emission. This is used to determine whether
44
44
/// there has been a reorg since our last mempool emission.
45
45
last_mempool_tip : Option < u32 > ,
46
+
47
+ /// A set of txids currently assumed to still be in the mempool.
48
+ ///
49
+ /// This is used to detect mempool evictions by comparing the set against the latest mempool
50
+ /// snapshot from bitcoind. Any txid in this set that is missing from the snapshot is considered
51
+ /// evicted.
52
+ ///
53
+ /// When the emitter emits a block, confirmed txids are removed from this set. This prevents
54
+ /// confirmed transactions from being mistakenly marked with an `evicted_at` timestamp.
55
+ expected_mempool_txids : HashSet < Txid > ,
46
56
}
47
57
48
- impl < ' c , C : bitcoincore_rpc:: RpcApi > Emitter < ' c , C > {
58
+ /// Indicates that there are no initially expected mempool transactions.
59
+ ///
60
+ /// Pass this to the `expected_mempool_txids` field of [`Emitter::new`] when the wallet is known
61
+ /// to start empty (i.e. with no unconfirmed transactions).
62
+ pub const NO_EXPECTED_MEMPOOL_TXIDS : core:: iter:: Empty < Txid > = core:: iter:: empty ( ) ;
63
+
64
+ impl < C > Emitter < C >
65
+ where
66
+ C : Deref ,
67
+ C :: Target : RpcApi ,
68
+ {
49
69
/// Construct a new [`Emitter`].
50
70
///
51
71
/// `last_cp` informs the emitter of the chain we are starting off with. This way, the emitter
52
72
/// can start emission from a block that connects to the original chain.
53
73
///
54
74
/// `start_height` starts emission from a given height (if there are no conflicts with the
55
75
/// original chain).
56
- pub fn new ( client : & ' c C , last_cp : CheckPoint , start_height : u32 ) -> Self {
76
+ ///
77
+ /// `expected_mempool_txids` is the initial set of unconfirmed txids provided by the wallet.
78
+ /// This allows the [`Emitter`] to inform the wallet about relevant mempool evictions. If it is
79
+ /// known that the wallet is empty, [`NO_EXPECTED_MEMPOOL_TXIDS`] can be used.
80
+ pub fn new (
81
+ client : C ,
82
+ last_cp : CheckPoint ,
83
+ start_height : u32 ,
84
+ expected_mempool_txids : impl IntoIterator < Item = impl Into < Txid > > ,
85
+ ) -> Self {
57
86
Self {
58
87
client,
59
88
start_height,
60
89
last_cp,
61
90
last_block : None ,
62
91
last_mempool_time : 0 ,
63
92
last_mempool_tip : None ,
93
+ expected_mempool_txids : expected_mempool_txids. into_iter ( ) . map ( Into :: into) . collect ( ) ,
64
94
}
65
95
}
66
96
67
- /// Emit mempool transactions, alongside their first-seen unix timestamps.
97
+ /// Emit mempool transactions and any evicted [`Txid`]s.
98
+ ///
99
+ /// This method returns a [`MempoolEvent`] containing the full transactions (with their
100
+ /// first-seen unix timestamps) that were emitted, and [`MempoolEvent::evicted_txids`] which are
101
+ /// any [`Txid`]s which were previously seen in the mempool and are now missing. Evicted txids
102
+ /// are only reported once the emitter’s checkpoint matches the RPC’s best block in both height
103
+ /// and hash. Until `next_block()` advances the checkpoint to tip, `mempool()` will always
104
+ /// return an empty `evicted_txids` set.
68
105
///
69
106
/// This method emits each transaction only once, unless we cannot guarantee the transaction's
70
107
/// ancestors are already emitted.
@@ -74,8 +111,8 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
74
111
/// tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to block
75
112
/// of height `h-1`, we want to re-emit this transaction until the receiver has seen the block
76
113
/// at height `h`.
77
- pub fn mempool ( & mut self ) -> Result < Vec < ( Transaction , u64 ) > , bitcoincore_rpc:: Error > {
78
- let client = self . client ;
114
+ pub fn mempool ( & mut self ) -> Result < MempoolEvent , bitcoincore_rpc:: Error > {
115
+ let client = & * self . client ;
79
116
80
117
// This is the emitted tip height during the last mempool emission.
81
118
let prev_mempool_tip = self
@@ -84,15 +121,46 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
84
121
// `start_height` has been emitted.
85
122
. unwrap_or ( self . start_height . saturating_sub ( 1 ) ) ;
86
123
124
+ // Loop to make sure that the fetched mempool content and the fetched tip are consistent
125
+ // with one another.
126
+ let ( raw_mempool, raw_mempool_txids, rpc_height, rpc_block_hash) = loop {
127
+ // Determine if height and hash matches the best block from the RPC. Evictions are deferred
128
+ // if we are not at the best block.
129
+ let height = client. get_block_count ( ) ?;
130
+ let hash = client. get_block_hash ( height) ?;
131
+
132
+ // Get the raw mempool result from the RPC client which will be used to determine if any
133
+ // transactions have been evicted.
134
+ let mp = client. get_raw_mempool_verbose ( ) ?;
135
+ let mp_txids: HashSet < Txid > = mp. keys ( ) . copied ( ) . collect ( ) ;
136
+
137
+ if height == client. get_block_count ( ) ? && hash == client. get_block_hash ( height) ? {
138
+ break ( mp, mp_txids, height, hash) ;
139
+ }
140
+ } ;
141
+
142
+ let at_tip =
143
+ rpc_height == self . last_cp . height ( ) as u64 && rpc_block_hash == self . last_cp . hash ( ) ;
144
+
145
+ // If at tip, any expected txid missing from raw mempool is considered evicted;
146
+ // if not at tip, we don't evict anything.
147
+ let evicted_txids: HashSet < Txid > = if at_tip {
148
+ self . expected_mempool_txids
149
+ . difference ( & raw_mempool_txids)
150
+ . copied ( )
151
+ . collect ( )
152
+ } else {
153
+ HashSet :: new ( )
154
+ } ;
155
+
87
156
// Mempool txs come with a timestamp of when the tx is introduced to the mempool. We keep
88
157
// track of the latest mempool tx's timestamp to determine whether we have seen a tx
89
158
// before. `prev_mempool_time` is the previous timestamp and `last_time` records what will
90
159
// be the new latest timestamp.
91
160
let prev_mempool_time = self . last_mempool_time ;
92
161
let mut latest_time = prev_mempool_time;
93
162
94
- let txs_to_emit = client
95
- . get_raw_mempool_verbose ( ) ?
163
+ let new_txs = raw_mempool
96
164
. into_iter ( )
97
165
. filter_map ( {
98
166
let latest_time = & mut latest_time;
@@ -101,25 +169,25 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
101
169
if tx_time > * latest_time {
102
170
* latest_time = tx_time;
103
171
}
104
-
105
- // Avoid emitting transactions that are already emitted if we can guarantee
106
- // blocks containing ancestors are already emitted. The bitcoind rpc interface
107
- // provides us with the block height that the tx is introduced to the mempool.
108
- // If we have already emitted the block of height, we can assume that all
109
- // ancestor txs have been processed by the receiver.
172
+ // Best-effort check to avoid re-emitting transactions we've already emitted.
173
+ //
174
+ // Complete suppression isn't possible, since a transaction may spend outputs
175
+ // owned by the wallet. To determine if such a transaction is relevant, we must
176
+ // have already seen its ancestor(s) that contain the spent prevouts.
177
+ //
178
+ // Fortunately, bitcoind provides the block height at which the transaction
179
+ // entered the mempool. If we've already emitted that block height, we can
180
+ // reasonably assume the receiver has seen all ancestor transactions.
110
181
let is_already_emitted = tx_time <= prev_mempool_time;
111
182
let is_within_height = tx_entry. height <= prev_mempool_tip as _ ;
112
183
if is_already_emitted && is_within_height {
113
184
return None ;
114
185
}
115
-
116
186
let tx = match client. get_raw_transaction ( & txid, None ) {
117
187
Ok ( tx) => tx,
118
- // the tx is confirmed or evicted since `get_raw_mempool_verbose`
119
188
Err ( err) if err. is_not_found_error ( ) => return None ,
120
189
Err ( err) => return Some ( Err ( err) ) ,
121
190
} ;
122
-
123
191
Some ( Ok ( ( tx, tx_time as u64 ) ) )
124
192
}
125
193
} )
@@ -128,26 +196,68 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
128
196
self . last_mempool_time = latest_time;
129
197
self . last_mempool_tip = Some ( self . last_cp . height ( ) ) ;
130
198
131
- Ok ( txs_to_emit)
132
- }
199
+ // If at tip, we replace `expected_mempool_txids` with just the new txids. Otherwise, we’re
200
+ // still catching up to the tip and keep accumulating.
201
+ if at_tip {
202
+ self . expected_mempool_txids = new_txs. iter ( ) . map ( |( tx, _) | tx. compute_txid ( ) ) . collect ( ) ;
203
+ } else {
204
+ self . expected_mempool_txids
205
+ . extend ( new_txs. iter ( ) . map ( |( tx, _) | tx. compute_txid ( ) ) ) ;
206
+ }
133
207
134
- /// Emit the next block height and header (if any).
135
- pub fn next_header ( & mut self ) -> Result < Option < BlockEvent < Header > > , bitcoincore_rpc:: Error > {
136
- Ok ( poll ( self , |hash| self . client . get_block_header ( hash) ) ?
137
- . map ( |( checkpoint, block) | BlockEvent { block, checkpoint } ) )
208
+ Ok ( MempoolEvent {
209
+ new_txs,
210
+ evicted_txids,
211
+ latest_update_time : latest_time as u64 ,
212
+ } )
138
213
}
139
214
140
215
/// Emit the next block height and block (if any).
141
216
pub fn next_block ( & mut self ) -> Result < Option < BlockEvent < Block > > , bitcoincore_rpc:: Error > {
142
- Ok ( poll ( self , |hash| self . client . get_block ( hash) ) ?
143
- . map ( |( checkpoint, block) | BlockEvent { block, checkpoint } ) )
217
+ if let Some ( ( checkpoint, block) ) = poll ( self , move |hash, client| client. get_block ( hash) ) ? {
218
+ // Stop tracking unconfirmed transactions that have been confirmed in this block.
219
+ for tx in & block. txdata {
220
+ self . expected_mempool_txids . remove ( & tx. compute_txid ( ) ) ;
221
+ }
222
+ return Ok ( Some ( BlockEvent { block, checkpoint } ) ) ;
223
+ }
224
+ Ok ( None )
225
+ }
226
+ }
227
+
228
+ /// A new emission from mempool.
229
+ #[ derive( Debug ) ]
230
+ pub struct MempoolEvent {
231
+ /// Unemitted transactions or transactions with ancestors that are unseen by the receiver.
232
+ ///
233
+ /// To understand the second condition, consider a receiver which filters transactions based on
234
+ /// whether it alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction
235
+ /// spends a tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to
236
+ /// block of height `h-1`, we want to re-emit this transaction until the receiver has seen the
237
+ /// block at height `h`.
238
+ pub new_txs : Vec < ( Transaction , u64 ) > ,
239
+
240
+ /// [`Txid`]s of all transactions that have been evicted from mempool.
241
+ pub evicted_txids : HashSet < Txid > ,
242
+
243
+ /// The latest timestamp of when a transaction entered the mempool.
244
+ ///
245
+ /// This is useful for setting the timestamp for evicted transactions.
246
+ pub latest_update_time : u64 ,
247
+ }
248
+
249
+ impl MempoolEvent {
250
+ /// Returns an iterator of `(txid, evicted_at)` pairs for all evicted transactions.
251
+ pub fn evicted_ats ( & self ) -> impl ExactSizeIterator < Item = ( Txid , u64 ) > + ' _ {
252
+ let time = self . latest_update_time ;
253
+ self . evicted_txids . iter ( ) . map ( move |& txid| ( txid, time) )
144
254
}
145
255
}
146
256
147
257
/// A newly emitted block from [`Emitter`].
148
258
#[ derive( Debug ) ]
149
259
pub struct BlockEvent < B > {
150
- /// Either a full [`Block`] or [`Header`] of the new block.
260
+ /// The block.
151
261
pub block : B ,
152
262
153
263
/// The checkpoint of the new block.
@@ -199,9 +309,10 @@ enum PollResponse {
199
309
200
310
fn poll_once < C > ( emitter : & Emitter < C > ) -> Result < PollResponse , bitcoincore_rpc:: Error >
201
311
where
202
- C : bitcoincore_rpc:: RpcApi ,
312
+ C : Deref ,
313
+ C :: Target : RpcApi ,
203
314
{
204
- let client = emitter. client ;
315
+ let client = & * emitter. client ;
205
316
206
317
if let Some ( last_res) = & emitter. last_block {
207
318
let next_hash = if last_res. height < emitter. start_height as _ {
@@ -255,15 +366,16 @@ fn poll<C, V, F>(
255
366
get_item : F ,
256
367
) -> Result < Option < ( CheckPoint , V ) > , bitcoincore_rpc:: Error >
257
368
where
258
- C : bitcoincore_rpc:: RpcApi ,
259
- F : Fn ( & BlockHash ) -> Result < V , bitcoincore_rpc:: Error > ,
369
+ C : Deref ,
370
+ C :: Target : RpcApi ,
371
+ F : Fn ( & BlockHash , & C :: Target ) -> Result < V , bitcoincore_rpc:: Error > ,
260
372
{
261
373
loop {
262
374
match poll_once ( emitter) ? {
263
375
PollResponse :: Block ( res) => {
264
376
let height = res. height as u32 ;
265
377
let hash = res. hash ;
266
- let item = get_item ( & hash) ?;
378
+ let item = get_item ( & hash, & emitter . client ) ?;
267
379
268
380
let new_cp = emitter
269
381
. last_cp
@@ -329,3 +441,81 @@ impl BitcoindRpcErrorExt for bitcoincore_rpc::Error {
329
441
}
330
442
}
331
443
}
444
+
445
+ #[ cfg( test) ]
446
+ mod test {
447
+ use crate :: { bitcoincore_rpc:: RpcApi , Emitter , NO_EXPECTED_MEMPOOL_TXIDS } ;
448
+ use bdk_chain:: local_chain:: LocalChain ;
449
+ use bdk_testenv:: { anyhow, TestEnv } ;
450
+ use bitcoin:: { hashes:: Hash , Address , Amount , ScriptBuf , Txid , WScriptHash } ;
451
+ use std:: collections:: HashSet ;
452
+
453
+ #[ test]
454
+ fn test_expected_mempool_txids_accumulate_and_remove ( ) -> anyhow:: Result < ( ) > {
455
+ let env = TestEnv :: new ( ) ?;
456
+ let chain = LocalChain :: from_genesis_hash ( env. rpc_client ( ) . get_block_hash ( 0 ) ?) . 0 ;
457
+ let chain_tip = chain. tip ( ) ;
458
+ let mut emitter = Emitter :: new (
459
+ env. rpc_client ( ) ,
460
+ chain_tip. clone ( ) ,
461
+ 1 ,
462
+ NO_EXPECTED_MEMPOOL_TXIDS ,
463
+ ) ;
464
+
465
+ env. mine_blocks ( 100 , None ) ?;
466
+ while emitter. next_block ( ) ?. is_some ( ) { }
467
+
468
+ let spk_to_track = ScriptBuf :: new_p2wsh ( & WScriptHash :: all_zeros ( ) ) ;
469
+ let addr_to_track = Address :: from_script ( & spk_to_track, bitcoin:: Network :: Regtest ) ?;
470
+ let mut mempool_txids = HashSet :: new ( ) ;
471
+
472
+ // Send a tx at different heights and ensure txs are accumulating in expected_mempool_txids.
473
+ for _ in 0 ..10 {
474
+ let sent_txid = env. send ( & addr_to_track, Amount :: from_sat ( 1_000 ) ) ?;
475
+ mempool_txids. insert ( sent_txid) ;
476
+ emitter. mempool ( ) ?;
477
+ env. mine_blocks ( 1 , None ) ?;
478
+
479
+ for txid in & mempool_txids {
480
+ assert ! (
481
+ emitter. expected_mempool_txids. contains( txid) ,
482
+ "Expected txid {:?} missing" ,
483
+ txid
484
+ ) ;
485
+ }
486
+ }
487
+
488
+ // Process each block and check that confirmed txids are removed from from
489
+ // expected_mempool_txids.
490
+ while let Some ( block_event) = emitter. next_block ( ) ? {
491
+ let confirmed_txids: HashSet < Txid > = block_event
492
+ . block
493
+ . txdata
494
+ . iter ( )
495
+ . map ( |tx| tx. compute_txid ( ) )
496
+ . collect ( ) ;
497
+ mempool_txids = mempool_txids
498
+ . difference ( & confirmed_txids)
499
+ . copied ( )
500
+ . collect :: < HashSet < _ > > ( ) ;
501
+ for txid in confirmed_txids {
502
+ assert ! (
503
+ !emitter. expected_mempool_txids. contains( & txid) ,
504
+ "Expected txid {:?} should have been removed" ,
505
+ txid
506
+ ) ;
507
+ }
508
+ for txid in & mempool_txids {
509
+ assert ! (
510
+ emitter. expected_mempool_txids. contains( txid) ,
511
+ "Expected txid {:?} missing" ,
512
+ txid
513
+ ) ;
514
+ }
515
+ }
516
+
517
+ assert ! ( emitter. expected_mempool_txids. is_empty( ) ) ;
518
+
519
+ Ok ( ( ) )
520
+ }
521
+ }
0 commit comments