Skip to content

Commit da15d53

Browse files
committed
Connect mqtt5 transport at startup
1 parent e19518b commit da15d53

4 files changed

Lines changed: 32 additions & 6 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,17 @@ version = "0.4.0"
3030

3131
[workspace.dependencies]
3232
async-trait = { version = "0.1" }
33+
backon = { version = "1.6", default-features = false, features = [
34+
"tokio-sleep",
35+
] }
3336
mockall = { version = "0.14" }
3437
protobuf = { version = "3.7.2" }
3538
test-case = { version = "3.3" }
3639
tokio = { version = "1", features = ["full"] }
37-
tracing = { version = "0.1", default-features = false, features = ["log", "std"] }
40+
tracing = { version = "0.1", default-features = false, features = [
41+
"log",
42+
"std",
43+
] }
3844
up-rust = { version = "0.9.0", features = ["usubscription"] }
3945
up-subscription = { path = "./up-subscription" }
4046

up-subscription-cli/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ mqtt5 = ["dep:up-transport-mqtt5"]
3333
zenoh = ["dep:up-transport-zenoh", "dep:serde_json"]
3434

3535
[dependencies]
36+
backon = { workspace = true }
3637
clap = { version = "4.5.53", default-features = false, features = [
3738
"std",
3839
"derive",
@@ -47,7 +48,7 @@ serde_json = { version = "1.0", optional = true }
4748
tokio = { workspace = true }
4849
tracing = { workspace = true }
4950
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
50-
up-rust = { workspace = true, features = ["usubscription", "util" ] }
51+
up-rust = { workspace = true, features = ["usubscription", "util"] }
5152
up-subscription = { workspace = true }
5253
up-transport-mqtt5 = { version = "0.4.0", optional = true }
5354
up-transport-zenoh = { version = "0.9.0", optional = true }

up-subscription-cli/src/transport/mqtt5.rs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,36 @@
1111
* SPDX-License-Identifier: Apache-2.0
1212
********************************************************************************/
1313

14-
use std::sync::Arc;
14+
use std::{sync::Arc, time::Duration};
1515

16+
use backon::{ExponentialBuilder, Retryable};
1617
use tracing::info;
17-
use up_rust::UTransport;
18+
use up_rust::{UCode, UTransport};
1819
use up_transport_mqtt5::{Mqtt5Transport, Mqtt5TransportOptions};
1920

2021
pub(crate) async fn get_mqtt5_transport(
2122
authority_name: &str,
2223
mqtt5_args: Mqtt5TransportOptions,
2324
) -> Result<Arc<dyn UTransport>, Box<dyn std::error::Error>> {
2425
info!("Using MQTT 5 uProtocol transport");
25-
Ok(Mqtt5Transport::new(mqtt5_args, authority_name)
26+
27+
let transport = Mqtt5Transport::new(mqtt5_args, authority_name)
2628
.await
27-
.map(Arc::new)?)
29+
.map(Arc::new)?;
30+
31+
(|| transport.connect())
32+
.retry(
33+
ExponentialBuilder::default().with_total_delay(Some(Duration::from_secs(10))),
34+
)
35+
.notify(|error, sleep_duration| {
36+
info!("Attempt to connect to MQTT broker failed [error: {error}], retrying in {sleep_duration:?}");
37+
})
38+
.when(|err| {
39+
// no need to keep retrying if authentication or permission is denied
40+
err.get_code() != UCode::UNAUTHENTICATED
41+
&& err.get_code() != UCode::PERMISSION_DENIED
42+
})
43+
.await?;
44+
info!("Connected to MQTT5 broker");
45+
Ok(transport)
2846
}

0 commit comments

Comments
 (0)