diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml index f5cdcd79..58782055 100644 --- a/.github/workflows/push_pr.yml +++ b/.github/workflows/push_pr.yml @@ -28,8 +28,8 @@ jobs: compile_and_test: strategy: matrix: - flink: [ 1.20.3 ] - jdk: [ '8, 11, 17' ] + flink: [ 2.2.1 ] + jdk: [ '11, 17' ] uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils with: flink_version: ${{ matrix.flink }} diff --git a/flink-connector-pulsar/pom.xml b/flink-connector-pulsar/pom.xml index 9acbd1c3..b79e6d44 100644 --- a/flink-connector-pulsar/pom.xml +++ b/flink-connector-pulsar/pom.xml @@ -133,14 +133,21 @@ under the License. flink-table-test-utils ${flink.version} test + + + org.apache.flink + flink-table-planner-loader + + org.apache.flink - flink-table-api-scala-bridge_${scala.binary.version} + flink-table-planner_${scala.binary.version} ${flink.version} test + org.apache.flink flink-table-planner_${scala.binary.version} diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaTypeInformation.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaTypeInformation.java index ef616f09..32a3a162 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaTypeInformation.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaTypeInformation.java @@ -19,7 +19,7 @@ package org.apache.flink.connector.pulsar.common.schema; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.serialization.SerializerConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -67,7 +67,7 @@ public boolean isKeyType() { } @Override - public TypeSerializer createSerializer(ExecutionConfig config) { + public TypeSerializer createSerializer(SerializerConfig config) { return new PulsarSchemaTypeSerializer<>(schema); } diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaTypeSerializer.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaTypeSerializer.java index ef5c9f04..f632531b 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaTypeSerializer.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaTypeSerializer.java @@ -202,7 +202,7 @@ public TypeSerializer restoreSerializer() { @Override public TypeSerializerSchemaCompatibility resolveSchemaCompatibility( - TypeSerializer newSerializer) { + TypeSerializerSnapshot oldSerializerSnapshot) { return TypeSerializerSchemaCompatibility.compatibleAsIs(); } } diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java index 559da3ae..5a2ba8fd 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java @@ -21,7 +21,11 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.connector.sink2.Committer; -import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; +import org.apache.flink.api.connector.sink2.CommitterInitContext; +import org.apache.flink.api.connector.sink2.CommittingSinkWriter; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SupportsCommitter; +import org.apache.flink.api.connector.sink2.WriterInitContext; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto; import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable; @@ -81,7 +85,7 @@ * @param The input type of the sink. */ @PublicEvolving -public class PulsarSink implements TwoPhaseCommittingSink { +public class PulsarSink implements Sink, SupportsCommitter { private static final long serialVersionUID = 4416714587951282119L; private final SinkConfiguration sinkConfiguration; @@ -129,7 +133,7 @@ public static PulsarSinkBuilder builder() { @Internal @Override - public PrecommittingSinkWriter createWriter(InitContext initContext) + public CommittingSinkWriter createWriter(WriterInitContext initContext) throws PulsarClientException { return new PulsarWriter<>( sinkConfiguration, @@ -143,7 +147,7 @@ public PrecommittingSinkWriter createWriter(InitContext i @Internal @Override - public Committer createCommitter() { + public Committer createCommitter(CommitterInitContext context) { return new PulsarCommitter(sinkConfiguration); } diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java index 3bc7a42e..ba35f0dd 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java @@ -19,7 +19,7 @@ package org.apache.flink.connector.pulsar.sink.config; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.connector.sink2.Sink.InitContext; +import org.apache.flink.api.connector.sink2.WriterInitContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.pulsar.common.config.PulsarConfiguration; @@ -80,9 +80,9 @@ public DeliveryGuarantee getDeliveryGuarantee() { /** * Pulsar's transactions have a timeout mechanism for the uncommitted transaction. We use * transactions for making sure the message could be written only once. Since the checkpoint - * interval couldn't be acquired from {@link InitContext}, we have to expose this option. Make - * sure this value is greater than the checkpoint interval. Create a pulsar producer builder by - * using the given Configuration. + * interval couldn't be acquired from {@link WriterInitContext}, we have to expose this option. + * Make sure this value is greater than the checkpoint interval. Create a pulsar producer + * builder by using the given Configuration. */ public long getTransactionTimeoutMillis() { return transactionTimeoutMillis; diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java index 5c93ef3b..61bdee19 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java @@ -22,8 +22,9 @@ import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.common.operators.ProcessingTimeService; import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext; -import org.apache.flink.api.connector.sink2.Sink.InitContext; -import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter; +import org.apache.flink.api.connector.sink2.CommittingSinkWriter; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.WriterInitContext; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto; import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable; @@ -67,7 +68,7 @@ * @param The type of the input elements. */ @Internal -public class PulsarWriter implements PrecommittingSinkWriter { +public class PulsarWriter implements CommittingSinkWriter { private static final Logger LOG = LoggerFactory.getLogger(PulsarWriter.class); private final PulsarSerializationSchema serializationSchema; @@ -101,7 +102,7 @@ public PulsarWriter( TopicRouter topicRouter, MessageDelayer messageDelayer, PulsarCrypto pulsarCrypto, - InitContext initContext) + WriterInitContext initContext) throws PulsarClientException { checkNotNull(sinkConfiguration); this.serializationSchema = checkNotNull(serializationSchema); @@ -139,7 +140,8 @@ public PulsarWriter( } @Override - public void write(IN element, Context context) throws IOException, InterruptedException { + public void write(IN element, SinkWriter.Context context) + throws IOException, InterruptedException { PulsarMessage message = serializationSchema.serialize(element, sinkContext); // Choose the right topic to send. @@ -185,7 +187,8 @@ private void throwSendingException(String topic, Throwable ex) { @SuppressWarnings({"rawtypes", "unchecked"}) private TypedMessageBuilder createMessageBuilder( - String topic, Context context, PulsarMessage message) throws PulsarClientException { + String topic, SinkWriter.Context context, PulsarMessage message) + throws PulsarClientException { Schema schema = message.getSchema(); TypedMessageBuilder builder = producerRegister.createMessageBuilder(topic, schema); diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContextImpl.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContextImpl.java index 8c827db0..b1c5c967 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContextImpl.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContextImpl.java @@ -20,7 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.operators.ProcessingTimeService; -import org.apache.flink.api.connector.sink2.Sink.InitContext; +import org.apache.flink.api.connector.sink2.WriterInitContext; import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration; import org.apache.flink.connector.pulsar.sink.writer.topic.MetadataListener; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata; @@ -38,11 +38,11 @@ public class PulsarSinkContextImpl implements PulsarSinkContext { private final MetadataListener metadataListener; public PulsarSinkContextImpl( - InitContext initContext, + WriterInitContext initContext, SinkConfiguration sinkConfiguration, MetadataListener metadataListener) { - this.parallelInstanceId = initContext.getSubtaskId(); - this.numberOfParallelSubtasks = initContext.getNumberOfParallelSubtasks(); + this.parallelInstanceId = initContext.getTaskInfo().getIndexOfThisSubtask(); + this.numberOfParallelSubtasks = initContext.getTaskInfo().getNumberOfParallelSubtasks(); this.processingTimeService = initContext.getProcessingTimeService(); this.enableSchemaEvolution = sinkConfiguration.isEnableSchemaEvolution(); this.metadataListener = metadataListener; diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java index ac9695ad..2925f051 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java @@ -19,8 +19,8 @@ package org.apache.flink.connector.pulsar.source; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializerConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.configuration.ConfigOption; @@ -434,7 +434,7 @@ public PulsarSourceBuilder setDeserializationSchema( * only used for treating messages that was written into pulsar by {@link TypeInformation}. */ public PulsarSourceBuilder setDeserializationSchema( - TypeInformation information, ExecutionConfig config) { + TypeInformation information, SerializerConfig config) { return setDeserializationSchema(new PulsarTypeInformationWrapper<>(information, config)); } diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceFetcherManager.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceFetcherManager.java index 2680ba70..6c9c3e0e 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceFetcherManager.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceFetcherManager.java @@ -20,12 +20,9 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.Configuration; -import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; -import org.apache.flink.connector.base.source.reader.SourceReaderBase; import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher; import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; -import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; @@ -59,16 +56,13 @@ public class PulsarSourceFetcherManager /** * Creates a new SplitFetcherManager with multiple I/O threads. * - * @param elementsQueue The queue that is used to hand over data from the I/O thread (the - * fetchers) to the reader, which emits the records and book-keeps the state. This must be - * the same queue instance that is also passed to the {@link SourceReaderBase}. * @param splitReaderSupplier The factory for the split reader that connects to the source + * @param configuration The configuration for the fetcher manager */ public PulsarSourceFetcherManager( - FutureCompletingBlockingQueue>> elementsQueue, Supplier, PulsarPartitionSplit>> splitReaderSupplier, Configuration configuration) { - super(elementsQueue, splitReaderSupplier, configuration); + super(splitReaderSupplier, configuration); } /** @@ -86,7 +80,7 @@ public void addSplits(List splitsToAdd) { } } - // @Override // to keep compatible with Flink 1.17 + @Override public void removeSplits(List splitsToRemove) { // TODO empty - wait for FLINK-31748 to implement it. } diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReader.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReader.java index 014d3038..69f426de 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReader.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReader.java @@ -22,10 +22,8 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.connector.source.ReaderOutput; import org.apache.flink.api.connector.source.SourceReaderContext; -import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.SourceReaderBase; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; -import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto; import org.apache.flink.connector.pulsar.common.schema.BytesSchema; import org.apache.flink.connector.pulsar.common.schema.PulsarSchema; @@ -84,14 +82,12 @@ public class PulsarSourceReader private ScheduledExecutorService cursorScheduler; private PulsarSourceReader( - FutureCompletingBlockingQueue>> elementsQueue, PulsarSourceFetcherManager fetcherManager, PulsarDeserializationSchema deserializationSchema, SourceConfiguration sourceConfiguration, PulsarClient pulsarClient, SourceReaderContext context) { super( - elementsQueue, fetcherManager, new PulsarRecordEmitter<>(deserializationSchema), sourceConfiguration, @@ -252,11 +248,6 @@ public static PulsarSourceReader create( SourceReaderContext readerContext) throws Exception { - // Create a message queue with the predefined source option. - int queueCapacity = sourceConfiguration.getMessageQueueCapacity(); - FutureCompletingBlockingQueue>> elementsQueue = - new FutureCompletingBlockingQueue<>(queueCapacity); - PulsarClient pulsarClient = createClient(sourceConfiguration); // Initialize the deserialization schema before creating the pulsar reader. @@ -287,10 +278,9 @@ public static PulsarSourceReader create( PulsarSourceFetcherManager fetcherManager = new PulsarSourceFetcherManager( - elementsQueue, splitReaderSupplier, readerContext.getConfiguration()); + splitReaderSupplier, readerContext.getConfiguration()); return new PulsarSourceReader<>( - elementsQueue, fetcherManager, deserializationSchema, sourceConfiguration, diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/GenericRecordDeserializationSchema.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/GenericRecordDeserializationSchema.java index 05a1f77a..0ba7b0f4 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/GenericRecordDeserializationSchema.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/GenericRecordDeserializationSchema.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.flink.connector.pulsar.source.reader.deserializer; import org.apache.flink.annotation.Internal; diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchema.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchema.java index e47b804c..7e851ab0 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchema.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchema.java @@ -70,7 +70,7 @@ default void open(PulsarInitializationContext context, SourceConfiguration confi /** An interface for providing extra schema initial context for users. */ @PublicEvolving - public interface PulsarInitializationContext extends InitializationContext { + interface PulsarInitializationContext extends InitializationContext { /** Return the internal client for extra dynamic features. */ PulsarClient getPulsarClient(); diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarTypeInformationWrapper.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarTypeInformationWrapper.java index 59f77d7d..392276df 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarTypeInformationWrapper.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarTypeInformationWrapper.java @@ -18,10 +18,9 @@ package org.apache.flink.connector.pulsar.source.reader.deserializer; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.serialization.SerializerConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.util.Collector; @@ -29,8 +28,7 @@ /** * Wrap the flink TypeInformation into a {@code PulsarDeserializationSchema}. We would create a - * flink {@code TypeSerializer} by using given ExecutionConfig. This execution config could be - * {@link ExecutionEnvironment#getConfig()}. + * flink {@code TypeSerializer} by using given SerializerConfig. */ @Internal public class PulsarTypeInformationWrapper implements PulsarDeserializationSchema { @@ -47,7 +45,7 @@ public class PulsarTypeInformationWrapper implements PulsarDeserializationSch private final TypeInformation information; private final TypeSerializer serializer; - public PulsarTypeInformationWrapper(TypeInformation information, ExecutionConfig config) { + public PulsarTypeInformationWrapper(TypeInformation information, SerializerConfig config) { this.information = information; this.serializer = information.createSerializer(config); } diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/TableDataTypeUtils.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/TableDataTypeUtils.java new file mode 100644 index 00000000..d7957a92 --- /dev/null +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/TableDataTypeUtils.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.FieldsDataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.RowType; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Utility methods for working with table {@link DataType}s. + * + *

This class contains methods that were removed from Flink's {@code DataTypeUtils} in Flink 2.x. + * They are copied here to maintain the connector's functionality. + */ +@Internal +public class TableDataTypeUtils { + + private TableDataTypeUtils() {} + + /** + * Removes a prefix from all field names in a row data type. + * + * @param dataType The row data type whose field names should be stripped. + * @param prefix The prefix to remove from each field name. + * @return A new data type with the prefix removed from field names. + */ + public static DataType stripRowPrefix(DataType dataType, String prefix) { + if (!dataType.getLogicalType().is(LogicalTypeRoot.ROW)) { + throw new IllegalArgumentException("Row data type expected."); + } + + final RowType rowType = (RowType) dataType.getLogicalType(); + final List newFieldNames = + rowType.getFieldNames().stream() + .map( + s -> { + if (s.startsWith(prefix)) { + return s.substring(prefix.length()); + } + return s; + }) + .collect(Collectors.toList()); + final LogicalType newRowType = renameRowFields(rowType, newFieldNames); + return new FieldsDataType( + newRowType, dataType.getConversionClass(), dataType.getChildren()); + } + + /** + * Renames the fields in a {@link RowType}. + * + * @param rowType The row type to rename fields in. + * @param newFieldNames The new field names to use. + * @return A new row type with the renamed fields. + */ + public static RowType renameRowFields(RowType rowType, List newFieldNames) { + if (rowType.getFieldCount() != newFieldNames.size()) { + throw new IllegalArgumentException( + "Row field count and new field name count must match."); + } + + final List newFields = + IntStream.range(0, rowType.getFieldCount()) + .mapToObj( + i -> { + RowType.RowField oldField = rowType.getFields().get(i); + return new RowType.RowField( + newFieldNames.get(i), + oldField.getType(), + oldField.getDescription().orElse(null)); + }) + .collect(Collectors.toList()); + + return new RowType(rowType.isNullable(), newFields); + } +} diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/sink/PulsarTableSerializationSchemaFactory.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/sink/PulsarTableSerializationSchemaFactory.java index 95c443bf..68fef3ee 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/sink/PulsarTableSerializationSchemaFactory.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/sink/PulsarTableSerializationSchemaFactory.java @@ -16,13 +16,13 @@ import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema; +import org.apache.flink.connector.pulsar.table.TableDataTypeUtils; import org.apache.flink.table.connector.Projection; import org.apache.flink.table.connector.format.EncodingFormat; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.utils.DataTypeUtils; import javax.annotation.Nullable; @@ -107,7 +107,8 @@ public PulsarSerializationSchema createPulsarSerializationSchema( } DataType physicalFormatDataType = Projection.of(projection).project(this.physicalDataType); if (prefix != null) { - physicalFormatDataType = DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix); + physicalFormatDataType = + TableDataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix); } return format.createRuntimeEncoder(context, physicalFormatDataType); } diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java index 32294b86..8040f8c4 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java @@ -21,13 +21,13 @@ import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; +import org.apache.flink.connector.pulsar.table.TableDataTypeUtils; import org.apache.flink.table.connector.Projection; import org.apache.flink.table.connector.format.DecodingFormat; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.utils.DataTypeUtils; import javax.annotation.Nullable; @@ -117,7 +117,8 @@ public PulsarTableDeserializationSchemaFactory( DataType physicalFormatDataType = Projection.of(projection).project(this.physicalDataType); if (prefix != null) { - physicalFormatDataType = DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix); + physicalFormatDataType = + TableDataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix); } return format.createRuntimeDecoder(context, physicalFormatDataType); } diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/MiniClusterTestEnvironment.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/MiniClusterTestEnvironment.java index 83a7f36e..c02fa14c 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/MiniClusterTestEnvironment.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/MiniClusterTestEnvironment.java @@ -48,8 +48,8 @@ import java.util.stream.Collectors; import static org.apache.flink.configuration.MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL; +import static org.apache.flink.configuration.StateRecoveryOptions.SAVEPOINT_PATH; import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.METRIC_FETCHER_UPDATE_INTERVAL_MS; -import static org.apache.flink.runtime.jobgraph.SavepointConfigOptions.SAVEPOINT_PATH; /** Test environment for running jobs on Flink mini-cluster. */ @Experimental @@ -91,7 +91,7 @@ public StreamExecutionEnvironment createExecutionEnvironment( TestEnvironmentSettings envOptions) { Configuration configuration = new Configuration(); if (envOptions.getSavepointRestorePath() != null) { - configuration.setString(SAVEPOINT_PATH, envOptions.getSavepointRestorePath()); + configuration.set(SAVEPOINT_PATH, envOptions.getSavepointRestorePath()); } return new TestStreamEnvironment( this.miniCluster.getMiniCluster(), diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaTypeInformationTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaTypeInformationTest.java index 2f56f902..3b35a1c4 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaTypeInformationTest.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaTypeInformationTest.java @@ -18,7 +18,7 @@ package org.apache.flink.connector.pulsar.common.schema; -import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.serialization.SerializerConfigImpl; import org.apache.flink.connector.pulsar.testutils.SampleData.Bar; import org.apache.flink.util.InstantiationUtil; @@ -40,7 +40,7 @@ void pulsarTypeInfoSerializationAndCreation() throws Exception { PulsarSchemaTypeInformation clonedInfo = InstantiationUtil.clone(info); assertThat(clonedInfo).isEqualTo(info).isNotSameAs(info); - assertThatCode(() -> info.createSerializer(new ExecutionConfig())) + assertThatCode(() -> info.createSerializer(new SerializerConfigImpl())) .doesNotThrowAnyException(); assertThat(clonedInfo.getTypeClass()).isEqualTo(info.getTypeClass()); diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/schema/factories/AvroSchemaFactoryTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/schema/factories/AvroSchemaFactoryTest.java index 14d907db..07d0e435 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/schema/factories/AvroSchemaFactoryTest.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/schema/factories/AvroSchemaFactoryTest.java @@ -18,7 +18,7 @@ package org.apache.flink.connector.pulsar.common.schema.factories; -import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.serialization.SerializerConfigImpl; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.ComparatorTestBase.TestInputView; import org.apache.flink.api.common.typeutils.ComparatorTestBase.TestOutputView; @@ -91,7 +91,7 @@ void createAvroTypeInformationAndSerializeValues() throws Exception { // Serialize by type information. TypeSerializer serializer = - information.createSerializer(new ExecutionConfig()); + information.createSerializer(new SerializerConfigImpl()); // TypeInformation serialization. assertThatCode(() -> InstantiationUtil.clone(information)).doesNotThrowAnyException(); assertThatCode(() -> InstantiationUtil.clone(serializer)).doesNotThrowAnyException(); diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java index d24f2305..8c57e56c 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java @@ -18,15 +18,14 @@ package org.apache.flink.connector.pulsar.sink.writer; -import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobInfo; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.common.operators.ProcessingTimeService; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.connector.sink2.Sink.InitContext; import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.WriterInitContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto; @@ -162,7 +161,7 @@ public TopicPartition route( } } - private static class MockInitContext implements InitContext { + private static class MockInitContext implements WriterInitContext { private final MetricListener metricListener; private final OperatorIOMetricGroup ioMetricGroup; @@ -196,37 +195,58 @@ public ProcessingTimeService getProcessingTimeService() { } @Override - public int getSubtaskId() { - return 0; + public boolean isObjectReuseEnabled() { + return false; } @Override - public int getNumberOfParallelSubtasks() { - return 1; + public TypeSerializer createInputSerializer() { + return null; } @Override - public int getAttemptNumber() { - return 0; + public JobInfo getJobInfo() { + return null; } - // The following three methods are for compatibility with - // https://github.com/apache/flink/commit/4f5b2fb5736f5a1c098a7dc1d448a879f36f801b - // . Removed the commented out `@Override` when we move to 1.18. + @Override + public TaskInfo getTaskInfo() { + return new TaskInfo() { + @Override + public String getTaskName() { + return "test"; + } - // @Override - public boolean isObjectReuseEnabled() { - return false; - } + @Override + public int getMaxNumberOfParallelSubtasks() { + return 1; + } - // @Override - public TypeSerializer createInputSerializer() { - return null; - } + @Override + public int getIndexOfThisSubtask() { + return 0; + } - // @Override - public JobID getJobId() { - return null; + @Override + public int getNumberOfParallelSubtasks() { + return 1; + } + + @Override + public int getAttemptNumber() { + return 0; + } + + @Override + public String getTaskNameWithSubtasks() { + return "test (1/1)"; + } + + @Override + public String getAllocationIDAsString() { + return "test-alloc"; + } + }; } @Override @@ -254,16 +274,6 @@ public UserCodeClassLoader getUserCodeClassLoader() { } }; } - - @Override - public JobInfo getJobInfo() { - return null; - } - - @Override - public TaskInfo getTaskInfo() { - return null; - } } private static class MockSinkWriterContext implements SinkWriter.Context { diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java index ecdd820e..993069d9 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java @@ -18,8 +18,8 @@ package org.apache.flink.connector.pulsar.source.reader.deserializer; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.serialization.SerializerConfigImpl; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.configuration.Configuration; @@ -104,7 +104,7 @@ void createFromPulsarSchema() throws Exception { @Test void createFromFlinkTypeInformation() throws Exception { PulsarDeserializationSchema schema = - new PulsarTypeInformationWrapper<>(Types.STRING, new ExecutionConfig()); + new PulsarTypeInformationWrapper<>(Types.STRING, new SerializerConfigImpl()); schema.open(new PulsarTestingDeserializationContext(), sourceConfig); assertThatCode(() -> InstantiationUtil.clone(schema)).doesNotThrowAnyException(); diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableITCase.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableITCase.java index eb839679..fc0b9eab 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableITCase.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableITCase.java @@ -22,7 +22,7 @@ import org.apache.flink.connector.pulsar.table.testutils.TestingUser; import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction; import org.apache.flink.table.api.DataTypes; import org.apache.flink.test.junit5.MiniClusterExtension; import org.apache.flink.test.util.SuccessException; diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableTestBase.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableTestBase.java index cecd3e27..42b3ccc2 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableTestBase.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableTestBase.java @@ -18,7 +18,6 @@ package org.apache.flink.connector.pulsar.table; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.connector.pulsar.common.MiniClusterTestEnvironment; import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime; @@ -58,7 +57,6 @@ public void beforeAll() throws Exception { // run env env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(DEFAULT_PARALLELISM); - env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); tableEnv = StreamTableEnvironment.create(env); tableEnv.getConfig() .getConfiguration() diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/testutils/TestingUser.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/testutils/TestingUser.java index b597f3fa..e0b64f2b 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/testutils/TestingUser.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/testutils/TestingUser.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.flink.connector.pulsar.table.testutils; import java.io.Serializable; diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java index 63db956b..fb9211fa 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java @@ -25,7 +25,7 @@ import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; -import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction; import org.apache.flink.testutils.junit.SharedObjectsExtension; import org.apache.flink.testutils.junit.SharedReference; diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/remote/PulsarRemoteRuntime.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/remote/PulsarRemoteRuntime.java index 4fddd624..b118d0b4 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/remote/PulsarRemoteRuntime.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/remote/PulsarRemoteRuntime.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.flink.connector.pulsar.testutils.runtime.remote; import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime; diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java index 3c384ddd..137f3bbc 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.flink.connector.pulsar.testutils.sink; import org.apache.flink.api.common.typeinfo.TypeInformation; diff --git a/pom.xml b/pom.xml index 6f5bc93b..ed19d386 100644 --- a/pom.xml +++ b/pom.xml @@ -51,14 +51,11 @@ under the License. - 1.20.3 + 2.2.1 3.0.5 2.12 1.69 - 2.12.7 - 2.12.7 - 1.3.9 5.9.1 3.23.1 @@ -66,8 +63,7 @@ under the License. 1.0.1 1.21.4 - false - 3.0.0-1.16 + true 3.25.5 1.7.36 @@ -77,7 +73,6 @@ under the License. 3.14.0 2.15.1 1.12.20 - 2.24.0 3.3 2.13.4.20221013 1.1.10.4 @@ -417,12 +412,6 @@ under the License. ${byte-buddy.version} - - com.esotericsoftware.kryo - kryo - ${kryo.version} - - org.objenesis objenesis @@ -435,18 +424,6 @@ under the License. ${protobuf.version} - - org.scala-lang - scala-reflect - ${scala-reflect.version} - - - - org.scala-lang - scala-library - ${scala-library.version} - - org.xerial.snappy snappy-java