
Commit 6a2363f

dataroaring and claude committed
Add built-in Streaming Job option for MySQL and PostgreSQL migration
Replace the file-based Streaming Job description with the actual built-in CDC sync feature (FROM MYSQL/POSTGRES syntax). This uses Flink CDC under the hood to read binlog/WAL directly, with auto table creation and full + incremental sync in a single SQL command.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 41cb96e · commit 6a2363f

4 files changed (+252, -36 lines)

docs/migration/mysql-to-doris.md

Lines changed: 64 additions & 9 deletions
@@ -70,19 +70,74 @@ For detailed setup, see the [Flink Doris Connector](../ecosystem/flink-doris-connector.md) documentation.
 
 The [JDBC Catalog](../lakehouse/catalogs/jdbc-catalog.md) allows direct querying and batch migration from MySQL. This is the simplest approach for one-time or periodic batch migrations.
 
-### Option 3: Streaming Job (Continuous File Loading)
+### Option 3: Streaming Job (Built-in CDC Sync)
 
-Doris's built-in [Streaming Job](../data-operate/import/streaming-job.md) (`CREATE JOB ON STREAMING`) provides continuous file-based loading without external tools. Export MySQL data to S3/object storage, and the Streaming Job automatically picks up new files and loads them into Doris.
+Doris's built-in [Streaming Job](../data-operate/import/streaming-job/streaming-job-multi-table.md) can directly synchronize full and incremental data from MySQL to Doris without external tools such as Flink. It uses CDC under the hood to read the MySQL binlog and automatically creates target tables (UNIQUE KEY model) with primary keys matching the source.
 
 This option is suited for:
 
-- Continuous incremental migration via file export pipelines
-- Environments where you prefer Doris-native features over external tools like Flink
-- Scenarios where MySQL data is periodically exported to object storage
-
-**Prerequisites**: Data exported to S3-compatible object storage; Doris 2.1+ with Job Scheduler enabled.
-
-For detailed setup, see the [Streaming Job](../data-operate/import/streaming-job.md) and [CREATE STREAMING JOB](../sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md) documentation.
+- Real-time multi-table sync without deploying a Flink cluster
+- Environments where you prefer Doris-native features over external tools
+- Full + incremental migration with a single SQL command
+
+**Prerequisites**: MySQL with binlog enabled (`binlog_format = ROW`); MySQL JDBC driver deployed to Doris.
+
+#### Step 1: Enable MySQL Binlog
+
+Ensure `my.cnf` contains:
+
+```ini
+[mysqld]
+log-bin = mysql-bin
+binlog_format = ROW
+server-id = 1
+```
+
+#### Step 2: Create Streaming Job
+
+```sql
+CREATE JOB mysql_sync
+ON STREAMING
+FROM MYSQL (
+    "jdbc_url" = "jdbc:mysql://mysql-host:3306",
+    "driver_url" = "mysql-connector-j-8.0.31.jar",
+    "driver_class" = "com.mysql.cj.jdbc.Driver",
+    "user" = "root",
+    "password" = "password",
+    "database" = "source_db",
+    "include_tables" = "orders,customers,products",
+    "offset" = "initial"
+)
+TO DATABASE target_db (
+    "table.create.properties.replication_num" = "3"
+)
+```
+
+Key parameters:
+
+| Parameter | Description |
+|-----------|-------------|
+| `include_tables` | Comma-separated list of tables to sync |
+| `offset` | `initial` for full + incremental; `latest` for incremental only |
+| `snapshot_split_size` | Row count per split during full sync (default: 8096) |
+| `snapshot_parallelism` | Parallelism during full sync phase (default: 1) |
+
+#### Step 3: Monitor Sync Status
+
+```sql
+-- Check job status
+SELECT * FROM jobs("type"="insert") WHERE ExecuteType = "STREAMING";
+
+-- Check task history
+SELECT * FROM tasks("type"="insert") WHERE jobName = 'mysql_sync';
+
+-- Pause / Resume / Drop
+PAUSE JOB WHERE jobname = 'mysql_sync';
+RESUME JOB WHERE jobname = 'mysql_sync';
+DROP JOB WHERE jobname = 'mysql_sync';
+```
+
+For detailed reference, see the [Streaming Job Multi-Table Sync](../data-operate/import/streaming-job/streaming-job-multi-table.md) documentation.
 
 ### Option 4: DataX
 
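The example job above leaves `snapshot_split_size` and `snapshot_parallelism` at their defaults. A minimal sketch of a tuned variant follows: the `SHOW VARIABLES` checks are standard MySQL, while the inline placement of the two `snapshot_*` properties in the `FROM MYSQL` list is an assumption based on the parameter table, and the job name and tuning values are hypothetical.

```sql
-- On the MySQL source: verify the Step 1 prerequisites took effect
SHOW VARIABLES LIKE 'binlog_format';   -- expect: ROW
SHOW VARIABLES LIKE 'log_bin';         -- expect: ON

-- On Doris: same job as above, with the full-sync phase tuned
CREATE JOB mysql_sync_tuned
ON STREAMING
FROM MYSQL (
    "jdbc_url" = "jdbc:mysql://mysql-host:3306",
    "driver_url" = "mysql-connector-j-8.0.31.jar",
    "driver_class" = "com.mysql.cj.jdbc.Driver",
    "user" = "root",
    "password" = "password",
    "database" = "source_db",
    "include_tables" = "orders,customers,products",
    "offset" = "initial",
    -- hypothetical tuning; placement assumed from the parameter table
    "snapshot_split_size" = "16384",
    "snapshot_parallelism" = "4"
)
TO DATABASE target_db (
    "table.create.properties.replication_num" = "3"
)
```

Larger splits mean fewer, bigger chunks during the snapshot phase; raising parallelism speeds up the full sync at the cost of extra read load on the source.
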
docs/migration/postgresql-to-doris.md

Lines changed: 62 additions & 9 deletions
@@ -68,19 +68,72 @@ Flink CDC captures changes from PostgreSQL WAL (Write-Ahead Log) and streams them to Doris in real time.
 
 For detailed setup, see the [Flink Doris Connector](../ecosystem/flink-doris-connector.md) documentation.
 
-### Option 3: Streaming Job (Continuous File Loading)
+### Option 3: Streaming Job (Built-in CDC Sync)
 
-Doris's built-in [Streaming Job](../data-operate/import/streaming-job.md) (`CREATE JOB ON STREAMING`) provides continuous file-based loading without external tools. Export PostgreSQL data to S3/object storage, and the Streaming Job automatically picks up new files and loads them into Doris.
+Doris's built-in [Streaming Job](../data-operate/import/streaming-job/streaming-job-multi-table.md) can directly synchronize full and incremental data from PostgreSQL to Doris without external tools such as Flink. It uses CDC under the hood to read the PostgreSQL WAL and automatically creates target tables (UNIQUE KEY model) with primary keys matching the source.
 
 This option is suited for:
 
-- Continuous incremental migration via file export pipelines
-- Environments where you prefer Doris-native features over external tools like Flink
-- Scenarios where PostgreSQL data is periodically exported to object storage
-
-**Prerequisites**: Data exported to S3-compatible object storage; Doris 2.1+ with Job Scheduler enabled.
-
-For detailed setup, see the [Streaming Job](../data-operate/import/streaming-job.md) and [CREATE STREAMING JOB](../sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md) documentation.
+- Real-time multi-table sync without deploying a Flink cluster
+- Environments where you prefer Doris-native features over external tools
+- Full + incremental migration with a single SQL command
+
+**Prerequisites**: PostgreSQL with logical replication enabled (`wal_level = logical`); PostgreSQL JDBC driver deployed to Doris.
+
+#### Step 1: Enable Logical Replication
+
+Ensure `postgresql.conf` contains:
+
+```ini
+wal_level = logical
+```
+
+#### Step 2: Create Streaming Job
+
+```sql
+CREATE JOB pg_sync
+ON STREAMING
+FROM POSTGRES (
+    "jdbc_url" = "jdbc:postgresql://pg-host:5432/source_db",
+    "driver_url" = "postgresql-42.5.6.jar",
+    "driver_class" = "org.postgresql.Driver",
+    "user" = "postgres",
+    "password" = "password",
+    "database" = "source_db",
+    "schema" = "public",
+    "include_tables" = "orders,customers,products",
+    "offset" = "initial"
+)
+TO DATABASE target_db (
+    "table.create.properties.replication_num" = "3"
+)
+```
+
+Key parameters:
+
+| Parameter | Description |
+|-----------|-------------|
+| `include_tables` | Comma-separated list of tables to sync |
+| `offset` | `initial` for full + incremental; `latest` for incremental only |
+| `snapshot_split_size` | Row count per split during full sync (default: 8096) |
+| `snapshot_parallelism` | Parallelism during full sync phase (default: 1) |
+
+#### Step 3: Monitor Sync Status
+
+```sql
+-- Check job status
+SELECT * FROM jobs("type"="insert") WHERE ExecuteType = "STREAMING";
+
+-- Check task history
+SELECT * FROM tasks("type"="insert") WHERE jobName = 'pg_sync';
+
+-- Pause / Resume / Drop
+PAUSE JOB WHERE jobname = 'pg_sync';
+RESUME JOB WHERE jobname = 'pg_sync';
+DROP JOB WHERE jobname = 'pg_sync';
+```
+
+For detailed reference, see the [Streaming Job Multi-Table Sync](../data-operate/import/streaming-job/streaming-job-multi-table.md) documentation.
 
 ### Option 4: Export and Load
 
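For PostgreSQL, the analogous check plus the incremental-only variant are sketched below: `SHOW wal_level` is standard PostgreSQL and `"offset" = "latest"` comes straight from the parameter table, but the job name and the final sanity query are hypothetical.

```sql
-- On the PostgreSQL source: confirm logical replication is enabled (Step 1);
-- changing wal_level requires a server restart
SHOW wal_level;   -- expect: logical

-- On Doris: incremental-only sync, starting from the current WAL position
CREATE JOB pg_sync_incremental
ON STREAMING
FROM POSTGRES (
    "jdbc_url" = "jdbc:postgresql://pg-host:5432/source_db",
    "driver_url" = "postgresql-42.5.6.jar",
    "driver_class" = "org.postgresql.Driver",
    "user" = "postgres",
    "password" = "password",
    "database" = "source_db",
    "schema" = "public",
    "include_tables" = "orders,customers,products",
    "offset" = "latest"   -- skip the full snapshot; capture new changes only
)
TO DATABASE target_db (
    "table.create.properties.replication_num" = "3"
)

-- After the job is running: spot-check a synced table on the target
SELECT COUNT(*) FROM target_db.orders;
```

With `latest`, auto-created target tables start empty and fill as new changes arrive, so the count check is only meaningful once writes have occurred on the source.
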
i18n/zh-CN/docusaurus-plugin-content-docs/current/migration/mysql-to-doris.md

Lines changed: 64 additions & 9 deletions
@@ -70,19 +70,74 @@ Flink CDC 捕获 MySQL binlog 变更并流式传输到 Doris。此方法适用
 
 [JDBC Catalog](../lakehouse/catalogs/jdbc-catalog.md) 允许从 MySQL 直接查询和批量迁移。这是一次性或定期批量迁移最简单的方法。
 
-### 选项 3:Streaming Job(持续文件加载)
+### 选项 3:Streaming Job(内置 CDC 同步)
 
-Doris 内置的 [Streaming Job](../data-operate/import/streaming-job.md)(`CREATE JOB ON STREAMING`)提供无需外部工具的持续文件加载能力。将 MySQL 数据导出到 S3/对象存储,Streaming Job 会自动发现新文件并加载到 Doris。
+Doris 内置的 [Streaming Job](../data-operate/import/streaming-job/streaming-job-multi-table.md) 可以直接从 MySQL 同步全量和增量数据到 Doris,无需部署 Flink 等外部工具。底层使用 CDC 读取 MySQL binlog,并自动创建目标表(UNIQUE KEY 模型),主键与源表保持一致。
 
 此选项适用于:
 
-- 通过文件导出管道进行持续增量迁移
-- 偏好使用 Doris 原生功能而非 Flink 等外部工具的环境
-- MySQL 数据定期导出到对象存储的场景
-
-**前提条件**:数据已导出到 S3 兼容的对象存储;Doris 2.1+ 并启用 Job Scheduler。
-
-详细设置请参考 [Streaming Job](../data-operate/import/streaming-job.md) 和 [CREATE STREAMING JOB](../sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md) 文档。
+- 无需部署 Flink 集群的实时多表同步
+- 偏好使用 Doris 原生功能而非外部工具的环境
+- 通过单条 SQL 命令实现全量 + 增量迁移
+
+**前提条件**:MySQL 启用 binlog(`binlog_format = ROW`);MySQL JDBC 驱动已部署到 Doris。
+
+#### 步骤 1:启用 MySQL Binlog
+
+确保 `my.cnf` 包含:
+
+```ini
+[mysqld]
+log-bin = mysql-bin
+binlog_format = ROW
+server-id = 1
+```
+
+#### 步骤 2:创建 Streaming Job
+
+```sql
+CREATE JOB mysql_sync
+ON STREAMING
+FROM MYSQL (
+    "jdbc_url" = "jdbc:mysql://mysql-host:3306",
+    "driver_url" = "mysql-connector-j-8.0.31.jar",
+    "driver_class" = "com.mysql.cj.jdbc.Driver",
+    "user" = "root",
+    "password" = "password",
+    "database" = "source_db",
+    "include_tables" = "orders,customers,products",
+    "offset" = "initial"
+)
+TO DATABASE target_db (
+    "table.create.properties.replication_num" = "3"
+)
+```
+
+关键参数:
+
+| 参数 | 说明 |
+|------|------|
+| `include_tables` | 逗号分隔的待同步表列表 |
+| `offset` | `initial` 全量 + 增量;`latest` 仅增量 |
+| `snapshot_split_size` | 全量同步时每个分片的行数(默认:8096) |
+| `snapshot_parallelism` | 全量同步阶段的并行度(默认:1) |
+
+#### 步骤 3:监控同步状态
+
+```sql
+-- 查看 Job 状态
+SELECT * FROM jobs("type"="insert") WHERE ExecuteType = "STREAMING";
+
+-- 查看 Task 历史
+SELECT * FROM tasks("type"="insert") WHERE jobName = 'mysql_sync';
+
+-- 暂停 / 恢复 / 删除
+PAUSE JOB WHERE jobname = 'mysql_sync';
+RESUME JOB WHERE jobname = 'mysql_sync';
+DROP JOB WHERE jobname = 'mysql_sync';
+```
+
+详细参考请见 [Streaming Job 多表同步](../data-operate/import/streaming-job/streaming-job-multi-table.md) 文档。
 
 ### 选项 4:DataX
 
i18n/zh-CN/docusaurus-plugin-content-docs/current/migration/postgresql-to-doris.md

Lines changed: 62 additions & 9 deletions
@@ -68,19 +68,72 @@ Flink CDC 从 PostgreSQL WAL(预写日志)捕获变更并实时流式传输
 
 详细设置请参考 [Flink Doris Connector](../ecosystem/flink-doris-connector.md) 文档。
 
-### 选项 3:Streaming Job(持续文件加载)
+### 选项 3:Streaming Job(内置 CDC 同步)
 
-Doris 内置的 [Streaming Job](../data-operate/import/streaming-job.md)(`CREATE JOB ON STREAMING`)提供无需外部工具的持续文件加载能力。将 PostgreSQL 数据导出到 S3/对象存储,Streaming Job 会自动发现新文件并加载到 Doris。
+Doris 内置的 [Streaming Job](../data-operate/import/streaming-job/streaming-job-multi-table.md) 可以直接从 PostgreSQL 同步全量和增量数据到 Doris,无需部署 Flink 等外部工具。底层使用 CDC 读取 PostgreSQL WAL,并自动创建目标表(UNIQUE KEY 模型),主键与源表保持一致。
 
 此选项适用于:
 
-- 通过文件导出管道进行持续增量迁移
-- 偏好使用 Doris 原生功能而非 Flink 等外部工具的环境
-- PostgreSQL 数据定期导出到对象存储的场景
-
-**前提条件**:数据已导出到 S3 兼容的对象存储;Doris 2.1+ 并启用 Job Scheduler。
-
-详细设置请参考 [Streaming Job](../data-operate/import/streaming-job.md) 和 [CREATE STREAMING JOB](../sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md) 文档。
+- 无需部署 Flink 集群的实时多表同步
+- 偏好使用 Doris 原生功能而非外部工具的环境
+- 通过单条 SQL 命令实现全量 + 增量迁移
+
+**前提条件**:PostgreSQL 启用逻辑复制(`wal_level = logical`);PostgreSQL JDBC 驱动已部署到 Doris。
+
+#### 步骤 1:启用逻辑复制
+
+确保 `postgresql.conf` 包含:
+
+```ini
+wal_level = logical
+```
+
+#### 步骤 2:创建 Streaming Job
+
+```sql
+CREATE JOB pg_sync
+ON STREAMING
+FROM POSTGRES (
+    "jdbc_url" = "jdbc:postgresql://pg-host:5432/source_db",
+    "driver_url" = "postgresql-42.5.6.jar",
+    "driver_class" = "org.postgresql.Driver",
+    "user" = "postgres",
+    "password" = "password",
+    "database" = "source_db",
+    "schema" = "public",
+    "include_tables" = "orders,customers,products",
+    "offset" = "initial"
+)
+TO DATABASE target_db (
+    "table.create.properties.replication_num" = "3"
+)
+```
+
+关键参数:
+
+| 参数 | 说明 |
+|------|------|
+| `include_tables` | 逗号分隔的待同步表列表 |
+| `offset` | `initial` 全量 + 增量;`latest` 仅增量 |
+| `snapshot_split_size` | 全量同步时每个分片的行数(默认:8096) |
+| `snapshot_parallelism` | 全量同步阶段的并行度(默认:1) |
+
+#### 步骤 3:监控同步状态
+
+```sql
+-- 查看 Job 状态
+SELECT * FROM jobs("type"="insert") WHERE ExecuteType = "STREAMING";
+
+-- 查看 Task 历史
+SELECT * FROM tasks("type"="insert") WHERE jobName = 'pg_sync';
+
+-- 暂停 / 恢复 / 删除
+PAUSE JOB WHERE jobname = 'pg_sync';
+RESUME JOB WHERE jobname = 'pg_sync';
+DROP JOB WHERE jobname = 'pg_sync';
+```
+
+详细参考请见 [Streaming Job 多表同步](../data-operate/import/streaming-job/streaming-job-multi-table.md) 文档。
 
 ### 选项 4:导出和加载
 