Skip to content

Commit 6ece7c7

Browse files
Add subscription persistency
* Initial batch of changes to add subscription persistency * Update deps * temp-ignore advisories of indirect deps * cleanup deny.toml
1 parent 1ee07e7 commit 6ece7c7

21 files changed

+1378
-718
lines changed

Cargo.lock

Lines changed: 271 additions & 185 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ version = "0.2.0"
3131
[workspace.dependencies]
3232
async-trait = { version = "0.1" }
3333
env_logger = { version = "0.11" }
34-
futures = { version = "0.3" }
3534
log = { version = "0.4" }
3635
mockall = { version = "0.13" }
3736
protobuf = { version = "3.4" }

deny.toml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
# For all options see https://github.com/EmbarkStudios/cargo-deny/blob/main/deny.template.toml
1616

1717
[advisories]
18-
ignore = ["RUSTSEC-2023-0071"]
18+
ignore = ["RUSTSEC-2023-0071", "RUSTSEC-2025-0014", "RUSTSEC-2024-0436"]
1919

2020
[bans]
2121
multiple-versions = "allow"
@@ -33,8 +33,6 @@ allow = [
3333
"Unicode-3.0",
3434
"Zlib",
3535
]
36-
exceptions = [{ name = "ring", allow = ["OpenSSL"] }]
37-
#private = { ignore = true }
3836

3937
[[licenses.clarify]]
4038
name = "ring"

up-subscription-cli/Cargo.toml

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,9 @@ mqtt5 = ["dep:up-transport-mqtt5"]
3434
zenoh = ["dep:up-transport-zenoh", "dep:serde_json"]
3535

3636
[dependencies]
37-
async-trait = { workspace = true }
3837
clap = { version = "4.5", features = ["derive", "env"] }
3938
clap-num = { version = "1.1" }
40-
env_logger = { workspace = true }
41-
futures = { workspace = true }
4239
log = { workspace = true }
43-
protobuf = { workspace = true }
4440
serde_json = { version = "1.0.138", optional = true }
4541
tokio = { workspace = true }
4642
up-rust = { workspace = true }
@@ -52,7 +48,3 @@ up-transport-zenoh = { version = "0.5.0", optional = true }
5248

5349
[target.'cfg(unix)'.dependencies]
5450
daemonize = { version = "0.5" }
55-
56-
[dev-dependencies]
57-
mockall = { workspace = true }
58-
test-case = { workspace = true }

up-subscription-cli/src/main.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,15 @@ pub(crate) struct Args {
102102
#[arg(short, long, env, value_parser=between_1_and_1024)]
103103
notification_buffer: Option<usize>,
104104

105-
/// Increase verbosity of output
105+
/// Enable or disable persistency, default is true (enable)
106+
#[arg(short, long, env, default_value_t = true)]
107+
persistency: bool,
108+
109+
/// Filesystem location for storing persistent data, default is current working directory
110+
#[arg(long, env)]
111+
storage_path: Option<String>,
112+
113+
/// Increase verbosity of output, default is false (reduced verbosity)
106114
#[arg(short, long, env, default_value_t = false)]
107115
verbose: bool,
108116

@@ -204,5 +212,7 @@ fn config_from_args(args: &Args) -> Result<USubscriptionConfiguration, Configura
204212
authority.to_string(),
205213
args.notification_buffer,
206214
args.subscription_buffer,
215+
args.persistency,
216+
args.storage_path.clone(),
207217
)
208218
}

up-subscription/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@ version.workspace = true
2424
[dependencies]
2525
async-trait = { workspace = true }
2626
env_logger = { workspace = true }
27-
futures = { workspace = true }
2827
log = { workspace = true }
28+
pickledb = { version = "0.5.1" }
2929
protobuf = { workspace = true }
30-
semver = { version = "1.0" }
30+
serde = { version = "1.0" }
3131
tokio = { workspace = true }
3232
up-rust = { workspace = true }
3333
uriparse = { version = "0.6" }

up-subscription/src/configuration.rs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
* SPDX-License-Identifier: Apache-2.0
1212
********************************************************************************/
1313

14+
use std::path::{Path, PathBuf};
1415
use uriparse::Authority;
1516

