Skip to content

sync: Poll non-ephemeral transparent addresses for mined UTXOs #142

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions zallet/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ impl StartCmd {
let (
wallet_sync_steady_state_task_handle,
wallet_sync_recover_history_task_handle,
wallet_sync_poll_transparent_task_handle,
wallet_sync_data_requests_task_handle,
) = WalletSync::spawn(&config, db, chain_view).await?;

Expand All @@ -42,6 +43,7 @@ impl StartCmd {
pin!(rpc_task_handle);
pin!(wallet_sync_steady_state_task_handle);
pin!(wallet_sync_recover_history_task_handle);
pin!(wallet_sync_poll_transparent_task_handle);
pin!(wallet_sync_data_requests_task_handle);

// Wait for tasks to finish.
Expand Down Expand Up @@ -77,6 +79,13 @@ impl StartCmd {
Ok(())
}

wallet_sync_join_result = &mut wallet_sync_poll_transparent_task_handle => {
let wallet_sync_result = wallet_sync_join_result
.expect("unexpected panic in the wallet poll-transparent sync task");
info!(?wallet_sync_result, "Wallet poll-transparent sync task exited");
Ok(())
}

wallet_sync_join_result = &mut wallet_sync_data_requests_task_handle => {
let wallet_sync_result = wallet_sync_join_result
.expect("unexpected panic in the wallet data-requests sync task");
Expand All @@ -101,6 +110,7 @@ impl StartCmd {
rpc_task_handle.abort();
wallet_sync_steady_state_task_handle.abort();
wallet_sync_recover_history_task_handle.abort();
wallet_sync_poll_transparent_task_handle.abort();
wallet_sync_data_requests_task_handle.abort();

info!("All tasks have been asked to stop, waiting for remaining tasks to finish");
Expand Down
107 changes: 98 additions & 9 deletions zallet/src/components/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,31 @@ use std::{collections::HashSet, time::Duration};
use futures::StreamExt as _;
use jsonrpsee::tracing::{self, debug, info, warn};
use tokio::time;
use transparent::{
address::Script,
bundle::{OutPoint, TxOut},
};
use zaino_proto::proto::service::GetAddressUtxosArg;
use zaino_state::{FetchServiceSubscriber, LightWalletIndexer as _, ZcashIndexer};
use zcash_client_backend::data_api::{
OutputStatusFilter, TransactionDataRequest, TransactionStatus, TransactionStatusFilter,
WalletRead, WalletWrite,
chain::{BlockCache, scan_cached_blocks},
scanning::{ScanPriority, ScanRange},
wallet::decrypt_and_store_transaction,
use zcash_client_backend::{
data_api::{
OutputStatusFilter, TransactionDataRequest, TransactionStatus, TransactionStatusFilter,
WalletRead, WalletWrite,
chain::{BlockCache, scan_cached_blocks},
scanning::{ScanPriority, ScanRange},
wallet::decrypt_and_store_transaction,
},
wallet::WalletTransparentOutput,
};
use zcash_keys::encoding::AddressCodec;
use zcash_primitives::transaction::Transaction;
use zcash_protocol::{
TxId,
consensus::{self, BlockHeight},
value::Zatoshis,
};
use zebra_chain::transaction::SerializedTransaction;
use zebra_rpc::methods::GetAddressTxIdsRequest;
use zebra_rpc::methods::{AddressStrings, GetAddressTxIdsRequest};

use super::{
TaskHandle,
Expand All @@ -46,7 +54,7 @@ impl WalletSync {
config: &ZalletConfig,
db: Database,
chain_view: ChainView,
) -> Result<(TaskHandle, TaskHandle, TaskHandle), Error> {
) -> Result<(TaskHandle, TaskHandle, TaskHandle, TaskHandle), Error> {
let params = config.network();

// Ensure the wallet is in a state that the sync tasks can work with.
Expand All @@ -68,14 +76,26 @@ impl WalletSync {
Ok(())
});

let chain = chain_view.subscribe().await?.inner();
let mut db_data = db.handle().await?;
let poll_transparent_task = tokio::spawn(async move {
poll_transparent(chain, &params, db_data.as_mut()).await?;
Ok(())
});

let chain = chain_view.subscribe().await?.inner();
let mut db_data = db.handle().await?;
let data_requests_task = tokio::spawn(async move {
data_requests(chain, &params, db_data.as_mut()).await?;
Ok(())
});

Ok((steady_state_task, recover_history_task, data_requests_task))
Ok((
steady_state_task,
recover_history_task,
poll_transparent_task,
data_requests_task,
))
}
}

Expand Down Expand Up @@ -346,6 +366,75 @@ async fn recover_history(
}
}

/// Polls the non-ephemeral transparent addresses in the wallet for UTXOs.
///
/// Ephemeral addresses are handled by [`data_requests`].
#[tracing::instrument(skip_all)]
async fn poll_transparent(
chain: FetchServiceSubscriber,
params: &Network,
db_data: &mut DbConnection,
) -> Result<(), SyncError> {
info!("Transparent address polling sync task started");

loop {
// Collect all of the wallet's non-ephemeral transparent addresses. We do this
// fresh every loop to ensure we incorporate changes to the address set.
//
// TODO: This is likely to be append-only unless we add support for removing an
// account from the wallet, so we could implement a more efficient strategy here
// with some changes to the `WalletRead` API. For now this is fine.
let addresses = db_data
.get_account_ids()?
.into_iter()
.map(|account| db_data.get_transparent_receivers(account, true))
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.flat_map(|m| m.into_keys().map(|addr| addr.encode(params)))
.collect();

// Open a mempool stream, which we use for its side-effect: notifying us of
// changes to the UTXO set (either due to a new mempool transaction, or the chain
// tip changing).
// TODO: Alter this once Zaino supports transactional chain view queries.
let mut mempool_stream = chain.get_mempool_stream().await?;

// Fetch all mined UTXOs.
// TODO: I really want to use the chaininfo-aware version (which Zaino doesn't
// implement) or an equivalent Zaino index (once it exists).
info!("Fetching mined UTXOs");
let utxos = chain
.z_get_address_utxos(
AddressStrings::new_valid(addresses).expect("we just encoded these"),
)
.await?;

// Notify the wallet about all mined UTXOs.
for utxo in utxos {
let (address, txid, index, script, value_zat, mined_height) = utxo.into_parts();
debug!("{address} has UTXO in tx {txid} at index {}", index.index());

let output = WalletTransparentOutput::from_parts(
OutPoint::new(txid.0, index.index()),
TxOut {
value: Zatoshis::const_from_u64(value_zat),
script_pubkey: Script(script.as_raw_bytes().to_vec()),
},
Some(BlockHeight::from_u32(mined_height.0)),
)
.expect("the UTXO was detected via a supported address kind");

db_data.put_received_transparent_utxo(&output)?;
}

// Now wait on the chain tip to change.
// TODO: Once Zaino has an index over the mempool, monitor it for changes to the
// unmined UTXO set (which we can't get directly from the stream without building
// an index because existing mempool txs can be spent within the mempool).
while mempool_stream.next().await.is_some() {}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I spent a significant amount of time adding logic to turn mempool tx outputs into mempool UTXOs, only to remember at the end that later mempool transactions can spend outputs of earlier ones. Due to transactions not being guaranteed to leave the mempool across chain tip changes (e.g. during high load), that means we can't use the mempool stream to get UTXOs, and instead need to build and maintain an index (which is Zaino's job).

}
}

/// Fetches information that the wallet requests to complete its view of transaction
/// history.
#[tracing::instrument(skip_all)]
Expand Down
Loading