Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 14 additions & 30 deletions demo/node/src/data_sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub(crate) async fn create_cached_data_sources(
ServiceError::Application(format!("Failed to create mock data sources: {err}").into())
}),

DataSourceType::Dolos => create_dolos_data_sources(metrics_opt).await.map_err(|err| {
DataSourceType::Dolos => create_dolos_data_sources().await.map_err(|err| {
ServiceError::Application(format!("Failed to create dolos data sources: {err}").into())
}),
}
Expand All @@ -101,50 +101,34 @@ pub fn create_mock_data_sources()
})
}

// TODO Currently uses db-sync for unimplemented Dolos data sources
pub async fn create_dolos_data_sources(
metrics_opt: Option<McFollowerMetrics>,
) -> std::result::Result<DataSources, Box<dyn Error + Send + Sync + 'static>> {
let dolos_client = partner_chains_dolos_data_sources::get_connection_from_env()?;
let pool = partner_chains_db_sync_data_sources::get_connection_from_env().await?;
let block_dbsync = Arc::new(
partner_chains_db_sync_data_sources::BlockDataSourceImpl::new_from_env(pool.clone())
.await?,
);
let block_dolos = Arc::new(
partner_chains_dolos_data_sources::BlockDataSourceImpl::new_from_env(dolos_client.clone())
pub async fn create_dolos_data_sources()
-> std::result::Result<DataSources, Box<dyn Error + Send + Sync + 'static>> {
let client = partner_chains_dolos_data_sources::get_connection_from_env()?;
let block = Arc::new(
partner_chains_dolos_data_sources::BlockDataSourceImpl::new_from_env(client.clone())
.await?,
);
Ok(DataSources {
sidechain_rpc: Arc::new(
partner_chains_dolos_data_sources::SidechainRpcDataSourceImpl::new(
dolos_client.clone(),
),
partner_chains_dolos_data_sources::SidechainRpcDataSourceImpl::new(client.clone()),
),
mc_hash: Arc::new(partner_chains_dolos_data_sources::McHashDataSourceImpl::new(
block_dolos.clone(),
block.clone(),
)),
authority_selection: Arc::new(
partner_chains_dolos_data_sources::AuthoritySelectionDataSourceImpl::new(
dolos_client.clone(),
client.clone(),
),
),
block_participation: Arc::new(
partner_chains_dolos_data_sources::StakeDistributionDataSourceImpl::new(
dolos_client.clone(),
),
partner_chains_dolos_data_sources::StakeDistributionDataSourceImpl::new(client.clone()),
),
governed_map: Arc::new(partner_chains_dolos_data_sources::GovernedMapDataSourceImpl::new(
dolos_client.clone(),
client.clone(),
)),
bridge: Arc::new(partner_chains_dolos_data_sources::TokenBridgeDataSourceImpl::new(
client.clone(),
)),
bridge: Arc::new(
partner_chains_db_sync_data_sources::CachedTokenBridgeDataSourceImpl::new(
pool,
metrics_opt,
block_dbsync,
BRIDGE_TRANSFER_CACHE_LOOKAHEAD,
),
),
})
}

Expand Down
251 changes: 243 additions & 8 deletions toolkit/data-sources/dolos/src/bridge.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,266 @@
use crate::Result;
use crate::{
Result,
client::{MiniBFClient, api::MiniBFApi, minibf::format_asset_id},
};
use blockfrost_openapi::models::{
tx_content::TxContent, tx_content_output_amount_inner::TxContentOutputAmountInner,
tx_content_utxo::TxContentUtxo,
};
use cardano_serialization_lib::PlutusData;
use partner_chains_plutus_data::bridge::{TokenTransferDatum, TokenTransferDatumV1};
use sidechain_domain::*;
use sp_partner_chains_bridge::{
BridgeDataCheckpoint, BridgeTransferV1, MainChainScripts, TokenBridgeDataSource,
};
use std::fmt::Debug;
use std::marker::PhantomData;

pub struct TokenBridgeDataSourceImpl<RecipientAddress> {
client: MiniBFClient,
_phantom: PhantomData<RecipientAddress>,
}

