Skip to content

Commit 38d0693

Browse files
authored
Merge pull request #324 from RGB-WG/async
Async support for wallet and witness syncrhonization
2 parents cb24235 + dce6cf9 commit 38d0693

File tree

5 files changed

+150
-10
lines changed

5 files changed

+150
-10
lines changed

.github/workflows/build.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ jobs:
3333
strategy:
3434
fail-fast: false
3535
matrix:
36-
feature: [ bitcoin, liquid, prime, uri, binfile, serde, stl ]
36+
feature: [ async, bitcoin, liquid, prime, uri, binfile, serde, stl ]
3737
steps:
3838
- uses: actions/checkout@v4
3939
- uses: dtolnay/rust-toolchain@stable

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,9 @@ rgb-persist-fs.workspace = true
8383

8484
[features]
8585
default = ["std", "bitcoin"]
86-
all = ["std", "bitcoin", "liquid", "prime", "binfile", "uri", "stl", "serde"]
86+
all = ["std", "bitcoin", "liquid", "prime", "binfile", "uri", "stl", "serde", "async"]
8787
std = ["rgb-invoice/std", "indexmap/std"]
88+
async = []
8889

8990
bitcoin = ["bp-core", "bp-invoice", "rgb-core/bitcoin", "rgb-invoice/bitcoin"]
9091
liquid = ["rgb-core/liquid", "rgb-invoice/liquid"]

src/contracts.rs

Lines changed: 75 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,27 @@ where
126126
panic!("Contract {id} not found")
127127
}
128128
}
129+
130+
#[cfg(feature = "async")]
131+
#[allow(clippy::await_holding_refcell_ref)]
132+
async fn with_contract_mut_async<R>(
133+
&mut self,
134+
id: ContractId,
135+
f: impl AsyncFnOnce(&mut Contract<Sp::Stock, Sp::Pile>) -> R,
136+
) -> R {
137+
// We need this bullshit due to a failed rust `RefCell` implementation which panics if we do
138+
// this block any other way.
139+
if self.contracts.borrow().contains_key(&id) {
140+
return f(self.contracts.borrow_mut().get_mut(&id).unwrap()).await;
141+
}
142+
if let Some(mut contract) = self.persistence.contract(id) {
143+
let res = f(&mut contract).await;
144+
self.contracts.borrow_mut().insert(id, contract);
145+
res
146+
} else {
147+
panic!("Contract {id} not found")
148+
}
149+
}
129150
}
130151

131152
impl<Sp, S, C> Contracts<Sp, S, C>
@@ -270,13 +291,16 @@ where
270291
self.with_contract_mut(contract_id, |contract| contract.call(call, seals))
271292
}
272293

