Skip to content

Commit 5a863c4

Browse files
committed
Schedule rebalance in a minute if we fail
1 parent c07a0cd commit 5a863c4

File tree

2 files changed

+193
-111
lines changed

2 files changed

+193
-111
lines changed

graduated-rebalancer/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,4 @@ bitcoin-payment-instructions = { workspace = true }
1111
lightning = { workspace = true }
1212
lightning-invoice = { workspace = true }
1313
lightning-macros = { workspace = true }
14-
tokio = { version = "1", default-features = false, features = ["sync", "macros"] }
14+
tokio = { version = "1", default-features = false, features = ["sync", "macros", "rt", "time"] }

graduated-rebalancer/src/lib.rs

Lines changed: 192 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ use std::future::Future;
2222
use std::pin::Pin;
2323
use std::sync::Arc;
2424

25+
const REBALANCE_RETRY_WAIT_TIME_SECS: u64 = 60;
26+
2527
/// Represents the state of an in-progress rebalance from trusted to lightning wallet
2628
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2729
pub struct RebalanceState {
@@ -337,16 +339,43 @@ pub struct GraduatedRebalancer<
337339

338340
/// In-memory cache of active on-chain rebalance (only one on-chain rebalance allowed at a time)
339341
active_onchain_rebalance: Arc<tokio::sync::Mutex<Option<OnChainRebalanceState>>>,
342+
343+
/// Handle to cancel a scheduled retry task
344+
scheduled_retry_handle: Arc<tokio::sync::Mutex<Option<tokio::task::AbortHandle>>>,
345+
}
346+
347+
impl<
348+
T: TrustedWallet,
349+
L: LightningWallet,
350+
R: RebalanceTrigger,
351+
E: EventHandler,
352+
P: RebalancePersistence,
353+
O: Logger,
354+
> Clone for GraduatedRebalancer<T, L, R, E, P, O>
355+
{
356+
fn clone(&self) -> Self {
357+
Self {
358+
trusted: Arc::clone(&self.trusted),
359+
ln_wallet: Arc::clone(&self.ln_wallet),
360+
trigger: Arc::clone(&self.trigger),
361+
event_handler: Arc::clone(&self.event_handler),
362+
persistence: Arc::clone(&self.persistence),
363+
logger: Arc::clone(&self.logger),
364+
active_trusted_rebalance: Arc::clone(&self.active_trusted_rebalance),
365+
active_onchain_rebalance: Arc::clone(&self.active_onchain_rebalance),
366+
scheduled_retry_handle: Arc::clone(&self.scheduled_retry_handle),
367+
}
368+
}
340369
}
341370

342371
impl<T, LN, R, E, P, L> GraduatedRebalancer<T, LN, R, E, P, L>
343372
where
344-
T: TrustedWallet,
345-
LN: LightningWallet,
346-
R: RebalanceTrigger,
347-
E: EventHandler,
348-
P: RebalancePersistence,
349-
L: Logger,
373+
T: TrustedWallet + 'static,
374+
LN: LightningWallet + 'static,
375+
R: RebalanceTrigger + 'static,
376+
E: EventHandler + 'static,
377+
P: RebalancePersistence + 'static,
378+
L: Logger + Send + Sync + 'static,
350379
{
351380
/// Create a new graduated rebalancer
352381
pub fn new(
@@ -362,9 +391,50 @@ where
362391
logger,
363392
active_trusted_rebalance: Arc::new(tokio::sync::Mutex::new(None)),
364393
active_onchain_rebalance: Arc::new(tokio::sync::Mutex::new(None)),
394+
scheduled_retry_handle: Arc::new(tokio::sync::Mutex::new(None)),
395+
}
396+
}
397+
398+
/// Cancel any scheduled retry task
399+
async fn cancel_scheduled_retry(&self) {
400+
let mut handle = self.scheduled_retry_handle.lock().await;
401+
if let Some(abort_handle) = handle.take() {
402+
log_debug!(self.logger, "Cancelling scheduled retry");
403+
abort_handle.abort();
365404
}
366405
}
367406

407+
/// Schedule a retry of the rebalance after a delay.
408+
/// If a rebalance is triggered before the delay, the retry will be cancelled.
409+
fn schedule_retry(&self) {
410+
let this = self.clone();
411+
let handle_mutex = Arc::clone(&self.scheduled_retry_handle);
412+
413+
tokio::spawn(async move {
414+
// Cancel any existing scheduled retry
415+
let mut handle = handle_mutex.lock().await;
416+
if let Some(abort_handle) = handle.take() {
417+
abort_handle.abort();
418+
}
419+
420+
// Create the retry task
421+
let retry_this = this.clone();
422+
let task = tokio::spawn(async move {
423+
tokio::time::sleep(std::time::Duration::from_secs(REBALANCE_RETRY_WAIT_TIME_SECS))
424+
.await;
425+
log_info!(retry_this.logger, "Executing scheduled rebalance retry");
426+
retry_this.do_rebalance_if_needed().await;
427+
});
428+
429+
*handle = Some(task.abort_handle());
430+
431+
log_info!(
432+
this.logger,
433+
"Scheduled rebalance retry in {REBALANCE_RETRY_WAIT_TIME_SECS} seconds"
434+
);
435+
});
436+
}
437+
368438
/// Does any rebalance if it meets the conditions of the tunables
369439
pub async fn do_rebalance_if_needed(&self) {
370440
self.do_trusted_rebalance_if_needed().await;
@@ -397,72 +467,84 @@ where
397467
return;
398468
}
399469

