Skip to content

Commit f94f8e2

Browse files
[flink] Support Flink 2.1 (#1176)
1. support flink 2.1 in module fluss-flink-2.1 2. skip compilation and test for flink 2.1 module when Java8 is activated 3. split CI matrix into core, flink and lake
1 parent ae7387e commit f94f8e2

File tree

29 files changed

+856
-36
lines changed

29 files changed

+856
-36
lines changed

.github/workflows/ci-template.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ jobs:
3636
strategy:
3737
fail-fast: false
3838
matrix:
39-
module: [ core, flink ]
39+
module: [ core, flink, lake ]
4040
name: "${{ matrix.module }}"
4141
steps:
4242
- name: Checkout code

.github/workflows/stage.sh

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,17 @@
1919

2020
STAGE_CORE="core"
2121
STAGE_FLINK="flink"
22+
STAGE_LAKE="lake"
2223

2324
MODULES_FLINK="\
2425
fluss-flink,\
2526
fluss-flink/fluss-flink-common,\
27+
fluss-flink/fluss-flink-2.1,\
2628
fluss-flink/fluss-flink-1.20,\
29+
"
30+
31+
# we move Flink legacy version tests to "lake" module for balancing testing time
32+
MODULES_LAKE="\
2733
fluss-flink/fluss-flink-1.19,\
2834
fluss-flink/fluss-flink-1.18,\
2935
fluss-lake,\
@@ -36,7 +42,10 @@ function get_test_modules_for_stage() {
3642
local stage=$1
3743

3844
local modules_flink=$MODULES_FLINK
39-
local modules_core=\!${MODULES_FLINK//,/,\!}
45+
local modules_lake=$MODULES_LAKE
46+
local negated_flink=\!${MODULES_FLINK//,/,\!}
47+
local negated_lake=\!${MODULES_LAKE//,/,\!}
48+
local modules_core="$negated_flink,$negated_lake"
4049

4150
case ${stage} in
4251
(${STAGE_CORE})
@@ -45,6 +54,9 @@ function get_test_modules_for_stage() {
4554
(${STAGE_FLINK})
4655
echo "-pl fluss-test-coverage,$modules_flink"
4756
;;
57+
(${STAGE_LAKE})
58+
echo "-pl fluss-test-coverage,$modules_lake"
59+
;;
4860
esac
4961
}
5062

fluss-flink/fluss-flink-1.18/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@
105105
<groupId>org.apache.flink</groupId>
106106
<artifactId>flink-connector-base</artifactId>
107107
<version>${flink.minor.version}</version>
108-
<scope>test</scope>
108+
<scope>provided</scope>
109109
</dependency>
110110

111111

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.alibaba.fluss.flink.adapter;
19+
20+
import org.apache.flink.table.api.Schema;
21+
import org.apache.flink.table.catalog.CatalogTable;
22+
23+
import javax.annotation.Nullable;
24+
25+
import java.util.List;
26+
import java.util.Map;
27+
28+
/**
29+
* A adapter for {@link CatalogTable} constructor. TODO: remove this class when no longer support
30+
* flink 1.18 and 1.19.
31+
*/
32+
public class CatalogTableAdapter {
33+
public static CatalogTable toCatalogTable(
34+
Schema schema,
35+
@Nullable String comment,
36+
List<String> partitionKeys,
37+
Map<String, String> options) {
38+
return CatalogTable.of(schema, comment, partitionKeys, options);
39+
}
40+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.alibaba.fluss.flink.adapter;
19+
20+
import org.apache.flink.api.connector.source.SourceSplit;
21+
import org.apache.flink.configuration.Configuration;
22+
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
23+
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
24+
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
25+
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
26+
27+
import java.util.Collection;
28+
import java.util.function.Consumer;
29+
import java.util.function.Supplier;
30+
31+
/**
32+
* Adapter for {@link SingleThreadFetcherManager}.TODO: remove it until not supported in flink 1.18.
33+
*/
34+
public class SingleThreadFetcherManagerAdapter<E, SplitT extends SourceSplit>
35+
extends SingleThreadFetcherManager<E, SplitT> {
36+
public SingleThreadFetcherManagerAdapter(
37+
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
38+
Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
39+
Configuration configuration,
40+
Consumer<Collection<String>> splitFinishedHook) {
41+
super(elementsQueue, splitReaderSupplier, configuration, splitFinishedHook);
42+
}
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.alibaba.fluss.flink.adapter;
19+
20+
import org.apache.flink.api.connector.source.SourceReaderContext;
21+
import org.apache.flink.api.connector.source.SourceSplit;
22+
import org.apache.flink.configuration.Configuration;
23+
import org.apache.flink.connector.base.source.reader.RecordEmitter;
24+
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
25+
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
26+
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
27+
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
28+
29+
/**
30+
* Adapter for {@link SingleThreadMultiplexSourceReaderBase}.TODO: remove it until not supported in
31+
* flink 1.18.
32+
*/
33+
public abstract class SingleThreadMultiplexSourceReaderBaseAdapter<
34+
E, T, SplitT extends SourceSplit, SplitStateT>
35+
extends SingleThreadMultiplexSourceReaderBase<E, T, SplitT, SplitStateT> {
36+
public SingleThreadMultiplexSourceReaderBaseAdapter(
37+
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
38+
SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
39+
RecordEmitter<E, T, SplitStateT> recordEmitter,
40+
Configuration config,
41+
SourceReaderContext context) {
42+
super(elementsQueue, splitFetcherManager, recordEmitter, config, context);
43+
}
44+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.alibaba.fluss.flink.adapter;
19+
20+
import org.apache.flink.table.api.Schema;
21+
import org.apache.flink.table.catalog.CatalogTable;
22+
23+
import javax.annotation.Nullable;
24+
25+
import java.util.List;
26+
import java.util.Map;
27+
28+
/**
29+
* A adapter for {@link CatalogTable} constructor. TODO: remove this class when no longer support
30+
* flink 1.18 and 1.19.
31+
*/
32+
public class CatalogTableAdapter {
33+
public static CatalogTable toCatalogTable(
34+
Schema schema,
35+
@Nullable String comment,
36+
List<String> partitionKeys,
37+
Map<String, String> options) {
38+
return CatalogTable.of(schema, comment, partitionKeys, options);
39+
}
40+
}

0 commit comments

Comments
 (0)