Skip to content

Commit 47bbc6c

Browse files
committed
Update to paho-mqtt 0.14.0
1 parent e5b10c5 commit 47bbc6c

6 files changed

Lines changed: 109 additions & 49 deletions

File tree

Cargo.lock

Lines changed: 36 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ default = ["cli"]
2929
cli = ["clap"]
3030

3131
[dependencies]
32-
async-channel = { version = "1.9" }
32+
async-channel = { version = "2.0.0" }
3333
async-trait = { version = "0.1" }
3434
backon = { version = "1.6", default-features = false, features = ["tokio-sleep"] }
3535
bytes = { version = "1.11.1" }
@@ -45,22 +45,22 @@ clap = { version = "4.5", optional = true, default-features = false, features =
4545
] }
4646
futures = { version = "0.3" }
4747
log = { version = "0.4" }
48-
paho-mqtt = { version = "0.13.3", features = ["vendored-ssl"] }
48+
paho-mqtt = { version = "0.14.0", features = ["vendored-ssl"] }
4949
protobuf = { version = "3.7.2" }
5050
slab = { version = "0.4.11" }
5151
tokio = { version = "1.45.1", default-features = false, features = ["rt", "rt-multi-thread", "sync"] }
5252
up-rust = { version = "0.9", default-features = false }
5353

5454
[build-dependencies]
55-
testcontainers = { version = "0.27.1", default-features = false, features = ["blocking"] }
55+
testcontainers = { version = "0.27.2", default-features = false, features = ["blocking"] }
5656

5757
[dev-dependencies]
5858
env_logger = { version = "0.11.8" }
5959
mockall = { version = "0.14.0" }
6060
serial_test = { version = "3.2.0" }
6161
tempfile = { version = "3.23.0" }
6262
test-case = { version = "3.3.1" }
63-
testcontainers = { version = "0.27.1", default-features = false }
63+
testcontainers = { version = "0.27.2", default-features = false }
6464
# https://rustsec.org/advisories/RUSTSEC-2026-0049
6565
rustls-webpki = { version = "0.103.10" }
6666
# https://rustsec.org/advisories/RUSTSEC-2026-0009