470+
// Cancel any scheduled retry since we're actually doing a rebalance now
471+
self.cancel_scheduled_retry().await;
472+
400473
log_info!(self.logger, "Initiating rebalance");
401474

402475
let transfer_amt = params.amount;
403-
if let Ok(inv) = self.ln_wallet.get_bolt11_invoice(Some(transfer_amt)).await {
404-
log_debug!(
405-
self.logger,
406-
"Attempting to pay invoice {inv} to rebalance for {transfer_amt:?}",
407-
);
476+
let inv = match self.ln_wallet.get_bolt11_invoice(Some(transfer_amt)).await {
477+
Ok(inv) => inv,
478+
Err(_) => return,
479+
};
408480

409-
let expected_hash = inv.payment_hash().to_byte_array();
410-
411-
// Create and persist rebalance state
412-
let mut state = RebalanceState {
413-
trigger_id: params.id,
414-
expected_payment_hash: expected_hash,
415-
amount_msat: transfer_amt.milli_sats(),
416-
ln_payment_received: false,
417-
trusted_payment_sent: false,
418-
trusted_payment_id: None,
419-
ln_payment_id: None,
420-
ln_fee_msat: None,
421-
trusted_fee_msat: None,
422-
};
423-
self.persistence.insert_trusted_rebalance_state(state).await;
424-
425-
match self.trusted.pay(PaymentMethod::LightningBolt11(inv), transfer_amt).await {
426-
Ok(trusted_payment_id) => {
427-
log_debug!(
428-
self.logger,
429-
"Rebalance trusted transaction initiated, id {}. Will complete via observer callbacks.",
430-
trusted_payment_id.as_hex()
431-
);
481+
log_debug!(
482+
self.logger,
483+
"Attempting to pay invoice {inv} to rebalance for {transfer_amt:?}",
484+
);
432485

433-
// persist trusted payment id
434-
state.trusted_payment_id = Some(trusted_payment_id);
435-
self.persistence.update_trusted_rebalance_state(state).await;
486+
let expected_hash = inv.payment_hash().to_byte_array();
436487

437-
// Set active rebalance
438-
*rebalance = Some(state);
488+
// Create and persist rebalance state
489+
let mut state = RebalanceState {
490+
trigger_id: params.id,
491+
expected_payment_hash: expected_hash,
492+
amount_msat: transfer_amt.milli_sats(),
493+
ln_payment_received: false,
494+
trusted_payment_sent: false,
495+
trusted_payment_id: None,
496+
ln_payment_id: None,
497+
ln_fee_msat: None,
498+
trusted_fee_msat: None,
499+
};
500+
self.persistence.insert_trusted_rebalance_state(state).await;
439501

440-
// Post initiated event
441-
self.event_handler
442-
.handle_event(RebalancerEvent::RebalanceInitiated {
443-
trigger_id: params.id,
444-
trusted_rebalance_payment_id: trusted_payment_id,
445-
amount_msat: transfer_amt.milli_sats(),
446-
})
447-
.await;
448-
},
449-
Err(e) => {
450-
log_info!(self.logger, "Rebalance trusted transaction failed with {e:?}",);
451-
452-
// Clean up persisted state
453-
self.persistence.remove_trusted_rebalance_state().await;
454-
455-
// Post failure event
456-
self.event_handler
457-
.handle_event(RebalancerEvent::RebalanceFailed {
458-
trigger_id: params.id,
459-
trusted_rebalance_payment_id: None,
460-
amount_msat: transfer_amt.milli_sats(),
461-
reason: format!("Failed to initiate trusted payment: {e:?}"),
462-
})
463-
.await;
464-
},
465-
}
502+
match self.trusted.pay(PaymentMethod::LightningBolt11(inv), transfer_amt).await {
503+
Ok(trusted_payment_id) => {
504+
log_debug!(
505+
self.logger,
506+
"Rebalance trusted transaction initiated, id {}. Will complete via observer callbacks.",
507+
trusted_payment_id.as_hex()
508+
);
509+
510+
// persist trusted payment id
511+
state.trusted_payment_id = Some(trusted_payment_id);
512+
self.persistence.update_trusted_rebalance_state(state).await;
513+
514+
// Set active rebalance
515+
*rebalance = Some(state);
516+
517+
// Post initiated event
518+
self.event_handler
519+
.handle_event(RebalancerEvent::RebalanceInitiated {
520+
trigger_id: params.id,
521+
trusted_rebalance_payment_id: trusted_payment_id,
522+
amount_msat: transfer_amt.milli_sats(),
523+
})
524+
.await;
525+
},
526+
Err(e) => {
527+
log_info!(self.logger, "Rebalance trusted transaction failed with {e:?}",);
528+
529+
// Clean up persisted state
530+
self.persistence.remove_trusted_rebalance_state().await;
531+
532+
// Post failure event
533+
self.event_handler
534+
.handle_event(RebalancerEvent::RebalanceFailed {
535+
trigger_id: params.id,
536+
trusted_rebalance_payment_id: None,
537+
amount_msat: transfer_amt.milli_sats(),
538+
reason: format!("Failed to initiate trusted payment: {e:?}"),
539+
})
540+
.await;
541+
542+
// Release the lock before scheduling retry
543+
drop(rebalance);
544+
545+
// Schedule a retry
546+
self.schedule_retry();
547+
},
466548
}
467549
}
468550

