Skip to content

Commit 7aa48fa

Browse files
committed
DRY up batched KVStore reads with read_all_objects helper
The parallel `JoinSet`-based batching loop was duplicated across `read_payments` and `read_pending_payments`. Extract it into a generic `read_all_objects<T: Readable>` helper that callers invoke directly with the relevant namespace constants. Per-type log messages are preserved via `std::any::type_name::<T>()`. Co-Authored-By: HAL 9000
1 parent b8123d5 commit 7aa48fa

2 files changed

Lines changed: 31 additions & 112 deletions

File tree

src/builder.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,8 @@ use crate::fee_estimator::OnchainFeeEstimator;
5959
use crate::gossip::GossipSource;
6060
use crate::io::sqlite_store::SqliteStore;
6161
use crate::io::utils::{
62-
read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph,
63-
read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_pending_payments,
64-
read_scorer,
62+
read_all_objects, read_event_queue, read_external_pathfinding_scores_from_cache,
63+
read_network_graph, read_node_metrics, read_output_sweeper, read_peer_info, read_scorer,
6564
};
6665
use crate::io::vss_store::VssStoreBuilder;
6766
use crate::io::{
@@ -1279,9 +1278,19 @@ fn build_with_store_internal(
12791278
let (payment_store_res, node_metris_res, pending_payment_store_res) =
12801279
runtime.block_on(async move {
12811280
tokio::join!(
1282-
read_payments(&*kv_store_ref, Arc::clone(&logger_ref)),
1281+
read_all_objects(
1282+
&*kv_store_ref,
1283+
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
1284+
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
1285+
Arc::clone(&logger_ref),
1286+
),
12831287
read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)),
1284-
read_pending_payments(&*kv_store_ref, Arc::clone(&logger_ref))
1288+
read_all_objects(
1289+
&*kv_store_ref,
1290+
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
1291+
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
1292+
Arc::clone(&logger_ref),
1293+
)
12851294
)
12861295
});
12871296

src/io/utils.rs

Lines changed: 17 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,10 @@ use crate::io::{
4444
NODE_METRICS_KEY, NODE_METRICS_PRIMARY_NAMESPACE, NODE_METRICS_SECONDARY_NAMESPACE,
4545
};
4646
use crate::logger::{log_error, LdkLogger, Logger};
47-
use crate::payment::PendingPaymentDetails;
4847
use crate::peer_store::PeerStore;
4948
use crate::types::{Broadcaster, DynStore, KeysManager, Sweeper};
5049
use crate::wallet::ser::{ChangeSetDeserWrapper, ChangeSetSerWrapper};
51-
use crate::{Error, EventQueue, NodeMetrics, PaymentDetails};
50+
use crate::{Error, EventQueue, NodeMetrics};
5251

5352
pub const EXTERNAL_PATHFINDING_SCORES_CACHE_KEY: &str = "external_pathfinding_scores_cache";
5453

@@ -221,21 +220,19 @@ where
221220
})
222221
}
223222

