Skip to content

Commit 346781a

Browse files
committed
tests
1 parent e706d37 commit 346781a

File tree

2 files changed

+114
-0
lines changed

2 files changed

+114
-0
lines changed

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
3939
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
4040
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
41+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
4142
import static org.junit.jupiter.api.Assertions.assertEquals;
4243
import static org.junit.jupiter.api.Assertions.assertNotNull;
4344
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -298,6 +299,58 @@ private void testAssignAndRetrievingCommittedOffsetsMultipleTimes(GroupProtocol
298299
}
299300
}
300301

302+
@ClusterTest
303+
public void testAsyncPollAfterTopicDeleted() throws Exception {
304+
testPollAfterTopicDeleted(GroupProtocol.CONSUMER);
305+
}
306+
307+
@ClusterTest
308+
public void testClassicPollAfterTopicDeleted() throws Exception {
309+
testPollAfterTopicDeleted(GroupProtocol.CLASSIC);
310+
}
311+
312+
/**
313+
* Test that poll() doesn't fail to retrieve committed offsets after the assigned topic is deleted.
314+
* Validates fix for KAFKA-20165.
315+
*/
316+
private void testPollAfterTopicDeleted(GroupProtocol groupProtocol) throws Exception {
317+
String topicToDelete = "topic-to-delete-assign";
318+
clusterInstance.createTopic(topicToDelete, 1, (short) BROKER_COUNT);
319+
TopicPartition tpToDelete = new TopicPartition(topicToDelete, 0);
320+
321+
int numRecords = 10;
322+
long startingTimestamp = System.currentTimeMillis();
323+
324+
Map<String, Object> consumerConfig = Map.of(
325+
GROUP_PROTOCOL_CONFIG, groupProtocol.name,
326+
GROUP_ID_CONFIG, "test-group",
327+
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
328+
329+
try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(consumerConfig);
330+
var admin = clusterInstance.admin()) {
331+
332+
// Send records and consume them (this caches the topic ID in the consumer)
333+
ClientsTestUtils.sendRecords(clusterInstance, tpToDelete, numRecords, startingTimestamp);
334+
consumer.assign(List.of(tpToDelete));
335+
consumer.seek(tpToDelete, 0);
336+
ClientsTestUtils.consumeAndVerifyRecords(consumer, tpToDelete, numRecords, 0, 0, startingTimestamp);
337+
consumer.commitSync();
338+
339+
// Delete the topic and wait for deletion to propagate
340+
admin.deleteTopics(List.of(topicToDelete)).all().get();
341+
Thread.sleep(1000);
342+
343+
// Change assignment to force the consumer to fetch committed offsets on next poll()
344+
// The consumer still has the topic ID cached, so it will use version 10+
345+
// and get UNKNOWN_TOPIC_ID from the broker
346+
consumer.unsubscribe();
347+
consumer.assign(List.of(tpToDelete));
348+
349+
// poll() should not throw - internally fetches committed offsets for deleted topic and recovers
350+
assertDoesNotThrow(() -> consumer.poll(java.time.Duration.ofMillis(5000)));
351+
}
352+
}
353+
301354
private static class CountConsumerCommitCallback implements OffsetCommitCallback {
302355
int successCount = 0;
303356
int failCount = 0;

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,67 @@ private void testNoCommittedOffsets(GroupProtocol groupProtocol) {
208208
}
209209
}
210210

211+
/**
212+
* Test that calling consumer.committed() for a deleted topic returns null for each partition.
213+
* This validates the fix for KAFKA-20165.
214+
*/
215+
@ClusterTest
216+
public void testClassicConsumerCommittedDeletedTopic() throws Exception {
217+
testConsumerCommittedDeletedTopic(GroupProtocol.CLASSIC);
218+
}
219+
220+
/**
221+
* Test that calling consumer.committed() for a deleted topic returns null for each partition.
222+
* This validates the fix for KAFKA-20165.
223+
*/
224+
@ClusterTest
225+
public void testAsyncConsumerCommittedDeletedTopic() throws Exception {
226+
testConsumerCommittedDeletedTopic(GroupProtocol.CONSUMER);
227+
}
228+
229+
/**
230+
* Common test logic for both Classic and Async consumer.
231+
* Tests that calling consumer.committed() for a deleted topic returns null.
232+
* As per the Javadoc for committed(): "If any of the partitions requested do not exist, the result
233+
* map will contain null as the value for that partition."
234+
*
235+
* For Classic consumer (topic names only), the GroupCoordinator returns no error for deleted topics (offsets -1).
236+
*
237+
* For Async consumer (topic IDs and topic names), the GroupCoordinator returns UNKNOWN_TOPIC_ID for the deleted topic
238+
* when the client uses topic IDs.
239+
* The consumer handles this as a retriable partition error and eventually returns null,
240+
* keeping the same contract as the ClassicConsumer.
241+
*/
242+
private void testConsumerCommittedDeletedTopic(GroupProtocol groupProtocol) throws Exception {
243+
String topicToDelete = "topic-to-delete";
244+
cluster.createTopic(topicToDelete, 1, (short) BROKER_COUNT);
245+
TopicPartition tpToDelete = new TopicPartition(topicToDelete, 0);
246+
247+
try (var consumer = createConsumer(groupProtocol, false);
248+
var admin = cluster.admin()) {
249+
consumer.assign(List.of(tpToDelete));
250+
251+
// Commit an offset to ensure the consumer has the topic ID cached and there's data to fetch
252+
consumer.commitSync(Map.of(tpToDelete, new OffsetAndMetadata(0)));
253+
254+
// Verify the commit was successful
255+
assertEquals(0, consumer.committed(Set.of(tpToDelete)).get(tpToDelete).offset());
256+
257+
// Delete the topic
258+
admin.deleteTopics(List.of(topicToDelete)).all().get();
259+
260+
// Eventually, the response should return null for the deleted topic partition.
261+
TestUtils.waitForCondition(
262+
() -> {
263+
var committed = consumer.committed(Set.of(tpToDelete), Duration.ofMillis(5000));
264+
return committed.get(tpToDelete) == null;
265+
},
266+
10000,
267+
"Expected null for deleted topic partition"
268+
);
269+
}
270+
}
271+
211272
@ClusterTest
212273
public void testClassicConsumerAsyncCommit() throws InterruptedException {
213274
testAsyncCommit(GroupProtocol.CLASSIC);

0 commit comments

Comments
 (0)