Skip to content

Commit b85ba5a

Browse files
[flink] Get table info of primary lake table. (apache#2152)
1 parent 03c8602 commit b85ba5a

File tree

4 files changed

+166
-27
lines changed

4 files changed

+166
-27
lines changed

fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818

1919
package org.apache.fluss.flink.catalog;
2020

21+
import org.apache.fluss.flink.lake.LakeFlinkCatalog;
2122
import org.apache.fluss.metadata.TableInfo;
2223

24+
import org.apache.flink.annotation.VisibleForTesting;
2325
import org.apache.flink.table.api.Schema;
2426
import org.apache.flink.table.catalog.CatalogBaseTable;
2527
import org.apache.flink.table.catalog.CatalogTable;
@@ -44,11 +46,29 @@ public Flink21Catalog(
4446
super(name, defaultDatabase, bootstrapServers, classLoader, securityConfigs);
4547
}
4648

49+
@VisibleForTesting
50+
public Flink21Catalog(
51+
String name,
52+
String defaultDatabase,
53+
String bootstrapServers,
54+
ClassLoader classLoader,
55+
Map<String, String> securityConfigs,
56+
LakeFlinkCatalog lakeFlinkCatalog) {
57+
super(
58+
name,
59+
defaultDatabase,
60+
bootstrapServers,
61+
classLoader,
62+
securityConfigs,
63+
lakeFlinkCatalog);
64+
}
65+
4766
@Override
4867
public CatalogBaseTable getTable(ObjectPath objectPath)
4968
throws TableNotExistException, CatalogException {
5069
CatalogBaseTable catalogBaseTable = super.getTable(objectPath);
51-
if (!(catalogBaseTable instanceof CatalogTable)) {
70+
if (!(catalogBaseTable instanceof CatalogTable)
71+
|| objectPath.getObjectName().contains(LAKE_TABLE_SPLITTER)) {
5272
return catalogBaseTable;
5373
}
5474

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.flink.catalog;
19+
20+
import org.apache.fluss.flink.lake.LakeFlinkCatalog;
21+
22+
import org.apache.flink.table.api.DataTypes;
23+
import org.apache.flink.table.catalog.Column;
24+
import org.apache.flink.table.catalog.DefaultIndex;
25+
import org.apache.flink.table.catalog.ResolvedSchema;
26+
import org.apache.flink.table.catalog.UniqueConstraint;
27+
28+
import java.util.Arrays;
29+
import java.util.Collections;
30+
31+
/** Test for {@link Flink21Catalog}. */
32+
public class FlinkCatalog21Test extends FlinkCatalogTest {
33+
34+
@Override
35+
protected FlinkCatalog initCatalog(
36+
String catalogName,
37+
String databaseName,
38+
String bootstrapServers,
39+
LakeFlinkCatalog lakeFlinkCatalog) {
40+
return new Flink21Catalog(
41+
catalogName,
42+
databaseName,
43+
bootstrapServers,
44+
Thread.currentThread().getContextClassLoader(),
45+
Collections.emptyMap(),
46+
lakeFlinkCatalog);
47+
}
48+
49+
protected ResolvedSchema createSchema() {
50+
return new ResolvedSchema(
51+
Arrays.asList(
52+
Column.physical("first", DataTypes.STRING().notNull()),
53+
Column.physical("second", DataTypes.INT()),
54+
Column.physical("third", DataTypes.STRING().notNull())),
55+
Collections.emptyList(),
56+
UniqueConstraint.primaryKey("PK_first_third", Arrays.asList("first", "third")),
57+
Collections.singletonList(
58+
DefaultIndex.newIndex(
59+
"INDEX_first_third", Arrays.asList("first", "third"))));
60+
}
61+
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,13 +124,30 @@ public FlinkCatalog(
124124
String bootstrapServers,
125125
ClassLoader classLoader,
126126
Map<String, String> securityConfigs) {
127+
this(
128+
name,
129+
defaultDatabase,
130+
bootstrapServers,
131+
classLoader,
132+
securityConfigs,
133+
new LakeFlinkCatalog(name, classLoader));
134+
}
135+
136+
@VisibleForTesting
137+
public FlinkCatalog(
138+
String name,
139+
String defaultDatabase,
140+
String bootstrapServers,
141+
ClassLoader classLoader,
142+
Map<String, String> securityConfigs,
143+
LakeFlinkCatalog lakeFlinkCatalog) {
127144
super(name, defaultDatabase);
128145
this.catalogName = name;
129146
this.defaultDatabase = defaultDatabase;
130147
this.bootstrapServers = bootstrapServers;
131148
this.classLoader = classLoader;
132149
this.securityConfigs = securityConfigs;
133-
this.lakeFlinkCatalog = new LakeFlinkCatalog(catalogName, classLoader);
150+
this.lakeFlinkCatalog = lakeFlinkCatalog;
134151
}
135152

136153
@Override

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java

Lines changed: 66 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.fluss.exception.IllegalConfigurationException;
2323
import org.apache.fluss.exception.InvalidPartitionException;
2424
import org.apache.fluss.exception.InvalidTableException;
25+
import org.apache.fluss.flink.lake.LakeFlinkCatalog;
2526
import org.apache.fluss.flink.utils.FlinkConversionsTest;
2627
import org.apache.fluss.server.testutils.FlussClusterExtension;
2728
import org.apache.fluss.utils.ExceptionUtils;
@@ -36,6 +37,7 @@
3637
import org.apache.flink.table.catalog.CatalogPartitionSpec;
3738
import org.apache.flink.table.catalog.CatalogTable;
3839
import org.apache.flink.table.catalog.Column;
40+
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
3941
import org.apache.flink.table.catalog.IntervalFreshness;
4042
import org.apache.flink.table.catalog.ObjectPath;
4143
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
@@ -58,8 +60,7 @@
5860
import org.apache.flink.table.expressions.utils.ResolvedExpressionMock;
5961
import org.apache.flink.table.factories.Factory;
6062
import org.apache.flink.table.factories.FactoryUtil;
61-
import org.junit.jupiter.api.AfterAll;
62-
import org.junit.jupiter.api.BeforeAll;
63+
import org.junit.jupiter.api.AfterEach;
6364
import org.junit.jupiter.api.BeforeEach;
6465
import org.junit.jupiter.api.Test;
6566
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -78,6 +79,7 @@
7879
import static org.apache.fluss.flink.FlinkConnectorOptions.BUCKET_KEY;
7980
import static org.apache.fluss.flink.FlinkConnectorOptions.BUCKET_NUMBER;
8081
import static org.apache.fluss.flink.FlinkConnectorOptions.SCAN_STARTUP_MODE;
82+
import static org.apache.fluss.flink.adapter.CatalogTableAdapter.toCatalogTable;
8183
import static org.apache.fluss.flink.utils.CatalogTableTestUtils.addOptions;
8284
import static org.apache.fluss.flink.utils.CatalogTableTestUtils.checkEqualsIgnoreSchema;
8385
import static org.apache.fluss.flink.utils.CatalogTableTestUtils.checkEqualsRespectSchema;
@@ -101,7 +103,8 @@ class FlinkCatalogTest {
101103
private static final FlinkConversionsTest.TestRefreshHandler REFRESH_HANDLER =
102104
new FlinkConversionsTest.TestRefreshHandler("jobID: xxx, clusterId: yyy");
103105

104-
static Catalog catalog;
106+
private Catalog catalog;
107+
private MockLakeFlinkCatalog mockLakeCatalog;
105108
private final ObjectPath tableInDefaultDb = new ObjectPath(DEFAULT_DB, "t1");
106109

107110
private static Configuration initConfig() {
@@ -110,7 +113,7 @@ private static Configuration initConfig() {
110113
return configuration;
111114
}
112115

113-
private ResolvedSchema createSchema() {
116+
protected ResolvedSchema createSchema() {
114117
return new ResolvedSchema(
115118
Arrays.asList(
116119
Column.physical("first", DataTypes.STRING().notNull()),
@@ -128,7 +131,7 @@ private CatalogTable newCatalogTable(Map<String, String> options) {
128131
private CatalogTable newCatalogTable(
129132
ResolvedSchema resolvedSchema, Map<String, String> options) {
130133
CatalogTable origin =
131-
CatalogTable.of(
134+
toCatalogTable(
132135
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
133136
"test comment",
134137
Collections.emptyList(),
@@ -158,29 +161,36 @@ private CatalogMaterializedTable newCatalogMaterializedTable(
158161
return new ResolvedCatalogMaterializedTable(origin, resolvedSchema);
159162
}
160163

161-
@BeforeAll
162-
static void beforeAll() {
164+
protected FlinkCatalog initCatalog(
165+
String catalogName,
166+
String databaseName,
167+
String bootstrapServers,
168+
LakeFlinkCatalog lakeFlinkCatalog) {
169+
return new FlinkCatalog(
170+
catalogName,
171+
databaseName,
172+
bootstrapServers,
173+
Thread.currentThread().getContextClassLoader(),
174+
Collections.emptyMap(),
175+
lakeFlinkCatalog);
176+
}
177+
178+
@BeforeEach
179+
void beforeEach() throws Exception {
163180
// set fluss conf
164181
Configuration flussConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
182+
183+
mockLakeCatalog =
184+
new MockLakeFlinkCatalog(
185+
CATALOG_NAME, Thread.currentThread().getContextClassLoader());
165186
catalog =
166-
new FlinkCatalog(
187+
initCatalog(
167188
CATALOG_NAME,
168189
DEFAULT_DB,
169190
String.join(",", flussConf.get(BOOTSTRAP_SERVERS)),
170-
Thread.currentThread().getContextClassLoader(),
171-
Collections.emptyMap());
191+
mockLakeCatalog);
172192
catalog.open();
173-
}
174193

175-
@AfterAll
176-
static void afterAll() {
177-
if (catalog != null) {
178-
catalog.close();
179-
}
180-
}
181-
182-
@BeforeEach
183-
void beforeEach() throws Exception {
184194
// First check if database exists, and drop it if it does
185195
if (catalog.databaseExists(DEFAULT_DB)) {
186196
catalog.dropDatabase(DEFAULT_DB, true, true);
@@ -198,6 +208,13 @@ void beforeEach() throws Exception {
198208
}
199209
}
200210

211+
@AfterEach
212+
void afterEach() {
213+
if (catalog != null) {
214+
catalog.close();
215+
}
216+
}
217+
201218
@Test
202219
void testCreateTable() throws Exception {
203220
Map<String, String> options = new HashMap<>();
@@ -270,7 +287,7 @@ void testCreateTable() throws Exception {
270287
ResolvedSchema resolvedSchema = this.createSchema();
271288
CatalogTable table2 =
272289
new ResolvedCatalogTable(
273-
CatalogTable.of(
290+
toCatalogTable(
274291
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
275292
"test comment",
276293
Collections.singletonList("first"),
@@ -306,6 +323,11 @@ void testCreateAlreadyExistsLakeTable() throws Exception {
306323
CatalogTable table = this.newCatalogTable(options);
307324
catalog.createTable(lakeTablePath, table, false);
308325
assertThat(catalog.tableExists(lakeTablePath)).isTrue();
326+
// get the lake table from lake catalog.
327+
mockLakeCatalog.registerLakeTable(lakeTablePath, table);
328+
assertThat((CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, "lake_table$lake")))
329+
.isEqualTo(table);
330+
309331
// drop fluss table
310332
catalog.dropTable(lakeTablePath, false);
311333
assertThat(catalog.tableExists(lakeTablePath)).isFalse();
@@ -363,7 +385,7 @@ void testCreateTableWithWatermarkAndComputedCol() throws Exception {
363385
UniqueConstraint.primaryKey(
364386
"PK_first", Collections.singletonList("first")));
365387
CatalogTable origin =
366-
CatalogTable.of(
388+
toCatalogTable(
367389
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
368390
"test comment",
369391
Collections.emptyList(),
@@ -648,7 +670,7 @@ void testOperatePartitions() throws Exception {
648670
ResolvedSchema resolvedSchema = this.createSchema();
649671
CatalogTable table2 =
650672
new ResolvedCatalogTable(
651-
CatalogTable.of(
673+
toCatalogTable(
652674
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
653675
"test comment",
654676
Collections.singletonList("first"),
@@ -758,7 +780,7 @@ void testCreatePartitions() throws Exception {
758780
ObjectPath partitionedPath = new ObjectPath(DEFAULT_DB, "partitioned_table1");
759781
CatalogTable partitionedTable =
760782
new ResolvedCatalogTable(
761-
CatalogTable.of(
783+
toCatalogTable(
762784
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
763785
"test comment",
764786
Collections.singletonList("first"),
@@ -845,7 +867,7 @@ void testStatisticsOperations() throws Exception {
845867
ResolvedSchema schema = createSchema();
846868
CatalogTable partTable =
847869
new ResolvedCatalogTable(
848-
CatalogTable.of(
870+
toCatalogTable(
849871
Schema.newBuilder().fromResolvedSchema(schema).build(),
850872
"partitioned table for stats",
851873
Collections.singletonList("first"),
@@ -974,4 +996,23 @@ private void createAndCheckAndDropTable(
974996
checkEqualsRespectSchema((CatalogTable) tableCreated, table);
975997
catalog.dropTable(tablePath, false);
976998
}
999+
1000+
private static class MockLakeFlinkCatalog extends LakeFlinkCatalog {
1001+
private final GenericInMemoryCatalog catalog;
1002+
1003+
public MockLakeFlinkCatalog(String catalogName, ClassLoader classLoader) {
1004+
super(catalogName, classLoader);
1005+
catalog = new GenericInMemoryCatalog(catalogName, DEFAULT_DB);
1006+
}
1007+
1008+
@Override
1009+
public Catalog getLakeCatalog(Configuration tableOptions) {
1010+
return catalog;
1011+
}
1012+
1013+
void registerLakeTable(ObjectPath tablePath, CatalogTable table)
1014+
throws TableAlreadyExistException, DatabaseNotExistException {
1015+
catalog.createTable(tablePath, table, false);
1016+
}
1017+
}
9771018
}

0 commit comments

Comments
 (0)