Skip to content

Commit dc3b700

Browse files
committed
feat: review comments
1 parent 387f7c3 commit dc3b700

File tree

4 files changed

+79
-237
lines changed

4 files changed

+79
-237
lines changed

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java

Lines changed: 3 additions & 150 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949

5050
import java.nio.file.Files;
5151
import java.util.Arrays;
52+
import java.util.Collections;
5253
import java.util.HashMap;
5354
import java.util.List;
5455
import java.util.Map;
@@ -391,16 +392,12 @@ void testAlterLakeEnabledLogTable() throws Exception {
391392
() ->
392393
paimonCatalog.getTable(
393394
Identifier.create(DATABASE, logTablePath.getTableName())))
394-
.isInstanceOf(Catalog.TableNotExistException.class)
395-
.hasMessageContaining(
396-
String.format(
397-
"Table %s.%s does not exist.",
398-
DATABASE, logTablePath.getTableName()));
395+
.isInstanceOf(Catalog.TableNotExistException.class);
399396

400397
// enable lake
401398
TableChange.SetOption enableLake =
402399
TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
403-
List<TableChange> changes = Arrays.asList(enableLake);
400+
List<TableChange> changes = Collections.singletonList(enableLake);
404401

405402
admin.alterTable(logTablePath, changes, false).get();
406403

@@ -434,150 +431,6 @@ void testAlterLakeEnabledLogTable() throws Exception {
434431
BUCKET_NUM);
435432
}
436433

