Skip to content

Commit f243762

Browse files
authored
Fix handling of ReplayPolicy.IGNORE #87 (#88)
1 parent 4eb525d commit f243762

File tree

2 files changed

+38
-11
lines changed

2 files changed

+38
-11
lines changed

src/main/java/com/salesforce/mirus/MirusSourceTask.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,9 @@ List<SourceRecord> sourceRecords(ConsumerRecords<byte[], byte[]> pollResult) {
253253
List<SourceRecord> sourceRecords = new ArrayList<>(pollResult.count());
254254
pollResult.forEach(
255255
consumerRecord -> {
256-
if (replayPolicy == ReplayPolicy.FILTER && !isSkippedRecord(consumerRecord)) {
256+
if (replayPolicy == ReplayPolicy.FILTER && isSkippedRecord(consumerRecord)) {
257+
// Skipping duplicate record
258+
} else {
257259
sourceRecords.add(toSourceRecord(consumerRecord));
258260
}
259261
});

src/test/java/com/salesforce/mirus/MirusSourceTaskTest.java

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -98,17 +98,17 @@ public <T> Map<Map<String, T>, Map<String, Object>> offsets(
9898
}
9999
};
100100
mirusSourceTask.initialize(context);
101-
mirusSourceTask.start(mockTaskProperties());
101+
mirusSourceTask.start(mockTaskProperties(ReplayPolicy.FILTER));
102102
}
103103

