Skip to content

KAFKA-19147: Start authorizer before group coordinator to ensure coordinator authorizes regex topics #19488

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,9 @@ class BrokerServer(
/* start token manager */
tokenManager = new DelegationTokenManager(new DelegationTokenManagerConfigs(config), tokenCache)

// Create and initialize an authorizer if one is configured.
authorizerPlugin = config.createNewAuthorizer(metrics, ProcessRole.BrokerRole.toString)

/* initializing the groupConfigManager */
groupConfigManager = new GroupConfigManager(config.groupCoordinatorConfig.extractGroupConfigMap(config.shareGroupConfig))

Expand Down Expand Up @@ -412,9 +415,6 @@ class BrokerServer(
logManager.readBrokerEpochFromCleanShutdownFiles()
)

// Create and initialize an authorizer if one is configured.
authorizerPlugin = config.createNewAuthorizer(metrics, ProcessRole.BrokerRole.toString)

// The FetchSessionCache is divided into config.numIoThreads shards, each responsible
// for Math.max(1, shardNum * sessionIdRange) <= sessionId < (shardNum + 1) * sessionIdRange
val sessionIdRange = Int.MaxValue / NumFetchSessionCacheShards
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic}
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition, OffsetForLeaderTopic, OffsetForLeaderTopicCollection}
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeClusterRequestData, DescribeConfigsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestData, DescribeProducersRequestData, DescribeTransactionsRequestData, FetchResponseData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, ListTransactionsRequestData, MetadataRequestData, OffsetCommitRequestData, ProduceRequestData, SyncGroupRequestData, WriteTxnMarkersRequestData}
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeClusterRequestData, DescribeConfigsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestData, DescribeProducersRequestData, DescribeTransactionsRequestData, FetchResponseData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, ListTransactionsRequestData, MetadataRequestData, OffsetCommitRequestData, ProduceRequestData, SyncGroupRequestData, WriteTxnMarkersRequestData}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, SimpleRecord}
import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
Expand Down Expand Up @@ -2547,6 +2548,118 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}

@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testConsumerGroupHeartbeaWithRegex(quorum: String): Unit = {
createTopicWithBrokerPrincipal(topic)
val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)
addAndVerifyAcls(Set(allowAllOpsAcl), groupResource)
addAndVerifyAcls(Set(allowAllOpsAcl), topicResource)

val response = sendAndReceiveFirstRegexHeartbeat(Uuid.randomUuid.toString, listenerName)
sendAndReceiveRegexHeartbeat(response, listenerName, Some(1))
}

@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testConsumerGroupHeartbeaWithRegexWithoutTopicDescribeAcl(quorum: String): Unit = {
createTopicWithBrokerPrincipal(topic)
val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)
addAndVerifyAcls(Set(allowAllOpsAcl), groupResource)

val response = sendAndReceiveFirstRegexHeartbeat(Uuid.randomUuid.toString, listenerName)
sendAndReceiveRegexHeartbeat(response, listenerName, None)
}

@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testConsumerGroupHeartbeaWithRegexWithDifferentMemberAcls(quorum: String): Unit = {
createTopicWithBrokerPrincipal(topic, numPartitions = 2)
val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)
addAndVerifyAcls(Set(allowAllOpsAcl), groupResource)

// Member on inter-broker listener has all access and is assigned the matching topic
var member1Response = sendAndReceiveFirstRegexHeartbeat("memberWithAllAccess", interBrokerListenerName)
member1Response = sendAndReceiveRegexHeartbeat(member1Response, interBrokerListenerName, Some(2))

// Member on client listener has no topic describe access, but is assigned a partition of the
// unauthorized topic. This is leaking unauthorized topic metadata to member2. Simply filtering out
// the topic from the assignment in the response is not sufficient since different assignment states
// in the broker and client can lead to other issues. This needs to be fixed properly by using
// member permissions while computing assignments.
var member2Response = sendAndReceiveFirstRegexHeartbeat("memberWithLimitedAccess", listenerName)
member1Response = sendAndReceiveRegexHeartbeat(member1Response, interBrokerListenerName, Some(1))
member1Response = sendAndReceiveRegexHeartbeat(member1Response, interBrokerListenerName, None, fullRequest = true)
member2Response = sendAndReceiveRegexHeartbeat(member2Response, listenerName, Some(1))

