Skip to content

Commit fad463d

Browse files
committed
flink、spark 新增repartition之后写入有主键空表的demo
1 parent 8661097 commit fad463d

14 files changed

Lines changed: 1001 additions & 141 deletions

File tree

hologres-connector-examples/hologres-connector-flink-examples/README.md

Lines changed: 198 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@
1717
* 5.FlinkRoaringBitmapAggJob
1818

1919
一个使用FLink及RoaringBitmap,结合Hologres维表,实现实时去重统计UV的应用,并将统计结果写入Hologres
20+
* 6.FlinkToHoloRePartitionExample
21+
22+
一个使用FLink DataStream接口实现的应用,将数据根据holo的shard进行repartition之后再写入至Hologres,可以减少hologres实例侧的小文件数量,提高写入性能的情况下降低负载。
2023

2124
## 在IDEA中运行和调试
2225
* 以下是针对提交作业到Flink集群的情况,用户也可以在IDEA等编辑器中运行代码,只需要将pom.xml文件中各flink依赖的`<scope>provided</scope>`删除即可
2326

24-
## Flink 1.14
25-
* 当前example使用Flink 1.13版本, 如需使用Flink 1.14, 由于版本依赖变动,需要将pom.xml文件中各flink依赖的`-blink`后缀去掉
27+
## Flink 1.15
28+
* 当前example使用Flink 1.15版本, 如需使用其他版本的Flink, 需要适当调整将pom.xml文件中flink依赖项的版本
2629

2730
## 运行Example 1,2,3
2831

@@ -36,7 +39,7 @@
3639
```create table sink_table(user_id bigint, user_name text, price decimal(38,2), sale_timestamp timestamptz);```
3740

3841
### 提交Flink Example作业
39-
当前example默认使用Flink 1.13版本,实际使用时connector版本请与Flink集群的版本保持一致
42+
当前example默认使用Flink 1.15版本,实际使用时connector版本请与Flink集群的版本保持一致
4043

4144
```
4245
flink run -c com.alibaba.ververica.connectors.hologres.example.FlinkDSAndSQLToHoloExample target/hologres-connector-flink-examples-1.3-SNAPSHOT-jar-with-dependencies.jar --endpoint ${ip:port} --username ${user_name} --password ${password} --database {database} --tablename sink_table
@@ -60,7 +63,7 @@ create table sink_table(user_id bigint, user_name text, price decimal(38,2), sal
6063
```
6164

6265
### 提交Flink Example作业
63-
当前example默认使用Flink 1.13版本,实际使用时connector版本请与Flink集群的版本保持一致
66+
当前example默认使用Flink 1.15版本,实际使用时connector版本请与Flink集群的版本保持一致
6467

6568
```
6669
flink run -c com.alibaba.ververica.connectors.hologres.example.FlinkSQLSourceAndSinkExample target/hologres-connector-flink-examples-1.3-SNAPSHOT-jar-with-dependencies.jar --endpoint ${ip:port} --username ${user_name} --password ${password} --database {database} --source source_table --sink sink_table
@@ -124,9 +127,9 @@ create table dws_app(
124127
prov text,
125128
city text,
126129
ymd text NOT NULL, --日期字段
127-
timetz TIMESTAMPTZ, --统计时间戳,可以实现以Flink窗口周期为单位的统计
130+
event_window_time TIMESTAMPTZ, --统计时间戳,可以实现以Flink窗口周期为单位的统计
128131
uid32_bitmap roaringbitmap, -- 使用roaringbitmap记录uv
129-
primary key(country, prov, city, ymd, timetz)--查询维度和时间作为主键,防止重复插入数据
132+
primary key(country, prov, city, ymd, event_window_time)--查询维度和时间作为主键,防止重复插入数据
130133
);
131134
CALL set_table_property('public.dws_app', 'orientation', 'column');
132135
--clustering_key和event_time_column设为日期字段,便于过滤
@@ -149,14 +152,47 @@ mvn package -DskipTests
149152
```
150153

151154
#### 提交Flink 作业
152-
当前example默认使用Flink 1.13版本,实际使用时connector版本请与Flink集群的版本保持一致
155+
当前example默认使用Flink 1.15版本,实际使用时connector版本请与Flink集群的版本保持一致
153156

