Skip to content

Commit e120049

Browse files
author
lmj
committed
Add task execution engine and progress tracking
1 parent acaf66f commit e120049

27 files changed

Lines changed: 1346 additions & 37 deletions

connectors/src/main/java/com/dbsyncer/connectors/client/KafkaConnectClient.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,15 @@ public boolean waitForConnectorRunning(String connectorName, long timeoutMs, lon
307307
}
308308
log.debug("Connector {} not yet running, waiting...", connectorName);
309309
Thread.sleep(pollIntervalMs);
310+
} catch (ConnectorNotFoundException e) {
311+
// Connector may not be immediately available after creation; keep waiting
312+
log.debug("Connector {} not found yet, waiting...", connectorName);
313+
try {
314+
Thread.sleep(pollIntervalMs);
315+
} catch (InterruptedException ie) {
316+
Thread.currentThread().interrupt();
317+
throw new KafkaConnectException("Interrupted while waiting for connector", ie);
318+
}
310319
} catch (InterruptedException e) {
311320
Thread.currentThread().interrupt();
312321
throw new KafkaConnectException("Interrupted while waiting for connector", e);

connectors/src/main/java/com/dbsyncer/connectors/config/MySqlSourceConnectorConfig.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,11 @@ public class MySqlSourceConnectorConfig implements SourceConnectorConfig {
101101
private String sslTruststore;
102102
private String sslTruststorePassword;
103103

104+
// Optional transformation injection
105+
@Builder.Default
106+
private boolean enableTypeMappingTransform = false;
107+
private String typeMappingSourceDb; // e.g. "mysql"
108+
104109
@Override
105110
public String getConnectorClass() {
106111
return CONNECTOR_CLASS;
@@ -240,6 +245,14 @@ public Map<String, String> toConfigMap() {
240245
config.put("database.ssl.truststore.password", sslTruststorePassword);
241246
}
242247

248+
// Optional SMT transform to normalize schema/types
249+
if (enableTypeMappingTransform && typeMappingSourceDb != null && !typeMappingSourceDb.isBlank()) {
250+
config.put("transforms", "applyTypeMapping");
251+
config.put("transforms.applyTypeMapping.type",
252+
"com.dbsyncer.transformations.smt.ApplyTypeMapping");
253+
config.put("transforms.applyTypeMapping.source.db", typeMappingSourceDb);
254+
}
255+
243256
return config;
244257
}
245258

connectors/src/main/java/com/dbsyncer/connectors/config/OracleSourceConnectorConfig.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,11 @@ public String getValue() {
123123
@Builder.Default
124124
private int maxQueueSize = 8192;
125125

126+
// Optional transformation injection
127+
@Builder.Default
128+
private boolean enableTypeMappingTransform = false;
129+
private String typeMappingSourceDb; // e.g. "oracle"
130+
126131
@Override
127132
public String getConnectorClass() {
128133
return CONNECTOR_CLASS;
@@ -247,6 +252,14 @@ public Map<String, String> toConfigMap() {
247252
config.put("max.batch.size", String.valueOf(maxBatchSize));
248253
config.put("max.queue.size", String.valueOf(maxQueueSize));
249254

255+
// Optional SMT transform to normalize schema/types
256+
if (enableTypeMappingTransform && typeMappingSourceDb != null && !typeMappingSourceDb.isBlank()) {
257+
config.put("transforms", "applyTypeMapping");
258+
config.put("transforms.applyTypeMapping.type",
259+
"com.dbsyncer.transformations.smt.ApplyTypeMapping");
260+
config.put("transforms.applyTypeMapping.source.db", typeMappingSourceDb);
261+
}
262+
250263
return config;
251264
}
252265

connectors/src/main/java/com/dbsyncer/connectors/config/PostgresSourceConnectorConfig.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,11 @@ public String getValue() {
121121
private String sslKey;
122122
private String sslPassword;
123123

124+
// Optional transformation injection
125+
@Builder.Default
126+
private boolean enableTypeMappingTransform = false;
127+
private String typeMappingSourceDb; // e.g. "postgresql"
128+
124129
@Override
125130
public String getConnectorClass() {
126131
return CONNECTOR_CLASS;
@@ -259,6 +264,14 @@ public Map<String, String> toConfigMap() {
259264
config.put("database.sslpassword", sslPassword);
260265
}
261266

267+
// Optional SMT transform to normalize schema/types
268+
if (enableTypeMappingTransform && typeMappingSourceDb != null && !typeMappingSourceDb.isBlank()) {
269+
config.put("transforms", "applyTypeMapping");
270+
config.put("transforms.applyTypeMapping.type",
271+
"com.dbsyncer.transformations.smt.ApplyTypeMapping");
272+
config.put("transforms.applyTypeMapping.source.db", typeMappingSourceDb);
273+
}
274+
262275
return config;
263276
}
264277

docs/ARCHITECTURE.md

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,8 +154,8 @@ db-syncer-debezium/
154154
│ └── config/ # Connector 配置生成器
155155
156156
├── transformations/ # 数据转换层
157-
│ ├── smt/ # Single Message Transforms
158-
│ ├── mapper/ # 类型映射器
157+
│ ├── smt/ # Single Message Transforms(如 ApplyTypeMapping)
158+
│ ├── mapper/ # 类型映射器(MySQL/Oracle → PostgreSQL)
159159
│ └── schema/ # Schema 转换
160160
161161
└── monitoring/ # 监控模块
@@ -238,6 +238,32 @@ CREATE TABLE schema_history (
238238
);
239239
```
240240

241+
### 2. Transformations(数据转换层)
242+
243+
**职责**:
244+
- 统一不同源数据库的类型,便于落地到目标(如 PostgreSQL)
245+
- 标准化 Debezium 事件中的逻辑类型,方便下游 Sink 处理
246+
247+
**关键实现**:
248+
- `ApplyTypeMapping` SMT: 可选注入到 Source Connector,用于重写记录值的 Schema/Value
249+
- 时间逻辑类型映射:
250+
- `io.debezium.time.Date` → Kafka Connect `Date`
251+
- `io.debezium.time.Time` → Kafka Connect `Time`
252+
- `io.debezium.time.Timestamp`/`MicroTimestamp`/`NanoTimestamp` → Kafka Connect `Timestamp`
253+
- JSON 逻辑类型映射:
254+
- `io.debezium.data.Json` → 普通 `string`
255+
- Decimal 逻辑类型:
256+
- `io.debezium.data.VariableScaleDecimal``string`(默认)或 Kafka Connect `Decimal`(可选,存在 Schema 变更频率高的风险)
257+
- 配置项:
258+
- `transforms.applyTypeMapping.source.db`(mysql/oracle/postgresql)
259+
- `transforms.applyTypeMapping.enable.time.mapping`(默认 true)
260+
- `transforms.applyTypeMapping.enable.json.mapping`(默认 true)
261+
- `transforms.applyTypeMapping.enable.decimal.mapping`(默认 true)
262+
- `transforms.applyTypeMapping.decimal.target`(string/decimal,默认 string)
263+
264+
**类型映射器**:
265+
- `MySqlToPostgresTypeMapper``OracleToPostgresTypeMapper` 提供列级类型映射(供 Schema 生成/校验使用)。
266+
241267
### 2. CLI Tool (命令行工具)
242268

243269
**职责**:

metadata-service/pom.xml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,29 @@
1616
<description>Metadata management service with PostgreSQL storage</description>
1717

1818
<dependencies>
19+
<!-- Spring Boot Actuator & Metrics -->
20+
<dependency>
21+
<groupId>org.springframework.boot</groupId>
22+
<artifactId>spring-boot-starter-actuator</artifactId>
23+
</dependency>
24+
25+
<!-- Micrometer Prometheus Registry -->
26+
<dependency>
27+
<groupId>io.micrometer</groupId>
28+
<artifactId>micrometer-registry-prometheus</artifactId>
29+
</dependency>
30+
1931
<!-- Internal -->
2032
<dependency>
2133
<groupId>com.dbsyncer</groupId>
2234
<artifactId>common</artifactId>
2335
<version>${project.version}</version>
2436
</dependency>
37+
<dependency>
38+
<groupId>com.dbsyncer</groupId>
39+
<artifactId>connectors</artifactId>
40+
<version>${project.version}</version>
41+
</dependency>
2542

2643
<!-- Spring Boot -->
2744
<dependency>

metadata-service/src/main/java/com/dbsyncer/metadata/controller/ProgressController.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,16 @@ public ResponseEntity<Double> getAverageLag(@PathVariable UUID taskId) {
7676
return ResponseEntity.ok(averageLag);
7777
}
7878

79+
/**
80+
* Get ETA (seconds) for task completion if it can be estimated, otherwise returns null.
81+
*/
82+
@GetMapping("/eta")
83+
public ResponseEntity<Long> getEtaSeconds(@PathVariable UUID taskId) {
84+
log.debug("REST request to get ETA for task: {}", taskId);
85+
Long etaSeconds = progressTrackingService.estimateEtaSeconds(taskId);
86+
return ResponseEntity.ok(etaSeconds);
87+
}
88+
7989
/**
8090
* Get tables with errors.
8191
*/

metadata-service/src/main/java/com/dbsyncer/metadata/controller/TaskController.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.dbsyncer.metadata.dto.TaskResponse;
55
import com.dbsyncer.metadata.dto.TaskUpdateRequest;
66
import com.dbsyncer.metadata.entity.TaskStatus;
7+
import com.dbsyncer.metadata.service.TaskExecutionService;
78
import com.dbsyncer.metadata.service.TaskService;
89
import jakarta.validation.Valid;
910
import lombok.RequiredArgsConstructor;
@@ -29,6 +30,7 @@
2930
public class TaskController {
3031

3132
private final TaskService taskService;
33+
private final TaskExecutionService executionService;
3234

3335
/**
3436
* Create a new migration task.
@@ -119,7 +121,7 @@ public ResponseEntity<Void> deleteTask(@PathVariable UUID taskId) {
119121
@PostMapping("/{taskId}/start")
120122
public ResponseEntity<TaskResponse> startTask(@PathVariable UUID taskId) {
121123
log.info("REST request to start task: {}", taskId);
122-
TaskResponse response = taskService.updateTaskStatus(taskId, TaskStatus.STARTING);
124+
TaskResponse response = executionService.startTask(taskId);
123125
return ResponseEntity.ok(response);
124126
}
125127

@@ -129,7 +131,7 @@ public ResponseEntity<TaskResponse> startTask(@PathVariable UUID taskId) {
129131
@PostMapping("/{taskId}/stop")
130132
public ResponseEntity<TaskResponse> stopTask(@PathVariable UUID taskId) {
131133
log.info("REST request to stop task: {}", taskId);
132-
TaskResponse response = taskService.updateTaskStatus(taskId, TaskStatus.STOPPING);
134+
TaskResponse response = executionService.stopTask(taskId);
133135
return ResponseEntity.ok(response);
134136
}
135137

@@ -139,7 +141,7 @@ public ResponseEntity<TaskResponse> stopTask(@PathVariable UUID taskId) {
139141
@PostMapping("/{taskId}/pause")
140142
public ResponseEntity<TaskResponse> pauseTask(@PathVariable UUID taskId) {
141143
log.info("REST request to pause task: {}", taskId);
142-
TaskResponse response = taskService.updateTaskStatus(taskId, TaskStatus.PAUSED);
144+
TaskResponse response = executionService.pauseTask(taskId);
143145
return ResponseEntity.ok(response);
144146
}
145147

@@ -149,7 +151,7 @@ public ResponseEntity<TaskResponse> pauseTask(@PathVariable UUID taskId) {
149151
@PostMapping("/{taskId}/resume")
150152
public ResponseEntity<TaskResponse> resumeTask(@PathVariable UUID taskId) {
151153
log.info("REST request to resume task: {}", taskId);
152-
TaskResponse response = taskService.updateTaskStatus(taskId, TaskStatus.RUNNING);
154+
TaskResponse response = executionService.resumeTask(taskId);
153155
return ResponseEntity.ok(response);
154156
}
155157

metadata-service/src/main/java/com/dbsyncer/metadata/dto/TaskCreateRequest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ public class TaskCreateRequest {
7373
private Integer batchSize = 10000;
7474
private Integer maxQueueSize = 8192;
7575
private Integer pollIntervalMs = 1000;
76+
private Boolean incrementalSnapshot = false;
77+
private Integer snapshotChunkSize = 10000;
78+
private Integer parallelTables = 1;
7679

7780
// Metadata
7881
private String createdBy;

metadata-service/src/main/java/com/dbsyncer/metadata/dto/TaskResponse.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ public class TaskResponse {
4747
private Integer batchSize;
4848
private Integer maxQueueSize;
4949
private Integer pollIntervalMs;
50+
private Boolean incrementalSnapshot;
51+
private Integer snapshotChunkSize;
52+
private Integer parallelTables;
5053

5154
// Status
5255
private TaskStatus status;
@@ -97,6 +100,9 @@ public static TaskResponse fromEntity(MigrationTask task) {
97100
response.setBatchSize(task.getBatchSize());
98101
response.setMaxQueueSize(task.getMaxQueueSize());
99102
response.setPollIntervalMs(task.getPollIntervalMs());
103+
response.setIncrementalSnapshot(task.getIncrementalSnapshot());
104+
response.setSnapshotChunkSize(task.getSnapshotChunkSize());
105+
response.setParallelTables(task.getParallelTables());
100106

101107
response.setStatus(task.getStatus());
102108
response.setErrorMessage(task.getErrorMessage());

0 commit comments

Comments
 (0)