// Create another topic and send heartbeats on member1 to trigger regex refresh
createTopicWithBrokerPrincipal("topic2", numPartitions = 2)
TestUtils.retry(15000) {
member1Response = sendAndReceiveRegexHeartbeat(member1Response, interBrokerListenerName, Some(2))
}
// This is leaking unauthorized topic metadata to member2.
member2Response = sendAndReceiveRegexHeartbeat(member2Response, listenerName, Some(2))

// Create another topic and send heartbeats on member2 to trigger regex refresh
createTopicWithBrokerPrincipal("topic3", numPartitions = 2)
TestUtils.retry(15000) {
member2Response = sendAndReceiveRegexHeartbeat(member2Response, listenerName, Some(0), fullRequest = true)
}
// This removes all topics from member1 since member2's permissions were used to refresh regex.
sendAndReceiveRegexHeartbeat(member1Response, interBrokerListenerName, Some(0), fullRequest = true)
}

private def sendAndReceiveFirstRegexHeartbeat(memberId: String,
listenerName: ListenerName): ConsumerGroupHeartbeatResponseData = {
val request = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(group)
.setMemberId(memberId)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5 * 60 * 1000)
.setTopicPartitions(Collections.emptyList())
.setSubscribedTopicRegex("^top.*")).build()
val resource = Set[ResourceType](GROUP, TOPIC)
val response = sendRequestAndVerifyResponseError(request, resource, isAuthorized = true, listenerName = listenerName)
.data.asInstanceOf[ConsumerGroupHeartbeatResponseData]
assertEquals(Errors.NONE.code, response.errorCode, s"Unexpected response $response")
assertEquals(0, response.assignment.topicPartitions.size, s"Unexpected assignment $response")
response
}

private def sendAndReceiveRegexHeartbeat(lastResponse: ConsumerGroupHeartbeatResponseData,
listenerName: ListenerName,
expectedAssignmentSize: Option[Int],
fullRequest: Boolean = false): ConsumerGroupHeartbeatResponseData = {
var data = new ConsumerGroupHeartbeatRequestData()
.setGroupId(group)
.setMemberId(lastResponse.memberId)
.setMemberEpoch(lastResponse.memberEpoch)
if (fullRequest) {
val partitions = Option(lastResponse.assignment).map(_.topicPartitions.asScala.map(p =>
new ConsumerGroupHeartbeatRequestData.TopicPartitions()
.setTopicId(p.topicId)
.setPartitions(p.partitions)
)).getOrElse(List())
data = data
.setTopicPartitions(partitions.asJava)
.setSubscribedTopicRegex("^top.*")
}
val request = new ConsumerGroupHeartbeatRequest.Builder(data).build()
val resource = Set[ResourceType](GROUP, TOPIC)
val response = sendRequestAndVerifyResponseError(request, resource, isAuthorized = true, listenerName = listenerName)
.data.asInstanceOf[ConsumerGroupHeartbeatResponseData]
assertEquals(Errors.NONE.code, response.errorCode, s"Unexpected response $response")
expectedAssignmentSize match {
case Some(size) =>
assertNotNull(response.assignment, s"Unexpected assignment $response")
assertEquals(size, response.assignment.topicPartitions.asScala.map(_.partitions.size).sum, s"Unexpected assignment $response")
case None =>
assertNull(response.assignment, s"Unexpected assignment $response")
}
response
}

private def createConsumerGroupToDescribe(): Unit = {
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), groupResource)
Expand Down Expand Up @@ -2651,9 +2764,10 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
resources: Set[ResourceType],
isAuthorized: Boolean,
topicExists: Boolean = true,
topicNames: Map[Uuid, String] = getTopicNames()): AbstractResponse = {
topicNames: Map[Uuid, String] = getTopicNames(),
listenerName: ListenerName = listenerName): AbstractResponse = {
val apiKey = request.apiKey
val response = connectAndReceive[AbstractResponse](request)
val response = connectAndReceive[AbstractResponse](request, listenerName = listenerName)
val error = requestKeyToError(topicNames, request.version())(apiKey).asInstanceOf[AbstractResponse => Errors](response)

val authorizationErrors = resources.flatMap { resourceType =>
Expand Down