Skip to content
Open
8 changes: 8 additions & 0 deletions dir.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,14 @@
children:
- message-queue/message-queue-task
- message-queue/message-queue-quick-start
- title_en: Message Stream
title_cn: 消息流
title_ja: Message Queue
path: message-stream/message-stream-concept
collapsed: true
children:
- message-stream/message-stream-task
- message-stream/message-stream-quick-start
- title_en: MQTT over QUIC
title_cn: MQTT over QUIC
title_ja: MQTT over QUIC
Expand Down
1 change: 1 addition & 0 deletions en_US/design/durable-storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,4 +166,5 @@ Durable Storage serves as the core data foundation for several high-reliability

- **[MQTT Durable Sessions](../durability/durability_introduction.md)**: A DS-based mechanism for persisting session state and undelivered messages.
- **[Message Queue](../message-queue/message-queue-concept.md)**: A built-in message queueing feature that provides ordered message delivery, message replay, and high availability across the EMQX cluster.
- **[Message Stream](../message-stream/message-stream-concept.md)**: A DS-based persistent message streaming and replay mechanism that continuously stores MQTT messages matching topic filters and enables historical message replay.
- **[Shared Subscription](../messaging/mqtt-shared-subscription.md)**: A load-balancing subscription mechanism that distributes messages among multiple subscribers in the same group.
33 changes: 21 additions & 12 deletions en_US/message-queue/message-queue-concept.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,35 @@ Message Queue extends the MQTT protocol in EMQX. It allows messages to be persis
## Message Queue Concepts

- **Queue Name**

An MQTT topic or topic filter that identifies the queue. Messages published to matching topics are automatically enqueued.

- **Queue Declaration**

The process of creating a durable queue and defining its behavior through configurable properties.

- **Queue Deletion**

The removal of a queue along with all its stored messages.

- **Last-Value Semantics**

An optional feature enabled by setting a **Queue Key Expression** during queue declaration. When enabled, EMQX will extract the `queue key` from each message as it enters the queue. A new message with the same key will overwrite any existing unconsumed message in the queue with that key. This behavior is ideal for stateful messaging or configuration updates, where only the latest value matters and older messages can be safely discarded.

- **Topic Prefix**

Queue subscriptions use the special `$q/{topic}` prefix to distinguish them from regular MQTT subscriptions.

- **Queue Properties**

Customizable settings that control queue behavior, such as message retention time and dispatch strategy.

- **Quality of Service (QoS)**

All messages in Message Queues are delivered with QoS 1 (at-least-once), regardless of the QoS level used when publishing or subscribing. This ensures reliable message delivery and unifies the queue's delivery behavior.

- **Message Persistence**

Messages are retained even when no subscribers are connected. By default, queues apply last-value semantics. For regular queues (without a key expression), messages are stored in the order received.

## How Message Queue Works
Expand All @@ -66,18 +81,12 @@ The Message Queue feature in EMQX is implemented as a loosely coupled extension

The following main components are involved:

- **Message Queue Registry**
Manages the lifecycle of all message queues. Responsible for creating, deleting, and looking up queues.
- **Message Queue Message DB**
Stores the actual messages published to queues and is built on EMQX’s [Durable Storage](../durability/durability_introduction.md#durable-storage-architecture).
- **Message Queue State Storage**
Persists consumption progress and queue metadata (e.g., TTL, properties).
- **Message Queue Consumer**
Retrieves messages from the queue and dispatches them to connected subscribers based on the dispatch strategy.
- **Message Queue Subscription Registry**
Tracks which channels (clients) are subscribed to which queues. Stores subscription state in each channel’s context.
- **Message Queue Hooks**
Hook into publish and subscribe events to intercept messages and route them to queues or consumers.
- **Message Queue Registry**: Manages the lifecycle of all message queues. Responsible for creating, deleting, and looking up queues.
- **Message Queue Message DB**: Stores the actual messages published to queues and is built on EMQX’s [Durable Storage](../durability/durability_introduction.md#durable-storage-architecture).
- **Message Queue State Storage**: Persists consumption progress and queue metadata (e.g., TTL, properties).
- **Message Queue Consumer**: Retrieves messages from the queue and dispatches them to connected subscribers based on the dispatch strategy.
- **Message Queue Subscription Registry**: Tracks which channels (clients) are subscribed to which queues. Stores subscription state in each channel’s context.
- **Message Queue Hooks**: Hook into publish and subscribe events to intercept messages and route them to queues or consumers.

### Message Queue Data Flow Diagram

Expand Down
16 changes: 6 additions & 10 deletions en_US/message-queue/message-queue-task.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@

This page walks you through the practical usage of the Message Queue feature in EMQX, from creating queues to configuring their behavior and managing them using the Dashboard, REST API, or configuration files.

## Manually Create Message Queue via Dashboard
## Manually Create Message Queues via Dashboard

Message Queues must be explicitly declared/created before they can store or dispatch messages. You can create message queues either manually or automatically. For details about automatic creation, see [Automatically Create Message Queue via Dashboard](#automatically-create-message-queue-via-dashboard).

To create a new Message Queue manually using the EMQX Dashboard:
Message queues must be explicitly declared/created before they can store or dispatch messages. You can create message queues either manually or automatically. For details about automatic creation, see [Automatically Create Message Queues via Dashboard](#automatically-create-message-queue-via-dashboard).

1. Navigate to **Message Queue** in the left menu.

Expand Down Expand Up @@ -120,9 +118,9 @@ Queue Key Expressions are evaluated against the following message structure:

</details>

## Automatically Create Message Queue via Dashboard
## Automatically Create Message Queues via Dashboard

Starting from EMQX 6.0.1, Message Queues can be automatically created when clients subscribe to a `$q/`-prefixed topic. This allows queues to be provisioned dynamically without manual setup.
Starting from EMQX 6.0.1, message queues can be automatically created when clients subscribe to a `$q/`-prefixed topic. This allows queues to be provisioned dynamically without manual setup.

The queues may be auto-created either as regular queues or last-value semantics queues.

Expand Down Expand Up @@ -163,14 +161,12 @@ This option can be enabled manually if you prefer regular queues where messages

## Configure Message Queue Settings

This section explains how to configure global settings that apply to all Message Queues in EMQX. These settings control message retention, cleanup intervals, internal queue behavior, and queue auto-creation behavior. You can configure them via the Dashboard, REST API, or configuration file.
This section explains how to configure global settings that apply to all message queues in EMQX. These settings control message retention, cleanup intervals, internal queue behavior, and queue auto-creation behavior. You can configure them via the Dashboard, REST API, or configuration file.

### Dashboard

You can update Message Queue settings directly from the EMQX Dashboard without restarting the broker. This is useful for making changes to system-wide behavior at runtime.

To configure global settings for Message Queues via the Dashboard:

1. Go to **Management** -> **MQTT Settings** -> **Message Queue** tab.

Alternatively, you can click the **Settings** button in the top-right corner of the **Message Queue** page.
Expand Down Expand Up @@ -232,7 +228,7 @@ mq {
Determines how frequently a subscriber retries to locate a queue when subscribing to a `$q/` topic that does not yet exist.
- **`max_queue_count`**: (Optional) Sets the maximum number of queues that can be created.

## Manage Message Queue via REST API
## Manage Message Queues via REST API

EMQX provides a set of REST APIs to manage the lifecycle of Message Queues, including creation, retrieval, update, and deletion.

Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added en_US/message-stream/assets/replay_messages.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
145 changes: 145 additions & 0 deletions en_US/message-stream/message-stream-concept.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
# Message Stream

EMQX 6.1 introduces the Message Stream, a streaming and replay feature that extends MQTT’s real-time publish/subscribe model with persistent, replayable message streams. It enables Kafka-like streaming capabilities while preserving MQTT semantics.

This page provides a complete overview of the Message Stream feature in EMQX, covering its design motivation, key concepts, internal architecture, message flow, and real-world application scenarios.

## What Is a Message Stream?

A Message Stream is a logical collection of MQTT messages that automatically collects messages matching a topic filter during its lifetime. It stores matching messages durably and allows clients to replay historical data by subscribing to a stream-specific topic.

## Why Use Message Stream?

MQTT is optimized for real-time messaging, but it has inherent limitations:

- Messages are typically delivered only to online subscribers.
- Historical data replay is not natively supported.
- Reprocessing past data requires external systems.
- Maintaining an ordered, replayable message history is difficult.

Message Stream extends MQTT with durable message storage and replay. It allows consumers to read historical messages and retrieve the latest state of devices without changing how MQTT clients publish or subscribe.

## Message Stream Key Concepts

- **Message Stream**

A logical resource identified by an MQTT topic filter and managed with an explicit lifecycle. While active, it continuously stores matching messages within configured time or size limits. Stored messages can be replayed by subscribing consumers, without requiring any changes on the publishing side.

- **Regular Message Stream**: A regular message stream stores all matching messages without overwriting historical data. Consumers can replay all messages published after any given point in time by subscribing with a timestamp.
- **Last-Value Message Stream**: A last-value message stream enables [Last-Value semantics](#last-value-semantics). For messages with the same stream key, newer messages overwrite older ones, and the stream retains only the latest message associated with each key.

- **Topic Filter**

An MQTT topic filter, such as `sensors/+/data`, that determines which published messages are captured into a stream. Only matching messages are ingested, and a single message may belong to multiple streams.

- **Stream Subscription**

A special MQTT subscription used to consume messages from a stream. Clients subscribe using the `$s/<timestamp>/<topic_filter>` format. The timestamp specifies the replay starting point. Stream subscriptions operate independently of regular MQTT subscriptions and are delivered through the External Subscription mechanism.

- **Key Expression**

A user-defined expression evaluated on each incoming message to extract a key. The expression may reference message content or metadata. The extracted key is used to guarantee message ordering within a storage partition. When Last-Value semantics are enabled for the message stream, the key also defines the overwrite scope: newer messages with the same key replace older ones.

## Message Streams Architecture

Message Streams are implemented as a standalone EMQX application that is loosely coupled with the broker core and reuses existing infrastructure. Integration with EMQX is achieved through internal hooks and the External Subscription framework.

::: tip

External Subscription is an EMQX mechanism that connects external message sources (messages originating outside the live MQTT publish path) to MQTT client sessions, allowing those messages to be delivered to MQTT clients through standard MQTT subscriptions without changing client behavior.

:::

### Main Components

- **Streams Registry**: Manages the lifecycle of Message Streams and maintains stream metadata and indexes. It uses a Mnesia table to efficiently look up streams by topic filter.
- **Streams Message Database**: Provides durable storage for stream messages and is built on top of EMQX [Durable Storage](../design/durable-storage.md). It persists messages, enforces retention limits, applies Last-Value semantics when enabled, and supports efficient message retrieval until messages expire according to retention policies.
- **Streams ExtSub Handler**: Integrates Message Streams with MQTT client sessions. It retrieves messages from Durable Storage and delivers them to subscribing clients through the External Subscription framework.

### Message Stream Data Flow Diagram

The following diagram shows the data flow between the Message Stream components:

```ascii
+-----------------------+
| Message Stream DS DB |
+-----------------------+
^ ^
| |
| |
| | Subscription on topic data
| |
| +-------------------------------------+ +--------------------------------+
| | | Client Connection |
| | (Subscribing channel) |
| Write transaction | | +----------------------------+ |
| | | | ExtSub | |
| | | | +------------------------+ | |
| | | | | Streams ExtSub Handler | | |
+---------------------------+ +----->| | | |
| Client Connection |
|(Publishing channel) | | | +----------------|-------+ | |
| | | +------------------|---------+ |
+---------------------------+ +--------------------|-----------+
| |
| Stream |
| lookup V
| +--------------------------+
| Fast stream lookup in the index | Streams Registry |
+----------------------------------------------------------------> | |
+--------------------------+
```

### Publishing Flow

1. A client publishes a message to an MQTT topic.
2. A Message Stream hook is triggered to process the publication.
3. The hook queries the Streams Registry to identify streams whose topic filters match the message topic.
4. For each matching stream, the message is written to the stream and persisted in Durable Storage.

### Subscribing and Consuming Flow

1. A client subscribes to a stream topic (`$s/<timesstamp>/<topic_filter>`).
2. The External Subscription framework handles the subscription and initializes a Streams ExtSub handler for the stream topic.
3. The handler retrieves messages from Durable Storage according to the specified timestamp and retention rules.
4. Retrieved messages are passed to the External Subscription framework.
5. The ExtSub application delivers messages to the client via standard MQTT delivery.

## Message Stream Core Features

Message Stream provides a set of core capabilities that define how messages are stored, ordered, retained, and delivered for replay-based consumption.

- **Timestamp-Based Replay**

Message streams support replay starting from a specified timestamp. Consumers choose the timestamp when subscribing. Messages published before the timestamp are skipped.

- **Retention**

The stream’s retention policy limits the message replay. Messages are retained for a limited time or size. Expired messages are removed automatically, regardless of whether they have been consumed.

- **Per-Key Ordering**

Message streams do not guarantee a single global delivery order. Messages that share the same key are always delivered in the order in which they were published. Messages with different keys may be delivered in any order.

- **Last-Value Semantics**

A stream may enable Last-Value semantics. Messages with the same key overwrite earlier messages. Only the latest value is retained. Messages without a resolved key are stored normally.

- **MQTT-Native Delivery**

Stream messages are delivered using standard MQTT mechanisms. Publishers do not need to change their behavior. Message delivery to subscribers is integrated through External Subscription.

## Typical Use Cases

- **Historical Data Replay**: Reprocess past MQTT events for debugging or new business logic.
- **Time-Series Analysis**: Store and replay sensor data for analytics and predictive maintenance.
- **Event Sourcing**: Persist all state changes as an immutable event log.
- **IoT Digital Twins**: Maintain the latest state of physical devices in digital form.
- **Configuration Synchronization**: Ensure devices always receive the most recent configuration.

## Next Steps

Now that you understand the Message Stream fundamentals, explore how to put them into practice:

- [Create and Configure a Stream](./message-stream-task.md): Learn how to declare streams via Dashboard or REST API, and set last-value semantics and retention policies.
- [Quick Start Tutorial](./message-stream-quick-start.md): Follow a step-by-step guide using MQTTX to simulate real-world publisher and subscriber scenarios.
Loading