Skip to content

Commit abe5966

Browse files
authored
[flink] Increase code coverage for FlinkCatalog (#942)
1 parent 7aed531 commit abe5966

File tree

2 files changed

+245
-4
lines changed

2 files changed

+245
-4
lines changed

fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/catalog/FlinkCatalogTest.java

Lines changed: 245 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.alibaba.fluss.config.ConfigOptions;
2020
import com.alibaba.fluss.config.Configuration;
21+
import com.alibaba.fluss.exception.IllegalConfigurationException;
2122
import com.alibaba.fluss.server.testutils.FlussClusterExtension;
2223
import com.alibaba.fluss.utils.ExceptionUtils;
2324

@@ -38,11 +39,16 @@
3839
import org.apache.flink.table.catalog.exceptions.CatalogException;
3940
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
4041
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
42+
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
43+
import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
4144
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
4245
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
4346
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
47+
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
48+
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
4449
import org.apache.flink.table.expressions.ResolvedExpression;
4550
import org.apache.flink.table.expressions.utils.ResolvedExpressionMock;
51+
import org.apache.flink.table.factories.Factory;
4652
import org.apache.flink.table.factories.FactoryUtil;
4753
import org.junit.jupiter.api.AfterAll;
4854
import org.junit.jupiter.api.BeforeAll;
@@ -55,7 +61,9 @@
5561
import java.util.HashMap;
5662
import java.util.List;
5763
import java.util.Map;
64+
import java.util.Optional;
5865

66+
import static com.alibaba.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS;
5967
import static com.alibaba.fluss.flink.FlinkConnectorOptions.BUCKET_KEY;
6068
import static com.alibaba.fluss.flink.FlinkConnectorOptions.BUCKET_NUMBER;
6169
import static com.alibaba.fluss.flink.FlinkConnectorOptions.SCAN_STARTUP_MODE;
@@ -111,7 +119,7 @@ static void beforeAll() {
111119
new FlinkCatalog(
112120
CATALOG_NAME,
113121
DEFAULT_DB,
114-
String.join(",", flussConf.get(ConfigOptions.BOOTSTRAP_SERVERS)),
122+
String.join(",", flussConf.get(BOOTSTRAP_SERVERS)),
115123
Thread.currentThread().getContextClassLoader(),
116124
Collections.emptyMap());
117125
catalog.open();
@@ -221,6 +229,19 @@ void testCreateTable() throws Exception {
221229
expectedTable = addOptions(table2, addedOptions);
222230

223231
checkEqualsRespectSchema((CatalogTable) tableCreated, expectedTable);
232+
233+
assertThatThrownBy(() -> catalog.renameTable(this.tableInDefaultDb, "newName", false))
234+
.isInstanceOf(UnsupportedOperationException.class);
235+
236+
assertThatThrownBy(() -> catalog.alterTable(this.tableInDefaultDb, null, false))
237+
.isInstanceOf(UnsupportedOperationException.class);
238+
239+
// Test lake table handling - should throw TableNotExistException for non-existent lake
240+
// table
241+
ObjectPath lakePath = new ObjectPath(DEFAULT_DB, "regularTable$lake");
242+
assertThatThrownBy(() -> catalog.getTable(lakePath))
243+
.isInstanceOf(TableNotExistException.class)
244+
.hasMessageContaining("regularTable$lake does not exist");
224245
}
225246

226247
@Test
@@ -394,6 +415,21 @@ void testDatabase() throws Exception {
394415
assertThatThrownBy(() -> catalog.listTables("unknown"))
395416
.isInstanceOf(DatabaseNotExistException.class)
396417
.hasMessage("Database %s does not exist in Catalog %s.", "unknown", CATALOG_NAME);
418+
assertThatThrownBy(() -> catalog.alterDatabase("db2", null, false))
419+
.isInstanceOf(UnsupportedOperationException.class);
420+
assertThat(catalog.getDefaultDatabase()).isEqualTo(DEFAULT_DB);
421+
422+
// Test catalog with null default database
423+
Configuration flussConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
424+
Catalog catalogWithoutDefault =
425+
new FlinkCatalog(
426+
"test-catalog-no-default",
427+
null, // null default database
428+
String.join(",", flussConf.get(ConfigOptions.BOOTSTRAP_SERVERS)),
429+
Thread.currentThread().getContextClassLoader(),
430+
Collections.emptyMap());
431+
432+
assertThat(catalogWithoutDefault.getDefaultDatabase()).isNull();
397433
}
398434

399435
@Test
@@ -433,6 +469,51 @@ void testListPartitions() throws Exception {
433469
List<CatalogPartitionSpec> catalogPartitionSpecs = catalog.listPartitions(path2);
434470
assertThat(catalogPartitionSpecs).hasSize(1);
435471
assertThat(catalogPartitionSpecs.get(0).getPartitionSpec()).containsEntry("first", "1");
472+
// NEW: Test dropPartition functionality
473+
CatalogPartitionSpec firstPartSpec = catalogPartitionSpecs.get(0);
474+
catalog.dropPartition(path2, firstPartSpec, false);
475+
476+
// Verify partition is gone
477+
assertThat(catalog.listPartitions(path2)).isEmpty();
478+
479+
// Recreate partition for further testing
480+
catalog.createPartition(path2, firstPartSpec, null, false);
481+
482+
// Test dropping non-existent partition
483+
CatalogPartitionSpec nonExistentSpec =
484+
new CatalogPartitionSpec(Collections.singletonMap("first", "999"));
485+
assertThatThrownBy(() -> catalog.dropPartition(path2, nonExistentSpec, false))
486+
.isInstanceOf(
487+
org.apache.flink.table.catalog.exceptions.PartitionNotExistException.class);
488+
489+
// Should not throw with ignoreIfNotExists = true
490+
catalog.dropPartition(path2, nonExistentSpec, true);
491+
492+
// NEW: Test partition creation exceptions
493+
// Try to create duplicate partition
494+
assertThatThrownBy(() -> catalog.createPartition(path2, firstPartSpec, null, false))
495+
.isInstanceOf(PartitionAlreadyExistsException.class);
496+
497+
// NEW: Test unsupported partition operations
498+
assertThatThrownBy(() -> catalog.getPartition(path2, firstPartSpec))
499+
.isInstanceOf(UnsupportedOperationException.class);
500+
501+
assertThatThrownBy(() -> catalog.partitionExists(path2, firstPartSpec))
502+
.isInstanceOf(UnsupportedOperationException.class);
503+
504+
assertThatThrownBy(() -> catalog.alterPartition(path2, firstPartSpec, null, false))
505+
.isInstanceOf(UnsupportedOperationException.class);
506+
507+
CatalogPartitionSpec testSpec =
508+
new CatalogPartitionSpec(Collections.singletonMap("first", "test"));
509+
assertThatThrownBy(() -> catalog.listPartitions(path2, testSpec))
510+
.isInstanceOf(UnsupportedOperationException.class);
511+
512+
assertThatThrownBy(() -> catalog.listPartitionsByFilter(path2, Collections.emptyList()))
513+
.isInstanceOf(UnsupportedOperationException.class);
514+
515+
// Clean up the partition we created for testing
516+
catalog.dropPartition(path2, firstPartSpec, false);
436517
}
437518

438519
private void createAndCheckAndDropTable(
@@ -444,4 +525,167 @@ private void createAndCheckAndDropTable(
444525
checkEqualsRespectSchema((CatalogTable) tableCreated, table);
445526
catalog.dropTable(tablePath, false);
446527
}
528+
529+
@Test
530+
void testConnectionFailureHandling() {
531+
// Create a catalog with invalid connection settings
532+
Catalog badCatalog =
533+
new FlinkCatalog(
534+
"bad-catalog",
535+
"default",
536+
"invalid-bootstrap-server:9092",
537+
Thread.currentThread().getContextClassLoader(),
538+
Collections.emptyMap());
539+
540+
// Test open() throws proper exception
541+
assertThatThrownBy(() -> badCatalog.open())
542+
.isInstanceOf(IllegalConfigurationException.class)
543+
.hasMessageContaining("No resolvable bootstrap urls");
544+
}
545+
546+
@Test
547+
void testStatisticsOperations() throws Exception {
548+
// Statistics testing
549+
CatalogTable table = newCatalogTable(Collections.emptyMap());
550+
ObjectPath tablePath = new ObjectPath(DEFAULT_DB, "statsTable");
551+
catalog.createTable(tablePath, table, false);
552+
553+
// Test table statistics - should return UNKNOWN for existing tables
554+
CatalogTableStatistics tableStats = catalog.getTableStatistics(tablePath);
555+
assertThat(tableStats).isEqualTo(CatalogTableStatistics.UNKNOWN);
556+
557+
CatalogColumnStatistics columnStats = catalog.getTableColumnStatistics(tablePath);
558+
assertThat(columnStats).isEqualTo(CatalogColumnStatistics.UNKNOWN);
559+
560+
// Test that statistics methods return UNKNOWN even for non-existent tables
561+
ObjectPath nonExistent = new ObjectPath(DEFAULT_DB, "nonexistent");
562+
CatalogTableStatistics nonExistentStats = catalog.getTableStatistics(nonExistent);
563+
assertThat(nonExistentStats).isEqualTo(CatalogTableStatistics.UNKNOWN);
564+
565+
CatalogColumnStatistics nonExistentColStats = catalog.getTableColumnStatistics(nonExistent);
566+
assertThat(nonExistentColStats).isEqualTo(CatalogColumnStatistics.UNKNOWN);
567+
568+
// Create partitioned table for partition statistics testing
569+
ResolvedSchema schema = createSchema();
570+
CatalogTable partTable =
571+
new ResolvedCatalogTable(
572+
CatalogTable.of(
573+
Schema.newBuilder().fromResolvedSchema(schema).build(),
574+
"partitioned table for stats",
575+
Collections.singletonList("first"),
576+
Collections.emptyMap()),
577+
schema);
578+
579+
ObjectPath partTablePath = new ObjectPath(DEFAULT_DB, "partStatsTable");
580+
catalog.createTable(partTablePath, partTable, false);
581+
582+
CatalogPartitionSpec partSpec =
583+
new CatalogPartitionSpec(Collections.singletonMap("first", "value"));
584+
catalog.createPartition(partTablePath, partSpec, null, false);
585+
586+
// Test partition statistics - should return UNKNOWN
587+
CatalogTableStatistics partStats = catalog.getPartitionStatistics(partTablePath, partSpec);
588+
assertThat(partStats).isEqualTo(CatalogTableStatistics.UNKNOWN);
589+
590+
CatalogColumnStatistics partColStats =
591+
catalog.getPartitionColumnStatistics(partTablePath, partSpec);
592+
assertThat(partColStats).isEqualTo(CatalogColumnStatistics.UNKNOWN);
593+
594+
// Test unsupported statistics operations
595+
assertThatThrownBy(() -> catalog.alterTableStatistics(tablePath, null, false))
596+
.isInstanceOf(UnsupportedOperationException.class);
597+
598+
assertThatThrownBy(() -> catalog.alterTableColumnStatistics(tablePath, null, false))
599+
.isInstanceOf(UnsupportedOperationException.class);
600+
601+
assertThatThrownBy(
602+
() ->
603+
catalog.alterPartitionStatistics(
604+
partTablePath, partSpec, null, false))
605+
.isInstanceOf(UnsupportedOperationException.class);
606+
607+
assertThatThrownBy(
608+
() ->
609+
catalog.alterPartitionColumnStatistics(
610+
partTablePath, partSpec, null, false))
611+
.isInstanceOf(UnsupportedOperationException.class);
612+
613+
// Clean up
614+
catalog.dropPartition(partTablePath, partSpec, false);
615+
catalog.dropTable(partTablePath, false);
616+
catalog.dropTable(tablePath, false);
617+
}
618+
619+
@Test
620+
void testViewsAndFunctions() throws Exception {
621+
622+
List<String> views = catalog.listViews(DEFAULT_DB);
623+
assertThat(views).isEmpty();
624+
625+
// Test functions operations
626+
List<String> functions = catalog.listFunctions(DEFAULT_DB);
627+
assertThat(functions).isEmpty();
628+
629+
ObjectPath functionPath = new ObjectPath(DEFAULT_DB, "testFunction");
630+
assertThat(catalog.functionExists(functionPath)).isFalse();
631+
632+
// Test getFunction - should always throw FunctionNotExistException
633+
assertThatThrownBy(() -> catalog.getFunction(functionPath))
634+
.isInstanceOf(FunctionNotExistException.class);
635+
636+
// Test unsupported function operations
637+
assertThatThrownBy(() -> catalog.createFunction(functionPath, null, false))
638+
.isInstanceOf(UnsupportedOperationException.class);
639+
640+
assertThatThrownBy(() -> catalog.alterFunction(functionPath, null, false))
641+
.isInstanceOf(UnsupportedOperationException.class);
642+
643+
assertThatThrownBy(() -> catalog.dropFunction(functionPath, false))
644+
.isInstanceOf(UnsupportedOperationException.class);
645+
}
646+
647+
@Test
648+
void testGetFactory() {
649+
Optional<Factory> factory = catalog.getFactory();
650+
assertThat(factory).isPresent();
651+
assertThat(factory.get()).isInstanceOf(FlinkTableFactory.class);
652+
}
653+
654+
@Test
655+
void testSecurityConfigsIntegration() throws Exception {
656+
Map<String, String> securityConfigs = new HashMap<>();
657+
securityConfigs.put("security.protocol", "SASL_SSL");
658+
securityConfigs.put("sasl.mechanism", "PLAIN");
659+
660+
// Create catalog with security configs
661+
Configuration flussConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
662+
Catalog securedCatalog =
663+
new FlinkCatalog(
664+
"secured-catalog",
665+
DEFAULT_DB,
666+
String.join(",", flussConf.get(BOOTSTRAP_SERVERS)),
667+
Thread.currentThread().getContextClassLoader(),
668+
securityConfigs);
669+
securedCatalog.open();
670+
671+
try {
672+
securedCatalog.createDatabase(
673+
DEFAULT_DB, new CatalogDatabaseImpl(Collections.emptyMap(), null), true);
674+
675+
Map<String, String> tableOptions = new HashMap<>();
676+
CatalogTable table = newCatalogTable(tableOptions);
677+
securedCatalog.createTable(tableInDefaultDb, table, false);
678+
679+
// Get table and verify security configs are included
680+
CatalogBaseTable retrievedTable = securedCatalog.getTable(tableInDefaultDb);
681+
Map<String, String> actualOptions = retrievedTable.getOptions();
682+
683+
assertThat(actualOptions).containsEntry("security.protocol", "SASL_SSL");
684+
assertThat(actualOptions).containsEntry("sasl.mechanism", "PLAIN");
685+
assertThat(actualOptions).containsKey(BOOTSTRAP_SERVERS.key());
686+
687+
} finally {
688+
securedCatalog.close();
689+
}
690+
}
447691
}

fluss-test-coverage/pom.xml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -289,9 +289,6 @@
289289
<exclude>com.alibaba.fluss.flink.utils.*</exclude>
290290
<exclude>com.alibaba.fluss.flink.source.*
291291
</exclude>
292-
<exclude>
293-
com.alibaba.fluss.flink.catalog.FlinkCatalog
294-
</exclude>
295292
<exclude>
296293
com.alibaba.fluss.flink.catalog.FlinkCatalogOptions
297294
</exclude>

0 commit comments

Comments
 (0)