Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion rust/main/agents/relayer/src/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,27 @@ impl BaseAgent for Relayer {
.map(|(key, origin)| (key.id(), origin.prover_sync.clone()))
.collect();

// Build deposit-force config if available (requires sender AND REST URL)
let deposit_force = if let Some(dym_args) = self.dymension_kaspa_args.as_ref() {
let sender_guard = dym_args.force_sender.read().unwrap();
let rest_url = dym_args
.kas_provider
.conf()
.kaspa_urls_rest
.first()
.map(|u| u.to_string());

match (sender_guard.as_ref(), rest_url) {
(Some(sender), Some(url)) => Some(relayer_server::DepositForceConfig {
sender: sender.clone(),
rest_api_url: url,
}),
_ => None,
}
} else {
None
};

let relayer_router = relayer_server::Server::new(self.destinations.len())
.with_op_retry(sender.clone())
.with_message_queue(prep_queues)
Expand All @@ -596,7 +617,8 @@ impl BaseAgent for Relayer {
self.dymension_kaspa_args
.as_ref()
.and_then(|dym_args| dym_args.kas_provider.kaspa_db().cloned()),
) // Set kaspa_db to server_builder from dymension_args provider if available
)
.with_deposit_force(deposit_force)
.router();

let server = self
Expand Down Expand Up @@ -1126,6 +1148,8 @@ impl Relayer {
struct DymensionKaspaArgs {
kas_provider: Box<KaspaProvider>,
dym_mailbox: Arc<CosmosNativeMailbox>,
/// Sender for force-deposit requests, populated when Foo is created
force_sender: Arc<std::sync::RwLock<Option<hyperlane_base::kas_hack::DepositForceSender>>>,
}

// Manual Debug since KaspaMailbox now has a trait object
Expand All @@ -1135,6 +1159,7 @@ impl std::fmt::Debug for DymensionKaspaArgs {
.field("kas_provider", &self.kas_provider)
.field("kas_mailbox", &"KaspaMailbox")
.field("dym_mailbox", &self.dym_mailbox)
.field("force_sender", &"<RwLock>")
.finish()
}
}
Expand Down Expand Up @@ -1197,6 +1222,7 @@ impl Relayer {
Ok(Some(DymensionKaspaArgs {
kas_provider,
dym_mailbox,
force_sender: Arc::new(std::sync::RwLock::new(None)),
}))
}

