Skip to content

Commit bd30f89

Browse files
author
lmj
committed
docs(phase6): add orchestration+JDBC sink E2E guide; update Phase 6 issues/status and TODOs
1 parent 29b5e45 commit bd30f89

4 files changed

Lines changed: 303 additions & 54 deletions

File tree

README.md

Lines changed: 200 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,50 @@ db-syncer task create \
138138
db-syncer task start <task-id>
139139
```
140140

141+
### Phase 6: 任务执行与 JDBC Sink(要点)
142+
143+
- 任务编排:CLI/REST 启动任务时,会自动生成/覆盖 Source 与 Sink 的 Connector 配置、部署到 Kafka Connect,并等待运行状态。
144+
- Debezium 2.x 兼容:
145+
- MySQL Source 使用 `topic.prefix`(替代 legacy 的 `database.server.name`)。
146+
- 提供默认 `database.server.id`
147+
- 使用 file-based schema history:`schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory`,路径:`/kafka/connect/custom-connectors/schema-history/<task>.dat`
148+
- 演示环境下默认 `snapshot.locking.mode=none`,避免 MySQL 用户缺少 RELOAD/LOCK TABLES 权限导致初始快照失败。
149+
- 转换与路由(Sink):
150+
- 启用 `key/value.converter.schemas.enable=true`(覆盖 worker 默认关闭 schemas)。
151+
- 注入 SMT:`ExtractNewRecordState`(解包 Debezium 信封)+ `RegexRouter`(将主题名路由为纯表名)。
152+
- `topics.regex` 仅匹配业务主题(排除 schema history 主题)。
153+
- 容器内主机名:在 `docker-compose` 环境中,Source/Sink 连接数据库时请使用容器服务名(如 `mysql-source``postgres-target`),而非 `localhost`
154+
155+
快速 E2E 验证(本地 docker 开发环境):
156+
157+
```bash
158+
# 1) 启动依赖环境
159+
cd docker && docker-compose up -d
160+
161+
# 2) 创建任务(以 MySQL -> PostgreSQL 为例,使用容器主机名)
162+
db-syncer task create \
163+
--name e2e-mysql-to-pg2 \
164+
--description "E2E MySQL to PostgreSQL (docker hostnames)" \
165+
--source-type MYSQL --source-host mysql-source --source-port 3306 \
166+
--source-db source_db --source-user dbuser --source-pass dbpass \
167+
--target-type POSTGRESQL --target-host postgres-target --target-port 5432 \
168+
--target-db target_db --target-user targetuser --target-pass targetpass
169+
170+
# 3) 启动任务(自动部署 Source/Sink)
171+
db-syncer task start e2e-mysql-to-pg2
172+
173+
# 4) 写入源库数据(在主机)
174+
docker exec -i db-syncer-mysql-source \
175+
mysql -udbuser -pdbpass -D source_db \
176+
-e "CREATE TABLE IF NOT EXISTS customers (id INT PRIMARY KEY AUTO_INCREMENT, name VARCHAR(100), email VARCHAR(100), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP); INSERT INTO customers(name,email) VALUES ('Alice','alice@example.com');"
177+
178+
# 5) 校验目标库
179+
docker exec -i db-syncer-postgres-target \
180+
psql -U targetuser -d target_db \
181+
-c "\\dt" \
182+
-c "SELECT * FROM public.customers LIMIT 5;"
183+
```
184+
141185
### 监控进度
142186

143187
```bash
@@ -153,6 +197,137 @@ db-syncer task pause <task-id> # 暂停任务
153197
db-syncer task resume <task-id> # 恢复任务
154198
db-syncer task stop <task-id> # 停止任务
155199
db-syncer task delete <task-id> # 删除任务
200+
201+
# 设置任务的类型映射规则文件(存入 source_properties)
202+
db-syncer task props set <task-id-or-name> \
203+
--scope source \
204+
--key typeMapping.rulesPath \
205+
--value /path/to/rules.yaml
206+
```
207+
208+
### 启用类型映射 SMT(可选)
209+
210+
生成带有类型映射的 Source Connector 配置模板(会注入 ApplyTypeMapping SMT):
211+
212+
```bash
213+
# 为任务生成 Source Connector 配置,并开启类型映射(以 MySQL 为例)
214+
db-syncer config generate <task-id-or-name> --type source \
215+
--enable-type-mapping \
216+
--type-mapping-source-db mysql \
217+
--type-mapping-enable-time true \
218+
--type-mapping-enable-json true \
219+
-o source-config.json
220+
```
221+
222+
对应的配置片段将包含:
223+
224+
```
225+
transforms=applyTypeMapping
226+
transforms.applyTypeMapping.type=com.dbsyncer.transformations.smt.ApplyTypeMapping
227+
transforms.applyTypeMapping.source.db=mysql
228+
transforms.applyTypeMapping.enable.time.mapping=true
229+
transforms.applyTypeMapping.enable.json.mapping=true
230+
transforms.applyTypeMapping.enable.decimal.mapping=true
231+
transforms.applyTypeMapping.decimal.target=string
232+
```
233+
234+
说明:
235+
- `source.db`: 源数据库方言(mysql/oracle/postgresql),用于选择类型映射器。
236+
- `enable.time.mapping`: 将 Debezium 时间类逻辑类型映射为 Kafka Connect 标准 Date/Time/Timestamp。
237+
- `enable.json.mapping`: 将 Debezium Json 逻辑类型映射为普通字符串。
238+
- `enable.decimal.mapping`: 将 Debezium VariableScaleDecimal 映射为目标类型。
239+
- `decimal.target`: 目标类型,`string`(默认)或 `decimal`(Kafka Connect Decimal,注意可能引发频繁 Schema 变更)。
240+
241+
也可以在创建任务时通过元数据默认值控制(存储在 `source_properties`):
242+
243+
```
244+
typeMapping.enabled=true
245+
typeMapping.sourceDb=mysql
246+
typeMapping.enableTime=true
247+
typeMapping.enableJson=true
248+
```
249+
当使用 `db-syncer config generate --type source` 且未显式传入 CLI 开关时,会读取上述默认值注入到配置模板。
250+
251+
### 快速生成目标表 DDL(预览)
252+
253+
当你只有源库的列定义时,可快速预览映射后的 PostgreSQL 表结构:
254+
255+
```bash
256+
# 基于列定义快速生成 PostgreSQL DDL(也可用 --task 自动读 source-db 与 rulesPath)
257+
db-syncer transform ddl --source-db mysql --schema public --table orders \
258+
"id:bigint unsigned not null" \
259+
"flag:tinyint(1)" \
260+
"amount:decimal(18,2)" \
261+
"payload:json" \
262+
"created_at:timestamp" \
263+
--pk id \
264+
--if-not-exists \
265+
--rules rules.yaml
266+
267+
# 从文件加载列定义(每行一个 COLUMN 规范,支持 # 注释)
268+
db-syncer transform ddl --task mytask --table orders \
269+
--columns-file columns.txt --mode create
270+
271+
# 基于 Debezium/Connect JSON Schema 直接生成 DDL(自动选择 envelope.after 或顶层 struct)
272+
db-syncer transform schema --task mytask --file ./value-schema.json \
273+
--table orders --mode create --format text --pk id
274+
275+
# 以 ALTER 形式输出(逐列 ADD COLUMN),便于演进已有表
276+
db-syncer transform ddl --task mytask --table orders \
277+
"new_col:varchar(100)" --mode alter --if-not-exists --timestamp-tz without
278+
279+
# 仅输出列定义(便于嵌入到其它模板),可控制时间戳是否带时区
280+
db-syncer transform ddl --task mytask --table orders \
281+
"ts_col:timestamp" --schema-only --timestamp-tz with
282+
283+
# 以 JSON 格式输出结构(便于自动化处理)
284+
db-syncer transform ddl --task mytask --table orders \
285+
"id:bigint unsigned not null" "name:varchar(255)" \
286+
--format json
287+
288+
# JSON 输出示例
289+
{
290+
"schema" : "public",
291+
"table" : "orders",
292+
"mode" : "create",
293+
"columns" : [ {
294+
"name" : "id",
295+
"type" : "numeric(20,0)",
296+
"notNull" : true
297+
}, {
298+
"name" : "name",
299+
"type" : "varchar(255)",
300+
"notNull" : false
301+
} ],
302+
"primaryKey" : [ "id" ]
303+
}
304+
305+
# 类型映射预览(JSON 输出)
306+
db-syncer transform map --task mytask --format json \
307+
"id:bigint unsigned" "name:varchar(255)" "payload:varbinary(1024)"
308+
309+
# 从文件加载列定义进行类型映射预览
310+
db-syncer transform map --task mytask --columns-file columns.txt --format json
311+
312+
# 生成规则模板(YAML),用于自定义类型映射
313+
db-syncer transform rules --source-db mysql -o rules.yaml
314+
db-syncer task props set <task-id-or-name> --scope source --key typeMapping.rulesPath --value /path/to/rules.yaml
315+
316+
# 预览时将二进制列映射为 text(而非 bytea)
317+
db-syncer transform ddl --task mytask --table files \
318+
"content:varbinary(4096)" --binary-as text
319+
```
320+
321+
输出示例:
322+
323+
```
324+
CREATE TABLE "public"."orders" (
325+
"id" numeric(20,0) NOT NULL,
326+
"flag" boolean,
327+
"amount" numeric(18,2),
328+
"payload" jsonb,
329+
"created_at" timestamp without time zone
330+
);
156331
```
157332

158333
## 项目结构
@@ -167,6 +342,8 @@ db-syncer-debezium/
167342
├── monitoring/ # 监控模块
168343
├── docker/ # Docker 编排文件
169344
└── docs/ # 文档
345+
├── MAPPING_REFERENCE.md # 类型映射参考
346+
└── rules/ # 规则模板
170347
```
171348

