Skip to content

Commit d10252d

Browse files
authored
[iceberg] support altering table properties (#2043)
* [iceberg] support altering table properties * prevent altering reserved table properties
1 parent 20e87cc commit d10252d

File tree

2 files changed

+224
-5
lines changed

2 files changed

+224
-5
lines changed

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,19 @@
3030
import org.apache.fluss.utils.IOUtils;
3131

3232
import org.apache.iceberg.PartitionSpec;
33+
import org.apache.iceberg.RowLevelOperationMode;
3334
import org.apache.iceberg.Schema;
3435
import org.apache.iceberg.SortOrder;
36+
import org.apache.iceberg.Table;
37+
import org.apache.iceberg.TableProperties;
38+
import org.apache.iceberg.UpdateProperties;
3539
import org.apache.iceberg.catalog.Catalog;
3640
import org.apache.iceberg.catalog.Namespace;
3741
import org.apache.iceberg.catalog.SupportsNamespaces;
3842
import org.apache.iceberg.catalog.TableIdentifier;
3943
import org.apache.iceberg.exceptions.AlreadyExistsException;
4044
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
45+
import org.apache.iceberg.exceptions.NoSuchTableException;
4146
import org.apache.iceberg.types.Type;
4247
import org.apache.iceberg.types.Types;
4348

@@ -52,10 +57,17 @@
5257
import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
5358
import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
5459
import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
60+
import static org.apache.fluss.utils.Preconditions.checkArgument;
5561
import static org.apache.iceberg.types.Type.TypeID.STRING;
5662

5763
/** An Iceberg implementation of {@link LakeCatalog}. */
5864
public class IcebergLakeCatalog implements LakeCatalog {
65+
@VisibleForTesting
66+
static final Set<String> RESERVED_PROPERTIES =
67+
Set.of(
68+
TableProperties.MERGE_MODE,
69+
TableProperties.UPDATE_MODE,
70+
TableProperties.DELETE_MODE);
5971

6072
public static final LinkedHashMap<String, Type> SYSTEM_COLUMNS = new LinkedHashMap<>();
6173

@@ -119,8 +131,35 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Co
119131
@Override
120132
public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
121133
throws TableNotExistException {
122-
throw new UnsupportedOperationException(
123-
"Alter table is not supported for Iceberg at the moment");
134+
try {
135+
Table table = icebergCatalog.loadTable(toIcebergTableIdentifier(tablePath));
136+
UpdateProperties updateProperties = table.updateProperties();
137+
for (TableChange tableChange : tableChanges) {
138+
if (tableChange instanceof TableChange.SetOption) {
139+
TableChange.SetOption option = (TableChange.SetOption) tableChange;
140+
checkArgument(
141+
!RESERVED_PROPERTIES.contains(option.getKey()),
142+
"Cannot set table property '%s'",
143+
option.getKey());
144+
updateProperties.set(
145+
convertFlussPropertyKeyToIceberg(option.getKey()), option.getValue());
146+
} else if (tableChange instanceof TableChange.ResetOption) {
147+
TableChange.ResetOption option = (TableChange.ResetOption) tableChange;
148+
checkArgument(
149+
!RESERVED_PROPERTIES.contains(option.getKey()),
150+
"Cannot reset table property '%s'",
151+
option.getKey());
152+
updateProperties.remove(convertFlussPropertyKeyToIceberg(option.getKey()));
153+
} else {
154+
throw new UnsupportedOperationException(
155+
"Unsupported table change: " + tableChange.getClass());
156+
}
157+
}
158+
159+
updateProperties.commit();
160+
} catch (NoSuchTableException e) {
161+
throw new TableNotExistException("Table " + tablePath + " does not exist.");
162+
}
124163
}
125164

126165
private TableIdentifier toIcebergTableIdentifier(TablePath tablePath) {
@@ -249,6 +288,14 @@ private void setFlussPropertyToIceberg(
249288
}
250289
}
251290

291+
private static String convertFlussPropertyKeyToIceberg(String key) {
292+
if (key.startsWith(ICEBERG_CONF_PREFIX)) {
293+
return key.substring(ICEBERG_CONF_PREFIX.length());
294+
} else {
295+
return FLUSS_CONF_PREFIX + key;
296+
}
297+
}
298+
252299
private void createDatabase(String databaseName) {
253300
Namespace namespace = Namespace.of(databaseName);
254301
if (icebergCatalog instanceof SupportsNamespaces) {
@@ -275,9 +322,12 @@ private Map<String, String> buildTableProperties(
275322

276323
if (isPkTable) {
277324
// MOR table properties for streaming workloads
278-
icebergProperties.put("write.delete.mode", "merge-on-read");
279-
icebergProperties.put("write.update.mode", "merge-on-read");
280-
icebergProperties.put("write.merge.mode", "merge-on-read");
325+
icebergProperties.put(
326+
TableProperties.DELETE_MODE, RowLevelOperationMode.MERGE_ON_READ.modeName());
327+
icebergProperties.put(
328+
TableProperties.UPDATE_MODE, RowLevelOperationMode.MERGE_ON_READ.modeName());
329+
icebergProperties.put(
330+
TableProperties.MERGE_MODE, RowLevelOperationMode.MERGE_ON_READ.modeName());
281331
}
282332

283333
tableDescriptor

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalogTest.java

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,20 @@
2020
import org.apache.fluss.config.ConfigOptions;
2121
import org.apache.fluss.config.Configuration;
2222
import org.apache.fluss.exception.InvalidTableException;
23+
import org.apache.fluss.exception.TableNotExistException;
2324
import org.apache.fluss.lake.lakestorage.TestingLakeCatalogContext;
2425
import org.apache.fluss.metadata.Schema;
26+
import org.apache.fluss.metadata.TableChange;
2527
import org.apache.fluss.metadata.TableDescriptor;
2628
import org.apache.fluss.metadata.TablePath;
2729
import org.apache.fluss.types.DataTypes;
2830

2931
import org.apache.iceberg.PartitionField;
32+
import org.apache.iceberg.RowLevelOperationMode;
3033
import org.apache.iceberg.SortDirection;
3134
import org.apache.iceberg.SortField;
3235
import org.apache.iceberg.Table;
36+
import org.apache.iceberg.catalog.Catalog;
3337
import org.apache.iceberg.catalog.TableIdentifier;
3438
import org.apache.iceberg.types.Types;
3539
import org.assertj.core.api.Assertions;
@@ -455,4 +459,169 @@ void testIllegalPartitionKeyType(boolean isPrimaryKeyTable) throws Exception {
455459
.hasMessage(
456460
"Partition key only support string type for iceberg currently. Column `c1` is not string type.");
457461
}
462+
463+
@Test
464+
void alterTableProperties() {
465+
String database = "test_alter_table_db";
466+
String tableName = "test_alter_table";
467+
468+
Schema flussSchema = Schema.newBuilder().column("id", DataTypes.BIGINT()).build();
469+
470+
TableDescriptor tableDescriptor =
471+
TableDescriptor.builder()
472+
.schema(flussSchema)
473+
.distributedBy(3)
474+
.property("iceberg.commit.retry.num-retries", "5")
475+
.property("table.datalake.freshness", "30s")
476+
.build();
477+
478+
TablePath tablePath = TablePath.of(database, tableName);
479+
TestingLakeCatalogContext context = new TestingLakeCatalogContext();
480+
flussIcebergCatalog.createTable(tablePath, tableDescriptor, context);
481+
482+
Catalog catalog = flussIcebergCatalog.getIcebergCatalog();
483+
assertThat(catalog.loadTable(TableIdentifier.of(database, tableName)).properties())
484+
.containsEntry("commit.retry.num-retries", "5")
485+
.containsEntry("fluss.table.datalake.freshness", "30s")
486+
.doesNotContainKeys("iceberg.commit.retry.num-retries", "table.datalake.freshness");
487+
488+
// set new iceberg property
489+
flussIcebergCatalog.alterTable(
490+
tablePath,
491+
List.of(TableChange.set("iceberg.commit.retry.min-wait-ms", "1000")),
492+
context);
493+
assertThat(catalog.loadTable(TableIdentifier.of(database, tableName)).properties())
494+
.containsEntry("commit.retry.min-wait-ms", "1000")
495+
.containsEntry("commit.retry.num-retries", "5")
496+
.containsEntry("fluss.table.datalake.freshness", "30s")
497+
.doesNotContainKeys(
498+
"iceberg.commit.retry.min-wait-ms",
499+
"iceberg.commit.retry.num-retries",
500+
"table.datalake.freshness");
501+
502+
// update existing properties
503+
flussIcebergCatalog.alterTable(
504+
tablePath,
505+
List.of(
506+
TableChange.set("iceberg.commit.retry.num-retries", "10"),
507+
TableChange.set("table.datalake.freshness", "23s")),
508+
context);
509+
assertThat(catalog.loadTable(TableIdentifier.of(database, tableName)).properties())
510+
.containsEntry("commit.retry.min-wait-ms", "1000")
511+
.containsEntry("commit.retry.num-retries", "10")
512+
.containsEntry("fluss.table.datalake.freshness", "23s")
513+
.doesNotContainKeys(
514+
"iceberg.commit.retry.min-wait-ms",
515+
"iceberg.commit.retry.num-retries",
516+
"table.datalake.freshness");
517+
518+
// remove existing properties
519+
flussIcebergCatalog.alterTable(
520+
tablePath,
521+
List.of(
522+
TableChange.reset("iceberg.commit.retry.min-wait-ms"),
523+
TableChange.reset("table.datalake.freshness")),
524+
context);
525+
assertThat(catalog.loadTable(TableIdentifier.of(database, tableName)).properties())
526+
.containsEntry("commit.retry.num-retries", "10")
527+
.doesNotContainKeys(
528+
"commit.retry.min-wait-ms",
529+
"iceberg.commit.retry.min-wait-ms",
530+
"table.datalake.freshness",
531+
"fluss.table.datalake.freshness");
532+
533+
// remove non-existing property
534+
flussIcebergCatalog.alterTable(
535+
tablePath, List.of(TableChange.reset("iceberg.non-existing.property")), context);
536+
assertThat(catalog.loadTable(TableIdentifier.of(database, tableName)).properties())
537+
.containsEntry("commit.retry.num-retries", "10")
538+
.doesNotContainKeys(
539+
"non-existing.property",
540+
"iceberg.non-existing.property",
541+
"commit.retry.min-wait-ms",
542+
"iceberg.commit.retry.min-wait-ms",
543+
"table.datalake.freshness",
544+
"fluss.table.datalake.freshness");
545+
}
546+
547+
@Test
548+
void alterTablePropertiesWithNonExistingTable() {
549+
TestingLakeCatalogContext context = new TestingLakeCatalogContext();
550+
// db & table don't exist
551+
assertThatThrownBy(
552+
() ->
553+
flussIcebergCatalog.alterTable(
554+
TablePath.of("non_existing_db", "non_existing_table"),
555+
List.of(
556+
TableChange.set(
557+
"iceberg.commit.retry.min-wait-ms",
558+
"1000")),
559+
context))
560+
.isInstanceOf(TableNotExistException.class)
561+
.hasMessage("Table non_existing_db.non_existing_table does not exist.");
562+
563+
TableDescriptor tableDescriptor =
564+
TableDescriptor.builder()
565+
.schema(Schema.newBuilder().column("id", DataTypes.BIGINT()).build())
566+
.distributedBy(3)
567+
.property("iceberg.commit.retry.num-retries", "5")
568+
.property("table.datalake.freshness", "30s")
569+
.build();
570+
571+
String database = "test_db";
572+
TablePath tablePath = TablePath.of(database, "test_table");
573+
flussIcebergCatalog.createTable(tablePath, tableDescriptor, context);
574+
575+
// database exists but table doesn't exist
576+
assertThatThrownBy(
577+
() ->
578+
flussIcebergCatalog.alterTable(
579+
TablePath.of(database, "non_existing_table"),
580+
List.of(
581+
TableChange.set(
582+
"iceberg.commit.retry.min-wait-ms",
583+
"1000")),
584+
context))
585+
.isInstanceOf(TableNotExistException.class)
586+
.hasMessage("Table test_db.non_existing_table does not exist.");
587+
}
588+
589+
@Test
590+
void alterReservedTableProperties() {
591+
String database = "test_alter_table_with_reserved_properties_db";
592+
String tableName = "test_alter_table_with_reserved_properties";
593+
594+
Schema flussSchema = Schema.newBuilder().column("id", DataTypes.BIGINT()).build();
595+
596+
TableDescriptor tableDescriptor =
597+
TableDescriptor.builder().schema(flussSchema).distributedBy(3).build();
598+
599+
TablePath tablePath = TablePath.of(database, tableName);
600+
TestingLakeCatalogContext context = new TestingLakeCatalogContext();
601+
flussIcebergCatalog.createTable(tablePath, tableDescriptor, context);
602+
603+
for (String property : IcebergLakeCatalog.RESERVED_PROPERTIES) {
604+
assertThatThrownBy(
605+
() ->
606+
flussIcebergCatalog.alterTable(
607+
tablePath,
608+
List.of(
609+
TableChange.set(
610+
property,
611+
RowLevelOperationMode.COPY_ON_WRITE
612+
.modeName())),
613+
context))
614+
.isInstanceOf(IllegalArgumentException.class)
615+
.hasMessage("Cannot set table property '%s'", property);
616+
617+
assertThatThrownBy(
618+
() ->
619+
flussIcebergCatalog.alterTable(
620+
tablePath,
621+
List.of(TableChange.reset(property)),
622+
context))
623+
.isInstanceOf(IllegalArgumentException.class)
624+
.hasMessage("Cannot reset table property '%s'", property);
625+
}
626+
}
458627
}

0 commit comments

Comments
 (0)