Expand All @@ -1218,6 +1244,12 @@ impl Relayer {

let b = KaspaBridgeFoo::new(kas_provider.clone(), hub_mailbox.clone(), metadata_getter);

// Store the force sender for use by the server endpoint
{
let mut sender_guard = args.force_sender.write().unwrap();
*sender_guard = Some(b.force_sender());
}

// sync relayer before starting other tasks
b.sync_hub_if_needed().await.unwrap();

Expand Down
84 changes: 84 additions & 0 deletions rust/main/agents/relayer/src/server/kaspa/deposit_force.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use axum::{extract::State, http::StatusCode, Json};
use dymension_kaspa::dym_kas_core::api::client::Deposit;
use serde::{Deserialize, Serialize};

use hyperlane_base::server::utils::{
ServerErrorBody, ServerErrorResponse, ServerResult, ServerSuccessResponse,
};

use super::ServerState;

#[derive(Clone, Debug, Deserialize)]
pub struct RequestBody {
pub kaspa_tx: String,
}

#[derive(Clone, Debug, Serialize)]
pub struct ResponseBody {
pub message: String,
pub deposit_id: String,
}

/// Force processing of a Kaspa deposit by fetching it from the REST API.
/// Useful for deposits that fell outside the normal lookback window.
///
/// POST /kaspa/deposit-force
/// Body: { "kaspa_tx": "242b5987..." }
pub async fn handler(
State(state): State<ServerState>,
Json(body): Json<RequestBody>,
) -> ServerResult<ServerSuccessResponse<ResponseBody>> {
let RequestBody { kaspa_tx } = body;
tracing::info!(%kaspa_tx, "Received deposit force request");

let (sender, client) = match (&state.force_sender, &state.http_client) {
(Some(s), Some(c)) => (s, c),
_ => {
return Err(ServerErrorResponse::new(
StatusCode::SERVICE_UNAVAILABLE,
ServerErrorBody {
message: "Deposit force is not enabled on this relayer".to_string(),
},
));
}
};

let tx = client.get_tx_by_id(&kaspa_tx).await.map_err(|e| {
tracing::error!(%kaspa_tx, error = ?e, "Failed to fetch transaction from Kaspa API");
ServerErrorResponse::new(
StatusCode::NOT_FOUND,
ServerErrorBody {
message: format!("Transaction not found or API error: {}", e),
},
)
})?;

let deposit: Deposit = tx.try_into().map_err(|e: eyre::Error| {
tracing::error!(%kaspa_tx, error = ?e, "Failed to convert transaction to deposit");
ServerErrorResponse::new(
StatusCode::BAD_REQUEST,
ServerErrorBody {
message: format!("Invalid deposit transaction: {}", e),
},
)
})?;

let deposit_id = deposit.id.to_string();

sender.send(deposit).await.map_err(|e| {
tracing::error!(%kaspa_tx, error = ?e, "Failed to send deposit to processing channel");
ServerErrorResponse::new(
StatusCode::INTERNAL_SERVER_ERROR,
ServerErrorBody {
message: "Failed to queue deposit for processing".to_string(),
},
)
})?;

tracing::info!(%kaspa_tx, %deposit_id, "Deposit queued for processing");

Ok(ServerSuccessResponse::new(ResponseBody {
message: "Deposit queued for processing".to_string(),
deposit_id,
}))
}
38 changes: 35 additions & 3 deletions rust/main/agents/relayer/src/server/kaspa/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,50 @@
use std::sync::Arc;

use axum::{routing::get, Router};
use derive_new::new;
use axum::{
routing::{get, post},
Router,
};
use dymension_kaspa::dym_kas_core::api::{base::RateLimitConfig, client::HttpClient};
use hyperlane_base::kas_hack::DepositForceSender;
use hyperlane_core::KaspaDb;
use tower_http::cors::{Any, CorsLayer};

pub mod deposit_force;
pub mod list_deposits;
pub mod list_withdrawals;

#[derive(Clone, Debug, new)]
#[derive(Clone)]
pub struct ServerState {
pub kaspa_db: Arc<dyn KaspaDb>,
pub force_sender: Option<DepositForceSender>,
pub http_client: Option<HttpClient>,
}

impl std::fmt::Debug for ServerState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ServerState")
.field("kaspa_db", &"<dyn KaspaDb>")
.field("force_sender", &self.force_sender.is_some())
.field("http_client", &self.http_client.is_some())
.finish()
}
}

