# Real-Time Analytics With Flink

This guide will get you up and running with Apache Flink to do real-time analytics, covering some powerful features of Fluss, including integrating with Paimon.
The guide is derived from [TPC-H](https://www.tpc.org/tpch/) **Q5**.
For more information on working with Flink, refer to the [Apache Flink Engine](engine-flink/getting-started.md) section.
## Environment Setup
### Prerequisites
Before proceeding with this guide, ensure that [Docker](https://docs.docker.com/engine/install/) and the [Docker Compose plugin](https://docs.docker.com/compose/install/linux/) are installed on your machine.
The Docker Compose environment consists of the following containers:
- **Fluss Cluster:** a Fluss `CoordinatorServer`, a Fluss `TabletServer`, and a `ZooKeeper` server.
- **Flink Cluster:** a Flink `JobManager` and a Flink `TaskManager` container to execute queries.
**Note:** The `apache/fluss-quickstart-flink` image is based on [flink:1.20.1-java17](https://hub.docker.com/layers/library/flink/1.20-java17/images/sha256:bf1af6406c4f4ad8faa46efe2b3d0a0bf811d1034849c42c1e3484712bc83505) and includes the [fluss-flink](engine-flink/getting-started.md), [paimon-flink](https://paimon.apache.org/docs/1.0/flink/quick-start/), and [flink-connector-faker](https://flink-packages.org/packages/flink-faker) connectors to simplify this guide.
3. To start all containers, run:
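   ```shell
   # assumed start command: launches all services defined in docker-compose.yml in detached mode
   docker compose up -d
   ```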
You can also visit http://localhost:8083/ to see if Flink is running normally.
:::note
- If you want to additionally use an observability stack, follow one of the provided quickstart guides [here](maintenance/observability/quickstart.md) and then continue with this guide.
139
-
- If you want to run with your own Flink environment, remember to download the [fluss-flink connector jar](/downloads), [flink-connector-faker](https://github.com/knaufk/flink-faker/releases), [paimon-flink connector jar](https://paimon.apache.org/docs/1.0/flink/quick-start/) and then put them to `FLINK_HOME/lib/`.
119
+
- If you want to run with your own Flink environment, remember to download the [fluss-flink connector jar](/downloads), [flink-connector-faker](https://github.com/knaufk/flink-faker/releases) and then put them to `FLINK_HOME/lib/`.
140
120
- All the following commands involving `docker compose` should be executed in the created working directory that contains the `docker-compose.yml` file.
:::
```sql title="Flink SQL"
SHOW CREATE TABLE source_order;
SHOW CREATE TABLE source_nation;
```
## Create Fluss Tables
### Create Fluss Catalog
Use the following SQL to create a Fluss catalog:
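```sql title="Flink SQL"
-- the 'bootstrap.servers' value assumes the coordinator address from the
-- quickstart's Docker Compose setup
CREATE CATALOG fluss_catalog WITH (
    'type' = 'fluss',
    'bootstrap.servers' = 'coordinator-server:9123'
);
USE CATALOG fluss_catalog;
```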
For further information on how to store catalog configurations, see Flink's Catalog Store documentation.
### Create Tables
Run the following SQL to create the Fluss tables used in this guide:
```sql title="Flink SQL"
CREATE TABLE fluss_order (
    `order_key` BIGINT,
    `cust_key` INT NOT NULL,
    `total_price` DECIMAL(15, 2),
    `order_date` DATE,
    `order_priority` STRING,
    `clerk` STRING,
    -- processing-time attribute used for the temporal joins below
    `ptime` AS PROCTIME(),
    PRIMARY KEY (`order_key`) NOT ENFORCED
);
```
The following statement runs a streaming job that continuously enriches the `fluss_order` table with information from the `fluss_customer` and `fluss_nation` tables:
```sql title="Flink SQL"
INSERT INTO enriched_orders
SELECT o.order_key,
       o.cust_key,
       o.total_price,
       o.order_date,
       o.order_priority,
       o.clerk,
       c.name,
       c.phone,
       c.acctbal,
       c.mktsegment,
       n.name
FROM fluss_order o
LEFT JOIN fluss_customer FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
    ON o.cust_key = c.cust_key
LEFT JOIN fluss_nation FOR SYSTEM_TIME AS OF `o`.`ptime` AS `n`
    ON c.nation_key = n.nation_key;
```
## Run Ad-hoc Queries on Fluss Tables
You can now perform real-time analytics directly on Fluss tables.
For instance, to calculate the number of orders placed by a specific customer, you can execute the following SQL query to obtain instant, real-time results.
```sql title="Flink SQL"
SET 'sql-client.execution.result-mode' = 'tableau';
SET 'execution.runtime-mode' = 'batch';
```
```sql title="Flink SQL"
-- execute DML job synchronously
SET 'table.dml-sync' = 'true';
```
```sql title="Flink SQL"
-- use limit to query the enriched_orders table
SELECT * FROM enriched_orders LIMIT 2;
```
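To compute the per-customer order count mentioned above, a query along the following lines works; the customer key `9` is illustrative:

```sql title="Flink SQL"
-- count the orders placed by one customer (key 9 is an example value)
SELECT COUNT(*) AS order_count FROM fluss_order WHERE `cust_key` = 9;
```

Fluss primary-key tables also accept row-level deletes from Flink's batch mode; for example, removing one customer row (a sketch that sets up the empty-result check below):

```sql title="Flink SQL"
-- delete a customer row by primary key
DELETE FROM fluss_customer WHERE `cust_key` = 1;
```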
The following SQL query should return an empty result.
```sql title="Flink SQL"
SELECT * FROM fluss_customer WHERE `cust_key` = 1;
```
## Integrate with Paimon
### Start the Lakehouse Tiering Service
To integrate with [Apache Paimon](https://paimon.apache.org/), you need to start the `Lakehouse Tiering Service`.
Open a new terminal, navigate to the `fluss-quickstart-flink` directory, and execute the following command to start the service:
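The exact invocation depends on your Fluss version; the sketch below assumes the tiering script shipped in the coordinator image and the bundled Flink cluster:

```shell
# assumed command and flags; check the Fluss documentation for your version
docker compose exec coordinator-server ./bin/lakehouse.sh \
    -D flink.rest.address=jobmanager \
    -D flink.rest.port=8081
```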
You should see a Flink Job to tier data from Fluss to Paimon running in the [Flink Web UI](http://localhost:8083/).
### Streaming into Fluss datalake-enabled tables
By default, tables are created with data lake integration disabled, meaning the Lakehouse Tiering Service will not tier the table's data to the data lake.

To enable lakehouse functionality as a tiered storage solution for a table, you must create the table with the configuration option `table.datalake.enabled = true`.
Return to the SQL client and execute the following SQL statement to create a table with data lake integration enabled:
```sql title="Flink SQL"
CREATE TABLE datalake_enriched_orders (
    `order_key` BIGINT,
    `cust_key` INT NOT NULL,
    `total_price` DECIMAL(15, 2),
    `order_date` DATE,
    `order_priority` STRING,
    `clerk` STRING,
    `cust_name` STRING,
    `cust_phone` STRING,
    `cust_acctbal` DECIMAL(15, 2),
    `cust_mktsegment` STRING,
    `nation_name` STRING,
    PRIMARY KEY (`order_key`) NOT ENFORCED
) WITH (
    'table.datalake.enabled' = 'true',
    'table.datalake.freshness' = '30s'
);
```
Next, perform streaming data writing into the **datalake-enabled** table, `datalake_enriched_orders`:
```sql title="Flink SQL"
-- switch to streaming mode
SET 'execution.runtime-mode' = 'streaming';
```
```sql title="Flink SQL"
-- insert tuples into datalake_enriched_orders
INSERT INTO datalake_enriched_orders
SELECT o.order_key,
       o.cust_key,
       o.total_price,
       o.order_date,
       o.order_priority,
       o.clerk,
       c.name,
       c.phone,
       c.acctbal,
       c.mktsegment,
       n.name
FROM fluss_order o
LEFT JOIN fluss_customer FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
    ON o.cust_key = c.cust_key
LEFT JOIN fluss_nation FOR SYSTEM_TIME AS OF `o`.`ptime` AS `n`
    ON c.nation_key = n.nation_key;
```
### Real-Time Analytics on Fluss datalake-enabled Tables
The data for the `datalake_enriched_orders` table is stored in Fluss (for real-time data) and Paimon (for historical data).

When querying the `datalake_enriched_orders` table, Fluss uses a union operation that combines data from both Fluss and Paimon to provide a complete result set spanning both **real-time** and **historical** data.

If you wish to query only the data stored in Paimon, which offers high-performance access without the overhead of unioning data, you can query the `datalake_enriched_orders$lake` table by appending the `$lake` suffix.
This approach also enables all the optimizations and features of a Flink Paimon table source, including [system tables](https://paimon.apache.org/docs/master/concepts/system-tables/) such as `datalake_enriched_orders$lake$snapshots`.
To query the snapshots directly from Paimon, use the following SQL:
```sql title="Flink SQL"
-- switch to batch mode
SET 'execution.runtime-mode' = 'batch';
```
```sql title="Flink SQL"
-- query snapshots in paimon
SELECT snapshot_id, total_record_count FROM datalake_enriched_orders$lake$snapshots;
```
**Sample Output:**
```shell
+-------------+--------------------+
| snapshot_id | total_record_count |
+-------------+--------------------+
|           1 |                650 |
+-------------+--------------------+
```
**Note:** Make sure to wait at least the configured `table.datalake.freshness` (~30s) before querying the snapshots; otherwise the result will be empty.
Run the following SQL to do analytics on the Paimon data:
```sql title="Flink SQL"
-- sum the prices of all orders in paimon
SELECT sum(total_price) AS sum_price FROM datalake_enriched_orders$lake;
```
**Sample Output:**
```shell
+------------+
|  sum_price |
+------------+
| 1669519.92 |
+------------+
```
To achieve results with sub-second data freshness, you can query the table directly, which seamlessly unifies data from both Fluss and Paimon:
```sql title="Flink SQL"
-- sum the prices of all orders in fluss and paimon
SELECT sum(total_price) AS sum_price FROM datalake_enriched_orders;
```
The result looks like:
```shell
+------------+
|  sum_price |
+------------+
| 1777908.36 |
+------------+
```
You can execute the real-time analytics query multiple times, and the results will vary with each run as new data is continuously written to Fluss in real time.

Finally, you can use the following command to view the files stored in Paimon:
```shell
docker compose exec taskmanager tree /tmp/paimon/fluss.db
```
The files adhere to Paimon's standard format, enabling seamless querying with other engines such as [StarRocks](https://docs.starrocks.io/docs/data_source/catalog/paimon_catalog/).
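For instance, StarRocks could read these files through a Paimon catalog; a minimal sketch, assuming a filesystem metastore and the `/tmp/paimon` warehouse path used above:

```sql
-- StarRocks SQL: register the Paimon warehouse as an external catalog
CREATE EXTERNAL CATALOG paimon_catalog
PROPERTIES (
    "type" = "paimon",
    "paimon.catalog.type" = "filesystem",
    "paimon.catalog.warehouse" = "/tmp/paimon"
);
```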
## Clean up
After finishing the tutorial, run `exit` to exit the Flink SQL CLI container and then run the following command to stop all containers:
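```shell
# assumed cleanup command: stops the quickstart containers and removes their volumes
docker compose down -v
```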