172349
## 开发路线图
@@ -176,11 +353,11 @@ db-syncer-debezium/
176353
### 里程碑
177354

178355
- [x] Phase 1: 项目基础设施
179-
- [ ] Phase 2: 元数据管理系统
180-
- [ ] Phase 3: CLI 命令行工具
181-
- [ ] Phase 4: Debezium Source Connector 集成
356+
- [x] Phase 2: 元数据管理系统
357+
- [x] Phase 3: CLI 命令行工具
358+
- [x] Phase 4: Debezium Source Connector 集成
182359
- [ ] Phase 5: 数据转换层
183-
- [ ] Phase 6: JDBC Sink 与任务执行
360+
- [ ] Phase 6: JDBC Sink 与任务执行(核心流转与 E2E 已完成,其余优化进行中)
184361
- [ ] Phase 7: 监控与可观测性
185362
- [ ] Phase 8: 测试与文档
186363

@@ -196,3 +373,22 @@ Apache License 2.0 - 详见 [LICENSE](LICENSE) 文件
196373

197374
- 提交 Issue: [GitHub Issues](https://github.com/your-username/db-syncer-debezium/issues)
198375
- 文档: [Wiki](https://github.com/your-username/db-syncer-debezium/wiki)
376+
也可以仅做类型映射预览,并加载规则覆盖(或用 --task 自动读取默认 rules):
377+
378+
```bash
379+
# rules.yaml 示例:
380+
# sourceDb: mysql
381+
# overrides:
382+
# - name: tinyint
383+
# lengthEquals: 1
384+
# target: boolean
385+
# - name: bigint
386+
# unsigned: true
387+
# target: numeric(20,0)
388+
389+
db-syncer transform map --source-db mysql --rules rules.yaml \
390+
"id:bigint unsigned" "flag:tinyint(1)" "name:varchar(255)"
391+
```
392+
# 示例规则文件
393+
- MySQL: docs/rules/mysql-defaults.yaml
394+
- Oracle: docs/rules/oracle-defaults.yaml

docs/DEVELOPMENT.md

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,59 @@ mvn clean compile
136136
mvn clean install -DskipTests
137137

138138
# 完整构建(包含测试)
139-
mvn clean install
140-
```
139+
mvn clean install
140+
```
141+
142+
## 数据转换与 SMT 调试
143+
144+
本项目提供 `ApplyTypeMapping` SMT 以统一 Debezium 事件中的逻辑类型与不同源数据库的类型。
145+
146+
- 启用方式(生成 Source Connector 配置模板时):
147+
```bash
148+
dbsyncer config generate <task> --type source \
149+
--enable-type-mapping \
150+
--type-mapping-source-db mysql \
151+
--type-mapping-enable-time true \
152+
--type-mapping-enable-json true \
153+
# 可选:Decimal 转换
154+
transforms.applyTypeMapping.enable.decimal.mapping=true \
155+
transforms.applyTypeMapping.decimal.target=string
156+
```
157+
158+
- 也可通过任务元数据默认值(`source_properties`)控制:
159+
```
160+
typeMapping.enabled=true
161+
typeMapping.sourceDb=mysql
162+
typeMapping.enableTime=true
163+
typeMapping.enableJson=true
164+
typeMapping.enableDecimal=true
165+
typeMapping.decimalTarget=string
166+
```
167+
168+
启用后,配置中将注入 `transforms.applyTypeMapping.*` 参数,SMT 会将 Debezium 时间逻辑类型映射为 Kafka Connect 标准 Date/Time/Timestamp,并将 Debezium Json 逻辑类型映射为字符串,便于 JDBC Sink 处理。
169+
170+
### Debezium 2.x 与 JDBC Sink E2E 提示
171+
172+
- MySQL Source
173+
- 使用 `topic.prefix`(替代 `database.server.name`)。
174+
- 提供唯一 `database.server.id`(可由任务 ID 派生)。
175+
- file-based schema history(无需额外 Kafka 主题):
176+
- `schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory`
177+
- `schema.history.internal.file.filename=/kafka/connect/custom-connectors/schema-history/<task>.dat`
178+
- 演示环境建议 `snapshot.locking.mode=none`,避免因 MySQL 账号缺少 RELOAD/LOCK TABLES 权限导致 snapshot 失败。
179+
180+
- JDBC Sink
181+
- 覆盖转换器以启用 schema:`key/value.converter.schemas.enable=true`
182+
- 注入 SMT:
183+
- `ExtractNewRecordState`:解包 Debezium Envelope -> 扁平记录。
184+
- `RegexRouter`:将 `prefix.db.table` 路由为 `table`,便于按表名建表。
185+
- `topics.regex` 仅匹配业务数据主题(排除 schema history)。
186+
187+
- 容器内主机名
188+
- Source/Sink 连接数据库时使用容器服务名(例如 `mysql-source``postgres-target`)。
189+
190+
- Kafka Connect REST
191+
- 默认 `http://localhost:8083`,可通过 `CONNECT_REST_URL` 覆盖。
141192

142193
## 项目结构
143194

docs/ISSUES_PLAN.md

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -422,44 +422,43 @@
422422
**描述**: 集成 Debezium JDBC Sink Connector
423423

424424
**验收标准**:
425-
- [ ] JDBC Sink Connector 依赖
426-
- [ ] PostgreSQL Sink 配置生成
427-
- [ ] Upsert 语义配置
428-
- [ ] 批处理配置
429-
- [ ] 错误处理配置
425+
- [x] JDBC Sink Connector 依赖
426+
- [x] PostgreSQL Sink 配置生成
427+
- [x] Upsert 语义配置(record_key / delete.enabled / schema.evolution=basic)
428+
- [x] 批处理配置(batch.size)
429+
- [x] 转换与路由(ExtractNewRecordState + RegexRouter)
430430

431431
### Issue #37: 任务编排引擎
432432
**标签**: `task`, `phase-6`
433433
**描述**: 实现完整的任务编排和执行引擎
434434

435435
**验收标准**:
436-
- [ ] TaskOrchestrator 核心类
437-
- [ ] 任务状态机实现
438-
- [ ] Source Connector 部署
439-
- [ ] Sink Connector 部署
440-
- [ ] 任务协调逻辑
436+
- [x] TaskOrchestrator 核心类(TaskExecutionService)
437+
- [x] 任务状态机实现(STARTING/RUNNING/PAUSED/STOPPED)
438+
- [x] Source Connector 部署
439+
- [x] Sink Connector 部署
440+
- [x] 任务协调逻辑(覆盖保存配置 → 部署 → 等待运行 → 状态回填)
441441

442442
### Issue #38: 任务启动逻辑
443443
**标签**: `task`, `phase-6`
444444
**描述**: 实现任务启动的完整流程
445445

446446
**验收标准**:
447-
- [ ] 参数验证
448-
- [ ] 前置检查 (数据库连接、权限等)
449-
- [ ] Schema 初始化
450-
- [ ] Connector 部署
451-
- [ ] 状态更新
447+
- [x] 参数验证
448+
- [x] 前置检查(基础校验)
449+
- [x] Schema 初始化(file-based history + schemas.enable 覆盖)
450+
- [x] Connector 部署
451+
- [x] 状态更新
452452

453453
### Issue #39: 任务停止和暂停逻辑
454454
**标签**: `task`, `phase-6`
455455
**描述**: 实现任务停止和暂停的逻辑
456456

457457
**验收标准**:
458-
- [ ] 优雅停止逻辑
459-
- [ ] Connector 删除/暂停
460-
- [ ] Offset 保存
461-
- [ ] 状态清理
462-
- [ ] 恢复能力
458+
- [x] 优雅停止逻辑(删除 Connector + 状态 STOPPED)
459+
- [x] Connector 删除/暂停/恢复
460+
- [x] 状态清理与恢复能力
461+
- [ ] Offset 保存(依赖 Connect Offset 存储,后续完善)
463462

464463
### Issue #40: 进度跟踪实现
465464
**标签**: `task`, `phase-6`

0 commit comments

Comments
 (0)