impl ServerState {
pub fn new(kaspa_db: Arc<dyn KaspaDb>) -> Self {
Self {
kaspa_db,
force_sender: None,
http_client: None,
}
}

pub fn with_deposit_force(mut self, sender: DepositForceSender, rest_api_url: String) -> Self {
self.force_sender = Some(sender);
self.http_client = Some(HttpClient::new(rest_api_url, RateLimitConfig::default()));
self
}

pub fn router(self) -> Router {
let cors = CorsLayer::new()
.allow_origin(Any)
Expand All @@ -23,6 +54,7 @@ impl ServerState {
Router::new()
.route("/kaspa/deposit", get(list_deposits::handler))
.route("/kaspa/withdrawal", get(list_withdrawals::handler))
.route("/kaspa/deposit-force", post(deposit_force::handler))
.layer(cors)
.with_state(self)
}
Expand Down
22 changes: 21 additions & 1 deletion rust/main/agents/relayer/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::sync::Arc;

use axum::Router;
use derive_new::new;
use hyperlane_base::kas_hack::DepositForceSender;
use hyperlane_core::HyperlaneDomain;
use tokio::sync::broadcast::Sender;

Expand All @@ -27,6 +28,12 @@ pub mod messages;
pub mod operations;
pub mod proofs;

/// Config for deposit-force endpoint
pub struct DepositForceConfig {
pub sender: DepositForceSender,
pub rest_api_url: String,
}

#[derive(new)]
pub struct Server {
destination_chains: usize,
Expand All @@ -45,6 +52,8 @@ pub struct Server {
prover_syncs: Option<HashMap<u32, Arc<RwLock<MerkleTreeBuilder>>>>,
#[new(default)]
kaspa_db: Option<Arc<dyn KaspaDb>>,
#[new(default)]
deposit_force: Option<DepositForceConfig>,
}

impl Server {
Expand Down Expand Up @@ -92,6 +101,11 @@ impl Server {
self
}

pub fn with_deposit_force(mut self, config: Option<DepositForceConfig>) -> Self {
self.deposit_force = config;
self
}

// return a custom router that can be used in combination with other routers
pub fn router(self) -> Router {
let mut router = Router::new();
Expand Down Expand Up @@ -127,7 +141,13 @@ impl Server {
router = router.merge(proofs::ServerState::new(prover_syncs).router());
}
if let Some(kaspa_db) = self.kaspa_db {
router = router.merge(kaspa::ServerState::new(kaspa_db).router());
let kaspa_state = kaspa::ServerState::new(kaspa_db);
let kaspa_state = if let Some(df) = self.deposit_force {
kaspa_state.with_deposit_force(df.sender, df.rest_api_url)
} else {
kaspa_state
};
router = router.merge(kaspa_state.router());
}

let expose_environment_variable_endpoint =
Expand Down
29 changes: 28 additions & 1 deletion rust/main/hyperlane-base/src/kas_hack/logic_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ use hyperlane_core::{
};
use hyperlane_cosmos::native::{h512_to_cosmos_hash, CosmosNativeMailbox};
use std::{collections::HashSet, fmt::Debug, hash::Hash, sync::Arc, time::Duration};
use tokio::{sync::Mutex, task::JoinHandle, time};
use tokio::{
sync::{mpsc, Mutex},
task::JoinHandle,
time,
};
use tokio_metrics::TaskMonitor;
use tracing::{debug, error, info, info_span, Instrument};

Expand All @@ -23,6 +27,9 @@ use super::{
};
use dymension_kaspa::conf::RelayerDepositTimings;

pub type DepositForceSender = mpsc::Sender<Deposit>;
pub type DepositForceReceiver = mpsc::Receiver<Deposit>;

enum DepositRelayResult {
Success {
deposit_id: String,
Expand Down Expand Up @@ -51,6 +58,8 @@ pub struct Foo<C: MetadataConstructor> {
metadata_constructor: C,
deposit_tracker: Mutex<DepositTracker>,
config: RelayerDepositTimings,
force_sender: DepositForceSender,
force_receiver: Mutex<DepositForceReceiver>,
}

impl<C: MetadataConstructor> Foo<C>
Expand All @@ -64,15 +73,23 @@ where
) -> Self {
// Get config from provider, or use defaults if not available
let config = provider.must_relayer_stuff().deposit_timings.clone();
let (force_sender, force_receiver) = mpsc::channel(100);
Self {
provider,
hub_mailbox,
metadata_constructor,
deposit_tracker: Mutex::new(DepositTracker::new()),
config,
force_sender,
force_receiver: Mutex::new(force_receiver),
}
}

/// Get a sender for submitting deposits to be recovered/reprocessed
pub fn force_sender(&self) -> DepositForceSender {
self.force_sender.clone()
}

/// Run deposit and progress indication loops
pub fn run_loops(self, task_monitor: TaskMonitor) -> JoinHandle<()> {
let foo = Arc::new(self);
Expand Down Expand Up @@ -122,6 +139,7 @@ where

loop {
self.process_deposit_queue().await;
self.process_force_requests().await;

let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
Expand Down Expand Up @@ -189,6 +207,15 @@ where
}
}

/// Process deposits submitted via the force channel
async fn process_force_requests(&self) {
let mut receiver = self.force_receiver.lock().await;
while let Ok(deposit) = receiver.try_recv() {
info!(deposit_id = %deposit.id, "Processing forced deposit");
self.queue_new_deposits(vec![deposit]).await;
}
}

/// Process the retry queue for failed deposit operations
async fn process_deposit_queue(&self) {
loop {
Expand Down
1 change: 1 addition & 0 deletions rust/main/hyperlane-base/src/kas_hack/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ pub mod migration;
pub mod sync;

pub use kaspa_db::KaspaRocksDB;
pub use logic_loop::DepositForceSender;
pub use migration::run_migration_with_sync;
pub use sync::{ensure_hub_synced, format_ad_hoc_signatures};
Loading