Commit 72c0888

committed
improve by Jark
1 parent 3445f9a commit 72c0888

2 files changed: +26 -7 lines changed

website/docs/assets/delta_join.jpg

83.4 KB
Lines changed: 26 additions & 7 deletions
@@ -5,17 +5,21 @@ sidebar_position: 6
 ---

 # The Delta Join
-Beginning with **Apache Flink 2.1**, a new operator called **Delta Join** was introduced.
+Beginning with **Apache Flink 2.1**, a new operator called [Delta Join](https://cwiki.apache.org/confluence/display/FLINK/FLIP-486%3A+Introduce+A+New+DeltaJoin) was introduced.
 Compared to traditional streaming joins, the delta join operator significantly reduces the amount of state that needs to be maintained during execution. This improvement helps mitigate several common issues associated with large state sizes, including:

 - Excessive computing resource and storage consumption
 - Long checkpointing durations
 - Extended recovery times after failures or restarts
+- Heavy state bootstrap overhead after changing job logic

 Starting with **Apache Fluss 0.8**, streaming join jobs running on **Flink 2.1 or later** will be automatically optimized into **delta joins** whenever applicable. This optimization happens transparently at query planning time, requiring no manual configuration.

 ## How Delta Join Works
-Traditional streaming joins in Flink require maintaining both input sides entirely in state to match records across streams. Delta join, by contrast, uses a **prefix-based lookup mechanism** to transform the behavior of querying data from the state into querying data from the Fluss source table, thereby avoiding redundant storage of the same data in both the Fluss source table and the state. This drastically reduces state size and improves performance for many streaming analytics and enrichment workloads.
+
+Traditional streaming joins in Flink require maintaining both input sides entirely in state to match records across streams. Delta join, by contrast, uses an **index-key lookup mechanism** to transform the behavior of querying data from the state into querying data from the Fluss source table, thereby avoiding redundant storage of the same data in both the Fluss source table and the state. This drastically reduces state size and improves performance for many streaming analytics and enrichment workloads.
+
+![](../assets/delta_join.jpg)

 ## Example: Delta Join in Flink 2.1

@@ -70,7 +74,7 @@ CREATE TABLE `fluss_catalog`.`my_db`.`snk` (
 `content` VARCHAR NOT NULL,
 `city_name` VARCHAR NOT NULL,
 PRIMARY KEY (city_id, order_id) NOT ENFORCED
-) WITH (...);
+);
 ```

 #### Explain the Join Query
@@ -112,14 +116,21 @@ Sink(table=[fluss_catalog.my_db.snk], fields=[city_id, order_id, content, city_n
 +- TableSourceScan(table=[[fluss_catalog, my_db, right_src]], fields=[city_id, city_name])
 ```

-## Understanding Prefix Keys
+## Understanding Index Keys
+
+Delta Join relies on performing lookups by the join key (e.g., the `city_id` in the above example) on the Fluss source tables. This requires that the Fluss source tables are defined with an appropriate index on the join key to enable efficient lookups.
+
+Currently, Fluss supports defining only a single index key per table, which is also referred to as the **prefix key**. General secondary indexes, which allow defining multiple indexes on arbitrary fields, are planned for an upcoming release.
+
 A prefix key defines the portion of a table’s primary key that can be used for efficient key-based lookups or index pruning.

 In Fluss, the option `'bucket.key' = 'city_id'` specifies that data is organized (or bucketed) by `city_id`. When performing a delta join, this allows Flink to quickly locate and read only the subset of records corresponding to the specific prefix key value, rather than scanning or caching the entire table state.

 For example:
 - Full primary key: `(city_id, order_id)`
-- Prefix key: `city_id`
+- Bucket key: `city_id`
+
+This yields an **index** on the prefix key `city_id`, so that you can perform [Prefix Key Lookup](/docs/engine-flink/lookups/#prefix-lookup) by `city_id`.

 In this setup:
 * The delta join operator uses the prefix key (`city_id`) to retrieve only relevant right-side records matching each left-side event.
@@ -129,9 +140,9 @@ Prefix keys thus form the foundation for efficient lookups in delta joins, enabl

 ## Flink Version Support

-The delta join feature is still evolving, and its optimization capabilities vary across Flink versions.
+The delta join feature was introduced in Flink 2.1 and is still evolving; its optimization capabilities vary across Flink versions.

-Refer to the [Delta Join](https://issues.apache.org/jira/browse/FLINK-37836) for the most up-to-date information.
+Refer to the [Delta Join Issue](https://issues.apache.org/jira/browse/FLINK-37836) for the most up-to-date information.


 ### Flink 2.1
@@ -148,3 +159,11 @@ Refer to the [Delta Join](https://issues.apache.org/jira/browse/FLINK-37836) for
 - All join inputs should be INSERT-ONLY streams.
 - This is why the option `'table.merge-engine' = 'first_row'` is added to the source table DDL.
 - All upstream nodes of the join should be `TableSourceScan` or `Exchange`.
+
+## Future Plan
+
+The Fluss and Flink communities are actively working together on enhancing delta join. Planned improvements include:
+- Support for additional join types, such as LEFT JOIN and FULL OUTER JOIN.
+- Support for multi-way joins involving more than two streams. This also requires Fluss to support defining multiple secondary index keys on a single table.
+- Support for more complex query patterns, including Calc and LookupJoin nodes.
+- Relaxation of restrictions on input stream types, allowing for changelog streams.
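
Note on the "Understanding Index Keys" change: the bucket-key setup it describes can be sketched in Flink SQL roughly as follows. This is a minimal illustration, not the DDL from the changed file. The `left_src` table name, the column types of `city_id` and `order_id`, and the whole `right_src` schema are assumptions inferred from the field lists visible in the diff. Only `'bucket.key' = 'city_id'`, `'table.merge-engine' = 'first_row'`, the `(city_id, order_id)` primary key, and the `snk` and `right_src` table names come from the text.

```sql
-- Left input (hypothetical name; INT column types are assumed).
-- The primary key is (city_id, order_id); bucketing by city_id makes
-- city_id the prefix (index) key that delta join can look up by.
CREATE TABLE `fluss_catalog`.`my_db`.`left_src` (
  `city_id` INT NOT NULL,
  `order_id` INT NOT NULL,
  `content` VARCHAR NOT NULL,
  PRIMARY KEY (city_id, order_id) NOT ENFORCED
) WITH (
  'bucket.key' = 'city_id',
  -- Flink 2.1 delta join requires INSERT-ONLY inputs
  'table.merge-engine' = 'first_row'
);

-- Right input (schema assumed from the plan's field list).
-- Assumption: with no explicit 'bucket.key', the table is bucketed
-- by its primary key, so lookups by city_id are still possible.
CREATE TABLE `fluss_catalog`.`my_db`.`right_src` (
  `city_id` INT NOT NULL,
  `city_name` VARCHAR NOT NULL,
  PRIMARY KEY (city_id) NOT ENFORCED
) WITH (
  'table.merge-engine' = 'first_row'
);

-- Join on the index key and inspect the plan.
EXPLAIN
INSERT INTO `fluss_catalog`.`my_db`.`snk`
SELECT T1.`city_id`, T1.`order_id`, T1.`content`, T2.`city_name`
FROM `fluss_catalog`.`my_db`.`left_src` T1
JOIN `fluss_catalog`.`my_db`.`right_src` T2
ON T1.`city_id` = T2.`city_id`;
```

If the planner applies the optimization, the explained plan should show a delta join operator in place of the regular streaming join. The join condition is on `city_id` because that is the key both tables can be looked up by.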
