Skip to content

Commit 4096401

Browse files
committed
Fix build errors
1 parent 0b63ffe commit 4096401

File tree

4 files changed

+101
-6
lines changed

4 files changed

+101
-6
lines changed

fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/catalog/Flink22Catalog.java

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.fluss.flink.catalog;
2020

21+
import org.apache.fluss.annotation.VisibleForTesting;
22+
import org.apache.fluss.flink.lake.LakeFlinkCatalog;
2123
import org.apache.fluss.metadata.TableInfo;
2224

2325
import org.apache.flink.table.api.Schema;
@@ -31,6 +33,7 @@
3133
import java.util.List;
3234
import java.util.Map;
3335
import java.util.Optional;
36+
import java.util.function.Supplier;
3437

3538
/** A {@link FlinkCatalog} used for Flink 2.2. */
3639
public class Flink22Catalog extends FlinkCatalog {
@@ -40,15 +43,42 @@ public Flink22Catalog(
4043
String defaultDatabase,
4144
String bootstrapServers,
4245
ClassLoader classLoader,
43-
Map<String, String> securityConfigs) {
44-
super(name, defaultDatabase, bootstrapServers, classLoader, securityConfigs);
46+
Map<String, String> securityConfigs,
47+
Supplier<Map<String, String>> lakeCatalogPropertiesSupplier) {
48+
super(
49+
name,
50+
defaultDatabase,
51+
bootstrapServers,
52+
classLoader,
53+
securityConfigs,
54+
lakeCatalogPropertiesSupplier);
55+
}
56+
57+
@VisibleForTesting
58+
public Flink22Catalog(
59+
String name,
60+
String defaultDatabase,
61+
String bootstrapServers,
62+
ClassLoader classLoader,
63+
Map<String, String> securityConfigs,
64+
Supplier<Map<String, String>> lakeCatalogPropertiesSupplier,
65+
LakeFlinkCatalog lakeFlinkCatalog) {
66+
super(
67+
name,
68+
defaultDatabase,
69+
bootstrapServers,
70+
classLoader,
71+
securityConfigs,
72+
lakeCatalogPropertiesSupplier,
73+
lakeFlinkCatalog);
4574
}
4675

4776
@Override
4877
public CatalogBaseTable getTable(ObjectPath objectPath)
4978
throws TableNotExistException, CatalogException {
5079
CatalogBaseTable catalogBaseTable = super.getTable(objectPath);
51-
if (!(catalogBaseTable instanceof CatalogTable)) {
80+
if (!(catalogBaseTable instanceof CatalogTable)
81+
|| objectPath.getObjectName().contains(LAKE_TABLE_SPLITTER)) {
5282
return catalogBaseTable;
5383
}
5484

@@ -67,7 +97,8 @@ public CatalogBaseTable getTable(ObjectPath objectPath)
6797
// Judge whether we can do prefix lookup.
6898
TableInfo tableInfo = connection.getTable(toTablePath(objectPath)).getTableInfo();
6999
List<String> bucketKeys = tableInfo.getBucketKeys();
70-
// For partition table, the physical primary key is the primary key that excludes the
100+
// For partition table, the physical primary key is the primary key that
101+
// excludes the
71102
// partition key
72103
List<String> physicalPrimaryKeys = tableInfo.getPhysicalPrimaryKeys();
73104
List<String> indexKeys = new ArrayList<>();

fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/catalog/Flink22CatalogFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ public FlinkCatalog createCatalog(Context context) {
2929
catalog.defaultDatabase,
3030
catalog.bootstrapServers,
3131
catalog.classLoader,
32-
catalog.securityConfigs);
32+
catalog.securityConfigs,
33+
catalog.lakeCatalogPropertiesSupplier);
3334
}
3435
}

fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22CatalogITCase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ static void beforeAll() {
4242
catalog.defaultDatabase,
4343
catalog.bootstrapServers,
4444
catalog.classLoader,
45-
catalog.securityConfigs);
45+
catalog.securityConfigs,
46+
catalog.lakeCatalogPropertiesSupplier);
4647
catalog.open();
4748
}
4849

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.flink.catalog;
19+
20+
import org.apache.fluss.flink.lake.LakeFlinkCatalog;
21+
22+
import org.apache.flink.table.api.DataTypes;
23+
import org.apache.flink.table.catalog.Column;
24+
import org.apache.flink.table.catalog.DefaultIndex;
25+
import org.apache.flink.table.catalog.ResolvedSchema;
26+
import org.apache.flink.table.catalog.UniqueConstraint;
27+
28+
import java.util.Arrays;
29+
import java.util.Collections;
30+
31+
/** Test for {@link Flink21Catalog}. */
32+
public class FlinkCatalog22Test extends FlinkCatalogTest {
33+
34+
@Override
35+
protected FlinkCatalog initCatalog(
36+
String catalogName,
37+
String databaseName,
38+
String bootstrapServers,
39+
LakeFlinkCatalog lakeFlinkCatalog) {
40+
return new Flink22Catalog(
41+
catalogName,
42+
databaseName,
43+
bootstrapServers,
44+
Thread.currentThread().getContextClassLoader(),
45+
Collections.emptyMap(),
46+
Collections::emptyMap,
47+
lakeFlinkCatalog);
48+
}
49+
50+
protected ResolvedSchema createSchema() {
51+
return new ResolvedSchema(
52+
Arrays.asList(
53+
Column.physical("first", DataTypes.STRING().notNull()),
54+
Column.physical("second", DataTypes.INT()),
55+
Column.physical("third", DataTypes.STRING().notNull())),
56+
Collections.emptyList(),
57+
UniqueConstraint.primaryKey("PK_first_third", Arrays.asList("first", "third")),
58+
Collections.singletonList(
59+
DefaultIndex.newIndex(
60+
"INDEX_first_third", Arrays.asList("first", "third"))));
61+
}
62+
}

0 commit comments

Comments
 (0)