about.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ accepted = [
66
"Apache-2.0",
77
"BSD-2-Clause",
88
"BSD-3-Clause",
9-
"CDLA-Permissive-2.0",
109
"EPL-2.0",
1110
"ISC",
1211
"MIT",

deny.toml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ allow = [
1919
"Apache-2.0",
2020
"BSD-2-Clause",
2121
"BSD-3-Clause",
22-
"CDLA-Permissive-2.0",
2322
"EPL-2.0",
2423
"ISC",
2524
"MIT",
@@ -39,6 +38,4 @@ skip-tree = [
3938

4039
[advisories]
4140
ignore = [
42-
# https://rustsec.org/advisories/RUSTSEC-2026-0066
43-
{ id = "RUSTSEC-2026-0066", reason = "this affects the testcontainers crate which is used for running tests only" },
4441
]

src/lib.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ use async_channel::Receiver;
3636
use bytes::Bytes;
3737
#[cfg(feature = "cli")]
3838
use clap::{Args, ValueEnum};
39-
use futures::stream::StreamExt;
4039
use listener_registry::{RegisteredListeners, SubscriptionIdentifier};
4140
use log::{debug, trace};
4241
use mqtt_client::MqttClientOperations;
@@ -429,12 +428,12 @@ impl Mqtt5Transport {
429428
/// * `registered_listeners` - Map of topic filters to listeners.
430429
/// * `message_stream` - Stream of incoming MQTT PUBLISH packets.
431430
/// * `mqtt_client_operations` - The client to use for interacting with the MQTT broker.
432-
fn create_cb_message_handler(&mut self, mut message_stream: Receiver<Option<Message>>) {
431+
fn create_cb_message_handler(&mut self, message_stream: Receiver<Option<Message>>) {
433432
let cloned_client_operations = self.mqtt_client.clone();
434433
let cloned_registered_listeners = self.registered_listeners.clone();
435434
let cloned_message_mapper = self.message_mapper.clone();
436435
let handle = tokio::spawn(async move {
437-
while let Some(msg_opt) = message_stream.next().await {
436+
while let Ok(msg_opt) = message_stream.recv().await {
438437
let Some(msg) = msg_opt else {
439438
// None means that the connection is dropped.
440439
debug!("Lost connection to MQTT broker");

src/mqtt_client.rs

Lines changed: 67 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use async_trait::async_trait;
2323
use backon::Retryable;
2424
#[cfg(feature = "cli")]
2525
use clap::Args;
26-
use log::{debug, trace};
26+
use log::{debug, error, trace};
2727
use up_rust::{UCode, UStatus};
2828

2929
use crate::{listener_registry::SubscribedTopicProvider, SubscriptionIdentifier};
@@ -150,7 +150,7 @@ impl TryFrom<&MqttClientOptions> for paho_mqtt::ConnectOptions {
150150
connect_options_builder.user_name(v);
151151
}
152152
if let Some(v) = options.password.as_ref() {
153-
connect_options_builder.password(v);
153+
connect_options_builder.password(v.as_bytes());
154154
}
155155
Ok(connect_options_builder.finalize())
156156
}
@@ -221,6 +221,49 @@ impl TryFrom<&SslOptions> for paho_mqtt::SslOptions {
221221
}
222222
}
223223

224+
fn ustatus_from_paho_error(paho_error: paho_mqtt::Error) -> UStatus {
225+
match paho_error {
226+
paho_mqtt::Error::Disconnected => {
227+
UStatus::fail_with_code(UCode::UNAVAILABLE, "not connected to MQTT broker")
228+
}
229+
paho_mqtt::Error::TcpTlsConnectFailure => {
230+
UStatus::fail_with_code(UCode::UNAVAILABLE, "failed to connect to MQTT broker")
231+
}
232+
paho_mqtt::Error::ReasonCode(paho_mqtt::ReasonCode::BadUserNameOrPassword, _) => {
233+
UStatus::fail_with_code(UCode::UNAUTHENTICATED, "bad credentials")
234+
}
235+
// [impl->dsn~mqtt5-transport-authorization~1]
236+
paho_mqtt::Error::ReasonCode(paho_mqtt::ReasonCode::NotAuthorized, _) => {
237+
UStatus::fail_with_code(UCode::PERMISSION_DENIED, "not authorized")
238+
}
239+
paho_mqtt::Error::ReasonCode(paho_mqtt::ReasonCode::ServerUnavailable, _) => {
240+
UStatus::fail_with_code(UCode::UNAVAILABLE, "server not available")
241+
}
242+
paho_mqtt::Error::ReasonCode(paho_mqtt::ReasonCode::ServerBusy, _) => {
243+
UStatus::fail_with_code(UCode::UNAVAILABLE, "server busy")
244+
}
245+
paho_mqtt::Error::ReasonCode(paho_mqtt::ReasonCode::BadAuthenticationMethod, _) => {
246+
UStatus::fail_with_code(UCode::UNAUTHENTICATED, "bad authentication method")
247+
}
248+
paho_mqtt::Error::ReasonCode(paho_mqtt::ReasonCode::MessageRateTooHigh, _) => {
249+
UStatus::fail_with_code(UCode::RESOURCE_EXHAUSTED, "message rate to high")
250+
}
251+
paho_mqtt::Error::ReasonCode(paho_mqtt::ReasonCode::QuotaExceeded, _) => {
252+
UStatus::fail_with_code(UCode::RESOURCE_EXHAUSTED, "quota exceeded")
253+
}
254+
paho_mqtt::Error::ReasonCode(paho_mqtt::ReasonCode::ConnectionRateExceeded, _) => {
255+
UStatus::fail_with_code(UCode::RESOURCE_EXHAUSTED, "connection rate exceeded")
256+
}
257+
paho_mqtt::Error::ReasonCode(paho_mqtt::ReasonCode::MaximumConnectTime, _) => {
258+
UStatus::fail_with_code(UCode::RESOURCE_EXHAUSTED, "maximum connect time exceeded")
259+
}
260+
_ => {
261+
error!("paho error: {paho_error:?}");
262+
UStatus::fail_with_code(UCode::INTERNAL, paho_error.to_string())
263+
}
264+
}
265+
}
266+
224267
/// Basic operations that an MQTT client performs.
225268
#[cfg_attr(test, mockall::automock)]
226269
#[async_trait]
@@ -331,18 +374,6 @@ pub(crate) struct PahoBasedMqttClientOperations {
331374
}
332375

333376
impl PahoBasedMqttClientOperations {
334-
fn ustatus_from_paho_error(paho_error: &paho_mqtt::Error) -> UStatus {
335-
match paho_error {
336-
paho_mqtt::Error::Disconnected => {
337-
UStatus::fail_with_code(UCode::UNAVAILABLE, "not connected to MQTT broker")
338-
}
339-
paho_mqtt::Error::TcpTlsConnectFailure => {
340-
UStatus::fail_with_code(UCode::UNAVAILABLE, "failed to connect to MQTT broker")
341-
}
342-
_ => UStatus::fail_with_code(UCode::UNKNOWN, paho_error.to_string()),
343-
}
344-
}
345-
346377
/// Creates new MQTT client.
347378
///
348379
/// # Arguments
@@ -491,12 +522,7 @@ impl MqttClientOperations for PahoBasedMqttClientOperations {
491522
Self::process_connack(user_data, response);
492523
}
493524
})
494-
.map_err(|e| {
495-
UStatus::fail_with_code(
496-
UCode::INTERNAL,
497-
format!("Failed to connect to MQTT broker: {e:?}"),
498-
)
499-
})
525+
.map_err(ustatus_from_paho_error)
500526
}
501527

502528
fn is_connected(&self) -> bool {
@@ -607,7 +633,7 @@ impl MqttClientOperations for PahoBasedMqttClientOperations {
607633
self.inner_mqtt_client
608634
.publish(mqtt_message)
609635
.await
610-
.map_err(|paho_error| Self::ustatus_from_paho_error(&paho_error))
636+
.map_err(ustatus_from_paho_error)
611637
}
612638

613639
async fn subscribe(&self, topic: &str, id: u16) -> Result<(), UStatus> {
@@ -634,22 +660,25 @@ impl MqttClientOperations for PahoBasedMqttClientOperations {
634660
// QOS 1 - Delivered and received at least once
635661
.subscribe_with_options(topic, paho_mqtt::QOS_1, None, subscription_properties)
636662
.await
637-
.map_err(|paho_error| Self::ustatus_from_paho_error(&paho_error))
663+
.map_err(ustatus_from_paho_error)
638664
.map(|_| ())
639665
}
640666

641667
async fn unsubscribe(&self, topic: &str) -> Result<(), UStatus> {
642668
self.inner_mqtt_client
643669
.unsubscribe(topic)
644670
.await
645-
.map_err(|paho_error| Self::ustatus_from_paho_error(&paho_error))
671+
.map_err(ustatus_from_paho_error)
646672
.map(|_| ())
647673
}
648674
}
649675

650676
#[cfg(test)]
651677
mod tests {
652-
use paho_mqtt::ConnectOptions;
678+
use paho_mqtt::{ConnectOptions, Properties};
679+
use up_rust::UCode;
680+
681+
use crate::mqtt_client::ustatus_from_paho_error;
653682

654683
use super::MqttClientOptions;
655684

@@ -668,4 +697,18 @@ mod tests {
668697
// because the ConnectOptions struct does not (yet) provide access to the CONNECT packet
669698
// properties
670699
}
700+
701+
#[test_case::test_case(
702+
paho_mqtt::Error::TcpTlsConnectFailure => UCode::UNAVAILABLE;
703+
"connect failure")]
704+
#[test_case::test_case(
705+
paho_mqtt::Error::Disconnected => UCode::UNAVAILABLE;
706+
"disconnected")]
707+
// [utest->dsn~mqtt5-transport-authorization~1]
708+
#[test_case::test_case(
709+
paho_mqtt::Error::ReasonCode(paho_mqtt::ReasonCode::NotAuthorized, Properties::new()) => UCode::PERMISSION_DENIED;
710+
"not authorized")]
711+
fn test_ustatus_from_paho_error(paho_error: paho_mqtt::Error) -> UCode {
712+
ustatus_from_paho_error(paho_error).get_code()
713+
}
671714
}

0 commit comments

Comments
 (0)