Skip to content

Commit 9e505e5

Browse files
committed
hongshun's advice:
1. merge flink21Catalog and flink22Catalog to FlinkCatalog. 2. add flink-2.2 to ci tests of flink
1 parent b31ca56 commit 9e505e5

File tree

29 files changed

+518
-758
lines changed

29 files changed

+518
-758
lines changed

.github/workflows/stage.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ MODULES_FLINK="\
2525
fluss-flink,\
2626
fluss-flink/fluss-flink-common,\
2727
fluss-flink/fluss-flink-2.1,\
28+
fluss-flink/fluss-flink-2.2,\
2829
fluss-flink/fluss-flink-1.20,\
2930
"
3031

fluss-flink/fluss-flink-2.1/src/main/java/org/apache/flink/api/connector/sink2/Sink.java

Lines changed: 0 additions & 61 deletions
This file was deleted.
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.flink.adapter;
19+
20+
import org.apache.flink.api.common.operators.MailboxExecutor;
21+
import org.apache.flink.api.connector.sink2.Sink;
22+
import org.apache.flink.api.connector.sink2.SinkWriter;
23+
import org.apache.flink.api.connector.sink2.WriterInitContext;
24+
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
25+
26+
import java.io.IOException;
27+
28+
/**
29+
* Flink sink adapter which hide the different version of createWriter method.
30+
*
31+
* <p>TODO: remove this class when no longer support all the Flink 1.x series.
32+
*/
33+
public abstract class FlinkSinkAdapter<InputT> implements Sink<InputT> {
34+
35+
@Override
36+
public SinkWriter<InputT> createWriter(WriterInitContext writerInitContext) throws IOException {
37+
return createWriter(
38+
writerInitContext.getMailboxExecutor(), writerInitContext.metricGroup());
39+
}
40+
41+
protected abstract SinkWriter<InputT> createWriter(
42+
MailboxExecutor mailboxExecutor, SinkWriterMetricGroup metricGroup);
43+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.flink.adapter;
19+
20+
import org.apache.flink.table.api.Schema;
21+
22+
import java.util.List;
23+
24+
/**
25+
* An adapter for the schema with Index.
26+
*
27+
* <p>TODO: remove this class when no longer support all the Flink 1.x series.
28+
*/
29+
public class SchemaAdapter {
30+
private SchemaAdapter() {}
31+
32+
public static Schema withIndex(Schema unresolvedSchema, List<List<String>> indexes) {
33+
Schema.Builder newSchemaBuilder = Schema.newBuilder().fromSchema(unresolvedSchema);
34+
for (List<String> index : indexes) {
35+
newSchemaBuilder.index(index);
36+
}
37+
return newSchemaBuilder.build();
38+
}
39+
40+
public static boolean supportIndex() {
41+
return true;
42+
}
43+
}

fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java

Lines changed: 0 additions & 136 deletions
This file was deleted.

fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java

Lines changed: 0 additions & 35 deletions
This file was deleted.

fluss-flink/fluss-flink-2.1/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@
1616
# limitations under the License.
1717
#
1818

19-
org.apache.fluss.flink.catalog.Flink21CatalogFactory
19+
org.apache.fluss.flink.catalog.FlinkCatalogFactory

fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,32 +21,13 @@
2121
import org.apache.flink.table.api.Schema;
2222
import org.apache.flink.table.catalog.CatalogTable;
2323
import org.apache.flink.table.catalog.ObjectPath;
24-
import org.junit.jupiter.api.BeforeAll;
2524
import org.junit.jupiter.api.Test;
2625

2726
import static org.assertj.core.api.Assertions.assertThat;
2827

2928
/** IT case for catalog in Flink 2.1. */
3029
public class Flink21CatalogITCase extends FlinkCatalogITCase {
3130

32-
@BeforeAll
33-
static void beforeAll() {
34-
FlinkCatalogITCase.beforeAll();
35-
36-
// close the old one and open a new one later
37-
catalog.close();
38-
39-
catalog =
40-
new Flink21Catalog(
41-
catalog.catalogName,
42-
catalog.defaultDatabase,
43-
catalog.bootstrapServers,
44-
catalog.classLoader,
45-
catalog.securityConfigs,
46-
catalog.lakeCatalogPropertiesSupplier);
47-
catalog.open();
48-
}
49-
5031
@Test
5132
void testGetTableWithIndex() throws Exception {
5233
String tableName = "table_with_pk_only";

0 commit comments

Comments
 (0)