impl<RecipientAddress> TokenBridgeDataSourceImpl<RecipientAddress> {
pub fn new() -> Self {
Self { _phantom: PhantomData::default() }
pub fn new(client: MiniBFClient) -> Self {
Self { client, _phantom: PhantomData::default() }
}
}

#[async_trait::async_trait]
impl<RecipientAddress: Send + Sync> TokenBridgeDataSource<RecipientAddress>
for TokenBridgeDataSourceImpl<RecipientAddress>
where
RecipientAddress: Debug,
RecipientAddress: (for<'a> TryFrom<&'a [u8]>),
{
async fn get_transfers(
&self,
_main_chain_scripts: MainChainScripts,
_data_checkpoint: BridgeDataCheckpoint,
_max_transfers: u32,
_current_mc_block: McBlockHash,
main_chain_scripts: MainChainScripts,
data_checkpoint: BridgeDataCheckpoint,
max_transfers: u32,
current_mc_block_hash: McBlockHash,
) -> Result<(Vec<BridgeTransferV1<RecipientAddress>>, BridgeDataCheckpoint)> {
Err("not implemented".into())
let current_mc_block = self.client.blocks_by_id(current_mc_block_hash).await?;

let data_checkpoint = match data_checkpoint {
BridgeDataCheckpoint::Utxo(utxo) => {
let TxBlockInfo { block_number, tx_ix } =
get_block_info_for_utxo(&self.client, utxo.tx_hash.into()).await?.ok_or(
format!(
"Could not find block info for data checkpoint: {data_checkpoint:?}"
),
)?;
ResolvedBridgeDataCheckpoint::Utxo {
block_number,
tx_ix,
tx_out_ix: utxo.index.into(),
}
},
BridgeDataCheckpoint::Block(number) => {
ResolvedBridgeDataCheckpoint::Block { number: number.into() }
},
};

let asset = AssetId {
policy_id: main_chain_scripts.token_policy_id.into(),
asset_name: main_chain_scripts.token_asset_name.into(),
};
let current_mc_block_height: McBlockNumber = McBlockNumber(
current_mc_block.height.expect("current mc block has valid height") as u32,
);
let utxos = get_bridge_utxos_tx(
&self.client,
&main_chain_scripts.illiquid_circulation_supply_validator_address.into(),
asset,
data_checkpoint,
current_mc_block_height,
Some(max_transfers),
)
.await?;

let new_checkpoint = match utxos.last() {
None => BridgeDataCheckpoint::Block(current_mc_block_height),
Some(_) if (utxos.len() as u32) < max_transfers => {
BridgeDataCheckpoint::Block(current_mc_block_height)
},
Some(utxo) => BridgeDataCheckpoint::Utxo(utxo.utxo_id()),
};

let transfers = utxos.into_iter().flat_map(utxo_to_transfer).collect();

Ok((transfers, new_checkpoint))
}
}

fn utxo_to_transfer<RecipientAddress>(
utxo: BridgeUtxo,
) -> Option<BridgeTransferV1<RecipientAddress>>
where
RecipientAddress: for<'a> TryFrom<&'a [u8]>,
{
let token_delta = utxo.tokens_out.0.checked_sub(utxo.tokens_in.0)?;

if token_delta == 0 {
return None;
}

let token_amount = token_delta as u64;

let Some(datum) = utxo.datum.clone() else {
return Some(BridgeTransferV1::InvalidTransfer { token_amount, utxo_id: utxo.utxo_id() });
};

let transfer = match TokenTransferDatum::try_from(datum) {
Ok(TokenTransferDatum::V1(TokenTransferDatumV1::UserTransfer { receiver })) => {
match RecipientAddress::try_from(receiver.0.as_ref()) {
Ok(recipient) => BridgeTransferV1::UserTransfer { token_amount, recipient },
Err(_) => {
BridgeTransferV1::InvalidTransfer { token_amount, utxo_id: utxo.utxo_id() }
},
}
},
Ok(TokenTransferDatum::V1(TokenTransferDatumV1::ReserveTransfer)) => {
BridgeTransferV1::ReserveTransfer { token_amount }
},
Err(_) => BridgeTransferV1::InvalidTransfer { token_amount, utxo_id: utxo.utxo_id() },
};

Some(transfer)
}

pub(crate) struct BridgeUtxo {
pub(crate) block_number: McBlockNumber,
pub(crate) tx_ix: McTxIndexInBlock,
pub(crate) tx_hash: McTxHash,
pub(crate) utxo_ix: UtxoIndex,
pub(crate) tokens_out: NativeTokenAmount,
pub(crate) tokens_in: NativeTokenAmount,
pub(crate) datum: Option<cardano_serialization_lib::PlutusData>,
}

impl BridgeUtxo {
pub(crate) fn utxo_id(&self) -> UtxoId {
UtxoId { tx_hash: self.tx_hash.into(), index: self.utxo_ix.into() }
}

pub(crate) fn ordering_key(&self) -> UtxoOrderingKey {
(self.block_number, self.tx_ix, self.utxo_ix)
}
}

pub(crate) type UtxoOrderingKey = (McBlockNumber, McTxIndexInBlock, UtxoIndex);

#[derive(Debug, Clone, PartialEq)]
pub(crate) struct TxBlockInfo {
pub(crate) block_number: McBlockNumber,
pub(crate) tx_ix: McTxIndexInBlock,
}

pub(crate) async fn get_block_info_for_utxo(
client: &MiniBFClient,
tx_hash: McTxHash,
) -> Result<Option<TxBlockInfo>> {
let tx = client.transaction_by_hash(tx_hash).await?;
Ok(Some(TxBlockInfo {
block_number: McBlockNumber(tx.block_height as u32),
tx_ix: McTxIndexInBlock(tx.index as u32),
}))
}

#[derive(Clone)]
pub(crate) enum ResolvedBridgeDataCheckpoint {
Utxo { block_number: McBlockNumber, tx_ix: McTxIndexInBlock, tx_out_ix: UtxoIndex },
Block { number: McBlockNumber },
}

impl ResolvedBridgeDataCheckpoint {
fn block_number(&self) -> McBlockNumber {
match self {
ResolvedBridgeDataCheckpoint::Utxo { block_number, .. } => *block_number,
ResolvedBridgeDataCheckpoint::Block { number } => *number,
}
}
}

pub(crate) async fn get_bridge_utxos_tx(
client: &MiniBFClient,
icp_address: &MainchainAddress,
native_token: AssetId,
checkpoint: ResolvedBridgeDataCheckpoint,
to_block: McBlockNumber,
max_utxos: Option<u32>,
) -> Result<Vec<BridgeUtxo>> {
let txs = client.assets_transactions(native_token.clone()).await?;
let checkpoint_block_no = checkpoint.block_number().0;
let futures = txs.into_iter().map(|a| async move {
let block_no = a.block_height as u32;
if checkpoint_block_no < block_no && block_no <= to_block.0 {
let tx_hash = McTxHash::from_hex_unsafe(&a.tx_hash);
let utxos = client.transactions_utxos(tx_hash).await?;
let tx = client.transaction_by_hash(tx_hash).await?;
Result::Ok(Some((utxos, tx)))
} else {
Result::Ok(None)
}
});
let mut bridge_utxos = futures::future::try_join_all(futures)
.await?
.iter()
.flatten()
.flat_map(|(utxos, tx): &(TxContentUtxo, TxContent)| {
let inputs = utxos.inputs.iter().filter(|i| i.address == icp_address.to_string());
let outputs = utxos.outputs.iter().filter(|o| o.address == icp_address.to_string());
let native_token = native_token.clone();
let checkpoint_clone = checkpoint.clone();
outputs.filter_map(move |output| {
let native_token = native_token.clone();
let output_tokens = get_all_tokens(&output.amount, &native_token.clone());
let input_tokens = inputs
.clone()
.map(move |input| get_all_tokens(&input.amount, &native_token.clone()))
.sum();

match checkpoint_clone {
ResolvedBridgeDataCheckpoint::Utxo { tx_ix, tx_out_ix, .. }
if tx.block_height <= tx_ix.0 as i32
&& output.output_index <= tx_out_ix.0.into() =>
{
None
},
_ => Some(BridgeUtxo {
block_number: McBlockNumber(tx.block_height as u32),
tokens_out: NativeTokenAmount(output_tokens),
tokens_in: NativeTokenAmount(input_tokens),
datum: output
.inline_datum
.clone()
.map(|d| PlutusData::from_hex(&d).expect("valid datum")),
tx_ix: McTxIndexInBlock(tx.index as u32),
tx_hash: McTxHash::from_hex_unsafe(&tx.hash),
utxo_ix: UtxoIndex(output.output_index as u16),
}),
}
})
})
.collect::<Vec<_>>();
bridge_utxos.sort_by_key(|b| b.ordering_key());

if let Some(max_utxos) = max_utxos {
bridge_utxos.truncate(max_utxos as usize);
}

Ok(bridge_utxos)
}

fn get_all_tokens(amount: &Vec<TxContentOutputAmountInner>, asset_id: &AssetId) -> u128 {
amount
.iter()
.map(|v| {
if v.unit == format_asset_id(asset_id) {
v.quantity.parse::<u128>().expect("valid quantity is u128")
} else {
0u128
}
})
.sum()
}
Loading