---
title: Iceberg
sidebar_position: 2
---

# Iceberg

## Introduction

[Apache Iceberg](https://iceberg.apache.org/) is an open table format for huge analytic datasets. It provides ACID transactions, schema evolution, and efficient data organization for data lakes.
To integrate Fluss with Iceberg, you must enable lakehouse storage and configure Iceberg as the lakehouse storage. For more details, see [Enable Lakehouse Storage](maintenance/tiered-storage/lakehouse-storage.md#enable-lakehouse-storage).

> **NOTE**: Iceberg requires JDK 11 or later. Please ensure that both your Fluss deployment and the Flink cluster used for the tiering service are running on JDK 11+.

## Configure Iceberg as LakeHouse Storage

### Configure Iceberg in Cluster Configurations

To configure Iceberg as the lakehouse storage, you must add the following configurations to `server.yaml`:
```yaml
# Iceberg configuration
datalake.format: iceberg

# the catalog configuration for Iceberg, assuming a Hadoop catalog
datalake.iceberg.type: hadoop
datalake.iceberg.warehouse: /tmp/iceberg
```

Fluss strips the `datalake.iceberg.` prefix from these keys and uses the remaining configuration to initialize the Iceberg catalog.
This approach allows passing custom configurations for Iceberg catalog initialization. See the [Iceberg Catalog Properties](https://iceberg.apache.org/docs/1.9.1/configuration/#catalog-properties) documentation for more details on the available catalog configurations.

Fluss supports all Iceberg-compatible catalog types. For catalogs such as `hive`, `hadoop`, `rest`, `glue`, `nessie`, and `jdbc`, you can specify them using the configuration `datalake.iceberg.type` with the corresponding value (e.g., `hive`, `hadoop`).
For other catalog types, you can use `datalake.iceberg.catalog-impl: <your_iceberg_catalog_impl_class_name>` to specify the catalog implementation class.
For example, configure `datalake.iceberg.catalog-impl: org.apache.iceberg.snowflake.SnowflakeCatalog` to use the Snowflake catalog.
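
As an illustration, any additional catalog property can be passed through with the `datalake.iceberg.` prefix. The sketch below assumes a REST catalog; `uri` and `warehouse` are standard Iceberg catalog properties, while the endpoint and bucket values are placeholders:
```yaml
datalake.format: iceberg

# assuming a REST catalog; the keys below are handed to the Iceberg catalog
# after the `datalake.iceberg.` prefix is stripped
datalake.iceberg.type: rest
datalake.iceberg.uri: http://rest-catalog-host:8181
datalake.iceberg.warehouse: s3://my-bucket/iceberg
```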

> **NOTE**:
> 1: Some catalogs, such as the `hadoop` and `hive` catalogs, require Hadoop-related classes. Make sure the Hadoop-related classes are on your classpath. You can either download the [pre-bundled Hadoop jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar)
> or [hadoop.tar.gz](https://archive.apache.org/dist/hadoop/common/hadoop-2.8.5/hadoop-2.8.5.tar.gz), which needs to be unpacked. Then put the Hadoop-related jars into `FLUSS_HOME/plugins/iceberg`.
> 2: Fluss only bundles the catalog implementations included in the `iceberg-core` module. For any catalog implementation not bundled within `iceberg-core` (e.g., the Hive catalog), you must place the corresponding JAR file into `FLUSS_HOME/plugins/iceberg`.
> 3: The Iceberg version that Fluss bundles is `1.9.2`, so please make sure the jars you add are compatible with Iceberg `1.9.2`.
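
For instance, to make the Hadoop classes available for the `hadoop` or `hive` catalog, the jars can simply be dropped into the plugin directory. A minimal sketch, assuming the pre-bundled Hadoop jar has already been downloaded and `FLUSS_HOME` points at your Fluss installation:
```shell
# paths and versions are illustrative; adjust them to your environment
mkdir -p $FLUSS_HOME/plugins/iceberg
cp flink-shaded-hadoop-2-uber-2.8.3-10.0.jar $FLUSS_HOME/plugins/iceberg/
```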

### Start Tiering Service to Iceberg

Then, you must start the datalake tiering service to tier Fluss's data to Iceberg. For guidance, you can refer to [Start The Datalake Tiering Service](maintenance/tiered-storage/lakehouse-storage.md#start-the-datalake-tiering-service). Although the example uses Paimon, the process is also applicable to Iceberg.

However, for the [Prepare required jars](maintenance/tiered-storage/lakehouse-storage.md#prepare-required-jars) step, adhere to the dependency management guidelines listed below (a shell sketch follows the list):
- Put the [fluss-flink connector jar](/downloads) into `${FLINK_HOME}/lib`; choose a connector version matching your Flink version. If you're using Flink 1.20, please use [fluss-flink-1.20-$FLUSS_VERSION$.jar](https://repo1.maven.org/maven2/org/apache/fluss/fluss-flink-1.20/$FLUSS_VERSION$/fluss-flink-1.20-$FLUSS_VERSION$.jar)
- If you are using [Amazon S3](http://aws.amazon.com/s3/), [Aliyun OSS](https://www.aliyun.com/product/oss) or [HDFS (Hadoop Distributed File System)](https://hadoop.apache.org/docs/stable/) as Fluss's [remote storage](maintenance/tiered-storage/remote-storage.md),
  you should download the corresponding [Fluss filesystem jar](/downloads#filesystem-jars) and also put it into `${FLINK_HOME}/lib`
- Put the [fluss-lake-iceberg jar](https://repo1.maven.org/maven2/org/apache/fluss/fluss-lake-iceberg/$FLUSS_VERSION$/fluss-lake-iceberg-$FLUSS_VERSION$.jar) into `${FLINK_HOME}/lib`
- Put the jars required by the Iceberg catalog into `${FLINK_HOME}/lib`. For example, if you are using the Hive catalog, put [iceberg-hive-metastore](https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-hive-metastore/1.9.2/iceberg-hive-metastore-1.9.2.jar) as well as the Hadoop- and Hive-related jars into `${FLINK_HOME}/lib`
- Put the jars required by Iceberg FileIO into `${FLINK_HOME}/lib`. For example, if your Iceberg is backed by S3, put [iceberg-aws-bundle](https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-aws-bundle/1.9.2) into `${FLINK_HOME}/lib`
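
The commands below sketch what this looks like for a Hive catalog backed by S3; file names, versions, and download locations are illustrative and should be adapted to your setup:
```shell
# copy the Fluss connector and lake-format jars into the Flink lib directory
cp fluss-flink-1.20-$FLUSS_VERSION$.jar ${FLINK_HOME}/lib/
cp fluss-lake-iceberg-$FLUSS_VERSION$.jar ${FLINK_HOME}/lib/
# catalog- and FileIO-specific dependencies (example: Hive metastore catalog + S3)
cp iceberg-hive-metastore-1.9.2.jar iceberg-aws-bundle-1.9.2.jar ${FLINK_HOME}/lib/
```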

Additionally, when following the [Start Datalake Tiering Service](maintenance/tiered-storage/lakehouse-storage.md#start-datalake-tiering-service) guide, make sure to use Iceberg-specific configurations as parameters when starting the Flink tiering job:
```shell
<FLINK_HOME>/bin/flink run /path/to/fluss-flink-tiering-$FLUSS_VERSION$.jar \
    --fluss.bootstrap.servers localhost:9123 \
    --datalake.format iceberg \
    --datalake.iceberg.type hadoop \
    --datalake.iceberg.warehouse /tmp/iceberg
```

## Table Mapping Between Fluss and Iceberg

When a Fluss table is created or altered with the option `'table.datalake.enabled' = 'true'` and configured with Iceberg as the datalake format, Fluss will automatically create a corresponding Iceberg table with the same table path.

The schema of the Iceberg table matches that of the Fluss table, except for the addition of three system columns at the end: `__bucket`, `__offset`, and `__timestamp`.
These system columns help Fluss clients consume data from Iceberg in a streaming fashion, such as seeking by a specific bucket using an offset or timestamp.

Here is an example using Flink SQL to create a table with data lake enabled:
```sql title="Flink SQL"
USE CATALOG fluss_catalog;

CREATE TABLE fluss_order_with_lake (
    `order_key` BIGINT,
    `cust_key` INT NOT NULL,
    `total_price` DECIMAL(15, 2),
    `order_date` DATE,
    `order_priority` STRING,
    `clerk` STRING,
    `ptime` AS PROCTIME(),
    PRIMARY KEY (`order_key`) NOT ENFORCED
) WITH (
    'table.datalake.enabled' = 'true',
    'table.datalake.freshness' = '30s'
);
```

You can also specify Iceberg [table properties](https://iceberg.apache.org/docs/latest/configuration/#table-properties) when creating a datalake-enabled Fluss table by using the `iceberg.` prefix within the Fluss table properties clause.
Here is an example that changes the Iceberg file format to `orc` and sets `commit.retry.num-retries` to `5`:
```sql title="Flink SQL"
CREATE TABLE fluss_order_with_lake (
    `order_key` BIGINT,
    `cust_key` INT NOT NULL,
    `total_price` DECIMAL(15, 2),
    `order_date` DATE,
    `order_priority` STRING,
    `clerk` STRING,
    `ptime` AS PROCTIME(),
    PRIMARY KEY (`order_key`) NOT ENFORCED
) WITH (
    'table.datalake.enabled' = 'true',
    'table.datalake.freshness' = '30s',
    'table.datalake.auto-maintenance' = 'true',
    'iceberg.write.format.default' = 'orc',
    'iceberg.commit.retry.num-retries' = '5'
);
```

### Primary Key Tables

Primary key tables in Fluss are mapped to Iceberg tables with:
- **Primary key constraints**: The Iceberg table maintains the same primary key definition
- **Merge-on-read (MOR) strategy**: Updates and deletes are handled efficiently using Iceberg's MOR capabilities
- **Bucket partitioning**: Automatically partitioned by the primary key using Iceberg's bucket transform, with the same bucket number as Fluss so that the bucketing stays aligned
- **Sorted by the system column `__offset`**: Sorted by `__offset` (which is derived from the Fluss changelog) to preserve the data order and facilitate mapping back to the original Fluss changelog

```sql title="Primary Key Table Example"
CREATE TABLE user_profiles (
    `user_id` BIGINT,
    `username` STRING,
    `email` STRING,
    `last_login` TIMESTAMP,
    `profile_data` STRING,
    PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
    'table.datalake.enabled' = 'true',
    'bucket.num' = '4',
    'bucket.key' = 'user_id'
);
```

**Corresponding Iceberg table structure:**
```sql
CREATE TABLE user_profiles (
    user_id BIGINT,
    username STRING,
    email STRING,
    last_login TIMESTAMP,
    profile_data STRING,
    __bucket INT,
    __offset BIGINT,
    __timestamp TIMESTAMP_LTZ,
    PRIMARY KEY (user_id) NOT ENFORCED
) PARTITIONED BY (bucket(user_id, 4))
SORTED BY (__offset ASC);
```

### Log Tables

The table mapping for Fluss log tables differs slightly depending on whether a bucket key is specified.

#### No Bucket Key
Log tables without a bucket key in Fluss are mapped to Iceberg tables with:
- **Identity partitioning**: Identity partitioning on the `__bucket` system column, which makes it possible to locate the Iceberg data files for a given Fluss bucket
- **Sorted by the system column `__offset`**: Sorted by `__offset` (which is derived from the Fluss log data) to preserve the data order and facilitate mapping back to the original Fluss log data

```sql title="Log Table without Bucket Key"
CREATE TABLE access_logs (
    `timestamp` TIMESTAMP,
    `user_id` BIGINT,
    `action` STRING,
    `ip_address` STRING
) WITH (
    'table.datalake.enabled' = 'true',
    'bucket.num' = '3'
);
```

**Corresponding Iceberg table:**
```sql
CREATE TABLE access_logs (
    timestamp TIMESTAMP,
    user_id BIGINT,
    action STRING,
    ip_address STRING,
    __bucket INT,
    __offset BIGINT,
    __timestamp TIMESTAMP_LTZ
) PARTITIONED BY (IDENTITY(__bucket))
SORTED BY (__offset ASC);
```

#### Single Bucket Key
Log tables with a single bucket key in Fluss are mapped to Iceberg tables with:
- **Bucket partitioning**: Automatically partitioned by the bucket key using Iceberg's bucket transform, with the same bucket number as Fluss so that the bucketing stays aligned
- **Sorted by the system column `__offset`**: Sorted by `__offset` (which is derived from the Fluss log data) to preserve the data order and facilitate mapping back to the original Fluss log data

```sql title="Log Table with Bucket Key"
CREATE TABLE order_events (
    `order_id` BIGINT,
    `item_id` BIGINT,
    `amount` INT,
    `event_time` TIMESTAMP
) WITH (
    'table.datalake.enabled' = 'true',
    'bucket.num' = '5',
    'bucket.key' = 'order_id'
);
```

**Corresponding Iceberg table:**
```sql
CREATE TABLE order_events (
    order_id BIGINT,
    item_id BIGINT,
    amount INT,
    event_time TIMESTAMP,
    __bucket INT,
    __offset BIGINT,
    __timestamp TIMESTAMP_LTZ
) PARTITIONED BY (bucket(order_id, 5))
SORTED BY (__offset ASC);
```

### Partitioned Tables

For Fluss partitioned tables, the corresponding Iceberg table is partitioned first by the Fluss partition keys and then according to the rules described above:

```sql title="Partitioned Table Example"
CREATE TABLE daily_sales (
    `sale_id` BIGINT,
    `amount` DECIMAL(10,2),
    `customer_id` BIGINT,
    `sale_date` STRING,
    PRIMARY KEY (`sale_id`) NOT ENFORCED
) PARTITIONED BY (`sale_date`)
WITH (
    'table.datalake.enabled' = 'true',
    'bucket.num' = '4',
    'bucket.key' = 'sale_id'
);
```

**Corresponding Iceberg table:**
```sql
CREATE TABLE daily_sales (
    sale_id BIGINT,
    amount DECIMAL(10,2),
    customer_id BIGINT,
    sale_date STRING,
    __bucket INT,
    __offset BIGINT,
    __timestamp TIMESTAMP_LTZ,
    PRIMARY KEY (sale_id) NOT ENFORCED
) PARTITIONED BY (IDENTITY(sale_date), bucket(sale_id, 4))
SORTED BY (__offset ASC);
```

## Read Tables

### Reading with Other Engines

Since data tiered to Iceberg from Fluss is stored as standard Iceberg tables, you can use any Iceberg-compatible engine. Below is an example using [StarRocks](https://docs.starrocks.io/docs/data_source/catalog/iceberg/iceberg_catalog/):

#### StarRocks with Hadoop Catalog

```sql title="StarRocks SQL"
CREATE EXTERNAL CATALOG iceberg_catalog
PROPERTIES (
    "type" = "iceberg",
    "iceberg.catalog.type" = "hadoop",
    "iceberg.catalog.warehouse" = "/tmp/iceberg"
);
```

```sql title="Query Examples"
-- Basic query
SELECT COUNT(*) FROM iceberg_catalog.fluss.orders;

-- Time travel query
SELECT * FROM iceberg_catalog.fluss.orders
FOR SYSTEM_VERSION AS OF 123456789;

-- Query with bucket filtering for efficiency
SELECT * FROM iceberg_catalog.fluss.orders
WHERE __bucket = 1 AND __offset >= 100;
```

> **NOTE**: The configuration values must match those used when configuring Iceberg as the lakehouse storage for Fluss in `server.yaml`.
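
#### Flink with Hadoop Catalog

The tiered tables can also be queried from Flink. The sketch below assumes the Iceberg Flink runtime jar is on the Flink classpath; the catalog name is arbitrary, and the warehouse path must match the one configured in `server.yaml`:
```sql title="Flink SQL"
CREATE CATALOG iceberg_ro WITH (
    'type' = 'iceberg',
    'catalog-type' = 'hadoop',
    'warehouse' = '/tmp/iceberg'
);

SELECT COUNT(*) FROM iceberg_ro.fluss.orders;
```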

## Data Type Mapping

When integrating with Iceberg, Fluss automatically converts between Fluss data types and Iceberg data types:

| Fluss Data Type               | Iceberg Data Type            | Notes               |
|-------------------------------|------------------------------|---------------------|
| BOOLEAN                       | BOOLEAN                      |                     |
| TINYINT                       | INTEGER                      | Promoted to INT     |
| SMALLINT                      | INTEGER                      | Promoted to INT     |
| INT                           | INTEGER                      |                     |
| BIGINT                        | LONG                         |                     |
| FLOAT                         | FLOAT                        |                     |
| DOUBLE                        | DOUBLE                       |                     |
| DECIMAL                       | DECIMAL                      |                     |
| STRING                        | STRING                       |                     |
| CHAR                          | STRING                       | Converted to STRING |
| DATE                          | DATE                         |                     |
| TIME                          | TIME                         |                     |
| TIMESTAMP                     | TIMESTAMP (without timezone) |                     |
| TIMESTAMP WITH LOCAL TIMEZONE | TIMESTAMP (with timezone)    |                     |
| BINARY                        | BINARY                       |                     |
| BYTES                         | BINARY                       | Converted to BINARY |

## Maintenance and Optimization

### Auto Compaction

The table option `table.datalake.auto-compaction` (disabled by default) provides per-table control over automatic compaction.
When enabled for a specific table, compaction is automatically triggered by the tiering service during write operations to that table.
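
For example, auto compaction can be enabled per table in the `WITH` clause when creating a datalake-enabled table; a minimal sketch, with an illustrative table definition:
```sql title="Flink SQL"
CREATE TABLE compacted_events (
    `event_id` BIGINT,
    `payload` STRING
) WITH (
    'table.datalake.enabled' = 'true',
    'table.datalake.auto-compaction' = 'true'
);
```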

### Snapshot Metadata

Fluss adds specific metadata to Iceberg snapshots for traceability:

- **commit-user**: Set to `__fluss_lake_tiering` to identify Fluss-generated snapshots
- **fluss-bucket-offset**: A JSON string containing the Fluss bucket offset mapping to track the tiering progress:
  ```json
  [
    {"bucket": 0, "offset": 1234},
    {"bucket": 1, "offset": 5678},
    {"bucket": 2, "offset": 9012}
  ]
  ```

## Limitations

When using Iceberg as the lakehouse storage layer with Fluss, the following limitations currently exist:

- **Union Read**: Union read of data from both Fluss and Iceberg layers is not supported
- **Complex Types**: Array, Map, and Row types are not supported
- **Multiple bucket keys**: Not supported until Iceberg implements multi-argument partition transforms