Skip to content

Commit 9d862a5

Browse files
committed
Sync log3 journal files
HP can now sync files in log3 format and provide them via the getLog API. A significant difference from log2 is that a snapshot is not recorded at the beginning of each file, but is instead scattered throughout the whole file. This saves space, but requires processing an extra file when creating a result in the getLog implementation.
1 parent 291c413 commit 9d862a5

13 files changed

Lines changed: 316 additions & 888 deletions

File tree

Cargo.lock

Lines changed: 36 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name = "historyprovider"
33
description = "historyprovider-rs"
44
license = "MIT"
55
repository = "https://github.com/silicon-heaven/historyprovider-rs"
6-
version = "2.7.11"
6+
version = "2.8.0"
77
edition = "2024"
88

99
[[bin]]
@@ -12,8 +12,8 @@ name = "hp"
1212
[dependencies]
1313
tokio = { version = "1.50.0", features = ["io-util", "macros", "net", "rt", "rt-multi-thread", "signal", "sync", "time"], default-features = false }
1414
shvproto = "6.1.4"
15-
shvrpc = { version = "14.0", features = ["journal"] }
16-
shvclient = { version = "3.0", features = ["tokio"] }
15+
shvrpc = { version = "15.0", features = ["journal"] }
16+
shvclient = { version = "4.0", features = ["tokio"] }
1717
futures = "0.3.32"
1818
log = "0.4.29"
1919
clap = { version = "4.6.0", features = ["derive"] }
@@ -42,5 +42,5 @@ size = { version = "0.5.0", features = ["serde"] }
4242
[dev-dependencies]
4343
async-broadcast = "0.7.2"
4444
async-trait = "0.1.80"
45-
shvclient = { version = "3.0", features = ["mocking", "tokio"] }
45+
shvclient = { version = "4.0", features = ["mocking", "tokio"] }
4646
tempfile = "3.27.0"

src/cleanup.rs

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use log::error;
22
use log::info;
3+
use shvrpc::journalrw::datetime_to_log3_filename;
34
use tokio::fs;
45
use tokio::io;
56
use std::collections::HashMap;
@@ -14,7 +15,7 @@ pub(crate) struct LogFile {
1415
pub(crate) size: u64,
1516
}
1617