224-
/// Read previously persisted payments information from the store.
225-
pub(crate) async fn read_payments<L: Deref>(
226-
kv_store: &DynStore, logger: L,
227-
) -> Result<Vec<PaymentDetails>, std::io::Error>
223+
/// Read all objects of type `T` from the given namespace, spawning reads in parallel.
224+
pub(crate) async fn read_all_objects<T, L>(
225+
kv_store: &DynStore, primary_namespace: &str, secondary_namespace: &str, logger: L,
226+
) -> Result<Vec<T>, std::io::Error>
228227
where
228+
T: Readable,
229+
L: Deref,
229230
L::Target: LdkLogger,
230231
{
232+
let type_name = std::any::type_name::<T>();
231233
let mut res = Vec::new();
232234

233-
let mut stored_keys = KVStore::list(
234-
&*kv_store,
235-
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
236-
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
237-
)
238-
.await?;
235+
let mut stored_keys = KVStore::list(&*kv_store, primary_namespace, secondary_namespace).await?;
239236

240237
const BATCH_SIZE: usize = 50;
241238

@@ -244,12 +241,7 @@ where
244241
// Fill JoinSet with tasks if possible
245242
while set.len() < BATCH_SIZE && !stored_keys.is_empty() {
246243
if let Some(next_key) = stored_keys.pop() {
247-
let fut = KVStore::read(
248-
&*kv_store,
249-
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
250-
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
251-
&next_key,
252-
);
244+
let fut = KVStore::read(kv_store, primary_namespace, secondary_namespace, &next_key);
253245
set.spawn(fut);
254246
debug_assert!(set.len() <= BATCH_SIZE);
255247
}
@@ -259,37 +251,32 @@ where
259251
// Exit early if we get an IO error.
260252
let reader = read_res
261253
.map_err(|e| {
262-
log_error!(logger, "Failed to read PaymentDetails: {}", e);
254+
log_error!(logger, "Failed to read {}: {}", type_name, e);
263255
set.abort_all();
264256
e
265257
})?
266258
.map_err(|e| {
267-
log_error!(logger, "Failed to read PaymentDetails: {}", e);
259+
log_error!(logger, "Failed to read {}: {}", type_name, e);
268260
set.abort_all();
269261
e
270262
})?;
271263

272264
// Refill set for every finished future, if we still have something to do.
273265
if let Some(next_key) = stored_keys.pop() {
274-
let fut = KVStore::read(
275-
&*kv_store,
276-
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
277-
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
278-
&next_key,
279-
);
266+
let fut = KVStore::read(kv_store, primary_namespace, secondary_namespace, &next_key);
280267
set.spawn(fut);
281268
debug_assert!(set.len() <= BATCH_SIZE);
282269
}
283270

284271
// Handle result.
285-
let payment = PaymentDetails::read(&mut &*reader).map_err(|e| {
286-
log_error!(logger, "Failed to deserialize PaymentDetails: {}", e);
272+
let object = T::read(&mut &*reader).map_err(|e| {
273+
log_error!(logger, "Failed to deserialize {}: {}", type_name, e);
287274
std::io::Error::new(
288275
std::io::ErrorKind::InvalidData,
289-
"Failed to deserialize PaymentDetails",
276+
format!("Failed to deserialize {}", type_name),
290277
)
291278
})?;
292-
res.push(payment);
279+
res.push(object);
293280
}
294281

295282
debug_assert!(set.is_empty());
@@ -632,83 +619,6 @@ pub(crate) fn read_bdk_wallet_change_set(
632619
Ok(Some(change_set))
633620
}
634621

635-
/// Read previously persisted pending payments information from the store.
636-
pub(crate) async fn read_pending_payments<L: Deref>(
637-
kv_store: &DynStore, logger: L,
638-
) -> Result<Vec<PendingPaymentDetails>, std::io::Error>
639-
where
640-
L::Target: LdkLogger,
641-
{
642-
let mut res = Vec::new();
643-
644-
let mut stored_keys = KVStore::list(
645-
&*kv_store,
646-
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
647-
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
648-
)
649-
.await?;
650-
651-
const BATCH_SIZE: usize = 50;
652-
653-
let mut set = tokio::task::JoinSet::new();
654-
655-
// Fill JoinSet with tasks if possible
656-
while set.len() < BATCH_SIZE && !stored_keys.is_empty() {
657-
if let Some(next_key) = stored_keys.pop() {
658-
let fut = KVStore::read(
659-
&*kv_store,
660-
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
661-
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
662-
&next_key,
663-
);
664-
set.spawn(fut);
665-
debug_assert!(set.len() <= BATCH_SIZE);
666-
}
667-
}
668-
669-
while let Some(read_res) = set.join_next().await {
670-
// Exit early if we get an IO error.
671-
let reader = read_res
672-
.map_err(|e| {
673-
log_error!(logger, "Failed to read PendingPaymentDetails: {}", e);
674-
set.abort_all();
675-
e
676-
})?
677-
.map_err(|e| {
678-
log_error!(logger, "Failed to read PendingPaymentDetails: {}", e);
679-
set.abort_all();
680-
e
681-
})?;
682-
683-
// Refill set for every finished future, if we still have something to do.
684-
if let Some(next_key) = stored_keys.pop() {
685-
let fut = KVStore::read(
686-
&*kv_store,
687-
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
688-
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
689-
&next_key,
690-
);
691-
set.spawn(fut);
692-
debug_assert!(set.len() <= BATCH_SIZE);
693-
}
694-
695-
// Handle result.
696-
let pending_payment = PendingPaymentDetails::read(&mut &*reader).map_err(|e| {
697-
log_error!(logger, "Failed to deserialize PendingPaymentDetails: {}", e);
698-
std::io::Error::new(
699-
std::io::ErrorKind::InvalidData,
700-
"Failed to deserialize PendingPaymentDetails",
701-
)
702-
})?;
703-
res.push(pending_payment);
704-
}
705-
706-
debug_assert!(set.is_empty());
707-
debug_assert!(stored_keys.is_empty());
708-
709-
Ok(res)
710-
}
711-
712622
#[cfg(test)]
713623
mod tests {
714624
use super::read_or_generate_seed_file;

0 commit comments

Comments
 (0)