|
| 1 | +--- |
| 2 | +title: Paimon tables |
| 3 | +tags: Paimon |
| 4 | +outline: deep |
| 5 | +--- |
| 6 | + |
| 7 | +# 主键表和Append Only表 |
| 8 | + |
| 9 | +## 主键表 |
| 10 | +Paimon主键(Primary Key)表:表中每一行数据都有一个唯一主键,用来表示唯一的一行数据。 |
| 11 | + |
| 12 | +``` |
| 13 | +CREATE TABLE if not exists paimon.test.bucket_num ( |
| 14 | + `id` Int, |
| 15 | + `name` String, |
| 16 | + `age` Int, |
| 17 | + `dt` string, |
| 18 | + PRIMARY KEY (id,dt) NOT ENFORCED |
| 19 | +) PARTITIONED BY (dt) with ( |
| 20 | +); |
| 21 | +``` |
| 22 | + |
| 23 | +### 分桶方式 |
| 24 | +Bucket 桶是Paimon表读写操作的最小单元。非分区、分区的数据都会写入到对应的桶中。 |
| 25 | + |
| 26 | +创建Paimon主键表时,在WITH参数中指定'bucket' = '<num>' |
| 27 | +1. '<num>' 为2、3正数的则是固定桶 |
| 28 | +2. '<num>' 值为 -1,或者不写buckent=…… 则表示动态桶 |
| 29 | + |
| 30 | +``` |
| 31 | +CREATE TABLE if not exists paimon.test.bucket_num ( |
| 32 | + `id` Int PRIMARY KEY NOT ENFORCED, |
| 33 | + `name` String, |
| 34 | + `age` Int, |
| 35 | + `dt` string, |
| 36 | + PRIMARY KEY (id,dt) NOT ENFORCED |
| 37 | +) PARTITIONED BY (dt) with ( |
| 38 | + 'bucket' = '2', -- bucket=4 固定分桶、'bucket' = '-1' 动态分桶 |
| 39 | + 'merge-engine' = 'deduplicate', -- deduplicate 是默认值,可以不设置,相同的主键数据,保留最新的 |
| 40 | + 'file.format'='avro' --格式 parquet、orc、avro |
| 41 | +); |
| 42 | +``` |
| 43 | + |
| 44 | +### 固定桶主键表 Fixed Bucket |
| 45 | + |
| 46 | +1. 有分区键的情况下 主键字段必须包括分区字段 。 |
| 47 | +2. Bucket 个数会影响并发度,影响性能,建议每个分桶的数据大小在2 GB左右,最大不超过5 GB。 |
| 48 | + |
| 49 | +``` |
| 50 | +CREATE TABLE if not exists paimon.test.bucket2 ( |
| 51 | + id bigint, |
| 52 | + name String, |
| 53 | + age Int, |
| 54 | + dt string, |
| 55 | + PRIMARY KEY (id,dt) NOT ENFORCED |
| 56 | +) PARTITIONED BY (dt) with ( |
| 57 | + 'bucket' = '2', |
| 58 | + 'file.format'='avro', |
| 59 | + 'sink.parallelism' = '2' |
| 60 | +); |
| 61 | +``` |
| 62 | + |
| 63 | +注意:分桶数限制了实际工作的作业并发数,单个分桶内数据总量太大可能导致读写性能的降低。 |
| 64 | + |
| 65 | +**假如有多个作业(insert into)如何支持写入一张表?** |
| 66 | + |
| 67 | +如果要支持多个insert into table select …… 写入到相同的一张表 |
| 68 | +设置参数 'write-only'='true' (单独启动一个Dedicated Compaction Job) |
| 69 | +否则会报错:Conflicts during commits. Multiple jobs are writing into the same partition at the same time. |
| 70 | + |
| 71 | +``` |
| 72 | + 'write-only'='true' --取决于是否多个任务写一张表 |
| 73 | +
|
| 74 | +CREATE TABLE if not exists paimon.test.bucket2 ( |
| 75 | + id bigint, |
| 76 | + name String, |
| 77 | + age Int, |
| 78 | + dt string, |
| 79 | + PRIMARY KEY (id,dt) NOT ENFORCED |
| 80 | +) PARTITIONED BY (dt) with ( |
| 81 | + 'bucket' = '2', |
| 82 | + 'file.format'='avro', |
| 83 | + 'sink.parallelism' = '2', |
| 84 | + 'write-only'='true' |
| 85 | +); |
| 86 | +
|
| 87 | +// two job |
| 88 | +======================================== |
| 89 | +insert into paimon.test.bucket2 select id,name,age,date_format(CURRENT_TIMESTAMP,'yyyyMMdd') from default_catalog.default_database.datagen1 ; |
| 90 | +
|
| 91 | +insert into paimon.test.bucket2 select id,name,age,date_format(CURRENT_TIMESTAMP,'yyyyMMdd') from default_catalog.default_database.datagen1 ; |
| 92 | +
|
| 93 | +``` |
| 94 | + |
| 95 | +### 动态分桶主键表 Dynamic Bucket |
| 96 | + |
| 97 | +注意:动态分桶表的主键可以包含分区字段也可以包含不分区字段。 |
| 98 | + |
| 99 | +Paimon will automatically expand the number of buckets. |
| 100 | +- Option1: 'dynamic-bucket.target-row-num': controls the target row number for one bucket. |
| 101 | +- Option2: 'dynamic-bucket.initial-buckets': controls the number of initialized bucket. |
| 102 | +- Option3: 'dynamic-bucket.max-buckets': controls the number of max buckets. |
| 103 | + |
| 104 | +#### 主键包括分区键 |
| 105 | + |
| 106 | +``` |
| 107 | +CREATE TABLE if not exists paimon.test.bucket2 ( |
| 108 | + id bigint, |
| 109 | + name String, |
| 110 | + age Int, |
| 111 | + dt string, |
| 112 | + PRIMARY KEY (id,dt) NOT ENFORCED |
| 113 | +) PARTITIONED BY (dt) with ( |
| 114 | + 'bucket' = '-1', |
| 115 | + 'merge-engine' = 'deduplicate', |
| 116 | + 'file.format'='avro', |
| 117 | + 'sink.parallelism' = '2' |
| 118 | +); |
| 119 | +``` |
| 120 | + |
| 121 | +paimon表可以确定该主键属于哪个分区,但是确定不来属于哪个分桶。需要额外的堆内存创建**索引**,以维护主键与分桶编号的映射关系。 |
| 122 | + |
| 123 | +主键完全包含分区键的动态分桶表,Paimon可以确定该主键属于哪个分区,无法确定属于哪个分桶,因此需要使用额外的堆内存创建索引(index),维护主键与分桶编号的映射关系。 |
| 124 | + |
| 125 | +具体来说,每1亿条主键将额外消耗1 GB的堆内存。只有当前正在写入的分区才会消耗堆内存,历史分区中的主键不会消耗堆内存。 |
| 126 | + |
| 127 | +``` |
| 128 | +CREATE TABLE if not exists paimon.test.bucket2 ( |
| 129 | + id bigint, |
| 130 | + name String, |
| 131 | + age Int, |
| 132 | + dt string, |
| 133 | + PRIMARY KEY (id,dt) NOT ENFORCED |
| 134 | +) PARTITIONED BY (dt) with ( |
| 135 | + 'bucket' = '-1', |
| 136 | + 'file.format'='avro', |
| 137 | + 'sink.parallelism' = '2' |
| 138 | +); |
| 139 | +
|
| 140 | +insert into paimon.test.bucket2 values |
| 141 | +(1,'zhangsan',18,'2023-01-01'), |
| 142 | +(1,'zhangsan',18,'2023-01-02'), |
| 143 | +(1,'zhangsan',18,'2023-01-03'); |
| 144 | +
|
| 145 | +// 查询返回3条数据 |
| 146 | +``` |
| 147 | + |
| 148 | +#### 主键不包括分区键 |
| 149 | + |
| 150 | +``` |
| 151 | +CREATE TABLE if not exists paimon.test.bucket2 ( |
| 152 | + id bigint, |
| 153 | + name String, |
| 154 | + age Int, |
| 155 | + dt string, |
| 156 | + PRIMARY KEY (id) NOT ENFORCED |
| 157 | +) PARTITIONED BY (dt) with ( |
| 158 | + 'bucket' = '-1', |
| 159 | + 'merge-engine' = 'deduplicate', |
| 160 | + 'file.format'='avro', |
| 161 | + 'sink.parallelism' = '2' |
| 162 | +); |
| 163 | +``` |
| 164 | + |
| 165 | +1. 如果主键不包含分区键,Paimon无法根据主键确定该数据属于哪个分区的哪个分桶,使用RocksDB维护主键与分区以及分桶编号的映射关系 |
| 166 | +2. 对性能会造成明显影响,1.维护映射关系。2.每次作业启动的时候,需要将映射关系全量加到RocksDB中,作业启动也会相对变慢。 |
| 167 | + |
| 168 | +``` |
| 169 | +CREATE TABLE if not exists paimon.test.bucket2 ( |
| 170 | + id bigint, |
| 171 | + name String, |
| 172 | + age Int, |
| 173 | + dt string, |
| 174 | + PRIMARY KEY (id) NOT ENFORCED |
| 175 | +) PARTITIONED BY (dt) with ( |
| 176 | + 'bucket' = '-1', |
| 177 | + 'file.format'='avro', |
| 178 | + 'sink.parallelism' = '2' |
| 179 | +); |
| 180 | +
|
| 181 | +insert into paimon.test.bucket2 values |
| 182 | +(1,'zhangsan',18,'2023-01-01'), |
| 183 | +(1,'zhangsan',18,'2023-01-02'), |
| 184 | +(1,'zhangsan',18,'2023-01-03'); |
| 185 | +
|
| 186 | +// 查询返回一条数据,因为主键id一样 |
| 187 | +
|
| 188 | +CREATE TABLE my_table (id bigint,product_id BIGINT,price DOUBLE,sales BIGINT) |
| 189 | +PARTITIONED BY (id) WITH ('bucket' = '3','bucket-key' = 'product_id'); |
| 190 | +``` |
| 191 | + |
| 192 | +#### 主键表 fixed bucket,dynamic bucket |
| 193 | + |
| 194 | +固定桶:不支持跨分区更新(因为分区键必须包含在主键中,直接通过hash确定数据在哪个桶) |
| 195 | + |
| 196 | +动态桶(主键不包括分区字段):支持跨分区更新 |
| 197 | +动态桶(主键包括分区字段): 可以确定分区,不支持跨分区更新 |
| 198 | + |
| 199 | +#### 桶的更新 |
| 200 | + |
| 201 | +1. 固定桶支持动态调整桶的大小。 |
| 202 | +2. 当分桶的数据量超过限制时,再自动创建新的分桶。创建新桶的条件 |
| 203 | +- dynamic-bucket.target-row-num:每个分桶最多存储几条数据。默认值为2000000。 |
| 204 | +- dynamic-bucket.initial-buckets:初始的分桶数。如果不设置,初始将会创建等同于writer算子并发数的分桶。 |
| 205 | +- dynamic-bucket.max-buckets: 最大分桶数。 |
| 206 | + |
| 207 | +## Append Only表(非主键表) |
| 208 | + |
| 209 | +如果在创建Paimon表时没有指定主键(Primary Key),则该表就是Paimon Append Only表。只能以流式方式将完整记录插入到表中,适合不需要流式更新的场景(例如日志数据同步)。 |
| 210 | + |
| 211 | +两种模式:Scalable 表 与 Queue 表。 |
| 212 | + |
| 213 | +``` |
| 214 | +0.9.0新特性 |
| 215 | +Append 表的删改支持:此版本引入了 Append 的 DELETE & UPDATE & MERGEINTO 支持,你可以通过 Spark SQL 来删改 Append 表,并且它还支持 Deletion Vectors 模式 |
| 216 | +``` |
| 217 | + |
| 218 | +### Scalable表 |
| 219 | + |
| 220 | +定义 bucket 为 -1,且没有主键时,就是一张增强的 Hive 表,没有桶的概念 (数据会放到 bucket-0 目录中,桶是被忽略的,所有的读写并没有并发限制),支持批写批读,支持流写流读,只是它的流读会有一部分乱序 (并不是完全的输入顺序)。 |
| 221 | + |
| 222 | +注意:适合对数据的流式消费顺序没有需求场景。 |
| 223 | + |
| 224 | +``` |
| 225 | +CREATE TABLE if not exists paimon.test.bucket2( |
| 226 | + id bigint, |
| 227 | + name String, |
| 228 | + age Int, |
| 229 | + dt string |
| 230 | +) PARTITIONED BY (dt) with ( |
| 231 | + 'bucket' = '-1' |
| 232 | +); |
| 233 | +
|
| 234 | +insert into paimon.test.bucket2 values |
| 235 | +(1,'zhangsan',18,'2023-01-01'), |
| 236 | +(1,'zhangsan',18,'2023-01-02'), |
| 237 | +(1,'zhangsan',18,'2023-01-03'); |
| 238 | +
|
| 239 | +``` |
| 240 | + |
| 241 | +### Queue表 |
| 242 | + |
| 243 | +作为消息队列具有分钟级延迟的替代。Paimon表的分桶数此时相当于Kafka的Partition数。 |
| 244 | +数据在每个bucket里面默认有序 |
| 245 | +``` |
| 246 | +CREATE TABLE if not exists paimon.test.bucket2 ( |
| 247 | + id bigint, |
| 248 | + name String, |
| 249 | + age Int, |
| 250 | + dt string |
| 251 | +) with ( |
| 252 | + 'bucket' = '5', |
| 253 | + 'bucket-key' = 'id' |
| 254 | +); |
| 255 | +
|
| 256 | +insert into paimon.test.bucket2 values |
| 257 | +(1,'zhangsan',18,'2023-01-01'), |
| 258 | +(2,'zhangsan',18,'2023-01-01'), |
| 259 | +(3,'zhangsan',18,'2023-01-02'), |
| 260 | +(3,'zhangsan',18,'2023-01-02'), |
| 261 | +(4,'zhangsan',18,'2023-01-02'), |
| 262 | +(5,'zhangsan',18,'2023-01-02'), |
| 263 | +(6,'zhangsan',18,'2023-01-02'), |
| 264 | +(7,'zhangsan',18,'2023-01-02'), |
| 265 | +(8,'zhangsan',18,'2023-01-03'); |
| 266 | +
|
| 267 | +// 会创建出5个桶 |
| 268 | +``` |
| 269 | + |
| 270 | + |
| 271 | +### Scalable表 vs Queue表 |
| 272 | + |
| 273 | +#### 数据分发 |
| 274 | + |
| 275 | +Scalable :没有桶的概念,无需考虑数据顺序、无需对数据进行hash partitioning,多个并发可以同时写同一个分区,Scalable 表写入速度更快。 |
| 276 | + |
| 277 | +Queue :默认情况下,Paimon将根据每条数据所有列的取值,确定该数据属于哪个分桶(bucket)。也可以在创建Paimon表时,在WITH参数中指定bucket-key参数,不同列的名称用英文逗号分隔。例如,设置'bucket-key' = 'c1,c2',则Paimon将根据每条数据c1和c2两列的值,确定该数据属于哪个分桶。 |
| 278 | + |
| 279 | +#### 数据消费顺序 |
| 280 | + |
| 281 | +Scalable :不能保证数据的消费顺序和写入顺序,适合对数据的流式消费顺序没有需求场景。 |
| 282 | + |
| 283 | +Queue :表可以保证流式消费Paimon表时,每个分桶中数据的消费顺序与数据写入Paimon表的顺序一致。具体来说: |
| 284 | + |
| 285 | +- 如果表参数中设置了'scan.plan-sort-partition' = 'true',则分区内值更小的数据会首先产出。 |
| 286 | +- 如果表参数中未设置'scan.plan-sort-partition' = 'true',则分区内创建时间更早的数据会首先产出,先进先出。 |
| 287 | +- 对于两条来自相同分区的相同分桶的数据,先写入Paimon表的数据会首先产出。 |
| 288 | +- 对于两条来自相同分区但不同分桶的数据,由于不同分桶可能被不同的Flink作业并发处理,因此不保证两条数据的消费顺序。 |
| 289 | + |
0 commit comments