
Commit c9dcf80

[doc] Separate Quickstart into "Building a Streaming Lakehouse" and "Real-Time Analytics with Flink" (#1924)

xx789633 authored and wuchong committed

(cherry picked from commit 8faff0e)

1 parent 029a39d commit c9dcf80

File tree

2 files changed: +327 −310 lines changed


website/docs/quickstart/flink.md

Lines changed: 18 additions & 186 deletions
@@ -1,17 +1,18 @@
 ---
-title: Real-Time Analytics with Flink (Paimon)
+title: Real-Time Analytics with Flink
 sidebar_position: 1
 ---
 
-# Real-Time Analytics With Flink (Paimon)
 
-This guide will get you up and running with Apache Flink to do real-time analytics, covering some powerful features of Fluss,
-including integrating with Paimon.
+# Real-Time Analytics With Flink
+
+This guide will get you up and running with Apache Flink to do real-time analytics, covering some powerful features of Fluss.
 The guide is derived from [TPC-H](https://www.tpc.org/tpch/) **Q5**.
 
 For more information on working with Flink, refer to the [Apache Flink Engine](engine-flink/getting-started.md) section.
 
 ## Environment Setup
+
 ### Prerequisites
 
 Before proceeding with this guide, ensure that [Docker](https://docs.docker.com/engine/install/) and the [Docker Compose plugin](https://docs.docker.com/compose/install/linux/) are installed on your machine.
@@ -49,11 +50,6 @@ services:
         zookeeper.address: zookeeper:2181
         bind.listeners: FLUSS://coordinator-server:9123
         remote.data.dir: /tmp/fluss/remote-data
-        datalake.format: paimon
-        datalake.paimon.metastore: filesystem
-        datalake.paimon.warehouse: /tmp/paimon
-    volumes:
-      - shared-tmpfs:/tmp/paimon
   tablet-server:
     image: apache/fluss:$FLUSS_DOCKER_VERSION$
     command: tabletServer
@@ -67,11 +63,6 @@ services:
         data.dir: /tmp/fluss/data
         remote.data.dir: /tmp/fluss/remote-data
         kv.snapshot.interval: 0s
-        datalake.format: paimon
-        datalake.paimon.metastore: filesystem
-        datalake.paimon.warehouse: /tmp/paimon
-    volumes:
-      - shared-tmpfs:/tmp/paimon
   zookeeper:
     restart: always
     image: zookeeper:3.9.2
@@ -86,8 +77,6 @@ services:
       - |
         FLINK_PROPERTIES=
         jobmanager.rpc.address: jobmanager
-    volumes:
-      - shared-tmpfs:/tmp/paimon
   taskmanager:
     image: apache/fluss-quickstart-flink:1.20-$FLUSS_DOCKER_VERSION$
     depends_on:
@@ -100,24 +89,15 @@ services:
         taskmanager.numberOfTaskSlots: 10
         taskmanager.memory.process.size: 2048m
         taskmanager.memory.framework.off-heap.size: 256m
-    volumes:
-      - shared-tmpfs:/tmp/paimon
 #end
-
-volumes:
-  shared-tmpfs:
-    driver: local
-    driver_opts:
-      type: "tmpfs"
-      device: "tmpfs"
 ```
 
 The Docker Compose environment consists of the following containers:
 - **Fluss Cluster:** a Fluss `CoordinatorServer`, a Fluss `TabletServer` and a `ZooKeeper` server.
 - **Flink Cluster**: a Flink `JobManager` and a Flink `TaskManager` container to execute queries.
 
 **Note:** The `apache/fluss-quickstart-flink` image is based on [flink:1.20.1-java17](https://hub.docker.com/layers/library/flink/1.20-java17/images/sha256:bf1af6406c4f4ad8faa46efe2b3d0a0bf811d1034849c42c1e3484712bc83505) and
-includes the [fluss-flink](engine-flink/getting-started.md), [paimon-flink](https://paimon.apache.org/docs/1.0/flink/quick-start/) and
+includes the [fluss-flink](engine-flink/getting-started.md) and
 [flink-connector-faker](https://flink-packages.org/packages/flink-faker) to simplify this guide.
 
 3. To start all containers, run:
@@ -136,7 +116,7 @@ You can also visit http://localhost:8083/ to see if Flink is running normally.
 
 :::note
 - If you want to additionally use an observability stack, follow one of the provided quickstart guides [here](maintenance/observability/quickstart.md) and then continue with this guide.
-- If you want to run with your own Flink environment, remember to download the [fluss-flink connector jar](/downloads), [flink-connector-faker](https://github.com/knaufk/flink-faker/releases), [paimon-flink connector jar](https://paimon.apache.org/docs/1.0/flink/quick-start/) and then put them to `FLINK_HOME/lib/`.
+- If you want to run with your own Flink environment, remember to download the [fluss-flink connector jar](/downloads), [flink-connector-faker](https://github.com/knaufk/flink-faker/releases) and then put them to `FLINK_HOME/lib/`.
 - All the following commands involving `docker compose` should be executed in the created working directory that contains the `docker-compose.yml` file.
 :::
 
@@ -164,6 +144,7 @@ SHOW CREATE TABLE source_order;
 SHOW CREATE TABLE source_nation;
 ```
 
+
 ## Create Fluss Tables
 ### Create Fluss Catalog
 Use the following SQL to create a Fluss catalog:
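The catalog DDL this context line introduces falls outside the hunk. As a rough sketch only, assuming the Fluss Flink connector's documented `type` and `bootstrap.servers` catalog options and the `coordinator-server:9123` address from the compose file above (the catalog name is illustrative):

```sql title="Flink SQL"
-- sketch, not part of this diff: create and switch to a Fluss catalog;
-- 'fluss_catalog' is an illustrative name, and 'bootstrap.servers' points
-- at the coordinator address used elsewhere in this quickstart
CREATE CATALOG fluss_catalog WITH (
    'type' = 'fluss',
    'bootstrap.servers' = 'coordinator-server:9123'
);
USE CATALOG fluss_catalog;
```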
@@ -185,6 +166,7 @@ For further information how to store catalog configurations, see [Flink's Catalo
 
 ### Create Tables
 Running the following SQL to create Fluss tables to be used in this guide:
+
 ```sql title="Flink SQL"
 CREATE TABLE fluss_order (
     `order_key` BIGINT,
@@ -252,15 +234,15 @@ the `fluss_orders` table with information from the `fluss_customer` and `fluss_n
 
 ```sql title="Flink SQL"
 INSERT INTO enriched_orders
-SELECT o.order_key,
-       o.cust_key,
+SELECT o.order_key,
+       o.cust_key,
        o.total_price,
-       o.order_date,
+       o.order_date,
        o.order_priority,
        o.clerk,
        c.name,
        c.phone,
-       c.acctbal,
+       c.acctbal,
        c.mktsegment,
        n.name
 FROM fluss_order o
@@ -270,7 +252,6 @@ LEFT JOIN fluss_nation FOR SYSTEM_TIME AS OF `o`.`ptime` AS `n`
     ON c.nation_key = n.nation_key;
 ```
 
-
 ## Run Ad-hoc Queries on Fluss Tables
 You can now perform real-time analytics directly on Fluss tables.
 For instance, to calculate the number of orders placed by a specific customer, you can execute the following SQL query to obtain instant, real-time results.
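The ad-hoc query itself lies outside the hunk. A hedged illustration of such a per-customer count, assuming the `fluss_order` schema from the earlier CREATE TABLE (the customer key value is made up):

```sql title="Flink SQL"
-- illustrative only, not part of this diff: count the orders placed by one
-- customer; in the batch mode set just below, this returns a final bounded
-- result instead of a continuously updating one
SELECT COUNT(*) AS order_count
FROM fluss_order
WHERE `cust_key` = 1;
```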
@@ -285,6 +266,11 @@ SET 'sql-client.execution.result-mode' = 'tableau';
 SET 'execution.runtime-mode' = 'batch';
 ```
 
+```sql title="Flink SQL"
+-- execute DML job synchronously
+SET 'table.dml-sync' = 'true';
+```
+
 ```sql title="Flink SQL"
 -- use limit to query the enriched_orders table
 SELECT * FROM enriched_orders LIMIT 2;
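The added block turns on synchronous DML: with Flink's `table.dml-sync` option enabled, batch INSERT/UPDATE/DELETE statements block the SQL client until the job finishes rather than returning on submission. A hypothetical pair (not from the diff) showing why that matters for the guide's later update-and-verify steps:

```sql title="Flink SQL"
-- hypothetical illustration: because 'table.dml-sync' = 'true', the DELETE
-- returns only once its batch job has completed, so the SELECT that follows
-- is guaranteed to observe the deletion (an empty result for this key)
DELETE FROM fluss_customer WHERE `cust_key` = 1;
SELECT * FROM fluss_customer WHERE `cust_key` = 1;
```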
@@ -346,160 +332,6 @@ The following SQL query should return an empty result.
 SELECT * FROM fluss_customer WHERE `cust_key` = 1;
 ```
 
-## Integrate with Paimon
-### Start the Lakehouse Tiering Service
-To integrate with [Apache Paimon](https://paimon.apache.org/), you need to start the `Lakehouse Tiering Service`.
-Open a new terminal, navigate to the `fluss-quickstart-flink` directory, and execute the following command within this directory to start the service:
-```shell
-docker compose exec jobmanager \
-    /opt/flink/bin/flink run \
-    /opt/flink/opt/fluss-flink-tiering-$FLUSS_VERSION$.jar \
-    --fluss.bootstrap.servers coordinator-server:9123 \
-    --datalake.format paimon \
-    --datalake.paimon.metastore filesystem \
-    --datalake.paimon.warehouse /tmp/paimon
-```
-You should see a Flink Job to tier data from Fluss to Paimon running in the [Flink Web UI](http://localhost:8083/).
-
-### Streaming into Fluss datalake-enabled tables
-
-By default, tables are created with data lake integration disabled, meaning the Lakehouse Tiering Service will not tier the table's data to the data lake.
-
-To enable lakehouse functionality as a tiered storage solution for a table, you must create the table with the configuration option `table.datalake.enabled = true`.
-Return to the `SQL client` and execute the following SQL statement to create a table with data lake integration enabled:
-```sql title="Flink SQL"
-CREATE TABLE datalake_enriched_orders (
-    `order_key` BIGINT,
-    `cust_key` INT NOT NULL,
-    `total_price` DECIMAL(15, 2),
-    `order_date` DATE,
-    `order_priority` STRING,
-    `clerk` STRING,
-    `cust_name` STRING,
-    `cust_phone` STRING,
-    `cust_acctbal` DECIMAL(15, 2),
-    `cust_mktsegment` STRING,
-    `nation_name` STRING,
-    PRIMARY KEY (`order_key`) NOT ENFORCED
-) WITH (
-    'table.datalake.enabled' = 'true',
-    'table.datalake.freshness' = '30s'
-);
-```
-
-Next, perform streaming data writing into the **datalake-enabled** table, `datalake_enriched_orders`:
-```sql title="Flink SQL"
--- switch to streaming mode
-SET 'execution.runtime-mode' = 'streaming';
-```
-
-```sql title="Flink SQL"
--- insert tuples into datalake_enriched_orders
-INSERT INTO datalake_enriched_orders
-SELECT o.order_key,
-       o.cust_key,
-       o.total_price,
-       o.order_date,
-       o.order_priority,
-       o.clerk,
-       c.name,
-       c.phone,
-       c.acctbal,
-       c.mktsegment,
-       n.name
-FROM fluss_order o
-LEFT JOIN fluss_customer FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
-    ON o.cust_key = c.cust_key
-LEFT JOIN fluss_nation FOR SYSTEM_TIME AS OF `o`.`ptime` AS `n`
-    ON c.nation_key = n.nation_key;
-```
-
-### Real-Time Analytics on Fluss datalake-enabled Tables
-
-The data for the `datalake_enriched_orders` table is stored in Fluss (for real-time data) and Paimon (for historical data).
-
-When querying the `datalake_enriched_orders` table, Fluss uses a union operation that combines data from both Fluss and Paimon to provide a complete result set -- combines **real-time** and **historical** data.
-
-If you wish to query only the data stored in Paimon—offering high-performance access without the overhead of unioning data—you can use the `datalake_enriched_orders$lake` table by appending the `$lake` suffix.
-This approach also enables all the optimizations and features of a Flink Paimon table source, including [system table](https://paimon.apache.org/docs/master/concepts/system-tables/) such as `datalake_enriched_orders$lake$snapshots`.
-
-To query the snapshots directly from Paimon, use the following SQL:
-```sql title="Flink SQL"
--- switch to batch mode
-SET 'execution.runtime-mode' = 'batch';
-```
-
-```sql title="Flink SQL"
--- query snapshots in paimon
-SELECT snapshot_id, total_record_count FROM datalake_enriched_orders$lake$snapshots;
-```
-
-**Sample Output:**
-```shell
-+-------------+--------------------+
-| snapshot_id | total_record_count |
-+-------------+--------------------+
-|           1 |                650 |
-+-------------+--------------------+
-```
-**Note:** Make sure to wait for the configured `datalake.freshness` (~30s) to complete before querying the snapshots, otherwise the result will be empty.
-
-Run the following SQL to do analytics on Paimon data:
-```sql title="Flink SQL"
--- to sum prices of all orders in paimon
-SELECT sum(total_price) as sum_price FROM datalake_enriched_orders$lake;
-```
-**Sample Output:**
-```shell
-+------------+
-| sum_price  |
-+------------+
-| 1669519.92 |
-+------------+
-```
-
-To achieve results with sub-second data freshness, you can query the table directly, which seamlessly unifies data from both Fluss and Paimon:
-```sql title="Flink SQL"
--- to sum prices of all orders in fluss and paimon
-SELECT sum(total_price) as sum_price FROM datalake_enriched_orders;
-```
-The result looks like:
-```
-+------------+
-| sum_price  |
-+------------+
-| 1777908.36 |
-+------------+
-```
-You can execute the real-time analytics query multiple times, and the results will vary with each run as new data is continuously written to Fluss in real-time.
-
-Finally, you can use the following command to view the files stored in Paimon:
-```shell
-docker compose exec taskmanager tree /tmp/paimon/fluss.db
-```
-
-**Sample Output:**
-```shell
-/tmp/paimon/fluss.db
-└── datalake_enriched_orders
-    ├── bucket-0
-    │   ├── changelog-aef1810f-85b2-4eba-8eb8-9b136dec5bdb-0.orc
-    │   └── data-aef1810f-85b2-4eba-8eb8-9b136dec5bdb-1.orc
-    ├── manifest
-    │   ├── manifest-aaa007e1-81a2-40b3-ba1f-9df4528bc402-0
-    │   ├── manifest-aaa007e1-81a2-40b3-ba1f-9df4528bc402-1
-    │   ├── manifest-list-ceb77e1f-7d17-4160-9e1f-f334918c6e0d-0
-    │   ├── manifest-list-ceb77e1f-7d17-4160-9e1f-f334918c6e0d-1
-    │   └── manifest-list-ceb77e1f-7d17-4160-9e1f-f334918c6e0d-2
-    ├── schema
-    │   └── schema-0
-    └── snapshot
-        ├── EARLIEST
-        ├── LATEST
-        └── snapshot-1
-```
-The files adhere to Paimon's standard format, enabling seamless querying with other engines such as [StarRocks](https://docs.starrocks.io/docs/data_source/catalog/paimon_catalog/).
-
 ## Clean up
 After finishing the tutorial, run `exit` to exit Flink SQL CLI Container and then run
 ```shell
