
Provide kafka unbounded reader to checkpoint mark when offset based deduplication is supported. #34669

Merged · 4 commits · Apr 19, 2025
Changes from 3 commits
KafkaCheckpointMark.java

@@ -21,7 +21,6 @@
 import java.io.Serializable;
 import java.util.List;
-import java.util.Optional;
 import org.apache.avro.reflect.AvroIgnore;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;

@@ -35,21 +34,24 @@
  */
 @DefaultCoder(AvroCoder.class)
 public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark {
-  private static final long OFFSET_DEDUP_PARTITIONS_PER_SPLIT = 1;
-
   private List<PartitionMark> partitions;
 
-  @AvroIgnore
-  private Optional<KafkaUnboundedReader<?, ?>> reader; // Present when offsets need to be committed.
+  @AvroIgnore private KafkaUnboundedReader<?, ?> reader;
+
+  private boolean commitOffsetsInFinalize;
 
   @SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction
   private KafkaCheckpointMark() {} // for Avro
 
+  private static final long OFFSET_DEDUP_PARTITIONS_PER_SPLIT = 1;
+
   public KafkaCheckpointMark(
-      List<PartitionMark> partitions, Optional<KafkaUnboundedReader<?, ?>> reader) {
+      List<PartitionMark> partitions,
+      KafkaUnboundedReader<?, ?> reader,
+      boolean commitOffsetsInFinalize) {
     this.partitions = partitions;
     this.reader = reader;
+    this.commitOffsetsInFinalize = commitOffsetsInFinalize;
   }
 
   public List<PartitionMark> getPartitions() {

@@ -58,7 +60,10 @@ public List<PartitionMark> getPartitions() {
 
   @Override
   public void finalizeCheckpoint() {
-    reader.ifPresent(r -> r.finalizeCheckpointMarkAsync(this));
+    if (!commitOffsetsInFinalize) {
+      return;
+    }
+    reader.finalizeCheckpointMarkAsync(this);
     // Is it ok to commit asynchronously, or should we wait till this (or newer) is committed?
     // Often multiple marks would be finalized at once, since we only need to finalize the latest,
     // it is better to wait a little while. Currently maximum delay is same as KAFKA_POLL_TIMEOUT
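
Taken together, the two hunks above change the contract: the reader is now always attached to the mark, and the new commitOffsetsInFinalize flag alone decides whether finalization commits offsets back to Kafka. A minimal sketch of that behavior, assuming code in the same package as KafkaCheckpointMark and a pre-existing reader instance (placeholder names, not part of this PR):

import java.util.Collections;

// Sketch only: `reader` stands in for an existing KafkaUnboundedReader<?, ?>.
KafkaCheckpointMark mark =
    new KafkaCheckpointMark(
        Collections.emptyList(), // no partition marks; just illustrates the new constructor shape
        reader,
        /* commitOffsetsInFinalize= */ false);

// With the flag false, finalizeCheckpoint() returns immediately and never calls
// reader.finalizeCheckpointMarkAsync(mark); with the flag true, it would.
mark.finalizeCheckpoint();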

@@ -72,11 +77,7 @@ public String toString() {
 
   @Override
   public byte[] getOffsetLimit() {
-    if (!reader.isPresent()) {
-      throw new RuntimeException(
-          "KafkaCheckpointMark reader is not present while calling getOffsetLimit().");
-    }
-    if (!reader.get().offsetBasedDeduplicationSupported()) {
+    if (!reader.offsetBasedDeduplicationSupported()) {
       throw new RuntimeException(
           "Unexpected getOffsetLimit() called while KafkaUnboundedReader not configured for offset deduplication.");
     }
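
The simplified getOffsetLimit() above only drops the Optional handling; the byte[] it actually returns is outside this excerpt (the method body is truncated). The OFFSET_DEDUP_PARTITIONS_PER_SPLIT = 1 constant suggests that, with offset-based deduplication enabled, each split covers a single partition, so one limit per mark suffices. A hedged sketch of how a caller might consume these APIs (offsetBasedDeduplicationSupported() and getOffsetLimit() come from the diff; everything else is a placeholder):

// Illustrative caller-side guard; `reader`, `mark`, and `dedupState` are hypothetical.
if (reader.offsetBasedDeduplicationSupported()) {
  byte[] offsetLimit = mark.getOffsetLimit();
  // A runner supporting offset-based deduplication can drop any record whose encoded
  // offset sorts at or beyond this limit for the corresponding split.
  dedupState.updateLimit(offsetLimit);
}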

KafkaUnboundedReader.java

@@ -268,7 +268,8 @@ public CheckpointMark getCheckpointMark() {
                     p.nextOffset,
                     p.lastWatermark.getMillis()))
             .collect(Collectors.toList()),
-        source.getSpec().isCommitOffsetsInFinalizeEnabled() ? Optional.of(this) : Optional.empty());
+        this,
+        source.getSpec().isCommitOffsetsInFinalizeEnabled());
   }
 
   @Override
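
isCommitOffsetsInFinalizeEnabled() reflects the commitOffsetsInFinalize() option on KafkaIO.Read, so the boolean now travels with the mark instead of deciding whether the reader is attached at all. For reference, a small sketch of how a pipeline author enables that option (broker address, topic, and deserializer choices are placeholders):

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

KafkaIO.Read<Long, String> read =
    KafkaIO.<Long, String>read()
        .withBootstrapServers("broker:9092") // placeholder address
        .withTopic("events") // placeholder topic
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .commitOffsetsInFinalize(); // feeds isCommitOffsetsInFinalizeEnabled() above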