@@ -230,6 +230,181 @@ class GroupCoordinatorIntegrationTest(cluster: ClusterInstance) {
230230 }
231231 }
232232
233+ @ ClusterTest (
234+ types = Array (Type .KRAFT ),
235+ serverProperties = Array (
236+ new ClusterConfigProperty (key = GroupCoordinatorConfig .OFFSETS_TOPIC_PARTITIONS_CONFIG , value = " 1" ),
237+ new ClusterConfigProperty (key = GroupCoordinatorConfig .OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG , value = " 1" )
238+ )
239+ )
240+ def testCoordinatorFailoverAfterCompactingPartitionWithStreamsGroupMemberJoiningAndLeaving (): Unit = {
241+ withAdmin { admin =>
242+ TestUtils .createTopicWithAdminRaw(
243+ admin = admin,
244+ topic = " foo" ,
245+ numPartitions = 3
246+ )
247+
248+ // Create a streams group grp1s with one member. The member joins and leaves. This creates
249+ // a mix of group records with tombstones to delete the member.
250+ withStreamsApp(applicationId = " grp1s" , inputTopic = " foo" )
251+ }
252+
253+ // Force a compaction.
254+ rollAndCompactConsumerOffsets()
255+
256+ // Restart the broker to reload the group coordinator.
257+ cluster.shutdownBroker(0 )
258+ cluster.startBroker(0 )
259+
260+ // Verify the state of the groups to ensure that the group coordinator
261+ // was correctly loaded. If replaying any of the records fails, the
262+ // group coordinator won't be available.
263+ withAdmin { admin =>
264+ val groups = admin
265+ .describeStreamsGroups(java.util.List .of(" grp1s" ))
266+ .describedGroups()
267+ .asScala
268+ .toMap
269+
270+ val group = groups(" grp1s" ).get(10 , TimeUnit .SECONDS )
271+ assertEquals(" grp1s" , group.groupId)
272+ assertEquals(GroupState .EMPTY , group.groupState)
273+ }
274+ }
275+
276+ @ ClusterTest (
277+ types = Array (Type .KRAFT ),
278+ serverProperties = Array (
279+ new ClusterConfigProperty (key = GroupCoordinatorConfig .OFFSETS_TOPIC_PARTITIONS_CONFIG , value = " 1" ),
280+ new ClusterConfigProperty (key = GroupCoordinatorConfig .OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG , value = " 1" )
281+ )
282+ )
283+ def testCoordinatorFailoverCompactingPartitionWithStreamsGroupMemberLeavingAndRejoining (): Unit = {
284+ withAdmin { admin =>
285+ TestUtils .createTopicWithAdminRaw(
286+ admin = admin,
287+ topic = " foo" ,
288+ numPartitions = 3
289+ )
290+
291+ // Create a streams group grp2s with one member. The member joins, leaves, and rejoins.
292+ // This creates a mix of group records with tombstones and ensures that all the offset
293+ // commit records are before the streams group records due to the rebalance after rejoining.
294+ withStreamsApp(applicationId = " grp2s" , inputTopic = " foo" )
295+ withStreamsApp(applicationId = " grp2s" , inputTopic = " foo" )
296+ }
297+
298+ // Force a compaction.
299+ rollAndCompactConsumerOffsets()
300+
301+ // Restart the broker to reload the group coordinator.
302+ cluster.shutdownBroker(0 )
303+ cluster.startBroker(0 )
304+
305+ // Verify the state of the groups to ensure that the group coordinator
306+ // was correctly loaded. If replaying any of the records fails, the
307+ // group coordinator won't be available.
308+ withAdmin { admin =>
309+ val groups = admin
310+ .describeStreamsGroups(java.util.List .of(" grp2s" ))
311+ .describedGroups()
312+ .asScala
313+ .toMap
314+
315+ val group = groups(" grp2s" ).get(10 , TimeUnit .SECONDS )
316+ assertEquals(" grp2s" , group.groupId)
317+ assertEquals(GroupState .EMPTY , group.groupState)
318+ }
319+ }
320+
321+ @ ClusterTest (
322+ types = Array (Type .KRAFT ),
323+ serverProperties = Array (
324+ new ClusterConfigProperty (key = GroupCoordinatorConfig .OFFSETS_TOPIC_PARTITIONS_CONFIG , value = " 1" ),
325+ new ClusterConfigProperty (key = GroupCoordinatorConfig .OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG , value = " 1" )
326+ )
327+ )
328+ def testCoordinatorFailoverAfterCompactingPartitionWithStreamsGroupDeleted (): Unit = {
329+ withAdmin { admin =>
330+ TestUtils .createTopicWithAdminRaw(
331+ admin = admin,
332+ topic = " foo" ,
333+ numPartitions = 3
334+ )
335+
336+ // Create a streams group grp3s with one member. The member joins and leaves the group. Then
337+ // the group is deleted. This creates tombstones to delete the member, the group and the offsets.
338+ withStreamsApp(applicationId = " grp3s" , inputTopic = " foo" )
339+
340+ admin
341+ .deleteConsumerGroups(java.util.List .of(" grp3s" ))
342+ .deletedGroups()
343+ .get(" grp3s" )
344+ .get(10 , TimeUnit .SECONDS )
345+ }
346+
347+ // Force a compaction.
348+ rollAndCompactConsumerOffsets()
349+
350+ // Restart the broker to reload the group coordinator.
351+ cluster.shutdownBroker(0 )
352+ cluster.startBroker(0 )
353+
354+ // Verify the state of the groups to ensure that the group coordinator
355+ // was correctly loaded. If replaying any of the records fails, the
356+ // group coordinator won't be available.
357+ withAdmin { admin =>
358+ val groups = admin
359+ .describeStreamsGroups(java.util.List .of(" grp3s" ))
360+ .describedGroups()
361+ .asScala
362+ .toMap
363+
364+ assertDescribedDeadGroup(groups, " grp3s" )
365+ }
366+ }
367+
368+ @ ClusterTest (
369+ types = Array (Type .KRAFT ),
370+ serverProperties = Array (
371+ new ClusterConfigProperty (key = GroupCoordinatorConfig .OFFSETS_TOPIC_PARTITIONS_CONFIG , value = " 1" ),
372+ new ClusterConfigProperty (key = GroupCoordinatorConfig .OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG , value = " 1" )
373+ )
374+ )
375+ def testRecreatingConsumerOffsetsTopicWithStreamsGroup (): Unit = {
376+ withAdmin { admin =>
377+ TestUtils .createTopicWithAdminRaw(
378+ admin = admin,
379+ topic = " foo" ,
380+ numPartitions = 3
381+ )
382+
383+ withStreamsApp(applicationId = " groups" , inputTopic = " foo" )
384+
385+ admin
386+ .deleteTopics(TopicCollection .ofTopicNames(List (Topic .GROUP_METADATA_TOPIC_NAME ).asJava))
387+ .all()
388+ .get()
389+
390+ TestUtils .waitUntilTrue(() => {
391+ try {
392+ admin
393+ .describeTopics(TopicCollection .ofTopicNames(List (Topic .GROUP_METADATA_TOPIC_NAME ).asJava))
394+ .topicNameValues()
395+ .get(Topic .GROUP_METADATA_TOPIC_NAME )
396+ .get(JTestUtils .DEFAULT_MAX_WAIT_MS , TimeUnit .MILLISECONDS )
397+ false
398+ } catch {
399+ case e : ExecutionException =>
400+ e.getCause.isInstanceOf [UnknownTopicOrPartitionException ]
401+ }
402+ }, msg = s " ${Topic .GROUP_METADATA_TOPIC_NAME } was not deleted " )
403+
404+ withStreamsApp(applicationId = " groups" , inputTopic = " foo" )
405+ }
406+ }
407+
233408 @ ClusterTest (
234409 types = Array (Type .KRAFT ),
235410 serverProperties = Array (
@@ -727,7 +902,7 @@ class GroupCoordinatorIntegrationTest(cluster: ClusterInstance) {
727902 }
728903
729904 private def assertDescribedDeadGroup (
730- groups : Map [String , KafkaFuture [ConsumerGroupDescription ]],
905+ groups : Map [String , _ <: KafkaFuture [_ ]],
731906 groupId : String
732907 ): Unit = {
733908 try {
0 commit comments