Skip to content

Commit 0725ed2

Browse files
authored
Merge pull request #119 from silicon-heaven/optimize-trimming
Optimize trimming
2 parents c6a4cb9 + b6702e1 commit 0725ed2

3 files changed

Lines changed: 64 additions & 57 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name = "historyprovider"
33
description = "historyprovider-rs"
44
license = "MIT"
55
repository = "https://github.com/silicon-heaven/historyprovider-rs"
6-
version = "2.12.1"
6+
version = "2.12.2"
77
edition = "2024"
88

99
[profile.release]

src/sync.rs

Lines changed: 62 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,11 @@ async fn get_files_to_sync(
253253
.collect()
254254
}
255255

256+
enum ShouldTrim {
257+
Yes,
258+
No,
259+
}
260+
256261
async fn sync_site_by_download(
257262
site_path: impl AsRef<str>,
258263
remote_journal_path: impl AsRef<str>,
@@ -261,7 +266,7 @@ async fn sync_site_by_download(
261266
journal_dir: impl AsRef<str>,
262267
sync_logger: impl SyncLogger,
263268
file_list: Option<&[LsFilesEntry]>,
264-
) -> Result<(), String>
269+
) -> Result<ShouldTrim, String>
265270
{
266271
let (site_path, remote_journal_path) = (site_path.as_ref(), remote_journal_path.as_ref());
267272
let local_journal_path = Path::new(journal_dir.as_ref()).join(site_path);
@@ -369,32 +374,34 @@ async fn sync_site_by_download(
369374
.collect::<Vec<_>>()
370375
.await;
371376

372-
if let Some((first_file, _, _)) = files_to_sync.first() {
373-
let read_api = RpcCall::new(&join_path!(remote_journal_path, first_file), METH_DIR)
374-
.param("sha1")
375-
.exec(&client_cmd_tx)
376-
.await
377-
.map(|v: RpcValue| if v.is_imap() { ReadApi::List } else { ReadApi::Map })
378-
.map_err(|e| format!("Cannot get read param API for {remote_journal_path}: {e}"))?;
379-
380-
// Sync from the remote to the sync directory
381-
for (file_name, sync_offset, file_size) in files_to_sync {
382-
sync_file(
383-
client_cmd_tx.clone(),
384-
join_path!(remote_journal_path, &file_name),
385-
local_journal_path.join(&file_name),
386-
download_chunk_size,
387-
sync_offset,
388-
file_size,
389-
read_api,
390-
sync_logger.clone(),
391-
)
392-
.await
393-
.map_err(to_string)?;
394-
}
377+
let Some((first_file, _, _)) = files_to_sync.first() else {
378+
return Ok(ShouldTrim::No);
379+
};
380+
381+
let read_api = RpcCall::new(&join_path!(remote_journal_path, first_file), METH_DIR)
382+
.param("sha1")
383+
.exec(&client_cmd_tx)
384+
.await
385+
.map(|v: RpcValue| if v.is_imap() { ReadApi::List } else { ReadApi::Map })
386+
.map_err(|e| format!("Cannot get read param API for {remote_journal_path}: {e}"))?;
387+
388+
// Sync from the remote to the sync directory
389+
for (file_name, sync_offset, file_size) in files_to_sync {
390+
sync_file(
391+
client_cmd_tx.clone(),
392+
join_path!(remote_journal_path, &file_name),
393+
local_journal_path.join(&file_name),
394+
download_chunk_size,
395+
sync_offset,
396+
file_size,
397+
read_api,
398+
sync_logger.clone(),
399+
)
400+
.await
401+
.map_err(to_string)?;
395402
}
396403

397-
Ok(())
404+
Ok(ShouldTrim::Yes)
398405
}
399406

400407
#[derive(Copy,Clone)]
@@ -504,7 +511,7 @@ async fn sync_site_legacy(
504511
client_cmd_tx: ClientCommandSender,
505512
journal_dir: impl AsRef<str>,
506513
sync_logger: impl SyncLogger,
507-
) -> Result<(), String>
514+
) -> Result<ShouldTrim, String>
508515
{
509516
let (site_path, getlog_path) = (site_path.as_ref(), getlog_path.as_ref());
510517
let local_journal_path = Path::new(journal_dir.as_ref()).join(site_path);
@@ -644,6 +651,8 @@ async fn sync_site_legacy(
644651
Ok(())
645652
}
646653

654+
let mut should_trim = ShouldTrim::No;
655+
647656
loop {
648657
sync_logger.log(
649658
log::Level::Info,
@@ -687,6 +696,8 @@ async fn sync_site_legacy(
687696
break;
688697
};
689698

699+
should_trim = ShouldTrim::Yes;
700+
690701
log_file_entries.append(&mut log_entries);
691702

692703
getlog_params.since = GetLog2Since::DateTime(shvproto::DateTime::from_epoch_msec(last_entry_ms));
@@ -706,7 +717,7 @@ async fn sync_site_legacy(
706717
getlog_params.with_snapshot = false;
707718
}
708719
}
709-
Ok(())
720+
Ok(should_trim)
710721
}
711722

712723
pub(crate) async fn sync_task(
@@ -739,6 +750,24 @@ pub(crate) async fn sync_task(
739750
let days_to_keep = app_state.config.days_to_keep;
740751

741752
while let Some(cmd) = sync_cmd_rx.next().await {
753+
fn on_sync_result(sync_result: Result<ShouldTrim, String>, site_path: String, dirtylog_cmd_tx: UnboundedSender<DirtyLogCommand>, sync_logger: &SyncSiteLogger) {
754+
match sync_result {
755+
Ok(ShouldTrim::Yes) => {
756+
if let Err(e) = dirtylog_cmd_tx.unbounded_send(DirtyLogCommand::Trim { site: site_path }) {
757+
let err_msg = e.to_string();
758+
let command = e.into_inner();
759+
log::error!("Cannot send dirtylog Trim command {command:?}: {err_msg}")
760+
}
761+
},
762+
Ok(ShouldTrim::No) => {
763+
sync_logger.log(log::Level::Info, format!("Not trimming {site_path}, because no changes were made"));
764+
},
765+
Err(err) => {
766+
sync_logger.log(log::Level::Error, format!("site sync error: {err}"));
767+
},
768+
}
769+
}
770+
742771
match cmd {
743772
SyncCommand::SyncAll => {
744773
log::info!("Sync logs start");
@@ -778,14 +807,6 @@ pub(crate) async fn sync_task(
778807
let app_state = app_state.clone();
779808
let logger_tx = logger_tx.clone();
780809

781-
fn send_trim(site_path: String, dirtylog_cmd_tx: UnboundedSender<DirtyLogCommand>) {
782-
if let Err(e) = dirtylog_cmd_tx.unbounded_send(DirtyLogCommand::Trim { site: site_path }) {
783-
let err_msg = e.to_string();
784-
let command = e.into_inner();
785-
log::error!("Cannot send dirtylog Trim command {command:?}: {err_msg}")
786-
}
787-
}
788-
789810
match sub_hp {
790811
SubHpInfo::Normal { sync_path, download_chunk_size } => {
791812
let site_suffix = shvrpc::util::strip_prefix_path(&site_path, &site_info.sub_hp)
@@ -804,11 +825,8 @@ pub(crate) async fn sync_task(
804825
sync_logger.clone(),
805826
Some(&file_list),
806827
).await;
807-
if let Err(err) = sync_result {
808-
sync_logger.log(log::Level::Error, format!("site sync error: {err}"));
809-
}
810828
sync_logger.log(log::Level::Info, "syncing done");
811-
send_trim(site_path, app_state.dirtylog_cmd_tx.clone());
829+
on_sync_result(sync_result, site_path, app_state.dirtylog_cmd_tx.clone(), &sync_logger);
812830
drop(permit);
813831
});
814832
sync_tasks.push(sync_task);
@@ -826,11 +844,8 @@ pub(crate) async fn sync_task(
826844
&app_state.config.journal_dir,
827845
sync_logger.clone()
828846
).await;
829-
if let Err(err) = sync_result {
830-
sync_logger.log(log::Level::Error, format!("site sync error: {err}"));
831-
}
832847
sync_logger.log(log::Level::Info, "syncing done");
833-
send_trim(site_path, app_state.dirtylog_cmd_tx.clone());
848+
on_sync_result(sync_result, site_path, app_state.dirtylog_cmd_tx.clone(), &sync_logger);
834849
drop(permit);
835850
});
836851
sync_tasks.push(sync_task);
@@ -868,17 +883,15 @@ pub(crate) async fn sync_task(
868883
let download_chunk_size = *download_chunk_size;
869884
let sync_logger = SyncSiteLogger::new(&site_path, logger_tx);
870885
let sync_result = sync_site_by_download(
871-
site_path,
886+
site_path.clone(),
872887
remote_journal_path,
873888
download_chunk_size,
874889
client_cmd_tx,
875890
&app_state.config.journal_dir,
876891
sync_logger.clone(),
877892
None,
878893
).await;
879-
if let Err(err) = sync_result {
880-
sync_logger.log(log::Level::Error, format!("site sync error: {err}"));
881-
}
894+
on_sync_result(sync_result, site_path, app_state.dirtylog_cmd_tx.clone(), &sync_logger);
882895
sync_logger.log(log::Level::Info, "syncing done");
883896
}
884897
SubHpInfo::Legacy { getlog_path } => {
@@ -887,25 +900,19 @@ pub(crate) async fn sync_task(
887900
let remote_getlog_path = join_path!("shv", &site_info.sub_hp, getlog_path, site_suffix);
888901
let sync_logger = SyncSiteLogger::new(&site_path, logger_tx);
889902
let sync_result = sync_site_legacy(
890-
site_path,
903+
site_path.clone(),
891904
remote_getlog_path,
892905
client_cmd_tx,
893906
&app_state.config.journal_dir,
894907
sync_logger.clone()
895908
).await;
896-
if let Err(err) = sync_result {
897-
sync_logger.log(log::Level::Error, format!("site sync error: {err}"));
898-
}
909+
on_sync_result(sync_result, site_path, app_state.dirtylog_cmd_tx.clone(), &sync_logger);
899910
sync_logger.log(log::Level::Info, "syncing done");
900911
}
901912
SubHpInfo::PushLog => {
902913
// No op
903914
}
904915
}
905-
app_state.dirtylog_cmd_tx.unbounded_send(DirtyLogCommand::Trim { site: site.clone() })
906-
.unwrap_or_else(|e|
907-
panic!("Cannot send dirtylog Trim command for site {site}: {e}")
908-
);
909916
match cleanup_log_files(&app_state.config.journal_dir, max_journal_dir_size, days_to_keep).await {
910917
Ok(_) => info!("Cleanup journal dir done"),
911918
Err(err) => error!("Cleanup journal dir error: {err}"),

0 commit comments

Comments
 (0)