@@ -479,6 +561,9 @@ where
479561
return;
480562
}
481563

564+
// Cancel any scheduled retry since we're actually doing a rebalance now
565+
self.cancel_scheduled_retry().await;
566+
482567
let is_splice = self.ln_wallet.has_channel_with_lsp();
483568

484569
if is_splice {
@@ -500,55 +585,46 @@ where
500585
*onchain_rebalance = Some(state);
501586

502587
// Now initiate the actual operation
503-
let user_channel_id = if is_splice {
504-
match self.ln_wallet.splice_to_lsp_channel(params.amount).await {
505-
Ok(chan_id) => chan_id,
506-
Err(e) => {
507-
log_error!(self.logger, "Failed to splice to LSP channel: {e:?}");
508-
// Clean up the state if we failed
509-
let _ = onchain_rebalance.take();
510-
self.persistence.remove_onchain_rebalance_state().await;
511-
self.event_handler
512-
.handle_event(RebalancerEvent::RebalanceFailed {
513-
trigger_id: params.id,
514-
trusted_rebalance_payment_id: None,
515-
amount_msat: params.amount.milli_sats(),
516-
reason: format!("Failed to splice to LSP channel: {e:?}"),
517-
})
518-
.await;
519-
return;
520-
},
521-
}
588+
let channel_result = if is_splice {
589+
self.ln_wallet.splice_to_lsp_channel(params.amount).await
522590
} else {
523-
match self.ln_wallet.open_channel_with_lsp(params.amount).await {
524-
Ok(chan_id) => chan_id,
525-
Err(e) => {
526-
log_error!(self.logger, "Failed to open channel with LSP: {e:?}");
527-
// Clean up the state if we failed
528-
let _ = onchain_rebalance.take();
529-
self.persistence.remove_onchain_rebalance_state().await;
530-
self.event_handler
531-
.handle_event(RebalancerEvent::RebalanceFailed {
532-
trigger_id: params.id,
533-
trusted_rebalance_payment_id: None,
534-
amount_msat: params.amount.milli_sats(),
535-
reason: format!("Failed to open channel with LSP: {e:?}"),
536-
})
537-
.await;
538-
return;
539-
},
540-
}
591+
self.ln_wallet.open_channel_with_lsp(params.amount).await
541592
};
542593

543-
// Update state with the user_channel_id
544-
state.user_channel_id = Some(user_channel_id);
545-
*onchain_rebalance = Some(state);
546-
self.persistence.update_onchain_rebalance_state(state).await;
594+
match channel_result {
595+
Ok(user_channel_id) => {
596+
// Update state with the user_channel_id
597+
state.user_channel_id = Some(user_channel_id);
598+
*onchain_rebalance = Some(state);
599+
self.persistence.update_onchain_rebalance_state(state).await;
547600

548-
log_info!(
549-
self.logger,
550-
"On-chain rebalance initiated for user_channel_id {user_channel_id}. Will complete via observer callback."
551-
);
601+
log_info!(
602+
self.logger,
603+
"On-chain rebalance initiated for user_channel_id {user_channel_id}. Will complete via observer callback."
604+
);
605+
},
606+
Err(e) => {
607+
let op_name = if is_splice { "splice to" } else { "open channel with" };
608+
log_error!(self.logger, "Failed to {op_name} LSP channel: {e:?}");
609+
// Clean up the state if we failed
610+
let _ = onchain_rebalance.take();
611+
self.persistence.remove_onchain_rebalance_state().await;
612+
self.event_handler
613+
.handle_event(RebalancerEvent::RebalanceFailed {
614+
trigger_id: params.id,
615+
trusted_rebalance_payment_id: None,
616+
amount_msat: params.amount.milli_sats(),
617+
reason: format!("Failed to {op_name} LSP channel: {e:?}"),
618+
})
619+
.await;
620+
621+
// Release the lock before scheduling retry
622+
drop(onchain_rebalance);
623+
624+
// Schedule a retry
625+
self.schedule_retry();
626+
},
627+
}
552628
}
553629

554630
/// Called when the trusted wallet confirms sending a payment
@@ -633,6 +709,12 @@ where
633709
// Clean up
634710
let _ = rebalance.take();
635711
self.persistence.remove_trusted_rebalance_state().await;
712+
713+
// Release the lock before scheduling retry
714+
drop(rebalance);
715+
716+
// Schedule a retry
717+
self.schedule_retry();
636718
}
637719
}
638720
}

0 commit comments

Comments
 (0)