@@ -37,7 +37,8 @@ import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt
37
37
import org .apache .kafka .common .message .LeaveGroupRequestData .MemberIdentity
38
38
import org .apache .kafka .common .message .ListOffsetsRequestData .{ListOffsetsPartition , ListOffsetsTopic }
39
39
import org .apache .kafka .common .message .OffsetForLeaderEpochRequestData .{OffsetForLeaderPartition , OffsetForLeaderTopic , OffsetForLeaderTopicCollection }
40
- import org .apache .kafka .common .message .{AddOffsetsToTxnRequestData , AlterPartitionReassignmentsRequestData , AlterReplicaLogDirsRequestData , ConsumerGroupDescribeRequestData , ConsumerGroupHeartbeatRequestData , CreateAclsRequestData , CreatePartitionsRequestData , CreateTopicsRequestData , DeleteAclsRequestData , DeleteGroupsRequestData , DeleteRecordsRequestData , DeleteTopicsRequestData , DescribeClusterRequestData , DescribeConfigsRequestData , DescribeGroupsRequestData , DescribeLogDirsRequestData , DescribeProducersRequestData , DescribeTransactionsRequestData , FetchResponseData , FindCoordinatorRequestData , HeartbeatRequestData , IncrementalAlterConfigsRequestData , JoinGroupRequestData , ListPartitionReassignmentsRequestData , ListTransactionsRequestData , MetadataRequestData , OffsetCommitRequestData , ProduceRequestData , SyncGroupRequestData , WriteTxnMarkersRequestData }
40
+ import org .apache .kafka .common .message .{AddOffsetsToTxnRequestData , AlterPartitionReassignmentsRequestData , AlterReplicaLogDirsRequestData , ConsumerGroupDescribeRequestData , ConsumerGroupHeartbeatRequestData , ConsumerGroupHeartbeatResponseData , CreateAclsRequestData , CreatePartitionsRequestData , CreateTopicsRequestData , DeleteAclsRequestData , DeleteGroupsRequestData , DeleteRecordsRequestData , DeleteTopicsRequestData , DescribeClusterRequestData , DescribeConfigsRequestData , DescribeGroupsRequestData , DescribeLogDirsRequestData , DescribeProducersRequestData , DescribeTransactionsRequestData , FetchResponseData , FindCoordinatorRequestData , HeartbeatRequestData , IncrementalAlterConfigsRequestData , JoinGroupRequestData , ListPartitionReassignmentsRequestData , ListTransactionsRequestData , MetadataRequestData , OffsetCommitRequestData , ProduceRequestData , SyncGroupRequestData , WriteTxnMarkersRequestData }
41
+ import org .apache .kafka .common .network .ListenerName
41
42
import org .apache .kafka .common .protocol .{ApiKeys , Errors }
42
43
import org .apache .kafka .common .record .{MemoryRecords , RecordBatch , SimpleRecord }
43
44
import org .apache .kafka .common .requests .OffsetFetchResponse .PartitionData
@@ -2547,6 +2548,118 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
2547
2548
sendRequestAndVerifyResponseError(request, resource, isAuthorized = false )
2548
2549
}
2549
2550
2551
+ @ ParameterizedTest
2552
+ @ ValueSource (strings = Array (" kraft" ))
2553
+ def testConsumerGroupHeartbeaWithRegex (quorum : String ): Unit = {
2554
+ createTopicWithBrokerPrincipal(topic)
2555
+ val allowAllOpsAcl = new AccessControlEntry (clientPrincipalString, WILDCARD_HOST , ALL , ALLOW )
2556
+ addAndVerifyAcls(Set (allowAllOpsAcl), groupResource)
2557
+ addAndVerifyAcls(Set (allowAllOpsAcl), topicResource)
2558
+
2559
+ val response = sendAndReceiveFirstRegexHeartbeat(Uuid .randomUuid.toString, listenerName)
2560
+ sendAndReceiveRegexHeartbeat(response, listenerName, Some (1 ))
2561
+ }
2562
+
2563
+ @ ParameterizedTest
2564
+ @ ValueSource (strings = Array (" kraft" ))
2565
+ def testConsumerGroupHeartbeaWithRegexWithoutTopicDescribeAcl (quorum : String ): Unit = {
2566
+ createTopicWithBrokerPrincipal(topic)
2567
+ val allowAllOpsAcl = new AccessControlEntry (clientPrincipalString, WILDCARD_HOST , ALL , ALLOW )
2568
+ addAndVerifyAcls(Set (allowAllOpsAcl), groupResource)
2569
+
2570
+ val response = sendAndReceiveFirstRegexHeartbeat(Uuid .randomUuid.toString, listenerName)
2571
+ sendAndReceiveRegexHeartbeat(response, listenerName, None )
2572
+ }
2573
+
2574
+ @ ParameterizedTest
2575
+ @ ValueSource (strings = Array (" kraft" ))
2576
+ def testConsumerGroupHeartbeaWithRegexWithDifferentMemberAcls (quorum : String ): Unit = {
2577
+ createTopicWithBrokerPrincipal(topic, numPartitions = 2 )
2578
+ val allowAllOpsAcl = new AccessControlEntry (clientPrincipalString, WILDCARD_HOST , ALL , ALLOW )
2579
+ addAndVerifyAcls(Set (allowAllOpsAcl), groupResource)
2580
+
2581
+ // Member on inter-broker listener has all access and is assigned the matching topic
2582
+ var member1Response = sendAndReceiveFirstRegexHeartbeat(" memberWithAllAccess" , interBrokerListenerName)
2583
+ member1Response = sendAndReceiveRegexHeartbeat(member1Response, interBrokerListenerName, Some (2 ))
2584
+
2585
+ // Member on client listener has no topic describe access, but is assigned a partition of the
2586
+ // unauthorized topic. This is leaking unauthorized topic metadata to member2. Simply filtering out
2587
+ // the topic from the assignment in the response is not sufficient since different assignment states
2588
+ // in the broker and client can lead to other issues. This needs to be fixed properly by using
2589
+ // member permissions while computing assignments.
2590
+ var member2Response = sendAndReceiveFirstRegexHeartbeat(" memberWithLimitedAccess" , listenerName)
2591
+ member1Response = sendAndReceiveRegexHeartbeat(member1Response, interBrokerListenerName, Some (1 ))
2592
+ member1Response = sendAndReceiveRegexHeartbeat(member1Response, interBrokerListenerName, None , fullRequest = true )
2593
+ member2Response = sendAndReceiveRegexHeartbeat(member2Response, listenerName, Some (1 ))
2594
+
2595
+ // Create another topic and send heartbeats on member1 to trigger regex refresh
2596
+ createTopicWithBrokerPrincipal(" topic2" , numPartitions = 2 )
2597
+ TestUtils .retry(15000 ) {
2598
+ member1Response = sendAndReceiveRegexHeartbeat(member1Response, interBrokerListenerName, Some (2 ))
2599
+ }
2600
+ // This is leaking unauthorized topic metadata to member2.
2601
+ member2Response = sendAndReceiveRegexHeartbeat(member2Response, listenerName, Some (2 ))
2602
+
2603
+ // Create another topic and send heartbeats on member2 to trigger regex refresh
2604
+ createTopicWithBrokerPrincipal(" topic3" , numPartitions = 2 )
2605
+ TestUtils .retry(15000 ) {
2606
+ member2Response = sendAndReceiveRegexHeartbeat(member2Response, listenerName, Some (0 ), fullRequest = true )
2607
+ }
2608
+ // This removes all topics from member1 since member2's permissions were used to refresh regex.
2609
+ sendAndReceiveRegexHeartbeat(member1Response, interBrokerListenerName, Some (0 ), fullRequest = true )
2610
+ }
2611
+
2612
+ private def sendAndReceiveFirstRegexHeartbeat (memberId : String ,
2613
+ listenerName : ListenerName ): ConsumerGroupHeartbeatResponseData = {
2614
+ val request = new ConsumerGroupHeartbeatRequest .Builder (
2615
+ new ConsumerGroupHeartbeatRequestData ()
2616
+ .setGroupId(group)
2617
+ .setMemberId(memberId)
2618
+ .setMemberEpoch(0 )
2619
+ .setRebalanceTimeoutMs(5 * 60 * 1000 )
2620
+ .setTopicPartitions(Collections .emptyList())
2621
+ .setSubscribedTopicRegex(" ^top.*" )).build()
2622
+ val resource = Set [ResourceType ](GROUP , TOPIC )
2623
+ val response = sendRequestAndVerifyResponseError(request, resource, isAuthorized = true , listenerName = listenerName)
2624
+ .data.asInstanceOf [ConsumerGroupHeartbeatResponseData ]
2625
+ assertEquals(Errors .NONE .code, response.errorCode, s " Unexpected response $response" )
2626
+ assertEquals(0 , response.assignment.topicPartitions.size, s " Unexpected assignment $response" )
2627
+ response
2628
+ }
2629
+
2630
+ private def sendAndReceiveRegexHeartbeat (lastResponse : ConsumerGroupHeartbeatResponseData ,
2631
+ listenerName : ListenerName ,
2632
+ expectedAssignmentSize : Option [Int ],
2633
+ fullRequest : Boolean = false ): ConsumerGroupHeartbeatResponseData = {
2634
+ var data = new ConsumerGroupHeartbeatRequestData ()
2635
+ .setGroupId(group)
2636
+ .setMemberId(lastResponse.memberId)
2637
+ .setMemberEpoch(lastResponse.memberEpoch)
2638
+ if (fullRequest) {
2639
+ val partitions = Option (lastResponse.assignment).map(_.topicPartitions.asScala.map(p =>
2640
+ new ConsumerGroupHeartbeatRequestData .TopicPartitions ()
2641
+ .setTopicId(p.topicId)
2642
+ .setPartitions(p.partitions)
2643
+ )).getOrElse(List ())
2644
+ data = data
2645
+ .setTopicPartitions(partitions.asJava)
2646
+ .setSubscribedTopicRegex(" ^top.*" )
2647
+ }
2648
+ val request = new ConsumerGroupHeartbeatRequest .Builder (data).build()
2649
+ val resource = Set [ResourceType ](GROUP , TOPIC )
2650
+ val response = sendRequestAndVerifyResponseError(request, resource, isAuthorized = true , listenerName = listenerName)
2651
+ .data.asInstanceOf [ConsumerGroupHeartbeatResponseData ]
2652
+ assertEquals(Errors .NONE .code, response.errorCode, s " Unexpected response $response" )
2653
+ expectedAssignmentSize match {
2654
+ case Some (size) =>
2655
+ assertNotNull(response.assignment, s " Unexpected assignment $response" )
2656
+ assertEquals(size, response.assignment.topicPartitions.asScala.map(_.partitions.size).sum, s " Unexpected assignment $response" )
2657
+ case None =>
2658
+ assertNull(response.assignment, s " Unexpected assignment $response" )
2659
+ }
2660
+ response
2661
+ }
2662
+
2550
2663
private def createConsumerGroupToDescribe (): Unit = {
2551
2664
createTopicWithBrokerPrincipal(topic)
2552
2665
addAndVerifyAcls(Set (new AccessControlEntry (clientPrincipalString, WILDCARD_HOST , READ , ALLOW )), groupResource)
@@ -2651,9 +2764,10 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
2651
2764
resources : Set [ResourceType ],
2652
2765
isAuthorized : Boolean ,
2653
2766
topicExists : Boolean = true ,
2654
- topicNames : Map [Uuid , String ] = getTopicNames()): AbstractResponse = {
2767
+ topicNames : Map [Uuid , String ] = getTopicNames(),
2768
+ listenerName : ListenerName = listenerName): AbstractResponse = {
2655
2769
val apiKey = request.apiKey
2656
- val response = connectAndReceive[AbstractResponse ](request)
2770
+ val response = connectAndReceive[AbstractResponse ](request, listenerName = listenerName )
2657
2771
val error = requestKeyToError(topicNames, request.version())(apiKey).asInstanceOf [AbstractResponse => Errors ](response)
2658
2772
2659
2773
val authorizationErrors = resources.flatMap { resourceType =>
0 commit comments