diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java index 4271d6f72a03..1ef0f42bf8ea 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java @@ -35,6 +35,7 @@ */ @DefaultCoder(AvroCoder.class) public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark { + private static final long OFFSET_DEDUP_PARTITIONS_PER_SPLIT = 1; private List partitions; @@ -44,8 +45,6 @@ public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark { @SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction private KafkaCheckpointMark() {} // for Avro - private static final long OFFSET_DEDUP_PARTITIONS_PER_SPLIT = 1; - public KafkaCheckpointMark( List partitions, Optional> reader) { this.partitions = partitions; diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java index 074bba54ac21..4680304d5a6c 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java @@ -268,7 +268,9 @@ public CheckpointMark getCheckpointMark() { p.nextOffset, p.lastWatermark.getMillis())) .collect(Collectors.toList()), - source.getSpec().isCommitOffsetsInFinalizeEnabled() ? Optional.of(this) : Optional.empty()); + source.getSpec().isCommitOffsetsInFinalizeEnabled() || offsetBasedDeduplicationSupported() + ? Optional.of(this) + : Optional.empty()); } @Override