Skip to content

Commit 3b96888

Browse files
authored
[test] Improve com.alibaba.fluss.flink.catalog.* test code coverage (#989)
1 parent 20b243e commit 3b96888

File tree

7 files changed

+455
-2
lines changed

7 files changed

+455
-2
lines changed

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkCatalog.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import com.alibaba.fluss.utils.ExceptionUtils;
3737
import com.alibaba.fluss.utils.IOUtils;
3838

39+
import org.apache.flink.annotation.VisibleForTesting;
3940
import org.apache.flink.table.catalog.Catalog;
4041
import org.apache.flink.table.catalog.CatalogBaseTable;
4142
import org.apache.flink.table.catalog.CatalogDatabase;
@@ -652,4 +653,9 @@ private void mayInitLakeCatalogCatalog(Configuration tableOptions) {
652653
}
653654
}
654655
}
656+
657+
@VisibleForTesting
658+
public Map<String, String> getSecurityConfigs() {
659+
return securityConfigs;
660+
}
655661
}

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkTableSink.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.alibaba.fluss.metadata.TablePath;
2828
import com.alibaba.fluss.row.GenericRow;
2929

30+
import org.apache.flink.annotation.VisibleForTesting;
3031
import org.apache.flink.table.api.ValidationException;
3132
import org.apache.flink.table.catalog.Column;
3233
import org.apache.flink.table.connector.ChangelogMode;
@@ -373,4 +374,9 @@ private Set<String> getPrimaryKeyNames() {
373374
}
374375
return pkNames;
375376
}
377+
378+
@VisibleForTesting
379+
public List<String> getBucketKeys() {
380+
return bucketKeys;
381+
}
376382
}

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.alibaba.fluss.metadata.TablePath;
3232
import com.alibaba.fluss.types.RowType;
3333

34+
import org.apache.flink.annotation.VisibleForTesting;
3435
import org.apache.flink.api.common.typeinfo.TypeInformation;
3536
import org.apache.flink.api.connector.source.Source;
3637
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -515,4 +516,25 @@ private int[] getKeyRowProjection() {
515516
}
516517
return projection;
517518
}
519+
520+
@VisibleForTesting
521+
@Nullable
522+
public LookupCache getCache() {
523+
return cache;
524+
}
525+
526+
@VisibleForTesting
527+
public int[] getPrimaryKeyIndexes() {
528+
return primaryKeyIndexes;
529+
}
530+
531+
@VisibleForTesting
532+
public int[] getBucketKeyIndexes() {
533+
return bucketKeyIndexes;
534+
}
535+
536+
@VisibleForTesting
537+
public int[] getPartitionKeyIndexes() {
538+
return partitionKeyIndexes;
539+
}
518540
}
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.flink.catalog;
18+
19+
import com.alibaba.fluss.config.ConfigOptions;
20+
import com.alibaba.fluss.flink.FlinkConnectorOptions;
21+
22+
import org.apache.flink.configuration.Configuration;
23+
import org.apache.flink.table.api.ValidationException;
24+
import org.apache.flink.table.catalog.CommonCatalogOptions;
25+
import org.apache.flink.table.factories.FactoryUtil;
26+
import org.junit.jupiter.api.Test;
27+
28+
import java.util.Collections;
29+
import java.util.HashMap;
30+
import java.util.Map;
31+
32+
import static org.assertj.core.api.Assertions.assertThat;
33+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
34+
35+
/** Test for {@link FlinkCatalogFactory}. */
36+
public class FlinkCatalogFactoryTest {
37+
38+
private static final String CATALOG_NAME = "my_catalog";
39+
private static final String BOOTSTRAP_SERVERS_NAME = "localhost:9092";
40+
private static final String DB_NAME = "my_db";
41+
42+
@Test
43+
public void testCreateCatalog() {
44+
Map<String, String> options = new HashMap<>();
45+
options.put(FlinkConnectorOptions.BOOTSTRAP_SERVERS.key(), BOOTSTRAP_SERVERS_NAME);
46+
options.put(FlinkCatalogOptions.DEFAULT_DATABASE.key(), DB_NAME);
47+
options.put(CommonCatalogOptions.CATALOG_TYPE.key(), FlinkCatalogFactory.IDENTIFIER);
48+
49+
// test create catalog
50+
FlinkCatalog actualCatalog =
51+
(FlinkCatalog)
52+
FactoryUtil.createCatalog(
53+
CATALOG_NAME,
54+
options,
55+
new Configuration(),
56+
Thread.currentThread().getContextClassLoader());
57+
58+
FlinkCatalog flinkCatalog =
59+
new FlinkCatalog(
60+
CATALOG_NAME,
61+
DB_NAME,
62+
BOOTSTRAP_SERVERS_NAME,
63+
Thread.currentThread().getContextClassLoader(),
64+
Collections.emptyMap());
65+
66+
checkEquals(flinkCatalog, actualCatalog);
67+
68+
// test security configs
69+
Map<String, String> securityMap = new HashMap<>();
70+
securityMap.put(ConfigOptions.CLIENT_SECURITY_PROTOCOL.key(), "username_password");
71+
securityMap.put("client.security.username_password.username", "root");
72+
securityMap.put("client.security.username_password.password", "password");
73+
74+
options.putAll(securityMap);
75+
FlinkCatalog actualCatalog2 =
76+
(FlinkCatalog)
77+
FactoryUtil.createCatalog(
78+
CATALOG_NAME,
79+
options,
80+
new Configuration(),
81+
Thread.currentThread().getContextClassLoader());
82+
83+
assertThat(actualCatalog2.getSecurityConfigs()).isEqualTo(securityMap);
84+
}
85+
86+
@Test
87+
public void testOptions() {
88+
Map<String, String> options = new HashMap<>();
89+
options.put(CommonCatalogOptions.CATALOG_TYPE.key(), FlinkCatalogFactory.IDENTIFIER);
90+
91+
// test required options
92+
assertThatThrownBy(
93+
() ->
94+
FactoryUtil.createCatalog(
95+
CATALOG_NAME,
96+
options,
97+
new Configuration(),
98+
Thread.currentThread().getContextClassLoader()))
99+
.rootCause()
100+
.isInstanceOf(ValidationException.class)
101+
.hasMessageContaining(
102+
"Missing required options are:\n" + "\n" + "bootstrap.servers");
103+
104+
// test op options
105+
options.put(FlinkConnectorOptions.BOOTSTRAP_SERVERS.key(), BOOTSTRAP_SERVERS_NAME);
106+
FlinkCatalog actualCatalog =
107+
(FlinkCatalog)
108+
FactoryUtil.createCatalog(
109+
CATALOG_NAME,
110+
options,
111+
new Configuration(),
112+
Thread.currentThread().getContextClassLoader());
113+
114+
assertThat(actualCatalog.getDefaultDatabase())
115+
.isEqualTo(FlinkCatalogOptions.DEFAULT_DATABASE.defaultValue());
116+
}
117+
118+
private static void checkEquals(FlinkCatalog c1, FlinkCatalog c2) {
119+
assertThat(c2.getName()).isEqualTo(c1.getName());
120+
assertThat(c2.getDefaultDatabase()).isEqualTo(c1.getDefaultDatabase());
121+
}
122+
}

0 commit comments

Comments
 (0)