|
| 1 | +# Message Stream Quick Start |
| 2 | + |
| 3 | +This page walks you through how to use the Message Stream feature in EMQX 6.1. You’ll use MQTTX to simulate clients, create and manage message streams from the EMQX Dashboard, and experience how messages can be stored, replayed, and compacted. |
| 4 | + |
| 5 | +## Objectives |
| 6 | + |
| 7 | +This quick start demonstrates how EMQX Message Stream can: |
| 8 | + |
| 9 | +- Persist messages independently of subscriber availability |
| 10 | +- Support timestamp-based replay |
| 11 | +- Enable Last-Value semantics for state-oriented messaging |
| 12 | + |
| 13 | +## Prerequisites |
| 14 | + |
| 15 | +Before starting, ensure you have: |
| 16 | + |
| 17 | +- EMQX 6.1+ running |
| 18 | +- [MQTTX](https://mqttx.app/) (or any MQTT 5.0-capable client) |
| 19 | +- Access to the EMQX Dashboard (default: `http://localhost:18083`) |
| 20 | + |
| 21 | +## Test Message Stream Basic Features (Regular Stream) |
| 22 | + |
| 23 | +This section demonstrates how Message Stream stores messages and allows consumers to replay historical data. |
| 24 | + |
| 25 | +### Prerequisite |
| 26 | + |
| 27 | +Before starting, ensure that the Message Stream feature is enabled and that the auto-creation behavior will not interfere with this example. |
| 28 | + |
| 29 | +1. Go to **Message Stream** in the left menu. |
| 30 | + |
| 31 | +2. If Message Stream is disabled, click **Settings**. You will be redirected to the **Management** -> **MQTT Settings** -> **Message Stream** page. |
| 32 | + |
| 33 | +3. Toggle the **Enable Message Stream** switch on. |
| 34 | + |
| 35 | +4. Verify the auto-create settings to ensure a **regular Message Stream** is used: |
| 36 | + |
| 37 | + - **Enable Auto Create Message Stream** is disabled, or |
| 38 | + |
| 39 | + - **Auto Create Message Stream Type** is set to **Regular Message Stream** |
| 40 | + |
| 41 | + > This prevents the stream from being auto-created as a Last-Value Message Stream, which would retain only the most recent message per key. |
| 42 | +
|
| 43 | +4. If you make any changes, click **Save Changes** to apply them. |
| 44 | + |
| 45 | + <img src="./assets/message_stream_settings.png" alt="message_stream_settings" style="zoom:67%;" /> |
| 46 | + |
| 47 | +### Step 1: Create a Message Stream |
| 48 | + |
| 49 | +1. Navigate to **Message Stream** in the left menu. |
| 50 | + |
| 51 | +2. Click **Create Stream** on the page, or click **Create** in the upper-right corner. |
| 52 | + |
| 53 | +3. In the **Create Message Stream** dialog, configure the following settings: |
| 54 | + - **Topic Filter**: `demo/stream` |
| 55 | + - **Data Retention Period**: `1` day |
| 56 | + - **Last-Value Semantics**: Disabled |
| 57 | + - **Stream Key Expression**: `message.from` |
| 58 | + |
| 59 | +4. Click **Create**. |
| 60 | + |
| 61 | +  |
| 62 | + |
| 63 | +### Step 2: Publish Messages |
| 64 | + |
| 65 | +Use MQTTX to simulate a client acting as a **publisher**: |
| 66 | + |
| 67 | +1. Open MQTTX and create a client (for example, `publisher`). |
| 68 | +2. Connect to EMQX (`mqtt://localhost:1883`). |
| 69 | +3. Publish several messages to the topic `demo/stream` with QoS 1. |
| 70 | + |
| 71 | +Examples: |
| 72 | + |
| 73 | +``` |
| 74 | +Topic: demo/stream |
| 75 | +QoS: 1 |
| 76 | +Payload: {"value": 1} |
| 77 | +Payload: {"value": 2} |
| 78 | +Payload: {"value": 3} |
| 79 | +``` |
| 80 | + |
| 81 | +Since this is a regular stream, **all messages are stored** in the stream. |
| 82 | + |
| 83 | +### Step 3: Replay All Messages from the Stream |
| 84 | + |
| 85 | +Now simulate a **consumer** that replays stored messages. |
| 86 | + |
| 87 | +1. Open a second MQTTX client (for example, `consumer`). |
| 88 | + |
| 89 | +2. Connect to EMQX. |
| 90 | + |
| 91 | +3. Subscribe to the stream topic using the earliest timestamp: |
| 92 | + |
| 93 | + ``` |
| 94 | + Topic: $s/0/demo/stream |
| 95 | + QoS: 1 |
| 96 | + ``` |
| 97 | + |
| 98 | +  |
| 99 | + |
| 100 | +**Expected Behavior**: |
| 101 | + You should receive all previously published messages, in publish order: |
| 102 | + |
| 103 | +``` |
| 104 | +{"value": 1} |
| 105 | +{"value": 2} |
| 106 | +{"value": 3} |
| 107 | +``` |
| 108 | + |
| 109 | +This confirms that: |
| 110 | + |
| 111 | +- The stream is a regular message stream. |
| 112 | +- Timestamp-based replay is working correctly. |
| 113 | +- No messages were compacted or overwritten. |
| 114 | + |
| 115 | + |
| 116 | + |
| 117 | +## Replay Messages from Different Positions |
| 118 | + |
| 119 | +Message Streams allow consumers to control where message replay starts by specifying a timestamp when subscribing. |
| 120 | + |
| 121 | +1. Publish additional messages from the `publisher` client: |
| 122 | + |
| 123 | + ``` |
| 124 | + {"value": 4} |
| 125 | + {"value": 5} |
| 126 | + ``` |
| 127 | + |
| 128 | +2. In a new MQTTX client, subscribe to the stream using a later timestamp: |
| 129 | + |
| 130 | + ``` |
| 131 | + Topic: $s/1766383734000/demo/stream |
| 132 | + QoS: 1 |
| 133 | + ``` |
| 134 | + |
| 135 | + In this example, `1766383734000` is a Unix timestamp in milliseconds. Only messages published **at or after** this time are delivered to the subscriber. |
| 136 | + |
| 137 | + ::: tip |
| 138 | + |
| 139 | + - The timestamp must be a Unix timestamp in milliseconds. |
| 140 | + - Use `0` to replay from the earliest available message. |
| 141 | + - Use a later timestamp to replay only newer messages. |
| 142 | + |
| 143 | + You can obtain the current timestamp in milliseconds using: |
| 144 | + |
| 145 | + - **Linux / macOS**: |
| 146 | + |
| 147 | + ``` |
| 148 | + date +%s000 |
| 149 | + ``` |
| 150 | +
|
| 151 | + - **JavaScript**: |
| 152 | +
|
| 153 | + ``` |
| 154 | + Date.now() |
| 155 | + ``` |
| 156 | +
|
| 157 | + ::: |
| 158 | +
|
| 159 | +3. Click **Confirm**. Only messages published **at or after** the specified timestamp are delivered. |
| 160 | +
|
| 161 | +  |
| 162 | +
|
| 163 | +This demonstrates consumer-controlled replay, where different consumers can independently read the same message stream from different points in time without affecting each other. |
| 164 | +
|
| 165 | +## Test Last-Value Semantics |
| 166 | +
|
| 167 | +This section demonstrates how **Last-Value Message Streams** keep only the latest message per key, which is useful for representing state. |
| 168 | +
|
| 169 | +### Step 1: Delete the Existing Stream |
| 170 | +
|
| 171 | +1. Navigate to **Message Stream** in the Dashboard. |
| 172 | +2. Locate the stream with the topic filter `demo/stream`. |
| 173 | +3. Click **Delete** and confirm. |
| 174 | +
|
| 175 | +### Step 2: Create a Last-Value Message Stream |
| 176 | +
|
| 177 | +1. Click **Create** on the **Message Stream** page. |
| 178 | +2. Configure the following settings: |
| 179 | + - **Topic Filter**: `device/state` |
| 180 | + - **Data Retention Period**: `1` day |
| 181 | + - **Last-Value Semantics**: Enabled |
| 182 | + - **Stream Key Expression**: `message.from` |
| 183 | +3. Click **Create**. |
| 184 | +
|
| 185 | +The stream is now configured to retain only the latest message per key. |
| 186 | +
|
| 187 | +### Step 3: Publish State Updates |
| 188 | +
|
| 189 | +1. In MQTTX, use a client with ID `device-1`. |
| 190 | +
|
| 191 | +2. Publish messages to `device/state`: |
| 192 | +
|
| 193 | + | Field | Value | |
| 194 | + | ------- | ---------------------- | |
| 195 | + | Topic | `device/state` | |
| 196 | + | QoS | 1 | |
| 197 | + | Payload | `{"status": "online"}` | |
| 198 | +
|
| 199 | +3. Publish another message from the same client: |
| 200 | +
|
| 201 | + ``` |
| 202 | + {"status": "offline"} |
| 203 | + ``` |
| 204 | +
|
| 205 | +Because the **Stream Key Expression** is `message.from`, both messages share the same key. The second message overwrites the first. |
| 206 | +
|
| 207 | +### Step 4: Subscribe to the Stream |
| 208 | +
|
| 209 | +1. Open a new MQTTX client (for example, `monitor`). |
| 210 | +
|
| 211 | +2. Subscribe to the stream topic: |
| 212 | +
|
| 213 | + ``` |
| 214 | + Topic: $s/0/device/state |
| 215 | + QoS: 1 |
| 216 | + ``` |
| 217 | +
|
| 218 | +  |
| 219 | +
|
| 220 | +**Expected Behavior**: |
| 221 | +
|
| 222 | +Only the most recent message is delivered: |
| 223 | +
|
| 224 | +``` |
| 225 | +{"status": "offline"} |
| 226 | +``` |
| 227 | +
|
| 228 | + |
| 229 | +
|
| 230 | +This demonstrates how Message Streams support state-oriented messaging patterns using Last-Value semantics. |
0 commit comments