437-
@Test
438-
void testAlterPkLakeEnabledTable() throws Exception {
439-
Map<String, String> customProperties = new HashMap<>();
440-
customProperties.put("k1", "v1");
441-
customProperties.put("paimon.file.format", "parquet");
442-
443-
// test pk table
444-
TableDescriptor pkTable =
445-
TableDescriptor.builder()
446-
.schema(
447-
Schema.newBuilder()
448-
.column("pk_c1", DataTypes.INT())
449-
.column("pk_c2", DataTypes.STRING())
450-
.primaryKey("pk_c1")
451-
.build())
452-
.distributedBy(BUCKET_NUM)
453-
.property(ConfigOptions.TABLE_DATALAKE_ENABLED, false)
454-
.customProperties(customProperties)
455-
.build();
456-
TablePath pkTablePath = TablePath.of(DATABASE, "pk_table_alter");
457-
admin.createTable(pkTablePath, pkTable, false).get();
458-
459-
assertThatThrownBy(
460-
() ->
461-
paimonCatalog.getTable(
462-
Identifier.create(DATABASE, pkTablePath.getTableName())))
463-
.isInstanceOf(Catalog.TableNotExistException.class)
464-
.hasMessageContaining(
465-
String.format(
466-
"Table %s.%s does not exist.",
467-
DATABASE, pkTablePath.getTableName()));
468-
469-
// enable lake
470-
TableChange.SetOption enableLake =
471-
TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
472-
List<TableChange> changes = Arrays.asList(enableLake);
473-
474-
admin.alterTable(pkTablePath, changes, false).get();
475-
476-
Table enabledPaimonPkTable =
477-
paimonCatalog.getTable(Identifier.create(DATABASE, pkTablePath.getTableName()));
478-
479-
Map<String, String> updatedProperties = new HashMap<>();
480-
updatedProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
481-
TableDescriptor updatedPkTable = pkTable.withProperties(updatedProperties);
482-
// check the gotten log table
483-
verifyPaimonTable(
484-
enabledPaimonPkTable,
485-
updatedPkTable,
486-
RowType.of(
487-
new DataType[] {
488-
org.apache.paimon.types.DataTypes.INT().notNull(),
489-
org.apache.paimon.types.DataTypes.STRING(),
490-
// for __bucket, __offset, __timestamp
491-
org.apache.paimon.types.DataTypes.INT(),
492-
org.apache.paimon.types.DataTypes.BIGINT(),
493-
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
494-
},
495-
new String[] {
496-
"pk_c1",
497-
"pk_c2",
498-
BUCKET_COLUMN_NAME,
499-
OFFSET_COLUMN_NAME,
500-
TIMESTAMP_COLUMN_NAME
501-
}),
502-
"pk_c1",
503-
BUCKET_NUM);
504-
}
505-
506-
@Test
507-
void testAlterPartitionedLakeEnabledTable() throws Exception {
508-
Map<String, String> customProperties = new HashMap<>();
509-
customProperties.put("k1", "v1");
510-
customProperties.put("paimon.file.format", "parquet");
511-
512-
// test partitioned table
513-
TableDescriptor partitionedTableDescriptor =
514-
TableDescriptor.builder()
515-
.schema(
516-
Schema.newBuilder()
517-
.column("c1", DataTypes.INT())
518-
.column("c2", DataTypes.STRING())
519-
.column("c3", DataTypes.STRING())
520-
.primaryKey("c1", "c3")
521-
.build())
522-
.distributedBy(BUCKET_NUM)
523-
.partitionedBy("c3")
524-
.property(ConfigOptions.TABLE_DATALAKE_ENABLED, false)
525-
.customProperties(customProperties)
526-
.build();
527-
TablePath partitionedTablePath = TablePath.of(DATABASE, "partitioned_table_alter");
528-
admin.createTable(partitionedTablePath, partitionedTableDescriptor, false).get();
529-
530-
assertThatThrownBy(
531-
() ->
532-
paimonCatalog.getTable(
533-
Identifier.create(
534-
DATABASE, partitionedTablePath.getTableName())))
535-
.isInstanceOf(Catalog.TableNotExistException.class)
536-
.hasMessageContaining(
537-
String.format(
538-
"Table %s.%s does not exist.",
539-
DATABASE, partitionedTablePath.getTableName()));
540-
541-
// enable lake
542-
TableChange.SetOption enableLake =
543-
TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
544-
List<TableChange> changes = Arrays.asList(enableLake);
545-
admin.alterTable(partitionedTablePath, changes, false).get();
546-
547-
Table enabledPaimonPartitionedTable =
548-
paimonCatalog.getTable(
549-
Identifier.create(DATABASE, partitionedTablePath.getTableName()));
550-
551-
Map<String, String> updatedProperties = new HashMap<>();
552-
updatedProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
553-
TableDescriptor updatedPartitionedTable =
554-
partitionedTableDescriptor.withProperties(updatedProperties);
555-
556-
verifyPaimonTable(
557-
enabledPaimonPartitionedTable,
558-
updatedPartitionedTable,
559-
RowType.of(
560-
new DataType[] {
561-
org.apache.paimon.types.DataTypes.INT().notNull(),
562-
org.apache.paimon.types.DataTypes.STRING(),
563-
org.apache.paimon.types.DataTypes.STRING().notNull(),
564-
// for __bucket, __offset, __timestamp
565-
org.apache.paimon.types.DataTypes.INT(),
566-
org.apache.paimon.types.DataTypes.BIGINT(),
567-
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
568-
},
569-
new String[] {
570-
"c1",
571-
"c2",
572-
"c3",
573-
BUCKET_COLUMN_NAME,
574-
OFFSET_COLUMN_NAME,
575-
TIMESTAMP_COLUMN_NAME
576-
}),
577-
"c1",
578-
BUCKET_NUM);
579-
}
580-
581434
@Test
582435
void testThrowExceptionWhenConflictWithSystemColumn() {
583436
for (String systemColumn :

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -171,24 +171,12 @@ void testTiering() throws Exception {
171171
0);
172172
}
173173

174-
String raw =
175-
"{\"partition_id\":%s,\"bucket_id\":0,\"partition_name\":\"date=%s\",\"log_offset\":3}";
176-
List<Long> partitionIds = new ArrayList<>(partitionNameByIds.keySet());
177-
Collections.sort(partitionIds);
178-
List<String> partitionOffsetStrs = new ArrayList<>();
179-
180-
for (Long partitionId : partitionIds) {
181-
String partitionName = partitionNameByIds.get(partitionId);
182-
String partitionOffsetStr = String.format(raw, partitionId, partitionName);
183-
partitionOffsetStrs.add(partitionOffsetStr);
184-
}
185-
186-
String res = "[" + String.join(",", partitionOffsetStrs) + "]";
187-
188174
properties =
189175
new HashMap<String, String>() {
190176
{
191-
put(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY, res);
177+
put(
178+
FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
179+
getPartitionOffsetStr(partitionNameByIds));
192180
}
193181
};
194182
checkSnapshotPropertyInPaimon(partitionedTablePath, properties);
@@ -202,7 +190,6 @@ void testTieringForAlterTable() throws Exception {
202190
TablePath t1 = TablePath.of(DEFAULT_DB, "pkTableAlter");
203191
Map<String, String> tableProperties = new HashMap<>();
204192
tableProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "false");
205-
tableProperties.put(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(), "500 ms");
206193

207194
long t1Id = createPkTable(t1, 1, tableProperties, Collections.emptyMap());
208195

@@ -317,10 +304,7 @@ void testTieringForAlterTable() throws Exception {
317304
{
318305
put(
319306
FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
320-
"["
321-
+ "{\"partition_id\":0,\"bucket_id\":0,\"partition_name\":\"date=2025\",\"log_offset\":3},"
322-
+ "{\"partition_id\":1,\"bucket_id\":0,\"partition_name\":\"date=2026\",\"log_offset\":3}"
323-
+ "]");
307+
getPartitionOffsetStr(partitionNameByIds));
324308
}
325309
};
326310
checkSnapshotPropertyInPaimon(partitionedTablePath, properties);
@@ -329,6 +313,22 @@ void testTieringForAlterTable() throws Exception {
329313
}
330314
}
331315

