From be437034c947fe7ea054c51936f5b0a65b59f2f9 Mon Sep 17 00:00:00 2001 From: "zhangshixin.1024" Date: Fri, 10 Oct 2025 10:26:52 +0800 Subject: [PATCH] [doc] Multi-Stream update for unique table --- .../multi-stream-update-for-unique-model.md | 255 ++++++++++++++++++ .../multi-stream-update-for-unique-model.md | 250 +++++++++++++++++ sidebars.json | 3 +- 3 files changed, 507 insertions(+), 1 deletion(-) create mode 100644 docs/data-operate/update/multi-stream-update-for-unique-model.md create mode 100644 i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/update/multi-stream-update-for-unique-model.md diff --git a/docs/data-operate/update/multi-stream-update-for-unique-model.md b/docs/data-operate/update/multi-stream-update-for-unique-model.md new file mode 100644 index 0000000000000..f9528a9ba9948 --- /dev/null +++ b/docs/data-operate/update/multi-stream-update-for-unique-model.md @@ -0,0 +1,255 @@ +--- +{ + "title": "Multi-stream Update for Unique Model", + "language": "en" +} +--- + +## Overview +To ensure concurrent conflict resolution for replace operations, Doris's unique table provides the functionality of updating based on the sequence column. That is, under the same key column, columns of the REPLACE aggregation type will be replaced according to the value of the sequence column. A larger value can replace a smaller one, but not vice versa. +However, in some business scenarios, businesses need to update different columns in the same wide table through two or more data streams. For example, one data stream writes in real-time to update some fields of the table; another data stream performs imports on demand to update other columns of the table. During the update, both stream jobs need to ensure the order of replace operations; moreover, during queries, data from all columns should be accessible for querying. + +## Sequence Mapping +To address the above issue, Doris supports the sequence mapping feature. This feature resolves the problem of concurrent updates from multiple streams by specifying the mapping relationship between the columns to be updated and their corresponding sequence columns. + +| A | B | C | D | E | s1 | s2 | +|---|---|---|---|---|----|----| + +Assuming the above table is all columns of a unique table, where AB are the key columns and CDE are the value columns. +"ABCD" represents data generated by one data stream, and "ABE" represents data generated by another data stream. Both streams need to write to the same table. +However, the timing of data generation and updates for ABCD and ABE is not synchronized (with intervals potentially being very long), making it impractical (or requiring significant cost) to concatenate all column data before writing. + +We additionally introduce two columns, s1 and s2, as sequence columns to control the updates of data from the two streams. +s1 is used for version control of data in columns C and D; s2 is used for version control of data in column E. When importing or performing other update operations, the data from the two streams do not interfere with each other. Each stream completes its update operations based on its own sequence columns. + + +### Usage Example + +**1. Creating a table supporting sequence mapping** + +Create a table that supports sequence mapping, and specify that the updates of columns c and d depend on the sequence column s1, while the update of column e depends on the sequence column s2. +Sequence columns can be of integer type and time type (DATE, DATETIME), and the type of the column cannot be changed after creation. + +```sql +CREATE TABLE `upsert_test` ( + `a` bigint(20) NULL COMMENT "", + `b` int(11) NULL COMMENT "", + `c` int(11) NULL COMMENT "", + `d` int(11) NULL COMMENT "", + `e` int(11) NULL COMMENT "", + `s1` int(11) NULL COMMENT "", + `s2` int(11) NULL COMMENT "" +) ENGINE=OLAP +UNIQUE KEY(`a`, `b`) +COMMENT "OLAP" +DISTRIBUTED BY HASH(`a`, `b`) BUCKETS 1 +PROPERTIES ( +"enable_unique_key_merge_on_write"="false", +"light_schema_change"="false", +"replication_num" = "1", +"sequence_mapping.s1" = "c,d", +"sequence_mapping.s2" = "e" +); +``` + +The table structure is as follows: + +```sql +MySQL > desc upsert_test; ++-------+--------+------+-------+---------+---------+ +| Field | Type | Null | Key | Default | Extra | ++-------+--------+------+-------+---------+---------+ +| a | bigint | Yes | true | NULL | | +| b | int | Yes | true | NULL | | +| c | int | Yes | false | NULL | REPLACE | +| d | int | Yes | false | NULL | REPLACE | +| e | int | Yes | false | NULL | REPLACE | +| s1 | int | Yes | false | NULL | REPLACE | +| s2 | int | Yes | false | NULL | REPLACE | ++-------+--------+------+-------+---------+---------+ +``` + +**2. Insert & Query Data** + +```sql +MySQL > insert into upsert_test(a, b, c, d, s1) values (1,1,2,2,2); +Query OK, 1 row affected (0.080 sec) + +MySQL > select * from upsert_test; ++------+------+------+------+------+------+------+ +| a | b | c | d | e | s1 | s2 | ++------+------+------+------+------+------+------+ +| 1 | 1 | 2 | 2 | NULL | 2 | NULL | ++------+------+------+------+------+------+------+ +1 row in set (0.049 sec) + +MySQL > insert into upsert_test(a, b, c, d, s1) values (1,1,1,1,1); +Query OK, 1 row affected (0.048 sec) + +MySQL > select * from upsert_test; ++------+------+------+------+------+------+------+ +| a | b | c | d | e | s1 | s2 | ++------+------+------+------+------+------+------+ +| 1 | 1 | 2 | 2 | NULL | 2 | NULL | ++------+------+------+------+------+------+------+ +1 row in set (0.021 sec) + +MySQL > insert into upsert_test(a, b, e, s2) values (1,1,2,2); +Query OK, 1 row affected (0.043 sec) + +MySQL > select * from upsert_test; ++------+------+------+------+------+------+------+ +| a | b | c | d | e | s1 | s2 | ++------+------+------+------+------+------+------+ +| 1 | 1 | 2 | 2 | 2 | 2 | 2 | ++------+------+------+------+------+------+------+ +1 row in set (0.019 sec) + +MySQL > insert into upsert_test(a, b, c, d, s1) values (1,1,3,3,3); +Query OK, 1 row affected (0.049 sec) + +MySQL > select * from upsert_test; ++------+------+------+------+------+------+------+ +| a | b | c | d | e | s1 | s2 | ++------+------+------+------+------+------+------+ +| 1 | 1 | 3 | 3 | 2 | 3 | 2 | ++------+------+------+------+------+------+------+ +1 row in set (0.019 sec) + +MySQL > insert into upsert_test(a, b, c, d, s1,e,s2) values(1,1,5,5,4,5,4); +Query OK, 1 row affected (0.050 sec) + +MySQL > select * from upsert_test; ++------+------+------+------+------+------+------+ +| a | b | c | d | e | s1 | s2 | ++------+------+------+------+------+------+------+ +| 1 | 1 | 5 | 5 | 5 | 4 | 4 | ++------+------+------+------+------+------+------+ +1 row in set (0.019 sec) +``` + +During the first insertion, since e and s2 are not written, the values read for e and s2 will be null. + +During the second insertion, as the value of s1 is smaller than that written in the first insertion, the values of c, d, and s1 will remain unchanged. + +During the third insertion, when values for e and s2 are written, all columns will have correct values. + +During the fourth insertion, because the value of s1 is greater than the previously written value, c, d, and s1 will all be updated. + +During the fifth insertion, since both s1 and s2 values are greater than the previously written values, c, d, s1, e, and s2 will all be updated. + +**3. Add/Drop Column** +```sql +CREATE TABLE `upsert_test` ( + `a` bigint(20) NULL COMMENT "", + `b` int(11) NULL COMMENT "", + `c` int(11) NULL COMMENT "", + `d` int(11) NULL COMMENT "", + `s1` int(11) NULL COMMENT "", + ) ENGINE=OLAP + UNIQUE KEY(`a`, `b`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`a`, `b`) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "false", + "light_schema_change"="false", + "replication_num" = "1", + "sequence_mapping.s1" = "c,d" + ); +``` + +```sql +MySQL > insert into upsert_test(a, b, c, d, s1) values (1,1,1,1,1),(1,1,3,3,3),(1,1,2,2,2); +Query OK, 3 rows affected (0.101 sec) + +MySQL > select * from upsert_test; ++------+------+------+------+------+ +| a | b | c | d | s1 | ++------+------+------+------+------+ +| 1 | 1 | 3 | 3 | 3 | ++------+------+------+------+------+ +1 row in set (0.057 sec) + +MySQL > alter table upsert_test add column (e int(11) NULL, s2 bigint) PROPERTIES('sequence_mapping.s2' = 'e'); +Query OK, 0 rows affected (0.011 sec) + +MySQL > desc upsert_test; ++-------+--------+------+-------+---------+---------+ +| Field | Type | Null | Key | Default | Extra | ++-------+--------+------+-------+---------+---------+ +| a | bigint | Yes | true | NULL | | +| b | int | Yes | true | NULL | | +| c | int | Yes | false | NULL | REPLACE | +| d | int | Yes | false | NULL | REPLACE | +| s1 | int | Yes | false | NULL | REPLACE | +| e | int | Yes | false | NULL | REPLACE | +| s2 | bigint | Yes | false | NULL | REPLACE | ++-------+--------+------+-------+---------+---------+ +7 rows in set (0.003 sec) + +MySQL > select * from upsert_test; ++------+------+------+------+------+------+------+ +| a | b | c | d | s1 | e | s2 | ++------+------+------+------+------+------+------+ +| 1 | 1 | 3 | 3 | 3 | NULL | NULL | ++------+------+------+------+------+------+------+ +1 row in set (0.032 sec) + +MySQL > insert into upsert_test(a, b, e, s2) values (1,1,2,2); +Query OK, 1 row affected (0.052 sec) + +MySQL > select * from upsert_test; ++------+------+------+------+------+------+------+ +| a | b | c | d | s1 | e | s2 | ++------+------+------+------+------+------+------+ +| 1 | 1 | 3 | 3 | 3 | 2 | 2 | ++------+------+------+------+------+------+------+ +1 row in set (0.020 sec) + +MySQL > insert into upsert_test(a, b, c, d, s1,e,s2) values(1,1,5,5,4,5,4); +Query OK, 1 row affected (0.050 sec) + +MySQL > select * from upsert_test; ++------+------+------+------+------+------+------+ +| a | b | c | d | s1 | e | s2 | ++------+------+------+------+------+------+------+ +| 1 | 1 | 5 | 5 | 4 | 5 | 4 | ++------+------+------+------+------+------+------+ +1 row in set (0.022 sec) + +MySQL > alter table upsert_test drop column e; +Query OK, 0 rows affected (0.006 sec) + +MySQL > select * from upsert_test; ++------+------+------+------+------+------+ +| a | b | c | d | s1 | s2 | ++------+------+------+------+------+------+ +| 1 | 1 | 5 | 5 | 4 | 4 | ++------+------+------+------+------+------+ +1 row in set (0.026 sec) + +MySQL > alter table upsert_test drop column s2; +Query OK, 0 rows affected (0.005 sec) + +MySQL > select * from upsert_test; ++------+------+------+------+------+ +| a | b | c | d | s1 | ++------+------+------+------+------+ +| 1 | 1 | 5 | 5 | 4 | ++------+------+------+------+------+ +1 row in set (0.014 sec) +``` + +### Note + +1. Light schema change is not supported temporarily, so column renaming is not possible. + +2. Sequence columns can be of integer type and time type (DATE, DATETIME), and the type of the column cannot be changed after creation. + +3. There must be no overlap between all mapped columns. For example, column d in the sample cannot be mapped to both s1 and s2. + +4. Sequence columns and mapped columns cannot be key columns. + +5. The mapping relationship cannot be changed. For example, column d in the sample, which has already been mapped to column s1, cannot be modified to map to column s2. + +6. Any added column must be mapped to a certain sequence column. \ No newline at end of file diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/update/multi-stream-update-for-unique-model.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/update/multi-stream-update-for-unique-model.md new file mode 100644 index 0000000000000..b0f0ca6fafb49 --- /dev/null +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/update/multi-stream-update-for-unique-model.md @@ -0,0 +1,250 @@ +--- +{ + "title": "主键模型的多流更新", + "language": "zh-CN" +} +--- + +## 概览 +Doris的unique表为了保证了replace的并发冲突,提供了按照sequence列更新的功能。即在相同key列下,replace聚合类型的列将按照sequence列的值进行替换,较大值可以替换较小值,反之则不可以。但在有些业务场景中,业务需要通过两条或者多条数据流,对同一个大宽表中的不同列进行更新。 比如一条数据流会实时写入,更新这张表的部分字段;另一条数据流按需执行导入,更新这张表的其他列。在更新时,这两流作业都需要保证replace的先后顺序;而且在查询时需要对所有列的数据能够进行查询。 + +## sequence mapping +为了解决上述的问题,doris支持了sequence mapping的功能。该功能通过指定更新列对应的sequence column映射关系来解决多流的并发更新问题。 + +| A | B | C | D | E | s1 | s2 | +|---|---|---|---|---|----|----| + +假设上面表格是一个unique table 的所有列,AB是key,CDE是value。 +"ABCD" 是一个数据流产生的数据, "ABE" 是另一个数据流产生的数据, 两个流写要到同一个table上。 +但是ABCD和ABE产生、更新的数据的时机不同步(间隔时间甚至会很长),这就导致在写入数据之前完成所有列数据的拼接变的不太可能(或者需要花费很大的代价)。 + +我们额外加入s1和s2两列,作为sequence column来控制两流数据的更新。 +s1 对C、D 两列的数据进行版本控制;s2 对E列的数据进行版本控制。两流数据在导入或者其他更新操作时,互不干扰。每一流根据自己的sequence columns来完成更新操作。 + +### 使用示例 + +**1. 创建支持sequence mapping的表** + +创建支持sequence mapping的表,并指定c,d列的更新依赖s1列(sequence列),e列的更新依赖s2列(sequence列)。 +sequence列可以为整型和时间类型(DATE、DATETIME),创建后不能更改该列的类型。 +```sql +CREATE TABLE `upsert_test` ( + `a` bigint(20) NULL COMMENT "", + `b` int(11) NULL COMMENT "", + `c` int(11) NULL COMMENT "", + `d` int(11) NULL COMMENT "", + `e` int(11) NULL COMMENT "", + `s1` int(11) NULL COMMENT "", + `s2` int(11) NULL COMMENT "" +) ENGINE=OLAP +UNIQUE KEY(`a`, `b`) +COMMENT "OLAP" +DISTRIBUTED BY HASH(`a`, `b`) BUCKETS 1 +PROPERTIES ( +"enable_unique_key_merge_on_write"="false", +"light_schema_change"="false", +"replication_num" = "1", +"sequence_mapping.s1" = "c,d", +"sequence_mapping.s2" = "e" +); +``` + +创建好的表结构如下: +```sql +MySQL > desc upsert_test; ++-------+--------+------+-------+---------+---------+ +| Field | Type | Null | Key | Default | Extra | ++-------+--------+------+-------+---------+---------+ +| a | bigint | Yes | true | NULL | | +| b | int | Yes | true | NULL | | +| c | int | Yes | false | NULL | REPLACE | +| d | int | Yes | false | NULL | REPLACE | +| e | int | Yes | false | NULL | REPLACE | +| s1 | int | Yes | false | NULL | REPLACE | +| s2 | int | Yes | false | NULL | REPLACE | ++-------+--------+------+-------+---------+---------+ +``` + +**2. 插入&查询数据** + +```sql +MySQL > insert into upsert_test(a, b, c, d, s1) values (1,1,2,2,2); +Query OK, 1 row affected (0.080 sec) + +MySQL > select * from upsert_test; ++------+------+------+------+------+------+------+ +| a | b | c | d | e | s1 | s2 | ++------+------+------+------+------+------+------+ +| 1 | 1 | 2 | 2 | NULL | 2 | NULL | ++------+------+------+------+------+------+------+ +1 row in set (0.049 sec) + +MySQL > insert into upsert_test(a, b, c, d, s1) values (1,1,1,1,1); +Query OK, 1 row affected (0.048 sec) + +MySQL > select * from upsert_test; ++------+------+------+------+------+------+------+ +| a | b | c | d | e | s1 | s2 | ++------+------+------+------+------+------+------+ +| 1 | 1 | 2 | 2 | NULL | 2 | NULL | ++------+------+------+------+------+------+------+ +1 row in set (0.021 sec) + +MySQL > insert into upsert_test(a, b, e, s2) values (1,1,2,2); +Query OK, 1 row affected (0.043 sec) + +MySQL > select * from upsert_test; ++------+------+------+------+------+------+------+ +| a | b | c | d | e | s1 | s2 | ++------+------+------+------+------+------+------+ +| 1 | 1 | 2 | 2 | 2 | 2 | 2 | ++------+------+------+------+------+------+------+ +1 row in set (0.019 sec) + +MySQL > insert into upsert_test(a, b, c, d, s1) values (1,1,3,3,3); +Query OK, 1 row affected (0.049 sec) + +MySQL > select * from upsert_test; ++------+------+------+------+------+------+------+ +| a | b | c | d | e | s1 | s2 | ++------+------+------+------+------+------+------+ +| 1 | 1 | 3 | 3 | 2 | 3 | 2 | ++------+------+------+------+------+------+------+ +1 row in set (0.019 sec) + +MySQL > insert into upsert_test(a, b, c, d, s1,e,s2) values(1,1,5,5,4,5,4); +Query OK, 1 row affected (0.050 sec) + +MySQL > select * from upsert_test; ++------+------+------+------+------+------+------+ +| a | b | c | d | e | s1 | s2 | ++------+------+------+------+------+------+------+ +| 1 | 1 | 5 | 5 | 5 | 4 | 4 | ++------+------+------+------+------+------+------+ +1 row in set (0.019 sec) +``` +第一次插入时,由于e和s2没有写入,因此e和s2读出来的值为null。 + +第二次插入时,由于s1的值小于第一次写入的值,因此c,d,s1的值都不会变化。 + +第三次插入时,写入了e和s2的值,所有列都有正确的值。 + +第四次插入时,由于s1的值大于之前写入的值,c,d,s1都被更新。 + +第五次插入时,由于s1和s2都大于之前写入的值,c,d,s1,e,s2都被更新。 + +**3. 添加或删除列** +```sql +CREATE TABLE `upsert_test` ( + `a` bigint(20) NULL COMMENT "", + `b` int(11) NULL COMMENT "", + `c` int(11) NULL COMMENT "", + `d` int(11) NULL COMMENT "", + `s1` int(11) NULL COMMENT "", + ) ENGINE=OLAP + UNIQUE KEY(`a`, `b`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`a`, `b`) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "false", + "light_schema_change"="false", + "replication_num" = "1", + "sequence_mapping.s1" = "c,d" + ); +``` + +```sql +MySQL > insert into upsert_test(a, b, c, d, s1) values (1,1,1,1,1),(1,1,3,3,3),(1,1,2,2,2); +Query OK, 3 rows affected (0.101 sec) + +MySQL > select * from upsert_test; ++------+------+------+------+------+ +| a | b | c | d | s1 | ++------+------+------+------+------+ +| 1 | 1 | 3 | 3 | 3 | ++------+------+------+------+------+ +1 row in set (0.057 sec) + +MySQL > alter table upsert_test add column (e int(11) NULL, s2 bigint) PROPERTIES('sequence_mapping.s2' = 'e'); +Query OK, 0 rows affected (0.011 sec) + +MySQL > desc upsert_test; ++-------+--------+------+-------+---------+---------+ +| Field | Type | Null | Key | Default | Extra | ++-------+--------+------+-------+---------+---------+ +| a | bigint | Yes | true | NULL | | +| b | int | Yes | true | NULL | | +| c | int | Yes | false | NULL | REPLACE | +| d | int | Yes | false | NULL | REPLACE | +| s1 | int | Yes | false | NULL | REPLACE | +| e | int | Yes | false | NULL | REPLACE | +| s2 | bigint | Yes | false | NULL | REPLACE | ++-------+--------+------+-------+---------+---------+ +7 rows in set (0.003 sec) + +MySQL > select * from upsert_test; ++------+------+------+------+------+------+------+ +| a | b | c | d | s1 | e | s2 | ++------+------+------+------+------+------+------+ +| 1 | 1 | 3 | 3 | 3 | NULL | NULL | ++------+------+------+------+------+------+------+ +1 row in set (0.032 sec) + +MySQL > insert into upsert_test(a, b, e, s2) values (1,1,2,2); +Query OK, 1 row affected (0.052 sec) + +MySQL > select * from upsert_test; ++------+------+------+------+------+------+------+ +| a | b | c | d | s1 | e | s2 | ++------+------+------+------+------+------+------+ +| 1 | 1 | 3 | 3 | 3 | 2 | 2 | ++------+------+------+------+------+------+------+ +1 row in set (0.020 sec) + +MySQL > insert into upsert_test(a, b, c, d, s1,e,s2) values(1,1,5,5,4,5,4); +Query OK, 1 row affected (0.050 sec) + +MySQL > select * from upsert_test; ++------+------+------+------+------+------+------+ +| a | b | c | d | s1 | e | s2 | ++------+------+------+------+------+------+------+ +| 1 | 1 | 5 | 5 | 4 | 5 | 4 | ++------+------+------+------+------+------+------+ +1 row in set (0.022 sec) + +MySQL > alter table upsert_test drop column e; +Query OK, 0 rows affected (0.006 sec) + +MySQL > select * from upsert_test; ++------+------+------+------+------+------+ +| a | b | c | d | s1 | s2 | ++------+------+------+------+------+------+ +| 1 | 1 | 5 | 5 | 4 | 4 | ++------+------+------+------+------+------+ +1 row in set (0.026 sec) + +MySQL > alter table upsert_test drop column s2; +Query OK, 0 rows affected (0.005 sec) + +MySQL > select * from upsert_test; ++------+------+------+------+------+ +| a | b | c | d | s1 | ++------+------+------+------+------+ +| 1 | 1 | 5 | 5 | 4 | ++------+------+------+------+------+ +1 row in set (0.014 sec) +``` + +### 注意 + +1. 暂时不支持light schema change,因此无法rename列 + +2. sequence列可以为整型和时间类型(DATE、DATETIME),创建后不能更改该列的类型 + +3. 所有的mapping列之间不能重叠,比如样例中的d列不能既映射到s1又映射到s2 + +4. sequence列和mapping列不能为key列 + +5. 无法更改映射关系,比如样例中的d列已经映射到s1列,无法修改映射到s2列 + +6. 添加的列一定需要映射到某个sequence列 \ No newline at end of file diff --git a/sidebars.json b/sidebars.json index 62451e92f32b1..a6333d2eddd4f 100644 --- a/sidebars.json +++ b/sidebars.json @@ -237,7 +237,8 @@ "data-operate/update/unique-update", "data-operate/update/update-of-unique-model", "data-operate/update/update-of-aggregate-model", - "data-operate/update/unique-update-concurrent-control" + "data-operate/update/unique-update-concurrent-control", + "data-operate/update/multi-stream-update-for-unique-model" ] }, {