Skip to content

Commit ee33fa2

Browse files
authored
[flink] Support Delta Join on Flink 2.1 (#1726)
1 parent 997b146 commit ee33fa2

File tree

7 files changed

+457
-6
lines changed

7 files changed

+457
-6
lines changed
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.flink.catalog;
20+
21+
import org.apache.fluss.metadata.TableInfo;
22+
23+
import org.apache.flink.table.api.Schema;
24+
import org.apache.flink.table.catalog.CatalogBaseTable;
25+
import org.apache.flink.table.catalog.CatalogTable;
26+
import org.apache.flink.table.catalog.ObjectPath;
27+
import org.apache.flink.table.catalog.exceptions.CatalogException;
28+
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
29+
30+
import java.util.ArrayList;
31+
import java.util.List;
32+
import java.util.Map;
33+
import java.util.Optional;
34+
35+
/** A {@link FlinkCatalog} used for Flink 2.1. */
36+
public class Flink21Catalog extends FlinkCatalog {
37+
38+
public Flink21Catalog(
39+
String name,
40+
String defaultDatabase,
41+
String bootstrapServers,
42+
ClassLoader classLoader,
43+
Map<String, String> securityConfigs) {
44+
super(name, defaultDatabase, bootstrapServers, classLoader, securityConfigs);
45+
}
46+
47+
@Override
48+
public CatalogBaseTable getTable(ObjectPath objectPath)
49+
throws TableNotExistException, CatalogException {
50+
CatalogBaseTable catalogBaseTable = super.getTable(objectPath);
51+
if (!(catalogBaseTable instanceof CatalogTable)) {
52+
return catalogBaseTable;
53+
}
54+
55+
CatalogTable table = (CatalogTable) catalogBaseTable;
56+
Optional<Schema.UnresolvedPrimaryKey> pkOp = table.getUnresolvedSchema().getPrimaryKey();
57+
// If there is no pk, return directly.
58+
if (pkOp.isEmpty()) {
59+
return table;
60+
}
61+
62+
Schema.Builder newSchemaBuilder =
63+
Schema.newBuilder().fromSchema(table.getUnresolvedSchema());
64+
// Pk is always an index.
65+
newSchemaBuilder.index(pkOp.get().getColumnNames());
66+
67+
// Judge whether we can do prefix lookup.
68+
TableInfo tableInfo = connection.getTable(toTablePath(objectPath)).getTableInfo();
69+
List<String> bucketKeys = tableInfo.getBucketKeys();
70+
// For partition table, the physical primary key is the primary key that excludes the
71+
// partition key
72+
List<String> physicalPrimaryKeys = tableInfo.getPhysicalPrimaryKeys();
73+
List<String> indexKeys = new ArrayList<>();
74+
if (isPrefixList(physicalPrimaryKeys, bucketKeys)) {
75+
indexKeys.addAll(bucketKeys);
76+
if (tableInfo.isPartitioned()) {
77+
indexKeys.addAll(tableInfo.getPartitionKeys());
78+
}
79+
}
80+
81+
if (!indexKeys.isEmpty()) {
82+
newSchemaBuilder.index(indexKeys);
83+
}
84+
return CatalogTable.newBuilder()
85+
.schema(newSchemaBuilder.build())
86+
.comment(table.getComment())
87+
.partitionKeys(table.getPartitionKeys())
88+
.options(table.getOptions())
89+
.snapshot(table.getSnapshot().orElse(null))
90+
.distribution(table.getDistribution().orElse(null))
91+
.build();
92+
}
93+
94+
private static boolean isPrefixList(List<String> fullList, List<String> prefixList) {
95+
if (fullList.size() <= prefixList.size()) {
96+
return false;
97+
}
98+
99+
for (int i = 0; i < prefixList.size(); i++) {
100+
if (!fullList.get(i).equals(prefixList.get(i))) {
101+
return false;
102+
}
103+
}
104+
return true;
105+
}
106+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.flink.catalog;
20+
21+
/** A {@link FlinkCatalogFactory} used for Flink 2.1. */
22+
public class Flink21CatalogFactory extends FlinkCatalogFactory {
23+
24+
@Override
25+
public FlinkCatalog createCatalog(Context context) {
26+
FlinkCatalog catalog = super.createCatalog(context);
27+
return new Flink21Catalog(
28+
catalog.catalogName,
29+
catalog.defaultDatabase,
30+
catalog.bootstrapServers,
31+
catalog.classLoader,
32+
catalog.securityConfigs);
33+
}
34+
}

fluss-flink/fluss-flink-2.1/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@
1616
# limitations under the License.
1717
#
1818

19-
org.apache.fluss.flink.catalog.FlinkCatalogFactory
19+
org.apache.fluss.flink.catalog.Flink21CatalogFactory

fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java

Lines changed: 173 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,177 @@
1717

1818
package org.apache.fluss.flink.catalog;
1919

20+
import org.apache.flink.table.api.DataTypes;
21+
import org.apache.flink.table.api.Schema;
22+
import org.apache.flink.table.catalog.CatalogTable;
23+
import org.apache.flink.table.catalog.ObjectPath;
24+
import org.junit.jupiter.api.BeforeAll;
25+
import org.junit.jupiter.api.Test;
26+
27+
import static org.assertj.core.api.Assertions.assertThat;
28+
2029
/** IT case for catalog in Flink 2.1. */
21-
public class Flink21CatalogITCase extends FlinkCatalogITCase {}
30+
public class Flink21CatalogITCase extends FlinkCatalogITCase {
31+
32+
@BeforeAll
33+
static void beforeAll() {
34+
FlinkCatalogITCase.beforeAll();
35+
36+
// close the old one and open a new one later
37+
catalog.close();
38+
39+
catalog =
40+
new Flink21Catalog(
41+
catalog.catalogName,
42+
catalog.defaultDatabase,
43+
catalog.bootstrapServers,
44+
catalog.classLoader,
45+
catalog.securityConfigs);
46+
catalog.open();
47+
}
48+
49+
@Test
50+
void testGetTableWithIndex() throws Exception {
51+
String tableName = "table_with_pk_only";
52+
tEnv.executeSql(
53+
String.format(
54+
"create table %s ( "
55+
+ " a int, "
56+
+ " b varchar, "
57+
+ " c bigint, "
58+
+ " primary key (a, b) NOT ENFORCED"
59+
+ ") with ( "
60+
+ " 'connector' = 'fluss' "
61+
+ ")",
62+
tableName));
63+
CatalogTable table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName));
64+
Schema expectedSchema =
65+
Schema.newBuilder()
66+
.column("a", DataTypes.INT().notNull())
67+
.column("b", DataTypes.STRING().notNull())
68+
.column("c", DataTypes.BIGINT())
69+
.primaryKey("a", "b")
70+
.index("a", "b")
71+
.build();
72+
assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
73+
74+
tableName = "table_with_prefix_bucket_key";
75+
tEnv.executeSql(
76+
String.format(
77+
"create table %s ( "
78+
+ " a int, "
79+
+ " b varchar, "
80+
+ " c bigint, "
81+
+ " primary key (a, b) NOT ENFORCED"
82+
+ ") with ( "
83+
+ " 'connector' = 'fluss', "
84+
+ " 'bucket.key' = 'a'"
85+
+ ")",
86+
tableName));
87+
88+
table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName));
89+
expectedSchema =
90+
Schema.newBuilder()
91+
.column("a", DataTypes.INT().notNull())
92+
.column("b", DataTypes.STRING().notNull())
93+
.column("c", DataTypes.BIGINT())
94+
.primaryKey("a", "b")
95+
.index("a", "b")
96+
.index("a")
97+
.build();
98+
assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
99+
100+
tableName = "table_with_bucket_key_is_not_prefix_pk";
101+
tEnv.executeSql(
102+
String.format(
103+
"create table %s ( "
104+
+ " a int, "
105+
+ " b varchar, "
106+
+ " c bigint, "
107+
+ " primary key (a, b) NOT ENFORCED"
108+
+ ") with ( "
109+
+ " 'connector' = 'fluss', "
110+
+ " 'bucket.key' = 'b'"
111+
+ ")",
112+
tableName));
113+
114+
table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName));
115+
expectedSchema =
116+
Schema.newBuilder()
117+
.column("a", DataTypes.INT().notNull())
118+
.column("b", DataTypes.STRING().notNull())
119+
.column("c", DataTypes.BIGINT())
120+
.primaryKey("a", "b")
121+
.index("a", "b")
122+
.build();
123+
assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
124+
125+
tableName = "table_with_partition_1";
126+
tEnv.executeSql(
127+
String.format(
128+
"create table %s ( "
129+
+ " a int, "
130+
+ " b varchar, "
131+
+ " c bigint, "
132+
+ " dt varchar, "
133+
+ " primary key (a, b, dt) NOT ENFORCED "
134+
+ ") "
135+
+ " partitioned by (dt) "
136+
+ " with ( "
137+
+ " 'connector' = 'fluss', "
138+
+ " 'bucket.key' = 'a'"
139+
+ ")",
140+
tableName));
141+
142+
table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName));
143+
expectedSchema =
144+
Schema.newBuilder()
145+
.column("a", DataTypes.INT().notNull())
146+
.column("b", DataTypes.STRING().notNull())
147+
.column("c", DataTypes.BIGINT())
148+
.column("dt", DataTypes.STRING().notNull())
149+
.primaryKey("a", "b", "dt")
150+
.index("a", "b", "dt")
151+
.index("a", "dt")
152+
.build();
153+
assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
154+
155+
tableName = "table_with_partition_2";
156+
tEnv.executeSql(
157+
String.format(
158+
"create table %s ( "
159+
+ " a int, "
160+
+ " b varchar, "
161+
+ " c bigint, "
162+
+ " dt varchar, "
163+
+ " primary key (dt, a, b) NOT ENFORCED "
164+
+ ") "
165+
+ " partitioned by (dt) "
166+
+ " with ( "
167+
+ " 'connector' = 'fluss', "
168+
+ " 'bucket.key' = 'a'"
169+
+ ")",
170+
tableName));
171+
172+
table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, tableName));
173+
expectedSchema =
174+
Schema.newBuilder()
175+
.column("a", DataTypes.INT().notNull())
176+
.column("b", DataTypes.STRING().notNull())
177+
.column("c", DataTypes.BIGINT())
178+
.column("dt", DataTypes.STRING().notNull())
179+
.primaryKey("dt", "a", "b")
180+
.index("dt", "a", "b")
181+
.index("a", "dt")
182+
.build();
183+
assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
184+
}
185+
186+
@Override
187+
protected void addDefaultIndexKey(Schema.Builder schemaBuilder) {
188+
super.addDefaultIndexKey(schemaBuilder);
189+
190+
Schema currentSchema = schemaBuilder.build();
191+
currentSchema.getPrimaryKey().ifPresent(pk -> schemaBuilder.index(pk.getColumnNames()));
192+
}
193+
}

0 commit comments

Comments
 (0)