Commit 7bbba37

[blog]: tiering service deep dive (#1249)

Authored by gyang94 and polyzos

* [blog]: tiering service deep dive
* Gianni's edits
* fix: typo and duplicate sentences

Co-authored-by: ipolyzos <[email protected]>

8 files changed: +229 −1 lines changed

New blog post (222 additions):

---
slug: tiering-service
title: "Tiering Service Deep Dive"
authors: [gyang94]
toc_max_heading_level: 5
---

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

# Tiering Service Deep Dive

## Background

![](assets/tiering_service/background.png)

At the core of Fluss's Lakehouse architecture sits the **Tiering Service**: a smart, policy-driven data pipeline that seamlessly bridges your real-time Fluss cluster and your cost-efficient lakehouse storage. It continuously ingests fresh events from the Fluss cluster, automatically migrating older or less frequently accessed data into colder storage tiers without interrupting ongoing queries. By balancing hot, warm, and cold storage according to configurable rules, the Tiering Service ensures that recent data remains instantly queryable while historical records are archived economically.

In this blog post we will take a deep dive and explore how Fluss's Tiering Service orchestrates data movement, preserves consistency, and empowers scalable, high-performance analytics at optimized cost.

<!-- truncate -->

## Flink Tiering Service

The Fluss tiering service is an Apache Flink job that continuously moves data from the Fluss cluster to the data lake.
The execution plan is quite straightforward. It has three operators: a `source`, a `committer`, and an empty `sink writer`.

```
Source: TieringSource -> TieringCommitter -> Sink: Writer
```

- **TieringSource**: Reads records from the Fluss tiering table and writes them to the data lake.
- **TieringCommitter**: Commits each sync batch by advancing offsets in both the lakehouse and the Fluss cluster.
- **No-Op Sink**: A dummy sink that performs no action.

In the sections that follow, we'll dive into the **TieringSource** and **TieringCommitter** to see exactly how they orchestrate seamless data movement between real-time and historical storage.

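To make the topology concrete, here is a minimal sketch of how a three-operator job like this could be wired with Flink's DataStream API. The `TieringSource`, `TieringCommitOperator`, `CommittableMessage`, `flussConfig`, and `lakeTieringFactory` names are illustrative placeholders, not the actual Fluss classes or constructors; only the DataStream calls themselves are real Flink API.

```java
// Sketch only: the Fluss-specific class names and constructors are placeholders.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // commits are aligned with checkpoint completion

env.fromSource(new TieringSource(flussConfig, lakeTieringFactory),   // reads Fluss, writes to the lake
               WatermarkStrategy.noWatermarks(),
               "Source: TieringSource")
   .transform("TieringCommitter",                                    // commits to the lake and back to Fluss
              TypeInformation.of(CommittableMessage.class),
              new TieringCommitOperator(flussConfig, lakeTieringFactory))
   .addSink(new DiscardingSink<>());                                 // Sink: Writer (no-op)

env.execute("Fluss Tiering Service");
```
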
## TieringSource

![](assets/tiering_service/tiering-source.png)

The **TieringSource** operator reads records from the Fluss tiering table and writes them into your data lake.
Built on Flink's Source V2 API ([FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface)), it breaks down into two core components: the **TieringSourceEnumerator** and the **TieringSourceReader**.
The high-level workflow is as follows:

1. The **Enumerator** queries the **CoordinatorService** for the current tiering table metadata.
2. Once it receives the table information, the Enumerator generates "splits" (data partitions) and assigns them to the **Reader**.
3. The **Reader** fetches the actual data for each split.
4. Finally, the **Reader** writes those records into the data lake.

In the following sections, we'll explore how the **TieringSourceEnumerator** and **TieringSourceReader** work under the hood to deliver reliable, scalable ingestion from Fluss into your lakehouse.

### TieringSourceEnumerator

![](assets/tiering_service/tiering-source-enumerator.png)

The **TieringSourceEnumerator** orchestrates split creation and assignment in five key steps:

1. **Heartbeat Request**: Uses an RPC client to send a `LakeTieringHeartbeatRequest` to the Fluss server.
2. **Heartbeat Response**: Receives a `LakeTieringHeartbeatResponse` that contains the tiering table metadata and sync statuses for `completed`, `failed`, and `in-progress` tables.
3. **Lake Tiering Info**: Forwards the returned `lakeTieringInfo` to the `TieringSplitGenerator`.
4. **Split Generation**: The `TieringSplitGenerator` produces a set of `TieringSplits`, each representing a data partition to process.
5. **Split Assignment**: Assigns those `TieringSplits` to `TieringSourceReader` instances for downstream ingestion into the data lake.

#### RpcClient

The `RpcClient` inside the `TieringSourceEnumerator` handles all RPC communication with the Fluss CoordinatorService. Its responsibilities include:

- **Sending Heartbeats**: It constructs and sends a `LakeTieringHeartbeatRequest`, which carries three lists of tables (`tiering_tables` for in-progress work, `finished_tables`, and `failed_tables`) along with an optional `request_table` flag to request new tiering work.
- **Receiving Responses**: It awaits a `LakeTieringHeartbeatResponse` that contains:
  - `coordinator_epoch`: the current epoch of the coordinator.
  - `tiering_table` (optional): a `PbLakeTieringTableInfo` message (with `table_id`, `table_path`, and `tiering_epoch`) describing the next table to tier.
  - `tiering_table_resp`, `finished_table_resp`, and `failed_table_resp`: lists of heartbeat responses reflecting the status of each table.
- **Forwarding Metadata**: It parses the returned `PbLakeTieringTableInfo` and the sync-status responses, then forwards the assembled `lakeTieringInfo` to the `TieringSplitGenerator` for split creation.

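For readers who prefer code, the heartbeat exchange can be pictured with the simplified Java records below. These are not the real Fluss classes (the actual messages are protobuf-generated), and identifying tables by a bare `long` ID is an assumption made to keep the sketch short; field names follow the protobuf fields listed above.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified model of the heartbeat payloads; the per-table status
// responses (tiering_table_resp, finished_table_resp, failed_table_resp) are omitted.
record LakeTieringHeartbeatRequest(
        List<Long> tieringTables,   // tiering_tables: tables currently being tiered
        List<Long> finishedTables,  // finished_tables: tables whose tiering round completed
        List<Long> failedTables,    // failed_tables: tables whose tiering round failed
        boolean requestTable) {}    // request_table: ask the coordinator for new work

record PbLakeTieringTableInfo(
        long tableId,               // table_id
        String tablePath,           // table_path
        long tieringEpoch) {}       // tiering_epoch

record LakeTieringHeartbeatResponse(
        long coordinatorEpoch,                             // coordinator_epoch
        Optional<PbLakeTieringTableInfo> tieringTable) {}  // tiering_table: next table to tier, if any
```
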
#### TieringSplitGenerator

![](assets/tiering_service/tiering-split-generator.png)

The **TieringSplitGenerator** is the component that orchestrates efficient data synchronization between your real-time Fluss cluster and your lakehouse.
It calculates the data "delta", i.e. what's new or changed in Fluss but not yet committed to the lake, and then generates **TieringSplit** tasks for each segment requiring synchronization.

To achieve this, the `TieringSplitGenerator` leverages the `FlussAdminClient` to fetch three essential pieces of metadata:

**Lake Snapshot**

The generator first invokes the lake metadata API to retrieve a **LakeSnapshot** object. This snapshot provides a complete picture of the current state of your data in the lakehouse, including:
* `snapshotId`: The identifier of the latest committed snapshot in your data lake.
* `tableBucketsOffset`: A map that records, for each `TableBucket`, the log offset already present in the lakehouse.

**Current Bucket Offsets**

Next, the `TieringSplitGenerator` queries the Fluss server to determine the **current log end offset** for each bucket. This effectively captures the high-water mark of the incoming data streams in real time within your Fluss cluster.

**KV Snapshots (for primary-keyed tables)**

For tables that use primary keys, the generator also retrieves a **KvSnapshots** record. This record contains the information needed to stay consistent with the key-value store:
* `tableId` and an optional `partitionId`.
* `snapshotIds`: The latest snapshot ID for each bucket.
* `logOffsets`: The exact log position from which to resume reading after that snapshot, ensuring seamless data ingestion.

With the `LakeSnapshot`, the live bucket offsets from the Fluss cluster, and (where applicable) the `KvSnapshots`, the `TieringSplitGenerator` performs its core function: it computes which log segments are present in Fluss but have not yet been committed to the lakehouse.

Finally, for each identified segment, it produces a distinct **TieringSplit**. Each `TieringSplit` precisely defines the specific bucket and the exact offset range that needs to be ingested. This ensures incremental, highly efficient synchronization, seamlessly bridging your real-time operational data with your historical, cost-optimized storage.

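The delta computation itself boils down to comparing two offset maps. The sketch below is illustrative only (the real generator works with Fluss's `TableBucket` and `TieringSplit` types rather than plain integers and arrays), but it shows the core idea: any bucket whose Fluss end offset is ahead of its lake offset yields a pending range to tier.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative only: compute, per bucket, the offset range that exists in Fluss
// but has not yet been committed to the lake. Each {bucket, start, stop} triple
// would become one TieringLogSplit in the real generator.
static List<long[]> computeDelta(
        Map<Integer, Long> lakeCommittedOffsets,  // bucket -> offset already committed to the lake
        Map<Integer, Long> flussLogEndOffsets) {  // bucket -> current log end offset in Fluss
    List<long[]> pendingRanges = new ArrayList<>();
    for (Map.Entry<Integer, Long> entry : flussLogEndOffsets.entrySet()) {
        int bucket = entry.getKey();
        long stoppingOffset = entry.getValue();
        long startingOffset = lakeCommittedOffsets.getOrDefault(bucket, 0L);
        if (startingOffset < stoppingOffset) {
            pendingRanges.add(new long[] {bucket, startingOffset, stoppingOffset});
        }
    }
    return pendingRanges;
}
```
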
#### TieringSplit

The **TieringSplit** abstraction defines exactly which slice of a table bucket needs to be synchronized. It captures three common fields:

- **tablePath**: the full path to the target table.
- **tableBucket**: the specific bucket (shard) within that table.
- **partitionName** (optional): the partition key, if the table is partitioned.

There are two concrete split types:

1. **TieringLogSplit** (for append-only "log" tables)
   - **startingOffset**: the last committed log offset in the lake.
   - **stoppingOffset**: the current end offset in the live Fluss bucket.
   - This split defines a contiguous range of new log records to ingest.
2. **TieringSnapshotSplit** (for primary-keyed tables)
   - **snapshotId**: the identifier of the latest snapshot in Fluss.
   - **logOffsetOfSnapshot**: the log offset at which that snapshot was taken.
   - This split lets the TieringSourceReader replay all CDC (change-data-capture) events since the snapshot, ensuring up-to-date state.

By breaking each table into these well-defined splits, the Tiering Service can incrementally, reliably, and in parallel sync exactly the data that's missing from your data lake.

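As a mental model, the hierarchy can be sketched as plain Java classes. This is a simplified view: in the actual code the bucket and path are dedicated `TableBucket`/`TablePath` types, and constructors, serializers, and accessors are omitted here.

```java
// Simplified sketch of the split hierarchy (field names follow the description above).
abstract class TieringSplit {
    String tablePath;         // full path of the target table
    int tableBucket;          // bucket (shard) within that table
    String partitionName;     // optional: partition key for partitioned tables
}

// Append-only "log" tables: a contiguous range of new log records to ingest.
class TieringLogSplit extends TieringSplit {
    long startingOffset;      // last log offset already committed to the lake
    long stoppingOffset;      // current end offset in the live Fluss bucket
}

// Primary-keyed tables: read the snapshot, then replay CDC events from its offset.
class TieringSnapshotSplit extends TieringSplit {
    long snapshotId;          // latest snapshot in Fluss
    long logOffsetOfSnapshot; // log offset at which that snapshot was taken
}
```
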
### TieringSourceReader

![](assets/tiering_service/tiering-source-reader.png)

The **TieringSourceReader** pulls assigned splits from the enumerator, uses a `TieringSplitReader` to fetch the corresponding records from the Fluss server, and then writes them into the data lake. Its workflow breaks down as follows:

- **Split Selection:** The reader picks an assigned `TieringSplit` from its queue.
- **Reader Dispatch:** Depending on the split type, it instantiates either:
  - **LogScanner** for `TieringLogSplit` (append-only tables)
  - **BoundedSplitReader** for `TieringSnapshotSplit` (primary-keyed tables)
- **Data Fetch:** The chosen reader fetches the records defined by the split's offset or snapshot boundaries from the Fluss server.
- **Lake Writing:** Retrieved records are handed off to the lake writer, which persists them into the data lake.

By cleanly separating split assignment, reader selection, data fetching, and lake writing, the TieringSourceReader ensures scalable, parallel ingestion of streaming and snapshot data into your lakehouse.

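The dispatch step is essentially a type switch on the split. The sketch below is illustrative: the `createLogScanner`/`createBoundedSplitReader` helpers and the `currentReader` field are placeholders, not the real reader internals.

```java
// Illustrative dispatch on the split type; the helper methods are hypothetical.
void startNextSplit(TieringSplit split) {
    if (split instanceof TieringLogSplit logSplit) {
        // Append-only table: scan the log between the committed and current offsets.
        currentReader = createLogScanner(logSplit.startingOffset, logSplit.stoppingOffset);
    } else if (split instanceof TieringSnapshotSplit snapshotSplit) {
        // Primary-key table: read the KV snapshot, then replay CDC from its log offset.
        currentReader = createBoundedSplitReader(snapshotSplit.snapshotId, snapshotSplit.logOffsetOfSnapshot);
    }
}
```
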
#### LakeWriter & LakeTieringFactory

The `LakeWriter` is responsible for persisting Fluss records into your data lake, and it's instantiated via a pluggable `LakeTieringFactory`. This interface defines how Fluss interacts with various lake formats (e.g., Paimon, Iceberg):

```java
public interface LakeTieringFactory<WriteResult, CommitableT> {

    LakeWriter<WriteResult> createLakeWriter(WriterInitContext writerInitContext);

    SimpleVersionedSerializer<WriteResult> getWriteResultSerializer();

    LakeCommitter<WriteResult, CommitableT> createLakeCommitter(
            CommitterInitContext committerInitContext);

    SimpleVersionedSerializer<CommitableT> getCommitableSerializer();
}
```

- **createLakeWriter(WriterInitContext)**: builds a `LakeWriter` to convert Fluss rows into the target table format.
- **getWriteResultSerializer()**: supplies a serializer for the writer's output.
- **createLakeCommitter(CommitterInitContext)**: constructs a `LakeCommitter` to finalize and atomically commit data files.
- **getCommitableSerializer()**: provides a serializer for committable tokens.

By default, Fluss includes a Paimon-backed tiering factory; Iceberg support is coming soon. Once the `TieringSourceReader` writes a batch of records through the `LakeWriter`, it emits the resulting write metadata downstream to the **TieringCommitOperator**, which then commits those changes both in the lakehouse and back to the Fluss cluster.

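To see what the plug-in surface looks like in practice, here is a schematic skeleton of a hypothetical custom format integration. Every `My*` type is a placeholder, the bodies only hint at the responsibilities, and the exact generic bounds of the real interface may differ from this sketch.

```java
// Hypothetical skeleton of a custom lake-format plugin built against the
// interface shown above; all "My*" types are placeholders.
public class MyLakeTieringFactory implements LakeTieringFactory<MyWriteResult, MyCommittable> {

    @Override
    public LakeWriter<MyWriteResult> createLakeWriter(WriterInitContext writerInitContext) {
        // Convert Fluss rows into the target format and stage data files.
        return new MyLakeWriter(writerInitContext);
    }

    @Override
    public SimpleVersionedSerializer<MyWriteResult> getWriteResultSerializer() {
        return new MyWriteResultSerializer();
    }

    @Override
    public LakeCommitter<MyWriteResult, MyCommittable> createLakeCommitter(
            CommitterInitContext committerInitContext) {
        // Finalize staged files and commit them atomically as a new snapshot.
        return new MyLakeCommitter(committerInitContext);
    }

    @Override
    public SimpleVersionedSerializer<MyCommittable> getCommitableSerializer() {
        return new MyCommittableSerializer();
    }
}
```
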
#### Stateless

The `TieringSourceReader` is designed to be completely stateless: it does not checkpoint or store any `TieringSplit` information itself. Instead, every checkpoint simply returns an empty list, leaving all split-tracking to the `TieringSourceEnumerator`:

```java
@Override
public List<TieringSplit> snapshotState(long checkpointId) {
    // Stateless: no splits are held in reader state
    return Collections.emptyList();
}
```

By delegating split assignment entirely to the Enumerator, the reader remains lightweight and easily scalable, always fetching its next work unit afresh from the coordinator.

## TieringCommitter

![](assets/tiering_service/tiering-committer.png)

The **TieringCommitter** operator wraps up each sync cycle by taking the `WriteResult` outputs from the TieringSourceReader and committing them in two phases, first to the data lake and then back to Fluss, before emitting status events to the Flink coordinator. It leverages two components:

- **LakeCommitter**: Provided by the pluggable `LakeTieringFactory`, this component atomically commits the written files into the lakehouse and returns the new snapshot ID.
- **FlussTableLakeSnapshotCommitter**: Using that snapshot ID, it updates the Fluss cluster's tiering table status so that the Fluss server and lakehouse remain in sync.

The end-to-end flow is:

1. **Collect Write Results** from the TieringSourceReader for the current checkpoint.
2. **Lake Commit** via the `LakeCommitter`, which finalizes files and advances the lake snapshot.
3. **Fluss Update** using the `FlussTableLakeSnapshotCommitter`, acknowledging success or failure back to the Fluss CoordinatorService.
4. **Event Emission** of either `FinishedTieringEvent` (on success or completion) or `FailedTieringEvent` (on errors) to the Flink `OperatorCoordinator`.

The TieringCommitter thus ensures exactly-once, consistent synchronization between your real-time Fluss cluster and your analytical lakehouse.

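Put together, the per-checkpoint commit logic looks roughly like the sketch below. The method names (`commitToLake`, `commitToFluss`, `send`) and event constructors are illustrative assumptions rather than the actual Fluss APIs; what matters is the ordering of the two phases and the status events.

```java
// Illustrative per-checkpoint commit flow; method names are assumptions.
void commitOnCheckpoint(long tableId, List<WriteResult> writeResults) {
    try {
        // Phase 1: atomically commit the written files to the lake,
        // producing a new lake snapshot.
        long newSnapshotId = lakeCommitter.commitToLake(writeResults);

        // Phase 2: report the new lake snapshot back to the Fluss
        // CoordinatorService so the cluster's tiering state stays in sync.
        flussTableLakeSnapshotCommitter.commitToFluss(tableId, newSnapshotId);

        // Tell the Flink OperatorCoordinator this table's round finished.
        operatorCoordinator.send(new FinishedTieringEvent(tableId));
    } catch (Exception e) {
        // On any failure, surface a FailedTieringEvent instead.
        operatorCoordinator.send(new FailedTieringEvent(tableId, e.getMessage()));
    }
}
```
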
## Conclusion

In this deep dive, we explored every facet of Fluss's **Tiering Service**.
We began by dissecting the **TieringSource**, understanding the critical roles of its Enumerator, RpcClient, and SplitGenerator. From there, we examined the various split types and the efficiency of the stateless **TieringSourceReader**.

We then looked at the flexible, pluggable integration of the **LakeWriter** and **LakeCommitter**. Finally, we saw how the **TieringCommitter**, with its LakeCommitter and FlussTableLakeSnapshotCommitter, orchestrates **atomic**, **exactly-once commits** across both your data lake and Fluss cluster.

Together, these components form a robust pipeline that reliably synchronizes real-time streams with historical snapshots, ensuring **seamless**, **scalable consistency** between your live workloads and analytical storage.

website/blog/authors.yml (7 additions, 1 deletion): adds the new author entry.

gyang94:
  name: GUO Yang
  title: Fluss Contributor
  url: https://github.com/gyang94
  image_url: https://github.com/gyang94.png