Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "historyprovider"
version = "2.1.0"
version = "2.2.0"
edition = "2024"

[[bin]]
Expand All @@ -10,7 +10,7 @@ name = "hp"
tokio = { version = "1.48.0", features = ["macros", "net", "rt", "rt-multi-thread", "sync", "time"] }
shvproto = { git = "https://github.com/silicon-heaven/libshvproto-rs", tag = "3.6.22" }
shvrpc = { git = "https://github.com/silicon-heaven/libshvrpc-rs", tag = "5.0.0" }
shvclient = { git = "https://github.com/silicon-heaven/libshvclient-rs", tag = "0.20.0", features = ["tokio"] }
shvclient = { git = "https://github.com/silicon-heaven/libshvclient-rs", tag = "0.21.0", features = ["tokio"] }
futures = "0.3.31"
log = "0.4.28"
clap = { version = "4.5.53", features = ["derive"] }
Expand Down Expand Up @@ -42,5 +42,5 @@ humantime = "2.3.0"
[dev-dependencies]
async-broadcast = "0.7.2"
async-trait = "0.1.80"
shvclient = { git = "https://github.com/silicon-heaven/libshvclient-rs", tag = "0.20.0", features = ["mocking"] }
shvclient = { git = "https://github.com/silicon-heaven/libshvclient-rs", tag = "0.21.0", features = ["mocking"] }
tempfile = "3.23.0"
182 changes: 94 additions & 88 deletions src/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ use std::sync::Arc;

