Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/push_pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ jobs:
compile_and_test:
strategy:
matrix:
flink: [ 1.20.3 ]
jdk: [ '8, 11, 17' ]
flink: [ 2.2.1 ]
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JDK 8 removed correctly per Flink 2.x requirements. Update README.md to document minimum JDK 11 requirement.

jdk: [ '11, 17' ]
uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
with:
flink_version: ${{ matrix.flink }}
Expand Down
9 changes: 8 additions & 1 deletion flink-connector-pulsar/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,21 @@ under the License.
<artifactId>flink-table-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicate flink-table-planner dependency. Remove one.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.connector.pulsar.common.schema;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;

Expand Down Expand Up @@ -67,7 +67,7 @@ public boolean isKeyType() {
}

@Override
public TypeSerializer<T> createSerializer(ExecutionConfig config) {
public TypeSerializer<T> createSerializer(SerializerConfig config) {
return new PulsarSchemaTypeSerializer<>(schema);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public TypeSerializer<T> restoreSerializer() {

@Override
public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
TypeSerializer<T> newSerializer) {
TypeSerializerSnapshot<T> oldSerializerSnapshot) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Always returning compatibleAsIs() may be too permissive. Document the serializer compatibility assumptions or add validation logic.

return TypeSerializerSchemaCompatibility.compatibleAsIs();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
Expand Down Expand Up @@ -81,7 +85,7 @@
* @param <IN> The input type of the sink.
*/
@PublicEvolving
public class PulsarSink<IN> implements TwoPhaseCommittingSink<IN, PulsarCommittable> {
public class PulsarSink<IN> implements Sink<IN>, SupportsCommitter<PulsarCommittable> {
private static final long serialVersionUID = 4416714587951282119L;

private final SinkConfiguration sinkConfiguration;
Expand Down Expand Up @@ -129,7 +133,7 @@ public static <IN> PulsarSinkBuilder<IN> builder() {

@Internal
@Override
public PrecommittingSinkWriter<IN, PulsarCommittable> createWriter(InitContext initContext)
public CommittingSinkWriter<IN, PulsarCommittable> createWriter(WriterInitContext initContext)
throws PulsarClientException {
return new PulsarWriter<>(
sinkConfiguration,
Expand All @@ -143,7 +147,7 @@ public PrecommittingSinkWriter<IN, PulsarCommittable> createWriter(InitContext i

@Internal
@Override
public Committer<PulsarCommittable> createCommitter() {
public Committer<PulsarCommittable> createCommitter(CommitterInitContext context) {
return new PulsarCommitter(sinkConfiguration);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.connector.pulsar.sink.config;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.sink2.Sink.InitContext;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.common.config.PulsarConfiguration;
Expand Down Expand Up @@ -80,9 +80,9 @@ public DeliveryGuarantee getDeliveryGuarantee() {
/**
* Pulsar's transactions have a timeout mechanism for the uncommitted transaction. We use
* transactions for making sure the message could be written only once. Since the checkpoint
* interval couldn't be acquired from {@link InitContext}, we have to expose this option. Make
* sure this value is greater than the checkpoint interval. Create a pulsar producer builder by
* using the given Configuration.
* interval couldn't be acquired from {@link WriterInitContext}, we have to expose this option.
* Make sure this value is greater than the checkpoint interval. Create a pulsar producer
* builder by using the given Configuration.
*/
public long getTransactionTimeoutMillis() {
return transactionTimeoutMillis;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
import org.apache.flink.api.connector.sink2.Sink.InitContext;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
Expand Down Expand Up @@ -67,7 +68,7 @@
* @param <IN> The type of the input elements.
*/
@Internal
public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommittable> {
public class PulsarWriter<IN> implements CommittingSinkWriter<IN, PulsarCommittable> {
private static final Logger LOG = LoggerFactory.getLogger(PulsarWriter.class);

private final PulsarSerializationSchema<IN> serializationSchema;
Expand Down Expand Up @@ -101,7 +102,7 @@ public PulsarWriter(
TopicRouter<IN> topicRouter,
MessageDelayer<IN> messageDelayer,
PulsarCrypto pulsarCrypto,
InitContext initContext)
WriterInitContext initContext)
throws PulsarClientException {
checkNotNull(sinkConfiguration);
this.serializationSchema = checkNotNull(serializationSchema);
Expand Down Expand Up @@ -139,7 +140,8 @@ public PulsarWriter(
}

@Override
public void write(IN element, Context context) throws IOException, InterruptedException {
public void write(IN element, SinkWriter.Context context)
throws IOException, InterruptedException {
PulsarMessage<?> message = serializationSchema.serialize(element, sinkContext);

// Choose the right topic to send.
Expand Down Expand Up @@ -185,7 +187,8 @@ private void throwSendingException(String topic, Throwable ex) {

@SuppressWarnings({"rawtypes", "unchecked"})
private TypedMessageBuilder<?> createMessageBuilder(
String topic, Context context, PulsarMessage<?> message) throws PulsarClientException {
String topic, SinkWriter.Context context, PulsarMessage<?> message)
throws PulsarClientException {

Schema<?> schema = message.getSchema();
TypedMessageBuilder<?> builder = producerRegister.createMessageBuilder(topic, schema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.connector.sink2.Sink.InitContext;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
import org.apache.flink.connector.pulsar.sink.writer.topic.MetadataListener;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata;
Expand All @@ -38,11 +38,11 @@ public class PulsarSinkContextImpl implements PulsarSinkContext {
private final MetadataListener metadataListener;

public PulsarSinkContextImpl(
InitContext initContext,
WriterInitContext initContext,
SinkConfiguration sinkConfiguration,
MetadataListener metadataListener) {
this.parallelInstanceId = initContext.getSubtaskId();
this.numberOfParallelSubtasks = initContext.getNumberOfParallelSubtasks();
this.parallelInstanceId = initContext.getTaskInfo().getIndexOfThisSubtask();
this.numberOfParallelSubtasks = initContext.getTaskInfo().getNumberOfParallelSubtasks();
this.processingTimeService = initContext.getProcessingTimeService();
this.enableSchemaEvolution = sinkConfiguration.isEnableSchemaEvolution();
this.metadataListener = metadataListener;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
package org.apache.flink.connector.pulsar.source;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.configuration.ConfigOption;
Expand Down Expand Up @@ -434,7 +434,7 @@ public <K, V, T extends OUT> PulsarSourceBuilder<T> setDeserializationSchema(
* only used for treating messages that was written into pulsar by {@link TypeInformation}.
*/
public <T extends OUT> PulsarSourceBuilder<T> setDeserializationSchema(
TypeInformation<T> information, ExecutionConfig config) {
TypeInformation<T> information, SerializerConfig config) {
return setDeserializationSchema(new PulsarTypeInformationWrapper<>(information, config));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,9 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SourceReaderBase;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;

Expand Down Expand Up @@ -59,16 +56,13 @@ public class PulsarSourceFetcherManager
/**
* Creates a new SplitFetcherManager with multiple I/O threads.
*
* @param elementsQueue The queue that is used to hand over data from the I/O thread (the
* fetchers) to the reader, which emits the records and book-keeps the state. This must be
* the same queue instance that is also passed to the {@link SourceReaderBase}.
* @param splitReaderSupplier The factory for the split reader that connects to the source
* @param configuration The configuration for the fetcher manager
*/
public PulsarSourceFetcherManager(
FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> elementsQueue,
Supplier<SplitReader<Message<byte[]>, PulsarPartitionSplit>> splitReaderSupplier,
Configuration configuration) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Constructor signature change is correct for Flink 2.x. Consider adding Javadoc to note the elementsQueue parameter removal.

super(elementsQueue, splitReaderSupplier, configuration);
super(splitReaderSupplier, configuration);
}

/**
Expand All @@ -86,7 +80,7 @@ public void addSplits(List<PulsarPartitionSplit> splitsToAdd) {
}
}

// @Override // to keep compatible with Flink 1.17
@Override
public void removeSplits(List<PulsarPartitionSplit> splitsToRemove) {
// TODO empty - wait for FLINK-31748 to implement it.
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SourceReaderBase;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
import org.apache.flink.connector.pulsar.common.schema.BytesSchema;
import org.apache.flink.connector.pulsar.common.schema.PulsarSchema;
Expand Down Expand Up @@ -84,14 +82,12 @@ public class PulsarSourceReader<OUT>
private ScheduledExecutorService cursorScheduler;

private PulsarSourceReader(
FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> elementsQueue,
PulsarSourceFetcherManager fetcherManager,
PulsarDeserializationSchema<OUT> deserializationSchema,
SourceConfiguration sourceConfiguration,
PulsarClient pulsarClient,
SourceReaderContext context) {
super(
elementsQueue,
fetcherManager,
new PulsarRecordEmitter<>(deserializationSchema),
sourceConfiguration,
Expand Down Expand Up @@ -252,11 +248,6 @@ public static <OUT> PulsarSourceReader<OUT> create(
SourceReaderContext readerContext)
throws Exception {

// Create a message queue with the predefined source option.
int queueCapacity = sourceConfiguration.getMessageQueueCapacity();
FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> elementsQueue =
new FutureCompletingBlockingQueue<>(queueCapacity);

PulsarClient pulsarClient = createClient(sourceConfiguration);

// Initialize the deserialization schema before creating the pulsar reader.
Expand Down Expand Up @@ -287,10 +278,9 @@ public static <OUT> PulsarSourceReader<OUT> create(

PulsarSourceFetcherManager fetcherManager =
new PulsarSourceFetcherManager(
elementsQueue, splitReaderSupplier, readerContext.getConfiguration());
splitReaderSupplier, readerContext.getConfiguration());

return new PulsarSourceReader<>(
elementsQueue,
fetcherManager,
deserializationSchema,
sourceConfiguration,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.pulsar.source.reader.deserializer;

import org.apache.flink.annotation.Internal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ default void open(PulsarInitializationContext context, SourceConfiguration confi

/** An interface for providing extra schema initial context for users. */
@PublicEvolving
public interface PulsarInitializationContext extends InitializationContext {
interface PulsarInitializationContext extends InitializationContext {

/** Return the internal client for extra dynamic features. */
PulsarClient getPulsarClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,17 @@
package org.apache.flink.connector.pulsar.source.reader.deserializer;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.util.Collector;

import org.apache.pulsar.client.api.Message;

/**
* Wrap the flink TypeInformation into a {@code PulsarDeserializationSchema}. We would create a
* flink {@code TypeSerializer} by using given ExecutionConfig. This execution config could be
* {@link ExecutionEnvironment#getConfig()}.
* flink {@code TypeSerializer} by using given SerializerConfig.
*/
@Internal
public class PulsarTypeInformationWrapper<T> implements PulsarDeserializationSchema<T> {
Expand All @@ -47,7 +45,7 @@ public class PulsarTypeInformationWrapper<T> implements PulsarDeserializationSch
private final TypeInformation<T> information;
private final TypeSerializer<T> serializer;

public PulsarTypeInformationWrapper(TypeInformation<T> information, ExecutionConfig config) {
public PulsarTypeInformationWrapper(TypeInformation<T> information, SerializerConfig config) {
this.information = information;
this.serializer = information.createSerializer(config);
}
Expand Down
Loading
Loading