Skip to content

Commit d5f71e8

Browse files
committed
bdk: emit update when tracked tx transitions pending to confirmed on new block
Signed-off-by: Yuki Kishimoto <yukikishimoto@protonmail.com>
1 parent de1e80f commit d5f71e8

1 file changed

Lines changed: 134 additions & 21 deletions

File tree

bdk-tokio-electrum/src/client.rs

Lines changed: 134 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -216,14 +216,18 @@ where
216216
/// Information about a subscribed script
217217
#[derive(Debug, Clone)]
218218
struct ScriptSubscription {
219-
/// The set of transaction IDs we've seen for this script
220-
expected_txids: HashSet<Txid>,
219+
/// The expected transaction status keyed by txid.
220+
///
221+
/// Value is the Electrum height encoding (`>0` confirmed, `0/-1` mempool states).
222+
expected_tx_heights: HashMap<Txid, i64>,
221223
}
222224

223225
impl ScriptSubscription {
224226
#[inline]
225-
fn new(expected_txids: HashSet<Txid>) -> Self {
226-
Self { expected_txids }
227+
fn new(expected_tx_heights: HashMap<Txid, i64>) -> Self {
228+
Self {
229+
expected_tx_heights,
230+
}
227231
}
228232
}
229233

@@ -847,7 +851,7 @@ impl BdkElectrumClient {
847851
for (hash, _) in &scripts_to_subscribe {
848852
tracker
849853
.entry(*hash)
850-
.or_insert_with(|| ScriptSubscription::new(HashSet::new()));
854+
.or_insert_with(|| ScriptSubscription::new(HashMap::new()));
851855
}
852856
}
853857

@@ -879,17 +883,23 @@ impl BdkElectrumClient {
879883
};
880884
drop(tracker);
881885

882-
let expected_txids: HashSet<Txid> = subscription.expected_txids;
886+
let expected_tx_heights: HashMap<Txid, i64> = subscription.expected_tx_heights;
883887

884888
// Fetch current history for this script
885889
let history = self.inner.script_get_history(script_hash).await?;
886-
let current_txids: HashSet<Txid> = history.iter().map(|tx| tx.txid()).collect();
890+
let current_tx_heights: HashMap<Txid, i64> = history
891+
.iter()
892+
.map(|tx| (tx.txid(), tx.electrum_height()))
893+
.collect();
887894

888-
// Check if there are any changes
889-
if current_txids == expected_txids {
895+
// Check if there are any changes (new/evicted txs or status transitions).
896+
if current_tx_heights == expected_tx_heights {
890897
return Ok(None);
891898
}
892899

900+
let expected_txids: HashSet<Txid> = expected_tx_heights.keys().copied().collect();
901+
let current_txids: HashSet<Txid> = current_tx_heights.keys().copied().collect();
902+
893903
// Build a TxUpdate with the changes
894904
let mut tx_update: TxUpdate<ConfirmationBlockTime> = TxUpdate::default();
895905

@@ -900,17 +910,25 @@ impl BdkElectrumClient {
900910

901911
// Fetch new transactions
902912
for tx_res in history {
903-
if !expected_txids.contains(&tx_res.txid()) {
904-
let tx: Arc<Transaction> = self.fetch_tx(tx_res.txid()).await?;
913+
let txid = tx_res.txid();
914+
let electrum_height = tx_res.electrum_height();
915+
let previous_height = expected_tx_heights.get(&txid).copied();
916+
let is_new_tx = previous_height.is_none();
917+
let status_changed = previous_height.is_some_and(|h| h != electrum_height);
918+
919+
if is_new_tx {
920+
let tx: Arc<Transaction> = self.fetch_tx(txid).await?;
905921
tx_update.txs.push(tx);
922+
}
906923

907-
match tx_res.electrum_height().try_into() {
924+
if is_new_tx || status_changed {
925+
match electrum_height.try_into() {
908926
Ok(height) if height > 0 => {
909-
self.validate_merkle_for_anchor(&mut tx_update, tx_res.txid(), height)
927+
self.validate_merkle_for_anchor(&mut tx_update, txid, height)
910928
.await?;
911929
}
912930
_ => {
913-
tx_update.seen_ats.insert((tx_res.txid(), start_time));
931+
tx_update.seen_ats.insert((txid, start_time));
914932
}
915933
}
916934
}
@@ -924,7 +942,7 @@ impl BdkElectrumClient {
924942
// Update our tracker with the new expected txids
925943
let mut tracker = self.subscription_tracker.lock().await;
926944
if let Some(subscription) = tracker.get_mut(&script_hash) {
927-
subscription.expected_txids = current_txids;
945+
subscription.expected_tx_heights = current_tx_heights;
928946
}
929947

930948
Ok(Some(SyncResponse {
@@ -986,13 +1004,17 @@ impl BdkElectrumClient {
9861004
let mut batch_loaded_indexes: BTreeSet<u32> = BTreeSet::new();
9871005

9881006
for ((spk_index, spk), spk_history) in spks.into_iter().zip(spk_histories) {
989-
let spk_history_set: HashSet<Txid> =
990-
spk_history.iter().map(|res| res.txid()).collect();
1007+
let spk_history_heights: HashMap<Txid, i64> = spk_history
1008+
.iter()
1009+
.map(|res| (res.txid(), res.electrum_height()))
1010+
.collect();
1011+
let spk_history_txids: HashSet<Txid> =
1012+
spk_history_heights.keys().copied().collect();
9911013
let script_hash = ElectrumScriptHash::new(&spk.spk);
9921014
scripts_to_subscribe.push((
9931015
script_hash,
9941016
spk_index,
995-
ScriptSubscription::new(spk_history_set.clone()),
1017+
ScriptSubscription::new(spk_history_heights.clone()),
9961018
));
9971019
result.max_subscribed_index = Some(
9981020
result
@@ -1013,7 +1035,7 @@ impl BdkElectrumClient {
10131035

10141036
tx_update.evicted_ats.extend(
10151037
spk.expected_txids
1016-
.difference(&spk_history_set)
1038+
.difference(&spk_history_txids)
10171039
.map(|&txid| (txid, start_time)),
10181040
);
10191041

@@ -2249,6 +2271,97 @@ mod tests {
22492271
assert!(saw_chain_update, "expected chain update after new block");
22502272
}
22512273

2274+
#[tokio::test]
2275+
async fn sync_stream_marks_pending_tx_confirmed_after_new_block() {
2276+
let env = TestEnv::new();
2277+
ensure_funded_wallet(&env);
2278+
let client = connected_bdk_client(&env).await;
2279+
2280+
let tracked_address = env.bitcoind.client.new_address().unwrap();
2281+
let request = FullScanRequest::<String>::builder_at(0)
2282+
.chain_tip(current_tip_checkpoint(&env))
2283+
.spks_for_keychain(
2284+
"external".to_string(),
2285+
vec![(0, tracked_address.script_pubkey())],
2286+
)
2287+
.build();
2288+
2289+
let mut stream = client.sync(request).await.unwrap();
2290+
let _ = stream
2291+
.next()
2292+
.await
2293+
.expect("expected initial event")
2294+
.expect("initial event should not error");
2295+
2296+
let txid = env
2297+
.bitcoind
2298+
.client
2299+
.send_to_address(&tracked_address, Amount::from_sat(22_000))
2300+
.unwrap()
2301+
.txid()
2302+
.unwrap();
2303+
env.electrsd.wait_tx(&txid);
2304+
2305+
let saw_pending = timeout(Duration::from_secs(20), async {
2306+
while let Some(event) = stream.next().await {
2307+
match event {
2308+
Ok(SubscribeEvent::Update(update))
2309+
if update
2310+
.tx_update
2311+
.seen_ats
2312+
.iter()
2313+
.any(|(seen_txid, _)| *seen_txid == txid) =>
2314+
{
2315+
return true;
2316+
}
2317+
Ok(SubscribeEvent::Disconnected) => return false,
2318+
Ok(_) => {}
2319+
Err(_) => return false,
2320+
}
2321+
}
2322+
false
2323+
})
2324+
.await
2325+
.unwrap_or(false);
2326+
assert!(
2327+
saw_pending,
2328+
"expected pending update for mempool transaction"
2329+
);
2330+
2331+
env.bitcoind
2332+
.client
2333+
.generate_to_address(1, &env.bitcoind.client.new_address().unwrap())
2334+
.unwrap();
2335+
let new_height = env.bitcoind.client.get_blockchain_info().unwrap().blocks as usize;
2336+
env.electrsd.wait_height(new_height);
2337+
2338+
let saw_confirmation_anchor = timeout(Duration::from_secs(20), async {
2339+
while let Some(event) = stream.next().await {
2340+
match event {
2341+
Ok(SubscribeEvent::Update(update))
2342+
if update
2343+
.tx_update
2344+
.anchors
2345+
.iter()
2346+
.any(|(_, anchor_txid)| *anchor_txid == txid) =>
2347+
{
2348+
return true;
2349+
}
2350+
Ok(SubscribeEvent::Disconnected) => return false,
2351+
Ok(_) => {}
2352+
Err(_) => return false,
2353+
}
2354+
}
2355+
false
2356+
})
2357+
.await
2358+
.unwrap_or(false);
2359+
assert!(
2360+
saw_confirmation_anchor,
2361+
"expected confirmation anchor after transaction is mined"
2362+
);
2363+
}
2364+
22522365
#[tokio::test]
22532366
async fn sync_stream_emits_disconnected_event_on_disconnect() {
22542367
let env = TestEnv::new();
@@ -2405,7 +2518,7 @@ mod tests {
24052518
let client = test_bdk_client();
24062519
let mut subscribed_hashes = HashSet::new();
24072520
let hash = ElectrumScriptHash::new(&script(0x71));
2408-
let scripts = vec![(hash, 0, ScriptSubscription::new(HashSet::new()))];
2521+
let scripts = vec![(hash, 0, ScriptSubscription::new(HashMap::new()))];
24092522

24102523
let mut saturated = false;
24112524
for i in 0..10_000_u32 {
@@ -2751,7 +2864,7 @@ mod tests {
27512864
let hash = ElectrumScriptHash::new(&script(tag));
27522865
scripts.insert(hash);
27532866
lookup.insert(hash, ("external".to_string(), index));
2754-
tracker.insert(hash, ScriptSubscription::new(HashSet::new()));
2867+
tracker.insert(hash, ScriptSubscription::new(HashMap::new()));
27552868
}
27562869
}
27572870
{

0 commit comments

Comments
 (0)