Skip to content

Commit 5af3805

Browse files
committed
Offload watchtower TLC status queries to dedicated actor
- Add WatchtowerQueryActor to run query_tlc_status without blocking Network Actor - Queue QueryPreimage and QuerySettled jobs from CheckChannels; handle results via WatchtowerPreimageResult and WatchtowerSettledResult commands - Pass tlc_id through preimage flow so result handler can send RemoveTlc when preimage is found; only send if channel still has that received TLC Made-with: Cursor
1 parent 01cae25 commit 5af3805

File tree

3 files changed

+285
-58
lines changed

3 files changed

+285
-58
lines changed

crates/fiber-lib/src/fiber/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ mod in_flight_ckb_tx_actor;
1919
mod key;
2020
mod path;
2121
mod settle_tlc_set_command;
22+
mod watchtower_query_actor;
2223

2324
pub use config::FiberConfig;
2425
pub use in_flight_ckb_tx_actor::{

crates/fiber-lib/src/fiber/network.rs

Lines changed: 149 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ use super::types::{
6969
Hash256, Init, NodeAnnouncement, OpenChannel, Privkey, Pubkey, RemoveTlcFulfill,
7070
RemoveTlcReason, TlcErr, TlcErrorCode,
7171
};
72+
use super::watchtower_query_actor::{WatchtowerQueryActor, WatchtowerQueryActorMessage};
7273
use super::{
7374
FiberConfig, InFlightCkbTxActor, InFlightCkbTxActorArguments, InFlightCkbTxKind,
7475
ASSUME_NETWORK_ACTOR_ALIVE,
@@ -383,6 +384,21 @@ pub enum NetworkActorCommand {
383384
// Get all inbound channel requests that are waiting for `accept_channel`
384385
GetPendingAcceptChannels(RpcReplyPort<Result<Vec<PendingAcceptChannel>, String>>),
385386

387+
// Watchtower query result: preimage found for a TLC, settle it immediately.
388+
WatchtowerPreimageResult {
389+
channel_id: Hash256,
390+
tlc_id: u64,
391+
payment_hash: Hash256,
392+
preimage: Hash256,
393+
},
394+
// Watchtower query result: TLC is settled on-chain.
395+
// The forwarding TLC should be failed with ExpiryTooSoon.
396+
WatchtowerSettledResult {
397+
forwarding_channel_id: Hash256,
398+
forwarding_tlc_id: u64,
399+
shared_secret: [u8; 32],
400+
},
401+
386402
#[cfg(any(debug_assertions, feature = "bench"))]
387403
UpdateFeatures(FeatureVector),
388404
}
@@ -1349,28 +1365,6 @@ where
13491365
continue;
13501366
}
13511367
}
1352-
1353-
let mut payment_preimage =
1354-
self.store.get_preimage(&tlc.payment_hash);
1355-
if payment_preimage.is_none() {
1356-
if let Some(querier) = self.watchtower_querier.as_ref() {
1357-
payment_preimage = querier
1358-
.query_tlc_status(&channel_id, &tlc.payment_hash)
1359-
.await
1360-
.and_then(|s| s.preimage);
1361-
if let Some(preimage) = payment_preimage {
1362-
self.store.insert_preimage(tlc.payment_hash, preimage);
1363-
}
1364-
}
1365-
}
1366-
let Some(payment_preimage) = payment_preimage else {
1367-
continue;
1368-
};
1369-
debug!(
1370-
"Found payment preimage for channel {:?} tlc {:?}",
1371-
channel_id,
1372-
tlc.id()
1373-
);
13741368
if self
13751369
.store
13761370
.get_invoice_status(&tlc.payment_hash)
@@ -1384,6 +1378,29 @@ where
13841378
continue;
13851379
}
13861380

