Skip to content

Commit 22866ab

Browse files
committed
DRY up batched KVStore reads with read_all_objects helper
The parallel `JoinSet`-based batching loop was duplicated across `read_payments` and `read_pending_payments`. Extract it into a generic `read_all_objects<T: Readable>` helper that callers invoke directly with the relevant namespace constants. Per-type log messages are preserved via `std::any::type_name::<T>()`. Co-Authored-By: HAL 9000
1 parent 64e3154 commit 22866ab

2 files changed

Lines changed: 32 additions & 112 deletions

File tree

src/builder.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,9 @@ use crate::fee_estimator::OnchainFeeEstimator;
5555
use crate::gossip::GossipSource;
5656
use crate::io::sqlite_store::SqliteStore;
5757
use crate::io::utils::{
58-
read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph,
59-
read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_pending_payments,
60-
read_scorer, write_node_metrics,
58+
read_all_objects, read_event_queue, read_external_pathfinding_scores_from_cache,
59+
read_network_graph, read_node_metrics, read_output_sweeper, read_peer_info, read_scorer,
60+
write_node_metrics,
6161
};
6262
use crate::io::vss_store::VssStoreBuilder;
6363
use crate::io::{
@@ -1270,9 +1270,19 @@ fn build_with_store_internal(
12701270
let (payment_store_res, node_metris_res, pending_payment_store_res) =
12711271
runtime.block_on(async move {
12721272
tokio::join!(
1273-
read_payments(&*kv_store_ref, Arc::clone(&logger_ref)),
1273+
read_all_objects(
1274+
&*kv_store_ref,
1275+
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
1276+
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
1277+
Arc::clone(&logger_ref),
1278+
),
12741279
read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)),
1275-
read_pending_payments(&*kv_store_ref, Arc::clone(&logger_ref))
1280+
read_all_objects(
1281+
&*kv_store_ref,
1282+
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
1283+
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
1284+
Arc::clone(&logger_ref),
1285+
)
12761286
)
12771287
});
12781288

src/io/utils.rs

Lines changed: 17 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,10 @@ use crate::io::{
4444
NODE_METRICS_KEY, NODE_METRICS_PRIMARY_NAMESPACE, NODE_METRICS_SECONDARY_NAMESPACE,
4545
};
4646
use crate::logger::{log_error, LdkLogger, Logger};
47-
use crate::payment::PendingPaymentDetails;
4847
use crate::peer_store::PeerStore;
4948
use crate::types::{Broadcaster, DynStore, KeysManager, Sweeper};
5049
use crate::wallet::ser::{ChangeSetDeserWrapper, ChangeSetSerWrapper};
51-
use crate::{Error, EventQueue, NodeMetrics, PaymentDetails};
50+
use crate::{Error, EventQueue, NodeMetrics};
5251

5352
pub const EXTERNAL_PATHFINDING_SCORES_CACHE_KEY: &str = "external_pathfinding_scores_cache";
5453

@@ -221,21 +220,19 @@ where
221220
})
222221
}
223222

