Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
256 changes: 203 additions & 53 deletions src/io/vss_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use bitcoin::Network;
use lightning::impl_writeable_tlv_based_enum;
use lightning::io::{self, Error, ErrorKind};
use lightning::sign::{EntropySource as LdkEntropySource, RandomBytes};
use lightning::util::persist::{KVStore, KVStoreSync};
use lightning::util::persist::{
KVStore, KVStoreSync, PageToken, PaginatedKVStore, PaginatedKVStoreSync, PaginatedListResponse,
};
use lightning::util::ser::{Readable, Writeable};
use prost::Message;
use vss_client::client::VssClient;
Expand Down Expand Up @@ -377,6 +379,52 @@ impl KVStore for VssStore {
}
}

impl PaginatedKVStoreSync for VssStore {
fn list_paginated(
&self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
) -> io::Result<PaginatedListResponse> {
let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| {
debug_assert!(false, "Failed to access internal runtime");
let msg = format!("Failed to access internal runtime");
Error::new(ErrorKind::Other, msg)
})?;
let primary_namespace = primary_namespace.to_string();
let secondary_namespace = secondary_namespace.to_string();
let inner = Arc::clone(&self.inner);
let fut = async move {
inner
.list_paginated_internal(
&inner.blocking_client,
primary_namespace,
secondary_namespace,
page_token,
)
.await
};
tokio::task::block_in_place(move || internal_runtime.block_on(fut))
}
}

impl PaginatedKVStore for VssStore {
fn list_paginated(
&self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
) -> impl Future<Output = Result<PaginatedListResponse, io::Error>> + 'static + Send {
let primary_namespace = primary_namespace.to_string();
let secondary_namespace = secondary_namespace.to_string();
let inner = Arc::clone(&self.inner);
async move {
inner
.list_paginated_internal(
&inner.async_client,
primary_namespace,
secondary_namespace,
page_token,
)
.await
}
}
}