1617
use up_rust::{
@@ -46,6 +47,8 @@ pub struct USubscriptionConfiguration {
4647
pub authority_name: String,
4748
pub subscription_command_buffer: usize,
4849
pub notification_command_buffer: usize,
50+
pub persistency_enabled: bool,
51+
pub persistency_path: PathBuf,
4952
}
5053

5154
/// Holder object for USubscription configuration options; this performs validation of configuration parameters at construction time,
@@ -58,8 +61,10 @@ impl USubscriptionConfiguration {
5861
/// # Arguments
5962
///
6063
/// * `authority_name` - Authority part of UUri that this USubscription instance is reachable on
61-
/// * `subscription_command_buffer` - buffer size for subscription manager commands, defaults to DEFAULT_COMMAND_BUFFER_SIZE when `None` or 0 is passed
62-
/// * `notification_command_buffer` - buffer size for notification manager commands, defaults to DEFAULT_COMMAND_BUFFER_SIZE when `None` or 0 is passed
64+
/// * `subscription_command_buffer` - buffer size for subscription manager commands, defaults to DEFAULT_COMMAND_BUFFER_SIZE when `None` or 0 is passed
65+
/// * `notification_command_buffer` - buffer size for notification manager commands, defaults to DEFAULT_COMMAND_BUFFER_SIZE when `None` or 0 is passed
66+
/// * `persistency_enabled` - if set to false, this USubscription instance will not persistently store subscription and notification state
67+
/// * `persistency_path` - filesystem path for persistently storing subscription and notification state, defaults to current working directory if empty
6368
///
6469
/// # Errors
6570
///
@@ -68,13 +73,32 @@ impl USubscriptionConfiguration {
6873
authority_name: String,
6974
subscription_command_buffer: Option<usize>,
7075
notification_command_buffer: Option<usize>,
76+
persistency_enabled: bool,
77+
persistency_path: Option<String>,
7178
) -> Result<USubscriptionConfiguration, ConfigurationError> {
7279
if let Err(e) = Authority::try_from(authority_name.as_bytes()) {
7380
return Err(ConfigurationError::new(format!(
7481
"Invalid authority name: {e}"
7582
)));
7683
}
7784

85+
// only accept persistency path if it points to an existing directory; if None set to cwd
86+
let persistency_path = if let Some(path_string) = persistency_path {
87+
let p = Path::new(&path_string);
88+
p.try_exists().unwrap_or_else(|_| {
89+
panic!("Persistency storage path does not exist {}", path_string)
90+
});
91+
if !p.is_dir() {
92+
panic!(
93+
"Persistency storage path is not a directory {}",
94+
path_string
95+
);
96+
}
97+
p.to_path_buf()
98+
} else {
99+
std::env::current_dir().expect("Error retrieving current working directory")
100+
};
101+
78102
Ok(USubscriptionConfiguration {
79103
authority_name,
80104
subscription_command_buffer: subscription_command_buffer
@@ -83,6 +107,8 @@ impl USubscriptionConfiguration {
83107
notification_command_buffer: notification_command_buffer
84108
.unwrap_or(DEFAULT_COMMAND_BUFFER_SIZE)
85109
.clamp(1, DEFAULT_COMMAND_BUFFER_SIZE),
110+
persistency_enabled,
111+
persistency_path,
86112
})
87113
}
88114
}

up-subscription/src/handlers/fetch_subscribers.rs

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,8 @@
1313

1414
use async_trait::async_trait;
1515
use log::*;
16-
use std::sync::Arc;
1716
use tokio::{sync::mpsc::Sender, sync::oneshot};
1817

19-
use crate::{
20-
helpers,
21-
subscription_manager::{SubscribersResponse, SubscriptionEvent},
22-
};
23-
2418
use up_rust::{
2519
communication::{RequestHandler, ServiceInvocationError, UPayload},
2620
core::usubscription::{
@@ -30,12 +24,17 @@ use up_rust::{
3024
UAttributes,
3125
};
3226

27+
use crate::{
28+
helpers,
29+
subscription_manager::{SubscribersResponse, SubscriptionEvent},
30+
};
31+
3332
pub(crate) struct FetchSubscribersRequestHandler {
34-
subscription_sender: Arc<Sender<SubscriptionEvent>>,
33+
subscription_sender: Sender<SubscriptionEvent>,
3534
}
3635

3736
impl FetchSubscribersRequestHandler {
38-
pub(crate) fn new(subscription_sender: Arc<Sender<SubscriptionEvent>>) -> Self {
37+
pub(crate) fn new(subscription_sender: Sender<SubscriptionEvent>) -> Self {
3938
Self {
4039
subscription_sender,
4140
}
@@ -136,7 +135,7 @@ mod tests {
136135
mpsc::channel::<SubscriptionEvent>(1);
137136

138137
// create and spawn off handler, to make all the asnync goodness work
139-
let request_handler = FetchSubscribersRequestHandler::new(Arc::new(subscription_sender));
138+
let request_handler = FetchSubscribersRequestHandler::new(subscription_sender);
140139
tokio::spawn(async move {
141140
let result = request_handler
142141
.handle_request(
@@ -183,7 +182,7 @@ mod tests {
183182
let (subscription_sender, _) = mpsc::channel::<SubscriptionEvent>(1);
184183

185184
// create handler and perform tested operation
186-
let request_handler = FetchSubscribersRequestHandler::new(Arc::new(subscription_sender));
185+
let request_handler = FetchSubscribersRequestHandler::new(subscription_sender);
187186

188187
let result = request_handler
189188
.handle_request(
@@ -212,7 +211,7 @@ mod tests {
212211
let (subscription_sender, _) = mpsc::channel::<SubscriptionEvent>(1);
213212

214213
// create handler and perform tested operation
215-
let request_handler = FetchSubscribersRequestHandler::new(Arc::new(subscription_sender));
214+
let request_handler = FetchSubscribersRequestHandler::new(subscription_sender);
216215

217216
let result = request_handler
218217
.handle_request(
@@ -242,7 +241,7 @@ mod tests {
242241
let (subscription_sender, _) = mpsc::channel::<SubscriptionEvent>(1);
243242

244243
// create handler and perform tested operation
245-
let request_handler = FetchSubscribersRequestHandler::new(Arc::new(subscription_sender));
244+
let request_handler = FetchSubscribersRequestHandler::new(subscription_sender);
246245

247246
let result = request_handler
248247
.handle_request(RESOURCE_ID_FETCH_SUBSCRIBERS, &message_attributes, None)
@@ -271,7 +270,7 @@ mod tests {
271270
let (subscription_sender, _) = mpsc::channel::<SubscriptionEvent>(1);
272271

273272
// create handler and perform tested operation
274-
let request_handler = FetchSubscribersRequestHandler::new(Arc::new(subscription_sender));
273+
let request_handler = FetchSubscribersRequestHandler::new(subscription_sender);
275274

276275
let result = request_handler
277276
.handle_request(

up-subscription/src/handlers/fetch_subscriptions.rs

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,8 @@
1313

1414
use async_trait::async_trait;
1515
use log::*;
16-
use std::sync::Arc;
1716
use tokio::{sync::mpsc::Sender, sync::oneshot};
1817

19-
use crate::{
20-
helpers,
21-
subscription_manager::{
22-
RequestKind, SubscriptionEntry, SubscriptionEvent, SubscriptionsResponse,
23-
},
24-
};
25-
2618
use up_rust::{
2719
communication::{RequestHandler, ServiceInvocationError, UPayload},
2820
core::usubscription::{
@@ -32,12 +24,19 @@ use up_rust::{
3224
UAttributes,
3325
};
3426

27+
use crate::{
28+
helpers,
29+
subscription_manager::{
30+
RequestKind, SubscriptionEntry, SubscriptionEvent, SubscriptionsResponse,
31+
},
32+
};
33+
3534
pub(crate) struct FetchSubscriptionsRequestHandler {
36-
subscription_sender: Arc<Sender<SubscriptionEvent>>,
35+
subscription_sender: Sender<SubscriptionEvent>,
3736
}
3837

3938
impl FetchSubscriptionsRequestHandler {
40-
pub(crate) fn new(subscription_sender: Arc<Sender<SubscriptionEvent>>) -> Self {
39+
pub(crate) fn new(subscription_sender: Sender<SubscriptionEvent>) -> Self {
4140
Self {
4241
subscription_sender,
4342
}
@@ -176,7 +175,7 @@ mod tests {
176175
mpsc::channel::<SubscriptionEvent>(1);
177176

178177
// create and spawn off handler, to make all the asnync goodness work
179-
let request_handler = FetchSubscriptionsRequestHandler::new(Arc::new(subscription_sender));
178+
let request_handler = FetchSubscriptionsRequestHandler::new(subscription_sender);
180179
tokio::spawn(async move {
181180
let result = request_handler
182181
.handle_request(
@@ -234,7 +233,7 @@ mod tests {
234233
let (subscription_sender, _) = mpsc::channel::<SubscriptionEvent>(1);
235234

236235
// create handler and perform tested operation
237-
let request_handler = FetchSubscriptionsRequestHandler::new(Arc::new(subscription_sender));
236+
let request_handler = FetchSubscriptionsRequestHandler::new(subscription_sender);
238237

239238
let result = request_handler
240239
.handle_request(
@@ -263,7 +262,7 @@ mod tests {
263262
let (subscription_sender, _) = mpsc::channel::<SubscriptionEvent>(1);
264263

265264
// create handler and perform tested operation
266-
let request_handler = FetchSubscriptionsRequestHandler::new(Arc::new(subscription_sender));
265+
let request_handler = FetchSubscriptionsRequestHandler::new(subscription_sender);
267266

268267
let result = request_handler
269268
.handle_request(
@@ -293,7 +292,7 @@ mod tests {
293292
let (subscription_sender, _) = mpsc::channel::<SubscriptionEvent>(1);
294293

295294
// create handler and perform tested operation
296-
let request_handler = FetchSubscriptionsRequestHandler::new(Arc::new(subscription_sender));
295+
let request_handler = FetchSubscriptionsRequestHandler::new(subscription_sender);
297296

298297
let result = request_handler
299298
.handle_request(RESOURCE_ID_FETCH_SUBSCRIPTIONS, &message_attributes, None)
@@ -322,7 +321,7 @@ mod tests {
322321
let (subscription_sender, _) = mpsc::channel::<SubscriptionEvent>(1);
323322

324323
// create handler and perform tested operation
325-
let request_handler = FetchSubscriptionsRequestHandler::new(Arc::new(subscription_sender));
324+
let request_handler = FetchSubscriptionsRequestHandler::new(subscription_sender);
326325

327326
let result = request_handler
328327
.handle_request(

0 commit comments

Comments
 (0)