Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,17 @@ version = "0.4.0"

[workspace.dependencies]
async-trait = { version = "0.1" }
backon = { version = "1.6", default-features = false, features = [
"tokio-sleep",
] }
mockall = { version = "0.14" }
protobuf = { version = "3.7.2" }
test-case = { version = "3.3" }
tokio = { version = "1", features = ["full"] }
tracing = { version = "0.1", default-features = false, features = ["log", "std"] }
tracing = { version = "0.1", default-features = false, features = [
"log",
"std",
] }
up-rust = { version = "0.9.0", features = ["usubscription"] }
up-subscription = { path = "./up-subscription" }

Expand All @@ -59,3 +65,4 @@ install-updater = false
[profile.dist]
inherits = "release"
lto = "thin"
overflow-checks = true
5 changes: 5 additions & 0 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,8 @@ allow = ["Apache-2.0", "BSD-2-Clause", "MIT", "Unicode-3.0"]
name = "ring"
expression = "MIT AND ISC AND OpenSSL"
license-files = [{ path = "LICENSE", hash = 0xbd0eed23 }]

[advisories]
ignore = [
"RUSTSEC-2026-0007", # We perform integer overflow checks also in release builds
]
3 changes: 2 additions & 1 deletion up-subscription-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ mqtt5 = ["dep:up-transport-mqtt5"]
zenoh = ["dep:up-transport-zenoh", "dep:serde_json"]

[dependencies]
backon = { workspace = true }
clap = { version = "4.5.53", default-features = false, features = [
"std",
"derive",
Expand All @@ -47,7 +48,7 @@ serde_json = { version = "1.0", optional = true }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
up-rust = { workspace = true, features = ["usubscription", "util" ] }
up-rust = { workspace = true, features = ["usubscription", "util"] }
up-subscription = { workspace = true }
up-transport-mqtt5 = { version = "0.4.0", optional = true }
up-transport-zenoh = { version = "0.9.0", optional = true }
26 changes: 22 additions & 4 deletions up-subscription-cli/src/transport/mqtt5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,36 @@
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

use std::sync::Arc;
use std::{sync::Arc, time::Duration};

use backon::{ExponentialBuilder, Retryable};
use tracing::info;
use up_rust::UTransport;
use up_rust::{UCode, UTransport};
use up_transport_mqtt5::{Mqtt5Transport, Mqtt5TransportOptions};

pub(crate) async fn get_mqtt5_transport(
authority_name: &str,
mqtt5_args: Mqtt5TransportOptions,
) -> Result<Arc<dyn UTransport>, Box<dyn std::error::Error>> {
info!("Using MQTT 5 uProtocol transport");
Ok(Mqtt5Transport::new(mqtt5_args, authority_name)

let transport = Mqtt5Transport::new(mqtt5_args, authority_name)
.await
.map(Arc::new)?)
.map(Arc::new)?;

(|| transport.connect())
.retry(
ExponentialBuilder::default().with_total_delay(Some(Duration::from_secs(10))),
)
.notify(|error, sleep_duration| {
info!("Attempt to connect to MQTT broker failed [error: {error}], retrying in {sleep_duration:?}");
})
.when(|err| {
// no need to keep retrying if authentication or permission is denied
err.get_code() != UCode::UNAUTHENTICATED
&& err.get_code() != UCode::PERMISSION_DENIED
})
.await?;
info!("Connected to MQTT5 broker");
Ok(transport)
}
Loading