Skip to content

Commit ae1f47a

Browse files
committed
feat(gaussdb): optimize snapshot phase and fix replication slot stall
- Implemented unique backfill slot naming using split ID hashes to prevent parallel split conflicts. - Disabled dropSlotOnClose for backfill connections to handle transient reconnection robustly. - Added explicit backfill slot cleanup in GaussDBScanFetchTask using try-finally. - Integrated is_snapshot metadata field for differentiated record routing. - Improved performance test script with 100k volume verification and data integrity checks. - Verified 100k snapshot at ~6.1k records/second with 100% data integrity.
1 parent 09dcef7 commit ae1f47a

14 files changed

Lines changed: 1136 additions & 270 deletions

File tree

deploy_gaussdb_to_gaussdb.sh

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ done
125125
# 3. Copy SQL script
126126
echo "📜 Copying SQL script to JobManager..."
127127
docker exec flink-jobmanager mkdir -p /opt/flink/sql
128-
docker cp "$SQL_FILE" flink-jobmanager:/opt/flink/sql/gaussdb_to_gaussdb.sql
128+
docker cp "$SQL_FILE" flink-jobmanager:/opt/flink/sql/gaussdb_sync.sql
129129

130130
# 4. Restart Clusters
131131
echo "🔄 Restarting Flink containers to apply changes..."
@@ -173,9 +173,11 @@ BEGIN
173173
stock INTEGER DEFAULT 0,
174174
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
175175
) DISTRIBUTE BY HASH(product_id);
176-
RAISE NOTICE 'Source table created';
176+
ALTER TABLE products REPLICA IDENTITY FULL;
177+
RAISE NOTICE 'Source table created and REPLICA IDENTITY set to FULL';
177178
ELSE
178-
RAISE NOTICE 'Source table already exists';
179+
ALTER TABLE products REPLICA IDENTITY FULL;
180+
RAISE NOTICE 'Source table already exists, ensuring REPLICA IDENTITY is FULL';
179181
END IF;
180182
END \$\$;
181183
EOF
@@ -195,22 +197,16 @@ CREATE TABLE products_sink (
195197
EOF
196198
echo -e "${GREEN}✅ Sink table created${NC}"
197199

198-
# 5.4 插入种子数据 (确保快照阶段有数据,CDC 能正确进入 stream 阶段)
199-
echo "🌱 Inserting seed data for CDC initialization..."
200-
PGPASSWORD=Gauss_235 psql -h 10.250.0.30 -p 8000 -U tom -d db1 <<EOF
201-
-- 清除旧种子数据
202-
DELETE FROM products WHERE product_id BETWEEN 1 AND 10;
203-
-- 插入种子数据 (使用 ID 1-10,测试数据使用 2000+)
204-
INSERT INTO products (product_id, product_name, category, price, stock) VALUES
205-
(1, 'Seed Product 1', 'SEED', 10.00, 100),
206-
(2, 'Seed Product 2', 'SEED', 20.00, 200),
207-
(3, 'Seed Product 3', 'SEED', 30.00, 300);
208-
EOF
209-
echo -e "${GREEN}✅ Seed data inserted (3 records)${NC}"
200+
# 5.4 跳过种子数据插入 (性能测试时会预先插入完整数据)
201+
# 注意:之前这里有 DELETE FROM products WHERE product_id BETWEEN 1 AND 10
202+
# 这会导致性能测试中的数据丢失,因此已移除
203+
echo "🌱 Skipping seed data insertion (data should be pre-populated by test script)..."
204+
echo -e "${GREEN}✅ Ready for CDC sync${NC}"
205+
210206

211207
# 6. Submit SQL Job
212-
echo "🚀 Submitting SQL job to Flink..."
213-
docker exec flink-jobmanager /opt/flink/bin/sql-client.sh -f /opt/flink/sql/gaussdb_to_gaussdb.sql
208+
echo "🚀 Submitting SQL job to Flink (Optimized with Dual-Sink Routing)..."
209+
docker exec flink-jobmanager /opt/flink/bin/sql-client.sh -f /opt/flink/sql/gaussdb_sync.sql
214210

215211
echo ""
216212
echo -e "${GREEN}✅ Success! GaussDB -> GaussDB deployment complete.${NC}"

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/DebeziumChangelogMode.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,24 @@
1717

1818
package org.apache.flink.cdc.debezium.table;
1919

20-
/** Changelog modes used to encode changes from Debezium to Flink internal structure. */
20+
/**
21+
* Changelog modes used to encode changes from Debezium to Flink internal
22+
* structure.
23+
*/
2124
public enum DebeziumChangelogMode {
22-
/** Encodes changes as retract stream using all RowKinds. This is the default mode. */
25+
/**
26+
* Encodes changes as retract stream using all RowKinds. This is the default
27+
* mode.
28+
*/
2329
ALL("all"),
2430
/**
25-
* Encodes changes as upsert stream that describes idempotent updates on a key. Primary keys
31+
* Encodes changes as upsert stream that describes idempotent updates on a key.
32+
* Primary keys
2633
* must be set in tables to use this changelog mode.
2734
*/
28-
UPSERT("upsert");
35+
UPSERT("upsert"),
36+
/** Encodes changes as insert-only stream. */
37+
INSERT_ONLY("insert-only");
2938

3039
private final String value;
3140

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
-- Optimized Flink SQL Job for GaussDB to GaussDB Synchronization
2+
-- Uses dual-sink routing to optimize snapshot phase with INSERT INTO
3+
4+
-- =====================================================
5+
-- Performance Tuning Settings
6+
-- =====================================================
7+
SET 'parallelism.default' = '4';
8+
SET 'table.exec.state.ttl' = '36h';
9+
SET 'table.optimizer.source.reuse-enabled' = 'true';
10+
SET 'table.exec.sink.upsert-materialize' = 'AUTO';
11+
SET 'checkpoint.interval' = '10s';
12+
SET 'pipeline.object-reuse' = 'true';
13+
SET 'execution.checkpointing.interval' = '10s';
14+
15+
-- =====================================================
16+
-- 1. Source Table DN1 (GaussDB)
17+
-- =====================================================
18+
CREATE TABLE source_dn1 (
19+
product_id INT NOT NULL,
20+
product_name STRING,
21+
category STRING,
22+
price DECIMAL(10, 2),
23+
stock INT,
24+
is_snapshot BOOLEAN METADATA FROM 'is_snapshot' VIRTUAL,
25+
PRIMARY KEY (product_id) NOT ENFORCED
26+
) WITH (
27+
'connector' = 'gaussdb-cdc',
28+
'hostname' = '10.250.0.30',
29+
'port' = '40000',
30+
'username' = 'tom',
31+
'password' = 'Gauss_235',
32+
'database-name' = 'db1',
33+
'schema-name' = 'public',
34+
'table-name' = 'products',
35+
'slot.name' = 'flink_cdc_g2g_dn1',
36+
'decoding.plugin.name' = 'mppdb_decoding',
37+
'scan.incremental.snapshot.enabled' = 'true',
38+
'scan.snapshot.fetch.size' = '10000',
39+
'scan.incremental.snapshot.chunk.size' = '50000'
40+
);
41+
42+
-- =====================================================
43+
-- 2. Source Table DN2 (GaussDB)
44+
-- =====================================================
45+
CREATE TABLE source_dn2 (
46+
product_id INT NOT NULL,
47+
product_name STRING,
48+
category STRING,
49+
price DECIMAL(10, 2),
50+
stock INT,
51+
is_snapshot BOOLEAN METADATA FROM 'is_snapshot' VIRTUAL,
52+
PRIMARY KEY (product_id) NOT ENFORCED
53+
) WITH (
54+
'connector' = 'gaussdb-cdc',
55+
'hostname' = '10.250.0.181',
56+
'port' = '40020',
57+
'username' = 'tom',
58+
'password' = 'Gauss_235',
59+
'database-name' = 'db1',
60+
'schema-name' = 'public',
61+
'table-name' = 'products',
62+
'slot.name' = 'flink_cdc_g2g_dn2',
63+
'decoding.plugin.name' = 'mppdb_decoding',
64+
'scan.incremental.snapshot.enabled' = 'true',
65+
'scan.snapshot.fetch.size' = '10000',
66+
'scan.incremental.snapshot.chunk.size' = '50000'
67+
);
68+
69+
-- =====================================================
70+
-- 3. Source Table DN3 (GaussDB)
71+
-- =====================================================
72+
CREATE TABLE source_dn3 (
73+
product_id INT NOT NULL,
74+
product_name STRING,
75+
category STRING,
76+
price DECIMAL(10, 2),
77+
stock INT,
78+
is_snapshot BOOLEAN METADATA FROM 'is_snapshot' VIRTUAL,
79+
PRIMARY KEY (product_id) NOT ENFORCED
80+
) WITH (
81+
'connector' = 'gaussdb-cdc',
82+
'hostname' = '10.250.0.157',
83+
'port' = '40040',
84+
'username' = 'tom',
85+
'password' = 'Gauss_235',
86+
'database-name' = 'db1',
87+
'schema-name' = 'public',
88+
'table-name' = 'products',
89+
'slot.name' = 'flink_cdc_g2g_dn3',
90+
'decoding.plugin.name' = 'mppdb_decoding',
91+
'scan.incremental.snapshot.enabled' = 'true',
92+
'scan.snapshot.fetch.size' = '10000',
93+
'scan.incremental.snapshot.chunk.size' = '50000'
94+
);
95+
96+
-- =====================================================
97+
-- 4. Fast Sink (Snapshot phase)
98+
-- No primary key defined in Flink -> Uses INSERT INTO
99+
-- =====================================================
100+
CREATE TABLE products_sink_fast (
101+
product_id INT PRIMARY KEY NOT ENFORCED,
102+
product_name STRING,
103+
category STRING,
104+
price DECIMAL(10, 2),
105+
stock INT
106+
) WITH (
107+
'connector' = 'jdbc',
108+
'url' = 'jdbc:gaussdb://10.250.0.30:8000/db1?currentSchema=public',
109+
'table-name' = 'products_sink_FAST_INSERT_ONLY',
110+
'username' = 'tom',
111+
'password' = 'Gauss_235',
112+
'driver' = 'com.huawei.gaussdb.jdbc.Driver',
113+
'sink.buffer-flush.max-rows' = '5000',
114+
'sink.buffer-flush.interval' = '1s'
115+
);
116+
117+
-- =====================================================
118+
-- 5. Upsert Sink (Incremental phase)
119+
-- Primary key defined -> Uses MERGE INTO (GaussDB Dialect)
120+
-- =====================================================
121+
CREATE TABLE products_sink_upsert (
122+
product_id INT PRIMARY KEY NOT ENFORCED,
123+
product_name STRING,
124+
category STRING,
125+
price DECIMAL(10, 2),
126+
stock INT
127+
) WITH (
128+
'connector' = 'jdbc',
129+
'url' = 'jdbc:gaussdb://10.250.0.30:8000/db1?currentSchema=public',
130+
'table-name' = 'products_sink',
131+
'username' = 'tom',
132+
'password' = 'Gauss_235',
133+
'driver' = 'com.huawei.gaussdb.jdbc.Driver',
134+
'sink.buffer-flush.max-rows' = '1000',
135+
'sink.buffer-flush.interval' = '1s'
136+
);
137+
138+
-- =====================================================
139+
-- 6. Unified Source View
140+
-- =====================================================
141+
CREATE TEMPORARY VIEW unified_source AS
142+
SELECT product_id, product_name, category, price, stock, is_snapshot FROM source_dn1
143+
UNION ALL
144+
SELECT product_id, product_name, category, price, stock, is_snapshot FROM source_dn2
145+
UNION ALL
146+
SELECT product_id, product_name, category, price, stock, is_snapshot FROM source_dn3;
147+
148+
-- =====================================================
149+
-- 7. Routing Statement Set
150+
-- =====================================================
151+
BEGIN STATEMENT SET;
152+
153+
-- Snapshot records route to fast sink (using blind INSERT)
154+
INSERT INTO products_sink_fast
155+
SELECT product_id, product_name, category, price, stock
156+
FROM unified_source
157+
WHERE is_snapshot = true;
158+
159+
-- Incremental records route to upsert sink (using MERGE INTO)
160+
INSERT INTO products_sink_upsert
161+
SELECT product_id, product_name, category, price, stock
162+
FROM unified_source
163+
WHERE is_snapshot = false;
164+
165+
END;
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
-- Simplified Flink SQL Job for GaussDB to GaussDB Synchronization
2+
-- Recommended standard approach after JDBC Dialect UPSERT refactor
3+
4+
-- =====================================================
5+
-- Performance Tuning Settings
6+
-- =====================================================
7+
SET 'parallelism.default' = '1';
8+
SET 'checkpoint.interval' = '10s';
9+
SET 'execution.checkpointing.interval' = '10s';
10+
SET 'table.optimizer.source.reuse-enabled' = 'true';
11+
12+
-- =====================================================
13+
-- 1. Source Tables (Distributed GaussDB)
14+
-- =====================================================
15+
CREATE TABLE source_dn1 (
16+
product_id INT NOT NULL,
17+
product_name STRING,
18+
category STRING,
19+
price DECIMAL(10, 2),
20+
stock INT,
21+
PRIMARY KEY (product_id) NOT ENFORCED
22+
) WITH (
23+
'connector' = 'gaussdb-cdc',
24+
'hostname' = '10.250.0.30',
25+
'port' = '40000',
26+
'username' = 'tom',
27+
'password' = 'Gauss_235',
28+
'database-name' = 'db1',
29+
'schema-name' = 'public',
30+
'table-name' = 'products',
31+
'slot.name' = 'flink_cdc_simplified_dn1',
32+
'decoding.plugin.name' = 'mppdb_decoding'
33+
);
34+
35+
CREATE TABLE source_dn2 (
36+
product_id INT NOT NULL,
37+
product_name STRING,
38+
category STRING,
39+
price DECIMAL(10, 2),
40+
stock INT,
41+
PRIMARY KEY (product_id) NOT ENFORCED
42+
) WITH (
43+
'connector' = 'gaussdb-cdc',
44+
'hostname' = '10.250.0.181',
45+
'port' = '40020',
46+
'username' = 'tom',
47+
'password' = 'Gauss_235',
48+
'database-name' = 'db1',
49+
'schema-name' = 'public',
50+
'table-name' = 'products',
51+
'slot.name' = 'flink_cdc_simplified_dn2',
52+
'decoding.plugin.name' = 'mppdb_decoding'
53+
);
54+
55+
CREATE TABLE source_dn3 (
56+
product_id INT NOT NULL,
57+
product_name STRING,
58+
category STRING,
59+
price DECIMAL(10, 2),
60+
stock INT,
61+
PRIMARY KEY (product_id) NOT ENFORCED
62+
) WITH (
63+
'connector' = 'gaussdb-cdc',
64+
'hostname' = '10.250.0.157',
65+
'port' = '40040',
66+
'username' = 'tom',
67+
'password' = 'Gauss_235',
68+
'database-name' = 'db1',
69+
'schema-name' = 'public',
70+
'table-name' = 'products',
71+
'slot.name' = 'flink_cdc_simplified_dn3',
72+
'decoding.plugin.name' = 'mppdb_decoding'
73+
);
74+
75+
-- =====================================================
76+
-- 2. Simplified Sink Table
77+
-- Single Sink is now efficient for both Snapshot and Incremental phases
78+
-- =====================================================
79+
CREATE TABLE products_sink (
80+
product_id INT PRIMARY KEY NOT ENFORCED,
81+
product_name STRING,
82+
category STRING,
83+
price DECIMAL(10, 2),
84+
stock INT
85+
) WITH (
86+
'connector' = 'jdbc',
87+
'url' = 'jdbc:gaussdb://10.250.0.30:8000/db1?currentSchema=public',
88+
'table-name' = 'products_sink',
89+
'username' = 'tom',
90+
'password' = 'Gauss_235',
91+
'driver' = 'com.huawei.gaussdb.jdbc.Driver',
92+
-- Batch settings for overall performance
93+
'sink.buffer-flush.max-rows' = '2000',
94+
'sink.buffer-flush.interval' = '1s'
95+
);
96+
97+
-- =====================================================
98+
-- 3. Unified Sync (Transparent UPSERT)
99+
-- =====================================================
100+
-- =====================================================
101+
-- 3. Unified Sync (Independent Pipelines for Stability)
102+
-- =====================================================
103+
BEGIN STATEMENT SET;
104+
INSERT INTO products_sink SELECT product_id, product_name, category, price, stock FROM source_dn1;
105+
INSERT INTO products_sink SELECT product_id, product_name, category, price, stock FROM source_dn2;
106+
INSERT INTO products_sink SELECT product_id, product_name, category, price, stock FROM source_dn3;
107+
END;

0 commit comments

Comments
 (0)