154157
```
155-
flink run -c com.alibaba.ververica.connectors.hologres.example.FlinkRoaringBitmapAggJob target/hologres-connector-flink-examples-1.0.0-jar-with-dependencies.jar --propsFilePath src/main/resources/setting.properties --sourceFilePath ods_app_example.csv
158+
flink run -c com.alibaba.ververica.connectors.hologres.example.FlinkRoaringBitmapAggJob target/hologres-connector-flink-examples-1.4.1-SNAPSHOT-jar-with-dependencies.jar --propsFilePath src/main/resources/setting.properties --sourceFilePath ods_app_example.csv
156159
```
157160

158-
其中需要在```src/main/resources/setting.properties```文件中替换对应的endpoint、username、password、database等参数
161+
需要在```src/main/resources/setting.properties```文件中替换对应的endpoint、username、password、database等参数, 并根据实际情况调整窗口大小
162+
```properties
163+
# hologres-flink-connector参数
164+
endpoint=
165+
username=
166+
password=
167+
database=
168+
# 维表名
169+
dimTableName=uid_mapping
170+
# 结果表名
171+
dwsTableName=dws_app
172+
#窗口大小,单位为s
173+
windowSize=2
174+
```
159175
程序中读取的csv格式示例在```src/main/resources/ods_app_example.csv```中,实际使用时结合数据源情况修改fieldsMask等
176+
```csv
177+
uid,country,prov,city,channel,operator,brand,ip,click_time,year,month,day,ymd
178+
1ae58016,中国,广东,东莞,android,pros,a,192.168.1.1,2021-03-29 13:34:00,2021,3,29,20210329
179+
2ae7788c,中国,广东,深圳,android,pros,a,192.168.1.8,2021-03-29 13:34:00,2021,3,29,20210329
180+
1c90ab08,美国,新泽西州,阿布西肯,ios,cons,a,192.168.1.2,2021-03-29 13:34:00,2021,3,29,20210329
181+
e7847f80,中国,天津,天津,ios,pros,b,192.168.1.3,2021-03-29 13:34:00,2021,3,29,20210329
182+
1ae58016,中国,广东,东莞,android,pros,a,192.168.1.1,2021-03-29 13:34:00,2021,3,29,20210329
183+
1c90ab08,美国,新泽西州,阿布西肯,ios,cons,a,192.168.1.2,2021-03-29 13:34:01,2021,3,29,20210329
184+
e7847f80,中国,天津,天津,ios,pros,b,192.168.1.3,2021-03-29 13:34:01,2021,3,29,20210329
185+
1ae58016,中国,广东,东莞,android,pros,a,192.168.1.1,2021-03-29 13:34:01,2021,3,29,20210329
186+
1c90ab08,美国,新泽西州,阿布西肯,ios,cons,a,192.168.1.2,2021-03-29 13:34:01,2021,3,29,20210329
187+
e7847f80,中国,天津,天津,ios,pros,b,192.168.1.3,2021-03-29 13:34:02,2021,3,29,20210329
188+
1ae58016,中国,广东,东莞,android,pros,a,192.168.1.1,2021-03-29 13:34:02,2021,3,29,20210329
189+
1c90ab08,美国,新泽西州,阿布西肯,ios,cons,a,192.168.1.2,2021-03-29 13:34:02,2021,3,29,20210329
190+
e7847f80,中国,天津,天津,ios,pros,b,192.168.1.3,2021-03-29 13:34:02,2021,3,29,20210329
191+
1ae58016,中国,广东,东莞,android,pros,a,192.168.1.1,2021-03-29 13:34:03,2021,3,29,20210329
192+
1c90ab08,美国,新泽西州,阿布西肯,ios,cons,a,192.168.1.2,2021-03-29 13:34:03,2021,3,29,20210329
193+
e7847f80,中国,天津,天津,ios,pros,b,192.168.1.3,2021-03-29 13:34:03,2021,3,29,20210329
194+
e8855f8d,中国,天津,天津,ios,pros,b,192.168.1.5,2021-03-29 13:34:03,2021,3,29,20210329
195+
```
160196

