diff --git a/docs/ecosystem/doris-kafka-connector.md b/docs/ecosystem/doris-kafka-connector.md
index 7baee63a94f10..892e8c11b855d 100644
--- a/docs/ecosystem/doris-kafka-connector.md
+++ b/docs/ecosystem/doris-kafka-connector.md
@@ -427,6 +427,67 @@ curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X
 }'
 ```
+### Loading Data with Kafka Connect Single Message Transforms
+
+For example, consider data in the following format:
+```json
+{
+  "registertime": 1513885135404,
+  "userid": "User_9",
+  "regionid": "Region_3",
+  "gender": "MALE"
+}
+```
+
+If we want to add a hard-coded column that does not exist in the Kafka topic data, we can use InsertField to accomplish this. Likewise, if we need to convert a BIGINT timestamp into a formatted time string, we can use TimestampConverter.
+
+```shell
+curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
+  "name": "insert_field_transform",
+  "config": {
+    "connector.class": "org.apache.doris.kafka.connector.DorisSinkConnector",
+    "tasks.max": "1",
+    "topics": "users",
+    "doris.topic2table.map": "users:kf_users",
+    "buffer.count.records": "10",
+    "buffer.flush.time": "11",
+    "buffer.size.bytes": "5000000",
+    "doris.urls": "127.0.0.1:8030",
+    "doris.user": "root",
+    "doris.password": "123456",
+    "doris.http.port": "8030",
+    "doris.query.port": "9030",
+    "doris.database": "testdb",
+    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+    "value.converter.schemas.enable": "false",
+    "transforms": "InsertField,TimestampConverter",
+    "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
+    "transforms.InsertField.static.field": "repo",
+    "transforms.InsertField.static.value": "Apache Doris",
+    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
+    "transforms.TimestampConverter.field": "registertime",
+    "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
+    "transforms.TimestampConverter.target.type": "string"
+  }
+}'
+```
+
+After the InsertField and TimestampConverter transformations, the data becomes:
+
+```json
+{
+  "userid": "User_9",
+  "regionid": "Region_3",
+  "gender": "MALE",
+  "repo": "Apache Doris", // static field added by InsertField
+  "registertime": "2017-12-21 19:38:55.404" // epoch milliseconds formatted as a string (UTC)
+}
+```
+
+For more examples of Kafka Connect Single Message Transforms (SMT), please refer to the [SMT documentation](https://docs.confluent.io/cloud/current/connectors/transforms/overview.html).
+
+
 ## FAQ
 
 **1. The following error occurs when reading Json type data:**
 ```shell
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/doris-kafka-connector.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/doris-kafka-connector.md
index 25e4e638e3b7e..1acaedf54d086 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/doris-kafka-connector.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/doris-kafka-connector.md
@@ -427,6 +427,68 @@ curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X
 }'
 ```
+### 使用 Kafka Connect SMT 转换数据
+
+数据样例如下:
+```json
+{
+  "registertime": 1513885135404,
+  "userid": "User_9",
+  "regionid": "Region_3",
+  "gender": "MALE"
+}
+```
+
+假设我们想在 Kafka 消息中新增一个硬编码的列,可以使用 InsertField。另外,也可以使用 TimestampConverter 将 BIGINT 类型的时间戳转换成格式化的时间字符串。
+
+```shell
+curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
+  "name": "insert_field_transform",
+  "config": {
+    "connector.class": "org.apache.doris.kafka.connector.DorisSinkConnector",
+    "tasks.max": "1",
+    "topics": "users",
+    "doris.topic2table.map": "users:kf_users",
+    "buffer.count.records": "10",
+    "buffer.flush.time": "11",
+    "buffer.size.bytes": "5000000",
+    "doris.urls": "127.0.0.1:8030",
+    "doris.user": "root",
+    "doris.password": "123456",
+    "doris.http.port": "8030",
+    "doris.query.port": "9030",
+    "doris.database": "testdb",
+    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+    "value.converter.schemas.enable": "false",
+    "transforms": "InsertField,TimestampConverter",
+    "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
+    "transforms.InsertField.static.field": "repo",
+    "transforms.InsertField.static.value": "Apache Doris",
+    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
+    "transforms.TimestampConverter.field": "registertime",
+    "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
+    "transforms.TimestampConverter.target.type": "string"
+  }
+}'
+```
+
+样例数据经过 SMT 处理之后,变成如下所示:
+
+```json
+{
+  "userid": "User_9",
+  "regionid": "Region_3",
+  "gender": "MALE",
+  "repo": "Apache Doris", // InsertField 新增的静态字段
+  "registertime": "2017-12-21 19:38:55.404" // 时间戳格式化后的字符串(UTC)
+}
+```
+
+更多关于 Kafka Connect Single Message Transforms(SMT)的使用案例,可以参考 [SMT documentation](https://docs.confluent.io/cloud/current/connectors/transforms/overview.html)。
+
+
 ## 常见问题
 
 **1. 读取 JSON 类型的数据报如下错误:**
 ```shell
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.0/ecosystem/doris-kafka-connector.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.0/ecosystem/doris-kafka-connector.md
index 981a68ac3efd3..5150340f2361b 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.0/ecosystem/doris-kafka-connector.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.0/ecosystem/doris-kafka-connector.md
@@ -427,6 +427,68 @@ curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X
 }'
 ```
+### 使用 Kafka Connect SMT 转换数据
+
+数据样例如下:
+```json
+{
+  "registertime": 1513885135404,
+  "userid": "User_9",
+  "regionid": "Region_3",
+  "gender": "MALE"
+}
+```
+
+假设我们想在 Kafka 消息中新增一个硬编码的列,可以使用 InsertField。另外,也可以使用 TimestampConverter 将 BIGINT 类型的时间戳转换成格式化的时间字符串。
+
+```shell
+curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
+  "name": "insert_field_transform",
+  "config": {
+    "connector.class": "org.apache.doris.kafka.connector.DorisSinkConnector",
+    "tasks.max": "1",
+    "topics": "users",
+    "doris.topic2table.map": "users:kf_users",
+    "buffer.count.records": "10",
+    "buffer.flush.time": "11",
+    "buffer.size.bytes": "5000000",
+    "doris.urls": "127.0.0.1:8030",
+    "doris.user": "root",
+    "doris.password": "123456",
+    "doris.http.port": "8030",
+    "doris.query.port": "9030",
+    "doris.database": "testdb",
+    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+    "value.converter.schemas.enable": "false",
+    "transforms": "InsertField,TimestampConverter",
+    "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
+    "transforms.InsertField.static.field": "repo",
+    "transforms.InsertField.static.value": "Apache Doris",
+    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
+    "transforms.TimestampConverter.field": "registertime",
+    "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
+    "transforms.TimestampConverter.target.type": "string"
+  }
+}'
+```
+
+样例数据经过 SMT 处理之后,变成如下所示:
+
+```json
+{
+  "userid": "User_9",
+  "regionid": "Region_3",
+  "gender": "MALE",
+  "repo": "Apache Doris", // InsertField 新增的静态字段
+  "registertime": "2017-12-21 19:38:55.404" // 时间戳格式化后的字符串(UTC)
+}
+```
+
+更多关于 Kafka Connect Single Message Transforms(SMT)的使用案例,可以参考 [SMT documentation](https://docs.confluent.io/cloud/current/connectors/transforms/overview.html)。
+
+
 ## 常见问题
 
 **1. 读取 JSON 类型的数据报如下错误:**
 ```shell
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/ecosystem/doris-kafka-connector.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/ecosystem/doris-kafka-connector.md
index 48f1e27a41994..92a3b9c6d5291 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/ecosystem/doris-kafka-connector.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/ecosystem/doris-kafka-connector.md
@@ -427,6 +427,68 @@ curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X
 }'
 ```
+### 使用 Kafka Connect SMT 转换数据
+
+数据样例如下:
+```json
+{
+  "registertime": 1513885135404,
+  "userid": "User_9",
+  "regionid": "Region_3",
+  "gender": "MALE"
+}
+```
+
+假设我们想在 Kafka 消息中新增一个硬编码的列,可以使用 InsertField。另外,也可以使用 TimestampConverter 将 BIGINT 类型的时间戳转换成格式化的时间字符串。
+
+```shell
+curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
+  "name": "insert_field_transform",
+  "config": {
+    "connector.class": "org.apache.doris.kafka.connector.DorisSinkConnector",
+    "tasks.max": "1",
+    "topics": "users",
+    "doris.topic2table.map": "users:kf_users",
+    "buffer.count.records": "10",
+    "buffer.flush.time": "11",
+    "buffer.size.bytes": "5000000",
+    "doris.urls": "127.0.0.1:8030",
+    "doris.user": "root",
+    "doris.password": "123456",
+    "doris.http.port": "8030",
+    "doris.query.port": "9030",
+    "doris.database": "testdb",
+    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+    "value.converter.schemas.enable": "false",
+    "transforms": "InsertField,TimestampConverter",
+    "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
+    "transforms.InsertField.static.field": "repo",
+    "transforms.InsertField.static.value": "Apache Doris",
+    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
+    "transforms.TimestampConverter.field": "registertime",
+    "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
+    "transforms.TimestampConverter.target.type": "string"
+  }
+}'
+```
+
+样例数据经过 SMT 处理之后,变成如下所示:
+
+```json
+{
+  "userid": "User_9",
+  "regionid": "Region_3",
+  "gender": "MALE",
+  "repo": "Apache Doris", // InsertField 新增的静态字段
+  "registertime": "2017-12-21 19:38:55.404" // 时间戳格式化后的字符串(UTC)
+}
+```
+
+更多关于 Kafka Connect Single Message Transforms(SMT)的使用案例,可以参考 [SMT documentation](https://docs.confluent.io/cloud/current/connectors/transforms/overview.html)。
+
+
 ## 常见问题
 
 **1. 读取 JSON 类型的数据报如下错误:**
 ```shell
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/ecosystem/doris-kafka-connector.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/ecosystem/doris-kafka-connector.md
index 015d2ce7179a2..e9191abc1a6af 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/ecosystem/doris-kafka-connector.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/ecosystem/doris-kafka-connector.md
@@ -427,6 +427,68 @@ curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X
 }'
 ```
+### 使用 Kafka Connect SMT 转换数据
+
+数据样例如下:
+```json
+{
+  "registertime": 1513885135404,
+  "userid": "User_9",
+  "regionid": "Region_3",
+  "gender": "MALE"
+}
+```
+
+假设我们想在 Kafka 消息中新增一个硬编码的列,可以使用 InsertField。另外,也可以使用 TimestampConverter 将 BIGINT 类型的时间戳转换成格式化的时间字符串。
+
+```shell
+curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
+  "name": "insert_field_transform",
+  "config": {
+    "connector.class": "org.apache.doris.kafka.connector.DorisSinkConnector",
+    "tasks.max": "1",
+    "topics": "users",
+    "doris.topic2table.map": "users:kf_users",
+    "buffer.count.records": "10",
+    "buffer.flush.time": "11",
+    "buffer.size.bytes": "5000000",
+    "doris.urls": "127.0.0.1:8030",
+    "doris.user": "root",
+    "doris.password": "123456",
+    "doris.http.port": "8030",
+    "doris.query.port": "9030",
+    "doris.database": "testdb",
+    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+    "value.converter.schemas.enable": "false",
+    "transforms": "InsertField,TimestampConverter",
+    "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
+    "transforms.InsertField.static.field": "repo",
+    "transforms.InsertField.static.value": "Apache Doris",
+    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
+    "transforms.TimestampConverter.field": "registertime",
+    "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
+    "transforms.TimestampConverter.target.type": "string"
+  }
+}'
+```
+
+样例数据经过 SMT 处理之后,变成如下所示:
+
+```json
+{
+  "userid": "User_9",
+  "regionid": "Region_3",
+  "gender": "MALE",
+  "repo": "Apache Doris", // InsertField 新增的静态字段
+  "registertime": "2017-12-21 19:38:55.404" // 时间戳格式化后的字符串(UTC)
+}
+```
+
+更多关于 Kafka Connect Single Message Transforms(SMT)的使用案例,可以参考 [SMT documentation](https://docs.confluent.io/cloud/current/connectors/transforms/overview.html)。
+
+
 ## 常见问题
 
 **1. 读取 JSON 类型的数据报如下错误:**
 ```shell
diff --git a/versioned_docs/version-2.0/ecosystem/doris-kafka-connector.md b/versioned_docs/version-2.0/ecosystem/doris-kafka-connector.md
index 414b598a3b43a..996eb82c8e81d 100644
--- a/versioned_docs/version-2.0/ecosystem/doris-kafka-connector.md
+++ b/versioned_docs/version-2.0/ecosystem/doris-kafka-connector.md
@@ -427,6 +427,67 @@ curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X
 }'
 ```
+### Loading Data with Kafka Connect Single Message Transforms
+
+For example, consider data in the following format:
+```json
+{
+  "registertime": 1513885135404,
+  "userid": "User_9",
+  "regionid": "Region_3",
+  "gender": "MALE"
+}
+```
+
+If we want to add a hard-coded column that does not exist in the Kafka topic data, we can use InsertField to accomplish this. Likewise, if we need to convert a BIGINT timestamp into a formatted time string, we can use TimestampConverter.
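As an editor's illustration outside the docs diff, the effect of the two transforms configured below can be sketched in a few lines of Python. This is not the connector's implementation (the real SMTs are the Java classes `InsertField$Value` and `TimestampConverter$Value`); the helper name `apply_transforms` and the UTC rendering are assumptions of this sketch:

```python
from datetime import datetime, timezone

def apply_transforms(record: dict) -> dict:
    """Sketch of InsertField$Value followed by TimestampConverter$Value."""
    out = dict(record)
    # InsertField: add the hard-coded column (static.field / static.value)
    out["repo"] = "Apache Doris"
    # TimestampConverter: epoch-millisecond long -> "yyyy-MM-dd HH:mm:ss.SSS"
    # string; integer arithmetic avoids float rounding, rendered in UTC here
    ms = out["registertime"]
    ts = datetime.fromtimestamp(ms // 1000, tz=timezone.utc)
    out["registertime"] = ts.strftime("%Y-%m-%d %H:%M:%S") + f".{ms % 1000:03d}"
    return out

record = {"registertime": 1513885135404, "userid": "User_9",
          "regionid": "Region_3", "gender": "MALE"}
print(apply_transforms(record))
```

Running the sketch on the sample record yields the added `repo` field and a formatted `registertime` string, matching the shape of the transformed output shown in these examples.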
+
+```shell
+curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
+  "name": "insert_field_transform",
+  "config": {
+    "connector.class": "org.apache.doris.kafka.connector.DorisSinkConnector",
+    "tasks.max": "1",
+    "topics": "users",
+    "doris.topic2table.map": "users:kf_users",
+    "buffer.count.records": "10",
+    "buffer.flush.time": "11",
+    "buffer.size.bytes": "5000000",
+    "doris.urls": "127.0.0.1:8030",
+    "doris.user": "root",
+    "doris.password": "123456",
+    "doris.http.port": "8030",
+    "doris.query.port": "9030",
+    "doris.database": "testdb",
+    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+    "value.converter.schemas.enable": "false",
+    "transforms": "InsertField,TimestampConverter",
+    "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
+    "transforms.InsertField.static.field": "repo",
+    "transforms.InsertField.static.value": "Apache Doris",
+    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
+    "transforms.TimestampConverter.field": "registertime",
+    "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
+    "transforms.TimestampConverter.target.type": "string"
+  }
+}'
+```
+
+After the InsertField and TimestampConverter transformations, the data becomes:
+
+```json
+{
+  "userid": "User_9",
+  "regionid": "Region_3",
+  "gender": "MALE",
+  "repo": "Apache Doris", // static field added by InsertField
+  "registertime": "2017-12-21 19:38:55.404" // epoch milliseconds formatted as a string (UTC)
+}
+```
+
+For more examples of Kafka Connect Single Message Transforms (SMT), please refer to the [SMT documentation](https://docs.confluent.io/cloud/current/connectors/transforms/overview.html).
+
+
 ## FAQ
 
 **1. The following error occurs when reading Json type data:**
 ```shell
diff --git a/versioned_docs/version-2.1/ecosystem/doris-kafka-connector.md b/versioned_docs/version-2.1/ecosystem/doris-kafka-connector.md
index 414b598a3b43a..996eb82c8e81d 100644
--- a/versioned_docs/version-2.1/ecosystem/doris-kafka-connector.md
+++ b/versioned_docs/version-2.1/ecosystem/doris-kafka-connector.md
@@ -427,6 +427,67 @@ curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X
 }'
 ```
+### Loading Data with Kafka Connect Single Message Transforms
+
+For example, consider data in the following format:
+```json
+{
+  "registertime": 1513885135404,
+  "userid": "User_9",
+  "regionid": "Region_3",
+  "gender": "MALE"
+}
+```
+
+If we want to add a hard-coded column that does not exist in the Kafka topic data, we can use InsertField to accomplish this. Likewise, if we need to convert a BIGINT timestamp into a formatted time string, we can use TimestampConverter.
+
+```shell
+curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
+  "name": "insert_field_transform",
+  "config": {
+    "connector.class": "org.apache.doris.kafka.connector.DorisSinkConnector",
+    "tasks.max": "1",
+    "topics": "users",
+    "doris.topic2table.map": "users:kf_users",
+    "buffer.count.records": "10",
+    "buffer.flush.time": "11",
+    "buffer.size.bytes": "5000000",
+    "doris.urls": "127.0.0.1:8030",
+    "doris.user": "root",
+    "doris.password": "123456",
+    "doris.http.port": "8030",
+    "doris.query.port": "9030",
+    "doris.database": "testdb",
+    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+    "value.converter.schemas.enable": "false",
+    "transforms": "InsertField,TimestampConverter",
+    "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
+    "transforms.InsertField.static.field": "repo",
+    "transforms.InsertField.static.value": "Apache Doris",
+    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
+    "transforms.TimestampConverter.field": "registertime",
+    "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
+    "transforms.TimestampConverter.target.type": "string"
+  }
+}'
+```
+
+After the InsertField and TimestampConverter transformations, the data becomes:
+
+```json
+{
+  "userid": "User_9",
+  "regionid": "Region_3",
+  "gender": "MALE",
+  "repo": "Apache Doris", // static field added by InsertField
+  "registertime": "2017-12-21 19:38:55.404" // epoch milliseconds formatted as a string (UTC)
+}
+```
+
+For more examples of Kafka Connect Single Message Transforms (SMT), please refer to the [SMT documentation](https://docs.confluent.io/cloud/current/connectors/transforms/overview.html).
+
+
 ## FAQ
 
 **1. The following error occurs when reading Json type data:**
 ```shell
diff --git a/versioned_docs/version-3.0/ecosystem/doris-kafka-connector.md b/versioned_docs/version-3.0/ecosystem/doris-kafka-connector.md
index 98b9c791f723e..29d77f5a1cf0f 100644
--- a/versioned_docs/version-3.0/ecosystem/doris-kafka-connector.md
+++ b/versioned_docs/version-3.0/ecosystem/doris-kafka-connector.md
@@ -427,6 +427,67 @@ curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X
 }'
 ```
+### Loading Data with Kafka Connect Single Message Transforms
+
+For example, consider data in the following format:
+```json
+{
+  "registertime": 1513885135404,
+  "userid": "User_9",
+  "regionid": "Region_3",
+  "gender": "MALE"
+}
+```
+
+If we want to add a hard-coded column that does not exist in the Kafka topic data, we can use InsertField to accomplish this. Likewise, if we need to convert a BIGINT timestamp into a formatted time string, we can use TimestampConverter.
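One practical caveat, illustrated here as an editor's note: the Connect REST API parses the request body as strict JSON, and strict JSON does not allow `//` comments, so it can be safer to build the payload programmatically than to hand-edit an annotated snippet. A minimal sketch (only the SMT-related keys are shown; the `doris.*` connection settings from the examples would be added alongside them):

```python
import json

# Build the connector-creation payload as strict JSON. The connector name
# "insert_field_transform" mirrors the example; the SMT keys are standard
# Kafka Connect transform configuration.
config = {
    "connector.class": "org.apache.doris.kafka.connector.DorisSinkConnector",
    "tasks.max": "1",
    "topics": "users",
    "transforms": "InsertField,TimestampConverter",
    "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertField.static.field": "repo",
    "transforms.InsertField.static.value": "Apache Doris",
    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.TimestampConverter.field": "registertime",
    "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
    "transforms.TimestampConverter.target.type": "string",
}
body = json.dumps({"name": "insert_field_transform", "config": config}, indent=2)
print(body)  # POST this body to http://127.0.0.1:8083/connectors
```

Serializing with `json.dumps` guarantees the body is valid JSON, which a commented, hand-edited snippet cannot.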
+
+```shell
+curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
+  "name": "insert_field_transform",
+  "config": {
+    "connector.class": "org.apache.doris.kafka.connector.DorisSinkConnector",
+    "tasks.max": "1",
+    "topics": "users",
+    "doris.topic2table.map": "users:kf_users",
+    "buffer.count.records": "10",
+    "buffer.flush.time": "11",
+    "buffer.size.bytes": "5000000",
+    "doris.urls": "127.0.0.1:8030",
+    "doris.user": "root",
+    "doris.password": "123456",
+    "doris.http.port": "8030",
+    "doris.query.port": "9030",
+    "doris.database": "testdb",
+    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+    "value.converter.schemas.enable": "false",
+    "transforms": "InsertField,TimestampConverter",
+    "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
+    "transforms.InsertField.static.field": "repo",
+    "transforms.InsertField.static.value": "Apache Doris",
+    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
+    "transforms.TimestampConverter.field": "registertime",
+    "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
+    "transforms.TimestampConverter.target.type": "string"
+  }
+}'
+```
+
+After the InsertField and TimestampConverter transformations, the data becomes:
+
+```json
+{
+  "userid": "User_9",
+  "regionid": "Region_3",
+  "gender": "MALE",
+  "repo": "Apache Doris", // static field added by InsertField
+  "registertime": "2017-12-21 19:38:55.404" // epoch milliseconds formatted as a string (UTC)
+}
+```
+
+For more examples of Kafka Connect Single Message Transforms (SMT), please refer to the [SMT documentation](https://docs.confluent.io/cloud/current/connectors/transforms/overview.html).
+
+
 ## FAQ
 
 **1. The following error occurs when reading Json type data:**
 ```shell