Commit e7a2205

add small improvements
1 parent e436aad commit e7a2205

1 file changed

+45
-19
lines changed

website/docs/engine-flink/deltajoins.md

Lines changed: 45 additions & 19 deletions
@@ -1,19 +1,27 @@
11
---
2-
sidebar_label: DeltaJoins
2+
sidebar_label: Delta Joins
33
title: Flink Delta Joins
44
sidebar_position: 6
55
---
66

7-
# Delta Join
8-
Begin with Flink 2.1, a new delta join operator was introduced. Compared to traditional streaming join, delta join significantly reduces the required state, effectively alleviating issues associated with large state, such as resource bottlenecks, lengthy checkpoint execution times, and long recovery times during job restarts.
7+
# The Delta Join
8+
**Apache Flink 2.1** introduced a new operator called **Delta Join**.
9+
Compared to traditional streaming joins, the delta join operator significantly reduces the amount of state that needs to be maintained during execution. This improvement helps mitigate several common issues associated with large state sizes, including:
910

10-
Starting from Fluss version 0.8, streaming join jobs running on Flink 2.1 or higher will be automatically optimized to delta join in applicable scenarios.
11+
- Excessive memory and storage consumption
12+
- Long checkpointing durations
13+
- Extended recovery times after failures or restarts
1114

12-
## Examples
15+
Starting with **Apache Fluss 0.8**, streaming join jobs running on **Flink 2.1 or later** will be automatically optimized into **delta joins** whenever applicable. This optimization happens transparently at query planning time, requiring no manual configuration.
1316

14-
Here is an example of delta join currently supported in Flink 2.1.
17+
## How Delta Join Works
18+
Traditional streaming joins in Flink require maintaining both input sides entirely in state to match updates across streams. Delta join, by contrast, uses a **prefix-based lookup mechanism** that only retains *relevant subsets* of one table’s data in state. This drastically reduces memory pressure and improves performance for many streaming analytics and enrichment workloads.
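
To make the state cost of a traditional streaming join concrete, here is a minimal toy model in Python. It is only a sketch of the general symmetric hash join idea, not Flink's actual operator; the class name `SymmetricHashJoin` and its methods are hypothetical and exist only for this illustration.

```python
from collections import defaultdict


class SymmetricHashJoin:
    """Toy inner join that buffers every row from both inputs (hypothetical)."""

    def __init__(self):
        # Both sides are kept in state, keyed by the join key.
        self.left_state = defaultdict(list)
        self.right_state = defaultdict(list)

    def on_left(self, key, row):
        # Buffer the left row, then match it against all buffered right rows.
        self.left_state[key].append(row)
        return [(row, other) for other in self.right_state.get(key, [])]

    def on_right(self, key, row):
        # Buffer the right row, then match it against all buffered left rows.
        self.right_state[key].append(row)
        return [(other, row) for other in self.left_state.get(key, [])]

    def state_size(self):
        # Total number of rows held in state across both sides.
        buffered = list(self.left_state.values()) + list(self.right_state.values())
        return sum(len(rows) for rows in buffered)


join = SymmetricHashJoin()
results = []
results += join.on_left(1, {"order_id": 100})
results += join.on_right(1, {"city_name": "Berlin"})
results += join.on_left(1, {"order_id": 101})
```

In this toy model every incoming row is stored, so `state_size()` grows with the total input volume. That growth is the cost a lookup-based approach aims to avoid.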
1519

16-
1. Create two source tables and one sink tables
20+
## Example: Delta Join in Flink 2.1
21+
22+
Below is an example demonstrating a delta join query supported by Flink 2.1.
23+
24+
#### Create Source and Sink Tables
1725

1826
```sql title="Flink SQL"
1927
USE CATALOG fluss_catalog;
@@ -27,6 +35,7 @@ CREATE DATABASE my_db;
2735
USE my_db;
2836
```
2937

38+
#### Create Left Source Table
3039
```sql title="Flink SQL"
3140
CREATE TABLE `fluss_catalog`.`my_db`.`left_src` (
3241
`city_id` INT NOT NULL,
@@ -38,10 +47,10 @@ CREATE TABLE `fluss_catalog`.`my_db`.`left_src` (
3847
'bucket.key' = 'city_id',
3948
-- in Flink 2.1, delta join only supports append-only sources
4049
'table.merge-engine' = 'first_row'
41-
...)
42-
;
50+
);
4351
```
4452

53+
#### Create Right Source Table
4554
```sql title="Flink SQL"
4655
CREATE TABLE `fluss_catalog`.`my_db`.`right_src` (
4756
`city_id` INT NOT NULL,
@@ -50,35 +59,33 @@ CREATE TABLE `fluss_catalog`.`my_db`.`right_src` (
5059
) WITH (
5160
-- in Flink 2.1, delta join only supports append-only sources
5261
'table.merge-engine' = 'first_row'
53-
...)
54-
;
62+
);
5563
```
5664

65+
#### Create Sink Table
5766
```sql title="Flink SQL"
5867
CREATE TABLE `fluss_catalog`.`my_db`.`snk` (
5968
`city_id` INT NOT NULL,
6069
`order_id` INT NOT NULL,
6170
`content` VARCHAR NOT NULL,
6271
`city_name` VARCHAR NOT NULL,
6372
PRIMARY KEY (city_id, order_id) NOT ENFORCED
64-
) WITH (...)
65-
;
73+
) WITH (...);
6674
```
6775

68-
2. Explain DML about streaming join
69-
76+
#### Explain the Join Query
7077
```sql title="Flink SQL"
7178
EXPLAIN
7279
INSERT INTO `fluss_catalog`.`my_db`.`snk`
7380
SELECT T1.`city_id`, T1.`order_id`, T1.`content`, T2.`city_name`
7481
FROM `fluss_catalog`.`my_db`.`left_src` T1
7582
JOIN `fluss_catalog`.`my_db`.`right_src` T2
76-
ON T1.`city_id` = T2.`city_id`
77-
;
83+
ON T1.`city_id` = T2.`city_id`;
7884
```
7985

80-
If you see the plan that includes DeltaJoin as following, it indicates that the optimization has been effective, and the streaming join has been successfully optimized into a delta join.
86+
A `DeltaJoin` node in the physical plan indicates that the optimizer has rewritten the regular streaming join as a delta join.
8187

88+
### Example Optimized Execution Plan
8289
```title="Flink Plan"
8390
== Abstract Syntax Tree ==
8491
LogicalSink(table=[fluss_catalog.my_db.snk], fields=[city_id, order_id, content, city_name])
@@ -105,10 +112,29 @@ Sink(table=[fluss_catalog.my_db.snk], fields=[city_id, order_id, content, city_n
105112
+- Exchange(distribution=[hash[city_id]])
106113
+- TableSourceScan(table=[[fluss_catalog, my_db, right_src]], fields=[city_id, city_name])
107114
```
115+
This confirms that the delta join optimization is active.
116+
117+
## Understanding Prefix Keys
118+
A prefix key defines the portion of a table’s primary key that can be used for efficient key-based lookups or index pruning.
119+
120+
In Fluss, the option `'bucket.key' = 'city_id'` specifies that data is organized (or bucketed) by `city_id`. When performing a delta join, this allows Flink to quickly locate and read only the subset of records corresponding to the specific prefix key value, rather than scanning or caching the entire table state.
121+
122+
For example:
123+
- Full primary key: `(city_id, order_id)`
124+
- Prefix key: `city_id`
125+
126+
In this setup:
127+
- The delta join operator uses the prefix key (`city_id`) to retrieve only relevant right-side records matching each left-side event.
128+
- This eliminates the need to hold all records for every city in memory, significantly reducing state size.
129+
130+
Prefix keys thus form the foundation for state-efficient lookups in delta joins, enabling Flink to scale join workloads efficiently even under high throughput.
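
The prefix-key lookup described above can be sketched in Python. This is a conceptual illustration only, assuming a table stored externally and indexed by the first primary key column. It does not reflect Fluss or Flink internals, and the sample rows and the helper names `build_prefix_index` and `lookup_by_prefix` are hypothetical.

```python
from collections import defaultdict

# Hypothetical rows keyed by the full primary key (city_id, order_id).
rows = {
    (1, 100): "order A",
    (1, 101): "order B",
    (2, 200): "order C",
}


def build_prefix_index(table):
    """Group rows by the prefix key (city_id), mimicking bucketing by city_id."""
    index = defaultdict(list)
    for (city_id, order_id), content in table.items():
        index[city_id].append((order_id, content))
    return index


def lookup_by_prefix(index, city_id):
    """Return only the rows sharing the given prefix key, sorted by order_id."""
    return sorted(index.get(city_id, []))


prefix_index = build_prefix_index(rows)
matches = lookup_by_prefix(prefix_index, 1)
missing = lookup_by_prefix(prefix_index, 3)
```

Here `matches` contains only the two rows for `city_id = 1`, and `missing` is empty. The caller never scans or buffers rows for other cities.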
108131

109132
## Flink Version Support
110133

111-
The work on Delta Join is still ongoing, so the support for more sql patterns that can be optimized into delta join varies across different versions of Flink. More details can be found at [Delta Join](https://issues.apache.org/jira/browse/FLINK-37836).
134+
The delta join feature is still evolving, and its optimization capabilities vary across Flink versions.
135+
136+
Refer to the [Delta Join](https://issues.apache.org/jira/browse/FLINK-37836) JIRA issue for the most up-to-date information.
137+
112138

113139
### Flink 2.1
114140
