Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- `rustyfarian-esp-idf-mqtt`: non-blocking `MqttHandle::try_publish`, `try_publish_retained`, and `try_publish_with` with `TryPublishError` for time-critical loops

## [0.1.0] - 2026-03-16

### Added
Expand Down
100 changes: 100 additions & 0 deletions crates/rustyfarian-esp-idf-mqtt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,23 @@
//! handle.publish("status", "online")?;
//! ```
//!
//! ## Non-blocking publish
//!
//! For time-critical loops (e.g. ESP-NOW at 50 Hz), use [`MqttHandle::try_publish`]
//! to avoid blocking when the event loop holds the client mutex during reconnects.
//! Messages are silently dropped on `WouldBlock` — buffer or count misses at the
//! application layer if lossless delivery matters:
//!
//! ```ignore
//! use rustyfarian_esp_idf_mqtt::TryPublishError;
//!
//! match handle.try_publish("sensors/temp", "22.5") {
//! Ok(()) => log::info!("published"),
//! Err(TryPublishError::WouldBlock) => { /* skip this tick, retry next */ }
//! Err(TryPublishError::Other(e)) => log::warn!("publish failed: {}", e),
//! }
//! ```
//!
//! [`MqttManager`] is still available but deprecated — use [`MqttBuilder`] for
//! new code.

Expand Down Expand Up @@ -61,6 +78,26 @@ use esp_idf_svc::mqtt::client::{
EspMqttClient, EventPayload, LwtConfiguration, MqttClientConfiguration, QoS,
};

/// Error returned by the `try_publish*` family when the publish cannot
/// complete without blocking.
#[derive(Debug)]
pub enum TryPublishError {
/// The MQTT client mutex is held by the event loop (e.g. during reconnect).
/// The caller should retry on the next tick.
WouldBlock,
/// Any other publish failure (invalid topic, enqueue error, poisoned mutex).
Other(anyhow::Error),
}

impl std::fmt::Display for TryPublishError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::WouldBlock => write!(f, "MQTT client busy (would block)"),
Self::Other(e) => write!(f, "{:#}", e),
}
}
}

/// Last Will and Testament configuration.
///
/// The broker publishes this message on behalf of the client when it
Expand Down Expand Up @@ -792,6 +829,69 @@ impl MqttHandle {
Ok(())
}

/// Non-blocking publish with QoS 1 and no retain flag.
///
/// Returns [`TryPublishError::WouldBlock`] if the MQTT client mutex is
/// held by the event loop (e.g. during a reconnect).
pub fn try_publish(&self, topic: &str, payload: &str) -> Result<(), TryPublishError> {
self.try_publish_with(topic, payload.as_bytes(), QoS::AtLeastOnce, false)
}

/// Non-blocking retained publish with QoS 1.
///
/// Returns [`TryPublishError::WouldBlock`] if the mutex is held.
pub fn try_publish_retained(&self, topic: &str, payload: &str) -> Result<(), TryPublishError> {
self.try_publish_with(topic, payload.as_bytes(), QoS::AtLeastOnce, true)
}

/// Non-blocking publish with explicit QoS and retain control.
///
/// Uses `Mutex::try_lock()` instead of `lock()`, returning immediately
/// with [`TryPublishError::WouldBlock`] when the event loop thread
/// holds the client mutex (e.g. during a WiFi-triggered reconnect).
///
/// # Message loss
///
/// Messages are **silently dropped** when `WouldBlock` is returned.
/// During prolonged reconnects (tens of seconds on poor WiFi) every
/// call will return `WouldBlock`, so the caller must decide whether to
/// discard, buffer, or count missed publishes at the application layer.
///
/// # Tight-loop usage
///
/// Calling this in a busy loop without any yield or sleep will spin the
/// CPU. In a fixed-rate loop (e.g. 50 Hz game tick) the natural tick
/// interval provides sufficient back-off. Free-running loops should
/// add a short delay or yield between retries.
///
/// # Arguments
///
/// * `topic` - The topic to publish to
/// * `payload` - The message payload
/// * `qos` - Quality of Service level
/// * `retain` - Whether the broker should retain this message
pub fn try_publish_with(
&self,
topic: &str,
payload: &[u8],
qos: QoS,
retain: bool,
) -> Result<(), TryPublishError> {
validate_publish_topic(topic)
.map_err(|e| TryPublishError::Other(anyhow::anyhow!("invalid publish topic: {}", e)))?;
log::debug!("[mqtt] try_publish to '{}': {} bytes", topic, payload.len());
let mut guard = self.client.try_lock().map_err(|e| match e {
std::sync::TryLockError::WouldBlock => TryPublishError::WouldBlock,
std::sync::TryLockError::Poisoned(_) => {
TryPublishError::Other(anyhow::anyhow!("MQTT client mutex poisoned"))
}
})?;
guard
.enqueue(topic, qos, retain, payload)
.map_err(|e| TryPublishError::Other(e.into()))?;
Ok(())
}

/// Subscribes to a topic.
///
/// # Important
Expand Down
Loading