104-
private Map<String, String> mockTaskProperties() {
104+
private Map<String, String> mockTaskProperties(ReplayPolicy replayPolicy) {
105105
Map<String, String> properties = new HashMap<>();
106106
List<TopicPartition> topicPartitionList = new ArrayList<>();
107107
topicPartitionList.add(new TopicPartition(TOPIC, 0));
108108
topicPartitionList.add(new TopicPartition(TOPIC, 1));
109109
properties.put(
110110
TaskConfigDefinition.PARTITION_LIST, TopicPartitionSerDe.toJson(topicPartitionList));
111-
properties.put(TaskConfigDefinition.REPLAY_POLICY, ReplayPolicy.FILTER.toString());
111+
properties.put(TaskConfigDefinition.REPLAY_POLICY, replayPolicy.toString());
112112
properties.put(TaskConfigDefinition.REPLAY_WINDOW_RECORDS, "0");
113113
return properties;
114114
}
@@ -186,7 +186,7 @@ public void testSourceRecordsWorksWithHeadersWithHeaderConverter() {
186186
final int offset = 123;
187187
final long timestamp = 314159;
188188

189-
Map<String, String> properties = mockTaskProperties();
189+
Map<String, String> properties = mockTaskProperties(ReplayPolicy.FILTER);
190190
properties.put(
191191
SourceConfigDefinition.SOURCE_HEADER_CONVERTER.getKey(),
192192
"org.apache.kafka.connect.json.JsonConverter");
@@ -251,7 +251,7 @@ public void testSourceRecordsWorksWithNoHeaders() {
251251

252252
@Test
253253
public void testJsonConverterRecord() {
254-
Map<String, String> properties = mockTaskProperties();
254+
Map<String, String> properties = mockTaskProperties(ReplayPolicy.FILTER);
255255
properties.put(
256256
SourceConfigDefinition.SOURCE_KEY_CONVERTER.getKey(),
257257
"org.apache.kafka.connect.json.JsonConverter");
@@ -305,6 +305,31 @@ public void testReplayFilterOnePartition() {
305305
assertThat(result.get(0).sourceOffset().get(MirusSourceTask.KEY_OFFSET), is(4L));
306306
}
307307

308+
@Test
309+
public void testReplayFilterIgnoreOnePartition() {
310+
mirusSourceTask.start(mockTaskProperties(ReplayPolicy.IGNORE));
311+
312+
mockConsumer.updateBeginningOffsets(Collections.singletonMap(new TopicPartition(TOPIC, 0), 0L));
313+
314+
mockConsumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, new byte[] {}, new byte[] {}));
315+
mockConsumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 1, new byte[] {}, new byte[] {}));
316+
mockConsumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 2, new byte[] {}, new byte[] {}));
317+
List<SourceRecord> result = mirusSourceTask.poll();
318+
assertThat(result.size(), is(3));
319+
320+
// Simulate an offset reset
321+
mockConsumer.seekToBeginning(Collections.singletonList(new TopicPartition(TOPIC, 0)));
322+
323+
mockConsumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, new byte[] {}, new byte[] {}));
324+
mockConsumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 1, new byte[] {}, new byte[] {}));
325+
mockConsumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 2, new byte[] {}, new byte[] {}));
326+
mockConsumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 3, new byte[] {}, new byte[] {}));
327+
result = mirusSourceTask.poll();
328+
329+
assertThat(result.size(), is(4));
330+
assertThat(result.get(0).sourceOffset().get(MirusSourceTask.KEY_OFFSET), is(1L));
331+
}
332+
308333
@Test
309334
public void testReplayFilterTwoPartitions() {
310335

@@ -341,7 +366,7 @@ public void testReplayFilterTwoPartitions() {
341366
@Test
342367
public void testReplayFilterWindow() {
343368

344-
Map<String, String> properties = mockTaskProperties();
369+
Map<String, String> properties = mockTaskProperties(ReplayPolicy.FILTER);
345370
properties.put(TaskConfigDefinition.REPLAY_WINDOW_RECORDS, "2");
346371
mirusSourceTask.start(properties);
347372

@@ -380,7 +405,7 @@ public void shouldThrowExceptionWhenCommitFailed() {
380405
mirusSourceTask.commit();
381406

382407
// poll success but commit failed
383-
TaskConfig config = new TaskConfig(mockTaskProperties());
408+
TaskConfig config = new TaskConfig(mockTaskProperties(ReplayPolicy.IGNORE));
384409
long elapseTime = config.getCommitFailureRestartMs() / 2;
385410
when(mockTime.milliseconds()).thenReturn(currentMillis + elapseTime);
386411
mockConsumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 1, new byte[] {}, new byte[] {}));
@@ -410,7 +435,7 @@ public void shouldNotThrowExceptionIfNotTimeToRestart() {
410435
mirusSourceTask.commit();
411436

412437
// poll success but commit failed
413-
TaskConfig config = new TaskConfig(mockTaskProperties());
438+
TaskConfig config = new TaskConfig(mockTaskProperties(ReplayPolicy.IGNORE));
414439
long elapseTime = config.getCommitFailureRestartMs() - 10;
415440
when(mockTime.milliseconds()).thenReturn(currentMillis + elapseTime);
416441
mockConsumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 1, new byte[] {}, new byte[] {}));
@@ -435,7 +460,7 @@ public void shouldNotThrowExceptionIfNoNewDataInCommitWindow() {
435460
mirusSourceTask.commit();
436461

437462
// poll success but commit failed
438-
TaskConfig config = new TaskConfig(mockTaskProperties());
463+
TaskConfig config = new TaskConfig(mockTaskProperties(ReplayPolicy.IGNORE));
439464
// no new data
440465
long elapseTime = config.getCommitFailureRestartMs() + 10;
441466
when(mockTime.milliseconds()).thenReturn(currentMillis + elapseTime);
@@ -455,7 +480,7 @@ public void testConsumerClosedOnException() {
455480
when(localConsumer.poll(eq(1000L))).thenThrow(new KafkaException("Exception in poll"));
456481
MirusSourceTask mirusSourceTask = new MirusSourceTask(consumerProperties -> localConsumer);
457482
mirusSourceTask.initialize(context);
458-
mirusSourceTask.start(mockTaskProperties());
483+
mirusSourceTask.start(mockTaskProperties(ReplayPolicy.IGNORE));
459484

460485
// Mimic behaviour of WorkerSourceTask.execute()
461486
try {

0 commit comments

Comments
 (0)