Commit c3cc1d6

Mehul Batra authored and committed
shading changed & union read for append table
1 parent 5582b0b commit c3cc1d6

2 files changed (+37, -33 lines)

fluss-lake/fluss-lake-iceberg/pom.xml

Lines changed: 9 additions & 4 deletions
```diff
@@ -319,15 +319,20 @@
         </includes>
       </artifactSet>
       <relocations>
-        <!-- Shade Jackson to match Iceberg's shaded package -->
+        <!-- Shade Iceberg to Fluss namespace for complete isolation -->
+        <relocation>
+          <pattern>org.apache.iceberg</pattern>
+          <shadedPattern>org.apache.fluss.lake.iceberg.shaded.org.apache.iceberg</shadedPattern>
+        </relocation>
+        <!-- Shade Jackson to Fluss namespace -->
         <relocation>
           <pattern>com.fasterxml.jackson</pattern>
-          <shadedPattern>org.apache.iceberg.shaded.com.fasterxml.jackson</shadedPattern>
+          <shadedPattern>org.apache.fluss.lake.iceberg.shaded.com.fasterxml.jackson</shadedPattern>
         </relocation>
-        <!-- Shade Parquet to match Iceberg's shaded package -->
+        <!-- Shade Parquet to Fluss namespace -->
         <relocation>
           <pattern>org.apache.parquet</pattern>
-          <shadedPattern>org.apache.iceberg.shaded.org.apache.parquet</shadedPattern>
+          <shadedPattern>org.apache.fluss.lake.iceberg.shaded.org.apache.parquet</shadedPattern>
         </relocation>
       </relocations>
       <filters>
```
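Each `<relocation>` rule tells the shade plugin to rewrite every class and resource reference whose package starts with `<pattern>` so that it starts with `<shadedPattern>` instead, isolating the bundled Iceberg, Jackson, and Parquet classes from any copies on the user's classpath. A minimal sketch of that prefix rewrite (illustrative only, not the maven-shade-plugin implementation):

```python
# Illustration of relocation semantics: rewrite matching package prefixes.
# The mapping mirrors the three <relocation> rules in this pom.xml.
RELOCATIONS = {
    "org.apache.iceberg": "org.apache.fluss.lake.iceberg.shaded.org.apache.iceberg",
    "com.fasterxml.jackson": "org.apache.fluss.lake.iceberg.shaded.com.fasterxml.jackson",
    "org.apache.parquet": "org.apache.fluss.lake.iceberg.shaded.org.apache.parquet",
}

def relocate(class_name: str) -> str:
    """Return the name a class would carry inside the shaded fluss-lake-iceberg jar."""
    for pattern, shaded in RELOCATIONS.items():
        if class_name.startswith(pattern):
            return shaded + class_name[len(pattern):]
    return class_name  # classes outside the patterns are left untouched

print(relocate("org.apache.iceberg.catalog.Catalog"))
# org.apache.fluss.lake.iceberg.shaded.org.apache.iceberg.catalog.Catalog
```

Relocating Iceberg itself (rather than only matching Iceberg's own shaded packages, as before) means the jar no longer conflicts with an unshaded Iceberg on the same classpath.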

website/docs/quickstart/flink-iceberg.md

Lines changed: 28 additions & 29 deletions
```diff
@@ -63,7 +63,6 @@ services:
 
   coordinator-server:
     image: fluss/fluss:$FLUSS_DOCKER_VERSION$
-    command: coordinatorServer
     depends_on:
       - zookeeper
     environment:
```
```diff
@@ -401,8 +400,7 @@ CREATE TABLE datalake_enriched_orders (
   `cust_phone` STRING,
   `cust_acctbal` DECIMAL(15, 2),
   `cust_mktsegment` STRING,
-  `nation_name` STRING,
-  PRIMARY KEY (`order_key`) NOT ENFORCED
+  `nation_name` STRING
 ) WITH (
   'table.datalake.enabled' = 'true',
   'table.datalake.freshness' = '30s'
```
````diff
@@ -429,11 +427,14 @@ SELECT o.order_key,
        c.acctbal,
        c.mktsegment,
        n.name
-FROM fluss_order o
-  LEFT JOIN fluss_customer FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
-    ON o.cust_key = c.cust_key
-  LEFT JOIN fluss_nation FOR SYSTEM_TIME AS OF `o`.`ptime` AS `n`
-    ON c.nation_key = n.nation_key;
+FROM (
+  SELECT *, PROCTIME() as ptime
+  FROM `default_catalog`.`default_database`.source_order
+) o
+  LEFT JOIN fluss_customer FOR SYSTEM_TIME AS OF o.ptime AS c
+    ON o.cust_key = c.cust_key
+  LEFT JOIN fluss_nation FOR SYSTEM_TIME AS OF o.ptime AS n
+    ON c.nation_key = n.nation_key;
 ```
 
 ### Real-Time Analytics on Fluss datalake-enabled Tables
````
````diff
@@ -459,11 +460,12 @@ SELECT snapshot_id, operation FROM datalake_enriched_orders$lake$snapshots;
 
 **Sample Output:**
 ```shell
-+-------------+--------------------+
-| snapshot_id | operation          |
-+-------------+--------------------+
-|           1 | append             |
-+-------------+--------------------+
++---------------------+-----------+
+|         snapshot_id | operation |
++---------------------+-----------+
+| 7792523713868625335 | append    |
+| 7960217942125627573 | append    |
++---------------------+-----------+
 ```
 **Note:** Make sure to wait for the configured `datalake.freshness` (~30s) to complete before querying the snapshots, otherwise the result will be empty.
 
````
````diff
@@ -474,30 +476,27 @@ SELECT sum(total_price) as sum_price FROM datalake_enriched_orders$lake;
 ```
 **Sample Output:**
 ```shell
-+------------+
-| sum_price  |
-+------------+
-| 1669519.92 |
-+------------+
++-----------+
+| sum_price |
++-----------+
+| 432880.93 |
++-----------+
 ```
 
 To achieve results with sub-second data freshness, you can query the table directly, which seamlessly unifies data from both Fluss and Iceberg:
 
 ```sql title="Flink SQL"
--- all orders (combining fluss and iceberg data)
-> SELECT * FROM datalake_enriched_orders limit 2;
+-- to sum prices of all orders (combining fluss and iceberg data)
+> SELECT sum(total_price) as sum_price FROM datalake_enriched_orders;
 ```
 
 **Sample Output:**
 ```shell
-
-+-----------+----------+-------------+------------+----------------+--------+--------------+----------------+--------------+-----------------+-------------+
-| order_key | cust_key | total_price | order_date | order_priority |  clerk |    cust_name |     cust_phone | cust_acctbal | cust_mktsegment | nation_name |
-+-----------+----------+-------------+------------+----------------+--------+--------------+----------------+--------------+-----------------+-------------+
-|  77733888 |       12 |      412.61 | 2025-07-11 |           high | Clerk3 | Frank Furter | 1-203-732-5680 |       623.86 |       HOUSEHOLD |       CHINA |
-|    340736 |        1 |      111.26 | 2025-09-05 |            low | Clerk1 |       <NULL> |         <NULL> |       <NULL> |          <NULL> |      <NULL> |
-+-----------+----------+-------------+------------+----------------+--------+--------------+----------------+--------------+-----------------+-------------+
-
++-----------+
+| sum_price |
++-----------+
+| 558660.03 |
++-----------+
 ```
 
 You can execute the real-time analytics query multiple times, and the results will vary with each run as new data is continuously written to Fluss in real-time.
````
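The direct query above relies on the union read this commit enables for append tables: rows already committed to the Iceberg snapshot are served from the lake, and rows beyond the lake's committed log offset are served from the Fluss log. A conceptual sketch of that merge, with hypothetical names and data (this is not the Fluss API):

```python
# Hypothetical union read over an append-only table: serve history from the
# lake, then tail the Fluss log from the lake's committed end offset.
lake_rows = [   # (log_offset, total_price) rows already tiered into Iceberg
    (0, 412.61),
    (1, 111.26),
]
fluss_log = [   # the Fluss log, including rows not yet visible in the lake
    (0, 412.61),
    (1, 111.26),
    (2, 98.40),
]
lake_end_offset = max(offset for offset, _ in lake_rows) + 1

def union_read(lake, log, lake_end):
    """Combine lake rows with log rows at or past the lake's end offset."""
    return lake + [row for row in log if row[0] >= lake_end]

rows = union_read(lake_rows, fluss_log, lake_end_offset)
print(round(sum(price for _, price in rows), 2))  # 622.27
```

The offset boundary guarantees each row is read exactly once, which is why the direct query stays fresher than `$lake` alone without double counting.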
````diff
@@ -509,7 +508,7 @@ docker compose exec taskmanager tree /tmp/iceberg/fluss.db
 
 **Sample Output:**
 ```shell
-/tmp/iceberg/fluss.db
+/tmp/iceberg/fluss
 └── datalake_enriched_orders
     ├── data
     │   └── 00000-0-abc123.parquet
````
