
Commit 4afb5ba

feat: add test
1 parent 1e5ca41 commit 4afb5ba

File tree

7 files changed (+477, -27 lines)


fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java

Lines changed: 1 addition & 0 deletions
@@ -62,6 +62,7 @@
 import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
 import org.apache.fluss.server.kv.snapshot.KvSnapshotHandle;
 import org.apache.fluss.types.DataTypes;
+
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;

fluss-common/src/main/java/org/apache/fluss/metadata/TableDescriptor.java

Lines changed: 15 additions & 0 deletions
@@ -236,6 +236,21 @@ public TableDescriptor withProperties(Map<String, String> newProperties) {
                 schema, comment, partitionKeys, tableDistribution, newProperties, customProperties);
     }

+    /**
+     * Returns a new TableDescriptor instance that is a copy of this TableDescriptor with new
+     * properties and new custom properties.
+     */
+    public TableDescriptor withProperties(
+            Map<String, String> newProperties, Map<String, String> newCustomProperties) {
+        return new TableDescriptor(
+                schema,
+                comment,
+                partitionKeys,
+                tableDistribution,
+                newProperties,
+                newCustomProperties);
+    }
+
     /**
      * Returns a new TableDescriptor instance that is a copy of this TableDescriptor with a new
      * replication factor property.
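
The new overload pairs with the existing single-argument withProperties and lets callers swap both property maps at once while keeping the descriptor immutable. A minimal usage sketch, assuming an already-built TableDescriptor named descriptor and the import locations shown below (both are assumptions for illustration, not part of this diff):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.fluss.config.ConfigOptions;
    import org.apache.fluss.metadata.TableDescriptor;

    // 'descriptor' is any previously built TableDescriptor (hypothetical here).
    Map<String, String> newProperties = new HashMap<>();
    newProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
    Map<String, String> newCustomProperties = new HashMap<>();
    newCustomProperties.put("paimon.file.format", "parquet");
    TableDescriptor updated = descriptor.withProperties(newProperties, newCustomProperties);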

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java

Lines changed: 214 additions & 0 deletions
@@ -25,6 +25,7 @@
 import org.apache.fluss.exception.FlussRuntimeException;
 import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.server.testutils.FlussClusterExtension;

@@ -49,6 +50,7 @@
 import java.nio.file.Files;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;

@@ -364,6 +366,218 @@ void testCreateLakeEnabledTableWithAllTypes() throws Exception {
                 BUCKET_NUM);
     }

+    @Test
+    void testAlterLakeEnabledLogTable() throws Exception {
+        Map<String, String> customProperties = new HashMap<>();
+        customProperties.put("k1", "v1");
+        customProperties.put("paimon.file.format", "parquet");
+
+        // log table with lake disabled
+        TableDescriptor logTable =
+                TableDescriptor.builder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("log_c1", DataTypes.INT())
+                                        .column("log_c2", DataTypes.STRING())
+                                        .build())
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, false)
+                        .customProperties(customProperties)
+                        .distributedBy(BUCKET_NUM, "log_c1", "log_c2")
+                        .build();
+        TablePath logTablePath = TablePath.of(DATABASE, "log_table_alter");
+        admin.createTable(logTablePath, logTable, false).get();
+
+        assertThatThrownBy(
+                        () ->
+                                paimonCatalog.getTable(
+                                        Identifier.create(DATABASE, logTablePath.getTableName())))
+                .isInstanceOf(Catalog.TableNotExistException.class)
+                .hasMessageContaining(
+                        String.format(
+                                "Table %s.%s does not exist.",
+                                DATABASE, logTablePath.getTableName()));
+
+        // enable lake
+        TableChange.SetOption enableLake =
+                TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
+        List<TableChange> changes = Arrays.asList(enableLake);
+
+        admin.alterTable(logTablePath, changes, false).get();
+
+        Table enabledPaimonLogTable =
+                paimonCatalog.getTable(Identifier.create(DATABASE, logTablePath.getTableName()));
+
+        Map<String, String> updatedProperties = new HashMap<>();
+        updatedProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
+        TableDescriptor updatedLogTable = logTable.withProperties(updatedProperties);
+        // verify the retrieved log table
+        verifyPaimonTable(
+                enabledPaimonLogTable,
+                updatedLogTable,
+                RowType.of(
+                        new DataType[] {
+                            org.apache.paimon.types.DataTypes.INT(),
+                            org.apache.paimon.types.DataTypes.STRING(),
+                            // for __bucket, __offset, __timestamp
+                            org.apache.paimon.types.DataTypes.INT(),
+                            org.apache.paimon.types.DataTypes.BIGINT(),
+                            org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+                        },
+                        new String[] {
+                            "log_c1",
+                            "log_c2",
+                            BUCKET_COLUMN_NAME,
+                            OFFSET_COLUMN_NAME,
+                            TIMESTAMP_COLUMN_NAME
+                        }),
+                "log_c1,log_c2",
+                BUCKET_NUM);
+    }
+
+    @Test
+    void testAlterPkLakeEnabledTable() throws Exception {
+        Map<String, String> customProperties = new HashMap<>();
+        customProperties.put("k1", "v1");
+        customProperties.put("paimon.file.format", "parquet");
+
+        // test pk table
+        TableDescriptor pkTable =
+                TableDescriptor.builder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("pk_c1", DataTypes.INT())
+                                        .column("pk_c2", DataTypes.STRING())
+                                        .primaryKey("pk_c1")
+                                        .build())
+                        .distributedBy(BUCKET_NUM)
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, false)
+                        .customProperties(customProperties)
+                        .build();
+        TablePath pkTablePath = TablePath.of(DATABASE, "pk_table_alter");
+        admin.createTable(pkTablePath, pkTable, false).get();
+
+        assertThatThrownBy(
+                        () ->
+                                paimonCatalog.getTable(
+                                        Identifier.create(DATABASE, pkTablePath.getTableName())))
+                .isInstanceOf(Catalog.TableNotExistException.class)
+                .hasMessageContaining(
+                        String.format(
+                                "Table %s.%s does not exist.",
+                                DATABASE, pkTablePath.getTableName()));
+
+        // enable lake
+        TableChange.SetOption enableLake =
+                TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
+        List<TableChange> changes = Arrays.asList(enableLake);
+
+        admin.alterTable(pkTablePath, changes, false).get();
+
+        Table enabledPaimonPkTable =
+                paimonCatalog.getTable(Identifier.create(DATABASE, pkTablePath.getTableName()));
+
+        Map<String, String> updatedProperties = new HashMap<>();
+        updatedProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
+        TableDescriptor updatedPkTable = pkTable.withProperties(updatedProperties);
+        // verify the retrieved pk table
+        verifyPaimonTable(
+                enabledPaimonPkTable,
+                updatedPkTable,
+                RowType.of(
+                        new DataType[] {
+                            org.apache.paimon.types.DataTypes.INT().notNull(),
+                            org.apache.paimon.types.DataTypes.STRING(),
+                            // for __bucket, __offset, __timestamp
+                            org.apache.paimon.types.DataTypes.INT(),
+                            org.apache.paimon.types.DataTypes.BIGINT(),
+                            org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+                        },
+                        new String[] {
+                            "pk_c1",
+                            "pk_c2",
+                            BUCKET_COLUMN_NAME,
+                            OFFSET_COLUMN_NAME,
+                            TIMESTAMP_COLUMN_NAME
+                        }),
+                "pk_c1",
+                BUCKET_NUM);
+    }
+
+    @Test
+    void testAlterPartitionedLakeEnabledTable() throws Exception {
+        Map<String, String> customProperties = new HashMap<>();
+        customProperties.put("k1", "v1");
+        customProperties.put("paimon.file.format", "parquet");
+
+        // test partitioned table
+        TableDescriptor partitionedTableDescriptor =
+                TableDescriptor.builder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("c1", DataTypes.INT())
+                                        .column("c2", DataTypes.STRING())
+                                        .column("c3", DataTypes.STRING())
+                                        .primaryKey("c1", "c3")
+                                        .build())
+                        .distributedBy(BUCKET_NUM)
+                        .partitionedBy("c3")
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, false)
+                        .customProperties(customProperties)
+                        .build();
+        TablePath partitionedTablePath = TablePath.of(DATABASE, "partitioned_table_alter");
+        admin.createTable(partitionedTablePath, partitionedTableDescriptor, false).get();
+
+        assertThatThrownBy(
+                        () ->
+                                paimonCatalog.getTable(
+                                        Identifier.create(
+                                                DATABASE, partitionedTablePath.getTableName())))
+                .isInstanceOf(Catalog.TableNotExistException.class)
+                .hasMessageContaining(
+                        String.format(
+                                "Table %s.%s does not exist.",
+                                DATABASE, partitionedTablePath.getTableName()));
+
+        // enable lake
+        TableChange.SetOption enableLake =
+                TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
+        List<TableChange> changes = Arrays.asList(enableLake);
+        admin.alterTable(partitionedTablePath, changes, false).get();
+
+        Table enabledPaimonPartitionedTable =
+                paimonCatalog.getTable(
+                        Identifier.create(DATABASE, partitionedTablePath.getTableName()));
+
+        Map<String, String> updatedProperties = new HashMap<>();
+        updatedProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
+        TableDescriptor updatedPartitionedTable =
+                partitionedTableDescriptor.withProperties(updatedProperties);
+
+        verifyPaimonTable(
+                enabledPaimonPartitionedTable,
+                updatedPartitionedTable,
+                RowType.of(
+                        new DataType[] {
+                            org.apache.paimon.types.DataTypes.INT().notNull(),
+                            org.apache.paimon.types.DataTypes.STRING(),
+                            org.apache.paimon.types.DataTypes.STRING().notNull(),
+                            // for __bucket, __offset, __timestamp
+                            org.apache.paimon.types.DataTypes.INT(),
+                            org.apache.paimon.types.DataTypes.BIGINT(),
+                            org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+                        },
+                        new String[] {
+                            "c1",
+                            "c2",
+                            "c3",
+                            BUCKET_COLUMN_NAME,
+                            OFFSET_COLUMN_NAME,
+                            TIMESTAMP_COLUMN_NAME
+                        }),
+                "c1",
+                BUCKET_NUM);
+    }
+
     @Test
     void testThrowExceptionWhenConflictWithSystemColumn() {
         for (String systemColumn :
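
All three tests above follow the same flow: create the table with TABLE_DATALAKE_ENABLED set to false, assert that no Paimon table exists yet, enable the lake via alterTable, then verify the Paimon table that appears. A condensed sketch of the enable step as it would sit inside one of these tests; 'tablePath' stands in for the concrete table path, and the admin and paimonCatalog fields come from the test class:

    // Flip the datalake flag on an existing Fluss table (sketch of the tests' enable step).
    List<TableChange> changes =
            Arrays.asList(TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true"));
    admin.alterTable(tablePath, changes, false).get();
    // After the alter completes, the matching Paimon table is expected to be resolvable.
    Table paimonTable =
            paimonCatalog.getTable(Identifier.create(DATABASE, tablePath.getTableName()));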

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java

Lines changed: 10 additions & 2 deletions
@@ -256,10 +256,16 @@ protected long createLogTable(TablePath tablePath) throws Exception {
     }

     protected long createLogTable(TablePath tablePath, int bucketNum) throws Exception {
-        return createLogTable(tablePath, bucketNum, false);
+        return createLogTable(
+                tablePath, bucketNum, false, Collections.emptyMap(), Collections.emptyMap());
     }

-    protected long createLogTable(TablePath tablePath, int bucketNum, boolean isPartitioned)
+    protected long createLogTable(
+            TablePath tablePath,
+            int bucketNum,
+            boolean isPartitioned,
+            Map<String, String> properties,
+            Map<String, String> customProperties)
             throws Exception {
         Schema.Builder schemaBuilder =
                 Schema.newBuilder().column("a", DataTypes.INT()).column("b", DataTypes.STRING());
@@ -277,6 +283,8 @@ protected long createLogTable(TablePath tablePath, int bucketNum, boolean isPart
             tableBuilder.property(
                     ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, AutoPartitionTimeUnit.YEAR);
         }
+        tableBuilder.properties(properties);
+        tableBuilder.customProperties(customProperties);
         tableBuilder.schema(schemaBuilder.build());
         return createTable(tablePath, tableBuilder.build());
     }
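
The widened overload lets tiering tests attach table properties and custom properties at creation time, while the two-argument variant keeps existing call sites compiling. A hypothetical call site in a subclass test, assuming a TablePath named tablePath; the bucket count of 3 and the property values are illustrative only:

    Map<String, String> properties = new HashMap<>();
    properties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
    Map<String, String> customProperties = new HashMap<>();
    customProperties.put("paimon.file.format", "parquet");
    // Non-partitioned log table with 3 buckets; the builder applies both property maps.
    long tableId = createLogTable(tablePath, 3, false, properties, customProperties);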
