From b5e68e6c84e08985c49b4977688298fad28207f0 Mon Sep 17 00:00:00 2001 From: Tom Stepp Date: Fri, 18 Apr 2025 09:30:55 -0700 Subject: [PATCH 1/4] KafkaIO: provide reader to checkpoint mark --- .../sdk/io/kafka/KafkaCheckpointMark.java | 24 ++++++++++--------- .../sdk/io/kafka/KafkaUnboundedReader.java | 3 ++- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java index 4271d6f72a03..56ebeb0f51b4 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java @@ -21,7 +21,6 @@ import java.io.Serializable; import java.util.List; -import java.util.Optional; import org.apache.avro.reflect.AvroIgnore; import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; @@ -35,21 +34,25 @@ */ @DefaultCoder(AvroCoder.class) public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark { + private static final long OFFSET_DEDUP_PARTITIONS_PER_SPLIT = 1; private List partitions; @AvroIgnore - private Optional> reader; // Present when offsets need to be committed. + private KafkaUnboundedReader reader; // Present when offsets need to be committed. + + private boolean commitOffsetsInFinalize; @SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction private KafkaCheckpointMark() {} // for Avro - private static final long OFFSET_DEDUP_PARTITIONS_PER_SPLIT = 1; - public KafkaCheckpointMark( - List partitions, Optional> reader) { + List partitions, + KafkaUnboundedReader reader, + boolean commitOffsetsInFinalize) { this.partitions = partitions; this.reader = reader; + this.commitOffsetsInFinalize = commitOffsetsInFinalize; } public List getPartitions() { @@ -58,7 +61,10 @@ public List getPartitions() { @Override public void finalizeCheckpoint() { - reader.ifPresent(r -> r.finalizeCheckpointMarkAsync(this)); + if (!commitOffsetsInFinalize) { + return; + } + reader.finalizeCheckpointMarkAsync(this); // Is it ok to commit asynchronously, or should we wait till this (or newer) is committed? // Often multiple marks would be finalized at once, since we only need to finalize the latest, // it is better to wait a little while. Currently maximum delay is same as KAFKA_POLL_TIMEOUT @@ -72,11 +78,7 @@ public String toString() { @Override public byte[] getOffsetLimit() { - if (!reader.isPresent()) { - throw new RuntimeException( - "KafkaCheckpointMark reader is not present while calling getOffsetLimit()."); - } - if (!reader.get().offsetBasedDeduplicationSupported()) { + if (!reader.offsetBasedDeduplicationSupported()) { throw new RuntimeException( "Unexpected getOffsetLimit() called while KafkaUnboundedReader not configured for offset deduplication."); } diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java index 074bba54ac21..028dcc5a9ae1 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java @@ -268,7 +268,8 @@ public CheckpointMark getCheckpointMark() { p.nextOffset, p.lastWatermark.getMillis())) .collect(Collectors.toList()), - source.getSpec().isCommitOffsetsInFinalizeEnabled() ? Optional.of(this) : Optional.empty()); + this, + source.getSpec().isCommitOffsetsInFinalizeEnabled()); } @Override From b60a31ba1362d763140ece60d452ec269d79a2c4 Mon Sep 17 00:00:00 2001 From: Tom Stepp Date: Fri, 18 Apr 2025 09:42:18 -0700 Subject: [PATCH 2/4] Remove outdated comment --- .../java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java index 56ebeb0f51b4..1b3f838edae6 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java @@ -39,7 +39,7 @@ public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark { private List partitions; @AvroIgnore - private KafkaUnboundedReader reader; // Present when offsets need to be committed. + private KafkaUnboundedReader reader; private boolean commitOffsetsInFinalize; From f7bec15a44c69d6caa3e8a1d8d610640366a2517 Mon Sep 17 00:00:00 2001 From: Tom Stepp Date: Fri, 18 Apr 2025 10:03:51 -0700 Subject: [PATCH 3/4] Run spotless apply --- .../java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java index 1b3f838edae6..d99533b970ea 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java @@ -38,8 +38,7 @@ public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark { private List partitions; - @AvroIgnore - private KafkaUnboundedReader reader; + @AvroIgnore private KafkaUnboundedReader reader; private boolean commitOffsetsInFinalize; From a9a4c18ddc140ffb58ab3fc9e98b33ab69561f07 Mon Sep 17 00:00:00 2001 From: Tom Stepp Date: Fri, 18 Apr 2025 14:18:49 -0700 Subject: [PATCH 4/4] Revert removal of Optional on reader in checkpoint --- .../sdk/io/kafka/KafkaCheckpointMark.java | 22 +++++++++---------- .../sdk/io/kafka/KafkaUnboundedReader.java | 5 +++-- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java index d99533b970ea..1ef0f42bf8ea 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java @@ -21,6 +21,7 @@ import java.io.Serializable; import java.util.List; +import java.util.Optional; import org.apache.avro.reflect.AvroIgnore; import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; @@ -38,20 +39,16 @@ public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark { private List partitions; - @AvroIgnore private KafkaUnboundedReader reader; - - private boolean commitOffsetsInFinalize; + @AvroIgnore + private Optional> reader; // Present when offsets need to be committed. @SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction private KafkaCheckpointMark() {} // for Avro public KafkaCheckpointMark( - List partitions, - KafkaUnboundedReader reader, - boolean commitOffsetsInFinalize) { + List partitions, Optional> reader) { this.partitions = partitions; this.reader = reader; - this.commitOffsetsInFinalize = commitOffsetsInFinalize; } public List getPartitions() { @@ -60,10 +57,7 @@ public List getPartitions() { @Override public void finalizeCheckpoint() { - if (!commitOffsetsInFinalize) { - return; - } - reader.finalizeCheckpointMarkAsync(this); + reader.ifPresent(r -> r.finalizeCheckpointMarkAsync(this)); // Is it ok to commit asynchronously, or should we wait till this (or newer) is committed? // Often multiple marks would be finalized at once, since we only need to finalize the latest, // it is better to wait a little while. Currently maximum delay is same as KAFKA_POLL_TIMEOUT @@ -77,7 +71,11 @@ public String toString() { @Override public byte[] getOffsetLimit() { - if (!reader.offsetBasedDeduplicationSupported()) { + if (!reader.isPresent()) { + throw new RuntimeException( + "KafkaCheckpointMark reader is not present while calling getOffsetLimit()."); + } + if (!reader.get().offsetBasedDeduplicationSupported()) { throw new RuntimeException( "Unexpected getOffsetLimit() called while KafkaUnboundedReader not configured for offset deduplication."); } diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java index 028dcc5a9ae1..4680304d5a6c 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java @@ -268,8 +268,9 @@ public CheckpointMark getCheckpointMark() { p.nextOffset, p.lastWatermark.getMillis())) .collect(Collectors.toList()), - this, - source.getSpec().isCommitOffsetsInFinalizeEnabled()); + source.getSpec().isCommitOffsetsInFinalizeEnabled() || offsetBasedDeduplicationSupported() + ? Optional.of(this) + : Optional.empty()); } @Override