
Commit 1f236fe

Remove obsolete offset within range check
1 parent e0b9d03 commit 1f236fe

File tree

3 files changed: +6 -53 lines changed


sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java

-12 lines

@@ -151,7 +151,6 @@ public boolean start() throws IOException {
   @Override
   public boolean advance() throws IOException {
     /* Read first record (if any). we need to loop here because :
-     *  - (a) some records initially need to be skipped if they are before consumedOffset
      *  - (b) if curBatch is empty, we want to fetch next batch and then advance.
      *  - (c) curBatch is an iterator of iterators. we interleave the records from each.
      *        curBatch.next() might return an empty iterator.
@@ -176,17 +175,6 @@ public boolean advance() throws IOException {
         long expected = pState.nextOffset;
         long offset = rawRecord.offset();

-        if (offset < expected) { // -- (a)
-          // this can happen when compression is enabled in Kafka (seems to be fixed in 0.10)
-          // should we check if the offset is way off from consumedOffset (say > 1M)?
-          LOG.warn(
-              "{}: ignoring already consumed offset {} for {}",
-              this,
-              offset,
-              pState.topicPartition);
-          continue;
-        }
-
         // Apply user deserializers. User deserializers might throw, which will be propagated up
         // and 'curRecord' remains unchanged. The runner should close this reader.
         // TODO: write records that can't be deserialized to a "dead-letter" additional output.
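For context, a minimal standalone sketch (not Beam code; the broker address, topic name, and class name are illustrative assumptions) of the consumer behaviour the reader now relies on: after seek(), poll() hands back records at or after the requested offset, so the removed "offset < expected" guard (only needed for compressed batches on pre-0.10 brokers, per the deleted comment) is obsolete.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// Illustrative only: resume a partition from a known offset with the plain Kafka client.
public class ResumeFromOffsetSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());

    TopicPartition tp = new TopicPartition("example-topic", 0); // assumption: topic name
    long resumeOffset = 42L; // e.g. the next offset after the last consumed record

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.assign(Collections.singletonList(tp));
      consumer.seek(tp, resumeOffset);
      for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(1))) {
        // On current brokers/clients, record.offset() >= resumeOffset here, which is
        // why the "already consumed offset" skip in advance() is no longer needed.
        System.out.println("offset=" + record.offset());
      }
    }
  }
}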

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java

+2 -35 lines

@@ -440,12 +440,9 @@ public ProcessContinuation processElement(
     LOG.info("Creating Kafka consumer for process continuation for {}", kafkaSourceDescriptor);
     try (Consumer<byte[], byte[]> consumer = consumerFactoryFn.apply(updatedConsumerConfig)) {
       consumer.assign(ImmutableList.of(kafkaSourceDescriptor.getTopicPartition()));
-      long startOffset = tracker.currentRestriction().getFrom();
-      long expectedOffset = startOffset;
-      consumer.seek(kafkaSourceDescriptor.getTopicPartition(), startOffset);
+      long expectedOffset = tracker.currentRestriction().getFrom();
+      consumer.seek(kafkaSourceDescriptor.getTopicPartition(), expectedOffset);
       ConsumerRecords<byte[], byte[]> rawRecords = ConsumerRecords.empty();
-      long skippedRecords = 0L;
-      final Stopwatch sw = Stopwatch.createStarted();

       while (true) {
         // Fetch the record size accumulator.
@@ -465,36 +462,6 @@ public ProcessContinuation processElement(
           return ProcessContinuation.resume();
         }
         for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
-          // If the Kafka consumer returns a record with an offset that is already processed
-          // the record can be safely skipped. This is needed because there is a possibility
-          // that the seek() above fails to move the offset to the desired position. In which
-          // case poll() would return records that are already cnsumed.
-          if (rawRecord.offset() < startOffset) {
-            // If the start offset is not reached even after skipping the records for 10 seconds
-            // then the processing is stopped with a backoff to give the Kakfa server some time
-            // catch up.
-            if (sw.elapsed().getSeconds() > 10L) {
-              LOG.error(
-                  "The expected offset ({}) was not reached even after"
-                      + " skipping consumed records for 10 seconds. The offset we could"
-                      + " reach was {}. The processing of this bundle will be attempted"
-                      + " at a later time.",
-                  expectedOffset,
-                  rawRecord.offset());
-              return ProcessContinuation.resume()
-                  .withResumeDelay(org.joda.time.Duration.standardSeconds(10L));
-            }
-            skippedRecords++;
-            continue;
-          }
-          if (skippedRecords > 0L) {
-            LOG.warn(
-                "{} records were skipped due to seek returning an"
-                    + " earlier position than requested position of {}",
-                skippedRecords,
-                expectedOffset);
-            skippedRecords = 0L;
-          }
           if (!tracker.tryClaim(rawRecord.offset())) {
             return ProcessContinuation.stop();
           }
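A minimal sketch of the loop shape that remains after this change, under stated assumptions: the tracker class below is a simplified, hypothetical stand-in for Beam's OffsetRangeTracker (not its source), and the method and class names are made up. It illustrates that, with the skip-and-backoff bookkeeping gone, a record whose offset lies below the range start fails the claim immediately instead of being silently skipped, which matches the IllegalArgumentException the updated test expects.

import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

// Illustrative only; names and classes here are assumptions, not the Beam implementation.
public class SimplifiedReadLoopSketch {

  // Simplified stand-in for an offset-range tracker: claims must be monotonic and >= from.
  static final class SimpleOffsetTracker {
    private final long from;
    private final long to;
    private long lastAttempted = -1L;

    SimpleOffsetTracker(long from, long to) {
      this.from = from;
      this.to = to;
    }

    boolean tryClaim(long offset) {
      if (offset < from || offset <= lastAttempted) {
        // Fail fast on already-consumed offsets instead of skipping them.
        throw new IllegalArgumentException("Cannot claim already-consumed offset " + offset);
      }
      lastAttempted = offset;
      return offset < to; // false once the range is exhausted
    }
  }

  // Shape of the simplified loop: seek to the range start, poll, claim each offset.
  static void readRange(Consumer<byte[], byte[]> consumer, TopicPartition tp, long from, long to) {
    SimpleOffsetTracker tracker = new SimpleOffsetTracker(from, to);
    consumer.assign(Collections.singletonList(tp));
    consumer.seek(tp, from);
    while (true) { // a real implementation would also bound this loop and handle empty polls
      for (ConsumerRecord<byte[], byte[]> rawRecord : consumer.poll(Duration.ofSeconds(1))) {
        if (!tracker.tryClaim(rawRecord.offset())) {
          return; // past the end of the range: stop
        }
        // deserialize and emit rawRecord here
      }
    }
  }
}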

sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java

+4 -6 lines

@@ -19,6 +19,7 @@

 import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;

 import java.nio.charset.StandardCharsets;
@@ -523,12 +524,9 @@ public void testProcessElementWithEarlierOffset() throws Exception {
         new OffsetRangeTracker(new OffsetRange(startOffset, startOffset + 3));
     KafkaSourceDescriptor descriptor =
         KafkaSourceDescriptor.of(topicPartition, null, null, null, null, null);
-    ProcessContinuation result =
-        dofnInstanceWithBrokenSeek.processElement(descriptor, tracker, null, receiver);
-    assertEquals(ProcessContinuation.stop(), result);
-    assertEquals(
-        createExpectedRecords(descriptor, startOffset, 3, "key", "value"),
-        receiver.getGoodRecords());
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> dofnInstanceWithBrokenSeek.processElement(descriptor, tracker, null, receiver));
   }

   @Test
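The updated test relies on JUnit 4's assertThrows (available since JUnit 4.13). A small, self-contained example of the pattern; the test class, method name, and message here are made up for illustration:

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

import org.junit.Test;

public class AssertThrowsExampleTest {
  @Test
  public void failsWhenClaimingEarlierOffset() {
    // assertThrows runs the lambda, verifies the expected exception type is thrown,
    // and returns the caught exception so further assertions can be made on it.
    IllegalArgumentException thrown =
        assertThrows(
            IllegalArgumentException.class,
            () -> {
              throw new IllegalArgumentException("offset already consumed");
            });
    assertEquals("offset already consumed", thrown.getMessage());
  }
}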
