Skip to content

Commit 47a11d1

Browse files
authored
ETCM-12351 dolos data source: bridge (#1097)
1 parent d71de18 commit 47a11d1

File tree

2 files changed

+257
-38
lines changed

2 files changed

+257
-38
lines changed

demo/node/src/data_sources.rs

Lines changed: 14 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ pub(crate) async fn create_cached_data_sources(
8181
ServiceError::Application(format!("Failed to create mock data sources: {err}").into())
8282
}),
8383

84-
DataSourceType::Dolos => create_dolos_data_sources(metrics_opt).await.map_err(|err| {
84+
DataSourceType::Dolos => create_dolos_data_sources().await.map_err(|err| {
8585
ServiceError::Application(format!("Failed to create dolos data sources: {err}").into())
8686
}),
8787
}
@@ -101,50 +101,34 @@ pub fn create_mock_data_sources()
101101
})
102102
}
103103

104-
// TODO Currently uses db-sync for unimplemented Dolos data sources
105-
pub async fn create_dolos_data_sources(
106-
metrics_opt: Option<McFollowerMetrics>,
107-
) -> std::result::Result<DataSources, Box<dyn Error + Send + Sync + 'static>> {
108-
let dolos_client = partner_chains_dolos_data_sources::get_connection_from_env()?;
109-
let pool = partner_chains_db_sync_data_sources::get_connection_from_env().await?;
110-
let block_dbsync = Arc::new(
111-
partner_chains_db_sync_data_sources::BlockDataSourceImpl::new_from_env(pool.clone())
112-
.await?,
113-
);
114-
let block_dolos = Arc::new(
115-
partner_chains_dolos_data_sources::BlockDataSourceImpl::new_from_env(dolos_client.clone())
104+
pub async fn create_dolos_data_sources()
105+
-> std::result::Result<DataSources, Box<dyn Error + Send + Sync + 'static>> {
106+
let client = partner_chains_dolos_data_sources::get_connection_from_env()?;
107+
let block = Arc::new(
108+
partner_chains_dolos_data_sources::BlockDataSourceImpl::new_from_env(client.clone())
116109
.await?,
117110
);
118111
Ok(DataSources {
119112
sidechain_rpc: Arc::new(
120-
partner_chains_dolos_data_sources::SidechainRpcDataSourceImpl::new(
121-
dolos_client.clone(),
122-
),
113+
partner_chains_dolos_data_sources::SidechainRpcDataSourceImpl::new(client.clone()),
123114
),
124115
mc_hash: Arc::new(partner_chains_dolos_data_sources::McHashDataSourceImpl::new(
125-
block_dolos.clone(),
116+
block.clone(),
126117
)),
127118
authority_selection: Arc::new(
128119
partner_chains_dolos_data_sources::AuthoritySelectionDataSourceImpl::new(
129-
dolos_client.clone(),
120+
client.clone(),
130121
),
131122
),
132123
block_participation: Arc::new(
133-
partner_chains_dolos_data_sources::StakeDistributionDataSourceImpl::new(
134-
dolos_client.clone(),
135-
),
124+
partner_chains_dolos_data_sources::StakeDistributionDataSourceImpl::new(client.clone()),
136125
),
137126
governed_map: Arc::new(partner_chains_dolos_data_sources::GovernedMapDataSourceImpl::new(
138-
dolos_client.clone(),
127+
client.clone(),
128+
)),
129+
bridge: Arc::new(partner_chains_dolos_data_sources::TokenBridgeDataSourceImpl::new(
130+
client.clone(),
139131
)),
140-
bridge: Arc::new(
141-
partner_chains_db_sync_data_sources::CachedTokenBridgeDataSourceImpl::new(
142-
pool,
143-
metrics_opt,
144-
block_dbsync,
145-
BRIDGE_TRANSFER_CACHE_LOOKAHEAD,
146-
),
147-
),
148132
})
149133
}
150134

