From b4b42b5cae879b35277337fdf98a31573c21ad3c Mon Sep 17 00:00:00 2001 From: Florian Waibel Date: Mon, 16 Mar 2026 13:19:47 +0100 Subject: [PATCH] Add non-blocking publish methods to MqttHandle for time-critical loops --- CHANGELOG.md | 4 + crates/rustyfarian-esp-idf-mqtt/src/lib.rs | 100 +++++++++++++++++++++ 2 files changed, 104 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index dd913d1..61ff9dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- `rustyfarian-esp-idf-mqtt`: non-blocking `MqttHandle::try_publish`, `try_publish_retained`, and `try_publish_with` with `TryPublishError` for time-critical loops + ## [0.1.0] - 2026-03-16 ### Added diff --git a/crates/rustyfarian-esp-idf-mqtt/src/lib.rs b/crates/rustyfarian-esp-idf-mqtt/src/lib.rs index 103701f..c20c5de 100644 --- a/crates/rustyfarian-esp-idf-mqtt/src/lib.rs +++ b/crates/rustyfarian-esp-idf-mqtt/src/lib.rs @@ -23,6 +23,23 @@ //! handle.publish("status", "online")?; //! ``` //! +//! ## Non-blocking publish +//! +//! For time-critical loops (e.g. ESP-NOW at 50 Hz), use [`MqttHandle::try_publish`] +//! to avoid blocking when the event loop holds the client mutex during reconnects. +//! Messages are silently dropped on `WouldBlock` — buffer or count misses at the +//! application layer if lossless delivery matters: +//! +//! ```ignore +//! use rustyfarian_esp_idf_mqtt::TryPublishError; +//! +//! match handle.try_publish("sensors/temp", "22.5") { +//! Ok(()) => log::info!("published"), +//! Err(TryPublishError::WouldBlock) => { /* skip this tick, retry next */ } +//! Err(TryPublishError::Other(e)) => log::warn!("publish failed: {}", e), +//! } +//! ``` +//! //! [`MqttManager`] is still available but deprecated — use [`MqttBuilder`] for //! new code. @@ -61,6 +78,26 @@ use esp_idf_svc::mqtt::client::{ EspMqttClient, EventPayload, LwtConfiguration, MqttClientConfiguration, QoS, }; +/// Error returned by the `try_publish*` family when the publish cannot +/// complete without blocking. +#[derive(Debug)] +pub enum TryPublishError { + /// The MQTT client mutex is held by the event loop (e.g. during reconnect). + /// The caller should retry on the next tick. + WouldBlock, + /// Any other publish failure (invalid topic, enqueue error, poisoned mutex). + Other(anyhow::Error), +} + +impl std::fmt::Display for TryPublishError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::WouldBlock => write!(f, "MQTT client busy (would block)"), + Self::Other(e) => write!(f, "{:#}", e), + } + } +} + /// Last Will and Testament configuration. /// /// The broker publishes this message on behalf of the client when it @@ -792,6 +829,69 @@ impl MqttHandle { Ok(()) } + /// Non-blocking publish with QoS 1 and no retain flag. + /// + /// Returns [`TryPublishError::WouldBlock`] if the MQTT client mutex is + /// held by the event loop (e.g. during a reconnect). + pub fn try_publish(&self, topic: &str, payload: &str) -> Result<(), TryPublishError> { + self.try_publish_with(topic, payload.as_bytes(), QoS::AtLeastOnce, false) + } + + /// Non-blocking retained publish with QoS 1. + /// + /// Returns [`TryPublishError::WouldBlock`] if the mutex is held. + pub fn try_publish_retained(&self, topic: &str, payload: &str) -> Result<(), TryPublishError> { + self.try_publish_with(topic, payload.as_bytes(), QoS::AtLeastOnce, true) + } + + /// Non-blocking publish with explicit QoS and retain control. + /// + /// Uses `Mutex::try_lock()` instead of `lock()`, returning immediately + /// with [`TryPublishError::WouldBlock`] when the event loop thread + /// holds the client mutex (e.g. during a WiFi-triggered reconnect). + /// + /// # Message loss + /// + /// Messages are **silently dropped** when `WouldBlock` is returned. + /// During prolonged reconnects (tens of seconds on poor WiFi) every + /// call will return `WouldBlock`, so the caller must decide whether to + /// discard, buffer, or count missed publishes at the application layer. + /// + /// # Tight-loop usage + /// + /// Calling this in a busy loop without any yield or sleep will spin the + /// CPU. In a fixed-rate loop (e.g. 50 Hz game tick) the natural tick + /// interval provides sufficient back-off. Free-running loops should + /// add a short delay or yield between retries. + /// + /// # Arguments + /// + /// * `topic` - The topic to publish to + /// * `payload` - The message payload + /// * `qos` - Quality of Service level + /// * `retain` - Whether the broker should retain this message + pub fn try_publish_with( + &self, + topic: &str, + payload: &[u8], + qos: QoS, + retain: bool, + ) -> Result<(), TryPublishError> { + validate_publish_topic(topic) + .map_err(|e| TryPublishError::Other(anyhow::anyhow!("invalid publish topic: {}", e)))?; + log::debug!("[mqtt] try_publish to '{}': {} bytes", topic, payload.len()); + let mut guard = self.client.try_lock().map_err(|e| match e { + std::sync::TryLockError::WouldBlock => TryPublishError::WouldBlock, + std::sync::TryLockError::Poisoned(_) => { + TryPublishError::Other(anyhow::anyhow!("MQTT client mutex poisoned")) + } + })?; + guard + .enqueue(topic, qos, retain, payload) + .map_err(|e| TryPublishError::Other(e.into()))?; + Ok(()) + } + /// Subscribes to a topic. /// /// # Important