Skip to content

Commit 715997d

Browse files
committed
supplier
1 parent 51760d9 commit 715997d

File tree

7 files changed

+22
-19
lines changed

7 files changed

+22
-19
lines changed

fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.List;
3232
import java.util.Map;
3333
import java.util.Optional;
34+
import java.util.function.Supplier;
3435

3536
/** A {@link FlinkCatalog} used for Flink 2.1. */
3637
public class Flink21Catalog extends FlinkCatalog {
@@ -41,7 +42,7 @@ public Flink21Catalog(
4142
String bootstrapServers,
4243
ClassLoader classLoader,
4344
Map<String, String> securityConfigs,
44-
Map<String, String> lakeCatalogProperties) {
45+
Supplier<Map<String, String>> lakeCatalogProperties) {
4546
super(
4647
name,
4748
defaultDatabase,

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
import java.util.Map;
8181
import java.util.Objects;
8282
import java.util.Optional;
83+
import java.util.function.Supplier;
8384
import java.util.stream.Collectors;
8485

8586
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -115,7 +116,7 @@ public class FlinkCatalog extends AbstractCatalog {
115116
protected final String bootstrapServers;
116117
protected final Map<String, String> securityConfigs;
117118
protected final LakeFlinkCatalog lakeFlinkCatalog;
118-
protected final Map<String, String> lakeCatalogProperties;
119+
protected final Supplier<Map<String, String>> lakeCatalogProperties;
119120
protected Connection connection;
120121
protected Admin admin;
121122

@@ -125,7 +126,7 @@ public FlinkCatalog(
125126
String bootstrapServers,
126127
ClassLoader classLoader,
127128
Map<String, String> securityConfigs,
128-
Map<String, String> lakeCatalogProperties) {
129+
Supplier<Map<String, String>> lakeCatalogProperties) {
129130
super(name, defaultDatabase);
130131
this.catalogName = name;
131132
this.defaultDatabase = defaultDatabase;
@@ -302,7 +303,7 @@ public CatalogBaseTable getTable(ObjectPath objectPath)
302303
objectPath.getDatabaseName(),
303304
tableName,
304305
tableInfo.getProperties(),
305-
lakeCatalogProperties);
306+
lakeCatalogProperties.get());
306307
} else {
307308
tableInfo = admin.getTableInfo(tablePath).get();
308309
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,17 +71,18 @@ public FlinkCatalog createCatalog(Context context) {
7171
Map<String, String> options = context.getOptions();
7272
Map<String, String> securityConfigs = extractPrefix(options, CLIENT_SECURITY_PREFIX);
7373

74-
Map<String, String> lakeCatalogProperties = new HashMap<>();
75-
for (DataLakeFormat lakeFormat : DataLakeFormat.values()) {
76-
lakeCatalogProperties.putAll(extractPrefix(options, lakeFormat.toString()));
77-
}
78-
7974
return new FlinkCatalog(
8075
context.getName(),
8176
helper.getOptions().get(FlinkCatalogOptions.DEFAULT_DATABASE),
8277
helper.getOptions().get(FlinkConnectorOptions.BOOTSTRAP_SERVERS),
8378
context.getClassLoader(),
8479
securityConfigs,
85-
lakeCatalogProperties);
80+
() -> {
81+
Map<String, String> lakeCatalogProperties = new HashMap<>();
82+
for (DataLakeFormat lakeFormat : DataLakeFormat.values()) {
83+
lakeCatalogProperties.putAll(extractPrefix(options, lakeFormat.toString()));
84+
}
85+
return lakeCatalogProperties;
86+
});
8687
}
8788
}

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public void testCreateCatalog() {
6565
BOOTSTRAP_SERVERS_NAME,
6666
Thread.currentThread().getContextClassLoader(),
6767
Collections.emptyMap(),
68-
Collections.emptyMap());
68+
Collections::emptyMap);
6969

7070
checkEquals(flinkCatalog, actualCatalog);
7171

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ static void beforeAll() {
116116
bootstrapServers,
117117
Thread.currentThread().getContextClassLoader(),
118118
Collections.emptyMap(),
119-
Collections.emptyMap());
119+
Collections::emptyMap);
120120
catalog.open();
121121
}
122122

@@ -771,7 +771,7 @@ void testAuthentication() throws Exception {
771771
bootstrapServers,
772772
Thread.currentThread().getContextClassLoader(),
773773
Collections.emptyMap(),
774-
Collections.emptyMap());
774+
Collections::emptyMap);
775775
Catalog finalAuthenticateCatalog = authenticateCatalog;
776776
assertThatThrownBy(finalAuthenticateCatalog::open)
777777
.cause()
@@ -790,7 +790,7 @@ void testAuthentication() throws Exception {
790790
bootstrapServers,
791791
Thread.currentThread().getContextClassLoader(),
792792
clientConfig,
793-
Collections.emptyMap());
793+
Collections::emptyMap);
794794
authenticateCatalog.open();
795795
assertThat(authenticateCatalog.listDatabases())
796796
.containsExactlyInAnyOrderElementsOf(Collections.singletonList(DEFAULT_DB));

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ static void beforeAll() {
169169
String.join(",", flussConf.get(BOOTSTRAP_SERVERS)),
170170
Thread.currentThread().getContextClassLoader(),
171171
Collections.emptyMap(),
172-
Collections.emptyMap());
172+
Collections::emptyMap);
173173
catalog.open();
174174
}
175175

@@ -625,7 +625,7 @@ void testDatabase() throws Exception {
625625
flussConf.get(ConfigOptions.BOOTSTRAP_SERVERS)),
626626
Thread.currentThread().getContextClassLoader(),
627627
Collections.emptyMap(),
628-
Collections.emptyMap()))
628+
Collections::emptyMap))
629629
.hasMessageContaining("defaultDatabase cannot be null or empty");
630630
}
631631

@@ -814,7 +814,7 @@ void testConnectionFailureHandling() {
814814
"invalid-bootstrap-server:9092",
815815
Thread.currentThread().getContextClassLoader(),
816816
Collections.emptyMap(),
817-
Collections.emptyMap());
817+
Collections::emptyMap);
818818

819819
// Test open() throws proper exception
820820
assertThatThrownBy(() -> badCatalog.open())
@@ -945,7 +945,7 @@ void testSecurityConfigsIntegration() throws Exception {
945945
String.join(",", flussConf.get(BOOTSTRAP_SERVERS)),
946946
Thread.currentThread().getContextClassLoader(),
947947
securityConfigs,
948-
Collections.emptyMap());
948+
Collections::emptyMap);
949949
securedCatalog.open();
950950

951951
try {

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkCatalogLakeTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public void buildCatalog() {
103103
bootstrapServers,
104104
Thread.currentThread().getContextClassLoader(),
105105
Collections.emptyMap(),
106-
Collections.emptyMap());
106+
Collections::emptyMap);
107107
catalog.open();
108108
}
109109
}

0 commit comments

Comments
 (0)