1381+
let payment_preimage = self.store.get_preimage(&tlc.payment_hash);
1382+
if payment_preimage.is_none() {
1383+
if let Some(wt_actor) = state.watchtower_query_actor.as_ref() {
1384+
let _ = wt_actor.send_message(
1385+
WatchtowerQueryActorMessage::QueryPreimage {
1386+
channel_id,
1387+
tlc_id: tlc.id(),
1388+
payment_hash: tlc.payment_hash,
1389+
},
1390+
);
1391+
}
1392+
continue;
1393+
}
1394+
let Some(payment_preimage) = payment_preimage else {
1395+
continue;
1396+
};
1397+
1398+
debug!(
1399+
"Found payment preimage for channel {:?} tlc {:?}",
1400+
channel_id,
1401+
tlc.id()
1402+
);
1403+
13871404
let (send, _recv) = oneshot::channel();
13881405
let rpc_reply = RpcReplyPort::from(send);
13891406

@@ -1521,43 +1538,16 @@ where
15211538
if let Some((forwarding_channel_id, forwarding_tlc_id)) =
15221539
tlc.forwarding_tlc
15231540
{
1524-
let is_settled =
1525-
if let Some(querier) = self.watchtower_querier.as_ref() {
1526-
querier
1527-
.query_tlc_status(&channel_id, &tlc.payment_hash)
1528-
.await
1529-
.is_some_and(|s| s.is_settled)
1530-
} else {
1531-
false
1532-
};
1533-
if is_settled {
1534-
let (send, _recv) = oneshot::channel();
1535-
let rpc_reply = RpcReplyPort::from(send);
1536-
if let Err(err) = state
1537-
.send_command_to_channel(
1541+
if let Some(wt_actor) = state.watchtower_query_actor.as_ref() {
1542+
let _ = wt_actor.send_message(
1543+
WatchtowerQueryActorMessage::QuerySettled {
1544+
channel_id,
1545+
payment_hash: tlc.payment_hash,
15381546
forwarding_channel_id,
1539-
ChannelCommand::RemoveTlc(
1540-
RemoveTlcCommand {
1541-
id: forwarding_tlc_id,
1542-
reason: RemoveTlcReason::RemoveTlcFail(
1543-
TlcErrPacket::new(
1544-
TlcErr::new(
1545-
TlcErrorCode::ExpiryTooSoon,
1546-
),
1547-
&tlc.shared_secret,
1548-
),
1549-
),
1550-
},
1551-
rpc_reply,
1552-
),
1553-
)
1554-
.await
1555-
{
1556-
error!(
1557-
"Failed to remove settled tlc {:?} for channel {:?}: {}",
1558-
forwarding_tlc_id, forwarding_channel_id, err
1559-
);
1560-
}
1547+
forwarding_tlc_id,
1548+
shared_secret: tlc.shared_secret,
1549+
},
1550+
);
15611551
}
15621552
}
15631553
}
@@ -1622,6 +1612,90 @@ where
16221612
}
16231613
}
16241614
}
1615+
NetworkActorCommand::WatchtowerPreimageResult {
1616+
channel_id,
1617+
tlc_id,
1618+
payment_hash,
1619+
preimage,
1620+
} => {
1621+
self.store.insert_preimage(payment_hash, preimage);
1622+
debug!(
1623+
"Found watchtower preimage for channel {:?} tlc {:?}",
1624+
channel_id, tlc_id
1625+
);
1626+
if self
1627+
.store
1628+
.get_invoice_status(&payment_hash)
1629+
.is_some_and(|s| {
1630+
!matches!(s, CkbInvoiceStatus::Open | CkbInvoiceStatus::Received)
1631+
})
1632+
{
1633+
return Ok(());
1634+
}
1635+
// Only send RemoveTlc if the channel still has this received TLC (it may have
1636+
// been removed already, e.g. by a concurrent payment winning the race).
1637+
let tlc_still_exists =
1638+
self.store
1639+
.get_channel_actor_state(&channel_id)
1640+
.is_some_and(|actor_state| {
1641+
actor_state
1642+
.get_received_tlc(TLCId::Received(tlc_id))
1643+
.is_some()
1644+
});
1645+
if tlc_still_exists {
1646+
let (send, _recv) = oneshot::channel();
1647+
let rpc_reply = RpcReplyPort::from(send);
1648+
if let Err(err) = state
1649+
.send_command_to_channel(
1650+
channel_id,
1651+
ChannelCommand::RemoveTlc(
1652+
RemoveTlcCommand {
1653+
id: tlc_id,
1654+
reason: RemoveTlcReason::RemoveTlcFulfill(RemoveTlcFulfill {
1655+
payment_preimage: preimage,
1656+
}),
1657+
},
1658+
rpc_reply,
1659+
),
1660+
)
1661+
.await
1662+
{
1663+
error!(
1664+
"Failed to remove tlc {:?} with preimage for channel {:?}: {}",
1665+
tlc_id, channel_id, err
1666+
);
1667+
}
1668+
}
1669+
}
1670+
NetworkActorCommand::WatchtowerSettledResult {
1671+
forwarding_channel_id,
1672+
forwarding_tlc_id,
1673+
shared_secret,
1674+
} => {
1675+
let (send, _recv) = oneshot::channel();
1676+
let rpc_reply = RpcReplyPort::from(send);
1677+
if let Err(err) = state
1678+
.send_command_to_channel(
1679+
forwarding_channel_id,
1680+
ChannelCommand::RemoveTlc(
1681+
RemoveTlcCommand {
1682+
id: forwarding_tlc_id,
1683+
reason: RemoveTlcReason::RemoveTlcFail(TlcErrPacket::new(
1684+
TlcErr::new(TlcErrorCode::ExpiryTooSoon),
1685+
&shared_secret,
1686+
)),
1687+
},
1688+
rpc_reply,
1689+
),
1690+
)
1691+
.await
1692+
{
1693+
error!(
1694+
"Failed to remove settled tlc {:?} for channel {:?}: {}",
1695+
forwarding_tlc_id, forwarding_channel_id, err
1696+
);
1697+
}
1698+
}
16251699
NetworkActorCommand::SettleHoldTlcSet(payment_hash) => {
16261700
self.settle_hold_tlc_set(state, payment_hash).await;
16271701
}
@@ -2928,6 +3002,8 @@ pub struct NetworkActorState<S, C> {
29283002

29293003
// Inflight payment actors
29303004
inflight_payments: HashMap<Hash256, ActorRef<PaymentActorMessage>>,
3005+
// Actor for querying watchtower TLC status without blocking the network actor.
3006+
watchtower_query_actor: Option<ActorRef<WatchtowerQueryActorMessage>>,
29313007
}
29323008

29333009
#[derive(Debug, Clone)]
@@ -4696,6 +4772,20 @@ where
46964772
let chain_actor = self.chain_actor.clone();
46974773
let features = config.gen_node_features();
46984774

4775+
let watchtower_query_actor = if let Some(querier) = self.watchtower_querier.clone() {
4776+
let actor = WatchtowerQueryActor::new(querier, myself.clone());
4777+
let (actor_ref, _) = Actor::spawn_linked(
4778+
Some("watchtower_query".to_string()),
4779+
actor,
4780+
(),
4781+
myself.get_cell(),
4782+
)
4783+
.await?;
4784+
Some(actor_ref)
4785+
} else {
4786+
None
4787+
};
4788+
46994789
let mut state = NetworkActorState {
47004790
store: self.store.clone(),
47014791
state_to_be_persisted,
@@ -4732,6 +4822,7 @@ where
47324822
funding_timeout_seconds: config.funding_timeout_seconds,
47334823
},
47344824
inflight_payments: Default::default(),
4825+
watchtower_query_actor,
47354826
};
47364827

47374828
let node_announcement = state.get_or_create_new_node_announcement_message();

0 commit comments

Comments
 (0)