@@ -1,13 +1,17 @@
 -- Flink SQL Job to sync Distributed GaussDB to GaussDB (Sink via CN)
--- 分布式 GaussDB -> GaussDB 同步任务
+-- 分布式 GaussDB -> GaussDB 同步任务 (性能优化版)
 
--- 设置状态保留时间为 1 小时
+-- =====================================================
+-- Performance Tuning Settings
+-- =====================================================
 SET 'table.exec.state.ttl' = '1 h';
--- 禁用 Upsert Materialize,因为数据是按 ID 分片的,不存在多节点更新同一行的乱序问题
 SET 'table.exec.sink.upsert-materialize' = 'NONE';
+SET 'parallelism.default' = '4';
+SET 'pipeline.object-reuse' = 'true';
+SET 'execution.checkpointing.interval' = '5min';
 
 -- =====================================================
--- 1. Source Table DN1 (GaussDB)
+-- 1. Source Table DN1 (GaussDB) - with performance tuning
 -- =====================================================
 CREATE TABLE products_source_dn1 (
     product_id INT NOT NULL,
@@ -27,11 +31,13 @@ CREATE TABLE products_source_dn1 (
     'table-name' = 'products',
     'slot.name' = 'flink_cdc_g2g_dn1',
     'decoding.plugin.name' = 'mppdb_decoding',
-    'scan.incremental.snapshot.enabled' = 'true'
+    'scan.incremental.snapshot.enabled' = 'true',
+    'scan.snapshot.fetch.size' = '10000',
+    'scan.incremental.snapshot.chunk.size' = '50000'
 );
 
 -- =====================================================
--- 2. Source Table DN2 (GaussDB)
+-- 2. Source Table DN2 (GaussDB) - with performance tuning
 -- =====================================================
 CREATE TABLE products_source_dn2 (
     product_id INT NOT NULL,
@@ -51,11 +57,13 @@ CREATE TABLE products_source_dn2 (
     'table-name' = 'products',
     'slot.name' = 'flink_cdc_g2g_dn2',
     'decoding.plugin.name' = 'mppdb_decoding',
-    'scan.incremental.snapshot.enabled' = 'true'
+    'scan.incremental.snapshot.enabled' = 'true',
+    'scan.snapshot.fetch.size' = '10000',
+    'scan.incremental.snapshot.chunk.size' = '50000'
 );
 
 -- =====================================================
--- 3. Source Table DN3 (GaussDB)
+-- 3. Source Table DN3 (GaussDB) - with performance tuning
 -- =====================================================
 CREATE TABLE products_source_dn3 (
     product_id INT NOT NULL,
@@ -75,13 +83,15 @@ CREATE TABLE products_source_dn3 (
     'table-name' = 'products',
     'slot.name' = 'flink_cdc_g2g_dn3',
     'decoding.plugin.name' = 'mppdb_decoding',
-    'scan.incremental.snapshot.enabled' = 'true'
+    'scan.incremental.snapshot.enabled' = 'true',
+    'scan.snapshot.fetch.size' = '10000',
+    'scan.incremental.snapshot.chunk.size' = '50000'
 );
 
 -- =====================================================
 -- 4. Sink Table (GaussDB via CN)
 -- 使用自定义 GaussDB JDBC Dialect (支持 MERGE INTO)
--- 使用 gaussdbjdbc.jar 驱动
+-- 性能优化: 批量写入配置
 -- =====================================================
 CREATE TABLE products_sink (
     product_id INT NOT NULL,
@@ -96,7 +106,12 @@ CREATE TABLE products_sink (
     'table-name' = 'products_sink',
     'username' = 'tom',
     'password' = 'Gauss_235',
-    'driver' = 'com.huawei.gaussdb.jdbc.Driver'
+    'driver' = 'com.huawei.gaussdb.jdbc.Driver',
+    -- Performance tuning: batch write settings
+    'sink.buffer-flush.max-rows' = '5000',
+    'sink.buffer-flush.interval' = '2s',
+    'sink.max-retries' = '3',
+    'sink.parallelism' = '3'
 );
 
 -- =====================================================
@@ -108,3 +123,4 @@ UNION ALL
 SELECT product_id, product_name, category, price, stock FROM products_source_dn2
 UNION ALL
 SELECT product_id, product_name, category, price, stock FROM products_source_dn3;
+
0 commit comments