Skip to content

Commit 10775fe

Browse files
authored
Provide kafka unbounded reader to checkpoint mark when offset based deduplication is supported. (#34669)
1 parent 7b27e8a commit 10775fe

File tree

2 files changed

+4
-3
lines changed

2 files changed

+4
-3
lines changed

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
*/
3636
@DefaultCoder(AvroCoder.class)
3737
public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark {
38+
private static final long OFFSET_DEDUP_PARTITIONS_PER_SPLIT = 1;
3839

3940
private List<PartitionMark> partitions;
4041

@@ -44,8 +45,6 @@ public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark {
4445
@SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction
4546
private KafkaCheckpointMark() {} // for Avro
4647

47-
private static final long OFFSET_DEDUP_PARTITIONS_PER_SPLIT = 1;
48-
4948
public KafkaCheckpointMark(
5049
List<PartitionMark> partitions, Optional<KafkaUnboundedReader<?, ?>> reader) {
5150
this.partitions = partitions;

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,9 @@ public CheckpointMark getCheckpointMark() {
268268
p.nextOffset,
269269
p.lastWatermark.getMillis()))
270270
.collect(Collectors.toList()),
271-
source.getSpec().isCommitOffsetsInFinalizeEnabled() ? Optional.of(this) : Optional.empty());
271+
source.getSpec().isCommitOffsetsInFinalizeEnabled() || offsetBasedDeduplicationSupported()
272+
? Optional.of(this)
273+
: Optional.empty());
272274
}
273275

274276
@Override

0 commit comments

Comments
 (0)