┌─────────────────────────────────────────────────────────────────────┐
│ 管理控制平面 │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Web Console │ │ REST API │ │ Monitoring │ │
│ │ Dashboard │ │ Service │ │ & Alerting │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ └──────────────────┼──────────────────┘ │
│ │ │
│ ┌────────▼────────┐ │
│ │ Spring Boot │ │
│ │ Management │ │
│ │ Service │ │
│ └────────┬────────┘ │
│ │ │
│ ┌─────────────┼─────────────┐ │
│ │ │ │ │
│ ┌───────▼──────┐ ┌───▼────────┐ ┌─▼──────────┐ │
│ │ PostgreSQL │ │ Redis │ │ Kafka │ │
│ │ (Metadata) │ │ (Lock/Cache)│ │ Connect │ │
│ └──────────────┘ └────────────┘ └─┬──────────┘ │
└─────────────────────────────────────────┼───────────────────────────┘
│
┌─────────────────────────────────────────┼───────────────────────────┐
│ 数据同步平面 │ │
│ │ │
│ ┌───────────────────────────────┼──────────────┐ │
│ │ Kafka Connect Cluster │ │ │
│ │ ┌──────────────┐ ┌──────────▼────────┐ │ │
│ │ │ Connector │ │ Connector │ │ │
│ │ │ Worker 1 │ │ Worker 2 │ ...│ │
│ │ └──────┬───────┘ └──────┬────────────┘ │ │
│ └─────────┼──────────────────┼─────────────────┘ │
│ │ │ │
│ ┌──────────────▼──────────────────▼─────────────────┐ │
│ │ Apache Kafka │ │
│ │ ┌──────────────────────────────────────────┐ │ │
│ │ │ Topics (CDC Events Stream) │ │ │
│ │ │ - tenant-1.db1.table1 │ │ │
│ │ │ - tenant-1.db1.table2 │ │ │
│ │ │ - tenant-2.db2.table1 │ │ │
│ │ └──────────────────────────────────────────┘ │ │
│ └───────────┬─────────────────┬────────────────────┘ │
│ │ │ │
│ ┌────────▼──────┐ ┌──────▼──────────┐ │
│ │ Transform │ │ Transform │ │
│ │ Service 1 │ │ Service 2 │ ... │
│ └────────┬──────┘ └──────┬──────────┘ │
└────────────────┼─────────────────┼───────────────────────────────────┘
│ │
┌────────────────┼─────────────────┼───────────────────────────────────┐
│ 数据源/目标层 │ │
│ │ │ │
│ ┌─────────────▼──┐ ┌──────────▼───────┐ ┌──────────────┐ │
│ │ Source DBs │ │ Target DBs │ │ Sink │ │
│ │ │ │ │ │ Connectors │ │
│ │ - MySQL │ │ - MySQL │ └──────────────┘ │
│ │ - Oracle │ │ - PostgreSQL │ │
│ │ - SQL Server │ │ - Oracle │ │
│ │ - PostgreSQL │ │ - SQL Server │ │
│ └────────────────┘ └──────────────────┘ │
└────────────────────────────────────────────────────────────────────┘
| 层级 | 组件 | 职责 |
|---|---|---|
| 接入层 | REST API, Web Console | 用户交互、任务管理 |
| 应用层 | Spring Boot Service | 业务逻辑、任务编排 |
| 数据层 | PostgreSQL, Redis | 元数据存储、缓存 |
| 流处理层 | Kafka, Kafka Connect | CDC事件流、连接器管理 |
| 转换层 | Kafka Streams | 数据转换、路由 |
| 存储层 | 各类数据库 | 源数据库和目标数据库 |
db-sync-platform/
├── db-sync-common/ # 公共模块
│ ├── enums/ # 枚举定义
│ ├── constants/ # 常量定义
│ ├── exceptions/ # 异常定义
│ └── utils/ # 工具类
│
├── db-sync-core/ # 核心业务模块
│ ├── domain/ # 领域模型
│ │ ├── entity/ # JPA实体
│ │ ├── dto/ # 数据传输对象
│ │ └── vo/ # 视图对象
│ ├── repository/ # 数据访问层
│ ├── service/ # 业务逻辑层
│ │ ├── TenantService
│ │ ├── SyncTaskService
│ │ ├── MappingService
│ │ └── MonitoringService
│ └── config/ # 配置类
│
├── db-sync-connector/ # 连接器管理模块
│ ├── client/ # Kafka Connect客户端
│ ├── builder/ # Connector配置构建器
│ │ ├── MySQLConnectorBuilder
│ │ ├── OracleConnectorBuilder
│ │ └── SqlServerConnectorBuilder
│ ├── manager/ # Connector生命周期管理
│ └── monitor/ # Connector监控
│
├── db-sync-transform/ # 数据转换模块
│ ├── engine/ # 转换引擎
│ │ ├── TransformationEngine
│ │ ├── ScriptExecutor
│ │ └── TypeConverter
│ ├── mapper/ # 类型映射器
│ └── streams/ # Kafka Streams拓扑
│
└── db-sync-api/ # API接口模块
├── controller/ # REST控制器
├── security/ # 安全配置
├── filter/ # 过滤器
└── DbSyncApplication.java # 主启动类
Controller层:
@RestController
@RequestMapping("/api/v1")
public class SyncTaskController {
// 1. 接收HTTP请求
// 2. 参数验证
// 3. 调用Service层
// 4. 返回统一响应格式
}Service层:
@Service
public class SyncTaskService {
// 1. 业务逻辑编排
// 2. 事务管理
// 3. 调用Connector管理器
// 4. 异常处理
}Repository层:
public interface SyncTaskRepository extends JpaRepository<SyncTask, UUID> {
// 1. 数据访问
// 2. 自定义查询
}分布式模式配置:
# Worker配置
bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092
group.id=db-sync-connect-cluster
# 配置存储
config.storage.topic=db-sync-connect-configs
config.storage.replication.factor=3
# Offset存储
offset.storage.topic=db-sync-connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=25
# 状态存储
status.storage.topic=db-sync-connect-status
status.storage.replication.factor=3
status.storage.partitions=5
# 性能配置
tasks.max=8
offset.flush.interval.ms=60000Source Connector(数据源):
- MySQLConnector:基于binlog
- OracleConnector:基于LogMiner或XStream
- SqlServerConnector:基于CDC
- PostgresConnector:基于逻辑复制
Sink Connector(目标端):
- JDBCSinkConnector:通用JDBC写入
- CustomSinkConnector:自定义写入逻辑
Kafka Topic (CDC Events)
↓
┌───────────────────┐
│ Kafka Streams │
│ Topology │
├───────────────────┤
│ 1. 解析CDC事件 │
│ 2. 查找映射规则 │
│ 3. 执行转换逻辑 │
│ 4. 类型转换 │
│ 5. 数据验证 │
└───────┬───────────┘
↓
Kafka Topic (Transformed)
↓
Sink Connector
↓
Target Database
KStream<String, DebeziumEvent> sourceStream =
streamsBuilder.stream("source-topic");
KStream<String, TransformedEvent> transformed = sourceStream
.filter((key, event) -> applyRowFilter(event)) // 行过滤
.mapValues(event -> {
TableMapping mapping = getMappingConfig(event);
return transformer.transform(event, mapping); // 字段转换
})
.mapValues(event -> typeConverter.convert(event)) // 类型转换
.mapValues(event -> scriptExecutor.execute(event)); // 脚本执行
transformed.to("target-topic");详见《02-数据库设计.md》
读取流程:
1. 应用启动 → 加载所有租户配置到缓存(Redis)
2. 创建Connector时 → 读取任务配置、映射规则
3. 数据转换时 → 从缓存读取映射配置
4. 监控采集时 → 读取任务列表
写入流程:
1. 用户创建任务 → 写入PostgreSQL → 失效缓存
2. 更新配置 → 写入PostgreSQL → 失效缓存 → 重启Connector
3. 保存Offset → 先写Redis → 异步持久化到PostgreSQL
4. 记录监控数据 → 批量写入PostgreSQL
┌─────────────┐
│ 用户触发 │ 创建同步任务
│ 全量同步 │
└──────┬──────┘
│
▼
┌──────────────────────┐
│ Spring Boot Service │
├──────────────────────┤
│ 1. 创建任务记录 │
│ 2. 构建Debezium配置 │
│ 3. 注册Source Connector│
└──────┬───────────────┘
│
▼
┌──────────────────────┐
│ Debezium Connector │
├──────────────────────┤
│ snapshot.mode=initial│
│ 1. 读取表结构 │
│ 2. 锁定表(可选) │
│ 3. 读取全量数据 │
│ 4. 发送到Kafka │
└──────┬───────────────┘
│
▼
┌──────────────────────┐
│ Kafka Topics │
│ tenant-1.db.table │
└──────┬───────────────┘
│
▼
┌──────────────────────┐
│ Transform Service │
├──────────────────────┤
│ 1. 读取映射配置 │
│ 2. 转换字段 │
│ 3. 类型转换 │
│ 4. 执行脚本 │
└──────┬───────────────┘
│
▼
┌──────────────────────┐
│ Sink Connector │
├──────────────────────┤
│ 1. 批量插入 │
│ 2. 冲突处理 │
│ 3. 记录Offset │
└──────┬───────────────┘
│
▼
┌──────────────────────┐
│ Target Database │
└──────────────────────┘
┌─────────────┐
│ Source DB │
│ 数据变更 │
│ (INSERT/ │
│ UPDATE/ │
│ DELETE) │
└──────┬──────┘
│
▼
┌──────────────────────┐
│ Database Log │
│ - binlog (MySQL) │
│ - redo log (Oracle) │
│ - CDC log (SQLServer)│
└──────┬───────────────┘
│
▼
┌──────────────────────┐
│ Debezium Connector │
├──────────────────────┤
│ 1. 监听日志变更 │
│ 2. 解析变更事件 │
│ 3. 封装为CDC格式 │
│ 4. 发送到Kafka │
└──────┬───────────────┘
│
▼
┌──────────────────────┐
│ Kafka Topics │
│ { │
│ "op": "c", │
│ "before": {...}, │
│ "after": {...}, │
│ "ts_ms": 1234567 │
│ } │
└──────┬───────────────┘
│
▼
┌──────────────────────┐
│ Transform Service │
├──────────────────────┤
│ 1. 识别操作类型 │
│ 2. 应用转换规则 │
│ 3. 处理删除事件 │
└──────┬───────────────┘
│
▼
┌──────────────────────┐
│ Sink Connector │
├──────────────────────┤
│ - INSERT → INSERT │
│ - UPDATE → UPDATE │
│ - DELETE → DELETE │
└──────┬───────────────┘
│
▼
┌──────────────────────┐
│ Target Database │
└──────────────────────┘
┌─────────────┐
│ Source DB │
│ 执行DDL │
│ ALTER TABLE │
└──────┬──────┘
│
▼
┌──────────────────────┐
│ Debezium Connector │
│ schema.history.internal│
│ .kafka.topic │
└──────┬───────────────┘
│
▼
┌──────────────────────┐
│ Schema Change Event │
│ { │
│ "ddl": "ALTER...", │
│ "databaseName": "", │
│ "tableChanges": [] │
│ } │
└──────┬───────────────┘
│
▼
┌──────────────────────┐
│ Schema Change Handler│
├──────────────────────┤
│ 1. 解析DDL语句 │
│ 2. 转换为目标DB语法 │
│ 3. 更新映射配置 │
│ 4. 在目标DB执行 │
└──────┬───────────────┘
│
▼
┌──────────────────────┐
│ Target Database │
│ Schema Updated │
└──────────────────────┘
| 组件 | 高可用方案 | 说明 |
|---|---|---|
| Spring Boot | Kubernetes多副本 | 3个Pod,负载均衡 |
| PostgreSQL | 主从复制 + PgPool | 1主2从,自动故障转移 |
| Redis | Redis Cluster | 3主3从,哨兵模式 |
| Kafka | 多副本集群 | 3个Broker,副本因子=3 |
| Kafka Connect | Worker集群 | 3个Worker,自动负载均衡 |
Spring Boot服务故障:
1. Kubernetes检测到Pod失败
2. 自动重启Pod
3. Pod启动后从PostgreSQL加载配置
4. 恢复正常服务
Kafka Connect Worker故障:
1. Worker失败
2. Connector自动重分配到其他Worker
3. 从Offset继续消费
4. 数据不丢失
数据库连接失败:
1. Connector检测到连接失败
2. 自动重试(指数退避)
3. 重试次数达到上限后标记任务失败
4. 发送告警通知
5. 手动修复后重启任务
Exactly-Once语义:
1. Kafka事务支持
2. Connector Offset精确管理
3. 目标数据库幂等写入
4. 重复消费检测和去重
事务一致性:
1. 启用Debezium事务元数据
2. 捕获事务边界(BEGIN/COMMIT)
3. 在目标数据库中作为一个事务执行
4. 保证原子性
Debezium配置:
# 增加快照线程数
snapshot.max.threads=4
# 增加批量大小
max.batch.size=8192
max.queue.size=16384
# 减少刷新间隔
offset.flush.interval.ms=10000Kafka配置:
# 增加分区数
num.partitions=32
# 批量发送
batch.size=32768
linger.ms=10
# 压缩
compression.type=snappySink Connector配置:
# 批量写入
batch.size=1000
max.retries=10
# 连接池
connection.pool.size=20网络优化:
- 使用本地数据中心
- 减少跨区域调用
- 启用TCP优化
处理优化:
- 减少转换逻辑复杂度
- 缓存热数据(映射配置)
- 异步处理非关键路径
增量快照:
// 使用Debezium 1.6+的增量快照
signalTopic.send(new Signal(
"execute-snapshot",
Map.of(
"data-collections", List.of("db.large_table"),
"type", "incremental"
)
));分区表支持:
// 按分区并行处理
for (String partition : tablePartitions) {
executorService.submit(() ->
processPartition(partition)
);
}┌─────────────────────────────────────┐
│ Internet │
└───────────┬─────────────────────────┘
│ HTTPS (TLS 1.3)
▼
┌───────────────────────────────────┐
│ Load Balancer / Ingress │
└───────────┬───────────────────────┘
│
▼
┌───────────────────────────────────┐
│ API Gateway (认证/限流) │
└───────────┬───────────────────────┘
│
▼
┌───────────────────────────────────┐
│ Spring Boot Service │
│ (Internal Network Only) │
└───────────────────────────────────┘
JWT Token认证:
@Configuration
public class SecurityConfig {
@Bean
public SecurityFilterChain filterChain(HttpSecurity http) {
return http
.authorizeHttpRequests(auth -> auth
.requestMatchers("/api/v1/public/**").permitAll()
.requestMatchers("/api/v1/admin/**").hasRole("ADMIN")
.anyRequest().authenticated()
)
.oauth2ResourceServer(OAuth2ResourceServerConfigurer::jwt)
.build();
}
}敏感数据加密:
@Service
public class EncryptionService {
// 配置信息加密存储
public String encryptConfig(String plaintext) {
return AES.encrypt(plaintext, secretKey);
}
// 数据库密码加密
public String encryptPassword(String password) {
return BCrypt.hashpw(password, BCrypt.gensalt());
}
}数据脱敏:
@Component
public class DataMaskingTransformer {
public Object maskField(Object value, MaskingRule rule) {
if (rule.getType() == MaskType.PHONE) {
return maskPhone(value.toString());
} else if (rule.getType() == MaskType.EMAIL) {
return maskEmail(value.toString());
}
return value;
}
}业务指标:
- 活跃任务数
- 总同步数据量
- 同步成功率
- 平均延迟
性能指标:
- 消费TPS
- 生产TPS
- 端到端延迟
- Consumer Lag
资源指标:
- CPU使用率
- 内存使用率
- 网络IO
- 磁盘IO
| 告警项 | 阈值 | 级别 | 处理 |
|---|---|---|---|
| Consumer Lag > 100000 | 持续5分钟 | P1 | 立即处理 |
| 错误率 > 1% | 持续3分钟 | P2 | 及时处理 |
| 延迟 > 10秒 | P99持续1分钟 | P2 | 及时处理 |
| Connector失败 | 立即 | P0 | 紧急处理 |
| 内存使用 > 85% | 持续10分钟 | P3 | 关注 |
扩展Spring Boot服务:
kubectl scale deployment db-sync-api --replicas=5扩展Kafka Connect Worker:
# 增加Worker节点
docker-compose up -d --scale connect-worker=5
# Connector自动重新负载均衡扩展Kafka:
# 增加Broker
# 增加Topic分区数
kafka-topics.sh --alter --topic sync-events \
--partitions 64 --bootstrap-server kafka:9092// 1. 实现ConnectorBuilder接口
public class MongoDBConnectorBuilder implements ConnectorBuilder {
@Override
public Map<String, String> buildConfig(SyncTask task) {
// 构建MongoDB connector配置
}
}
// 2. 实现TypeMapper
public class MongoDBTypeMapper implements TypeMapper {
@Override
public Object convert(Object value, String sourceType, String targetType) {
// MongoDB类型转换
}
}
// 3. 注册到Spring容器
@Configuration
public class ConnectorConfig {
@Bean
public ConnectorBuilder mongodbConnectorBuilder() {
return new MongoDBConnectorBuilder();
}
}文档版本:v1.0 最后更新:2025-01-30