Lines changed: 243 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,266 @@
1-
use crate::Result;
1+
use crate::{
2+
Result,
3+
client::{MiniBFClient, api::MiniBFApi, minibf::format_asset_id},
4+
};
5+
use blockfrost_openapi::models::{
6+
tx_content::TxContent, tx_content_output_amount_inner::TxContentOutputAmountInner,
7+
tx_content_utxo::TxContentUtxo,
8+
};
9+
use cardano_serialization_lib::PlutusData;
10+
use partner_chains_plutus_data::bridge::{TokenTransferDatum, TokenTransferDatumV1};
211
use sidechain_domain::*;
312
use sp_partner_chains_bridge::{
413
BridgeDataCheckpoint, BridgeTransferV1, MainChainScripts, TokenBridgeDataSource,
514
};
15+
use std::fmt::Debug;
616
use std::marker::PhantomData;
717

818
pub struct TokenBridgeDataSourceImpl<RecipientAddress> {
19+
client: MiniBFClient,
920
_phantom: PhantomData<RecipientAddress>,
1021
}
1122

1223
impl<RecipientAddress> TokenBridgeDataSourceImpl<RecipientAddress> {
13-
pub fn new() -> Self {
14-
Self { _phantom: PhantomData::default() }
24+
pub fn new(client: MiniBFClient) -> Self {
25+
Self { client, _phantom: PhantomData::default() }
1526
}
1627
}
1728

1829
#[async_trait::async_trait]
1930
impl<RecipientAddress: Send + Sync> TokenBridgeDataSource<RecipientAddress>
2031
for TokenBridgeDataSourceImpl<RecipientAddress>
32+
where
33+
RecipientAddress: Debug,
34+
RecipientAddress: (for<'a> TryFrom<&'a [u8]>),
2135
{
2236
async fn get_transfers(
2337
&self,
24-
_main_chain_scripts: MainChainScripts,
25-
_data_checkpoint: BridgeDataCheckpoint,
26-
_max_transfers: u32,
27-
_current_mc_block: McBlockHash,
38+
main_chain_scripts: MainChainScripts,
39+
data_checkpoint: BridgeDataCheckpoint,
40+
max_transfers: u32,
41+
current_mc_block_hash: McBlockHash,
2842
) -> Result<(Vec<BridgeTransferV1<RecipientAddress>>, BridgeDataCheckpoint)> {
29-
Err("not implemented".into())
43+
let current_mc_block = self.client.blocks_by_id(current_mc_block_hash).await?;
44+
45+
let data_checkpoint = match data_checkpoint {
46+
BridgeDataCheckpoint::Utxo(utxo) => {
47+
let TxBlockInfo { block_number, tx_ix } =
48+
get_block_info_for_utxo(&self.client, utxo.tx_hash.into()).await?.ok_or(
49+
format!(
50+
"Could not find block info for data checkpoint: {data_checkpoint:?}"
51+
),
52+
)?;
53+
ResolvedBridgeDataCheckpoint::Utxo {
54+
block_number,
55+
tx_ix,
56+
tx_out_ix: utxo.index.into(),
57+
}
58+
},
59+
BridgeDataCheckpoint::Block(number) => {
60+
ResolvedBridgeDataCheckpoint::Block { number: number.into() }
61+
},
62+
};
63+
64+
let asset = AssetId {
65+
policy_id: main_chain_scripts.token_policy_id.into(),
66+
asset_name: main_chain_scripts.token_asset_name.into(),
67+
};
68+
let current_mc_block_height: McBlockNumber = McBlockNumber(
69+
current_mc_block.height.expect("current mc block has valid height") as u32,
70+
);
71+
let utxos = get_bridge_utxos_tx(
72+
&self.client,
73+
&main_chain_scripts.illiquid_circulation_supply_validator_address.into(),
74+
asset,
75+
data_checkpoint,
76+
current_mc_block_height,
77+
Some(max_transfers),
78+
)
79+
.await?;
80+
81+
let new_checkpoint = match utxos.last() {
82+
None => BridgeDataCheckpoint::Block(current_mc_block_height),
83+
Some(_) if (utxos.len() as u32) < max_transfers => {
84+
BridgeDataCheckpoint::Block(current_mc_block_height)
85+
},
86+
Some(utxo) => BridgeDataCheckpoint::Utxo(utxo.utxo_id()),
87+
};
88+
89+
let transfers = utxos.into_iter().flat_map(utxo_to_transfer).collect();
90+
91+
Ok((transfers, new_checkpoint))
92+
}
93+
}
94+
95+
fn utxo_to_transfer<RecipientAddress>(
96+
utxo: BridgeUtxo,
97+
) -> Option<BridgeTransferV1<RecipientAddress>>
98+
where
99+
RecipientAddress: for<'a> TryFrom<&'a [u8]>,
100+
{
101+
let token_delta = utxo.tokens_out.0.checked_sub(utxo.tokens_in.0)?;
102+
103+
if token_delta == 0 {
104+
return None;
105+
}
106+
107+
let token_amount = token_delta as u64;
108+
109+
let Some(datum) = utxo.datum.clone() else {
110+
return Some(BridgeTransferV1::InvalidTransfer { token_amount, utxo_id: utxo.utxo_id() });
111+
};
112+
113+
let transfer = match TokenTransferDatum::try_from(datum) {
114+
Ok(TokenTransferDatum::V1(TokenTransferDatumV1::UserTransfer { receiver })) => {
115+
match RecipientAddress::try_from(receiver.0.as_ref()) {
116+
Ok(recipient) => BridgeTransferV1::UserTransfer { token_amount, recipient },
117+
Err(_) => {
118+
BridgeTransferV1::InvalidTransfer { token_amount, utxo_id: utxo.utxo_id() }
119+
},
120+
}
121+
},
122+
Ok(TokenTransferDatum::V1(TokenTransferDatumV1::ReserveTransfer)) => {
123+
BridgeTransferV1::ReserveTransfer { token_amount }
124+
},
125+
Err(_) => BridgeTransferV1::InvalidTransfer { token_amount, utxo_id: utxo.utxo_id() },
126+
};
127+
128+
Some(transfer)
129+
}
130+
131+
pub(crate) struct BridgeUtxo {
132+
pub(crate) block_number: McBlockNumber,
133+
pub(crate) tx_ix: McTxIndexInBlock,
134+
pub(crate) tx_hash: McTxHash,
135+
pub(crate) utxo_ix: UtxoIndex,
136+
pub(crate) tokens_out: NativeTokenAmount,
137+
pub(crate) tokens_in: NativeTokenAmount,
138+
pub(crate) datum: Option<cardano_serialization_lib::PlutusData>,
139+
}
140+
141+
impl BridgeUtxo {
142+
pub(crate) fn utxo_id(&self) -> UtxoId {
143+
UtxoId { tx_hash: self.tx_hash.into(), index: self.utxo_ix.into() }
30144
}
145+
146+
pub(crate) fn ordering_key(&self) -> UtxoOrderingKey {
147+
(self.block_number, self.tx_ix, self.utxo_ix)
148+
}
149+
}
150+
151+
pub(crate) type UtxoOrderingKey = (McBlockNumber, McTxIndexInBlock, UtxoIndex);
152+
153+
#[derive(Debug, Clone, PartialEq)]
154+
pub(crate) struct TxBlockInfo {
155+
pub(crate) block_number: McBlockNumber,
156+
pub(crate) tx_ix: McTxIndexInBlock,
157+
}
158+
159+
pub(crate) async fn get_block_info_for_utxo(
160+
client: &MiniBFClient,
161+
tx_hash: McTxHash,
162+
) -> Result<Option<TxBlockInfo>> {
163+
let tx = client.transaction_by_hash(tx_hash).await?;
164+
Ok(Some(TxBlockInfo {
165+
block_number: McBlockNumber(tx.block_height as u32),
166+
tx_ix: McTxIndexInBlock(tx.index as u32),
167+
}))
168+
}
169+
170+
#[derive(Clone)]
171+
pub(crate) enum ResolvedBridgeDataCheckpoint {
172+
Utxo { block_number: McBlockNumber, tx_ix: McTxIndexInBlock, tx_out_ix: UtxoIndex },
173+
Block { number: McBlockNumber },
174+
}
175+
176+
impl ResolvedBridgeDataCheckpoint {
177+
fn block_number(&self) -> McBlockNumber {
178+
match self {
179+
ResolvedBridgeDataCheckpoint::Utxo { block_number, .. } => *block_number,
180+
ResolvedBridgeDataCheckpoint::Block { number } => *number,
181+
}
182+
}
183+
}
184+
185+
pub(crate) async fn get_bridge_utxos_tx(
186+
client: &MiniBFClient,
187+
icp_address: &MainchainAddress,
188+
native_token: AssetId,
189+
checkpoint: ResolvedBridgeDataCheckpoint,
190+
to_block: McBlockNumber,
191+
max_utxos: Option<u32>,
192+
) -> Result<Vec<BridgeUtxo>> {
193+
let txs = client.assets_transactions(native_token.clone()).await?;
194+
let checkpoint_block_no = checkpoint.block_number().0;
195+
let futures = txs.into_iter().map(|a| async move {
196+
let block_no = a.block_height as u32;
197+
if checkpoint_block_no < block_no && block_no <= to_block.0 {
198+
let tx_hash = McTxHash::from_hex_unsafe(&a.tx_hash);
199+
let utxos = client.transactions_utxos(tx_hash).await?;
200+
let tx = client.transaction_by_hash(tx_hash).await?;
201+
Result::Ok(Some((utxos, tx)))
202+
} else {
203+
Result::Ok(None)
204+
}
205+
});
206+
let mut bridge_utxos = futures::future::try_join_all(futures)
207+
.await?
208+
.iter()
209+
.flatten()
210+
.flat_map(|(utxos, tx): &(TxContentUtxo, TxContent)| {
211+
let inputs = utxos.inputs.iter().filter(|i| i.address == icp_address.to_string());
212+
let outputs = utxos.outputs.iter().filter(|o| o.address == icp_address.to_string());
213+
let native_token = native_token.clone();
214+
let checkpoint_clone = checkpoint.clone();
215+
outputs.filter_map(move |output| {
216+
let native_token = native_token.clone();
217+
let output_tokens = get_all_tokens(&output.amount, &native_token.clone());
218+
let input_tokens = inputs
219+
.clone()
220+
.map(move |input| get_all_tokens(&input.amount, &native_token.clone()))
221+
.sum();
222+
223+
match checkpoint_clone {
224+
ResolvedBridgeDataCheckpoint::Utxo { tx_ix, tx_out_ix, .. }
225+
if tx.block_height <= tx_ix.0 as i32
226+
&& output.output_index <= tx_out_ix.0.into() =>
227+
{
228+
None
229+
},
230+
_ => Some(BridgeUtxo {
231+
block_number: McBlockNumber(tx.block_height as u32),
232+
tokens_out: NativeTokenAmount(output_tokens),
233+
tokens_in: NativeTokenAmount(input_tokens),
234+
datum: output
235+
.inline_datum
236+
.clone()
237+
.map(|d| PlutusData::from_hex(&d).expect("valid datum")),
238+
tx_ix: McTxIndexInBlock(tx.index as u32),
239+
tx_hash: McTxHash::from_hex_unsafe(&tx.hash),
240+
utxo_ix: UtxoIndex(output.output_index as u16),
241+
}),
242+
}
243+
})
244+
})
245+
.collect::<Vec<_>>();
246+
bridge_utxos.sort_by_key(|b| b.ordering_key());
247+
248+
if let Some(max_utxos) = max_utxos {
249+
bridge_utxos.truncate(max_utxos as usize);
250+
}
251+
252+
Ok(bridge_utxos)
253+
}
254+
255+
fn get_all_tokens(amount: &Vec<TxContentOutputAmountInner>, asset_id: &AssetId) -> u128 {
256+
amount
257+
.iter()
258+
.map(|v| {
259+
if v.unit == format_asset_id(asset_id) {
260+
v.quantity.parse::<u128>().expect("valid quantity is u128")
261+
} else {
262+
0u128
263+
}
264+
})
265+
.sum()
31266
}

0 commit comments

Comments
 (0)