impl Drop for VssStore {
fn drop(&mut self) {
let internal_runtime = self.internal_runtime.take();
Expand Down Expand Up @@ -474,35 +522,32 @@ impl VssStoreInner {
}
}

async fn list_all_keys(
async fn list_keys(
&self, client: &VssClient<CustomRetryPolicy>, primary_namespace: &str,
secondary_namespace: &str,
) -> io::Result<Vec<String>> {
let mut page_token = None;
let mut keys = vec![];
secondary_namespace: &str, page_token: Option<String>, page_size: Option<i32>,
) -> io::Result<(Vec<String>, Option<String>)> {
let key_prefix = self.build_obfuscated_prefix(primary_namespace, secondary_namespace);
while page_token != Some("".to_string()) {
let request = ListKeyVersionsRequest {
store_id: self.store_id.clone(),
key_prefix: Some(key_prefix.clone()),
page_token,
page_size: None,
};
let request = ListKeyVersionsRequest {
store_id: self.store_id.clone(),
key_prefix: Some(key_prefix),
page_token,
page_size,
};

let response = client.list_key_versions(&request).await.map_err(|e| {
let msg = format!(
"Failed to list keys in {}/{}: {}",
primary_namespace, secondary_namespace, e
);
Error::new(ErrorKind::Other, msg)
})?;
let response = client.list_key_versions(&request).await.map_err(|e| {
let msg = format!(
"Failed to list keys in {}/{}: {}",
primary_namespace, secondary_namespace, e
);
Error::new(ErrorKind::Other, msg)
})?;

for kv in response.key_versions {
keys.push(self.extract_key(&kv.key)?);
}
page_token = response.next_page_token;
let mut keys = Vec::with_capacity(response.key_versions.len());
for kv in response.key_versions {
keys.push(self.extract_key(&kv.key)?);
}
Ok(keys)

Ok((keys, response.next_page_token))
}

async fn read_internal(
Expand Down Expand Up @@ -624,20 +669,51 @@ impl VssStoreInner {
) -> io::Result<Vec<String>> {
check_namespace_key_validity(&primary_namespace, &secondary_namespace, None, "list")?;

let keys = self
.list_all_keys(client, &primary_namespace, &secondary_namespace)
.await
.map_err(|e| {
let msg = format!(
"Failed to retrieve keys in namespace: {}/{} : {}",
primary_namespace, secondary_namespace, e
);
Error::new(ErrorKind::Other, msg)
})?;

let mut page_token: Option<String> = None;
let mut keys = vec![];
loop {
let (page_keys, next_page_token) = self
.list_keys(client, &primary_namespace, &secondary_namespace, page_token, None)
.await?;
keys.extend(page_keys);
match next_page_token {
Some(t) if !t.is_empty() => page_token = Some(t),
_ => break,
}
}
Ok(keys)
}

async fn list_paginated_internal(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems this is duplicating a lot of the logic of list_all_keys. Rather than doing that, can we

  1. move the logic to a new method called list_keys or similar that takes the paget oken
  2. move the while page_token != Some("".to_string()) loop to list_internal, calling list_keys
  3. hence have both list_internal and list_internal_paginated reuse the same list_keys code to avoid duplication

Copy link
Copy Markdown
Contributor Author

@benthecarman benthecarman Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, also fixed a potential issue where if the VSS server returned None for the page token we could go into an infinite loop.

&self, client: &VssClient<CustomRetryPolicy>, primary_namespace: String,
secondary_namespace: String, page_token: Option<PageToken>,
) -> io::Result<PaginatedListResponse> {
check_namespace_key_validity(
&primary_namespace,
&secondary_namespace,
None,
"list_paginated",
)?;

const PAGE_SIZE: i32 = 50;

let vss_page_token = page_token.map(|t| t.to_string());
let (keys, next_page_token) = self
.list_keys(
client,
&primary_namespace,
&secondary_namespace,
vss_page_token,
Some(PAGE_SIZE),
)
.await?;

// VSS can use empty string to signal the last page
let next_page_token = next_page_token.filter(|t| !t.is_empty()).map(PageToken::new);

Ok(PaginatedListResponse { keys, next_page_token })
}

async fn execute_locked_write<
F: Future<Output = Result<(), lightning::io::Error>>,
FN: FnOnce() -> F,
Expand Down Expand Up @@ -1020,35 +1096,109 @@ mod tests {
use super::*;
use crate::io::test_utils::do_read_write_remove_list_persist;

#[test]
fn vss_read_write_remove_list_persist() {
fn build_vss_store() -> VssStore {
let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap();
let mut rng = rng();
let rand_store_id: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect();
let mut node_seed = [0u8; 64];
rng.fill_bytes(&mut node_seed);
let entropy = NodeEntropy::from_seed_bytes(node_seed);
let vss_store =
VssStoreBuilder::new(entropy, vss_base_url, rand_store_id, Network::Testnet)
.build_with_sigs_auth(HashMap::new())
.unwrap();
VssStoreBuilder::new(entropy, vss_base_url, rand_store_id, Network::Testnet)
.build_with_sigs_auth(HashMap::new())
.unwrap()
}

#[test]
fn vss_read_write_remove_list_persist() {
let vss_store = build_vss_store();
do_read_write_remove_list_persist(&vss_store);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn vss_read_write_remove_list_persist_in_runtime_context() {
let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap();
let mut rng = rng();
let rand_store_id: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect();
let mut node_seed = [0u8; 64];
rng.fill_bytes(&mut node_seed);
let entropy = NodeEntropy::from_seed_bytes(node_seed);
let vss_store =
VssStoreBuilder::new(entropy, vss_base_url, rand_store_id, Network::Testnet)
.build_with_sigs_auth(HashMap::new())
.unwrap();

let vss_store = build_vss_store();
do_read_write_remove_list_persist(&vss_store);
drop(vss_store)
}

#[test]
fn vss_paginated_listing() {
let store = build_vss_store();
let ns = "test_paginated";
let sub = "listing";
let num_entries = 5;

for i in 0..num_entries {
let key = format!("key_{:04}", i);
let data = vec![i as u8; 32];
KVStoreSync::write(&store, ns, sub, &key, data).unwrap();
}

let mut all_keys = Vec::new();
let mut page_token = None;

loop {
let response =
PaginatedKVStoreSync::list_paginated(&store, ns, sub, page_token).unwrap();
all_keys.extend(response.keys);
match response.next_page_token {
Some(token) => page_token = Some(token),
_ => break,
}
Comment on lines +1144 to +1147
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm here we determine whether we are done with pagination based on whether the page token is None, and not whether the list is empty. I guess just looking for clarification on the PaginatedKVStore API :)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update to handle empty string

}

assert_eq!(all_keys.len(), num_entries);

// Verify no duplicates
let mut unique = all_keys.clone();
unique.sort();
unique.dedup();
assert_eq!(unique.len(), num_entries);
}

#[test]
fn vss_paginated_empty_namespace() {
let store = build_vss_store();
let response =
PaginatedKVStoreSync::list_paginated(&store, "nonexistent", "ns", None).unwrap();
assert!(response.keys.is_empty());
assert!(response.next_page_token.is_none());
}

#[test]
fn vss_paginated_removal() {
let store = build_vss_store();
let ns = "test_paginated";
let sub = "removal";

KVStoreSync::write(&store, ns, sub, "a", vec![1u8; 8]).unwrap();
KVStoreSync::write(&store, ns, sub, "b", vec![2u8; 8]).unwrap();
KVStoreSync::write(&store, ns, sub, "c", vec![3u8; 8]).unwrap();

KVStoreSync::remove(&store, ns, sub, "b", false).unwrap();

let response = PaginatedKVStoreSync::list_paginated(&store, ns, sub, None).unwrap();
assert_eq!(response.keys.len(), 2);
assert!(response.keys.contains(&"a".to_string()));
assert!(!response.keys.contains(&"b".to_string()));
assert!(response.keys.contains(&"c".to_string()));
}

#[test]
fn vss_paginated_namespace_isolation() {
let store = build_vss_store();

KVStoreSync::write(&store, "ns_a", "sub", "key_1", vec![1u8; 8]).unwrap();
KVStoreSync::write(&store, "ns_a", "sub", "key_2", vec![2u8; 8]).unwrap();
KVStoreSync::write(&store, "ns_b", "sub", "key_3", vec![3u8; 8]).unwrap();

let response = PaginatedKVStoreSync::list_paginated(&store, "ns_a", "sub", None).unwrap();
assert_eq!(response.keys.len(), 2);
assert!(response.keys.contains(&"key_1".to_string()));
assert!(response.keys.contains(&"key_2".to_string()));

let response = PaginatedKVStoreSync::list_paginated(&store, "ns_b", "sub", None).unwrap();
assert_eq!(response.keys.len(), 1);
assert!(response.keys.contains(&"key_3".to_string()));
}
}
Loading