273-
/// Synchronize the status of all witnesses and single-use seal definitions.
294+
#[cfg(not(feature = "async"))]
295+
/// Update the status of all witnesses and single-use seal definitions.
274296
///
275297
/// Applies rollbacks or forwards if required and recomputes the state of the affected
276298
/// contracts.
277-
pub fn sync_witnesses<E: core::error::Error>(
299+
pub fn update_witnesses<E: core::error::Error>(
278300
&mut self,
279301
resolver: impl Fn(<<Sp::Pile as Pile>::Seal as RgbSeal>::WitnessId) -> Result<WitnessStatus, E>,
302+
last_block_height: u64,
303+
min_conformations: u32,
280304
) -> Result<(), MultiError<SyncError<E>, <Sp::Stock as Stock>::Error>> {
281305
let mut changed_statuses = IndexMap::<_, WitnessStatus>::new();
282306
let contract_ids = self.persistence.contract_ids().collect::<IndexSet<_>>();
@@ -285,13 +309,16 @@ where
285309
contract_id,
286310
|contract| -> Result<(), MultiError<SyncError<E>, <Sp::Stock as Stock>::Error>> {
287311
for witness_id in contract.witness_ids() {
312+
let old_status = contract.witness_status(witness_id);
313+
if matches!(old_status, WitnessStatus::Mined(height) if last_block_height - height.get() > min_conformations as u64) {
314+
continue
315+
}
288316
let new_status = match changed_statuses.get(&witness_id) {
289317
None => resolver(witness_id)
290318
.map_err(SyncError::Status)
291319
.map_err(MultiError::A),
292320
Some(witness_id) => Ok(*witness_id),
293321
}?;
294-
let old_status = contract.witness_status(witness_id);
295322
if new_status != old_status {
296323
changed_statuses.insert(witness_id, new_status);
297324
}
@@ -306,6 +333,51 @@ where
306333
Ok(())
307334
}
308335

336+
#[cfg(feature = "async")]
337+
/// Update the status of all witnesses and single-use seal definitions.
338+
///
339+
/// Applies rollbacks or forwards if required and recomputes the state of the affected
340+
/// contracts.
341+
pub async fn update_witnesses_async<E: core::error::Error>(
342+
&mut self,
343+
resolver: impl AsyncFn(
344+
<<Sp::Pile as Pile>::Seal as RgbSeal>::WitnessId,
345+
) -> Result<WitnessStatus, E>,
346+
last_block_height: u64,
347+
min_conformations: u32,
348+
) -> Result<(), MultiError<SyncError<E>, <Sp::Stock as Stock>::Error>> {
349+
let mut changed_statuses = IndexMap::<_, WitnessStatus>::new();
350+
let contract_ids = self.persistence.contract_ids().collect::<IndexSet<_>>();
351+
for contract_id in contract_ids {
352+
self.with_contract_mut_async(
353+
contract_id,
354+
async |contract| -> Result<(), MultiError<SyncError<E>, <Sp::Stock as Stock>::Error>> {
355+
for witness_id in contract.witness_ids() {
356+
let old_status = contract.witness_status(witness_id);
357+
if matches!(old_status, WitnessStatus::Mined(height) if last_block_height - height.get() > min_conformations as u64) {
358+
continue
359+
}
360+
let new_status = match changed_statuses.get(&witness_id) {
361+
None => resolver(witness_id)
362+
.await
363+
.map_err(SyncError::Status)
364+
.map_err(MultiError::A),
365+
Some(witness_id) => Ok(*witness_id),
366+
}?;
367+
if new_status != old_status {
368+
changed_statuses.insert(witness_id, new_status);
369+
}
370+
}
371+
contract
372+
.sync(changed_statuses.iter().map(|(id, status)| (*id, *status)))
373+
.map_err(MultiError::from_other_a)?;
374+
Ok(())
375+
},
376+
).await?;
377+
}
378+
Ok(())
379+
}
380+
309381
/// Include an operation and its witness to the history of known operations and the contract
310382
/// state.
311383
///

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
#![cfg_attr(coverage_nightly, feature(coverage_attribute))]
4141
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
4242
#![allow(clippy::type_complexity)]
43+
#![cfg_attr(feature = "async", allow(async_fn_in_trait))]
4344

4445
extern crate alloc;
4546

src/popls/bp.rs

Lines changed: 71 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,11 @@ pub trait WalletProvider {
6464

6565
fn has_utxo(&self, outpoint: Outpoint) -> bool;
6666
fn utxos(&self) -> impl Iterator<Item = Outpoint>;
67-
fn sync_utxos(&mut self) -> Result<(), Self::Error>;
67+
68+
#[cfg(not(feature = "async"))]
69+
fn update_utxos(&mut self) -> Result<(), Self::Error>;
70+
#[cfg(feature = "async")]
71+
async fn update_utxos_async(&mut self) -> Result<(), Self::Error>;
6872

6973
fn register_seal(&mut self, seal: WTxoSeal);
7074
fn resolve_seals(
@@ -76,12 +80,32 @@ pub trait WalletProvider {
7680
fn next_address(&mut self) -> Address;
7781
fn next_nonce(&mut self) -> u64;
7882

83+
#[cfg(not(feature = "async"))]
7984
/// Returns a closure which can retrieve a witness status of an arbitrary transaction id
8085
/// (including the ones that are not related to the wallet).
8186
fn txid_resolver(&self) -> impl Fn(Txid) -> Result<WitnessStatus, Self::Error>;
87+
#[cfg(feature = "async")]
88+
/// Returns a closure which can retrieve a witness status of an arbitrary transaction id
89+
/// (including the ones that are not related to the wallet).
90+
fn txid_resolver_async(&self) -> impl AsyncFn(Txid) -> Result<WitnessStatus, Self::Error>;
91+
92+
#[cfg(not(feature = "async"))]
93+
/// Returns the height of the last known block.
94+
fn last_block_height(&self) -> Result<u64, Self::Error>;
95+
#[cfg(feature = "async")]
96+
/// Returns the height of the last known block.
97+
async fn last_block_height_async(&self) -> Result<u64, Self::Error>;
8298

99+
#[cfg(not(feature = "async"))]
83100
/// Broadcasts the transaction, also updating UTXO set accordingly.
84101
fn broadcast(&mut self, tx: &Tx, change: Option<(Vout, u32, u32)>) -> Result<(), Self::Error>;
102+
#[cfg(feature = "async")]
103+
/// Broadcasts the transaction, also updating UTXO set accordingly.
104+
async fn broadcast_async(
105+
&mut self,
106+
tx: &Tx,
107+
change: Option<(Vout, u32, u32)>,
108+
) -> Result<(), Self::Error>;
85109
}
86110

87111
pub trait Coinselect {
@@ -847,20 +871,62 @@ where
847871
.consume(allow_unknown, reader, seal_resolver, sig_validator)
848872
}
849873

850-
/// Synchronize a wallet UTXO set and the status of all witnesses and single-use seal
874+
#[cfg(not(feature = "async"))]
875+
/// Update a wallet UTXO set and the status of all witnesses and single-use seal
851876
/// definitions.
852877
///
853878
/// Applies rollbacks or forwards if required and recomputes the state of the affected
854879
/// contracts.
855-
pub fn sync(
880+
pub fn update(
856881
&mut self,
882+
min_conformations: u32,
857883
) -> Result<(), MultiError<SyncError<W::Error>, <Sp::Stock as Stock>::Error>> {
858884
self.wallet
859-
.sync_utxos()
885+
.update_utxos()
886+
.map_err(SyncError::Wallet)
887+
.map_err(MultiError::from_a)?;
888+
let last_height = self
889+
.wallet
890+
.last_block_height()
860891
.map_err(SyncError::Wallet)
861892
.map_err(MultiError::from_a)?;
862893
self.contracts
863-
.sync_witnesses(self.wallet.txid_resolver())
894+
.update_witnesses(self.wallet.txid_resolver(), last_height, min_conformations)
895+
.map_err(MultiError::from_other_a)
896+
}
897+
898+
#[cfg(feature = "async")]
899+
/// Update a wallet UTXO set and the status of all witnesses and single-use seal
900+
/// definitions.
901+
///
902+
/// Applies rollbacks or forwards if required and recomputes the state of the affected
903+
/// contracts.
904+
pub async fn update_async(
905+
&mut self,
906+
min_conformations: u32,
907+
) -> Result<(), MultiError<SyncError<W::Error>, <Sp::Stock as Stock>::Error>>
908+
where
909+
Sp::Stock: 'static,
910+
Sp::Pile: 'static,
911+
{
912+
self.wallet
913+
.update_utxos_async()
914+
.await
915+
.map_err(SyncError::Wallet)
916+
.map_err(MultiError::from_a)?;
917+
let last_height = self
918+
.wallet
919+
.last_block_height_async()
920+
.await
921+
.map_err(SyncError::Wallet)
922+
.map_err(MultiError::from_a)?;
923+
self.contracts
924+
.update_witnesses_async(
925+
self.wallet.txid_resolver_async(),
926+
last_height,
927+
min_conformations,
928+
)
929+
.await
864930
.map_err(MultiError::from_other_a)
865931
}
866932
}

0 commit comments

Comments
 (0)