Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
255 changes: 255 additions & 0 deletions docs/data-operate/update/multi-stream-update-for-unique-model.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
---
{
"title": "Multi-stream Update for Unique Model",
"language": "en"
}
---

## Overview
To ensure concurrent conflict resolution for replace operations, Doris's unique table provides the functionality of updating based on the sequence column. That is, under the same key column, columns of the REPLACE aggregation type will be replaced according to the value of the sequence column. A larger value can replace a smaller one, but not vice versa.
However, in some business scenarios, businesses need to update different columns in the same wide table through two or more data streams. For example, one data stream writes in real-time to update some fields of the table; another data stream performs imports on demand to update other columns of the table. During the update, both stream jobs need to ensure the order of replace operations; moreover, during queries, data from all columns should be accessible for querying.

## Sequence Mapping
To address the above issue, Doris supports the sequence mapping feature. This feature resolves the problem of concurrent updates from multiple streams by specifying the mapping relationship between the columns to be updated and their corresponding sequence columns.

| A | B | C | D | E | s1 | s2 |
|---|---|---|---|---|----|----|

Assuming the above table is all columns of a unique table, where AB are the key columns and CDE are the value columns.
"ABCD" represents data generated by one data stream, and "ABE" represents data generated by another data stream. Both streams need to write to the same table.
However, the timing of data generation and updates for ABCD and ABE is not synchronized (with intervals potentially being very long), making it impractical (or requiring significant cost) to concatenate all column data before writing.

We additionally introduce two columns, s1 and s2, as sequence columns to control the updates of data from the two streams.
s1 is used for version control of data in columns C and D; s2 is used for version control of data in column E. When importing or performing other update operations, the data from the two streams do not interfere with each other. Each stream completes its update operations based on its own sequence columns.


### Usage Example

**1. Creating a table supporting sequence mapping**

Create a table that supports sequence mapping, and specify that the updates of columns c and d depend on the sequence column s1, while the update of column e depends on the sequence column s2.
Sequence columns can be of integer type and time type (DATE, DATETIME), and the type of the column cannot be changed after creation.

```sql
CREATE TABLE `upsert_test` (
`a` bigint(20) NULL COMMENT "",
`b` int(11) NULL COMMENT "",
`c` int(11) NULL COMMENT "",
`d` int(11) NULL COMMENT "",
`e` int(11) NULL COMMENT "",
`s1` int(11) NULL COMMENT "",
`s2` int(11) NULL COMMENT ""
) ENGINE=OLAP
UNIQUE KEY(`a`, `b`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`a`, `b`) BUCKETS 1
PROPERTIES (
"enable_unique_key_merge_on_write"="false",
"light_schema_change"="false",
"replication_num" = "1",
"sequence_mapping.s1" = "c,d",
"sequence_mapping.s2" = "e"
);
```

The table structure is as follows:

```sql
MySQL > desc upsert_test;
+-------+--------+------+-------+---------+---------+
| Field | Type | Null | Key | Default | Extra |
+-------+--------+------+-------+---------+---------+
| a | bigint | Yes | true | NULL | |
| b | int | Yes | true | NULL | |
| c | int | Yes | false | NULL | REPLACE |
| d | int | Yes | false | NULL | REPLACE |
| e | int | Yes | false | NULL | REPLACE |
| s1 | int | Yes | false | NULL | REPLACE |
| s2 | int | Yes | false | NULL | REPLACE |
+-------+--------+------+-------+---------+---------+
```

**2. Insert & Query Data**

```sql
MySQL > insert into upsert_test(a, b, c, d, s1) values (1,1,2,2,2);
Query OK, 1 row affected (0.080 sec)

MySQL > select * from upsert_test;
+------+------+------+------+------+------+------+
| a | b | c | d | e | s1 | s2 |
+------+------+------+------+------+------+------+
| 1 | 1 | 2 | 2 | NULL | 2 | NULL |
+------+------+------+------+------+------+------+
1 row in set (0.049 sec)

MySQL > insert into upsert_test(a, b, c, d, s1) values (1,1,1,1,1);
Query OK, 1 row affected (0.048 sec)

MySQL > select * from upsert_test;
+------+------+------+------+------+------+------+
| a | b | c | d | e | s1 | s2 |
+------+------+------+------+------+------+------+
| 1 | 1 | 2 | 2 | NULL | 2 | NULL |
+------+------+------+------+------+------+------+
1 row in set (0.021 sec)

MySQL > insert into upsert_test(a, b, e, s2) values (1,1,2,2);
Query OK, 1 row affected (0.043 sec)

MySQL > select * from upsert_test;
+------+------+------+------+------+------+------+
| a | b | c | d | e | s1 | s2 |
+------+------+------+------+------+------+------+
| 1 | 1 | 2 | 2 | 2 | 2 | 2 |
+------+------+------+------+------+------+------+
1 row in set (0.019 sec)

MySQL > insert into upsert_test(a, b, c, d, s1) values (1,1,3,3,3);
Query OK, 1 row affected (0.049 sec)

MySQL > select * from upsert_test;
+------+------+------+------+------+------+------+
| a | b | c | d | e | s1 | s2 |
+------+------+------+------+------+------+------+
| 1 | 1 | 3 | 3 | 2 | 3 | 2 |
+------+------+------+------+------+------+------+
1 row in set (0.019 sec)

MySQL > insert into upsert_test(a, b, c, d, s1,e,s2) values(1,1,5,5,4,5,4);
Query OK, 1 row affected (0.050 sec)

MySQL > select * from upsert_test;
+------+------+------+------+------+------+------+
| a | b | c | d | e | s1 | s2 |
+------+------+------+------+------+------+------+
| 1 | 1 | 5 | 5 | 5 | 4 | 4 |
+------+------+------+------+------+------+------+
1 row in set (0.019 sec)
```

