
Commit def979f

[blog] Publish "Understanding Partial Updates" blog post (#959)
1 parent cd5071d commit def979f

6 files changed: +323 -2 lines changed
Lines changed: 321 additions & 0 deletions
@@ -0,0 +1,321 @@
---
slug: partial-updates
title: "Understanding Partial Updates"
authors: [giannis]
---

<!--
Copyright (c) 2025 Alibaba Group Holding Ltd.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

![Banner](assets/partial_updates/banner.png)

Traditional streaming data pipelines often need to join many tables or streams on a primary key to create a wide view.
For example, imagine you’re building a real-time recommendation engine for an e-commerce platform.
To serve highly personalized recommendations, your system needs a complete 360° view of each user, including:
*user preferences*, *past purchases*, *clickstream behavior*, *cart activity*, *product reviews*, *support tickets*, *ad impressions*, and *loyalty status*.

That’s at least **8 different data sources**, each producing updates independently.
<!-- truncate -->
Joining multiple data streams at this scale is possible with Apache Flink, but it can be really challenging and resource-intensive.
More specifically, it can lead to:
* **Very large state sizes in Flink:** Flink needs to buffer all incoming events until they can be joined. In many cases, state has to be kept around for a long time, if not indefinitely.
* **Checkpoint overhead and backpressure:** the join operation and the uploading of large state can create a bottleneck in the pipeline.
* **State that is hard to inspect and debug:** join state is often large and complex, which makes it difficult to understand what is happening in the pipeline and why certain events are not processed correctly.
* **Inconsistent results due to state TTL:** events may be dropped before they can be joined, leading to data loss and incorrect results in the final output.

Overall, this approach not only consumes a lot of memory and CPU, but also complicates job design and maintenance.

![Streaming Joins](assets/partial_updates/streaming_join.png)
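
For a rough sense of what we're trying to avoid, a conventional pipeline would express this as a continuous multi-way join, something along these lines (an illustrative sketch using the three source tables we'll define later in this post):
```sql
-- Sketch of the join-based alternative: Flink must keep state for every
-- (user_id, item_id) seen on any input so late rows can still be matched.
SELECT
    r.user_id,
    r.item_id,
    r.rec_score,
    i.imp_cnt,
    c.click_cnt
FROM recommendations AS r
LEFT JOIN impressions AS i
    ON r.user_id = i.user_id AND r.item_id = i.item_id
LEFT JOIN clicks AS c
    ON r.user_id = c.user_id AND r.item_id = c.item_id;
```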

### Partial Updates: A Different Approach with Fluss
Fluss introduces a more elegant solution: **partial updates** on a primary key table.

Instead of performing multi-way joins in the streaming job, Fluss allows each source stream to independently update only its relevant columns in a shared wide table identified by the primary key.
In Fluss, you can define a wide table (for example, a `user_profile` table keyed by `user_id`) that contains all possible fields from all sources.
Each source stream then writes partial rows – only the fields it knows about – into this table.

![Partial Update](assets/partial_updates/partial_update.png)

Fluss’s storage engine automatically merges these partial updates together based on the primary key.
Essentially, Fluss maintains the latest combined value for each key, so you don’t have to manage large join state in Flink.

Under the hood, when a new partial update for a key arrives, Fluss looks up the existing record for that primary key, updates only the columns provided, and leaves the other columns unchanged.
The result is written back as the new version of the record.
This happens in *real time*, so the table is **always up-to-date** with the latest information from all streams.
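
To make that concrete, here is a rough SQL analogy (purely illustrative: Fluss performs this merge internally, you never run it yourself), using the hypothetical `user_profile` table from above:
```sql
-- Analogy only: a partial write carrying just loyalty_status for user_101
-- has the same effect as an UPDATE that touches only that column ...
UPDATE user_profile
SET loyalty_status = 'gold'    -- only the supplied column changes
WHERE user_id = 'user_101';    -- every other column keeps its current value
-- ... and if user_101 did not exist yet, the write behaves like an INSERT
-- with all unspecified columns left as NULL.
```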

Next, let's see how this works in practice with a concrete example.
### Example: Building a Unified Wide Table
> You can find the full source code on GitHub [here](https://github.com/ververica/ververica-fluss-examples/tree/main/partial_updates).

Start by cloning the repository and running `docker compose up` to spin up the development environment. Then open a terminal
into the `jobmanager` container and start the Flink SQL CLI by running the following command:
```shell
./bin/sql-client.sh
```

Great so far! 👍

**Step 1:** The first thing we need to do is create a Flink catalog that will hold the tables we are going to create.
Let's create a catalog called `fluss_catalog` and switch to it.
```sql
CREATE CATALOG fluss_catalog WITH (
    'type' = 'fluss',
    'bootstrap.servers' = 'coordinator-server:9123'
);

USE CATALOG fluss_catalog;
```
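
If you want a quick sanity check that the catalog is reachable before moving on, the usual Flink SQL introspection commands work here too (optional):
```sql
-- Optional: confirm the new catalog responds before creating any tables
SHOW DATABASES;
```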

**Step 2:** Then let's create three tables to represent the different data sources, along with the `user_rec_wide` table that will hold the unified view.
```sql
-- Recommendations – model scores
CREATE TABLE recommendations (
    user_id STRING,
    item_id STRING,
    rec_score DOUBLE,
    rec_ts TIMESTAMP(3),
    PRIMARY KEY (user_id, item_id) NOT ENFORCED
) WITH ('bucket.num' = '3', 'table.datalake.enabled' = 'true');


-- Impressions – how often we showed something
CREATE TABLE impressions (
    user_id STRING,
    item_id STRING,
    imp_cnt INT,
    imp_ts TIMESTAMP(3),
    PRIMARY KEY (user_id, item_id) NOT ENFORCED
) WITH ('bucket.num' = '3', 'table.datalake.enabled' = 'true');

-- Clicks – user engagement
CREATE TABLE clicks (
    user_id STRING,
    item_id STRING,
    click_cnt INT,
    clk_ts TIMESTAMP(3),
    PRIMARY KEY (user_id, item_id) NOT ENFORCED
) WITH ('bucket.num' = '3', 'table.datalake.enabled' = 'true');

-- The unified wide table that all three streams will partially update
CREATE TABLE user_rec_wide (
    user_id STRING,
    item_id STRING,
    rec_score DOUBLE,   -- updated by the recommendations stream
    imp_cnt INT,        -- updated by the impressions stream
    click_cnt INT,      -- updated by the clicks stream
    PRIMARY KEY (user_id, item_id) NOT ENFORCED
) WITH ('bucket.num' = '3', 'table.datalake.enabled' = 'true');
```
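
Before loading any data, it's worth double-checking that everything was created as expected (optional):
```sql
-- Optional: list the tables and inspect the wide table's schema
SHOW TABLES;
DESCRIBE user_rec_wide;
```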

**Step 3:** Of course, we will need some sample data to work with, so let's insert some records into the tables. 💻
```sql
-- Recommendations – model scores
INSERT INTO recommendations VALUES
('user_101','prod_501',0.92 , TIMESTAMP '2025-05-16 09:15:02'),
('user_101','prod_502',0.78 , TIMESTAMP '2025-05-16 09:15:05'),
('user_102','prod_503',0.83 , TIMESTAMP '2025-05-16 09:16:00'),
('user_103','prod_501',0.67 , TIMESTAMP '2025-05-16 09:16:20'),
('user_104','prod_504',0.88 , TIMESTAMP '2025-05-16 09:16:45');
```

```sql
-- Impressions – how often each (user,item) was shown
INSERT INTO impressions VALUES
('user_101','prod_501', 3, TIMESTAMP '2025-05-16 09:17:10'),
('user_101','prod_502', 1, TIMESTAMP '2025-05-16 09:17:15'),
('user_102','prod_503', 7, TIMESTAMP '2025-05-16 09:18:22'),
('user_103','prod_501', 4, TIMESTAMP '2025-05-16 09:18:30'),
('user_104','prod_504', 2, TIMESTAMP '2025-05-16 09:18:55');
```

```sql
-- Clicks – user engagement
INSERT INTO clicks VALUES
('user_101','prod_501', 1, TIMESTAMP '2025-05-16 09:19:00'),
('user_101','prod_502', 2, TIMESTAMP '2025-05-16 09:19:07'),
('user_102','prod_503', 1, TIMESTAMP '2025-05-16 09:19:12'),
('user_103','prod_501', 1, TIMESTAMP '2025-05-16 09:19:20'),
('user_104','prod_504', 1, TIMESTAMP '2025-05-16 09:19:25');
```

> **Note:** 🚨 So far, the jobs we ran were bounded, so they finish once the records are inserted. Moving forward we will run some streaming jobs.
> Keep in mind that each job runs with a `parallelism of 3` and our environment is set up `with 10 slots total`.
> So keep an eye on the Flink Web UI to see how many slots are used and how many are available, and stop jobs that are no longer needed to free up resources (see the commands sketched right after this note).
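
On recent Flink versions (1.17+) you can also manage jobs straight from the SQL client instead of the Web UI; a quick sketch:
```sql
-- List the running jobs, then stop one that is no longer needed.
-- Replace the placeholder with a job id from the output (or from the Web UI).
SHOW JOBS;
STOP JOB '<job-id>';
```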

**Step 4:** At this point, let's open up a separate terminal and start the Flink SQL CLI.
In this new terminal, make sure to set the `result-mode`:
```shell
SET 'sql-client.execution.result-mode' = 'tableau';
```
and then run:
```sql
SELECT * FROM user_rec_wide;
```
to observe the output of the table as we insert partial records into it from the different sources.

**Step 5:** Let's insert the records from the `recommendations` table into the `user_rec_wide` table.
```sql
-- Apply recommendation scores
INSERT INTO user_rec_wide (user_id, item_id, rec_score)
SELECT
    user_id,
    item_id,
    rec_score
FROM recommendations;
```

**Output:** Notice how only the related columns are populated in the `user_rec_wide` table, while the rest of the columns are `NULL`.
```shell
Flink SQL> SELECT * FROM user_rec_wide;
+----+----------+----------+-----------+---------+-----------+
| op | user_id  | item_id  | rec_score | imp_cnt | click_cnt |
+----+----------+----------+-----------+---------+-----------+
| +I | user_101 | prod_501 |      0.92 |  <NULL> |    <NULL> |
| +I | user_101 | prod_502 |      0.78 |  <NULL> |    <NULL> |
| +I | user_104 | prod_504 |      0.88 |  <NULL> |    <NULL> |
| +I | user_102 | prod_503 |      0.83 |  <NULL> |    <NULL> |
| +I | user_103 | prod_501 |      0.67 |  <NULL> |    <NULL> |
```

**Step 6:** Next, let's insert the records from the `impressions` table into the `user_rec_wide` table.
```sql
-- Apply impression counts
INSERT INTO user_rec_wide (user_id, item_id, imp_cnt)
SELECT
    user_id,
    item_id,
    imp_cnt
FROM impressions;
```

**Output:** Notice how the `impressions` records are merged into the `user_rec_wide` table: the `imp_cnt` column is now populated, while the existing `rec_score` values are preserved.
```shell
Flink SQL> SELECT * FROM user_rec_wide;
+----+----------+----------+-----------+---------+-----------+
| op | user_id  | item_id  | rec_score | imp_cnt | click_cnt |
+----+----------+----------+-----------+---------+-----------+
| +I | user_101 | prod_501 |      0.92 |  <NULL> |    <NULL> |
| +I | user_101 | prod_502 |      0.78 |  <NULL> |    <NULL> |
| +I | user_104 | prod_504 |      0.88 |  <NULL> |    <NULL> |
| +I | user_102 | prod_503 |      0.83 |  <NULL> |    <NULL> |
| +I | user_103 | prod_501 |      0.67 |  <NULL> |    <NULL> |

| -U | user_101 | prod_501 |      0.92 |  <NULL> |    <NULL> |
| +U | user_101 | prod_501 |      0.92 |       3 |    <NULL> |
| -U | user_101 | prod_502 |      0.78 |  <NULL> |    <NULL> |
| +U | user_101 | prod_502 |      0.78 |       1 |    <NULL> |
| -U | user_104 | prod_504 |      0.88 |  <NULL> |    <NULL> |
| +U | user_104 | prod_504 |      0.88 |       2 |    <NULL> |
| -U | user_102 | prod_503 |      0.83 |  <NULL> |    <NULL> |
| +U | user_102 | prod_503 |      0.83 |       7 |    <NULL> |
| -U | user_103 | prod_501 |      0.67 |  <NULL> |    <NULL> |
| +U | user_103 | prod_501 |      0.67 |       4 |    <NULL> |
```

**Step 7:** Finally, let's insert the records from the `clicks` table into the `user_rec_wide` table.
```sql
-- Apply click counts
INSERT INTO user_rec_wide (user_id, item_id, click_cnt)
SELECT
    user_id,
    item_id,
    click_cnt
FROM clicks;
```

**Output:** Notice how the `clicks` records are merged into the `user_rec_wide` table and the `click_cnt` column is now populated as well.
```shell
Flink SQL> SELECT * FROM user_rec_wide;
+----+----------+----------+-----------+---------+-----------+
| op | user_id  | item_id  | rec_score | imp_cnt | click_cnt |
+----+----------+----------+-----------+---------+-----------+
| +I | user_101 | prod_501 |      0.92 |  <NULL> |    <NULL> |
| +I | user_101 | prod_502 |      0.78 |  <NULL> |    <NULL> |
| +I | user_104 | prod_504 |      0.88 |  <NULL> |    <NULL> |
| +I | user_102 | prod_503 |      0.83 |  <NULL> |    <NULL> |
| +I | user_103 | prod_501 |      0.67 |  <NULL> |    <NULL> |

| -U | user_101 | prod_501 |      0.92 |  <NULL> |    <NULL> |
| +U | user_101 | prod_501 |      0.92 |       3 |    <NULL> |
| -U | user_101 | prod_502 |      0.78 |  <NULL> |    <NULL> |
| +U | user_101 | prod_502 |      0.78 |       1 |    <NULL> |
| -U | user_104 | prod_504 |      0.88 |  <NULL> |    <NULL> |
| +U | user_104 | prod_504 |      0.88 |       2 |    <NULL> |
| -U | user_102 | prod_503 |      0.83 |  <NULL> |    <NULL> |
| +U | user_102 | prod_503 |      0.83 |       7 |    <NULL> |
| -U | user_103 | prod_501 |      0.67 |  <NULL> |    <NULL> |
| +U | user_103 | prod_501 |      0.67 |       4 |    <NULL> |

| -U | user_103 | prod_501 |      0.67 |       4 |    <NULL> |
| +U | user_103 | prod_501 |      0.67 |       4 |         1 |
| -U | user_101 | prod_501 |      0.92 |       3 |    <NULL> |
| +U | user_101 | prod_501 |      0.92 |       3 |         1 |
| -U | user_101 | prod_502 |      0.78 |       1 |    <NULL> |
| +U | user_101 | prod_502 |      0.78 |       1 |         2 |
| -U | user_104 | prod_504 |      0.88 |       2 |    <NULL> |
| +U | user_104 | prod_504 |      0.88 |       2 |         1 |
| -U | user_102 | prod_503 |      0.83 |       7 |    <NULL> |
| +U | user_102 | prod_503 |      0.83 |       7 |         1 |
```

**Reminder:** ‼️ As mentioned before, make sure to stop the jobs that are no longer needed to free up resources.

Now let's switch to `batch` mode and query the current snapshot of the `user_rec_wide` table.

But before that, let's start the [Tiering Service](https://alibaba.github.io/fluss-docs/docs/maintenance/tiered-storage/lakehouse-storage/#start-the-datalake-tiering-service) that offloads the tables to `Lakehouse` storage.

**Step 8:** Open a new terminal 💻 on the `Coordinator Server` and run the following command to start the `Tiering Service`:
```shell
./bin/lakehouse.sh -D flink.rest.address=jobmanager -D flink.rest.port=8081 -D flink.execution.checkpointing.interval=30s -D flink.parallelism.default=2
```

The checkpoint interval is configured as `flink.execution.checkpointing.interval=30s`, so wait a bit until the first checkpoint completes
and data gets offloaded to the `Lakehouse` tables.

**Step 9:** Finally, let's switch to `batch` mode and query the current snapshot of the `user_rec_wide` table.
```shell
SET 'execution.runtime-mode' = 'batch';

Flink SQL> SELECT * FROM user_rec_wide;
+----------+----------+-----------+---------+-----------+
| user_id  | item_id  | rec_score | imp_cnt | click_cnt |
+----------+----------+-----------+---------+-----------+
| user_102 | prod_503 |      0.83 |       7 |         1 |
| user_103 | prod_501 |      0.67 |       4 |         1 |
| user_101 | prod_501 |      0.92 |       3 |         1 |
| user_101 | prod_502 |      0.78 |       1 |         2 |
| user_104 | prod_504 |      0.88 |       2 |         1 |
+----------+----------+-----------+---------+-----------+
5 rows in set (2.63 seconds)
```
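
Since the merged view is now queryable in batch mode, you can also run ad-hoc analytics directly on it. For example, a quick (illustrative) click-through rate per item:
```sql
-- Click-through rate per item, computed over the merged wide table
SELECT
    item_id,
    SUM(click_cnt) * 1.0 / SUM(imp_cnt) AS ctr
FROM user_rec_wide
GROUP BY item_id;
```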

🎉 That's it! You have successfully created a unified wide table using partial updates in Fluss.

### Conclusion
Partial updates in Fluss enable an alternative approach to designing streaming data pipelines that enrich or join data.

When all your sources share a primary key, you can turn the problem on its head: update a unified table incrementally, rather than joining streams on the fly. If some sources don't share the key, you can mix and match with [streaming lookup joins](https://alibaba.github.io/fluss-docs/docs/engine-flink/lookups/#lookup), as sketched below.
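
For reference, a lookup join against a Fluss primary key table looks roughly like this (a sketch: the `orders` stream and its `proc_time` processing-time attribute are hypothetical):
```sql
-- Enrich an event stream with the latest merged row for its key
SELECT o.order_id, o.user_id, o.item_id, w.rec_score, w.click_cnt
FROM orders AS o
JOIN user_rec_wide FOR SYSTEM_TIME AS OF o.proc_time AS w
    ON o.user_id = w.user_id AND o.item_id = w.item_id;
```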

The result is a more scalable, maintainable, and efficient pipeline.
Engineers can spend less time wrestling with Flink’s state, checkpoints, and join mechanics, and more time delivering fresh, integrated data to power real-time analytics and applications.
With Fluss handling the merge logic, achieving a single, up-to-date view from multiple disparate streams becomes far more elegant. 😁

And before you go 😊 don’t forget to give Fluss 🌊 some ❤️ via ⭐ on [GitHub](https://github.com/alibaba/fluss)
3 binary image files added under assets/partial_updates/ (1.27 MB, 117 KB, 128 KB)

website/docusaurus.config.ts

Lines changed: 1 addition & 1 deletion
@@ -152,7 +152,7 @@ const config: Config = {
     prism: {
       theme: prismThemes.vsDark,
       darkTheme: prismThemes.dracula,
-      additionalLanguages: ['java']
+      additionalLanguages: ['java', 'bash']
     },
     algolia: {
       appId: "D8RXQUTC99",

website/src/css/custom.css

Lines changed: 1 addition & 1 deletion
@@ -94,7 +94,7 @@

 b,
 strong {
-  font-weight: 500;
+  font-weight: 700;
   color: #1d1d1d;
 }
