Commit ac1569c

[lake/iceberg] Support Log Table in IcebergLakeCatalog (#1508)

1 parent 17d69ff commit ac1569c

2 files changed: +227 -28 lines changed

fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalog.java

Lines changed: 49 additions & 28 deletions
@@ -50,7 +50,7 @@
 import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
 import static org.apache.iceberg.CatalogUtil.loadCatalog;

-/** A Iceberg implementation of {@link LakeCatalog}. */
+/** An Iceberg implementation of {@link LakeCatalog}. */
 public class IcebergLakeCatalog implements LakeCatalog {

     private static final LinkedHashMap<String, Type> SYSTEM_COLUMNS = new LinkedHashMap<>();
@@ -101,18 +101,16 @@ private Catalog createIcebergCatalog(Configuration configuration) {
     @Override
     public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
             throws TableAlreadyExistException {
-        if (!tableDescriptor.hasPrimaryKey()) {
-            throw new UnsupportedOperationException(
-                    "Iceberg integration currently supports only primary key tables.");
-        }
         // convert Fluss table path to iceberg table
+        boolean isPkTable = tableDescriptor.hasPrimaryKey();
         TableIdentifier icebergId = toIcebergTableIdentifier(tablePath);
-        Schema icebergSchema = convertToIcebergSchema(tableDescriptor);
+        Schema icebergSchema = convertToIcebergSchema(tableDescriptor, isPkTable);
         Catalog.TableBuilder tableBuilder = icebergCatalog.buildTable(icebergId, icebergSchema);

-        PartitionSpec partitionSpec = createPartitionSpec(tableDescriptor, icebergSchema);
+        PartitionSpec partitionSpec =
+                createPartitionSpec(tableDescriptor, icebergSchema, isPkTable);
         SortOrder sortOrder = createSortOrder(icebergSchema);
-        tableBuilder.withProperties(buildTableProperties(tableDescriptor));
+        tableBuilder.withProperties(buildTableProperties(tableDescriptor, isPkTable));
         tableBuilder.withPartitionSpec(partitionSpec);
         tableBuilder.withSortOrder(sortOrder);
         try {
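
With the primary-key-only guard removed above, createTable now accepts log tables (tables without a primary key) as well. A minimal sketch of the new call path, mirroring the tests added in this commit (the builders and the catalog instance come from this repository; catalog setup as in the test class is assumed):

    // Sketch: creating a log table through IcebergLakeCatalog.
    Schema flussSchema =
            Schema.newBuilder()
                    .column("id", DataTypes.BIGINT())
                    .column("name", DataTypes.STRING())
                    .build();

    TableDescriptor logTable =
            TableDescriptor.builder()
                    .schema(flussSchema) // no primary key -> isPkTable == false
                    .distributedBy(3)    // bucket count only; no bucket key
                    .build();

    flussIcebergCatalog.createTable(TablePath.of("test_db", "log_table"), logTable);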
@@ -145,10 +143,11 @@ private void createTable(TablePath tablePath, Catalog.TableBuilder tableBuilder)
         }
     }

-    public Schema convertToIcebergSchema(TableDescriptor tableDescriptor) {
+    public Schema convertToIcebergSchema(TableDescriptor tableDescriptor, boolean isPkTable) {
         List<Types.NestedField> fields = new ArrayList<>();
-        int fieldId = 1;
+        int fieldId = 0;

+        // general columns
         for (com.alibaba.fluss.metadata.Schema.Column column :
                 tableDescriptor.getSchema().getColumns()) {
             String colName = column.getName();
@@ -176,24 +175,29 @@ public Schema convertToIcebergSchema(TableDescriptor tableDescriptor) {
             }
             fields.add(field);
         }
+
+        // system columns
         for (Map.Entry<String, Type> systemColumn : SYSTEM_COLUMNS.entrySet()) {
             fields.add(
                     Types.NestedField.required(
                             fieldId++, systemColumn.getKey(), systemColumn.getValue()));
         }

-        // set identifier fields
-        int[] primaryKeyIndexes = tableDescriptor.getSchema().getPrimaryKeyIndexes();
-        Set<Integer> identifierFieldIds = new HashSet<>();
-        for (int i = 0; i < primaryKeyIndexes.length; i++) {
-            identifierFieldIds.add(fields.get(i).fieldId());
+        if (isPkTable) {
+            // set identifier fields
+            int[] primaryKeyIndexes = tableDescriptor.getSchema().getPrimaryKeyIndexes();
+            Set<Integer> identifierFieldIds = new HashSet<>();
+            for (int pkIdx : primaryKeyIndexes) {
+                identifierFieldIds.add(fields.get(pkIdx).fieldId());
+            }
+            return new Schema(fields, identifierFieldIds);
+        } else {
+            return new Schema(fields);
         }
-        return new Schema(fields, identifierFieldIds);
     }

     private PartitionSpec createPartitionSpec(
-            TableDescriptor tableDescriptor, Schema icebergSchema) {
-        // Only PK tables supported for now
+            TableDescriptor tableDescriptor, Schema icebergSchema, boolean isPkTable) {
         List<String> bucketKeys = tableDescriptor.getBucketKeys();
         int bucketCount =
                 tableDescriptor
@@ -204,21 +208,35 @@ private PartitionSpec createPartitionSpec(
                                 new IllegalArgumentException(
                                         "Bucket count (bucket.num) must be set"));

-        if (bucketKeys.isEmpty()) {
-            throw new IllegalArgumentException(
-                    "Bucket key must be set for primary key Iceberg tables");
-        }
+        // Only support one bucket key for now
         if (bucketKeys.size() > 1) {
             throw new UnsupportedOperationException(
                     "Only one bucket key is supported for Iceberg at the moment");
         }

+        // pk table must have bucket key
+        if (bucketKeys.isEmpty() && isPkTable) {
+            throw new IllegalArgumentException(
+                    "Bucket key must be set for primary key Iceberg tables");
+        }
+
         PartitionSpec.Builder builder = PartitionSpec.builderFor(icebergSchema);
         List<String> partitionKeys = tableDescriptor.getPartitionKeys();
+        // always set identity partition with partition key
         for (String partitionKey : partitionKeys) {
             builder.identity(partitionKey);
         }
-        builder.bucket(bucketKeys.get(0), bucketCount);
+
+        if (isPkTable) {
+            builder.bucket(bucketKeys.get(0), bucketCount);
+        } else {
+            // if there is no bucket keys, use identity(__bucket)
+            if (bucketKeys.isEmpty()) {
+                builder.identity(BUCKET_COLUMN_NAME);
+            } else {
+                builder.bucket(bucketKeys.get(0), bucketCount);
+            }
+        }

         return builder.build();
     }
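
The branching above yields two partition layouts. A minimal, self-contained sketch of both, written directly against the Iceberg API (iceberg-api on the classpath is assumed; the two-field schema and its field ids are illustrative, not the ones Fluss generates):

    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types;

    public class PartitionSpecSketch {
        public static void main(String[] args) {
            Schema schema =
                    new Schema(
                            Types.NestedField.optional(1, "id", Types.LongType.get()),
                            Types.NestedField.required(2, "__bucket", Types.IntegerType.get()));

            // A table with a bucket key (always the case for pk tables): hash-bucket it.
            PartitionSpec bucketed = PartitionSpec.builderFor(schema).bucket("id", 3).build();

            // A log table without a bucket key: identity-partition on the __bucket system column.
            PartitionSpec byBucketColumn =
                    PartitionSpec.builderFor(schema).identity("__bucket").build();

            System.out.println(bucketed);
            System.out.println(byBucketColumn);
        }
    }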
@@ -252,13 +270,16 @@ private SortOrder createSortOrder(Schema icebergSchema) {
         return builder.build();
     }

-    private Map<String, String> buildTableProperties(TableDescriptor tableDescriptor) {
+    private Map<String, String> buildTableProperties(
+            TableDescriptor tableDescriptor, boolean isPkTable) {
         Map<String, String> icebergProperties = new HashMap<>();

-        // MOR table properties for streaming workloads
-        icebergProperties.put("write.delete.mode", "merge-on-read");
-        icebergProperties.put("write.update.mode", "merge-on-read");
-        icebergProperties.put("write.merge.mode", "merge-on-read");
+        if (isPkTable) {
+            // MOR table properties for streaming workloads
+            icebergProperties.put("write.delete.mode", "merge-on-read");
+            icebergProperties.put("write.update.mode", "merge-on-read");
+            icebergProperties.put("write.merge.mode", "merge-on-read");
+        }

         tableDescriptor
                 .getProperties()
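
The effect of the isPkTable branch above: only primary key tables get the merge-on-read write modes, while log tables, being append-only, receive no write-mode overrides. A compact sketch of the resulting property maps (plain Java, no Fluss or Iceberg dependencies; class and method names are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    public class WriteModeSketch {
        static Map<String, String> writeModes(boolean isPkTable) {
            Map<String, String> props = new HashMap<>();
            if (isPkTable) {
                // MOR table properties for streaming workloads, as set in the diff
                props.put("write.delete.mode", "merge-on-read");
                props.put("write.update.mode", "merge-on-read");
                props.put("write.merge.mode", "merge-on-read");
            }
            return props;
        }

        public static void main(String[] args) {
            System.out.println(writeModes(true));  // three merge-on-read entries
            System.out.println(writeModes(false)); // empty map
        }
    }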

fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalogTest.java

Lines changed: 178 additions & 0 deletions
@@ -61,6 +61,37 @@ void setupCatalog() {
         this.flussIcebergCatalog = new IcebergLakeCatalog(configuration);
     }

+    /** Verify property prefix rewriting. */
+    @Test
+    void testPropertyPrefixRewriting() {
+        String database = "test_db";
+        String tableName = "test_table";
+
+        Schema flussSchema = Schema.newBuilder().column("id", DataTypes.BIGINT()).build();
+
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(flussSchema)
+                        .distributedBy(3)
+                        .property("iceberg.commit.retry.num-retries", "5")
+                        .property("table.datalake.freshness", "30s")
+                        .build();
+
+        TablePath tablePath = TablePath.of(database, tableName);
+        flussIcebergCatalog.createTable(tablePath, tableDescriptor);
+
+        Table created =
+                flussIcebergCatalog
+                        .getIcebergCatalog()
+                        .loadTable(TableIdentifier.of(database, tableName));
+
+        // Verify property prefix rewriting
+        assertThat(created.properties()).containsEntry("commit.retry.num-retries", "5");
+        assertThat(created.properties()).containsEntry("fluss.table.datalake.freshness", "30s");
+        assertThat(created.properties())
+                .doesNotContainKeys("iceberg.commit.retry.num-retries", "table.datalake.freshness");
+    }
+
     @Test
     void testCreatePrimaryKeyTable() {
         String database = "test_db";
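
The assertions in testPropertyPrefixRewriting above pin down the key-rewriting convention: properties prefixed with "iceberg." are forwarded to the Iceberg table with the prefix stripped, and other Fluss table properties are preserved under a "fluss." prefix. A sketch of that rule as inferred from the assertions (the actual rewriting lives inside IcebergLakeCatalog; this helper is hypothetical):

    // Hypothetical helper illustrating the rewrite rule the test asserts.
    static String rewriteKey(String flussKey) {
        if (flussKey.startsWith("iceberg.")) {
            // "iceberg.commit.retry.num-retries" -> "commit.retry.num-retries"
            return flussKey.substring("iceberg.".length());
        }
        // "table.datalake.freshness" -> "fluss.table.datalake.freshness"
        return "fluss." + flussKey;
    }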
@@ -217,4 +248,151 @@ void rejectsPrimaryKeyTableWithMultipleBucketKeys() {
                 .isInstanceOf(UnsupportedOperationException.class)
                 .hasMessageContaining("Only one bucket key is supported for Iceberg");
     }
+
+    @Test
+    void testCreateLogTable() {
+        String database = "test_db";
+        String tableName = "log_table";
+
+        Schema flussSchema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.BIGINT())
+                        .column("name", DataTypes.STRING())
+                        .column("amount", DataTypes.INT())
+                        .column("address", DataTypes.STRING())
+                        .build();
+
+        TableDescriptor td =
+                TableDescriptor.builder()
+                        .schema(flussSchema)
+                        .distributedBy(3) // no bucket key
+                        .build();
+
+        TablePath tablePath = TablePath.of(database, tableName);
+        flussIcebergCatalog.createTable(tablePath, td);
+
+        TableIdentifier tableId = TableIdentifier.of(database, tableName);
+        Table createdTable = flussIcebergCatalog.getIcebergCatalog().loadTable(tableId);
+
+        org.apache.iceberg.Schema expectIcebergSchema =
+                new org.apache.iceberg.Schema(
+                        Arrays.asList(
+                                Types.NestedField.optional(1, "id", Types.LongType.get()),
+                                Types.NestedField.optional(2, "name", Types.StringType.get()),
+                                Types.NestedField.optional(3, "amount", Types.IntegerType.get()),
+                                Types.NestedField.optional(4, "address", Types.StringType.get()),
+                                Types.NestedField.required(
+                                        5, BUCKET_COLUMN_NAME, Types.IntegerType.get()),
+                                Types.NestedField.required(
+                                        6, OFFSET_COLUMN_NAME, Types.LongType.get()),
+                                Types.NestedField.required(
+                                        7, TIMESTAMP_COLUMN_NAME, Types.TimestampType.withZone())));
+
+        // Verify iceberg table schema
+        assertThat(createdTable.schema().toString()).isEqualTo(expectIcebergSchema.toString());
+
+        // Verify partition field and transform
+        assertThat(createdTable.spec().fields()).hasSize(1);
+        PartitionField partitionField = createdTable.spec().fields().get(0);
+        assertThat(partitionField.name()).isEqualTo(BUCKET_COLUMN_NAME);
+        assertThat(partitionField.transform().toString()).isEqualTo("identity");
+
+        // Verify sort field and order
+        assertThat(createdTable.sortOrder().fields()).hasSize(1);
+        SortField sortField = createdTable.sortOrder().fields().get(0);
+        assertThat(sortField.sourceId())
+                .isEqualTo(createdTable.schema().findField(OFFSET_COLUMN_NAME).fieldId());
+        assertThat(sortField.direction()).isEqualTo(SortDirection.ASC);
+    }
+
+    @Test
+    void testCreatePartitionedLogTable() {
+        String database = "test_db";
+        String tableName = "partitioned_log_table";
+
+        Schema flussSchema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.BIGINT())
+                        .column("name", DataTypes.STRING())
+                        .column("amount", DataTypes.INT())
+                        .column("order_type", DataTypes.STRING())
+                        .build();
+
+        TableDescriptor td =
+                TableDescriptor.builder()
+                        .schema(flussSchema)
+                        .distributedBy(3)
+                        .partitionedBy("order_type")
+                        .build();
+
+        TablePath path = TablePath.of(database, tableName);
+        flussIcebergCatalog.createTable(path, td);
+
+        Table createdTable =
+                flussIcebergCatalog
+                        .getIcebergCatalog()
+                        .loadTable(TableIdentifier.of(database, tableName));
+
+        org.apache.iceberg.Schema expectIcebergSchema =
+                new org.apache.iceberg.Schema(
+                        Arrays.asList(
+                                Types.NestedField.optional(1, "id", Types.LongType.get()),
+                                Types.NestedField.optional(2, "name", Types.StringType.get()),
+                                Types.NestedField.optional(3, "amount", Types.IntegerType.get()),
+                                Types.NestedField.optional(4, "order_type", Types.StringType.get()),
+                                Types.NestedField.required(
+                                        5, BUCKET_COLUMN_NAME, Types.IntegerType.get()),
+                                Types.NestedField.required(
+                                        6, OFFSET_COLUMN_NAME, Types.LongType.get()),
+                                Types.NestedField.required(
+                                        7, TIMESTAMP_COLUMN_NAME, Types.TimestampType.withZone())));
+
+        // Verify iceberg table schema
+        assertThat(createdTable.schema().toString()).isEqualTo(expectIcebergSchema.toString());
+
+        // Verify partition field and transform
+        assertThat(createdTable.spec().fields()).hasSize(2);
+        PartitionField firstPartitionField = createdTable.spec().fields().get(0);
+        assertThat(firstPartitionField.name()).isEqualTo("order_type");
+        assertThat(firstPartitionField.transform().toString()).isEqualTo("identity");
+
+        PartitionField secondPartitionField = createdTable.spec().fields().get(1);
+        assertThat(secondPartitionField.name()).isEqualTo(BUCKET_COLUMN_NAME);
+        assertThat(secondPartitionField.transform().toString()).isEqualTo("identity");
+
+        // Verify sort field and order
+        assertThat(createdTable.sortOrder().fields()).hasSize(1);
+        SortField sortField = createdTable.sortOrder().fields().get(0);
+        assertThat(sortField.sourceId())
+                .isEqualTo(createdTable.schema().findField(OFFSET_COLUMN_NAME).fieldId());
+        assertThat(sortField.direction()).isEqualTo(SortDirection.ASC);
+    }
+
+    @Test
+    void rejectsLogTableWithMultipleBucketKeys() {
+        String database = "test_db";
+        String tableName = "multi_bucket_log_table";
+
+        Schema flussSchema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.BIGINT())
+                        .column("name", DataTypes.STRING())
+                        .column("amount", DataTypes.INT())
+                        .column("user_type", DataTypes.STRING())
+                        .column("order_type", DataTypes.STRING())
+                        .build();
+
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(flussSchema)
+                        .distributedBy(3, "user_type", "order_type")
+                        .build();
+
+        TablePath tablePath = TablePath.of(database, tableName);
+
+        // Do not allow multiple bucket keys for log table
+        assertThatThrownBy(() -> flussIcebergCatalog.createTable(tablePath, tableDescriptor))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("Only one bucket key is supported for Iceberg");
+    }
 }