316+
private String getPartitionOffsetStr(Map<Long, String> partitionNameByIds) {
317+
String raw =
318+
"{\"partition_id\":%s,\"bucket_id\":0,\"partition_name\":\"date=%s\",\"log_offset\":3}";
319+
List<Long> partitionIds = new ArrayList<>(partitionNameByIds.keySet());
320+
Collections.sort(partitionIds);
321+
List<String> partitionOffsetStrs = new ArrayList<>();
322+
323+
for (Long partitionId : partitionIds) {
324+
String partitionName = partitionNameByIds.get(partitionId);
325+
String partitionOffsetStr = String.format(raw, partitionId, partitionName);
326+
partitionOffsetStrs.add(partitionOffsetStr);
327+
}
328+
329+
return "[" + String.join(",", partitionOffsetStrs) + "]";
330+
}
331+
332332
@Test
333333
void testTieringToDvEnabledTable() throws Exception {
334334
TablePath t1 = TablePath.of(DEFAULT_DB, "pkTableWithDv");

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java

Lines changed: 39 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.fluss.metadata.DatabaseDescriptor;
3737
import org.apache.fluss.metadata.PartitionSpec;
3838
import org.apache.fluss.metadata.ResolvedPartitionSpec;
39+
import org.apache.fluss.metadata.SchemaInfo;
3940
import org.apache.fluss.metadata.TableDescriptor;
4041
import org.apache.fluss.metadata.TableInfo;
4142
import org.apache.fluss.metadata.TablePath;
@@ -105,7 +106,6 @@
105106
import org.apache.fluss.server.zk.data.TableRegistration;
106107
import org.apache.fluss.utils.IOUtils;
107108
import org.apache.fluss.utils.concurrent.FutureUtils;
108-
import org.apache.fluss.utils.types.Tuple2;
109109

110110
import javax.annotation.Nullable;
111111

@@ -128,6 +128,7 @@
128128
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toTablePath;
129129
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toTablePropertyChanges;
130130
import static org.apache.fluss.server.utils.TableAssignmentUtils.generateAssignment;
131+
import static org.apache.fluss.server.utils.TableDescriptorValidation.validateAlterTableProperties;
131132
import static org.apache.fluss.utils.PartitionUtils.validatePartitionSpec;
132133
import static org.apache.fluss.utils.Preconditions.checkNotNull;
133134
import static org.apache.fluss.utils.Preconditions.checkState;
@@ -306,25 +307,36 @@ public CompletableFuture<AlterTablePropertiesResponse> alterTableProperties(
306307
toTablePropertyChanges(request.getConfigChangesList());
307308

308309
try {
309-
Tuple2<TableDescriptor, TableDescriptor> tuple =
310-
metadataManager.validateAndGetUpdatedTableDescriptor(
311-
tablePath, tablePropertyChanges);
312-
TableDescriptor tableDescriptor = tuple.f0;
313-
TableDescriptor newDescriptor = tuple.f1;
310+
// it throws TableNotExistException if the table or database not exists
311+
TableRegistration tableReg = metadataManager.getTableRegistration(tablePath);
312+
SchemaInfo schemaInfo = metadataManager.getLatestSchema(tablePath);
313+
// we can't use MetadataManager#getTable here, because it will add the default
314+
// lake options to the table properties, which may cause the validation failure
315+
TableInfo tableInfo = tableReg.toTableInfo(tablePath, schemaInfo);
316+
317+
// validate the changes
318+
validateAlterTableProperties(
319+
tableInfo,
320+
tablePropertyChanges.tableKeysToChange(),
321+
tablePropertyChanges.customKeysToChange());
322+
323+
TableDescriptor tableDescriptor = tableInfo.toTableDescriptor();
324+
TableDescriptor newDescriptor =
325+
metadataManager.getUpdatedTableDescriptor(
326+
tableDescriptor, tablePropertyChanges);
314327

315328
if (newDescriptor != null) {
316329
preAlterTableProperties(tablePath, tableDescriptor, newDescriptor);
317330
metadataManager.alterTableProperties(
318331
tablePath, tablePropertyChanges, request.isIgnoreIfNotExists());
319-
postAlterTableProperties(tablePath, tableDescriptor);
332+
postAlterTableProperties(
333+
tablePath, schemaInfo, tableReg, tableDescriptor, newDescriptor);
320334
}
321335
} catch (Exception e) {
322336
if (e instanceof TableNotExistException) {
323337
if (!request.isIgnoreIfNotExists()) {
324338
throw (TableNotExistException) e;
325339
}
326-
} else if (e instanceof RuntimeException) {
327-
throw (RuntimeException) e;
328340
} else {
329341
throw new FlussRuntimeException("Failed to alter table: " + tablePath, e);
330342
}
@@ -333,25 +345,10 @@ public CompletableFuture<AlterTablePropertiesResponse> alterTableProperties(
333345
}
334346

335347
private void preAlterTableProperties(
336-
TablePath tablePath,
337-
TableDescriptor existingTableDescriptor,
338-
TableDescriptor updatedDescriptor) {
339-
340-
mayEnableDataLake(tablePath, existingTableDescriptor, updatedDescriptor);
341-
// more pre-alter actions can be added here
342-
}
343-
344-
private void postAlterTableProperties(TablePath tablePath, TableDescriptor tableDescriptor) {
345-
mayAddRemoveTiering(tablePath, tableDescriptor);
346-
// more post-alter actions can be added here
347-
}
348-
349-
private void mayEnableDataLake(
350348
TablePath tablePath, TableDescriptor tableDescriptor, TableDescriptor newDescriptor) {
349+
351350
boolean toEnableDataLake =
352351
!isDataLakeEnabled(tableDescriptor) && isDataLakeEnabled(newDescriptor);
353-
boolean toDisableDataLake =
354-
isDataLakeEnabled(tableDescriptor) && !isDataLakeEnabled(newDescriptor);
355352

356353
// enable lake table
357354
if (toEnableDataLake) {
@@ -369,23 +366,36 @@ private void mayEnableDataLake(
369366
tablePath, dataLakeFormat, dataLakeFormat));
370367
}
371368
}
369+
// more pre-alter actions can be added here
372370
}
373371

374-
private void mayAddRemoveTiering(TablePath tablePath, TableDescriptor tableDescriptor) {
375-
TableInfo alteredTableInfo = metadataManager.getTable(tablePath);
372+
private void postAlterTableProperties(
373+
TablePath tablePath,
374+
SchemaInfo schemaInfo,
375+
TableRegistration oldTableReg,
376+
TableDescriptor oldTableDescriptor,
377+
TableDescriptor newTableDescriptor) {
378+
379+
TableRegistration updatedTableRegistration =
380+
oldTableReg.newProperties(
381+
newTableDescriptor.getProperties(),
382+
newTableDescriptor.getCustomProperties());
383+
384+
TableInfo alteredTableInfo = updatedTableRegistration.toTableInfo(tablePath, schemaInfo);
376385
TableDescriptor alteredDescriptor = alteredTableInfo.toTableDescriptor();
377386

378387
boolean toEnableDataLake =
379-
!isDataLakeEnabled(tableDescriptor) && isDataLakeEnabled(alteredDescriptor);
388+
!isDataLakeEnabled(oldTableDescriptor) && isDataLakeEnabled(alteredDescriptor);
380389
boolean toDisableDataLake =
381-
isDataLakeEnabled(tableDescriptor) && !isDataLakeEnabled(alteredDescriptor);
390+
isDataLakeEnabled(oldTableDescriptor) && !isDataLakeEnabled(alteredDescriptor);
382391

383392
if (toEnableDataLake) {
384393
// if the table is lake table, we need to add it to lake table tiering manager
385394
lakeTableTieringManager.addNewLakeTable(alteredTableInfo);
386395
} else if (toDisableDataLake) {
387396
lakeTableTieringManager.removeLakeTable(alteredTableInfo.getTableId());
388397
}
398+
// more post-alter actions can be added here
389399
}
390400

391401
private TableDescriptor applySystemDefaults(TableDescriptor tableDescriptor) {

0 commit comments

Comments
 (0)