224-
/// Read previously persisted payments information from the store.
225-
pub(crate) async fn read_payments<L: Deref>(
226-
kv_store: &DynStore, logger: L,
227-
) -> Result<Vec<PaymentDetails>, std::io::Error>
223+
/// Read all objects of type `T` from the given namespace, spawning reads in parallel.
224+
pub(crate) async fn read_all_objects<T, L>(
225+
kv_store: &DynStore, primary_namespace: &str, secondary_namespace: &str, logger: L,
226+
) -> Result<Vec<T>, std::io::Error>
228227
where
228+
T: Readable,
229+
L: Deref,
229230
L::Target: LdkLogger,
230231
{
232+
let type_name = std::any::type_name::<T>();
231233
let mut res = Vec::new();
232234

233-
let mut stored_keys = KVStore::list(
234-
&*kv_store,
235-
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
236-
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
237-
)
238-
.await?;
235+
let mut stored_keys = KVStore::list(&*kv_store, primary_namespace, secondary_namespace).await?;
239236

240237
const BATCH_SIZE: usize = 50;
241238

@@ -244,12 +241,7 @@ where
244241
// Fill JoinSet with tasks if possible
245242
while set.len() < BATCH_SIZE && !stored_keys.is_empty() {
246243
if let Some(next_key) = stored_keys.pop() {
247-
let fut = KVStore::read(
248-
&*kv_store,
249-
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
250-
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
251-
&next_key,
252-
);
244+
let fut = KVStore::read(kv_store, primary_namespace, secondary_namespace, &next_key);
253245
set.spawn(fut);
254246
debug_assert!(set.len() <= BATCH_SIZE);
255247
}
@@ -259,37 +251,32 @@ where
259251
// Exit early if we get an IO error.
260252
let reader = read_res
261253
.map_err(|e| {
262-
log_error!(logger, "Failed to read PaymentDetails: {}", e);
254+
log_error!(logger, "Failed to read {}: {}", type_name, e);
263255
set.abort_all();
264256
e
265257
})?
266258
.map_err(|e| {
267-
log_error!(logger, "Failed to read PaymentDetails: {}", e);
259+
log_error!(logger, "Failed to read {}: {}", type_name, e);
268260
set.abort_all();
269261
e
270262
})?;
271263

272264
// Refill set for every finished future, if we still have something to do.
273265
if let Some(next_key) = stored_keys.pop() {
274-
let fut = KVStore::read(
275-
&*kv_store,
276-
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
277-
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
278-
&next_key,
279-
);
266+
let fut = KVStore::read(kv_store, primary_namespace, secondary_namespace, &next_key);
280267
set.spawn(fut);
281268
debug_assert!(set.len() <= BATCH_SIZE);
282269
}
283270

284271
// Handle result.
285-
let payment = PaymentDetails::read(&mut &*reader).map_err(|e| {
286-
log_error!(logger, "Failed to deserialize PaymentDetails: {}", e);
272+
let object = T::read(&mut &*reader).map_err(|e| {
273+
log_error!(logger, "Failed to deserialize {}: {}", type_name, e);
287274
std::io::Error::new(
288275
std::io::ErrorKind::InvalidData,
289-
"Failed to deserialize PaymentDetails",
276+
format!("Failed to deserialize {}", type_name),
290277
)
291278
})?;
292-
res.push(payment);
279+
res.push(object);
293280
}
294281

295282
debug_assert!(set.is_empty());
@@ -625,83 +612,6 @@ pub(crate) fn read_bdk_wallet_change_set(
625612
Ok(Some(change_set))
626613
}
627614

628-
/// Read previously persisted pending payments information from the store.
629-
pub(crate) async fn read_pending_payments<L: Deref>(
630-
kv_store: &DynStore, logger: L,
631-
) -> Result<Vec<PendingPaymentDetails>, std::io::Error>
632-
where
633-
L::Target: LdkLogger,
634-
{
635-
let mut res = Vec::new();
636-
637-
let mut stored_keys = KVStore::list(
638-
&*kv_store,
639-
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
640-
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
641-
)
642-
.await?;
643-
644-
const BATCH_SIZE: usize = 50;
645-
646-
let mut set = tokio::task::JoinSet::new();
647-
648-
// Fill JoinSet with tasks if possible
649-
while set.len() < BATCH_SIZE && !stored_keys.is_empty() {
650-
if let Some(next_key) = stored_keys.pop() {
651-
let fut = KVStore::read(
652-
&*kv_store,
653-
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
654-
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
655-
&next_key,
656-
);
657-
set.spawn(fut);
658-
debug_assert!(set.len() <= BATCH_SIZE);
659-
}
660-
}
661-
662-
while let Some(read_res) = set.join_next().await {
663-
// Exit early if we get an IO error.
664-
let reader = read_res
665-
.map_err(|e| {
666-
log_error!(logger, "Failed to read PendingPaymentDetails: {}", e);
667-
set.abort_all();
668-
e
669-
})?
670-
.map_err(|e| {
671-
log_error!(logger, "Failed to read PendingPaymentDetails: {}", e);
672-
set.abort_all();
673-
e
674-
})?;
675-
676-
// Refill set for every finished future, if we still have something to do.
677-
if let Some(next_key) = stored_keys.pop() {
678-
let fut = KVStore::read(
679-
&*kv_store,
680-
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
681-
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
682-
&next_key,
683-
);
684-
set.spawn(fut);
685-
debug_assert!(set.len() <= BATCH_SIZE);
686-
}
687-
688-
// Handle result.
689-
let pending_payment = PendingPaymentDetails::read(&mut &*reader).map_err(|e| {
690-
log_error!(logger, "Failed to deserialize PendingPaymentDetails: {}", e);
691-
std::io::Error::new(
692-
std::io::ErrorKind::InvalidData,
693-
"Failed to deserialize PendingPaymentDetails",
694-
)
695-
})?;
696-
res.push(pending_payment);
697-
}
698-
699-
debug_assert!(set.is_empty());
700-
debug_assert!(stored_keys.is_empty());
701-
702-
Ok(res)
703-
}
704-
705615
#[cfg(test)]
706616
mod tests {
707617
use super::read_or_generate_seed_file;

0 commit comments

Comments
 (0)