161197
#### FlinkRoaringBitmapAggJob 代码核心逻辑
162198
* Flink 流式读取数据源(DataStream),并转化为源表(Table)
@@ -165,6 +201,29 @@ flink run -c com.alibaba.ververica.connectors.hologres.example.FlinkRoaringBitma
165201
* 关联结果转化为DataStream,通过Flink时间窗口处理,结合RoaringBitmap进行聚合,并以窗口为周期写入结果表。
166202

167203
### 3.查询
204+
* resources/ods_app_example.csv中的示例数据,当窗口的大小为2s时,结果如下
205+
206+
```sql
207+
select * from uid_mapping ;
208+
uid | uid_int32
209+
----------+-----------
210+
1c90ab08 | 1
211+
2ae7788c | 9
212+
e7847f80 | 8
213+
e8855f8d | 10
214+
1ae58016 | 5
215+
216+
select country,prov,city,ymd,event_window_time, rb_to_array(uid32_bitmap) from dws_app order by event_window_time;
217+
country | prov | city | ymd | event_window_time | rb_to_array
218+
---------+----------+----------+----------+------------------------+-------------
219+
中国 | 广东 | 东莞 | 20210329 | 2021-03-29 13:34:02+08 | {5}
220+
中国 | 天津 | 天津 | 20210329 | 2021-03-29 13:34:02+08 | {8}
221+
美国 | 新泽西州 | 阿布西肯 | 20210329 | 2021-03-29 13:34:02+08 | {1}
222+
中国 | 广东 | 深圳 | 20210329 | 2021-03-29 13:34:02+08 | {9}
223+
美国 | 新泽西州 | 阿布西肯 | 20210329 | 2021-03-29 13:34:04+08 | {1}
224+
中国 | 广东 | 东莞 | 20210329 | 2021-03-29 13:34:04+08 | {5}
225+
中国 | 天津 | 天津 | 20210329 | 2021-03-29 13:34:04+08 | {8,10}
226+
```
168227

169228
* 查询时,从基础聚合表(dws_app)中按照查询维度做聚合计算,查询bitmap基数,得出group by条件下的用户数
170229

