---
title: Merge Engine
tags: Paimon
outline: deep
---

# Merge Engine

When multiple records share the same primary key, Paimon merges them according to the merge-engine table option.
## Deduplicate

```
'merge-engine' = 'deduplicate' -- deduplicate is the default value and can be omitted
```

For multiple records with the same primary key, the primary key table keeps only the latest record. If the latest record is a delete, all records for that primary key are dropped.
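
To see the delete behavior in action, here is a minimal sketch. It assumes Flink batch mode, where Paimon supports DELETE FROM on primary-key tables using the deduplicate merge engine:

```
-- DELETE FROM requires batch mode
SET 'execution.runtime-mode' = 'batch';

-- the delete becomes the latest record for id = 1, so every
-- record with that primary key is dropped from the table
DELETE FROM paimon.merge_test.deduplicate_test1 WHERE id = 1;
```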

### Deduplicate Example

```
SET 'execution.runtime-mode' = 'streaming';
SET 'table.exec.sink.upsert-materialize'='NONE';
SET 'execution.checkpointing.interval'='10 s';
SET 'parallelism.default' = '1';
SET 'sql-client.execution.result-mode' = 'tableau';

CREATE CATALOG paimon WITH (
    'type' = 'paimon',
    'warehouse' = 'hdfs://mj01:8020/lakehouse'
);
USE CATALOG paimon;

create database if not exists merge_test;

CREATE TABLE if not exists paimon.merge_test.deduplicate_test1(
    `id` Int,
    `name` String,
    `salary` Int,
    PRIMARY KEY (id) NOT ENFORCED
) with (
    'merge-engine' = 'deduplicate'
);

CREATE TABLE if not exists paimon.merge_test.deduplicate_test2(
    `id` Int,
    `name` String,
    `salary` Int,
    PRIMARY KEY (id) NOT ENFORCED
);

-- streaming job that mirrors deduplicate_test1 into deduplicate_test2
insert into paimon.merge_test.deduplicate_test2 select * from paimon.merge_test.deduplicate_test1;

insert into paimon.merge_test.deduplicate_test1 values(1,'flink',1000);
insert into paimon.merge_test.deduplicate_test1 values(1,'flink',2000);
insert into paimon.merge_test.deduplicate_test1 values(1,'flink',500);
```

The result contains only the latest record:

![](./img/05/deduplicate-result.png)

![](./img/05/deduplicate-result-1.png)

## First-row

```
'merge-engine' = 'first-row'
```

Paimon keeps only the first record among those sharing the same primary key. Compared with the deduplicate merge engine, first-row only ever produces insert-type change data.

Important notes:
- If downstream jobs need to consume the changes produced by a first-row table in streaming mode, the table's changelog-producer option must be set to lookup.
- first-row cannot handle delete and update_before messages. You can set 'first-row.ignore-delete' = 'true' to ignore these two kinds of messages.
- first-row does not support the input and full-compaction changelog-producer modes.

Why doesn't first-row support the input and full-compaction modes?
- full-compaction: produces the complete change data each time a full compaction of small files runs.
- input: passes the input messages straight through to downstream consumers as the changelog.

Both modes would emit records that first-row is supposed to discard, so strictly speaking they contradict first-row semantics. first-row therefore supports only none and lookup.
79+
80+
### First-row 案例
81+
82+
```
83+
SET 'execution.runtime-mode' = 'streaming';
84+
SET 'table.exec.sink.upsert-materialize'='NONE';
85+
SET 'execution.checkpointing.interval'='10 s';
86+
set parallelism.default=1;
87+
SET 'sql-client.execution.result-mode' = 'tableau';
88+
CREATE CATALOG paimon WITH (
89+
'type' = 'paimon',
90+
'warehouse' = 'hdfs://bigdata01:8020/lakehouse'
91+
);
92+
USE CATALOG paimon;
93+
create database if not exists merge_test;
94+
95+
96+
CREATE TABLE if not exists paimon.merge_test.first_test2(
97+
`id` Int,
98+
`name` String,
99+
`salary` Int,
100+
PRIMARY KEY (id) NOT ENFORCED
101+
);
102+
103+
CREATE TABLE if not exists paimon.merge_test.first_test1(
104+
`id` Int,
105+
`name` String,
106+
`salary` Int,
107+
PRIMARY KEY (id) NOT ENFORCED
108+
) with (
109+
'merge-engine' = 'first-row',
110+
'changelog-producer'='lookup' --none、input、lookup、full-compaction
111+
);
112+
113+
114+
115+
insert into paimon.merge_test.first_test2 select * from paimon.merge_test.first_test1;
116+
117+
insert into paimon.merge_test.first_test1 values(1,'flink',1000);
118+
insert into paimon.merge_test.first_test1 values(1,'flink',2000);
119+
120+
121+
SELECT * FROM paimon.merge_test.first_test1 /*+ OPTIONS('scan.snapshot-id' = '1') */
122+
```

When merge-engine is first-row, not every changelog producer type is supported: only none and lookup work, and the others raise an error.

![](./img/05/first-row-streaming-read.png)

In streaming read mode, however, none produces no changelog, so in practice the changelog producer can only be set to lookup.

The result contains only the first record:

![](./img/05/first-row-result.png)

## Aggregate

For multiple records with the same primary key, the primary key table aggregates them with the specified aggregate functions. Each non-primary-key column needs an aggregate function specified via fields.{field-name}.aggregate-function; otherwise the column defaults to the last_non_null_value function.

Supported aggregate functions and their data types:
- sum: supports DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT and DOUBLE.
- product: supports DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT and DOUBLE.
- count (counts non-null values): supports INTEGER and BIGINT.
- max and min: support CHAR, VARCHAR, DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP and TIMESTAMP_LTZ.
- first_value (returns the first input value) and last_value (returns the latest input value): support all data types, including null.
- first_not_null_value (returns the first non-null input value) and last_non_null_value (returns the latest non-null input value): support all data types.
- listagg (concatenates input strings with commas): supports STRING.
- bool_and and bool_or: support BOOLEAN.

If downstream jobs need to consume the aggregation results in streaming mode, set the changelog-producer option to input, lookup or full-compaction.
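
As a quick illustration of combining several of the functions above, here is a hedged sketch; the table agg_sketch and its columns are hypothetical, not part of the original examples:

```
CREATE TABLE if not exists paimon.merge_test.agg_sketch(
    `id` Int,
    `tags` String,
    `ok` Boolean,
    `cnt` BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
) with (
    'merge-engine' = 'aggregation',
    'fields.tags.aggregate-function' = 'listagg',  -- concatenate input strings with commas
    'fields.ok.aggregate-function' = 'bool_and',   -- logical AND across all inputs
    'fields.cnt.aggregate-function' = 'sum',       -- running sum of the inputs
    'changelog-producer' = 'lookup'
);
```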

### Aggregate Example

```
SET 'execution.runtime-mode' = 'streaming';
SET 'table.exec.sink.upsert-materialize'='NONE';
SET 'execution.checkpointing.interval'='10 s';
SET 'parallelism.default' = '1';
SET 'sql-client.execution.result-mode' = 'tableau';

CREATE CATALOG paimon WITH (
    'type' = 'paimon',
    'warehouse' = 'hdfs://bigdata01:8020/lakehouse'
);
USE CATALOG paimon;

create database if not exists merge_test;

CREATE TABLE if not exists paimon.merge_test.aggregation_test2(
    `id` Int,
    `name` String,
    `salary` Int,
    `sum_cnt` Int,
    PRIMARY KEY (id) NOT ENFORCED
);

CREATE TABLE if not exists paimon.merge_test.aggregation_test1(
    `id` Int,
    `name` String,
    `salary` Int,
    `sum_cnt` Int,
    PRIMARY KEY (id) NOT ENFORCED
) with (
    'merge-engine' = 'aggregation',
    'fields.salary.aggregate-function' = 'max',
    'fields.sum_cnt.aggregate-function' = 'sum',
    'changelog-producer' = 'lookup'
);

-- streaming job that mirrors aggregation_test1 into aggregation_test2
insert into paimon.merge_test.aggregation_test2
select * from paimon.merge_test.aggregation_test1;

insert into paimon.merge_test.aggregation_test1 values(2,'flink',1000,1000);
insert into paimon.merge_test.aggregation_test1 values(2,'flink',2000,500);
insert into paimon.merge_test.aggregation_test1 values(3,'flink',500,500);

SELECT * FROM paimon.merge_test.aggregation_test1 /*+ OPTIONS('scan.snapshot-id' = '1') */;
```

aggregation does not support the none changelog producer type; all other types are supported.

![](./img/05/aggregation-none.png)

Result for id 2: salary takes the maximum value 2000, and sum_cnt sums to 1500 (1000 + 500).

## Partial-update

```
'merge-engine' = 'partial-update'
```

With 'merge-engine' = 'partial-update', you can build up a row through multiple messages and eventually obtain the complete record: new data with the same primary key overwrites the existing data, but columns whose incoming value is null do not overwrite anything.

### Sequence Group

A single sequence field may not resolve the out-of-order problem for a partial-update table updated by multiple streams, because during multi-stream updates the sequence-field can be overwritten by the latest data from another stream. Sequence groups solve this by specifying a separate merge order for different columns.

Important notes:
- If downstream jobs need to consume the partial-update results in streaming mode, set the changelog-producer option to input, lookup or full-compaction.
- partial-update cannot handle delete and update_before messages. Set 'partial-update.ignore-delete' = 'true' to ignore these two kinds of messages.

In some scenarios, partial update can replace shuffle-heavy joins such as left join: instead of joining, union the streams into one partial-update table, e.g.

select id, a, null from streaming1 union all select id, null, b from streaming2
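
A minimal sketch of that union pattern follows; the source tables streaming1/streaming2 and the sink part_join are hypothetical names, assuming both streams share the key id:

```
-- sink table: each stream fills its own columns; null values do not overwrite
CREATE TABLE if not exists paimon.merge_test.part_join(
    `id` Int,
    `a` String,
    `b` String,
    PRIMARY KEY (id) NOT ENFORCED
) with (
    'merge-engine' = 'partial-update',
    'changelog-producer' = 'lookup'
);

-- union the two streams instead of joining them
insert into paimon.merge_test.part_join
select id, a, cast(NULL as STRING) from streaming1
union all
select id, cast(NULL as STRING), b from streaming2;
```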
223+
224+
### Partial-update 案例1
225+
226+
```
227+
SET 'execution.runtime-mode' = 'streaming';
228+
SET 'table.exec.sink.upsert-materialize'='NONE';
229+
--设置检查点的间隔为1分钟
230+
SET 'execution.checkpointing.interval'='10 s';
231+
set parallelism.default=1;
232+
SET 'sql-client.execution.result-mode' = 'tableau';
233+
CREATE CATALOG paimon WITH (
234+
'type' = 'paimon',
235+
'warehouse' = 'hdfs://bigdata01:8020/lakehouse'
236+
);
237+
USE CATALOG paimon;
238+
create database if not exists merge_test;
239+
CREATE TABLE if not exists paimon.merge_test.part1(
240+
`id` Int,
241+
`name` String,
242+
`salary` BIGINT,
243+
PRIMARY KEY (id) NOT ENFORCED
244+
) with (
245+
'merge-engine' = 'partial-update',
246+
'changelog-producer' = 'lookup'
247+
);
248+
249+
insert into paimon.merge_test.part1 values(1,'flink',CAST(NULL AS INT));
250+
insert into paimon.merge_test.part1 values(1,cast(NULL as STRING),2000);
251+
252+
CREATE TABLE if not exists paimon.merge_test.part1(
253+
`id` Int,
254+
`name1` String,
255+
`name2` String,
256+
`name3` String,
257+
`salary1` BIGINT,
258+
`salary2` BIGINT,
259+
`salary3` BIGINT,
260+
PRIMARY KEY (id,name1,salary1) NOT ENFORCED
261+
) with (
262+
'merge-engine' = 'partial-update',
263+
'changelog-producer' = 'lookup'
264+
);
265+
266+
```

The two inserts write different fields of the same primary key; both updates end up merged into the same row.

![](./img/05/partial-update-result.png)

### Partial-update Example 2 (Sequence Group)

```
SET 'execution.runtime-mode' = 'streaming';
SET 'table.exec.sink.upsert-materialize'='NONE';
-- set the checkpoint interval to 10 seconds
SET 'execution.checkpointing.interval'='10 s';
SET 'parallelism.default' = '1';
SET 'sql-client.execution.result-mode' = 'tableau';

CREATE CATALOG paimon WITH (
    'type' = 'paimon',
    'warehouse' = 'hdfs://bigdata01:8020/lakehouse'
);
USE CATALOG paimon;

create database if not exists merge_test;

CREATE TABLE if not exists paimon.merge_test.part2(
    `id` Int,
    `name` String,
    `sg_1` Int,
    `salary` BIGINT,
    `sg_2` Int,
    PRIMARY KEY (id) NOT ENFORCED
) with (
    'merge-engine' = 'partial-update',
    'changelog-producer' = 'input',
    'fields.sg_1.sequence-group' = 'name',   -- sg_1 orders updates to name
    'fields.sg_2.sequence-group' = 'salary'  -- sg_2 orders updates to salary
);

insert into paimon.merge_test.part2 values(1,'flink',1,1,1);
-- output: +I | 1 | flink | 1 | 1 | 1
insert into paimon.merge_test.part2 values(1,'flink1',0,1,cast(NULL as Int));
-- output: +I | 1 | flink | 1 | 1 | 1
insert into paimon.merge_test.part2 values(1,'flink2',1,2000,1);
-- output: +I | 1 | flink2 | 1 | 2000 | 1
insert into paimon.merge_test.part2 values(1,'flink3',0,3000,0);
-- output: +I | 1 | flink2 | 1 | 2000 | 1
insert into paimon.merge_test.part2 values(1,'flink3',2,3000,2);
-- output: +I | 1 | flink3 | 2 | 3000 | 2
```

If multiple streams update the same field concurrently, their order cannot be guaranteed, which is a problem for partial-update: there is no way to tell which value should win.

A sequence group preserves that order: if the incoming data's sequence group value is smaller than the current one, the data is not updated.

In the example above, the sg_1 column is declared as the sequence group for the name column. The first record sets (name, sg_1) to (flink, 1).

The second record has (name, sg_1) = (flink1, 0); since the sequence group value 0 is smaller than the current value 1, no update is triggered and name stays flink.

The third record has (name, sg_1) = (flink2, 1); the sequence group value 1 is not smaller than the current value, so the update goes through and name becomes flink2.

The sg_2 sequence group declared for salary updates its columns in exactly the same way.
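
Note that a single sequence group can also order several columns at once by listing them comma-separated. A hypothetical variant of the table above (the name part3 is ours):

```
CREATE TABLE if not exists paimon.merge_test.part3(
    `id` Int,
    `name` String,
    `salary` BIGINT,
    `g` Int,
    PRIMARY KEY (id) NOT ENFORCED
) with (
    'merge-engine' = 'partial-update',
    'changelog-producer' = 'input',
    -- one sequence group ordering both name and salary
    'fields.g.sequence-group' = 'name,salary'
);
```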