During the first insertion, since e and s2 are not written, the values read for e and s2 will be null.

During the second insertion, as the value of s1 is smaller than that written in the first insertion, the values of c, d, and s1 will remain unchanged.

During the third insertion, when values for e and s2 are written, all columns will have correct values.

During the fourth insertion, because the value of s1 is greater than the previously written value, c, d, and s1 will all be updated.

During the fifth insertion, since both s1 and s2 values are greater than the previously written values, c, d, s1, e, and s2 will all be updated.

**3. Add/Drop Column**
```sql
CREATE TABLE `upsert_test` (
`a` bigint(20) NULL COMMENT "",
`b` int(11) NULL COMMENT "",
`c` int(11) NULL COMMENT "",
`d` int(11) NULL COMMENT "",
`s1` int(11) NULL COMMENT "",
) ENGINE=OLAP
UNIQUE KEY(`a`, `b`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`a`, `b`) BUCKETS 1
PROPERTIES (
"enable_unique_key_merge_on_write" = "false",
"light_schema_change"="false",
"replication_num" = "1",
"sequence_mapping.s1" = "c,d"
);
```

```sql
MySQL > insert into upsert_test(a, b, c, d, s1) values (1,1,1,1,1),(1,1,3,3,3),(1,1,2,2,2);
Query OK, 3 rows affected (0.101 sec)

MySQL > select * from upsert_test;
+------+------+------+------+------+
| a | b | c | d | s1 |
+------+------+------+------+------+
| 1 | 1 | 3 | 3 | 3 |
+------+------+------+------+------+
1 row in set (0.057 sec)

MySQL > alter table upsert_test add column (e int(11) NULL, s2 bigint) PROPERTIES('sequence_mapping.s2' = 'e');
Query OK, 0 rows affected (0.011 sec)

MySQL > desc upsert_test;
+-------+--------+------+-------+---------+---------+
| Field | Type | Null | Key | Default | Extra |
+-------+--------+------+-------+---------+---------+
| a | bigint | Yes | true | NULL | |
| b | int | Yes | true | NULL | |
| c | int | Yes | false | NULL | REPLACE |
| d | int | Yes | false | NULL | REPLACE |
| s1 | int | Yes | false | NULL | REPLACE |
| e | int | Yes | false | NULL | REPLACE |
| s2 | bigint | Yes | false | NULL | REPLACE |
+-------+--------+------+-------+---------+---------+
7 rows in set (0.003 sec)

MySQL > select * from upsert_test;
+------+------+------+------+------+------+------+
| a | b | c | d | s1 | e | s2 |
+------+------+------+------+------+------+------+
| 1 | 1 | 3 | 3 | 3 | NULL | NULL |
+------+------+------+------+------+------+------+
1 row in set (0.032 sec)

MySQL > insert into upsert_test(a, b, e, s2) values (1,1,2,2);
Query OK, 1 row affected (0.052 sec)

MySQL > select * from upsert_test;
+------+------+------+------+------+------+------+
| a | b | c | d | s1 | e | s2 |
+------+------+------+------+------+------+------+
| 1 | 1 | 3 | 3 | 3 | 2 | 2 |
+------+------+------+------+------+------+------+
1 row in set (0.020 sec)

MySQL > insert into upsert_test(a, b, c, d, s1,e,s2) values(1,1,5,5,4,5,4);
Query OK, 1 row affected (0.050 sec)

MySQL > select * from upsert_test;
+------+------+------+------+------+------+------+
| a | b | c | d | s1 | e | s2 |
+------+------+------+------+------+------+------+
| 1 | 1 | 5 | 5 | 4 | 5 | 4 |
+------+------+------+------+------+------+------+
1 row in set (0.022 sec)

MySQL > alter table upsert_test drop column e;
Query OK, 0 rows affected (0.006 sec)

MySQL > select * from upsert_test;
+------+------+------+------+------+------+
| a | b | c | d | s1 | s2 |
+------+------+------+------+------+------+
| 1 | 1 | 5 | 5 | 4 | 4 |
+------+------+------+------+------+------+
1 row in set (0.026 sec)

MySQL > alter table upsert_test drop column s2;
Query OK, 0 rows affected (0.005 sec)

MySQL > select * from upsert_test;
+------+------+------+------+------+
| a | b | c | d | s1 |
+------+------+------+------+------+
| 1 | 1 | 5 | 5 | 4 |
+------+------+------+------+------+
1 row in set (0.014 sec)
```

### Note

1. Light schema change is not supported temporarily, so column renaming is not possible.

2. Sequence columns can be of integer type and time type (DATE, DATETIME), and the type of the column cannot be changed after creation.

3. There must be no overlap between all mapped columns. For example, column d in the sample cannot be mapped to both s1 and s2.

4. Sequence columns and mapped columns cannot be key columns.

5. The mapping relationship cannot be changed. For example, column d in the sample, which has already been mapped to column s1, cannot be modified to map to column s2.

6. Any added column must be mapped to a certain sequence column.
Loading