@@ -186,6 +245,12 @@ GROUP BY country
186245
,prov
187246
,city
188247
;
248+
country | prov | city | uv
249+
---------+----------+----------+----
250+
中国 | 广东 | 东莞 | 1
251+
美国 | 新泽西州 | 阿布西肯 | 1
252+
中国 | 广东 | 深圳 | 1
253+
中国 | 天津 | 天津 | 2
189254
```
190255

191256
* 查询某段时间内各个省份的uv
@@ -195,8 +260,131 @@ SELECT country
195260
,prov
196261
,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
197262
FROM dws_app
198-
WHERE time > '2021-04-19 18:00:00+08' and time < '2021-04-19 19:00:00+08'
263+
WHERE event_window_time > '2021-03-29 13:30:00+08' and event_window_time <= '2021-03-29 13:34:05+08'
199264
GROUP BY country
200265
,prov
201266
;
267+
268+
-- 结果如下
269+
country | prov | uv
270+
---------+----------+----
271+
美国 | 新泽西州 | 1
272+
中国 | 天津 | 2
273+
中国 | 广东 | 2
274+
```
275+
## 运行Example 6
276+
277+
### 使用Flink Custom Partition自定义数据分区策略,提高批量写入性能
278+
279+
### 1.创建hologres结果表
280+
281+
* 创建结果表
282+
```
283+
-- 建议根据数据量合理设置shard数
284+
CREATE TABLE test_sink_customer
285+
(
286+
c_custkey BIGINT,
287+
c_name TEXT,
288+
c_address TEXT,
289+
c_nationkey INT,
290+
c_phone TEXT,
291+
c_acctbal NUMERIC(15,2),
292+
c_mktsegment TEXT,
293+
c_comment TEXT,
294+
"date" DATE
295+
) with (
296+
distribution_key="c_custkey,date",
297+
orientation="column"
298+
);
299+
```
300+
301+
### 2.运行作业
302+
303+
#### 编译
304+
202305
```
306+
cd hologres-connector-flink-examples
307+
mvn package -DskipTests
308+
```
309+
310+
#### 提交Flink 作业
311+
当前example默认使用Flink 1.15版本,实际使用时connector版本请与Flink集群的版本保持一致
312+
-Dexecution.runtime-mode=BATCH 表示使用批模式提交作业, 详见[执行模式](https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/datastream/execution_mode/)
313+
314+
315+
```bash
316+
# 使用以下命令提交作业, 通过sqlFilePath指定sql文件绝对路径,示例repartition.sql文件内容见下方,声明了sourceDDL、sourceDql、sinkDDL三条sql
317+
flink run -Dexecution.runtime-mode=BATCH -c com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample hologres-connector-flink-examples-1.4.1-SNAPSHOT-jar-with-dependencies.jar --sqlFilePath="xx/src/main/resources/repartition.sql"
318+
319+
# 或者分别指定sourceDDL,sourceDQL,sinkDDL三个参数,可能需要根据本地bash环境对换行符或者反引号等特殊字符进行转义
320+
flink run -Dexecution.runtime-mode=BATCH -c com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample hologres-connector-flink-examples-1.4.1-SNAPSHOT-jar-with-dependencies.jar --sourceDDL="create temporary table source_table (c_custkey BIGINT, c_name STRING, c_address STRING, c_nationkey INTEGER, c_phone STRING, c_acctbal NUMERIC(15, 2), c_mktsegment STRING, c_comment STRING) with ( 'connector' = 'datagen', 'rows-per-second' = '10000', 'number-of-rows' = '1000000' )" --sourceDQL="select *, cast('2024-04-21' as DATE) from source_table" --sinkDDL="create table sink_table (c_custkey BIGINT, c_name STRING, c_address STRING, c_nationkey INTEGER, c_phone STRING, c_acctbal NUMERIC(15, 2), c_mktsegment STRING, c_comment STRING, \`date\` STRING) with ( 'connector' = 'hologres', 'dbname' = 'test_db', 'tablename' = 'test_sink_customer', 'username' = '', 'password' = '', 'endpoint' = '', 'jdbccopywritemode' = 'true', 'bulkload' = 'true' )"
321+
```
322+
323+
#### FlinkToHoloRePartitionExample 代码核心逻辑
324+
* Flink SQL方式读取源表数据(Table),并将结果转化为DataStream<Row>
325+
* 自定义分区策略,根据distribution_key对应字段计算出每条数据的shard,将数据重新分区到不同的并发上
326+
* 每个并发上只会写入几个甚至一个shard的数据,可以更好的内存中攒批,同时减少需要合并的小文件数量
327+
* 当写入有主键表时,程序根据当前并发将要写入的shard设置target-shards参数,将默认的表锁改为shard级别的锁,不同的shard可以并发写入,提高写入性能
328+
329+
#### FlinkToHoloRePartitionExample 注意事项
330+
* 本demo主要应用于批量导入数据, 使用时建议使用Flink批模式提交作业,如果使用流模式,可以关闭自动checkpoint功能
331+
* 写入无主键表时,需要设置jdbccopywritemode和bulkload参数为true
332+
* 写入有主键表时,还需要设置target-shards.enabled参数为true,要求批量导入之前有主键表是空表
333+
* flink结果表并发,建议与写入holo目标表的shard数一致
334+
335+
#### 参数说明
336+
* sourceDDL: 源表声明, 示例中使用了datagen源表,使用时根据实际情况替换,注意运行环境中需要相关依赖
337+
* sourceDQL: 源表查询, 查询结果会作为sinkDDL的输入, 因此要求select的字段数量,类型与sinkDDL声明的结果表对应
338+
* sinkDDL: holo结果表声明, 因为需要通过Catalog获取连接信息,因此建表时必须使用CREATE TABLE方式(不能使用CREATE TEMPORARY TABLE),hologres connector会和demo代码一同打包,主要参数解释如下,其他参数的详细解释可以参考connector文档
339+
* jdbccopywritemode: 使用COPY模式写入,默认fixed_copy,fixed copy是hologres1.3新增的能力,相比insert方法,fixed copy方式可以更高的吞吐(因为是流模式),更低的数据延时,更低的客户端内存消耗(因为不攒批),但不支持回撤
340+
* bulkload: 启用批量COPY导入,与jdbcCopyWriteMode参数同时设置为true时生效,批量copy相比fixed copy,写入时使用的hologres资源更小,默认情况下,仅支持写入无主键表,写入有主键表时会有表锁
341+
* target-shards.enabled: bulkload写入有主键表时,默认是表锁.因此在上游数据根据shard进行了repartition的基础上,可以开启此参数.写入有主键表的锁粒度会从表级别调整为shard级别,相比fixedcopy写入有主键表,可以节省holo实例2/3的负载
342+
343+
```sql
344+
--sourceDDL
345+
CREATE TEMPORARY TABLE source_table
346+
(
347+
c_custkey BIGINT
348+
,c_name STRING
349+
,c_address STRING
350+
,c_nationkey INTEGER
351+
,c_phone STRING
352+
,c_acctbal NUMERIC(15, 2)
353+
,c_mktsegment STRING
354+
,c_comment STRING
355+
)
356+
WITH (
357+
'connector' = 'datagen'
358+
,'rows-per-second' = '10000'
359+
,'number-of-rows' = '1000000'
360+
);
361+
362+
--sourceDql
363+
select *, cast('2024-04-21' as DATE) from source_table;
364+
365+
--sinkDDL
366+
CREATE TABLE sink_table
367+
(
368+
c_custkey BIGINT
369+
,c_name STRING
370+
,c_address STRING
371+
,c_nationkey INTEGER
372+
,c_phone STRING
373+
,c_acctbal NUMERIC(15, 2)
374+
,c_mktsegment STRING
375+
,c_comment STRING
376+
,`date` DATE
377+
)
378+
with (
379+
'connector' = 'hologres'
380+
,'dbname' = 'test_db'
381+
,'tablename' = 'test_sink_customer'
382+
,'username' = ''
383+
,'password' = ''
384+
,'endpoint' = ''
385+
,'jdbccopywritemode' = 'true'
386+
,'bulkload' = 'true'
387+
,'target-shards.enabled'='true'
388+
);
389+
390+
```

