Skip to content

Commit 0626b16

Browse files
authored
[docs] Add document for Prefix Lookup (#504)
1 parent 73d1e0f commit 0626b16

File tree

3 files changed

+194
-13
lines changed

3 files changed

+194
-13
lines changed

website/docs/engine-flink/ddl.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ DROP DATABASE my_db;
5858

5959
### PrimaryKey Table
6060

61-
The following SQL statement will create a [PrimaryKey Table](table-design/table-types/pk-table.md) with a primary key consisting of shop_id and user_id.
61+
The following SQL statement will create a [PrimaryKey Table](table-design/table-types/pk-table/index.md) with a primary key consisting of shop_id and user_id.
6262
```sql title="Flink SQL"
6363
CREATE TABLE my_pk_table (
6464
shop_id BIGINT,

website/docs/engine-flink/lookups.md

Lines changed: 177 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@ sidebar_position: 5
77
Flink lookup joins are important because they enable efficient, real-time enrichment of streaming data with reference data, a common requirement in many real-time analytics and processing scenarios.
88

99

10-
## Instructions
11-
- Use a primary key table as a dimension table, and the join condition must include all primary keys of the dimension table.
10+
## Lookup
11+
12+
### Instructions
13+
- Use a primary key table as a dimension table, and the join condition must include all primary keys of the dimension table.
1214
- Fluss lookup join is in asynchronous mode by default for higher throughput. You can change the mode of lookup join as synchronous mode by setting the SQL Hint `'lookup.async' = 'false'`.
1315

14-
## Examples
16+
### Examples
1517
1. Create two tables.
1618
```sql title="Flink SQL"
1719
CREATE TABLE `fluss_catalog`.`my_db`.`orders` (
@@ -24,6 +26,7 @@ CREATE TABLE `fluss_catalog`.`my_db`.`orders` (
2426
`o_clerk` CHAR(15) NOT NULL,
2527
`o_shippriority` INT NOT NULL,
2628
`o_comment` STRING NOT NULL,
29+
`o_dt` STRING NOT NULL,
2730
PRIMARY KEY (o_orderkey) NOT ENFORCED
2831
);
2932
```
@@ -83,7 +86,45 @@ FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
8386
ON `o`.`o_custkey` = `c`.`c_custkey`;
8487
```
8588

86-
## Lookup Options
89+
### Examples (Partitioned Table)
90+
91+
Continuing from the previous example, if our dimension table is a Fluss partitioned primary key table, as follows:
92+
93+
```sql title="Flink SQL"
94+
CREATE TABLE `fluss_catalog`.`my_db`.`customer_partitioned` (
95+
`c_custkey` INT NOT NULL,
96+
`c_name` STRING NOT NULL,
97+
`c_address` STRING NOT NULL,
98+
`c_nationkey` INT NOT NULL,
99+
`c_phone` CHAR(15) NOT NULL,
100+
`c_acctbal` DECIMAL(15, 2) NOT NULL,
101+
`c_mktsegment` CHAR(10) NOT NULL,
102+
`c_comment` STRING NOT NULL,
103+
`dt` STRING NOT NULL,
104+
PRIMARY KEY (`c_custkey`, `dt`) NOT ENFORCED
105+
)
106+
PARTITIONED BY (`dt`)
107+
WITH (
108+
'table.auto-partition.enabled' = 'true',
109+
'table.auto-partition.time-unit' = 'year'
110+
);
111+
```
112+
113+
To do a lookup join with the Fluss partitioned primary key table, we need to specify the
114+
primary keys (including partition key) in the join condition.
115+
```sql title="Flink SQL"
116+
INSERT INTO lookup_join_sink
117+
SELECT `o`.`o_orderkey`, `o`.`o_totalprice`, `c`.`c_name`, `c`.`c_address`
118+
FROM
119+
(SELECT `orders`.*, proctime() AS ptime FROM `orders`) AS `o`
120+
LEFT JOIN `customer_partitioned`
121+
FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
122+
ON `o`.`o_custkey` = `c`.`c_custkey` AND `o`.`o_dt` = `c`.`dt`;
123+
```
124+
125+
For more details about Fluss partitioned table, see [Partitioned Tables](/docs/table-design/data-distribution/partitioning.md).
126+
127+
### Lookup Options
87128

88129

89130
| Option | Type | Required | Default | Description |
@@ -94,3 +135,135 @@ ON `o`.`o_custkey` = `c`.`c_custkey`;
94135
| lookup.partial-cache.expire-after-write | Duration | optional | (none) | Duration to expire an entry in the cache after writing. |
95136
| lookup.partial-cache.cache-missing-key | Boolean | optional | true | Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table. |
96137
| lookup.partial-cache.max-rows | Long | optional | true | The maximum number of rows to store in the cache. |
138+
139+
140+
## Prefix Lookup
141+
142+
### Instructions
143+
144+
- Use a primary key table as a dimension table, and the join condition must a prefix subset of the primary keys of the dimension table.
145+
- The bucket key of Fluss dimension table need to set as the join key when creating Fluss table.
146+
- Fluss prefix lookup join is in asynchronous mode by default for higher throughput. You can change the mode of prefix lookup join as synchronous mode by setting the SQL Hint `'lookup.async' = 'false'`.
147+
148+
149+
### Examples
150+
1. Create two tables.
151+
```sql title="Flink SQL"
152+
CREATE TABLE `fluss_catalog`.`my_db`.`orders` (
153+
`o_orderkey` INT NOT NULL,
154+
`o_custkey` INT NOT NULL,
155+
`o_orderstatus` CHAR(1) NOT NULL,
156+
`o_totalprice` DECIMAL(15, 2) NOT NULL,
157+
`o_orderdate` DATE NOT NULL,
158+
`o_orderpriority` CHAR(15) NOT NULL,
159+
`o_clerk` CHAR(15) NOT NULL,
160+
`o_shippriority` INT NOT NULL,
161+
`o_comment` STRING NOT NULL,
162+
`o_dt` STRING NOT NULL,
163+
PRIMARY KEY (o_orderkey) NOT ENFORCED
164+
);
165+
```
166+
167+
```sql title="Flink SQL"
168+
-- primary keys are (c_custkey, c_nationkey)
169+
-- bucket key is (c_custkey)
170+
CREATE TABLE `fluss_catalog`.`my_db`.`customer` (
171+
`c_custkey` INT NOT NULL,
172+
`c_name` STRING NOT NULL,
173+
`c_address` STRING NOT NULL,
174+
`c_nationkey` INT NOT NULL,
175+
`c_phone` CHAR(15) NOT NULL,
176+
`c_acctbal` DECIMAL(15, 2) NOT NULL,
177+
`c_mktsegment` CHAR(10) NOT NULL,
178+
`c_comment` STRING NOT NULL,
179+
PRIMARY KEY (`c_custkey`, `c_nationkey`) NOT ENFORCED
180+
) WITH (
181+
'bucket.key' = 'c_custkey'
182+
);
183+
```
184+
185+
2. Perform prefix lookup.
186+
```sql title="Flink SQL"
187+
USE CATALOG fluss_catalog;
188+
```
189+
190+
```sql title="Flink SQL"
191+
USE my_db;
192+
```
193+
194+
```sql title="Flink SQL"
195+
CREATE TEMPORARY TABLE lookup_join_sink
196+
(
197+
order_key INT NOT NULL,
198+
order_totalprice DECIMAL(15, 2) NOT NULL,
199+
customer_name STRING NOT NULL,
200+
customer_address STRING NOT NULL
201+
) WITH ('connector' = 'blackhole');
202+
```
203+
204+
```sql title="Flink SQL"
205+
-- prefix look up join in asynchronous mode.
206+
INSERT INTO lookup_join_sink
207+
SELECT `o`.`o_orderkey`, `o`.`o_totalprice`, `c`.`c_name`, `c`.`c_address`
208+
FROM
209+
(SELECT `orders`.*, proctime() AS ptime FROM `orders`) AS `o`
210+
LEFT JOIN `customer`
211+
FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
212+
ON `o`.`o_custkey` = `c`.`c_custkey`;
213+
214+
-- join key is a prefix set of dimension table primary keys.
215+
```
216+
217+
```sql title="Flink SQL"
218+
-- prefix look up join in synchronous mode.
219+
INSERT INTO lookup_join_sink
220+
SELECT `o`.`o_orderkey`, `o`.`o_totalprice`, `c`.`c_name`, `c`.`c_address`
221+
FROM
222+
(SELECT `orders`.*, proctime() AS ptime FROM `orders`) AS `o`
223+
LEFT JOIN `customer` /*+ OPTIONS('lookup.async' = 'false') */
224+
FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
225+
ON `o`.`o_custkey` = `c`.`c_custkey`;
226+
```
227+
228+
### Examples (Partitioned Table)
229+
230+
Continuing from the previous prefix lookup example, if our dimension table is a Fluss partitioned primary key table, as follows:
231+
232+
```sql title="Flink SQL"
233+
-- primary keys are (c_custkey, c_nationkey, dt)
234+
-- bucket key is (c_custkey)
235+
CREATE TABLE `fluss_catalog`.`my_db`.`customer_partitioned` (
236+
`c_custkey` INT NOT NULL,
237+
`c_name` STRING NOT NULL,
238+
`c_address` STRING NOT NULL,
239+
`c_nationkey` INT NOT NULL,
240+
`c_phone` CHAR(15) NOT NULL,
241+
`c_acctbal` DECIMAL(15, 2) NOT NULL,
242+
`c_mktsegment` CHAR(10) NOT NULL,
243+
`c_comment` STRING NOT NULL,
244+
`dt` STRING NOT NULL,
245+
PRIMARY KEY (`c_custkey`, `c_nationkey`, `dt`) NOT ENFORCED
246+
)
247+
PARTITIONED BY (`dt`)
248+
WITH (
249+
'bucket.key' = 'c_custkey',
250+
'table.auto-partition.enabled' = 'true',
251+
'table.auto-partition.time-unit' = 'year'
252+
);
253+
```
254+
255+
To do a prefix lookup with the Fluss partitioned primary key table, the prefix lookup join key is in pattern of
256+
`a prefix subset of primary keys (excluding partition key)` + `partition key`.
257+
```sql title="Flink SQL"
258+
INSERT INTO lookup_join_sink
259+
SELECT `o`.`o_orderkey`, `o`.`o_totalprice`, `c`.`c_name`, `c`.`c_address`
260+
FROM
261+
(SELECT `orders`.*, proctime() AS ptime FROM `orders`) AS `o`
262+
LEFT JOIN `customer_partitioned`
263+
FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
264+
ON `o`.`o_custkey` = `c`.`c_custkey` AND `o`.`o_dt` = `c`.`dt`;
265+
266+
-- join key is a prefix set of dimension table primary keys (excluding partition key) + partition key.
267+
```
268+
269+
For more details about Fluss partitioned table, see [Partitioned Tables](/docs/table-design/data-distribution/partitioning.md).

website/docs/table-design/table-types/pk-table/index.md

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,6 @@ The following merge engines are supported:
8585
1. [FirstRow Merge Engine](/docs/table-design/table-types/pk-table/merge-engines/first-row)
8686
2. [Versioned Merge Engine](/docs/table-design/table-types/pk-table/merge-engines/versioned)
8787

88-
## Data Queries
89-
90-
For primary key tables, Fluss supports querying data directly based on the key. Please refer to
91-
the [Flink Reads](../../../engine-flink/reads.md) for detailed instructions.
9288

9389
## Changelog Generation
9490

@@ -117,10 +113,22 @@ be generated.
117113
-D(1, 4.0, 'banana')
118114
```
119115

120-
## Data Consumption
116+
## Data Queries
117+
118+
For primary key tables, Fluss supports various kinds of querying abilities.
119+
120+
### Reads
121121

122-
For a primary key table, the default consumption method is a full snapshot followed by incremental data. First, the
122+
For a primary key table, the default read method is a full snapshot followed by incremental data. First, the
123123
snapshot data of the table is consumed, followed by the binlog data of the table.
124124

125-
It is also possible to only consume the binlog data of the table. For more details, please refer to
126-
the [Flink Reads](../../../engine-flink/reads.md)
125+
It is also possible to only consume the binlog data of the table. For more details, please refer to the [Flink Reads](/docs/engine-flink/reads.md)
126+
127+
### Lookup
128+
129+
Fluss primary key table can lookup data by the primary keys. If the key exists in Fluss, lookup will return a unique row. it always used in [Flink Lookup Join](/docs/engine-flink//lookups.md#lookup).
130+
131+
### Prefix Lookup
132+
133+
Fluss primary key table can also do prefix lookup by the prefix subset primary keys. Unlike lookup, prefix lookup
134+
will scan data based on the prefix of primary keys and may return multiple rows. It always used in [Flink Prefix Lookup Join](/docs/engine-flink/lookups.md#prefix-lookup).

0 commit comments

Comments
 (0)