@@ -560,3 +560,366 @@ nc -zv <GAUSSDB_IP> <GAUSSDB_PORT>
560560| ` table.exec.sink.upsert-materialize ` | ` NONE ` | 禁用物化,直接下发 DELETE |
561561| ` slot.name ` | 唯一值 | 每个 DN 必须使用不同的槽名 |
562562| ` decoding.plugin.name ` | ` mppdb_decoding ` | GaussDB 推荐解码插件 |
563+
564+ ---
565+
566+ ## 第五部分:GaussDB 作为 Sink 端
567+
568+ 本章介绍如何将 GaussDB 作为目标数据库(Sink),实现 ** GaussDB → GaussDB** 的数据同步。
569+
570+ ### 5.1 背景说明
571+
572+ GaussDB 与 PostgreSQL 兼容,但不完全支持 PostgreSQL 的 ` ON CONFLICT ` 语法用于 upsert 操作。为解决此问题,我们实现了** 自定义 GaussDB JDBC Dialect** ,使用 GaussDB 兼容的 ` MERGE INTO ` 语法。
573+
574+ #### 技术实现
575+
576+ | 组件 | 文件 | 功能 |
577+ | ------| ------| ------|
578+ | 自定义方言 | ` GaussDBJdbcDialect.java ` | 使用 ` MERGE INTO ` 语法替代 ` ON CONFLICT ` |
579+ | 方言工厂 | ` GaussDBJdbcDialectFactory.java ` | 识别 ` jdbc:gaussdb:// ` URL 并创建方言实例 |
580+
581+ #### MERGE INTO 语法示例
582+
583+ ``` sql
584+ MERGE INTO products_sink t
585+ USING (SELECT :product_id AS " product_id" , :product_name AS " product_name" , ...) AS s
586+ ON t." product_id" = s." product_id"
587+ WHEN MATCHED THEN UPDATE SET " product_name" = s." product_name" , ...
588+ WHEN NOT MATCHED THEN INSERT (...) VALUES (...);
589+ ```
590+
591+ ---
592+
593+ ### 5.2 环境准备(GaussDB Sink)
594+
595+ #### 5.2.1 确保自定义方言已编译
596+
597+ 方言类位于 ` flink-connector-gaussdb-cdc ` 模块中,编译步骤同 1.1 节。
598+
599+ #### 5.2.2 准备修改版 JDBC Connector
600+
601+ 由于 Flink JDBC Connector 使用 SPI 机制发现方言,需要将自定义方言类注入到 ` flink-connector-jdbc.jar ` 中:
602+
603+ ``` bash
604+ # 定义变量
605+ PROJECT_ROOT=$( pwd)
606+ CONNECTOR_MODULE=" flink-cdc-connect/flink-cdc-source-connectors/flink-connector-gaussdb-cdc"
607+ JDBC_MOD_DIR=" /tmp/jdbc_mod_gaussdb"
608+
609+ # 提取原始 JDBC Connector
610+ rm -rf " $JDBC_MOD_DIR " && mkdir -p " $JDBC_MOD_DIR " && cd " $JDBC_MOD_DIR "
611+ cp /path/to/flink-connector-jdbc.jar .
612+ unzip -q -o flink-connector-jdbc.jar -d extracted
613+
614+ # 添加 GaussDB 方言工厂到 SPI 文件
615+ echo " org.apache.flink.cdc.connectors.gaussdb.jdbc.GaussDBJdbcDialectFactory" >> \
616+ extracted/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory
617+
618+ # 拷贝方言类
619+ mkdir -p extracted/org/apache/flink/cdc/connectors/gaussdb/jdbc
620+ cp " $PROJECT_ROOT /$CONNECTOR_MODULE /target/classes/org/apache/flink/cdc/connectors/gaussdb/jdbc/" * .class \
621+ extracted/org/apache/flink/cdc/connectors/gaussdb/jdbc/
622+
623+ # 重新打包
624+ cd extracted && jar -cf ../flink-connector-jdbc-gaussdb.jar . && cd ..
625+
626+ echo " 修改版 JDBC Connector 已生成: $JDBC_MOD_DIR /flink-connector-jdbc-gaussdb.jar"
627+ ```
628+
629+ #### 5.2.3 分发 JAR 包到 Flink 容器
630+
631+ ``` bash
632+ JAR_PATH=" flink-cdc-connect/flink-cdc-source-connectors/flink-sql-connector-gaussdb-cdc/target/flink-sql-connector-gaussdb-cdc-3.6-SNAPSHOT.jar"
633+ JDBC_DRIVER=" flink-cdc-connect/flink-cdc-source-connectors/flink-connector-gaussdb-cdc/lib/gaussdbjdbc.jar"
634+ MODIFIED_JDBC_CONNECTOR=" $JDBC_MOD_DIR /flink-connector-jdbc-gaussdb.jar"
635+
636+ for container in flink-jobmanager flink-taskmanager; do
637+ # 清理旧 JAR
638+ docker exec $container rm -f /opt/flink/lib/flink-connector-jdbc* .jar
639+
640+ # 拷贝新 JAR
641+ docker cp $JAR_PATH $container :/opt/flink/lib/
642+ docker cp $JDBC_DRIVER $container :/opt/flink/lib/
643+ docker cp $MODIFIED_JDBC_CONNECTOR $container :/opt/flink/lib/flink-connector-jdbc.jar
644+ done
645+
646+ # 重启容器
647+ docker restart flink-jobmanager flink-taskmanager
648+ sleep 30
649+ ```
650+
651+ ---
652+
653+ ### 5.3 初始化 GaussDB Sink 表
654+
655+ 通过 CN 连接到目标 GaussDB:
656+
657+ ``` bash
658+ psql -h < CN_IP> -p 8000 -U < USERNAME> -d db1
659+ ```
660+
661+ 创建 Sink 表:
662+
663+ ``` sql
664+ DROP TABLE IF EXISTS products_sink CASCADE;
665+
666+ CREATE TABLE products_sink (
667+ product_id INTEGER PRIMARY KEY ,
668+ product_name VARCHAR (200 ),
669+ category VARCHAR (50 ),
670+ price DECIMAL (10 ,2 ),
671+ stock INTEGER
672+ );
673+ ```
674+
675+ ---
676+
677+ ### 5.4 创建 Flink SQL 任务文件
678+
679+ 创建 ` gaussdb_to_gaussdb.sql ` 文件:
680+
681+ ``` sql
682+ -- =====================================================
683+ -- Flink SQL: 分布式 GaussDB -> GaussDB (via CN) 同步任务
684+ -- =====================================================
685+
686+ -- 优化配置
687+ SET ' table.exec.state.ttl' = ' 1 h' ;
688+ SET ' table.exec.sink.upsert-materialize' = ' NONE' ;
689+
690+ -- =====================================================
691+ -- DN1 数据源
692+ -- =====================================================
693+ CREATE TABLE products_source_dn1 (
694+ product_id INT NOT NULL ,
695+ product_name STRING,
696+ category STRING,
697+ price DECIMAL (10 , 2 ),
698+ stock INT ,
699+ PRIMARY KEY (product_id) NOT ENFORCED
700+ ) WITH (
701+ ' connector' = ' gaussdb-cdc' ,
702+ ' hostname' = ' <DN1_IP>' ,
703+ ' port' = ' <DN1_PORT>' ,
704+ ' username' = ' <USERNAME>' ,
705+ ' password' = ' <PASSWORD>' ,
706+ ' database-name' = ' db1' ,
707+ ' schema-name' = ' public' ,
708+ ' table-name' = ' products' ,
709+ ' slot.name' = ' flink_cdc_g2g_dn1' ,
710+ ' decoding.plugin.name' = ' mppdb_decoding' ,
711+ ' scan.incremental.snapshot.enabled' = ' true'
712+ );
713+
714+ -- =====================================================
715+ -- DN2 数据源
716+ -- =====================================================
717+ CREATE TABLE products_source_dn2 (
718+ product_id INT NOT NULL ,
719+ product_name STRING,
720+ category STRING,
721+ price DECIMAL (10 , 2 ),
722+ stock INT ,
723+ PRIMARY KEY (product_id) NOT ENFORCED
724+ ) WITH (
725+ ' connector' = ' gaussdb-cdc' ,
726+ ' hostname' = ' <DN2_IP>' ,
727+ ' port' = ' <DN2_PORT>' ,
728+ ' username' = ' <USERNAME>' ,
729+ ' password' = ' <PASSWORD>' ,
730+ ' database-name' = ' db1' ,
731+ ' schema-name' = ' public' ,
732+ ' table-name' = ' products' ,
733+ ' slot.name' = ' flink_cdc_g2g_dn2' ,
734+ ' decoding.plugin.name' = ' mppdb_decoding' ,
735+ ' scan.incremental.snapshot.enabled' = ' true'
736+ );
737+
738+ -- =====================================================
739+ -- DN3 数据源
740+ -- =====================================================
741+ CREATE TABLE products_source_dn3 (
742+ product_id INT NOT NULL ,
743+ product_name STRING,
744+ category STRING,
745+ price DECIMAL (10 , 2 ),
746+ stock INT ,
747+ PRIMARY KEY (product_id) NOT ENFORCED
748+ ) WITH (
749+ ' connector' = ' gaussdb-cdc' ,
750+ ' hostname' = ' <DN3_IP>' ,
751+ ' port' = ' <DN3_PORT>' ,
752+ ' username' = ' <USERNAME>' ,
753+ ' password' = ' <PASSWORD>' ,
754+ ' database-name' = ' db1' ,
755+ ' schema-name' = ' public' ,
756+ ' table-name' = ' products' ,
757+ ' slot.name' = ' flink_cdc_g2g_dn3' ,
758+ ' decoding.plugin.name' = ' mppdb_decoding' ,
759+ ' scan.incremental.snapshot.enabled' = ' true'
760+ );
761+
762+ -- =====================================================
763+ -- GaussDB Sink (通过 CN 写入)
764+ -- 使用自定义 GaussDB JDBC Dialect (支持 MERGE INTO)
765+ -- =====================================================
766+ CREATE TABLE products_sink (
767+ product_id INT NOT NULL ,
768+ product_name STRING,
769+ category STRING,
770+ price DECIMAL (10 , 2 ),
771+ stock INT ,
772+ PRIMARY KEY (product_id) NOT ENFORCED
773+ ) WITH (
774+ ' connector' = ' jdbc' ,
775+ ' url' = ' jdbc:gaussdb://<CN_IP>:8000/db1?currentSchema=public' ,
776+ ' table-name' = ' products_sink' ,
777+ ' username' = ' <USERNAME>' ,
778+ ' password' = ' <PASSWORD>' ,
779+ ' driver' = ' com.huawei.gaussdb.jdbc.Driver'
780+ );
781+
782+ -- =====================================================
783+ -- 启动同步作业(合并所有 DN 数据)
784+ -- =====================================================
785+ INSERT INTO products_sink
786+ SELECT product_id, product_name, category, price, stock FROM products_source_dn1
787+ UNION ALL
788+ SELECT product_id, product_name, category, price, stock FROM products_source_dn2
789+ UNION ALL
790+ SELECT product_id, product_name, category, price, stock FROM products_source_dn3;
791+ ```
792+
793+ > ** 关键配置说明** :
794+ > - ` url ` : 使用 ` jdbc:gaussdb:// ` 协议以触发自定义 GaussDB 方言
795+ > - ` driver ` : 使用 ` com.huawei.gaussdb.jdbc.Driver ` (来自 gaussdbjdbc.jar)
796+
797+ ---
798+
799+ ### 5.5 提交 Flink 任务
800+
801+ ``` bash
802+ # 拷贝 SQL 文件到 JobManager 容器
803+ docker cp gaussdb_to_gaussdb.sql flink-jobmanager:/opt/flink/
804+
805+ # 提交任务
806+ docker exec -it flink-jobmanager /opt/flink/bin/sql-client.sh -f /opt/flink/gaussdb_to_gaussdb.sql
807+ ```
808+
809+ ---
810+
811+ ### 5.6 验证同步功能
812+
813+ #### 测试 INSERT
814+
815+ 在 GaussDB 中执行(通过 CN):
816+ ``` sql
817+ INSERT INTO products VALUES (6001 , ' G2G 产品A' , ' 测试' , 100 .00 , 10 );
818+ INSERT INTO products VALUES (6002 , ' G2G 产品B' , ' 测试' , 200 .00 , 20 );
819+ ```
820+
821+ 验证 Sink 表:
822+ ``` bash
823+ psql -h < CN_IP> -p 8000 -U < USERNAME> -d db1 -c " SELECT * FROM products_sink WHERE product_id >= 6000;"
824+ ```
825+
826+ 预期:应看到 2 条记录。
827+
828+ #### 测试 UPDATE
829+
830+ ``` sql
831+ UPDATE products SET price = 150 .00 , stock = 15 WHERE product_id = 6001 ;
832+ ```
833+
834+ 验证:
835+ ``` bash
836+ psql -h < CN_IP> -p 8000 -U < USERNAME> -d db1 -c " SELECT * FROM products_sink WHERE product_id = 6001;"
837+ ```
838+
839+ 预期:` price ` 为 ` 150.00 ` ,` stock ` 为 ` 15 ` 。
840+
841+ #### 测试 DELETE
842+
843+ ``` sql
844+ DELETE FROM products WHERE product_id = 6002 ;
845+ ```
846+
847+ 验证:
848+ ``` bash
849+ psql -h < CN_IP> -p 8000 -U < USERNAME> -d db1 -c " SELECT COUNT(*) FROM products_sink WHERE product_id = 6002;"
850+ ```
851+
852+ 预期:` 0 `
853+
854+ ---
855+
856+ ### 5.7 一键测试脚本
857+
858+ 项目提供了一键测试脚本,可自动完成部署和验证:
859+
860+ ``` bash
861+ # 运行完整的 GaussDB -> GaussDB 测试
862+ bash test_gaussdb_to_gaussdb_cdc.sh
863+ ```
864+
865+ 该脚本会自动:
866+ 1 . 编译并打包连接器
867+ 2 . 创建修改版 JDBC Connector(包含 GaussDB 方言)
868+ 3 . 部署到 Flink 集群
869+ 4 . 初始化 Sink 表
870+ 5 . 提交 Flink SQL 任务
871+
872+ ---
873+
874+ ### 5.8 GaussDB Sink 常见问题
875+
876+ #### 问题 1:报错 ` Could not find any jdbc dialect factory that can handle url 'jdbc:gaussdb://...' `
877+
878+ ** 原因** :自定义 GaussDB 方言未正确加载。
879+
880+ ** 解决** :
881+ 1 . 确认方言类已编译到目标目录
882+ 2 . 确认已将方言工厂添加到 SPI 文件
883+ 3 . 确认修改版 JDBC Connector 已部署到 ` /opt/flink/lib/ `
884+
885+ #### 问题 2:报错 ` SQL statement must not contain ? character `
886+
887+ ** 原因** :方言未被正确调用,使用了默认的 PostgreSQL 方言。
888+
889+ ** 解决** :
890+ 1 . 确认 ` url ` 使用 ` jdbc:gaussdb:// ` 协议
891+ 2 . 确认 SPI 文件包含 ` GaussDBJdbcDialectFactory `
892+ 3 . 重启 Flink 容器
893+
894+ #### 问题 3:报错 ` ON CONFLICT is supported only in PG-format database `
895+
896+ ** 原因** :使用了错误的 JDBC URL 或驱动。
897+
898+ ** 解决** :
899+ - 确保 Sink 配置中:
900+ - ` url ` = ` jdbc:gaussdb://... ` (而不是 ` jdbc:postgresql:// ` )
901+ - ` driver ` = ` com.huawei.gaussdb.jdbc.Driver `
902+
903+ #### 问题 4:ClassNotFoundException: com.huawei.gaussdb.jdbc.Driver
904+
905+ ** 原因** :` gaussdbjdbc.jar ` 未正确部署。
906+
907+ ** 解决** :
908+ ``` bash
909+ docker cp gaussdbjdbc.jar flink-taskmanager:/opt/flink/lib/
910+ docker restart flink-taskmanager
911+ ```
912+
913+ ---
914+
915+ ## 附录 B:GaussDB Sink 配置参考
916+
917+ | 配置项 | 值 | 说明 |
918+ | --------| ---| ------|
919+ | ` connector ` | ` jdbc ` | 使用 Flink JDBC Connector |
920+ | ` url ` | ` jdbc:gaussdb://<host>:<port>/<db> ` | 必须使用 ` jdbc:gaussdb:// ` 协议 |
921+ | ` driver ` | ` com.huawei.gaussdb.jdbc.Driver ` | GaussDB JDBC 驱动类 |
922+ | ` table-name ` | 表名 | 目标表名称 |
923+ | ` username ` | 用户名 | 数据库用户 |
924+ | ` password ` | 密码 | 数据库密码 |
925+
0 commit comments