diff --git a/.github/workflows/stage.sh b/.github/workflows/stage.sh index 49a3ed57e9..18587dc292 100755 --- a/.github/workflows/stage.sh +++ b/.github/workflows/stage.sh @@ -24,7 +24,7 @@ STAGE_LAKE="lake" MODULES_FLINK="\ fluss-flink,\ fluss-flink/fluss-flink-common,\ -fluss-flink/fluss-flink-2.1,\ +fluss-flink/fluss-flink-2.2,\ fluss-flink/fluss-flink-1.20,\ " diff --git a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java b/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java new file mode 100644 index 0000000000..575bfdfbd7 --- /dev/null +++ b/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.adapter; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; + +import java.io.IOException; + +/** + * Flink sink adapter which hide the different version of createWriter method. + * + *
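+ * <p>Flink 1.18 only exposes {@code createWriter(InitContext)}, while later Flink versions use
+ * {@code createWriter(WriterInitContext)}; each version-specific module bridges its own
+ * signature to the shared abstract {@code createWriter(MailboxExecutor, SinkWriterMetricGroup)}
+ * method declared below.
+ *
+ * <p>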
TODO: remove this class when no longer support all the Flink 1.x series. + */ +public abstract class SinkAdapter implements Sink { + + @Override + public SinkWriter createWriter(InitContext initContext) throws IOException { + return createWriter(initContext.getMailboxExecutor(), initContext.metricGroup()); + } + + protected abstract SinkWriter createWriter( + MailboxExecutor mailboxExecutor, SinkWriterMetricGroup metricGroup); +} diff --git a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/flink/api/connector/sink2/Sink.java b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/flink/api/connector/sink2/Sink.java deleted file mode 100644 index 8cab1cb533..0000000000 --- a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/flink/api/connector/sink2/Sink.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.connector.sink2; - -import org.apache.flink.api.common.operators.MailboxExecutor; -import org.apache.flink.metrics.groups.SinkWriterMetricGroup; - -import java.io.IOException; -import java.io.Serializable; - -/** - * Placeholder class to resolve compatibility issues. This placeholder class can be removed once we - * drop the support of Flink v1.18 which requires the legacy InitContext interface. - */ -public interface Sink extends Serializable { - - /** - * Creates a {@link SinkWriter}. - * - * @param context the runtime context. - * @return A sink writer. - * @throws IOException for any failure during creation. - */ - SinkWriter createWriter(WriterInitContext context) throws IOException; - - /** The interface exposes some runtime info for creating a {@link SinkWriter}. */ - interface InitContext { - - /** - * Returns the mailbox executor that allows to execute {@link Runnable}s inside the task - * thread in between record processing. - * - *
Note that this method should not be used per-record for performance reasons in the - * same way as records should not be sent to the external system individually. Rather, - * implementers are expected to batch records and only enqueue a single {@link Runnable} per - * batch to handle the result. - */ - MailboxExecutor getMailboxExecutor(); - - /** - * @return The metric group this writer belongs to. - */ - SinkWriterMetricGroup metricGroup(); - } -} diff --git a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java deleted file mode 100644 index d14e3af6fb..0000000000 --- a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.fluss.flink.catalog; - -import org.apache.fluss.flink.lake.LakeFlinkCatalog; -import org.apache.fluss.metadata.TableInfo; - -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.table.api.Schema; -import org.apache.flink.table.catalog.CatalogBaseTable; -import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.catalog.ObjectPath; -import org.apache.flink.table.catalog.exceptions.CatalogException; -import org.apache.flink.table.catalog.exceptions.TableNotExistException; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.function.Supplier; - -/** A {@link FlinkCatalog} used for Flink 2.1. 
*/ -public class Flink21Catalog extends FlinkCatalog { - - public Flink21Catalog( - String name, - String defaultDatabase, - String bootstrapServers, - ClassLoader classLoader, - Map securityConfigs, - Supplier> lakeCatalogPropertiesSupplier) { - super( - name, - defaultDatabase, - bootstrapServers, - classLoader, - securityConfigs, - lakeCatalogPropertiesSupplier); - } - - @VisibleForTesting - public Flink21Catalog( - String name, - String defaultDatabase, - String bootstrapServers, - ClassLoader classLoader, - Map securityConfigs, - Supplier> lakeCatalogPropertiesSupplier, - LakeFlinkCatalog lakeFlinkCatalog) { - super( - name, - defaultDatabase, - bootstrapServers, - classLoader, - securityConfigs, - lakeCatalogPropertiesSupplier, - lakeFlinkCatalog); - } - - @Override - public CatalogBaseTable getTable(ObjectPath objectPath) - throws TableNotExistException, CatalogException { - CatalogBaseTable catalogBaseTable = super.getTable(objectPath); - if (!(catalogBaseTable instanceof CatalogTable) - || objectPath.getObjectName().contains(LAKE_TABLE_SPLITTER)) { - return catalogBaseTable; - } - - CatalogTable table = (CatalogTable) catalogBaseTable; - Optional pkOp = table.getUnresolvedSchema().getPrimaryKey(); - // If there is no pk, return directly. - if (pkOp.isEmpty()) { - return table; - } - - Schema.Builder newSchemaBuilder = - Schema.newBuilder().fromSchema(table.getUnresolvedSchema()); - // Pk is always an index. - newSchemaBuilder.index(pkOp.get().getColumnNames()); - - // Judge whether we can do prefix lookup. - TableInfo tableInfo = connection.getTable(toTablePath(objectPath)).getTableInfo(); - List bucketKeys = tableInfo.getBucketKeys(); - // For partition table, the physical primary key is the primary key that excludes the - // partition key - List physicalPrimaryKeys = tableInfo.getPhysicalPrimaryKeys(); - List indexKeys = new ArrayList<>(); - if (isPrefixList(physicalPrimaryKeys, bucketKeys)) { - indexKeys.addAll(bucketKeys); - if (tableInfo.isPartitioned()) { - indexKeys.addAll(tableInfo.getPartitionKeys()); - } - } - - if (!indexKeys.isEmpty()) { - newSchemaBuilder.index(indexKeys); - } - return CatalogTable.newBuilder() - .schema(newSchemaBuilder.build()) - .comment(table.getComment()) - .partitionKeys(table.getPartitionKeys()) - .options(table.getOptions()) - .snapshot(table.getSnapshot().orElse(null)) - .distribution(table.getDistribution().orElse(null)) - .build(); - } - - private static boolean isPrefixList(List fullList, List prefixList) { - if (fullList.size() <= prefixList.size()) { - return false; - } - - for (int i = 0; i < prefixList.size(); i++) { - if (!fullList.get(i).equals(prefixList.get(i))) { - return false; - } - } - return true; - } -} diff --git a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java deleted file mode 100644 index cff44ab866..0000000000 --- a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.fluss.flink.catalog; - -/** A {@link FlinkCatalogFactory} used for Flink 2.1. */ -public class Flink21CatalogFactory extends FlinkCatalogFactory { - - @Override - public FlinkCatalog createCatalog(Context context) { - FlinkCatalog catalog = super.createCatalog(context); - return new Flink21Catalog( - catalog.catalogName, - catalog.defaultDatabase, - catalog.bootstrapServers, - catalog.classLoader, - catalog.securityConfigs, - catalog.lakeCatalogPropertiesSupplier); - } -} diff --git a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceITCase.java b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceITCase.java deleted file mode 100644 index 9b1e908daf..0000000000 --- a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceITCase.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.fluss.flink.source; - -import org.apache.fluss.metadata.TablePath; -import org.apache.fluss.row.InternalRow; - -import org.apache.flink.table.api.config.ExecutionConfigOptions; -import org.apache.flink.table.api.config.OptimizerConfigOptions; -import org.apache.flink.types.Row; -import org.apache.flink.util.CloseableIterator; -import org.junit.jupiter.api.Test; - -import java.util.Arrays; -import java.util.List; - -import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder; -import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows; -import static org.apache.fluss.testutils.DataTestUtils.row; -import static org.assertj.core.api.Assertions.assertThat; - -/** IT case for {@link FlinkTableSource} in Flink 2.1. 
*/ -public class Flink21TableSourceITCase extends FlinkTableSourceITCase { - - @Test - void testDeltaJoin() throws Exception { - // start two jobs for this test: one for DML involving the delta join, and the other for DQL - // to query the results of the sink table - tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2); - - String leftTableName = "left_table"; - tEnv.executeSql( - String.format( - "create table %s ( " - + " a1 int, " - + " b1 varchar, " - + " c1 bigint, " - + " d1 int, " - + " e1 bigint, " - + " primary key (c1, d1) NOT ENFORCED" - + ") with (" - + " 'connector' = 'fluss', " - + " 'bucket.key' = 'c1', " - // currently, delta join only support append-only source - + " 'table.merge-engine' = 'first_row' " - + ")", - leftTableName)); - List rows1 = - Arrays.asList( - row(1, "v1", 100L, 1, 10000L), - row(2, "v2", 200L, 2, 20000L), - row(3, "v1", 300L, 3, 30000L), - row(4, "v4", 400L, 4, 40000L)); - // write records - TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); - writeRows(conn, leftTablePath, rows1, false); - - String rightTableName = "right_table"; - tEnv.executeSql( - String.format( - "create table %s (" - + " a2 int, " - + " b2 varchar, " - + " c2 bigint, " - + " d2 int, " - + " e2 bigint, " - + " primary key (c2, d2) NOT ENFORCED" - + ") with (" - + " 'connector' = 'fluss', " - + " 'bucket.key' = 'c2', " - // currently, delta join only support append-only source - + " 'table.merge-engine' = 'first_row' " - + ")", - rightTableName)); - List rows2 = - Arrays.asList( - row(1, "v1", 100L, 1, 10000L), - row(2, "v3", 200L, 2, 20000L), - row(3, "v4", 300L, 4, 30000L), - row(4, "v4", 500L, 4, 50000L)); - // write records - TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); - writeRows(conn, rightTablePath, rows2, false); - - String sinkTableName = "sink_table"; - tEnv.executeSql( - String.format( - "create table %s ( " - + " a1 int, " - + " b1 varchar, " - + " c1 bigint, " - + " d1 int, " - + " e1 bigint, " - + " a2 int, " - + " b2 varchar, " - + " c2 bigint, " - + " d2 int, " - + " e2 bigint, " - + " primary key (c1, d1, c2, d2) NOT ENFORCED" - + ") with (" - + " 'connector' = 'fluss' " - + ")", - sinkTableName)); - - tEnv.getConfig() - .set( - OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY, - OptimizerConfigOptions.DeltaJoinStrategy.FORCE); - - String sql = - String.format( - "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2", - sinkTableName, leftTableName, rightTableName); - - assertThat(tEnv.explainSql(sql)) - .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]"); - - tEnv.executeSql(sql); - - CloseableIterator collected = - tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); - List expected = - Arrays.asList( - "+I[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]", - "-U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]", - "+U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]", - "+I[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]", - "-U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]", - "+U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]"); - assertResultsIgnoreOrder(collected, expected, true); - } -} diff --git a/fluss-flink/fluss-flink-2.1/pom.xml b/fluss-flink/fluss-flink-2.2/pom.xml similarity index 93% rename from fluss-flink/fluss-flink-2.1/pom.xml rename to fluss-flink/fluss-flink-2.2/pom.xml index 5b5475f3df..c6527e2822 100644 --- a/fluss-flink/fluss-flink-2.1/pom.xml +++ b/fluss-flink/fluss-flink-2.2/pom.xml @@ -26,11 +26,11 @@ 
0.9-SNAPSHOT - fluss-flink-2.1 - Fluss : Engine Flink : 2.1 + fluss-flink-2.2 + Fluss : Engine Flink : 2.2 - 2.1 - 2.1.1 + 2.2 + 2.2.0 @@ -68,6 +68,20 @@ provided + + org.apache.flink + flink-streaming-java + ${flink.minor.version} + provided + + + + org.apache.flink + flink-runtime + ${flink.minor.version} + provided + + org.apache.fluss diff --git a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java b/fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java similarity index 100% rename from fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java rename to fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java diff --git a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java b/fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/SchemaAdapter.java similarity index 53% rename from fluss-flink/fluss-flink-1.18/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java rename to fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/SchemaAdapter.java index 6caa32d7c6..8f6492bd16 100644 --- a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java +++ b/fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/SchemaAdapter.java @@ -15,7 +15,29 @@ * limitations under the License. */ -package org.apache.flink.api.connector.sink2; +package org.apache.fluss.flink.adapter; -/** Placeholder class to resolve compatibility issues. */ -public interface WriterInitContext extends Sink.InitContext {} +import org.apache.flink.table.api.Schema; + +import java.util.List; + +/** + * An adapter for the schema with Index. + * + *
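+ * <p>This Flink 2.2 variant is presumably picked up in place of the same-named fallback class
+ * in fluss-flink-common: {@code Schema.Builder#index} exists here, so each given index key
+ * list is applied and {@code supportIndex()} reports {@code true}.
+ *
+ * <p>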
TODO: remove this class when no longer support all the Flink 1.x series. + */ +public class SchemaAdapter { + private SchemaAdapter() {} + + public static Schema withIndex(Schema unresolvedSchema, List> indexes) { + Schema.Builder newSchemaBuilder = Schema.newBuilder().fromSchema(unresolvedSchema); + for (List index : indexes) { + newSchemaBuilder.index(index); + } + return newSchemaBuilder.build(); + } + + public static boolean supportIndex() { + return true; + } +} diff --git a/fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java b/fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java new file mode 100644 index 0000000000..8d6e512501 --- /dev/null +++ b/fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.adapter; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.WriterInitContext; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; + +import java.io.IOException; + +/** + * Flink sink adapter which hide the different version of createWriter method. + * + *
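+ * <p>Flink 2.x dropped the legacy {@code InitContext} signature, so this module only bridges
+ * {@code createWriter(WriterInitContext)} to the common abstract factory method.
+ *
+ * <p>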
TODO: remove this class when no longer support all the Flink 1.x series. + */ +public abstract class SinkAdapter implements Sink { + + @Override + public SinkWriter createWriter(WriterInitContext writerInitContext) throws IOException { + return createWriter( + writerInitContext.getMailboxExecutor(), writerInitContext.metricGroup()); + } + + protected abstract SinkWriter createWriter( + MailboxExecutor mailboxExecutor, SinkWriterMetricGroup metricGroup); +} diff --git a/fluss-flink/fluss-flink-2.1/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/fluss-flink/fluss-flink-2.2/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory similarity index 93% rename from fluss-flink/fluss-flink-2.1/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory rename to fluss-flink/fluss-flink-2.2/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index f13f71331e..d5aca2d53b 100644 --- a/fluss-flink/fluss-flink-2.1/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/fluss-flink/fluss-flink-2.2/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -16,4 +16,4 @@ # limitations under the License. # -org.apache.fluss.flink.catalog.Flink21CatalogFactory \ No newline at end of file +org.apache.fluss.flink.catalog.FlinkCatalogFactory diff --git a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/adapter/Flink21MultipleParameterToolTest.java b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/adapter/Flink22MultipleParameterToolTest.java similarity index 87% rename from fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/adapter/Flink21MultipleParameterToolTest.java rename to fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/adapter/Flink22MultipleParameterToolTest.java index f3922bcbda..4d06fe901f 100644 --- a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/adapter/Flink21MultipleParameterToolTest.java +++ b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/adapter/Flink22MultipleParameterToolTest.java @@ -17,5 +17,5 @@ package org.apache.fluss.flink.adapter; -/** Test for {@link MultipleParameterToolAdapter} in flink 2.1. */ -public class Flink21MultipleParameterToolTest extends FlinkMultipleParameterToolTest {} +/** Test for {@link MultipleParameterToolAdapter} in flink 2.2. */ +public class Flink22MultipleParameterToolTest extends FlinkMultipleParameterToolTest {} diff --git a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/adapter/ResolvedCatalogMaterializedTableAdapter.java b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/adapter/ResolvedCatalogMaterializedTableAdapter.java new file mode 100644 index 0000000000..f868a84db6 --- /dev/null +++ b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/adapter/ResolvedCatalogMaterializedTableAdapter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.adapter;
+
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.IntervalFreshness;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+
+/**
+ * Adapter for {@link ResolvedCatalogMaterializedTable}, whose constructor changed incompatibly
+ * in Flink 2.2. The adapted constructor is only used in tests.
+ *
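+ * <p>In Flink 2.2 the resolved materialized table is built with explicit {@code refreshMode}
+ * and {@code freshness} arguments, so this variant forwards all four parameters; the fallback
+ * in fluss-flink-common ignores the two extra ones.
+ *
+ * <p>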
TODO: remove it until ... is + * fixed. + */ +public class ResolvedCatalogMaterializedTableAdapter { + + public static ResolvedCatalogMaterializedTable create( + CatalogMaterializedTable origin, + ResolvedSchema resolvedSchema, + CatalogMaterializedTable.RefreshMode refreshMode, + IntervalFreshness freshness) { + return new ResolvedCatalogMaterializedTable(origin, resolvedSchema, refreshMode, freshness); + } +} diff --git a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22CatalogITCase.java similarity index 91% rename from fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java rename to fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22CatalogITCase.java index 62bf5b9aa0..b87965727a 100644 --- a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java +++ b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22CatalogITCase.java @@ -21,31 +21,12 @@ import org.apache.flink.table.api.Schema; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.ObjectPath; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; -/** IT case for catalog in Flink 2.1. */ -public class Flink21CatalogITCase extends FlinkCatalogITCase { - - @BeforeAll - static void beforeAll() { - FlinkCatalogITCase.beforeAll(); - - // close the old one and open a new one later - catalog.close(); - - catalog = - new Flink21Catalog( - catalog.catalogName, - catalog.defaultDatabase, - catalog.bootstrapServers, - catalog.classLoader, - catalog.securityConfigs, - catalog.lakeCatalogPropertiesSupplier); - catalog.open(); - } +/** IT case for catalog in Flink 2.2. */ +public class Flink22CatalogITCase extends FlinkCatalogITCase { @Test void testGetTableWithIndex() throws Exception { diff --git a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog21Test.java b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22CatalogTest.java similarity index 73% rename from fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog21Test.java rename to fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22CatalogTest.java index d6aa6ef564..bff2b77d60 100644 --- a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog21Test.java +++ b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22CatalogTest.java @@ -17,8 +17,6 @@ package org.apache.fluss.flink.catalog; -import org.apache.fluss.flink.lake.LakeFlinkCatalog; - import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.DefaultIndex; @@ -28,24 +26,8 @@ import java.util.Arrays; import java.util.Collections; -/** Test for {@link Flink21Catalog}. 
*/ -public class FlinkCatalog21Test extends FlinkCatalogTest { - - @Override - protected FlinkCatalog initCatalog( - String catalogName, - String databaseName, - String bootstrapServers, - LakeFlinkCatalog lakeFlinkCatalog) { - return new Flink21Catalog( - catalogName, - databaseName, - bootstrapServers, - Thread.currentThread().getContextClassLoader(), - Collections.emptyMap(), - Collections::emptyMap, - lakeFlinkCatalog); - } +/** Test for {@link FlinkCatalog}. */ +public class Flink22CatalogTest extends FlinkCatalogTest { protected ResolvedSchema createSchema() { return new ResolvedSchema( diff --git a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21MaterializedTableITCase.java b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22MaterializedTableITCase.java similarity index 88% rename from fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21MaterializedTableITCase.java rename to fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22MaterializedTableITCase.java index ee30419960..36240466c4 100644 --- a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21MaterializedTableITCase.java +++ b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22MaterializedTableITCase.java @@ -17,5 +17,5 @@ package org.apache.fluss.flink.catalog; -/** IT case for materialized table in Flink 2.1. */ -public class Flink21MaterializedTableITCase extends MaterializedTableITCase {} +/** IT case for materialized table in Flink 2.2. */ +public class Flink22MaterializedTableITCase extends MaterializedTableITCase {} diff --git a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/metrics/Flink21MetricsITCase.java b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/metrics/Flink22MetricsITCase.java similarity index 88% rename from fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/metrics/Flink21MetricsITCase.java rename to fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/metrics/Flink22MetricsITCase.java index 961c69d2fb..37d795e0bd 100644 --- a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/metrics/Flink21MetricsITCase.java +++ b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/metrics/Flink22MetricsITCase.java @@ -17,5 +17,5 @@ package org.apache.fluss.flink.metrics; -/** IT case for metrics in Flink 2.1. */ -public class Flink21MetricsITCase extends FlinkMetricsITCase {} +/** IT case for metrics in Flink 2.2. */ +public class Flink22MetricsITCase extends FlinkMetricsITCase {} diff --git a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/procedure/Flink21ProcedureITCase.java b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/procedure/Flink22ProcedureITCase.java similarity index 88% rename from fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/procedure/Flink21ProcedureITCase.java rename to fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/procedure/Flink22ProcedureITCase.java index c95c47f0ff..84612dc94b 100644 --- a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/procedure/Flink21ProcedureITCase.java +++ b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/procedure/Flink22ProcedureITCase.java @@ -17,5 +17,5 @@ package org.apache.fluss.flink.procedure; -/** IT case for procedure in Flink 2.1. 
*/
-public class Flink21ProcedureITCase extends FlinkProcedureITCase {}
+/** IT case for procedure in Flink 2.2. */
+public class Flink22ProcedureITCase extends FlinkProcedureITCase {}
diff --git a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/security/acl/Flink21AuthorizationITCase.java b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/security/acl/Flink22AuthorizationITCase.java
similarity index 88%
rename from fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/security/acl/Flink21AuthorizationITCase.java
rename to fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/security/acl/Flink22AuthorizationITCase.java
index 66aefab8e0..192be6b902 100644
--- a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/security/acl/Flink21AuthorizationITCase.java
+++ b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/security/acl/Flink22AuthorizationITCase.java
@@ -17,5 +17,5 @@
 package org.apache.fluss.flink.security.acl;
 
-/** IT case for authorization in Flink 2.1. */
-public class Flink21AuthorizationITCase extends FlinkAuthorizationITCase {}
+/** IT case for authorization in Flink 2.2. */
+public class Flink22AuthorizationITCase extends FlinkAuthorizationITCase {}
diff --git a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/sink/Flink21ComplexTypeITCase.java b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/sink/Flink22ComplexTypeITCase.java
similarity index 87%
rename from fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/sink/Flink21ComplexTypeITCase.java
rename to fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/sink/Flink22ComplexTypeITCase.java
index 7b2ed44b2b..cbb1f9a966 100644
--- a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/sink/Flink21ComplexTypeITCase.java
+++ b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/sink/Flink22ComplexTypeITCase.java
@@ -17,5 +17,5 @@
 package org.apache.fluss.flink.sink;
 
-/** Integration tests for Array type support in Flink 2.1. */
-public class Flink21ComplexTypeITCase extends FlinkComplexTypeITCase {}
+/** Integration tests for Array type support in Flink 2.2. */
+public class Flink22ComplexTypeITCase extends FlinkComplexTypeITCase {}
diff --git a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/sink/Flink21TableSinkITCase.java b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/sink/Flink22TableSinkITCase.java
similarity index 87%
rename from fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/sink/Flink21TableSinkITCase.java
rename to fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/sink/Flink22TableSinkITCase.java
index b040476f4d..9aa830096a 100644
--- a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/sink/Flink21TableSinkITCase.java
+++ b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/sink/Flink22TableSinkITCase.java
@@ -17,5 +17,5 @@
 package org.apache.fluss.flink.sink;
 
-/** IT case for {@link FlinkTableSink} in Flink 2.1. */
-public class Flink21TableSinkITCase extends FlinkTableSinkITCase {}
+/** IT case for {@link FlinkTableSink} in Flink 2.2.
*/ +public class Flink22TableSinkITCase extends FlinkTableSinkITCase {} diff --git a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceBatchITCase.java b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceBatchITCase.java similarity index 88% rename from fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceBatchITCase.java rename to fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceBatchITCase.java index a881b6b31c..f65966ea6f 100644 --- a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceBatchITCase.java +++ b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceBatchITCase.java @@ -17,5 +17,5 @@ package org.apache.fluss.flink.source; -/** IT case for batch source in Flink 2.1. */ -public class Flink21TableSourceBatchITCase extends FlinkTableSourceBatchITCase {} +/** IT case for batch source in Flink 2.2. */ +public class Flink22TableSourceBatchITCase extends FlinkTableSourceBatchITCase {} diff --git a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceFailOverITCase.java b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceFailOverITCase.java similarity index 87% rename from fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceFailOverITCase.java rename to fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceFailOverITCase.java index 630470f288..d909bc5365 100644 --- a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceFailOverITCase.java +++ b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceFailOverITCase.java @@ -17,5 +17,5 @@ package org.apache.fluss.flink.source; -/** IT case for source failover and recovery in Flink 2.1. */ -public class Flink21TableSourceFailOverITCase extends FlinkTableSourceFailOverITCase {} +/** IT case for source failover and recovery in Flink 2.2. */ +public class Flink22TableSourceFailOverITCase extends FlinkTableSourceFailOverITCase {} diff --git a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java new file mode 100644 index 0000000000..7253720481 --- /dev/null +++ b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.flink.source; + +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.row.InternalRow; + +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder; +import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows; +import static org.apache.fluss.testutils.DataTestUtils.row; +import static org.assertj.core.api.Assertions.assertThat; + +/** IT case for {@link FlinkTableSource} in Flink 2.2. */ +public class Flink22TableSourceITCase extends FlinkTableSourceITCase { + + @Test + void testDeltaJoin() throws Exception { + // start two jobs for this test: one for DML involving the delta join, and the other for DQL + // to query the results of the sink table + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2); + + String leftTableName = "left_table"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a1 int, " + + " b1 varchar, " + + " c1 bigint, " + + " d1 int, " + + " e1 bigint, " + + " primary key (c1, d1) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c1', " + // currently, delta join only support append-only source + + " 'table.merge-engine' = 'first_row' " + + ")", + leftTableName)); + List rows1 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v2", 200L, 2, 20000L), + row(3, "v1", 300L, 3, 30000L), + row(4, "v4", 400L, 4, 40000L)); + // write records + TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); + writeRows(conn, leftTablePath, rows1, false); + + String rightTableName = "right_table"; + tEnv.executeSql( + String.format( + "create table %s (" + + " a2 int, " + + " b2 varchar, " + + " c2 bigint, " + + " d2 int, " + + " e2 bigint, " + + " primary key (c2, d2) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c2', " + // currently, delta join only support append-only source + + " 'table.merge-engine' = 'first_row' " + + ")", + rightTableName)); + List rows2 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v3", 200L, 2, 20000L), + row(3, "v4", 300L, 4, 30000L), + row(4, "v4", 500L, 4, 50000L)); + // write records + TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); + writeRows(conn, rightTablePath, rows2, false); + + String sinkTableName = "sink_table"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a1 int, " + + " b1 varchar, " + + " c1 bigint, " + + " d1 int, " + + " e1 bigint, " + + " a2 int, " + + " b2 varchar, " + + " c2 bigint, " + + " d2 int, " + + " e2 bigint, " + + " primary key (c1, d1, c2, d2) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss' " + + ")", + sinkTableName)); + + tEnv.getConfig() + .set( + OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY, + OptimizerConfigOptions.DeltaJoinStrategy.FORCE); + + String sql = + String.format( + "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2", + sinkTableName, leftTableName, rightTableName); + + assertThat(tEnv.explainSql(sql)) + .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]"); + + tEnv.executeSql(sql); + + CloseableIterator collected = + tEnv.executeSql(String.format("select * 
from %s", sinkTableName)).collect(); + List expected = + Arrays.asList( + "+I[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]", + "-U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]", + "+U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]", + "+I[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]", + "-U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]", + "+U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]"); + assertResultsIgnoreOrder(collected, expected, true); + } + + @Test + void testDeltaJoinWithProjectionAndFilter() throws Exception { + // start two jobs for this test: one for DML involving the delta join, and the other for DQL + // to query the results of the sink table + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2); + + String leftTableName = "left_table_proj"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a1 int, " + + " b1 varchar, " + + " c1 bigint, " + + " d1 int, " + + " e1 bigint, " + + " primary key (c1, d1) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c1', " + + " 'table.merge-engine' = 'first_row' " + + ")", + leftTableName)); + List rows1 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v2", 200L, 2, 20000L), + row(3, "v1", 300L, 3, 30000L)); + TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName); + writeRows(conn, leftTablePath, rows1, false); + + String rightTableName = "right_table_proj"; + tEnv.executeSql( + String.format( + "create table %s (" + + " a2 int, " + + " b2 varchar, " + + " c2 bigint, " + + " d2 int, " + + " e2 bigint, " + + " primary key (c2, d2) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c2', " + + " 'table.merge-engine' = 'first_row' " + + ")", + rightTableName)); + List rows2 = + Arrays.asList( + row(1, "v1", 100L, 1, 10000L), + row(2, "v3", 200L, 2, 20000L), + row(3, "v4", 300L, 4, 30000L)); + TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName); + writeRows(conn, rightTablePath, rows2, false); + + String sinkTableName = "sink_table_proj"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a1 int, " + + " c1 bigint, " + + " a2 int, " + + " primary key (c1) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss' " + + ")", + sinkTableName)); + + tEnv.getConfig() + .set( + OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY, + OptimizerConfigOptions.DeltaJoinStrategy.FORCE); + + // Test with projection and filter + String sql = + String.format( + "INSERT INTO %s SELECT a1, c1, a2 FROM %s INNER JOIN %s ON c1 = c2 AND d1 = d2 WHERE a1 > 1", + sinkTableName, leftTableName, rightTableName); + + assertThat(tEnv.explainSql(sql)) + .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]"); + + tEnv.executeSql(sql); + + CloseableIterator collected = + tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); + List expected = Arrays.asList("+I[2, 200, 2]", "-U[2, 200, 2]", "+U[2, 200, 2]"); + assertResultsIgnoreOrder(collected, expected, true); + } + + @Test + void testDeltaJoinWithLookupCache() throws Exception { + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2); + + String leftTableName = "left_table_cache"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a1 int, " + + " c1 bigint, " + + " d1 int, " + + " primary key (c1, d1) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c1', " + + " 'table.merge-engine' = 'first_row' " + + ")", + leftTableName)); + List 
rows1 = Arrays.asList(row(1, 100L, 1)); + writeRows(conn, TablePath.of(DEFAULT_DB, leftTableName), rows1, false); + + String rightTableName = "right_table_cache"; + tEnv.executeSql( + String.format( + "create table %s (" + + " a2 int, " + + " c2 bigint, " + + " d2 int, " + + " primary key (c2, d2) NOT ENFORCED" + + ") with (" + + " 'connector' = 'fluss', " + + " 'bucket.key' = 'c2', " + + " 'table.merge-engine' = 'first_row', " + + " 'lookup.cache' = 'partial', " + + " 'lookup.partial-cache.max-rows' = '100' " + + ")", + rightTableName)); + List rows2 = Arrays.asList(row(1, 100L, 1)); + writeRows(conn, TablePath.of(DEFAULT_DB, rightTableName), rows2, false); + + String sinkTableName = "sink_table_cache"; + tEnv.executeSql( + String.format( + "create table %s ( " + + " a1 int, " + + " a2 int, " + + " primary key (a1) NOT ENFORCED" // Dummy PK + + ") with (" + + " 'connector' = 'fluss' " + + ")", + sinkTableName)); + + tEnv.getConfig() + .set( + OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY, + OptimizerConfigOptions.DeltaJoinStrategy.FORCE); + + String sql = + String.format( + "INSERT INTO %s SELECT T1.a1, T2.a2 FROM %s AS T1 INNER JOIN %s AS T2 ON T1.c1 = T2.c2 AND T1.d1 = T2.d2", + sinkTableName, leftTableName, rightTableName); + + assertThat(tEnv.explainSql(sql)) + .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) AND (d1 = d2))]"); + + tEnv.executeSql(sql); + + CloseableIterator collected = + tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect(); + List expected = Arrays.asList("+I[1, 1]", "-U[1, 1]", "+U[1, 1]"); + assertResultsIgnoreOrder(collected, expected, true); + } +} diff --git a/fluss-flink/fluss-flink-2.1/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension b/fluss-flink/fluss-flink-2.2/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension similarity index 100% rename from fluss-flink/fluss-flink-2.1/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension rename to fluss-flink/fluss-flink-2.2/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension diff --git a/fluss-flink/fluss-flink-2.1/src/test/resources/log4j2-test.properties b/fluss-flink/fluss-flink-2.2/src/test/resources/log4j2-test.properties similarity index 100% rename from fluss-flink/fluss-flink-2.1/src/test/resources/log4j2-test.properties rename to fluss-flink/fluss-flink-2.2/src/test/resources/log4j2-test.properties diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/SchemaAdapter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/SchemaAdapter.java new file mode 100644 index 0000000000..ad48cc99e7 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/SchemaAdapter.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.adapter;
+
+import org.apache.flink.table.api.Schema;
+
+import java.util.List;
+
+/**
+ * An adapter for building a {@link Schema} with index definitions.
+ *
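+ * <p>This is the fallback used on Flink 1.x, where {@code Schema.Builder} does not yet offer
+ * an {@code index} method: any non-empty index list is rejected, and {@code supportIndex()}
+ * returns {@code false} so that callers such as {@code FlinkCatalog} skip index decoration.
+ *
+ * <p>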
TODO: remove this class when no longer support all the Flink 1.x series. + */ +public class SchemaAdapter { + private SchemaAdapter() {} + + public static Schema withIndex(Schema unresolvedSchema, List> indexes) { + Schema.Builder newSchemaBuilder = Schema.newBuilder().fromSchema(unresolvedSchema); + if (!indexes.isEmpty()) { + throw new UnsupportedOperationException("Index is not supported."); + } + return newSchemaBuilder.build(); + } + + public static boolean supportIndex() { + return false; + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/SingleThreadMultiplexSourceReaderBaseAdapter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/SingleThreadMultiplexSourceReaderBaseAdapter.java index 72a4540eab..d5a4ff0aa5 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/SingleThreadMultiplexSourceReaderBaseAdapter.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/SingleThreadMultiplexSourceReaderBaseAdapter.java @@ -29,6 +29,8 @@ /** * Adapter for {@link SingleThreadMultiplexSourceReaderBase}.TODO: remove it until not supported in * flink 1.18. + * + *
<p>TODO: remove this class when we no longer support the Flink 1.x series.
  */
 public abstract class SingleThreadMultiplexSourceReaderBaseAdapter<
         E, T, SplitT extends SourceSplit, SplitStateT>
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java
new file mode 100644
index 0000000000..5f7e9e76c3
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.adapter;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import java.io.IOException;
+
+/**
+ * Flink sink adapter that hides the version-specific signature of the {@code createWriter}
+ * method.
+ *
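+ * <p>This common variant overrides both the legacy {@code createWriter(InitContext)} and the
+ * newer {@code createWriter(WriterInitContext)}, funneling both into a single abstract factory
+ * method, so one sink implementation works across the supported Flink 1.x and 2.x versions.
+ *
+ * <p>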
TODO: remove this class when no longer support all the Flink 1.x series. + */ +public abstract class SinkAdapter implements Sink { + + @Override + public SinkWriter createWriter(InitContext initContext) throws IOException { + return createWriter(initContext.getMailboxExecutor(), initContext.metricGroup()); + } + + @Override + public SinkWriter createWriter(WriterInitContext writerInitContext) throws IOException { + return createWriter( + writerInitContext.getMailboxExecutor(), writerInitContext.metricGroup()); + } + + protected abstract SinkWriter createWriter( + MailboxExecutor mailboxExecutor, SinkWriterMetricGroup metricGroup); +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java index fb7a41157c..37e9f7c5e8 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java @@ -38,6 +38,7 @@ import org.apache.fluss.utils.IOUtils; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.table.api.Schema; import org.apache.flink.table.catalog.AbstractCatalog; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogDatabase; @@ -86,6 +87,8 @@ import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS; import static org.apache.fluss.flink.FlinkConnectorOptions.ALTER_DISALLOW_OPTIONS; +import static org.apache.fluss.flink.adapter.SchemaAdapter.supportIndex; +import static org.apache.fluss.flink.adapter.SchemaAdapter.withIndex; import static org.apache.fluss.flink.utils.CatalogExceptionUtils.isPartitionAlreadyExists; import static org.apache.fluss.flink.utils.CatalogExceptionUtils.isPartitionInvalid; import static org.apache.fluss.flink.utils.CatalogExceptionUtils.isPartitionNotExist; @@ -344,7 +347,11 @@ public CatalogBaseTable getTable(ObjectPath objectPath) } } if (CatalogBaseTable.TableKind.TABLE == catalogBaseTable.getTableKind()) { - return ((CatalogTable) catalogBaseTable).copy(newOptions); + CatalogTable table = ((CatalogTable) catalogBaseTable).copy(newOptions); + if (supportIndex()) { + table = wrapWithIndexes(table, tableInfo); + } + return table; } else if (CatalogBaseTable.TableKind.MATERIALIZED_TABLE == catalogBaseTable.getTableKind()) { return ((CatalogMaterializedTable) catalogBaseTable).copy(newOptions); @@ -806,4 +813,55 @@ public Map getLakeCatalogProperties() { } return lakeCatalogProperties; } + + private CatalogTable wrapWithIndexes(CatalogTable table, TableInfo tableInfo) { + + Optional pkOp = table.getUnresolvedSchema().getPrimaryKey(); + // If there is no pk, return directly. + if (!pkOp.isPresent()) { + return table; + } + + List> indexes = new ArrayList<>(); + // Pk is always an index. + indexes.add(pkOp.get().getColumnNames()); + + // Judge whether we can do prefix lookup. 
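+        // (Prefix lookup requires the bucket keys to form a proper prefix of the physical
+        // primary key; e.g. a table with PRIMARY KEY (c1, d1) and 'bucket.key' = 'c1'
+        // qualifies, and its bucket keys plus any partition keys become an extra index.)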
+ List bucketKeys = tableInfo.getBucketKeys(); + // For partition table, the physical primary key is the primary key that excludes the + // partition key + List physicalPrimaryKeys = tableInfo.getPhysicalPrimaryKeys(); + List indexKeys = new ArrayList<>(); + if (isPrefixList(physicalPrimaryKeys, bucketKeys)) { + indexKeys.addAll(bucketKeys); + if (tableInfo.isPartitioned()) { + indexKeys.addAll(tableInfo.getPartitionKeys()); + } + } + + if (!indexKeys.isEmpty()) { + indexes.add(indexKeys); + } + return CatalogTable.newBuilder() + .schema(withIndex(table.getUnresolvedSchema(), indexes)) + .comment(table.getComment()) + .partitionKeys(table.getPartitionKeys()) + .options(table.getOptions()) + .snapshot(table.getSnapshot().orElse(null)) + .distribution(table.getDistribution().orElse(null)) + .build(); + } + + private static boolean isPrefixList(List fullList, List prefixList) { + if (fullList.size() <= prefixList.size()) { + return false; + } + + for (int i = 0; i < prefixList.size(); i++) { + if (!fullList.get(i).equals(prefixList.get(i))) { + return false; + } + } + return true; + } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java index a35d481176..fcdf9ab42e 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java @@ -19,6 +19,7 @@ import org.apache.fluss.annotation.Internal; import org.apache.fluss.config.Configuration; +import org.apache.fluss.flink.adapter.SinkAdapter; import org.apache.fluss.flink.sink.serializer.FlussSerializationSchema; import org.apache.fluss.flink.sink.writer.AppendSinkWriter; import org.apache.fluss.flink.sink.writer.FlinkSinkWriter; @@ -27,9 +28,8 @@ import org.apache.fluss.metadata.TablePath; import org.apache.flink.api.common.operators.MailboxExecutor; -import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.sink2.SinkWriter; -import org.apache.flink.api.connector.sink2.WriterInitContext; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup; import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology; import org.apache.flink.streaming.api.datastream.DataStream; @@ -37,7 +37,6 @@ import javax.annotation.Nullable; -import java.io.IOException; import java.io.Serializable; import java.util.List; @@ -45,7 +44,7 @@ import static org.apache.fluss.flink.utils.FlinkConversions.toFlussRowType; /** Flink sink for Fluss. 
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java
index a35d481176..fcdf9ab42e 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java
@@ -19,6 +19,7 @@
 
 import org.apache.fluss.annotation.Internal;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.adapter.SinkAdapter;
 import org.apache.fluss.flink.sink.serializer.FlussSerializationSchema;
 import org.apache.fluss.flink.sink.writer.AppendSinkWriter;
 import org.apache.fluss.flink.sink.writer.FlinkSinkWriter;
@@ -27,9 +28,8 @@
 import org.apache.fluss.metadata.TablePath;
 
 import org.apache.flink.api.common.operators.MailboxExecutor;
-import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
 import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -37,7 +37,6 @@
 
 import javax.annotation.Nullable;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
 
@@ -45,7 +44,7 @@
 import static org.apache.fluss.flink.utils.FlinkConversions.toFlussRowType;
 
 /** Flink sink for Fluss. */
-class FlinkSink<InputT> implements Sink<InputT>, SupportsPreWriteTopology<InputT> {
+class FlinkSink<InputT> extends SinkAdapter<InputT> implements SupportsPreWriteTopology<InputT> {
 
     private static final long serialVersionUID = 1L;
 
@@ -55,20 +54,11 @@ class FlinkSink<InputT> implements Sink<InputT>, SupportsPreWriteTopology<InputT>
     }
 
     @Override
-    public SinkWriter<InputT> createWriter(InitContext context) throws IOException {
-        FlinkSinkWriter<InputT> flinkSinkWriter =
-                builder.createWriter(context.getMailboxExecutor());
-        flinkSinkWriter.initialize(InternalSinkWriterMetricGroup.wrap(context.metricGroup()));
-        return flinkSinkWriter;
-    }
-
-    @Override
-    public SinkWriter<InputT> createWriter(WriterInitContext context) throws IOException {
-        FlinkSinkWriter<InputT> flinkSinkWriter =
-                builder.createWriter(context.getMailboxExecutor());
-        flinkSinkWriter.initialize(InternalSinkWriterMetricGroup.wrap(context.metricGroup()));
+    protected SinkWriter<InputT> createWriter(
+            MailboxExecutor mailboxExecutor, SinkWriterMetricGroup metricGroup) {
+        FlinkSinkWriter<InputT> flinkSinkWriter = builder.createWriter(mailboxExecutor);
+        flinkSinkWriter.initialize(InternalSinkWriterMetricGroup.wrap(metricGroup));
         return flinkSinkWriter;
     }
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/adapter/ResolvedCatalogMaterializedTableAdapter.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/adapter/ResolvedCatalogMaterializedTableAdapter.java
new file mode 100644
index 0000000000..7a018bc027
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/adapter/ResolvedCatalogMaterializedTableAdapter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.adapter;
+
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.IntervalFreshness;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+
+/**
+ * Adapter for {@link ResolvedCatalogMaterializedTable}, needed because the constructor changed
+ * incompatibly in Flink 2.2. The constructor is only used in tests.
+ *
+ * <p>TODO: remove it once ... is fixed.
+ */
+public class ResolvedCatalogMaterializedTableAdapter {
+
+    public static ResolvedCatalogMaterializedTable create(
+            CatalogMaterializedTable origin,
+            ResolvedSchema resolvedSchema,
+            CatalogMaterializedTable.RefreshMode refreshMode,
+            IntervalFreshness freshness) {
+        return new ResolvedCatalogMaterializedTable(origin, resolvedSchema);
+    }
+}
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
index 9b915a74d5..6c65544c4a 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
@@ -22,6 +22,7 @@
 import org.apache.fluss.exception.IllegalConfigurationException;
 import org.apache.fluss.exception.InvalidPartitionException;
 import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.flink.adapter.ResolvedCatalogMaterializedTableAdapter;
 import org.apache.fluss.flink.lake.LakeFlinkCatalog;
 import org.apache.fluss.flink.utils.FlinkConversionsTest;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
@@ -40,7 +41,6 @@
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.catalog.IntervalFreshness;
 import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.TableChange;
@@ -158,7 +158,11 @@ private CatalogMaterializedTable newCatalogMaterializedTable(
                         .refreshMode(refreshMode)
                         .refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
                         .build();
-        return new ResolvedCatalogMaterializedTable(origin, resolvedSchema);
+        return ResolvedCatalogMaterializedTableAdapter.create(
+                origin,
+                resolvedSchema,
+                refreshMode,
+                IntervalFreshness.of("5", IntervalFreshness.TimeUnit.SECOND));
     }
 
     protected FlinkCatalog initCatalog(
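The point of the indirection: test code always supplies all four arguments, and whichever copy of the adapter is on the classpath decides which ResolvedCatalogMaterializedTable constructor actually runs (the common copy above simply drops refreshMode and freshness). A minimal sketch of the call pattern; MaterializedTableFixture is a hypothetical helper, and RefreshMode.CONTINUOUS is assumed to be available in the Flink catalog API.

import org.apache.flink.table.catalog.CatalogMaterializedTable;
import org.apache.flink.table.catalog.IntervalFreshness;
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
import org.apache.flink.table.catalog.ResolvedSchema;

import org.apache.fluss.flink.adapter.ResolvedCatalogMaterializedTableAdapter;

/** Hypothetical test helper that never names a version-specific constructor directly. */
class MaterializedTableFixture {

    static ResolvedCatalogMaterializedTable resolve(
            CatalogMaterializedTable origin, ResolvedSchema resolvedSchema) {
        // All four arguments are always passed; the version-specific adapter on the
        // classpath decides whether refreshMode and freshness reach the constructor.
        return ResolvedCatalogMaterializedTableAdapter.create(
                origin,
                resolvedSchema,
                CatalogMaterializedTable.RefreshMode.CONTINUOUS,
                IntervalFreshness.of("5", IntervalFreshness.TimeUnit.SECOND));
    }
}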
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java
index 7f3894f965..f295fff5f6 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java
@@ -281,11 +281,7 @@ void testCountPushDown(boolean partitionTable) throws Exception {
         String query = String.format("SELECT COUNT(*) FROM %s", tableName);
         assertThat(tEnv.explainSql(query))
                 .contains(
-                        String.format(
-                                "TableSourceScan(table=[[testcatalog, defaultdb, %s, project=[id], "
-                                        + "aggregates=[grouping=[], aggFunctions=[Count1AggFunction()]]]], "
-                                        + "fields=[count1$0])",
-                                tableName));
+                        "aggregates=[grouping=[], aggFunctions=[Count1AggFunction()]]]], fields=[count1$0]");
         CloseableIterator<Row> iterRows = tEnv.executeSql(query).collect();
         List<String> collected = collectRowsWithTimeout(iterRows, 1);
         List<String> expected = Collections.singletonList(String.format("+I[%s]", expectedRows));
@@ -401,7 +397,7 @@ private String prepareLogTable() throws Exception {
         return tableName;
     }
 
-    private String preparePartitionedLogTable() throws Exception {
+    protected String preparePartitionedLogTable() throws Exception {
         String tableName = String.format("test_partitioned_log_table_%s", RandomUtils.nextInt());
         tEnv.executeSql(
                 String.format(
diff --git a/fluss-flink/pom.xml b/fluss-flink/pom.xml
index 8bd4c11ff5..d759f53563 100644
--- a/fluss-flink/pom.xml
+++ b/fluss-flink/pom.xml
@@ -36,7 +36,7 @@
         <module>fluss-flink-1.20</module>
         <module>fluss-flink-1.19</module>
         <module>fluss-flink-1.18</module>
-        <module>fluss-flink-2.1</module>
+        <module>fluss-flink-2.2</module>
         <module>fluss-flink-tiering</module>
diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml
index 46c34da57a..de2800d5e2 100644
--- a/fluss-test-coverage/pom.xml
+++ b/fluss-test-coverage/pom.xml
@@ -70,11 +70,12 @@
         <dependency>
             <groupId>org.apache.fluss</groupId>
-            <artifactId>fluss-flink-2.1</artifactId>
+            <artifactId>fluss-flink-2.2</artifactId>
            <version>${project.version}</version>
             <scope>compile</scope>
         </dependency>
+
         <dependency>
             <groupId>org.apache.fluss</groupId>
             <artifactId>fluss-flink-1.20</artifactId>