Skip to content

Feat/wallet staking sync #2593

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions wallet/src/actors/app/handlers/create_data_req.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ use actix::prelude::*;
use serde::{Deserialize, Serialize};
use witnet_config::defaults::PSEUDO_CONSENSUS_CONSTANTS_WIP0022_REWARD_COLLATERAL_RATIO;
use witnet_data_structures::{
chain::{tapi::current_active_wips, DataRequestOutput, Hashable},
chain::{tapi::current_active_wips, DataRequestOutput},
fee::{deserialize_fee_backwards_compatible, AbsoluteFee, Fee},
proto::ProtobufConvert,
proto::{
versioning::{ProtocolVersion, VersionedHashable},
ProtobufConvert,
},
transaction::Transaction,
};

Expand Down Expand Up @@ -99,7 +102,8 @@ impl Handler<CreateDataReqRequest> for app::App {
_ => vec![],
};
let transaction = transaction.transaction;
let transaction_id = hex::encode(transaction.hash().as_ref());
let transaction_id =
hex::encode(transaction.versioned_hash(ProtocolVersion::V2_0).as_ref());
let bytes = hex::encode(transaction.to_pb_bytes().unwrap());
let weight = transaction.weight();

Expand Down
3 changes: 2 additions & 1 deletion wallet/src/actors/app/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,8 @@ impl App {
let sink = act.state.get_sink(&session_id);
if let Some(balance_movement) = balance_movement.clone() {
// We send a notification to the client
let events = Some(vec![types::Event::Movement(balance_movement)]);
let events =
Some(vec![types::Event::Movement(Box::new(balance_movement))]);
act.params
.worker
.do_send(NotifyStatus(wallet, sink, events));
Expand Down
102 changes: 80 additions & 22 deletions wallet/src/actors/worker/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ use serde_json::{json, Value};
use witnet_crypto::{key::ExtendedSK, mnemonic};
use witnet_data_structures::{
chain::{
Block, CheckpointBeacon, DataRequestInfo, Hashable, OutputPointer, RADRequest,
StateMachine, ValueTransferOutput,
Block, CheckpointBeacon, DataRequestInfo, OutputPointer, RADRequest, StateMachine,
ValueTransferOutput,
},
fee::AbsoluteFee,
proto::versioning::{ProtocolVersion, VersionedHashable},
proto::versioning::VersionedHashable,
transaction::Transaction,
};
use witnet_futures_utils::TryFutureExt2;
Expand Down Expand Up @@ -156,10 +156,14 @@ impl Worker {
.expect("It should always found a superconsolidated block");
let get_gen_future = self.get_block(gen_entry.1.clone());
let (block, _confirmed) = futures::executor::block_on(get_gen_future)?;

let protocol_version = self
.node
.protocol_info
.all_versions
.version_for_epoch(block.block_header.beacon.checkpoint);
CheckpointBeacon {
checkpoint: block.block_header.beacon.checkpoint,
hash_prev_block: block.hash(),
hash_prev_block: block.versioned_hash(protocol_version),
}
}
// Use provided epoch as birth date
Expand All @@ -181,10 +185,14 @@ impl Worker {
);
let get_gen_future = self.get_block(gen_entry.1.clone());
let (block, _confirmed_1) = futures::executor::block_on(get_gen_future)?;

let protocol_version = self
.node
.protocol_info
.all_versions
.version_for_epoch(block.block_header.beacon.checkpoint);
CheckpointBeacon {
checkpoint: block.block_header.beacon.checkpoint,
hash_prev_block: block.hash(),
hash_prev_block: block.versioned_hash(protocol_version),
}
}
None => {
Expand Down Expand Up @@ -566,6 +574,14 @@ impl Worker {
Transaction::Tally(tally) => Some(IndexTransactionQuery::DataRequestReport(
tally.dr_pointer.to_string(),
)),
Transaction::Stake(st) => Some(IndexTransactionQuery::InputTransactions(
st.body
.inputs
.iter()
.map(|input| *input.output_pointer())
.collect(),
)),
Transaction::Unstake(_ut) => None,
_ => None,
})
.collect();
Expand Down Expand Up @@ -623,46 +639,68 @@ impl Worker {
let result: Result<Vec<ValueTransferOutput>> = transactions
.iter()
.zip(output_pointers)
.map(|(txn, output)| match txn {
.filter_map(|(txn, output)| match txn {
Transaction::ValueTransfer(vt) => vt
.body
.outputs
.get(output.output_index as usize)
.cloned()
.ok_or_else(|| {
Error::OutputIndexNotFound(output.output_index, format!("{:?}", txn))
.map(Ok)
.or_else(|| {
Some(Err(Error::OutputIndexNotFound(
output.output_index,
format!("{:?}", txn),
)))
}),
Transaction::DataRequest(dr) => dr
.body
.outputs
.get(output.output_index as usize)
.cloned()
.ok_or_else(|| {
Error::OutputIndexNotFound(output.output_index, format!("{:?}", txn))
.map(Ok)
.or_else(|| {
Some(Err(Error::OutputIndexNotFound(
output.output_index,
format!("{:?}", txn),
)))
}),
Transaction::Tally(tally) => tally
.outputs
.get(output.output_index as usize)
.cloned()
.ok_or_else(|| {
Error::OutputIndexNotFound(output.output_index, format!("{:?}", txn))
.map(Ok)
.or_else(|| {
Some(Err(Error::OutputIndexNotFound(
output.output_index,
format!("{:?}", txn),
)))
}),
Transaction::Mint(mint) => mint
.outputs
.get(output.output_index as usize)
.cloned()
.ok_or_else(|| {
Error::OutputIndexNotFound(output.output_index, format!("{:?}", txn))
.map(Ok)
.or_else(|| {
Some(Err(Error::OutputIndexNotFound(
output.output_index,
format!("{:?}", txn),
)))
}),
Transaction::Commit(commit) => commit
.body
.outputs
.get(output.output_index as usize)
.cloned()
.ok_or_else(|| {
Error::OutputIndexNotFound(output.output_index, format!("{:?}", txn))
.map(Ok)
.or_else(|| {
Some(Err(Error::OutputIndexNotFound(
output.output_index,
format!("{:?}", txn),
)))
}),
_ => Err(Error::TransactionTypeNotSupported),
Transaction::Stake(st) => st.body.change.clone().map(Ok),
Transaction::Unstake(ut) => Some(Ok(ut.body.withdrawal.clone())),
_ => Some(Err(Error::TransactionTypeNotSupported)),
})
.collect();

Expand Down Expand Up @@ -986,7 +1024,13 @@ impl Worker {
let wallet_data = wallet.public_data()?;
let last_sync = wallet_data.last_sync;
let last_confirmed = wallet_data.last_confirmed;
let protocol_version = ProtocolVersion::from_epoch(block_beacon.checkpoint);
let checkpoint = block.block_header.beacon.checkpoint;
let protocol_version = self
.node
.protocol_info
.all_versions
.version_for_epoch(checkpoint);

let (needs_clear_pending, needs_indexing) = if block_beacon.hash_prev_block
== last_sync.hash_prev_block
&& (block_beacon.checkpoint == 0 || block_beacon.checkpoint > last_sync.checkpoint)
Expand Down Expand Up @@ -1145,7 +1189,12 @@ impl Worker {
wallet: &types::SessionWallet,
sink: types::DynamicSink,
) -> Result<CheckpointBeacon> {
let block_hash = block.hash();
let protocol_version = self
.node
.protocol_info
.all_versions
.version_for_epoch(block.block_header.beacon.checkpoint);
let block_hash = block.versioned_hash(protocol_version);

// Immediately update the local reference to the node's last beacon
let block_own_beacon = CheckpointBeacon {
Expand All @@ -1162,6 +1211,13 @@ impl Worker {
.iter()
.cloned()
.map(Transaction::from);
let stake_txns = block.txns.stake_txns.iter().cloned().map(Transaction::from);
let unstake_txns = block
.txns
.unstake_txns
.iter()
.cloned()
.map(Transaction::from);
let dr_txns = block
.txns
.data_request_txns
Expand All @@ -1180,6 +1236,8 @@ impl Worker {
.chain(dr_txns)
.chain(commit_txns)
.chain(tally_txns)
.chain(stake_txns)
.chain(unstake_txns)
.chain(std::iter::once(Transaction::Mint(block.txns.mint.clone())));

let block_info = model::Beacon {
Expand All @@ -1192,7 +1250,7 @@ impl Worker {
// Notify about the new block and every single balance movement found within.
let mut events = vec![types::Event::Block(block_info)];
for balance_movement in balance_movements {
events.push(types::Event::Movement(balance_movement));
events.push(types::Event::Movement(Box::new(balance_movement)));
}
self.notify_client(wallet, sink, Some(events)).ok();

Expand Down
31 changes: 30 additions & 1 deletion wallet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@ use witnet_config::config::Config;
use witnet_data_structures::{
chain::{CheckpointBeacon, EpochConstants},
get_protocol_version_activation_epoch, get_protocol_version_period,
proto::versioning::ProtocolVersion,
proto::versioning::{ProtocolInfo, ProtocolVersion},
};
use witnet_net::client::tcp::JsonRpcClient;
use witnet_validations::witnessing::validate_witnessing_config;

use serde_json::json;
use witnet_futures_utils::TryFutureExt2;
use witnet_net::client::tcp::jsonrpc;

use crate::actors::app;

mod account;
Expand Down Expand Up @@ -130,6 +134,8 @@ pub fn run(conf: Config) -> Result<(), Error> {
// retrying connection using a different URL each time.
node_client.valid_connection().await;

let protocol_info = get_protocol_info(node_client.clone()).await?;

let params = params::Params {
testnet,
seed_password,
Expand All @@ -150,6 +156,7 @@ pub fn run(conf: Config) -> Result<(), Error> {
use_unconfirmed_utxos,
pending_transactions_timeout_seconds,
witnessing: witnessing_config,
protocol_info: protocol_info.clone(),
};

let last_beacon = Arc::new(RwLock::new(CheckpointBeacon {
Expand All @@ -163,6 +170,7 @@ pub fn run(conf: Config) -> Result<(), Error> {
network,
requests_timeout,
subscriptions: node_subscriptions,
protocol_info,
};

// Start wallet actors
Expand Down Expand Up @@ -194,3 +202,24 @@ pub fn run(conf: Config) -> Result<(), Error> {

Ok(())
}

async fn get_protocol_info(node_client: Arc<app::NodeClient>) -> Result<ProtocolInfo, app::Error> {
let method = String::from("protocol");
let params1 = json!(null);
let req = jsonrpc::Request::method(method)
.timeout(Duration::from_secs(5))
.params(params1)
.expect("params failed serialization");

let report: Result<_, Error> = node_client.actor.send(req).flatten_err().await;

match report {
Ok(report) => Ok(serde_json::from_value::<ProtocolInfo>(report)
.expect("Failed to deserialize protocol info")),
Err(err) => {
log::error!("getProtocolInfo failed: {}", &err);

Err(app::Error::Node(err))
}
}
}
Loading
Loading