Commit 3445f9a

a few minor adjustments
1 parent e7a2205 commit 3445f9a

1 file changed

+8
-10
lines changed

website/docs/engine-flink/deltajoins.md

Lines changed: 8 additions & 10 deletions
@@ -8,14 +8,14 @@ sidebar_position: 6
 Beginning with **Apache Flink 2.1**, a new operator called **Delta Join** was introduced.
 Compared to traditional streaming joins, the delta join operator significantly reduces the amount of state that needs to be maintained during execution. This improvement helps mitigate several common issues associated with large state sizes, including:
 
-- Excessive memory and storage consumption
+- Excessive computing resource and storage consumption
 - Long checkpointing durations
 - Extended recovery times after failures or restarts
 
 Starting with **Apache Fluss 0.8**, streaming join jobs running on **Flink 2.1 or later** will be automatically optimized into **delta joins** whenever applicable. This optimization happens transparently at query planning time, requiring no manual configuration.
 
 ## How Delta Join Works
-Traditional streaming joins in Flink require maintaining both input sides entirely in state to match updates across streams. Delta join, by contrast, uses a **prefix-based lookup mechanism** that only retains *relevant subsets* of one table’s data in state. This drastically reduces memory pressure and improves performance for many streaming analytics and enrichment workloads.
+Traditional streaming joins in Flink require maintaining both input sides entirely in state to match records across streams. Delta join, by contrast, uses a **prefix-based lookup mechanism** to transform the behavior of querying data from the state into querying data from the Fluss source table, thereby avoiding redundant storage of the same data in both the Fluss source table and the state. This drastically reduces state size and improves performance for many streaming analytics and enrichment workloads.
 
 ## Example: Delta Join in Flink 2.1
 
@@ -43,7 +43,7 @@ CREATE TABLE `fluss_catalog`.`my_db`.`left_src` (
 `content` VARCHAR NOT NULL,
 PRIMARY KEY (city_id, order_id) NOT ENFORCED
 ) WITH (
--- prefix lookup key
+-- prefix key
 'bucket.key' = 'city_id',
 -- in Flink 2.1, delta join only support append-only source
 'table.merge-engine' = 'first_row'
@@ -83,9 +83,8 @@ Join `fluss_catalog`.`my_db`.`right_src` T2
 ON T1.`city_id` = T2.`city_id`;
 ```
 
-If the physical plan includes `DeltaJoin`, it indicates that the optimizer has successfully transformed the traditional streaming join into a delta join.
+If the printed plan includes `DeltaJoin` as shown below, it indicates that the optimizer has successfully transformed the traditional streaming join into a delta join.
 
-### Example Optimized Execution Plan
 ```title="Flink Plan"
 == Abstract Syntax Tree ==
 LogicalSink(table=[fluss_catalog.my_db.snk], fields=[city_id, order_id, content, city_name])
@@ -112,7 +111,6 @@ Sink(table=[fluss_catalog.my_db.snk], fields=[city_id, order_id, content, city_n
 +- Exchange(distribution=[hash[city_id]])
 +- TableSourceScan(table=[[fluss_catalog, my_db, right_src]], fields=[city_id, city_name])
 ```
-This confirms that the delta join optimization is active.
 
 ## Understanding Prefix Keys
 A prefix key defines the portion of a table’s primary key that can be used for efficient key-based lookups or index pruning.
@@ -125,9 +123,9 @@ For example:
 
 In this setup:
 * The delta join operator uses the prefix key (`city_id`) to retrieve only relevant right-side records matching each left-side event.
-* This eliminates the need to hold all records for every city in memory, significantly reducing state size.
+* This eliminates the need to hold all records for every city in Flink state, significantly reducing state size.
 
-Prefix keys thus form the foundation for state-efficient lookups in delta joins, enabling Flink to scale join workloads efficiently even under high throughput.
+Prefix keys thus form the foundation for efficient lookups in delta joins, enabling Flink to scale join workloads efficiently even under high throughput.
 
 ## Flink Version Support
 
@@ -144,9 +142,9 @@ Refer to the [Delta Join](https://issues.apache.org/jira/browse/FLINK-37836) for
 
 #### Limitations
 
-- The primary key or the prefix lookup key of the tables must be included as part of the equivalence conditions in the join.
+- The primary key or the prefix key of the tables must be included as part of the equivalence conditions in the join.
 - The join must be a INNER join.
-- The downstream nodes of the join can accept duplicate changes, such as a sink that provides UPSERT mode.
+- The downstream nodes of the join can accept duplicate changes, such as a sink that provides UPSERT mode without `upsertMaterialize`.
 - All join inputs should be INSERT-ONLY streams.
   - This is why the option `'table.merge-engine' = 'first_row'` is added to the source table DDL.
 - All upstream nodes of the join should be `TableSourceScan` or `Exchange`.
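
Reviewer note: the limitations in the last hunk can be read against the example query as in the sketch below. Table and column names come from the plan shown in this diff. The `SELECT` list is inferred from the sink fields in that plan, and the tables are assumed to already exist as defined in the doc. The second statement is a hypothetical variant, not part of the doc, and whether the planner rewrites either query also depends on the other listed conditions.

```sql
-- Sketch only: assumes left_src, right_src and snk exist as defined in the doc.
-- Expected to be eligible: INNER join, equality condition on the prefix key
-- (city_id), and insert-only sources ('table.merge-engine' = 'first_row').
EXPLAIN
INSERT INTO `fluss_catalog`.`my_db`.`snk`
SELECT T1.`city_id`, T1.`order_id`, T1.`content`, T2.`city_name`
FROM `fluss_catalog`.`my_db`.`left_src` T1
JOIN `fluss_catalog`.`my_db`.`right_src` T2
ON T1.`city_id` = T2.`city_id`;

-- Hypothetical variant, not expected to be eligible: per the limitations,
-- the join must be an INNER join, so a LEFT JOIN should stay a regular join.
EXPLAIN
INSERT INTO `fluss_catalog`.`my_db`.`snk`
SELECT T1.`city_id`, T1.`order_id`, T1.`content`, T2.`city_name`
FROM `fluss_catalog`.`my_db`.`left_src` T1
LEFT JOIN `fluss_catalog`.`my_db`.`right_src` T2
ON T1.`city_id` = T2.`city_id`;
```

Comparing the two printed plans for the presence of `DeltaJoin` would be one way to check how the limitations apply in a given setup.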
