Skip to content

Commit 3bbccd3

Browse files
authored
Merge pull request #2679 from emqx/20240927-r58-external-registries
feat: external schema registry
2 parents 63161a5 + ffcfd95 commit 3bbccd3

File tree

2 files changed

+263
-3
lines changed

2 files changed

+263
-3
lines changed

en_US/data-integration/schema-registry.md

Lines changed: 133 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,15 @@ Because of the variety of IoT device terminals and the different coding formats
1111
The Schema Registry manages the Schema used for coding and decoding, processes the encoding or decoding requests, and returns the results. The Schema Registry in collaboration with the rule engine can be adapted for device access and rule design in
1212
various scenarios.
1313

14-
EMQX Schema Registry currently supports codecs in below formats:
14+
EMQX Schema Registry currently supports codecs in the below formats:
1515

1616
- [Avro](https://avro.apache.org)
1717
- [Protobuf](https://developers.google.com/protocol-buffers/)
1818
- [JSON Schema](https://json-schema.org/)
1919

2020
Avro and Protobuf are Schema-dependent data formats. The encoded data is binary and the decoded data is in [Map format](#rule-engine-internal-data-format-map). The decoded data can be used directly by the rule engine and other plugins. Schema Registry maintains Schema text for built-in encoding formats such as Avro and Protobuf.
2121

22-
JSON schema can be used to validate if input JSON object is following the schema definetions or if the JSON object output from the rule engine is valid before producing the data to downstream.
22+
JSON schema can be used to validate if the input JSON object is following the schema definitions or if the JSON object output from the rule engine is valid before producing the data to downstream.
2323

2424
The diagram below shows an example of a Schema Registry application. Multiple devices report data in different formats, which are decoded by Schema Registry into a uniform internal format and then forwarded to the backend application.
2525

@@ -105,3 +105,134 @@ The SQL statement above will match an MQTT message with the content of the paylo
105105
106106
**Note:** The `AS` clause is required to assign the decoded data to a key so that subsequent operations can be performed on it later.
107107
108+
109+
## External Schema Registry
110+
111+
Starting with version 5.8.1, EMQX supports configuring an external Confluent Schema Registry (CSR). This feature allows users to dynamically retrieve schemas from external registries during rule processing, enabling efficient message encoding and decoding.
112+
113+
### Create External Schema Registry in Dashboard
114+
115+
You can configure an external schema registry directly through the EMQX Dashboard, making it easy to manage your schema integration.
116+
117+
Go to **Integration** -> **Schema** on EMQX Dashboard. Select the **External** tab on the Schema page.
118+
119+
Click the **Create** button at the upper right corner. Configure the following fields:
120+
121+
- **Name**: Enter an external schema registry name that will be used in the encoding and decoding functions.
122+
- **Type**: Select the type of external schema registry. Currently, only `Confluent` is supported.
123+
- **URL**: Enter the endpoint of your Confluent Schema Registry.
124+
- **Authentication**: If you select `Basic auth`, enter the authentication credentials (username and password) for accessing the external registry.
125+
126+
Click **Create** after you complete the settings.
127+
128+
### Configure External Schema Registry via Configuration File
129+
130+
You can configure an external Confluent Schema Registry via the EMQX configuration file. Here’s an example of how to set it up:
131+
132+
133+
```hcl
134+
schema_registry {
135+
external {
136+
my_external_registry {
137+
type = confluent
138+
url = "https://confluent.registry.url:8081"
139+
auth {
140+
username = "myuser"
141+
password = "secret"
142+
}
143+
}
144+
}
145+
}
146+
```
147+
148+
In this example:
149+
150+
- `my_external_registry` is the name assigned to the external schema registry.
151+
- `type = confluent` specifies the type of external registry.
152+
- `url` is the endpoint of your Confluent Schema Registry.
153+
- `auth` contains the authentication credentials (username and password) for accessing the external registry.
154+
155+
### Use External Schema Registry in Rule Engine
156+
157+
Once an external registry is configured, you can use several functions in the EMQX rule engine to encode and decode payloads using the schemas stored in the external registry.
158+
159+
The following functions utilize a configured external CSR:
160+
161+
```sql
162+
avro_encode('my_external_registry', payload, my_schema_id)
163+
avro_decode('my_external_registry', payload, my_schema_id)
164+
schema_encode_and_tag('my_local_avro_schema', 'my_external_registry', payload, 'my_subject')
165+
schema_decode_tagged('my_external_registry', payload)
166+
```
167+
168+
In all function usage examples below, the following example values and variable names are used:
169+
170+
- `my_external_registry` is the name you assigned to the external registry in EMQX.
171+
- `my_schema_id` is the schema ID registered in the CSR (always an integer in CSR).
172+
- `my_local_avro_schema` is the name of a locally configured Avro schema in EMQX.
173+
- `my_subject` is the subject name defined in the CSR.
174+
175+
#### Function Usage Examples
176+
177+
##### `avro_encode`
178+
179+
`avro_encode` encodes a payload using the schema ID from the external registry. The schema is retrieved dynamically at runtime and cached for subsequent runs. In Confluent Schema Registry, schema IDs are integers.
180+
181+
::: tip Note
182+
183+
When encoding, the payload must be in the internal data format of the rule engine, which is a decoded map. This is why `json_decode` is used in the example.
184+
185+
:::
186+
187+
Example:
188+
189+
```sql
190+
select
191+
-- 123 is the schema ID that is registered in CSR
192+
avro_encode('my_external_registry', json_decode(payload), 123) as encoded
193+
from 't'
194+
```
195+
196+
##### `avro_decode`
197+
198+
This function decodes an Avro payload based on the specified schema ID from the external registry. The schema is dynamically fetched during runtime and cached for subsequent operations.
199+
200+
Example:
201+
202+
```sql
203+
select
204+
-- 123 is the schema ID that is registered in CSR
205+
avro_decode('my_external_registry', payload, 123) as decoded
206+
from 't'
207+
```
208+
209+
##### `schema_encode_and_tag`
210+
211+
This function uses a locally registered Avro schema, an external CSR schema name, and a subject to encode a payload (already in internal map format), and to tag the resulting payload with a schema ID. The schema ID comes from registering the local schema to CSR.
212+
213+
Example:
214+
215+
```sql
216+
select
217+
schema_encode_and_tag(
218+
'my_local_avro_schema',
219+
'my_external_registry',
220+
json_decode(payload),
221+
'my_subject'
222+
) as encoded
223+
from 't'
224+
```
225+
226+
##### `schema_decode_tagged`
227+
228+
This function uses a CSR name to decode a payload, assuming it is tagged with the schema ID retrieved from CSR.
229+
230+
```sql
231+
select
232+
schema_decode_tagged(
233+
'my_external_registry',
234+
payload
235+
) as decoded
236+
from 't'
237+
```
238+

zh_CN/data-integration/schema-registry.md

Lines changed: 130 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ schema_check(SchemaName, Map | Bytes) -> Boolean
5858

5959
## 编解码 + 规则引擎
6060

61-
EMQX 的消息处理层面可分为消息路由(Messaging)、规则引擎(Rule Engine)、数据格式转换(Data Conversion) 三个部分。
61+
EMQX 的消息处理层面可分为消息路由 (Messaging)、规则引擎 (Rule Engine)、数据格式转换 (Data Conversion) 三个部分。
6262

6363
EMQXPUB/SUB 系统将消息路由到指定的主题。规则引擎可以灵活地配置数据的业务规则,按规则匹配消息,然后指定相应动作。数据格式转换发生在规则匹配的过程之前,先将数据转换为可参与规则匹配的 Map 格式,然后进行匹配。
6464

@@ -100,3 +100,132 @@ SELECT json_decode(payload) AS p FROM "t/#" WHERE p.x = p.y
100100
```
101101

102102
**注意:** `AS` 子句是必须的,将解码之后的数据赋值给某个Key,后面才能对其进行后续操作。
103+
104+
## 外部 Schema Registry
105+
106+
从 EMQX 版本 5.8.1 开始,支持在 EMQX 中配置外部 Confluent Schema Registry (CSR)。该功能允许用户在规则处理时动态获取外部 Schema Registry 中的 Schema,从而实现高效的消息编码和解码。
107+
108+
### 在 Dashboard 中创建外部 Schema Registry
109+
110+
您可以直接通过 EMQX Dashboard 配置外部 Schema Registry,方便地管理 Schema 集成。
111+
112+
进入 EMQX Dashboard 的 **集成** -> **Schema** 页面。在 Schema 页面中选择 **外部 Schema** 选项卡。
113+
114+
点击右上角的**创建**按钮,并配置以下字段:
115+
116+
- **名称**:输入外部 Schema Registry 的名称,该名称将在编码和解码函数中使用。
117+
- **类型**:选择外部 Schema Registry 的类型。目前仅支持 `Confluent`
118+
- **URL**:输入您的 Confluent Schema Registry 的端点地址。
119+
- **认证**:如果选择 `基础认证`,请输入访问外部 Schema Registry 所需的认证信息(用户名和密码)。
120+
121+
完成设置后,点击**创建**按钮。
122+
123+
### 通过配置文件配置外部 Schema Registry
124+
125+
您也可以通过 EMQX 配置文件配置外部 Confluent Schema Registry。以下是配置示例:
126+
127+
```hcl
128+
schema_registry {
129+
external {
130+
my_external_registry {
131+
type = confluent
132+
url = "https://confluent.registry.url:8081"
133+
auth {
134+
username = "myuser"
135+
password = "secret"
136+
}
137+
}
138+
}
139+
}
140+
```
141+
142+
在此示例中:
143+
144+
- `my_external_registry` 是分配给外部 Schema Registry 的名称。
145+
- `type = confluent` 指定外部 Schema Registry 的类型。
146+
- `url` 是 Confluent Schema Registry 的端点地址。
147+
- `auth` 包含访问外部 Schema Registry 所需的认证信息(用户名和密码)。
148+
149+
### 在规则引擎中使用外部 Schema Registry
150+
151+
配置外部 Schema Registry 后,您可以在 EMQX 规则引擎中使用多个函数,利用外部 Schema Registry 中存储的 Schema 对 payload 进行编码和解码。
152+
153+
配置的外部 CSR 可以在以下函数中使用:
154+
155+
```sql
156+
avro_encode('my_external_registry', payload, my_schema_id)
157+
avro_decode('my_external_registry', payload, my_schema_id)
158+
schema_encode_and_tag('my_local_avro_schema', 'my_external_registry', payload, 'my_subject')
159+
schema_decode_tagged('my_external_registry', payload)
160+
```
161+
162+
在下面所有函数使用示例中,使用了以下示例值和变量名:
163+
164+
- `my_external_registry` 是您在 EMQX 中为外部 Schema Registry 指定的名称。
165+
- `my_schema_id` 是注册在 CSR 中的 Schema ID(在 CSR 中始终是整数)。
166+
- `my_local_avro_schema` 是在 EMQX 中配置的本地 Avro Schema 名称。
167+
- `my_subject` 是在 CSR 中定义的主题名称。
168+
169+
#### 函数使用示例
170+
171+
##### `avro_encode`
172+
173+
`avro_encode` 使用外部 Schema Registry 中的 Schema ID 对 payload 进行编码。Schema 会在运行时动态获取,并缓存以供后续使用。在 Confluent Schema Registry 中,Schema ID 是整数。
174+
175+
::: tip 提示
176+
177+
编码时,payload 必须是规则引擎的内部数据格式,即已解码的 Map。因此在示例中使用了 `json_decode`
178+
179+
:::
180+
181+
示例:
182+
183+
```sql
184+
select
185+
-- 123 是在 CSR 中注册的 Schema ID
186+
avro_encode('my_external_registry', json_decode(payload), 123) as encoded
187+
from 't'
188+
```
189+
190+
##### `avro_decode`
191+
192+
该函数根据外部 Schema Registry 中的 Schema ID 对 Avro payload 进行解码。Schema 会在运行时动态获取,并缓存以供后续操作。
193+
194+
示例:
195+
196+
```sql
197+
select
198+
-- 123 是在 CSR 中注册的 Schema ID
199+
avro_decode('my_external_registry', payload, 123) as decoded
200+
from 't'
201+
```
202+
203+
##### `schema_encode_and_tag`
204+
205+
此函数使用本地注册的 Avro Schema、外部 CSR 的 Schema 名称和主题对 payload 进行编码,并将编码后的 payload(已为内部 Map 格式)标记为带有 Schema ID。Schema ID 是通过将本地 Schema 注册到 CSR 获得的。
206+
207+
示例:
208+
209+
```sql
210+
select
211+
schema_encode_and_tag(
212+
'my_local_avro_schema',
213+
'my_external_registry',
214+
json_decode(payload),
215+
'my_subject'
216+
) as encoded
217+
from 't'
218+
```
219+
220+
##### `schema_decode_tagged`
221+
222+
此函数使用 CSR 名称对 payload 进行解码,假设该 payload 带有从 CSR 获取的 Schema ID。
223+
224+
```sql
225+
select
226+
schema_decode_tagged(
227+
'my_external_registry',
228+
payload
229+
) as decoded
230+
from 't'
231+
```

0 commit comments

Comments
 (0)