Skip to content

Commit 38162c4

Browse files
quakeCopilot
andcommitted
Refactor channel index to pubkey
Reuse prefix 64 with pubkey-based keys, update channel store/query interfaces, and add migration to rebuild channel index from channel states. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent b18f692 commit 38162c4

File tree

8 files changed

+163
-60
lines changed

8 files changed

+163
-60
lines changed

crates/fiber-lib/src/fiber/channel.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4614,13 +4614,13 @@ impl ChannelActorState {
46144614
fn update_graph_for_remote_channel_change(&mut self) {
46154615
if let Some(channel_update_info) = self.get_remote_channel_update_info() {
46164616
if let Some(channel_outpoint) = self.get_funding_transaction_outpoint() {
4617-
let peer_id = self.get_remote_pubkey();
4617+
let pubkey = self.get_remote_pubkey();
46184618
self.network()
46194619
.send_message(NetworkActorMessage::new_event(
46204620
NetworkActorEvent::OwnedChannelUpdateEvent(
46214621
super::graph::OwnedChannelUpdateEvent::Updated(
46224622
channel_outpoint,
4623-
peer_id,
4623+
pubkey,
46244624
channel_update_info,
46254625
),
46264626
),
@@ -4646,14 +4646,14 @@ impl ChannelActorState {
46464646
let Some(channel_outpoint) = self.get_funding_transaction_outpoint() else {
46474647
return;
46484648
};
4649-
let peer_id = self.get_local_pubkey();
4649+
let pubkey = self.get_local_pubkey();
46504650
let channel_update_info = self.get_local_channel_update_info();
46514651
self.network()
46524652
.send_message(NetworkActorMessage::new_event(
46534653
NetworkActorEvent::OwnedChannelUpdateEvent(
46544654
super::graph::OwnedChannelUpdateEvent::Updated(
46554655
channel_outpoint,
4656-
peer_id,
4656+
pubkey,
46574657
channel_update_info,
46584658
),
46594659
),
@@ -7917,21 +7917,21 @@ pub trait ChannelActorStateStore {
79177917
fn get_channel_actor_state(&self, id: &Hash256) -> Option<ChannelActorState>;
79187918
fn insert_channel_actor_state(&self, state: ChannelActorState);
79197919
fn delete_channel_actor_state(&self, id: &Hash256);
7920-
fn get_channel_ids_by_peer(&self, peer_id: &PeerId) -> Vec<Hash256>;
7921-
fn get_active_channel_ids_by_peer(&self, peer_id: &PeerId) -> Vec<Hash256> {
7922-
self.get_channel_ids_by_peer(peer_id)
7920+
fn get_channel_ids_by_pubkey(&self, pubkey: &Pubkey) -> Vec<Hash256>;
7921+
fn get_active_channel_ids_by_pubkey(&self, pubkey: &Pubkey) -> Vec<Hash256> {
7922+
self.get_channel_ids_by_pubkey(pubkey)
79237923
.into_iter()
79247924
.filter(
79257925
|id| matches!(self.get_channel_actor_state(id), Some(state) if !state.is_closed()),
79267926
)
79277927
.collect()
79287928
}
7929-
fn get_channel_states(&self, peer_id: Option<PeerId>) -> Vec<(PeerId, Hash256, ChannelState)>;
7929+
fn get_channel_states(&self, pubkey: Option<Pubkey>) -> Vec<(Pubkey, Hash256, ChannelState)>;
79307930
fn get_active_channel_states(
79317931
&self,
7932-
peer_id: Option<PeerId>,
7933-
) -> Vec<(PeerId, Hash256, ChannelState)> {
7934-
self.get_channel_states(peer_id)
7932+
pubkey: Option<Pubkey>,
7933+
) -> Vec<(Pubkey, Hash256, ChannelState)> {
7934+
self.get_channel_states(pubkey)
79357935
.into_iter()
79367936
.filter(|(_, _, state)| !state.is_closed())
79377937
.collect()

crates/fiber-lib/src/fiber/network.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1144,7 +1144,8 @@ where
11441144
NetworkActorCommand::MaintainConnections => {
11451145
debug!("Trying to connect to peers with mutual channels");
11461146

1147-
for (peer_id, channel_id, channel_state) in self.store.get_channel_states(None) {
1147+
for (pubkey, channel_id, channel_state) in self.store.get_channel_states(None) {
1148+
let peer_id = pubkey.tentacle_peer_id();
11481149
if state.is_connected(&peer_id) {
11491150
continue;
11501151
}
@@ -1278,7 +1279,7 @@ where
12781279
}
12791280
}
12801281
NetworkActorCommand::CheckChannelsShutdown => {
1281-
for (_peer_id, channel_id, channel_state) in self.store.get_channel_states(None) {
1282+
for (_pubkey, channel_id, channel_state) in self.store.get_channel_states(None) {
12821283
if matches!(
12831284
channel_state,
12841285
ChannelState::ChannelReady | ChannelState::ShuttingDown(..)
@@ -1327,7 +1328,8 @@ where
13271328
let mut with_channel_down_peers = HashSet::new();
13281329
let mut ready_channels_count = 0;
13291330
let mut shuttingdown_channels_count = 0;
1330-
for (peer_id, channel_id, channel_state) in self.store.get_channel_states(None) {
1331+
for (pubkey, channel_id, channel_state) in self.store.get_channel_states(None) {
1332+
let peer_id = pubkey.tentacle_peer_id();
13311333
if matches!(channel_state, ChannelState::ChannelReady) {
13321334
if let Some(actor_state) = self.store.get_channel_actor_state(&channel_id) {
13331335
ready_channels_count += 1;
@@ -4168,9 +4170,10 @@ where
41684170

41694171
if let Some(info) = self.peer_session_map.get_mut(&peer_id) {
41704172
info.features = Some(init_msg.features);
4173+
let peer_pubkey = info.pubkey;
41714174
debug_event!(_myself, "PeerInit");
41724175

4173-
for channel_id in self.store.get_active_channel_ids_by_peer(&peer_id) {
4176+
for channel_id in self.store.get_active_channel_ids_by_pubkey(&peer_pubkey) {
41744177
if let Err(e) = self.reestablish_channel(&peer_id, channel_id).await {
41754178
error!("Failed to reestablish channel {:x}: {:?}", &channel_id, &e);
41764179
}

crates/fiber-lib/src/fiber/tests/settle_tlc_set_command_tests.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,6 @@
33
use std::cell::RefCell;
44
use std::collections::HashMap;
55

6-
use ckb_types::packed::{OutPoint, Script};
7-
use tentacle::secio::PeerId;
8-
96
use crate::fiber::channel::InboundTlcStatus;
107
use crate::fiber::channel::{
118
AppliedFlags, ChannelActorState, ChannelActorStateStore, ChannelBasePublicKeys,
@@ -15,13 +12,16 @@ use crate::fiber::channel::{
1512
use crate::fiber::hash_algorithm::HashAlgorithm;
1613
use crate::fiber::payment::PaymentCustomRecords;
1714
use crate::fiber::settle_tlc_set_command::{SettleTlcSetCommand, TlcSettlement};
18-
use crate::fiber::types::{Hash256, HoldTlc, RemoveTlcReason, TlcErrorCode, NO_SHARED_SECRET};
15+
use crate::fiber::types::{
16+
Hash256, HoldTlc, Pubkey, RemoveTlcReason, TlcErrorCode, NO_SHARED_SECRET,
17+
};
1918
use crate::gen_rand_sha256_hash;
2019
use crate::invoice::{CkbInvoice, CkbInvoiceStatus, Currency, InvoiceBuilder, InvoiceError};
2120
use crate::invoice::{InvoiceStore, PreimageStore};
2221
use crate::now_timestamp_as_millis_u64;
2322
use crate::tests::gen_utils::gen_rand_fiber_public_key;
2423
use crate::time::SystemTime;
24+
use ckb_types::packed::{OutPoint, Script};
2525

2626
/// Mock store for testing that implements PreimageStore, InvoiceStore, and ChannelActorStateStore
2727
struct MockStore {
@@ -137,11 +137,11 @@ impl ChannelActorStateStore for MockStore {
137137
self.channel_states.borrow_mut().remove(id);
138138
}
139139

140-
fn get_channel_ids_by_peer(&self, _peer_id: &PeerId) -> Vec<Hash256> {
140+
fn get_channel_ids_by_pubkey(&self, _pubkey: &Pubkey) -> Vec<Hash256> {
141141
self.channel_states.borrow().keys().cloned().collect()
142142
}
143143

144-
fn get_channel_states(&self, _peer_id: Option<PeerId>) -> Vec<(PeerId, Hash256, ChannelState)> {
144+
fn get_channel_states(&self, _pubkey: Option<Pubkey>) -> Vec<(Pubkey, Hash256, ChannelState)> {
145145
vec![]
146146
}
147147

crates/fiber-lib/src/rpc/channel.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -589,10 +589,7 @@ where
589589
) -> Result<ListChannelsResult, ErrorObjectOwned> {
590590
let only_pending = params.only_pending.unwrap_or_default();
591591
let include_closed = params.include_closed.unwrap_or_default();
592-
let filter_peer_id = params
593-
.pubkey
594-
.as_ref()
595-
.map(|pubkey| pubkey.tentacle_peer_id());
592+
let filter_pubkey = params.pubkey;
596593

597594
// The two filter options are mutually exclusive: `only_pending` narrows to channels
598595
// that are still opening (or failed to open), while `include_closed` broadens to
@@ -609,15 +606,15 @@ where
609606
let channel_states = if only_pending {
610607
// For pending mode, fetch all channel states (including non-active ones
611608
// like ABANDONED / FUNDING_ABORTED which are "closed" but represent failed openings)
612-
self.store.get_channel_states(filter_peer_id.clone())
609+
self.store.get_channel_states(filter_pubkey)
613610
} else if include_closed {
614-
self.store.get_channel_states(filter_peer_id.clone())
611+
self.store.get_channel_states(filter_pubkey)
615612
} else {
616-
self.store.get_active_channel_states(filter_peer_id.clone())
613+
self.store.get_active_channel_states(filter_pubkey)
617614
};
618615
let mut channels: Vec<_> = channel_states
619616
.into_iter()
620-
.filter_map(|(_peer_id, channel_id, _state)| {
617+
.filter_map(|(_pubkey, channel_id, _state)| {
621618
self.store
622619
.get_channel_actor_state(&channel_id)
623620
.and_then(|state| {
@@ -717,7 +714,7 @@ where
717714
continue;
718715
}
719716
// Apply pubkey filter if provided
720-
if let Some(ref filter_pubkey) = params.pubkey {
717+
if let Some(filter_pubkey) = filter_pubkey.as_ref() {
721718
if filter_pubkey != &record.pubkey {
722719
continue;
723720
}
@@ -775,7 +772,7 @@ where
775772
};
776773
for pending in pending_accept {
777774
// Apply pubkey filter if provided
778-
if let Some(ref filter_pubkey) = params.pubkey {
775+
if let Some(filter_pubkey) = filter_pubkey.as_ref() {
779776
if filter_pubkey != &pending.pubkey {
780777
continue;
781778
}

crates/fiber-lib/src/store/store_impl/mod.rs

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ use crate::fiber::gossip::GossipMessageStore;
2525
use crate::fiber::payment::{
2626
Attempt, AttemptStatus, PaymentCustomRecords, PaymentSession, PaymentStatus,
2727
};
28-
use crate::fiber::types::{HoldTlc, CURSOR_SIZE};
2928
#[cfg(feature = "watchtower")]
30-
use crate::fiber::types::{Privkey, Pubkey};
29+
use crate::fiber::types::Privkey;
30+
use crate::fiber::types::{HoldTlc, Pubkey, CURSOR_SIZE};
3131
use crate::{
3232
fiber::{
3333
channel::{
@@ -231,7 +231,7 @@ pub enum KeyValue {
231231
CkbInvoice(Hash256, CkbInvoice),
232232
Preimage(Hash256, Hash256),
233233
CkbInvoiceStatus(Hash256, CkbInvoiceStatus),
234-
PeerIdChannelId((PeerId, Hash256), ChannelState),
234+
PubkeyChannelId((Pubkey, Hash256), ChannelState),
235235
OutPointChannelId(OutPoint, Hash256),
236236
BroadcastMessageTimestamp(BroadcastMessageID, u64),
237237
BroadcastMessage(Cursor, BroadcastMessage),
@@ -293,12 +293,15 @@ impl StoreKeyValue for KeyValue {
293293
KeyValue::CkbInvoiceStatus(id, _) => {
294294
[&[CKB_INVOICE_STATUS_PREFIX], id.as_ref()].concat()
295295
}
296-
KeyValue::PeerIdChannelId((peer_id, channel_id), _) => [
297-
&[PEER_ID_CHANNEL_ID_PREFIX],
298-
peer_id.as_bytes(),
299-
channel_id.as_ref(),
300-
]
301-
.concat(),
296+
KeyValue::PubkeyChannelId((pubkey, channel_id), _) => {
297+
let pubkey_bytes = pubkey.serialize();
298+
[
299+
&[PEER_ID_CHANNEL_ID_PREFIX][..],
300+
&pubkey_bytes[..],
301+
channel_id.as_ref(),
302+
]
303+
.concat()
304+
}
302305
KeyValue::OutPointChannelId(outpoint, _) => {
303306
[&[CHANNEL_OUTPOINT_CHANNEL_ID_PREFIX], outpoint.as_slice()].concat()
304307
}
@@ -383,7 +386,7 @@ impl StoreKeyValue for KeyValue {
383386
KeyValue::CkbInvoice(_, invoice) => serialize_to_vec(invoice, "CkbInvoice"),
384387
KeyValue::Preimage(_, preimage) => serialize_to_vec(preimage, "Hash256"),
385388
KeyValue::CkbInvoiceStatus(_, status) => serialize_to_vec(status, "CkbInvoiceStatus"),
386-
KeyValue::PeerIdChannelId(_, state) => serialize_to_vec(state, "ChannelState"),
389+
KeyValue::PubkeyChannelId(_, state) => serialize_to_vec(state, "ChannelState"),
387390
KeyValue::OutPointChannelId(_, channel_id) => serialize_to_vec(channel_id, "ChannelId"),
388391
KeyValue::PaymentSession(_, payment_session) => {
389392
serialize_to_vec(payment_session, "PaymentSession")
@@ -444,8 +447,8 @@ impl ChannelActorStateStore for Store {
444447
fn insert_channel_actor_state(&self, state: ChannelActorState) {
445448
let mut batch = self.batch();
446449

447-
batch.put_kv(KeyValue::PeerIdChannelId(
448-
(state.get_remote_peer_id(), state.id),
450+
batch.put_kv(KeyValue::PubkeyChannelId(
451+
(state.get_remote_pubkey(), state.id),
449452
state.state,
450453
));
451454
if let Some(outpoint) = state.get_funding_transaction_outpoint() {
@@ -459,10 +462,11 @@ impl ChannelActorStateStore for Store {
459462
if let Some(state) = self.get_channel_actor_state(id) {
460463
let mut batch = self.batch();
461464
batch.delete([&[CHANNEL_ACTOR_STATE_PREFIX], id.as_ref()].concat());
465+
let remote_pubkey_bytes = state.get_remote_pubkey().serialize();
462466
batch.delete(
463467
[
464-
&[PEER_ID_CHANNEL_ID_PREFIX],
465-
state.get_remote_peer_id().as_bytes(),
468+
&[PEER_ID_CHANNEL_ID_PREFIX][..],
469+
&remote_pubkey_bytes[..],
466470
id.as_ref(),
467471
]
468472
.concat(),
@@ -474,8 +478,9 @@ impl ChannelActorStateStore for Store {
474478
}
475479
}
476480

477-
fn get_channel_ids_by_peer(&self, peer_id: &tentacle::secio::PeerId) -> Vec<Hash256> {
478-
let prefix = [&[PEER_ID_CHANNEL_ID_PREFIX], peer_id.as_bytes()].concat();
481+
fn get_channel_ids_by_pubkey(&self, pubkey: &Pubkey) -> Vec<Hash256> {
482+
let pubkey_bytes = pubkey.serialize();
483+
let prefix = [&[PEER_ID_CHANNEL_ID_PREFIX][..], &pubkey_bytes[..]].concat();
479484
let iter = self.prefix_iterator(&prefix);
480485
iter.map(|(key, _)| {
481486
let channel_id: [u8; 32] = key[prefix.len()..]
@@ -486,21 +491,24 @@ impl ChannelActorStateStore for Store {
486491
.collect()
487492
}
488493

489-
fn get_channel_states(&self, peer_id: Option<PeerId>) -> Vec<(PeerId, Hash256, ChannelState)> {
490-
let prefix = match peer_id {
491-
Some(peer_id) => [&[PEER_ID_CHANNEL_ID_PREFIX], peer_id.as_bytes()].concat(),
494+
fn get_channel_states(&self, pubkey: Option<Pubkey>) -> Vec<(Pubkey, Hash256, ChannelState)> {
495+
let prefix = match pubkey {
496+
Some(pubkey) => {
497+
let pubkey_bytes = pubkey.serialize();
498+
[&[PEER_ID_CHANNEL_ID_PREFIX][..], &pubkey_bytes[..]].concat()
499+
}
492500
None => vec![PEER_ID_CHANNEL_ID_PREFIX],
493501
};
494502
self.prefix_iterator(&prefix)
495503
.map(|(key, value)| {
496504
let key_len = key.len();
497-
let peer_id = PeerId::from_bytes(key[1..key_len - 32].into())
498-
.expect("deserialize peer id should be OK");
505+
let pubkey = Pubkey::from_slice(&key[1..key_len - 32])
506+
.expect("deserialize pubkey should be OK");
499507
let channel_id: [u8; 32] = key[key_len - 32..]
500508
.try_into()
501509
.expect("channel id should be 32 bytes");
502510
let state = deserialize_from(value.as_ref(), "ChannelState");
503-
(peer_id, channel_id.into(), state)
511+
(pubkey, channel_id.into(), state)
504512
})
505513
.collect()
506514
}

crates/fiber-lib/src/store/tests/store.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -576,9 +576,9 @@ fn test_channel_actor_state_store() {
576576
assert!(get_state.is_some());
577577
assert!(!get_state.unwrap().is_tlc_forwarding_enabled());
578578

579-
let remote_peer_id = state.get_remote_peer_id();
579+
let remote_pubkey = state.get_remote_pubkey();
580580
assert_eq!(
581-
store.get_channel_ids_by_peer(&remote_peer_id),
581+
store.get_channel_ids_by_pubkey(&remote_pubkey),
582582
vec![state.id]
583583
);
584584
let channel_point = state.must_get_funding_transaction_outpoint();
@@ -588,7 +588,7 @@ fn test_channel_actor_state_store() {
588588

589589
store.delete_channel_actor_state(&state.id);
590590
assert!(store.get_channel_actor_state(&state.id).is_none());
591-
assert_eq!(store.get_channel_ids_by_peer(&remote_peer_id), vec![]);
591+
assert_eq!(store.get_channel_ids_by_pubkey(&remote_pubkey), vec![]);
592592
let channel_point = state.must_get_funding_transaction_outpoint();
593593
assert!(store
594594
.get_channel_state_by_outpoint(&channel_point)
@@ -1022,12 +1022,12 @@ fn test_store_sample_channel_actor_state() {
10221022
sample.shutdown_transaction_hash
10231023
);
10241024

1025-
// Verify peer-id index
1026-
let remote_peer_id = sample.get_remote_peer_id();
1027-
let channel_ids = store.get_channel_ids_by_peer(&remote_peer_id);
1025+
// Verify pubkey index
1026+
let remote_pubkey = sample.get_remote_pubkey();
1027+
let channel_ids = store.get_channel_ids_by_pubkey(&remote_pubkey);
10281028
assert!(
10291029
channel_ids.contains(&sample.id),
1030-
"peer-id index should contain the channel id"
1030+
"pubkey index should contain the channel id"
10311031
);
10321032
}
10331033

0 commit comments

Comments
 (0)