Skip to content

Commit b9b92d4

Browse files
authored
[flink] Fix not compatible with Flink 1.18 and 1.19 (java.lang.NoSuchFieldError: MATERIALIZED_TABLE) (apache#2083)
1 parent de2d9a0 commit b9b92d4

File tree

9 files changed

+134
-237
lines changed

9 files changed

+134
-237
lines changed

fluss-flink/fluss-flink-1.18/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java

Lines changed: 0 additions & 115 deletions
This file was deleted.

fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.flink.adapter;
1919

2020
import org.apache.flink.table.api.Schema;
21+
import org.apache.flink.table.catalog.CatalogBaseTable;
2122
import org.apache.flink.table.catalog.CatalogTable;
2223

2324
import javax.annotation.Nullable;
@@ -26,8 +27,8 @@
2627
import java.util.Map;
2728

2829
/**
29-
* A adapter for {@link CatalogTable} constructor. TODO: remove this class when no longer support
30-
* flink 1.18 and 1.19.
30+
* A adapter for {@link CatalogTable} constructor, and adapter MATERIALIZED_TABLE for {@link
31+
* CatalogBaseTable.TableKind} TODO: remove this class when no longer support flink 1.18 and 1.19.
3132
*/
3233
public class CatalogTableAdapter {
3334
public static CatalogTable toCatalogTable(
@@ -37,4 +38,9 @@ public static CatalogTable toCatalogTable(
3738
Map<String, String> options) {
3839
return CatalogTable.of(schema, comment, partitionKeys, options);
3940
}
41+
42+
public static boolean isMaterializedTable(CatalogBaseTable.TableKind tableKind) {
43+
// flink 1.18 not support MaterializedTable
44+
return false;
45+
}
4046
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.flink.adapter;
19+
20+
import org.apache.flink.table.catalog.CatalogBaseTable;
21+
import org.junit.jupiter.api.Test;
22+
23+
import static org.assertj.core.api.Assertions.assertThat;
24+
25+
/** Test for {@link CatalogTableAdapter} in flink 1.18. */
26+
public class Flink118CatalogTableTest extends FlinkCatalogTableTest {
27+
28+
@Test
29+
public void testIsMaterializedTable() {
30+
assertThat(CatalogTableAdapter.isMaterializedTable(CatalogBaseTable.TableKind.VIEW))
31+
.isEqualTo(false);
32+
assertThat(CatalogTableAdapter.isMaterializedTable(CatalogBaseTable.TableKind.TABLE))
33+
.isEqualTo(false);
34+
}
35+
}

fluss-flink/fluss-flink-1.19/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java

Lines changed: 0 additions & 115 deletions
This file was deleted.

fluss-flink/fluss-flink-1.19/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.flink.adapter;
1919

2020
import org.apache.flink.table.api.Schema;
21+
import org.apache.flink.table.catalog.CatalogBaseTable;
2122
import org.apache.flink.table.catalog.CatalogTable;
2223

2324
import javax.annotation.Nullable;
@@ -26,8 +27,8 @@
2627
import java.util.Map;
2728

2829
/**
29-
* A adapter for {@link CatalogTable} constructor. TODO: remove this class when no longer support
30-
* flink 1.18 and 1.19.
30+
* A adapter for {@link CatalogTable} constructor, and adapter MATERIALIZED_TABLE for {@link
31+
* CatalogBaseTable.TableKind} TODO: remove this class when no longer support flink 1.18 and 1.19.
3132
*/
3233
public class CatalogTableAdapter {
3334
public static CatalogTable toCatalogTable(
@@ -37,4 +38,9 @@ public static CatalogTable toCatalogTable(
3738
Map<String, String> options) {
3839
return CatalogTable.of(schema, comment, partitionKeys, options);
3940
}
41+
42+
public static boolean isMaterializedTable(CatalogBaseTable.TableKind tableKind) {
43+
// flink 1.19 not support MaterializedTable
44+
return false;
45+
}
4046
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.flink.adapter;
19+
20+
import org.apache.flink.table.catalog.CatalogBaseTable;
21+
import org.junit.jupiter.api.Test;
22+
23+
import static org.assertj.core.api.Assertions.assertThat;
24+
25+
/** Test for {@link CatalogTableAdapter} in flink 1.19. */
26+
public class Flink119CatalogTableTest extends FlinkCatalogTableTest {
27+
28+
@Test
29+
public void testIsMaterializedTable() {
30+
assertThat(CatalogTableAdapter.isMaterializedTable(CatalogBaseTable.TableKind.VIEW))
31+
.isEqualTo(false);
32+
assertThat(CatalogTableAdapter.isMaterializedTable(CatalogBaseTable.TableKind.TABLE))
33+
.isEqualTo(false);
34+
}
35+
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/CatalogTableAdapter.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.flink.adapter;
1919

2020
import org.apache.flink.table.api.Schema;
21+
import org.apache.flink.table.catalog.CatalogBaseTable;
2122
import org.apache.flink.table.catalog.CatalogTable;
2223

2324
import javax.annotation.Nullable;
@@ -26,8 +27,8 @@
2627
import java.util.Map;
2728

2829
/**
29-
* A adapter for {@link CatalogTable} constructor. TODO: remove this class when no longer support
30-
* flink 1.18 and 1.19.
30+
* A adapter for {@link CatalogTable} constructor, and adapter MATERIALIZED_TABLE for {@link
31+
* CatalogBaseTable.TableKind} TODO: remove this class when no longer support flink 1.18 and 1.19.
3132
*/
3233
public class CatalogTableAdapter {
3334
public static CatalogTable toCatalogTable(
@@ -42,4 +43,8 @@ public static CatalogTable toCatalogTable(
4243
.options(options)
4344
.build();
4445
}
46+
47+
public static boolean isMaterializedTable(CatalogBaseTable.TableKind tableKind) {
48+
return CatalogBaseTable.TableKind.MATERIALIZED_TABLE == tableKind;
49+
}
4550
}

0 commit comments

Comments
 (0)