Skip to content

Commit 7a17508

Browse files
committed
flink-connector 支持 Checkandput,1.4.2
1 parent ef25e1c commit 7a17508

35 files changed

Lines changed: 2281 additions & 387 deletions

File tree

hologres-connector-examples/hologres-connector-flink-examples/README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ mvn package -DskipTests
155155
当前example默认使用Flink 1.15版本,实际使用时connector版本请与Flink集群的版本保持一致
156156

157157
```
158-
flink run -c com.alibaba.ververica.connectors.hologres.example.FlinkRoaringBitmapAggJob target/hologres-connector-flink-examples-1.4.1-SNAPSHOT-jar-with-dependencies.jar --propsFilePath src/main/resources/setting.properties --sourceFilePath ods_app_example.csv
158+
flink run -c com.alibaba.ververica.connectors.hologres.example.FlinkRoaringBitmapAggJob target/hologres-connector-flink-examples-1.4.2-SNAPSHOT-jar-with-dependencies.jar --propsFilePath src/main/resources/setting.properties --sourceFilePath ods_app_example.csv
159159
```
160160

161161
需要在```src/main/resources/setting.properties```文件中替换对应的endpoint、username、password、database等参数, 并根据实际情况调整窗口大小
@@ -314,10 +314,10 @@ mvn package -DskipTests
314314

315315
```bash
316316
# 使用以下命令提交作业, 通过sqlFilePath指定sql文件绝对路径,示例repartition.sql文件内容见下方,声明了sourceDDL、sourceDql、sinkDDL三条sql
317-
flink run -Dexecution.runtime-mode=BATCH -c com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample hologres-connector-flink-examples-1.4.1-SNAPSHOT-jar-with-dependencies.jar --sqlFilePath="xx/src/main/resources/repartition.sql"
317+
flink run -Dexecution.runtime-mode=BATCH -c com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample hologres-connector-flink-examples-1.4.2-SNAPSHOT-jar-with-dependencies.jar --sqlFilePath="xx/src/main/resources/repartition.sql"
318318

319319
# 或者分别指定sourceDDL,sourceDQL,sinkDDL三个参数,可能需要根据本地bash环境对换行符或者反引号等特殊字符进行转义
320-
flink run -Dexecution.runtime-mode=BATCH -c com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample hologres-connector-flink-examples-1.4.1-SNAPSHOT-jar-with-dependencies.jar --sourceDDL="create temporary table source_table (c_custkey BIGINT, c_name STRING, c_address STRING, c_nationkey INTEGER, c_phone STRING, c_acctbal NUMERIC(15, 2), c_mktsegment STRING, c_comment STRING) with ( 'connector' = 'datagen', 'rows-per-second' = '10000', 'number-of-rows' = '1000000' )" --sourceDQL="select *, cast('2024-04-21' as DATE) from source_table" --sinkDDL="create table sink_table (c_custkey BIGINT, c_name STRING, c_address STRING, c_nationkey INTEGER, c_phone STRING, c_acctbal NUMERIC(15, 2), c_mktsegment STRING, c_comment STRING, \`date\` STRING) with ( 'connector' = 'hologres', 'dbname' = 'test_db', 'tablename' = 'test_sink_customer', 'username' = '', 'password' = '', 'endpoint' = '', 'jdbccopywritemode' = 'true', 'bulkload' = 'true' )"
320+
flink run -Dexecution.runtime-mode=BATCH -c com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample hologres-connector-flink-examples-1.4.2-SNAPSHOT-jar-with-dependencies.jar --sourceDDL="create temporary table source_table (c_custkey BIGINT, c_name STRING, c_address STRING, c_nationkey INTEGER, c_phone STRING, c_acctbal NUMERIC(15, 2), c_mktsegment STRING, c_comment STRING) with ( 'connector' = 'datagen', 'rows-per-second' = '10000', 'number-of-rows' = '1000000' )" --sourceDQL="select *, cast('2024-04-21' as DATE) from source_table" --sinkDDL="create table sink_table (c_custkey BIGINT, c_name STRING, c_address STRING, c_nationkey INTEGER, c_phone STRING, c_acctbal NUMERIC(15, 2), c_mktsegment STRING, c_comment STRING, \`date\` STRING) with ( 'connector' = 'hologres', 'dbname' = 'test_db', 'tablename' = 'test_sink_customer', 'username' = '', 'password' = '', 'endpoint' = '', 'jdbccopywritemode' = 'true', 'bulkload' = 'true' )"
321321
```
322322

323323
#### FlinkToHoloRePartitionExample 代码核心逻辑

hologres-connector-examples/hologres-connector-flink-examples/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
<groupId>com.alibaba.hologres</groupId>
99
<artifactId>hologres-connector-flink-examples</artifactId>
10-
<version>1.4.1-SNAPSHOT</version>
10+
<version>1.4.2-SNAPSHOT</version>
1111