17-
pub(crate) async fn collect_log2_files(dir: impl AsRef<Path>) -> io::Result<Vec<LogFile>> {
18+
pub(crate) async fn collect_log_files(dir: impl AsRef<Path>) -> io::Result<Vec<LogFile>> {
1819
let mut result = Vec::new();
1920
let mut dirs = vec![dir.as_ref().to_path_buf()];
2021

@@ -27,7 +28,7 @@ pub(crate) async fn collect_log2_files(dir: impl AsRef<Path>) -> io::Result<Vec<
2728

2829
if metadata.is_dir() {
2930
dirs.push(path);
30-
} else if metadata.is_file() && path.extension().is_some_and(|ext| ext == "log2")
31+
} else if metadata.is_file() && path.extension().is_some_and(|ext| ext == "log2" || ext == "log3")
3132
&& let (Some(file_name), Some(parent)) = (path.file_name(), path.parent()) {
3233
result.push(LogFile {
3334
name: file_name.into(),
@@ -41,9 +42,9 @@ pub(crate) async fn collect_log2_files(dir: impl AsRef<Path>) -> io::Result<Vec<
4142
Ok(result)
4243
}
4344

44-
/// Prune `.log2` files while keeping the newest one per directory
45-
pub(crate) async fn cleanup_log2_files(dir: impl AsRef<Path>, size_limit: u64, days_to_keep: i64) -> io::Result<()> {
46-
let files = collect_log2_files(dir).await?;
45+
/// Prune `.log2` and `.log3` files while keeping the newest two per directory
46+
pub(crate) async fn cleanup_log_files(dir: impl AsRef<Path>, size_limit: u64, days_to_keep: i64) -> io::Result<()> {
47+
let files = collect_log_files(dir).await?;
4748
let mut files_size: u64 = files.iter().map(|f| f.size).sum();
4849

4950
info!("log2 files size: {files_size}, size limit: {size_limit}, days_to_keep: {days_to_keep}");
@@ -61,14 +62,26 @@ pub(crate) async fn cleanup_log2_files(dir: impl AsRef<Path>, size_limit: u64, d
6162
let mut deletable_files = Vec::new();
6263

6364
// This file doesn't have to exist, I'm only constructing the filename for log retention.
64-
let oldest_file_to_keep = PathBuf::from(msec_to_log2_filename(shvproto::DateTime::now().add_days(-days_to_keep).epoch_msec()));
65-
info!("keeping files younger than {filename}", filename = oldest_file_to_keep.to_string_lossy());
65+
let oldest_log2_file_to_keep = PathBuf::from(msec_to_log2_filename(shvproto::DateTime::now().add_days(-days_to_keep).epoch_msec()));
66+
let oldest_log3_file_to_keep = PathBuf::from(datetime_to_log3_filename(shvproto::DateTime::now().add_days(-days_to_keep)));
67+
info!("keeping files younger than {filename_log2} or {filename_log3}",
68+
filename_log2 = oldest_log2_file_to_keep.to_string_lossy(),
69+
filename_log3 = oldest_log3_file_to_keep.to_string_lossy(),
70+
);
6671

6772
for (_dir, mut group) in grouped {
6873
// Sort descending (newest first)
6974
group.sort_by(|a, b| b.name.cmp(&a.name));
70-
// Keep the newest file, and keep files newer than oldest_file_to_delete
71-
deletable_files.extend(group.into_iter().skip(1).filter(|log_file| log_file.name < oldest_file_to_keep));
75+
// Keep the newest two files (for snapshots), and keep files newer than oldest_file_to_delete
76+
deletable_files.extend(group
77+
.into_iter()
78+
.skip(2)
79+
.filter(|log_file| if log_file.name.ends_with(".log2") {
80+
log_file.name < oldest_log2_file_to_keep
81+
} else {
82+
log_file.name < oldest_log3_file_to_keep
83+
})
84+
);
7285
}
7386

7487
// Sort deletable files (oldest first for deletion)

src/dirtylog.rs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@ use shvclient::{ClientCommandSender, ClientEventsReceiver};
1313
use shvproto::DateTime as ShvDateTime;
1414
use shvrpc::metamethod::AccessLevel;
1515
use shvrpc::journalentry::JournalEntry;
16-
use shvrpc::journalrw::{JournalReaderLog2, JournalWriterLog2};
16+
use shvrpc::journalrw::{JournalReaderLog2, JournalReaderLog3, JournalWriterLog2};
1717
use shvrpc::datachange::{DataChange, ValueFlags};
1818
use tokio_util::compat::TokioAsyncReadCompatExt;
1919

2020
use crate::sites::ParsedNotification;
21-
use crate::util::{get_files, is_log2_file};
21+
use crate::util::{get_files, is_log_file};
2222
use crate::State;
2323

2424

@@ -127,7 +127,7 @@ pub(crate) async fn dirtylog_task(
127127
};
128128

129129
let latest_entry = {
130-
let mut log_files = match get_files(journal_dir.join(&site), is_log2_file).await {
130+
let mut log_files = match get_files(journal_dir.join(&site), is_log_file).await {
131131
Ok(files) => files,
132132
Err(err) => {
133133
error!("Cannot trim dirty log. Cannot read journal dir entries: {err}");
@@ -142,8 +142,13 @@ pub(crate) async fn dirtylog_task(
142142
async move {
143143
match tokio::fs::File::open(&file_path).await {
144144
Ok(file) => {
145-
let reader = JournalReaderLog2::new(BufReader::new(file.compat()));
146-
reader.fold(None, async |_, entry| entry.ok()).await
145+
if file_path.ends_with(".log3") {
146+
let reader = JournalReaderLog3::new(BufReader::new(file.compat()));
147+
reader.fold(None, async |_, entry| entry.ok()).await
148+
} else {
149+
let reader = JournalReaderLog2::new(BufReader::new(file.compat()));
150+
reader.fold(None, async |_, entry| entry.ok()).await
151+
}
147152
}
148153
Err(err) => {
149154
error!("Cannot open file {file_path} while getting the last journal entry for trim dirtylog: {err}",
@@ -265,19 +270,19 @@ pub(crate) async fn dirtylog_task(
265270
DirtyLogCommand::Get { site, response_tx } => {
266271
request_scheduler.schedule_new(site, Request::Get(response_tx));
267272
}
268-
DirtyLogCommand::ProcessNotification(ParsedNotification { site_path, property_path, signal, param }) => {
273+
DirtyLogCommand::ProcessNotification(ParsedNotification { site_path, property_path, signal, source, param }) => {
269274
let data_change = DataChange::from(param);
270275
let journal_entry = JournalEntry {
271276
epoch_msec: data_change.date_time.unwrap_or_else(ShvDateTime::now).epoch_msec(),
272277
path: property_path,
273278
signal,
274-
source: Default::default(),
279+
source,
275280
value: data_change.value,
276281
access_level: AccessLevel::Read as _,
277282
short_time: data_change.short_time.unwrap_or(-1),
278-
user_id: Default::default(),
283+
user_id: shvproto::RpcValue::null(),
279284
repeat: !data_change.value_flags.contains(ValueFlags::SPONTANEOUS),
280-
provisional: true, // data_change.value_flags & (1 << VALUE_FLAG_PROVISIONAL_BIT) != 0,
285+
provisional: true,
281286
};
282287
// Schedule next task
283288
request_scheduler.schedule_new(site_path, Request::Append(journal_entry));
@@ -354,15 +359,15 @@ mod tests {
354359
TestCase {
355360
name: "ProcessNotification: journaldir doesn't exist",
356361
steps: &[
357-
Box::new(TestDirtyLogCommand(DirtyLogCommand::ProcessNotification(crate::sites::ParsedNotification { site_path: "site1".into(), property_path: "some_value_node".into(), signal: "chng".into(), param: 20.into() })))
362+
Box::new(TestDirtyLogCommand(DirtyLogCommand::ProcessNotification(crate::sites::ParsedNotification { site_path: "site1".into(), property_path: "some_value_node".into(), signal: "chng".into(), source: "get".into(), param: 20.into() })))
358363
],
359364
starting_files: vec![],
360365
expected_file_paths: vec![],
361366
},
362367
TestCase {
363368
name: "ProcessNotification: notifications get written to disk",
364369
steps: &[
365-
Box::new(TestDirtyLogCommand(DirtyLogCommand::ProcessNotification(crate::sites::ParsedNotification { site_path: "site1".into(), property_path: "some_value_node".into(), signal: "chng".into(), param: DataChange{
370+
Box::new(TestDirtyLogCommand(DirtyLogCommand::ProcessNotification(crate::sites::ParsedNotification { site_path: "site1".into(), property_path: "some_value_node".into(), signal: "chng".into(), source: "get".into(), param: DataChange{
366371
value: 20.into(),
367372
date_time: Some(DateTime::from_iso_str("2022-07-07T00:00:00.000").expect("DateTime must work")),
368373
value_flags: ValueFlags::empty(),
@@ -417,7 +422,7 @@ mod tests {
417422
value: 20.into(),
418423
access_level: 8,
419424
short_time: -1,
420-
user_id: None,
425+
user_id: ().into(),
421426
repeat: true,
422427
provisional: true
423428
}],
@@ -441,7 +446,7 @@ mod tests {
441446
value: 20.into(),
442447
access_level: 8,
443448
short_time: -1,
444-
user_id: None,
449+
user_id: ().into(),
445450
repeat: true,
446451
provisional: true
447452
}],

src/getlog.rs

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ use tokio_util::compat::TokioAsyncReadCompatExt;
1414

1515
use crate::dirtylog::DirtyLogCommand;
1616
use shvrpc::journalentry::JournalEntry;
17-
use shvrpc::journalrw::{matches_path_pattern, GetLog2Params, GetLog2Since, JournalReaderLog2};
18-
use crate::util::{get_files, is_log2_file};
17+
use shvrpc::journalrw::{GetLog2Params, GetLog2Since, JournalReaderLog2, JournalReaderLog3, matches_path_pattern};
18+
use crate::util::{get_files, is_log_file};
1919
use crate::State;
2020

2121
pub(crate) struct GetLogResult {
@@ -32,14 +32,17 @@ pub(crate) struct GetLogResult {
3232
}
3333

3434
fn file_name_to_file_msec(filename: &str) -> Result<i64, String> {
35-
let without_ext = filename
36-
.strip_suffix(".log2")
37-
.ok_or_else(|| format!("Invalid file extension in '{filename}'"))?;
38-
39-
let datetime = chrono::NaiveDateTime::parse_from_str(without_ext, "%Y-%m-%dT%H-%M-%S-%3f")
40-
.map_err(|e| format!("Failed to parse '{filename}': {e}"))?;
35+
let (stripped, format) = if let Some(stripped) = filename.strip_suffix(".log2") {
36+
(stripped, "%Y-%m-%dT%H-%M-%S-%3f")
37+
} else if let Some(stripped) = filename.strip_suffix(".log3") {
38+
(stripped, "Y-%m-%dT%H-%M-%S")
39+
} else {
40+
return Err(format!("Invalid file extension in '{filename}'"));
41+
};
4142

42-
Ok(chrono::Utc.from_utc_datetime(&datetime).timestamp_millis())
43+
chrono::NaiveDateTime::parse_from_str(stripped, format)
44+
.map(|datetime| chrono::Utc.from_utc_datetime(&datetime).timestamp_millis())
45+
.map_err(|e| format!("Failed to parse '{filename}': {e}"))
4346
}
4447

4548

@@ -53,7 +56,7 @@ pub(crate) async fn getlog_handler(
5356
}
5457
let local_journal_path = Path::new(&app_state.config.journal_dir).join(site_path);
5558
info!("getLog handler, site: {site_path}, params: {params}");
56-
let mut log_files = get_files(&local_journal_path, is_log2_file)
59+
let mut log_files = get_files(&local_journal_path, is_log_file)
5760
.await
5861
.map_err(|err| RpcError::new(RpcErrorCode::InternalError, format!("Cannot read log files: {err}")))?;
5962
log_files.sort_by_key(|entry| entry.file_name());
@@ -65,10 +68,11 @@ pub(crate) async fn getlog_handler(
6568
async move {
6669
match tokio::fs::File::open(&file_path).await {
6770
Ok(file) => {
68-
JournalReaderLog2::new(BufReader::new(file.compat()))
69-
.next()
70-
.await
71-
.is_some()
71+
if file_path.ends_with(".log3") {
72+
JournalReaderLog3::new(BufReader::new(file.compat())).next().await.is_some()
73+
} else {
74+
JournalReaderLog2::new(BufReader::new(file.compat())).next().await.is_some()
75+
}
7276
}
7377
Err(err) => {
7478
error!("Cannot open file {file_path} in call to getLog: {err}",
@@ -82,7 +86,7 @@ pub(crate) async fn getlog_handler(
8286
.collect::<Vec<_>>()
8387
.await;
8488

85-
let file_start_index = {
89+
let file_start_index: usize = {
8690
if log_files.is_empty() {
8791
0
8892
} else {
@@ -102,6 +106,8 @@ pub(crate) async fn getlog_handler(
102106
GetLog2Since::LastEntry => log_files.len() - 1,
103107
GetLog2Since::None => 0,
104108
}
109+
.saturating_sub(1) // Take previous file to have a complete snapshot when processing log3
110+
// files
105111
}
106112
};
107113

@@ -366,7 +372,7 @@ mod tests {
366372
access_level: 32,
367373
source: "".to_string(),
368374
value: value.into(),
369-
user_id: None,
375+
user_id: ().into(),
370376
repeat: false,
371377
provisional: false,
372378
})

0 commit comments

Comments
 (0)