use async_compression::tokio::write::GzipEncoder;
use futures::{StreamExt, TryStreamExt};
use log::error;
use sha1::Digest;
use shvclient::appnodes::DOT_APP_METHODS;
use shvclient::clientnode::{rpc_error_unknown_method_on_path, LsHandlerResult};
use shvclient::clientnode::{ResolvedRequest, StaticNode, METH_DIR, METH_LS};
use shvclient::clientnode::{err_unresolved_request, LsHandlerResult, Method, RequestHandlerResult, UnresolvedRequest};
use shvclient::clientnode::StaticNode;
use shvclient::shvproto::RpcValue;
use shvclient::ClientCommandSender;
use shvrpc::metamethod::{AccessLevel, MetaMethod};
Expand Down Expand Up @@ -105,10 +106,10 @@ fn path_contains_parent_dir_references(path: impl AsRef<str>) -> bool {

async fn shvjournal_request_handler(
path: String,
method: String,
method: Method,
param: RpcValue,
app_state: Arc<State>,
) -> Result<ResolvedRequest, RpcError>
) -> RequestHandlerResult
{
async fn get_journaldir_entries(path: impl AsRef<Path>) -> Result<ReadDirStream, RpcError> {
Ok(ReadDirStream::new(
Expand Down Expand Up @@ -210,7 +211,7 @@ async fn shvjournal_request_handler(

if path_contains_parent_dir_references(&path) {
// Reject parent dir references
return Err(rpc_error_unknown_method_on_path(path, method));
return err_unresolved_request()
}
let is_shvjournal_root = &path == "_shvjournal";
let path = path.replacen("_shvjournal", &app_state.config.journal_dir, 1);
Expand All @@ -225,53 +226,56 @@ async fn shvjournal_request_handler(
MetaMethod::new_static(METH_SYNC_INFO, 0, AccessLevel::Read, "String", "Map", &[], "syncInfo - returns info about sites' sync status\nOptionally takes a string that filters the sites by prefix.\n\nReturns: a map where they is the path of the site and the value is a map with a status string and a last updated timestamp.\n"),
MetaMethod::new_static(METH_SANITIZE_LOG, 0, AccessLevel::Developer, "Null", "String", &[], ""),
];
match method.as_ref() {
METH_DIR => return Ok(ResolvedRequest::dir(METHODS)),
METH_LS => return Ok(ResolvedRequest::ls(METHODS, async || {
match method {
Method::Dir(dir) => return dir.resolve(METHODS),
Method::Ls(ls) => return ls.resolve(METHODS, async || {
journaldir_ls_handler(get_journaldir_entries(path).await?).await
})),
METH_LS_FILES => return Ok(ResolvedRequest::method(METHODS, method, async || {
journaldir_lsfiles_handler(get_journaldir_entries(path).await?).await
})),
METH_LOG_SIZE_LIMIT => return Ok(ResolvedRequest::method(METHODS, method, async move || Ok(log_size_limit(&app_state.config)))),
METH_TOTAL_LOG_SIZE => return Ok(ResolvedRequest::method(METHODS, method, async move || total_log_size(&app_state.config)
}),
Method::Other(m) => match m.method() {
METH_LS_FILES => return m.resolve(METHODS, async || {
journaldir_lsfiles_handler(get_journaldir_entries(path).await?).await
}),
METH_LOG_SIZE_LIMIT => return m.resolve(METHODS, async move || Ok(log_size_limit(&app_state.config))),
METH_TOTAL_LOG_SIZE => return m.resolve(METHODS, async move || total_log_size(&app_state.config)
.await
.map(RpcValue::from)
.map_err(rpc_error_filesystem))
),
METH_LOG_USAGE => return Ok(ResolvedRequest::method(METHODS, method, async move || total_log_size(&app_state.config)
.await
.map(|size| 100. * (size as f64) / (log_size_limit(&app_state.config) as f64))
.map_err(rpc_error_filesystem))
),
METH_SYNC_LOG => return Ok(ResolvedRequest::method(METHODS, method, async move || sync_log_request_handler(&param, app_state).await)),
METH_SYNC_INFO => return Ok(ResolvedRequest::method(METHODS, method, async move || Ok((*app_state.sync_info.sites_sync_info.read().await).to_owned()))),
METH_SANITIZE_LOG => return Ok(ResolvedRequest::method(METHODS, method, async move || app_state.sync_cmd_tx
.send(crate::sync::SyncCommand::Cleanup)
.map(|_| true)
.map_err(|_| RpcError::new(RpcErrorCode::InternalError, "Cannot send the command through the channel"))
)),
_ => return Err(rpc_error_method_not_found()),
.map_err(rpc_error_filesystem)
),
METH_LOG_USAGE => return m.resolve(METHODS, async move || total_log_size(&app_state.config)
.await
.map(|size| 100. * (size as f64) / (log_size_limit(&app_state.config) as f64))
.map_err(rpc_error_filesystem)
),
METH_SYNC_LOG => return m.resolve(METHODS, async move || sync_log_request_handler(&param, app_state).await),
METH_SYNC_INFO => return m.resolve(METHODS, async move || Ok((*app_state.sync_info.sites_sync_info.read().await).to_owned())),
METH_SANITIZE_LOG => return m.resolve(METHODS, async move || app_state.sync_cmd_tx
.send(crate::sync::SyncCommand::Cleanup)
.map(|_| true)
.map_err(|_| RpcError::new(RpcErrorCode::InternalError, "Cannot send the command through the channel"))
),
_ => return err_unresolved_request(),
}
}
}

// Probe the path on the fs. Prevent leaking FS info to unauthorized
// users, do not return `rpc_error_filesystem`.
let path_meta = tokio::fs::metadata(&path)
.await
.map_err(|_| rpc_error_unknown_method_on_path(&path, &method))?;
.inspect_err(|err| error!("Cannot read FS metadata, path: {path}, error: {err}"))
.or(Err(UnresolvedRequest))?;

if path_meta.is_dir() {
const METHODS: &[MetaMethod] = &[META_METHOD_LS_FILES];
match method.as_str() {
METH_DIR => Ok(ResolvedRequest::dir(METHODS)),
METH_LS => Ok(ResolvedRequest::ls(METHODS, async || {
match method {
Method::Dir(dir) => dir.resolve(METHODS),
Method::Ls(ls) => ls.resolve(METHODS, async || {
journaldir_ls_handler(get_journaldir_entries(path).await?).await
})),
METH_LS_FILES => Ok(ResolvedRequest::method(METHODS, method, async || {
}),
Method::Other(m) if m.method() == METH_LS_FILES => m.resolve(METHODS, async || {
journaldir_lsfiles_handler(get_journaldir_entries(path).await?).await
})),
_ => Err(rpc_error_method_not_found()),
}),
_ => err_unresolved_request(),
}
} else if path_meta.is_file() {
const METHODS: &[MetaMethod] = &[
Expand All @@ -280,15 +284,18 @@ async fn shvjournal_request_handler(
MetaMethod::new_static(METH_READ, 0, AccessLevel::Read, "Map", "Blob", &[], "Parameters\n offset: file offset to start read, default is 0\n size: number of bytes to read starting on offset, default is till end of file\n"),
MetaMethod::new_static(METH_READ_COMPRESSED, 0, AccessLevel::Read, "Map", "Blob", &[], "Parameters\n read() parameters\n compressionType: gzip (default) | qcompress"),
];
match method.as_str() {
METH_DIR => Ok(ResolvedRequest::dir(METHODS)),
METH_LS => Ok(ResolvedRequest::ls(METHODS, ls_empty)),
_ => Ok(ResolvedRequest::method(METHODS, method.clone(), async move || {
journalfile_methods_handler(&method, &path, path_meta.len(), &param).await
})),
match method {
Method::Dir(dir) => dir.resolve(METHODS),
Method::Ls(ls) => ls.resolve(METHODS, ls_empty),
Method::Other(m) => {
let method = m.method().to_owned();
m.resolve(METHODS, async move || {
journalfile_methods_handler(&method, &path, path_meta.len(), &param).await
})
},
}
} else {
Err(rpc_error_unknown_method_on_path(path, method))
err_unresolved_request()
}
}

Expand Down Expand Up @@ -562,20 +569,20 @@ pub(crate) async fn request_handler(
rq: RpcMessage,
client_cmd_tx: ClientCommandSender,
app_state: Arc<State>,
) -> Result<ResolvedRequest, RpcError>
) -> RequestHandlerResult
{
let path = rq.shv_path().map_or_else(String::new, String::from);
let method = rq.method().map_or_else(String::new, String::from);
let method = Method::from_request(&rq);
let param = rq.param().map_or_else(RpcValue::null, RpcValue::clone);

match NodeType::from_path(&path) {
NodeType::DotApp => {
match method.as_str() {
METH_DIR => Ok(ResolvedRequest::dir(DOT_APP_METHODS)),
METH_LS => Ok(ResolvedRequest::ls(DOT_APP_METHODS, ls_empty)),
_ => Ok(ResolvedRequest::method_opt(DOT_APP_METHODS, method, async move ||
DOT_APP_NODE.process_request(rq, client_cmd_tx).await
)),
match method {
Method::Dir(dir) => dir.resolve(DOT_APP_METHODS),
Method::Ls(ls) => ls.resolve(DOT_APP_METHODS, ls_empty),
Method::Other(m) => m.resolve_opt(DOT_APP_METHODS, async move ||
DOT_APP_NODE.process_request(rq, client_cmd_tx).await
),
}
}
NodeType::ShvJournal =>
Expand All @@ -585,11 +592,11 @@ pub(crate) async fn request_handler(
MetaMethod::new_static(METH_GET, 0, AccessLevel::Developer, "String", "RpcValue", &[], ""),
MetaMethod::new_static(METH_GET_CACHE, 0, AccessLevel::Developer, "Null", "Map", &[], ""),
];
match method.as_str() {
METH_DIR => Ok(ResolvedRequest::dir(METHODS)),
METH_LS => Ok(ResolvedRequest::ls(METHODS, ls_empty)),
METH_GET | METH_GET_CACHE => Ok(ResolvedRequest::method(METHODS, method, async || Err::<(), _>(rpc_error_not_implemented()))),
_ => Err(rpc_error_method_not_found()),
match method {
Method::Dir(dir) => dir.resolve(METHODS),
Method::Ls(ls) => ls.resolve(METHODS, ls_empty),
Method::Other(m) if m.method() == METH_GET || m.method() == METH_GET_CACHE => m.resolve(METHODS, async || Err::<(), _>(rpc_error_not_implemented())),
_ => err_unresolved_request(),
}
}
NodeType::Root => {
Expand All @@ -607,9 +614,9 @@ pub(crate) async fn request_handler(
// shvGitCommit
// reloadSites (wr) -> Bool
];
match method.as_str() {
METH_DIR => Ok(ResolvedRequest::dir(METHODS)),
METH_LS => {
match method {
Method::Dir(dir) => dir.resolve(METHODS),
Method::Ls(ls) => {
let ls_handler = async move || {
let mut nodes = vec![
".app".to_string(),
Expand All @@ -620,25 +627,27 @@ pub(crate) async fn request_handler(
nodes.append(&mut children_on_path(&app_state.sites_data.read().await.sites_info, ROOT_PATH).unwrap_or_default());
Ok(nodes)
};
Ok(ResolvedRequest::ls(METHODS, ls_handler))
ls.resolve(METHODS, ls_handler)
}
Method::Other(m) => match m.method() {
METH_VERSION => m.resolve(METHODS, async || Ok(env!("CARGO_PKG_VERSION"))),
METH_UPTIME => m.resolve(METHODS, async move || {
Ok(humantime::format_duration(std::time::Duration::from_secs(app_state.start_time.elapsed().as_secs())).to_string())
}),
METH_ALARM_LOG => m.resolve(METHODS, async move || {
let params = AlarmLogParams::try_from(param)
.map_err(|err| RpcError::new(RpcErrorCode::InvalidParam, format!("Wrong alarmLog parameters: {err}")))?;
Ok(alarmlog_impl(&path, &params, app_state).await)
}),
_ => err_unresolved_request(),
}
METH_VERSION => Ok(ResolvedRequest::method(METHODS, method, async || Ok(env!("CARGO_PKG_VERSION")))),
METH_UPTIME => Ok(ResolvedRequest::method(METHODS, method, async move || {
Ok(humantime::format_duration(std::time::Duration::from_secs(app_state.start_time.elapsed().as_secs())).to_string())
})),
METH_ALARM_LOG => Ok(ResolvedRequest::method(METHODS, method, async move || {
let params = AlarmLogParams::try_from(param)
.map_err(|err| RpcError::new(RpcErrorCode::InvalidParam, format!("Wrong alarmLog parameters: {err}")))?;
Ok(alarmlog_impl(&path, &params, app_state).await)
})),
_ => Err(rpc_error_method_not_found()),
}
}
NodeType::History => {
let methods = {
let sites_data = app_state.sites_data.read().await;
let is_site_path = children_on_path(&sites_data.sites_info, &path)
.ok_or_else(|| rpc_error_unknown_method_on_path(&path, &method))?
.ok_or(UnresolvedRequest)?
.is_empty();
if is_site_path {
let is_pushlog = sites_data.sites_info.get(&path)
Expand All @@ -661,26 +670,23 @@ pub(crate) async fn request_handler(
METHODS
}
};
match method.as_str() {
METH_DIR => Ok(ResolvedRequest::dir(methods)),
METH_LS => {
Ok(ResolvedRequest::ls(methods, async move || {
match method {
Method::Dir(dir) => dir.resolve(methods),
Method::Ls(ls) => {
ls.resolve(methods, async move || {
let children = children_on_path(&app_state.sites_data.read().await.sites_info, &path)
.unwrap_or_else(|| panic!("Children on path `{path}` should be Some after methods processing"));
Ok(children)
}))
})
}
_ => Ok(ResolvedRequest::method(methods, method.clone(), async move || {
match method.as_str() {
METH_GET_LOG => getlog_handler_rq(&path, &param, app_state).await,
METH_ALARM_TABLE => alarmtable_handler::<CommonAlarm>(&path, app_state).await,
METH_STATE_ALARM_TABLE => alarmtable_handler::<StateAlarm>(&path, app_state).await,
METH_ALARM_LOG => alarmlog_handler(&path, &param, app_state).await,
METH_PUSH_LOG => pushlog_handler(&path, param, app_state).await,
_ => Err(rpc_error_method_not_found()),
}

})),
Method::Other(m) => match m.method() {
METH_GET_LOG => m.resolve(methods, async move || { getlog_handler_rq(&path, &param, app_state).await }),
METH_ALARM_TABLE => m.resolve(methods, async move || { alarmtable_handler::<CommonAlarm>(&path, app_state).await }),
METH_STATE_ALARM_TABLE => m.resolve(methods, async move || { alarmtable_handler::<StateAlarm>(&path, app_state).await }),
METH_ALARM_LOG => m.resolve(methods, async move || { alarmlog_handler(&path, &param, app_state).await }),
METH_PUSH_LOG => m.resolve(methods, async move || { pushlog_handler(&path, param, app_state).await }),
_ => err_unresolved_request(),
},
}
}
}
Expand Down