1212
<properties>
1313
<java.version>1.8</java.version>
@@ -23,7 +23,7 @@
2323
<dependency>
2424
<groupId>com.alibaba.hologres</groupId>
2525
<artifactId>hologres-connector-flink-1.15</artifactId>
26-
<version>1.4.1-SNAPSHOT</version>
26+
<version>1.4.2-SNAPSHOT</version>
2727
<classifier>jar-with-dependencies</classifier>
2828
</dependency>
2929
<dependency>

hologres-connector-examples/hologres-connector-flink-ordergen/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
<groupId>com.alibaba.hologres</groupId>
99
<artifactId>hologres-connector-flink-ordergen</artifactId>
10-
<version>1.4.1-SNAPSHOT</version>
10+
<version>1.4.2-SNAPSHOT</version>
1111

1212
<properties>
1313
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

hologres-connector-examples/hologres-connector-spark-examples/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
<dependency>
2020
<groupId>com.alibaba.hologres</groupId>
2121
<artifactId>hologres-connector-spark-3.x</artifactId>
22-
<version>1.4.1-SNAPSHOT</version>
22+
<version>1.4.2-SNAPSHOT</version>
2323
<classifier>jar-with-dependencies</classifier>
2424
</dependency>
2525
<dependency>

hologres-connector-flink-1.15/README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
<dependency>
1717
<groupId>com.alibaba.hologres</groupId>
1818
<artifactId>hologres-connector-flink-1.15</artifactId>
19-
<version>1.4.0</version>
19+
<version>1.4.2</version>
2020
<classifier>jar-with-dependencies</classifier>
2121
</dependency>
2222
```
@@ -95,6 +95,9 @@ mvn clean install -N
9595
| remove-u0000-in-text.enabled | 设置为true时,会自动替换text类型中非UTF-8的u0000字符 || 默认值:false |
9696
| deduplication.enabled | 如果一批数据中有主键相同的数据,是否进行去重,只保留最后一条到达的数据 || 默认值:true |
9797
| aggressive.enabled | 是否启用激进提交模式 || 默认值:false。设置为true时,即使攒批没有达到预期条数,只要发现连接空闲就会强制提交,在流量较小时,可以有效减少数据延时 |
98+
| check-and-put.column | 启用条件更新能力, 并指定检查的字段名 || 默认值:无。 必须设置为holo表存在的字段名,表必须有主键 |
99+
| check-and-put.operator | 条件更新操作的比较操作符 || 默认值:GREATER, 表示仅当新到record的check字段大于表中原有值时,才会进行更新。目前支持配置为GREATER, GREATER_OR_EQUAL, EQUAL, NOT_EQUAL, LESS, LESS_OR_EQUAL, IS_NULL, IS_NOT_NULL |
100+
| check-and-put.null-as | 当条件更新的旧数据为null时,我们把null值视为此参数配置的值 || 默认值:无。由于postgres中,任何值与null比较,结果都是false,因此当表中原有数据为null时,想要更新,需要为其设置一个nullas参数,相当于sql中的coalesce函数 |
98101
| jdbcCopyWriteMode | 是否使用copy方式写入 || 默认值false。设置为true时,默认使用fixed_copy,fixed copy是hologres1.3新增的能力,相比insert方法,fixed copy方式可以更高的吞吐(因为是流模式),更低的数据延时,更低的客户端内存消耗(因为不攒批),但不支持回撤, 也不支持写入分区父表。 |
99102
| jdbcCopyWriteFormat | 底层是否走二进制协议 || 默认为binary。表示使用二进制模式,二进制会更快,否则为文本模式。 |
100103
| bulkLoad | 是否使用批量copy方式写入 || 默认为false。是否启用批量COPY导入,与jdbcCopyWriteMode参数同时设置为true时生效,批量copy相比fixed copy,写入时使用的hologres资源更小,默认情况下,仅支持写入无主键表,因为写入有主键表时会有表锁 |

hologres-connector-flink-1.15/pom.xml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
<parent>
99
<groupId>com.alibaba.hologres</groupId>
1010
<artifactId>hologres-connector-parent</artifactId>
11-
<version>1.4.1-SNAPSHOT</version>
11+
<version>1.4.2-SNAPSHOT</version>
1212
</parent>
1313

1414
<artifactId>hologres-connector-flink-1.15</artifactId>
@@ -108,6 +108,12 @@
108108
<type>test-jar</type>
109109
<scope>test</scope>
110110
</dependency>
111+
<dependency>
112+
<groupId>org.apache.flink</groupId>
113+
<artifactId>flink-test-utils</artifactId>
114+
<version>${flink.version}</version>
115+
<scope>test</scope>
116+
</dependency>
111117
<dependency>
112118
<groupId>junit</groupId>
113119
<artifactId>junit</artifactId>

0 commit comments

Comments
 (0)