@@ -3,23 +3,31 @@ use std::{collections::HashSet, time::Duration};
3
3
use futures:: StreamExt as _;
4
4
use jsonrpsee:: tracing:: { self , debug, info, warn} ;
5
5
use tokio:: time;
6
+ use transparent:: {
7
+ address:: Script ,
8
+ bundle:: { OutPoint , TxOut } ,
9
+ } ;
6
10
use zaino_proto:: proto:: service:: GetAddressUtxosArg ;
7
11
use zaino_state:: { FetchServiceSubscriber , LightWalletIndexer as _, ZcashIndexer } ;
8
- use zcash_client_backend:: data_api:: {
9
- OutputStatusFilter , TransactionDataRequest , TransactionStatus , TransactionStatusFilter ,
10
- WalletRead , WalletWrite ,
11
- chain:: { BlockCache , scan_cached_blocks} ,
12
- scanning:: { ScanPriority , ScanRange } ,
13
- wallet:: decrypt_and_store_transaction,
12
+ use zcash_client_backend:: {
13
+ data_api:: {
14
+ OutputStatusFilter , TransactionDataRequest , TransactionStatus , TransactionStatusFilter ,
15
+ WalletRead , WalletWrite ,
16
+ chain:: { BlockCache , scan_cached_blocks} ,
17
+ scanning:: { ScanPriority , ScanRange } ,
18
+ wallet:: decrypt_and_store_transaction,
19
+ } ,
20
+ wallet:: WalletTransparentOutput ,
14
21
} ;
15
22
use zcash_keys:: encoding:: AddressCodec ;
16
23
use zcash_primitives:: transaction:: Transaction ;
17
24
use zcash_protocol:: {
18
25
TxId ,
19
26
consensus:: { self , BlockHeight } ,
27
+ value:: Zatoshis ,
20
28
} ;
21
29
use zebra_chain:: transaction:: SerializedTransaction ;
22
- use zebra_rpc:: methods:: GetAddressTxIdsRequest ;
30
+ use zebra_rpc:: methods:: { AddressStrings , GetAddressTxIdsRequest } ;
23
31
24
32
use super :: {
25
33
TaskHandle ,
@@ -46,7 +54,7 @@ impl WalletSync {
46
54
config : & ZalletConfig ,
47
55
db : Database ,
48
56
chain_view : ChainView ,
49
- ) -> Result < ( TaskHandle , TaskHandle , TaskHandle ) , Error > {
57
+ ) -> Result < ( TaskHandle , TaskHandle , TaskHandle , TaskHandle ) , Error > {
50
58
let params = config. network ( ) ;
51
59
52
60
// Ensure the wallet is in a state that the sync tasks can work with.
@@ -68,14 +76,26 @@ impl WalletSync {
68
76
Ok ( ( ) )
69
77
} ) ;
70
78
79
+ let chain = chain_view. subscribe ( ) . await ?. inner ( ) ;
80
+ let mut db_data = db. handle ( ) . await ?;
81
+ let poll_transparent_task = tokio:: spawn ( async move {
82
+ poll_transparent ( chain, & params, db_data. as_mut ( ) ) . await ?;
83
+ Ok ( ( ) )
84
+ } ) ;
85
+
71
86
let chain = chain_view. subscribe ( ) . await ?. inner ( ) ;
72
87
let mut db_data = db. handle ( ) . await ?;
73
88
let data_requests_task = tokio:: spawn ( async move {
74
89
data_requests ( chain, & params, db_data. as_mut ( ) ) . await ?;
75
90
Ok ( ( ) )
76
91
} ) ;
77
92
78
- Ok ( ( steady_state_task, recover_history_task, data_requests_task) )
93
+ Ok ( (
94
+ steady_state_task,
95
+ recover_history_task,
96
+ poll_transparent_task,
97
+ data_requests_task,
98
+ ) )
79
99
}
80
100
}
81
101
@@ -346,6 +366,75 @@ async fn recover_history(
346
366
}
347
367
}
348
368
369
+ /// Polls the non-ephemeral transparent addresses in the wallet for UTXOs.
370
+ ///
371
+ /// Ephemeral addresses are handled by [`data_requests`].
372
+ #[ tracing:: instrument( skip_all) ]
373
+ async fn poll_transparent (
374
+ chain : FetchServiceSubscriber ,
375
+ params : & Network ,
376
+ db_data : & mut DbConnection ,
377
+ ) -> Result < ( ) , SyncError > {
378
+ info ! ( "Transparent address polling sync task started" ) ;
379
+
380
+ loop {
381
+ // Collect all of the wallet's non-ephemeral transparent addresses. We do this
382
+ // fresh every loop to ensure we incorporate changes to the address set.
383
+ //
384
+ // TODO: This is likely to be append-only unless we add support for removing an
385
+ // account from the wallet, so we could implement a more efficient strategy here
386
+ // with some changes to the `WalletRead` API. For now this is fine.
387
+ let addresses = db_data
388
+ . get_account_ids ( ) ?
389
+ . into_iter ( )
390
+ . map ( |account| db_data. get_transparent_receivers ( account, true ) )
391
+ . collect :: < Result < Vec < _ > , _ > > ( ) ?
392
+ . into_iter ( )
393
+ . flat_map ( |m| m. into_keys ( ) . map ( |addr| addr. encode ( params) ) )
394
+ . collect ( ) ;
395
+
396
+ // Open a mempool stream, which we use for its side-effect: notifying us of
397
+ // changes to the UTXO set (either due to a new mempool transaction, or the chain
398
+ // tip changing).
399
+ // TODO: Alter this once Zaino supports transactional chain view queries.
400
+ let mut mempool_stream = chain. get_mempool_stream ( ) . await ?;
401
+
402
+ // Fetch all mined UTXOs.
403
+ // TODO: I really want to use the chaininfo-aware version (which Zaino doesn't
404
+ // implement) or an equivalent Zaino index (once it exists).
405
+ info ! ( "Fetching mined UTXOs" ) ;
406
+ let utxos = chain
407
+ . z_get_address_utxos (
408
+ AddressStrings :: new_valid ( addresses) . expect ( "we just encoded these" ) ,
409
+ )
410
+ . await ?;
411
+
412
+ // Notify the wallet about all mined UTXOs.
413
+ for utxo in utxos {
414
+ let ( address, txid, index, script, value_zat, mined_height) = utxo. into_parts ( ) ;
415
+ debug ! ( "{address} has UTXO in tx {txid} at index {}" , index. index( ) ) ;
416
+
417
+ let output = WalletTransparentOutput :: from_parts (
418
+ OutPoint :: new ( txid. 0 , index. index ( ) ) ,
419
+ TxOut {
420
+ value : Zatoshis :: const_from_u64 ( value_zat) ,
421
+ script_pubkey : Script ( script. as_raw_bytes ( ) . to_vec ( ) ) ,
422
+ } ,
423
+ Some ( BlockHeight :: from_u32 ( mined_height. 0 ) ) ,
424
+ )
425
+ . expect ( "the UTXO was detected via a supported address kind" ) ;
426
+
427
+ db_data. put_received_transparent_utxo ( & output) ?;
428
+ }
429
+
430
+ // Now wait on the chain tip to change.
431
+ // TODO: Once Zaino has an index over the mempool, monitor it for changes to the
432
+ // unmined UTXO set (which we can't get directly from the stream without building
433
+ // an index because existing mempool txs can be spent within the mempool).
434
+ while mempool_stream. next ( ) . await . is_some ( ) { }
435
+ }
436
+ }
437
+
349
438
/// Fetches information that the wallet requests to complete its view of transaction
350
439
/// history.
351
440
#[ tracing:: instrument( skip_all) ]
0 commit comments