hologres-connector-examples/hologres-connector-flink-examples/pom.xml

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@
77

88
<groupId>com.alibaba.hologres</groupId>
99
<artifactId>hologres-connector-flink-examples</artifactId>
10-
<version>1.4.0-SNAPSHOT</version>
10+
<version>1.4.1-SNAPSHOT</version>
1111

1212
<properties>
1313
<java.version>1.8</java.version>
14-
<flink.version>1.15.3</flink.version>
14+
<flink.version>1.15.4</flink.version>
1515
</properties>
1616

1717
<dependencies>
@@ -23,9 +23,14 @@
2323
<dependency>
2424
<groupId>com.alibaba.hologres</groupId>
2525
<artifactId>hologres-connector-flink-1.15</artifactId>
26-
<version>1.4.0-SNAPSHOT</version>
26+
<version>1.4.1-SNAPSHOT</version>
2727
<classifier>jar-with-dependencies</classifier>
2828
</dependency>
29+
<dependency>
30+
<groupId>com.alibaba.hologres</groupId>
31+
<artifactId>holo-client</artifactId>
32+
<version>2.4.0</version>
33+
</dependency>
2934
<dependency>
3035
<groupId>org.apache.flink</groupId>
3136
<artifactId>flink-clients</artifactId>
@@ -79,6 +84,20 @@
7984
<artifactId>flink-table-planner_2.12</artifactId>
8085
<version>${flink.version}</version>
8186
<type>test-jar</type>
87+
<scope>provided</scope>
88+
</dependency>
89+
90+
<dependency>
91+
<groupId>org.slf4j</groupId>
92+
<artifactId>slf4j-api</artifactId>
93+
<version>1.7.25</version>
94+
<scope>provided</scope>
95+
</dependency>
96+
<dependency>
97+
<groupId>org.slf4j</groupId>
98+
<artifactId>slf4j-log4j12</artifactId>
99+
<version>1.7.25</version>
100+
<scope>provided</scope>
82101
</dependency>
83102
</dependencies>
84103

0 commit comments

Comments
 (0)