-
Notifications
You must be signed in to change notification settings - Fork 460
[doc] Add docs for delta join support with Flink 2.1 #1875
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,6 @@ | ||
| --- | ||
| title: "DataStream API" | ||
| sidebar_position: 6 | ||
| sidebar_position: 7 | ||
| --- | ||
|
|
||
| # DataStream API | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,186 @@ | ||
| --- | ||
| sidebar_label: Delta Joins | ||
| title: Flink Delta Joins | ||
| sidebar_position: 6 | ||
| --- | ||
|
|
||
| # The Delta Join | ||
| Beginning with **Apache Flink 2.1**, a new operator called [Delta Join](https://cwiki.apache.org/confluence/display/FLINK/FLIP-486%3A+Introduce+A+New+DeltaJoin) was introduced. | ||
| Compared to traditional streaming joins, the delta join operator significantly reduces the amount of state that needs to be maintained during execution. This improvement helps mitigate several common issues associated with large state sizes, including: | ||
|
|
||
| - Excessive computing resource and storage consumption | ||
| - Long checkpointing durations | ||
| - Extended recovery times after failures or restarts | ||
| - Heavy state bootstrap overhead after changing job logic | ||
|
|
||
| Starting with **Apache Fluss 0.8**, streaming join jobs running on **Flink 2.1 or later** will be automatically optimized into **delta joins** whenever applicable. This optimization happens transparently at query planning time, requiring no manual configuration. | ||
|
|
||
| ## How Delta Join Works | ||
|
|
||
| Traditional streaming joins in Flink require maintaining both input sides entirely in state to match records across streams. Delta join, by contrast, uses a **index-key lookup mechanism** to transform the behavior of querying data from the state into querying data from the Fluss source table, thereby avoiding redundant storage of the same data in both the Fluss source table and the state. This drastically reduces state size and improves performance for many streaming analytics and enrichment workloads. | ||
|
|
||
|  | ||
|
|
||
| ## Example: Delta Join in Flink 2.1 | ||
|
|
||
| Below is an example demonstrating a delta join query supported by Flink 2.1. | ||
|
|
||
| #### Create Source and Sink Tables | ||
|
|
||
| ```sql title="Flink SQL" | ||
| USE CATALOG fluss_catalog; | ||
| ``` | ||
|
|
||
| ```sql title="Flink SQL" | ||
| CREATE DATABASE my_db; | ||
| ``` | ||
|
|
||
| ```sql title="Flink SQL" | ||
| USE my_db; | ||
| ``` | ||
|
|
||
| #### Create Left Source Table | ||
| ```sql title="Flink SQL" | ||
| CREATE TABLE `fluss_catalog`.`my_db`.`left_src` ( | ||
| `city_id` INT NOT NULL, | ||
| `order_id` INT NOT NULL, | ||
| `content` VARCHAR NOT NULL, | ||
| PRIMARY KEY (city_id, order_id) NOT ENFORCED | ||
| ) WITH ( | ||
| -- prefix key | ||
| 'bucket.key' = 'city_id', | ||
| -- in Flink 2.1, delta join only support append-only source | ||
| 'table.merge-engine' = 'first_row' | ||
| ); | ||
| ``` | ||
|
|
||
| #### Create Right Source Table | ||
| ```sql title="Flink SQL" | ||
| CREATE TABLE `fluss_catalog`.`my_db`.`right_src` ( | ||
| `city_id` INT NOT NULL, | ||
| `city_name` VARCHAR NOT NULL, | ||
| PRIMARY KEY (city_id) NOT ENFORCED | ||
| ) WITH ( | ||
| -- in Flink 2.1, delta join only support append-only source | ||
| 'table.merge-engine' = 'first_row' | ||
| ); | ||
| ``` | ||
|
|
||
| #### Create Sink Table | ||
| ```sql title="Flink SQL" | ||
| CREATE TABLE `fluss_catalog`.`my_db`.`snk` ( | ||
| `city_id` INT NOT NULL, | ||
| `order_id` INT NOT NULL, | ||
| `content` VARCHAR NOT NULL, | ||
| `city_name` VARCHAR NOT NULL, | ||
| PRIMARY KEY (city_id, order_id) NOT ENFORCED | ||
| ); | ||
| ``` | ||
|
|
||
| #### Explain the Join Query | ||
| ```sql title="Flink SQL" | ||
| EXPLAIN | ||
| INSERT INTO `fluss_catalog`.`my_db`.`snk` | ||
| SELECT T1.`city_id`, T1.`order_id`, T1.`content`, T2.`city_name` | ||
| FROM `fluss_catalog`.`my_db`.`left_src` T1 | ||
| Join `fluss_catalog`.`my_db`.`right_src` T2 | ||
| ON T1.`city_id` = T2.`city_id`; | ||
| ``` | ||
|
|
||
| If the printed plan includes `DeltaJoin` as shown below, it indicates that the optimizer has successfully transformed the traditional streaming join into a delta join. | ||
|
|
||
| ```title="Flink Plan" | ||
| == Abstract Syntax Tree == | ||
| LogicalSink(table=[fluss_catalog.my_db.snk], fields=[city_id, order_id, content, city_name]) | ||
| +- LogicalProject(city_id=[$0], order_id=[$1], content=[$2], city_name=[$4]) | ||
| +- LogicalJoin(condition=[=($0, $3)], joinType=[inner]) | ||
| :- LogicalTableScan(table=[[fluss_catalog, my_db, left_src]]) | ||
| +- LogicalTableScan(table=[[fluss_catalog, my_db, right_src]]) | ||
|
|
||
| == Optimized Physical Plan == | ||
| Sink(table=[fluss_catalog.my_db.snk], fields=[city_id, order_id, content, city_name]) | ||
| +- Calc(select=[city_id, order_id, content, city_name]) | ||
| +- DeltaJoin(joinType=[InnerJoin], where=[=(city_id, city_id0)], select=[city_id, order_id, content, city_id0, city_name]) | ||
| :- Exchange(distribution=[hash[city_id]]) | ||
| : +- TableSourceScan(table=[[fluss_catalog, my_db, left_src]], fields=[city_id, order_id, content]) | ||
| +- Exchange(distribution=[hash[city_id]]) | ||
| +- TableSourceScan(table=[[fluss_catalog, my_db, right_src]], fields=[city_id, city_name]) | ||
|
|
||
| == Optimized Execution Plan == | ||
| Sink(table=[fluss_catalog.my_db.snk], fields=[city_id, order_id, content, city_name]) | ||
| +- Calc(select=[city_id, order_id, content, city_name]) | ||
| +- DeltaJoin(joinType=[InnerJoin], where=[(city_id = city_id0)], select=[city_id, order_id, content, city_id0, city_name]) | ||
| :- Exchange(distribution=[hash[city_id]]) | ||
| : +- TableSourceScan(table=[[fluss_catalog, my_db, left_src]], fields=[city_id, order_id, content]) | ||
| +- Exchange(distribution=[hash[city_id]]) | ||
| +- TableSourceScan(table=[[fluss_catalog, my_db, right_src]], fields=[city_id, city_name]) | ||
| ``` | ||
|
|
||
| ## Understanding Index Keys | ||
|
|
||
| Delta Join relies on performing lookups by the join key (e.g., the `city_id` in the above example) on the Fluss source tables. This requires that the Fluss source tables are defined with appropriate index on the join key to enable efficient lookups. | ||
|
|
||
| Currently, Fluss only supports to defines a single index key per table, which is also referred to as the **prefix key**. The general secondary index which allows define multiple indexes with arbitrary fields is planned to be supported in the near future releases. | ||
|
|
||
| A prefix key defines the portion of a table’s primary key that can be used for efficient key-based lookups or index pruning. | ||
|
|
||
| In Fluss, the option `'bucket.key' = 'city_id'` specifies that data is organized (or bucketed) by `city_id`. When performing a delta join, this allows Flink to quickly locate and read only the subset of records corresponding to the specific prefix key value, rather than scanning or caching the entire table state. | ||
|
|
||
| For example: | ||
| - Full primary key: `(city_id, order_id)` | ||
| - Bucket key: `city_id` | ||
|
|
||
| This yields an **index** on the prefix key `city_id`, so that you can perform [Prefix Key Lookup](/docs/engine-flink/lookups/#prefix-lookup) by the `city_id`. | ||
|
|
||
| In this setup: | ||
| * The delta join operator uses the prefix key (`city_id`) to retrieve only relevant right-side records matching each left-side event. | ||
| * This eliminates the need to hold all records for every city in Flink state, significantly reducing state size. | ||
|
|
||
| Prefix keys thus form the foundation for efficient lookups in delta joins, enabling Flink to scale join workloads efficiently even under high throughput. | ||
|
|
||
| ## Flink Version Support | ||
|
|
||
| The delta join feature is introduced since Flink 2.1 and still evolving, and its optimization capabilities vary across Flink versions. | ||
|
|
||
| Refer to the [Delta Join Issue](https://issues.apache.org/jira/browse/FLINK-37836) for the most up-to-date information. | ||
|
|
||
|
|
||
| ### Flink 2.1 | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For the upcoming Flink 2.2, additional patterns will be supported, as the relevant PRs have already been merged. I'm uncertain whether I should include a description of Flink 2.2 here. So it's entirely up to you. |
||
|
|
||
| #### Supported Features | ||
|
|
||
| - Support for optimizing a dual-stream join in the straightforward pattern of `insert only source -> join -> upsert sink` into a delta join. | ||
|
|
||
| #### Limitations | ||
|
|
||
| - The primary key or the prefix key of the tables must be included as part of the equivalence conditions in the join. | ||
| - The join must be a INNER join. | ||
| - The downstream nodes of the join can accept duplicate changes, such as a sink that provides UPSERT mode without `upsertMaterialize`. | ||
| - All join inputs should be INSERT-ONLY streams. | ||
| - This is why the option `'table.merge-engine' = 'first_row'` is added to the source table DDL. | ||
| - All upstream nodes of the join should be `TableSourceScan` or `Exchange`. | ||
|
|
||
| ### Flink 2.2 (upcoming) | ||
|
|
||
| #### Supported Features | ||
|
|
||
| - Support for optimizing a dual-stream join from CDC sources that do not include delete messages into a delta join. | ||
| - Disable delete on the source table to guarantee there is no delete message in the table, by adding the option `'table.delete.behavior' = 'IGNORE'` or `'DISABLE'` on the table. | ||
| - The source table is no more required to be a `first_row` merge engine table since this version. | ||
| - Support `Project` and `Filter` between source and delta join. | ||
| - Support cache in delta join. | ||
|
|
||
| #### Limitations | ||
|
|
||
| - The primary key or the prefix lookup key of the tables must be included as part of the equivalence conditions in the join. | ||
| - The join must be a INNER join. | ||
| - The downstream nodes of the join can accept duplicate changes, such as a sink that provides UPSERT mode. | ||
| - When consuming a CDC stream, the join key used in the delta join must be part of the primary key. | ||
| - All filters must be applied on the upsert key, and neither filters nor projections should contain non-deterministic functions. | ||
|
|
||
| ## Future Plan | ||
|
|
||
| The Fluss and Flink community are actively working together on enhancing delta join. Planned improvements include: | ||
| - Support for additional join types, such as LEFT JOIN and FULL OUTER JOIN. | ||
| - Support for multi-way joins involving more than two streams. This also requires Fluss to support defining multiple secondary index keys on a single table. | ||
| - Support for more complex query patterns to optimize a wider range of join scenarios. | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,6 @@ | ||
| --- | ||
| title: Connector Options | ||
| sidebar_position: 7 | ||
| sidebar_position: 8 | ||
| --- | ||
|
|
||
| # Connector Options | ||
|
|
||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I updated the position here to set

DeltaJoinspage betweenLookupsandDataStream API.