Skip to content

Commit 598eb13

Browse files
authored
KAFKA-15370: ACL changes to support 2PC (KIP-939) (#19364)
This patch adds ACL support for 2PC as a part of KIP-939 A new value will be added to the enum AclOperation: TWO_PHASE_COMMIT ((byte) 15 . When InitProducerId comes with enable2Pc=true, it would have to have both WRITE and TWO_PHASE_COMMIT operation enabled on the transactional id resource. The kafka-acls.sh tool is going to support a new --operation TwoPhaseCommit. Reviewers: Artem Livshits <[email protected]>, PoAn Yang <[email protected]>, Justine Olshan <[email protected]>
1 parent f737ef3 commit 598eb13

File tree

7 files changed

+149
-7
lines changed

7 files changed

+149
-7
lines changed

Diff for: clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,12 @@ public enum AclOperation {
112112
/**
113113
* DESCRIBE_TOKENS operation.
114114
*/
115-
DESCRIBE_TOKENS((byte) 14);
115+
DESCRIBE_TOKENS((byte) 14),
116+
117+
/**
118+
* TWO_PHASE_COMMIT operation.
119+
*/
120+
TWO_PHASE_COMMIT((byte) 15);
116121

117122
// Note: we cannot have more than 30 ACL operations without modifying the format used
118123
// to describe ACL operations in MetadataResponse.

Diff for: clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ private static class AclOperationTestInfo {
5050
new AclOperationTestInfo(AclOperation.ALTER_CONFIGS, 11, "alter_configs", false),
5151
new AclOperationTestInfo(AclOperation.IDEMPOTENT_WRITE, 12, "idempotent_write", false),
5252
new AclOperationTestInfo(AclOperation.CREATE_TOKENS, 13, "create_tokens", false),
53-
new AclOperationTestInfo(AclOperation.DESCRIBE_TOKENS, 14, "describe_tokens", false)
53+
new AclOperationTestInfo(AclOperation.DESCRIBE_TOKENS, 14, "describe_tokens", false),
54+
new AclOperationTestInfo(AclOperation.TWO_PHASE_COMMIT, 15, "two_phase_commit", false)
5455
};
5556

5657
@Test

Diff for: core/src/main/scala/kafka/server/KafkaApis.scala

+4
Original file line numberDiff line numberDiff line change
@@ -1527,6 +1527,10 @@ class KafkaApis(val requestChannel: RequestChannel,
15271527
requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
15281528
return
15291529
}
1530+
if (initProducerIdRequest.enable2Pc() && !authHelper.authorize(request.context, TWO_PHASE_COMMIT, TRANSACTIONAL_ID, transactionalId)) {
1531+
requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
1532+
return
1533+
}
15301534
} else if (!authHelper.authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME, true, false)
15311535
&& !authHelper.authorizeByResourceType(request.context, AclOperation.WRITE, ResourceType.TOPIC)) {
15321536
requestHelper.sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)

Diff for: core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -395,9 +395,9 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
395395
@ValueSource(strings = Array(KRAFT))
396396
def testAclInheritance(quorum: String): Unit = {
397397
testImplicationsOfAllow(AclOperation.ALL, Set(READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE,
398-
CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, CREATE_TOKENS, DESCRIBE_TOKENS))
398+
CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, CREATE_TOKENS, DESCRIBE_TOKENS, TWO_PHASE_COMMIT))
399399
testImplicationsOfDeny(AclOperation.ALL, Set(READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE,
400-
CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, CREATE_TOKENS, DESCRIBE_TOKENS))
400+
CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, CREATE_TOKENS, DESCRIBE_TOKENS, TWO_PHASE_COMMIT))
401401
testImplicationsOfAllow(READ, Set(DESCRIBE))
402402
testImplicationsOfAllow(WRITE, Set(DESCRIBE))
403403
testImplicationsOfAllow(DELETE, Set(DESCRIBE))

Diff for: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

+130
Original file line numberDiff line numberDiff line change
@@ -1802,6 +1802,136 @@ class KafkaApisTest extends Logging {
18021802
}
18031803
}
18041804

1805+
@Test
1806+
def testInitProducerIdWithEnable2PcFailsWithoutTwoPhaseCommitAcl(): Unit = {
1807+
val transactionalId = "txnId"
1808+
addTopicToMetadataCache("topic", numPartitions = 1)
1809+
1810+
val initProducerIdRequest = new InitProducerIdRequest.Builder(
1811+
new InitProducerIdRequestData()
1812+
.setTransactionalId(transactionalId)
1813+
.setTransactionTimeoutMs(TimeUnit.MINUTES.toMillis(15).toInt)
1814+
.setEnable2Pc(true)
1815+
.setProducerId(RecordBatch.NO_PRODUCER_ID)
1816+
.setProducerEpoch(RecordBatch.NO_PRODUCER_EPOCH)
1817+
).build(6.toShort) // Use version 6 which supports enable2Pc
1818+
1819+
val request = buildRequest(initProducerIdRequest)
1820+
val requestLocal = RequestLocal.withThreadConfinedCaching
1821+
val authorizer: Authorizer = mock(classOf[Authorizer])
1822+
kafkaApis = createKafkaApis(authorizer = Some(authorizer))
1823+
1824+
// Allow WRITE but deny TWO_PHASE_COMMIT
1825+
when(authorizer.authorize(
1826+
any(),
1827+
ArgumentMatchers.eq(Collections.singletonList(new Action(
1828+
AclOperation.WRITE,
1829+
new ResourcePattern(ResourceType.TRANSACTIONAL_ID, transactionalId, PatternType.LITERAL),
1830+
1,
1831+
true,
1832+
true)))
1833+
)).thenReturn(Collections.singletonList(AuthorizationResult.ALLOWED))
1834+
1835+
when(authorizer.authorize(
1836+
any(),
1837+
ArgumentMatchers.eq(Collections.singletonList(new Action(
1838+
AclOperation.TWO_PHASE_COMMIT,
1839+
new ResourcePattern(ResourceType.TRANSACTIONAL_ID, transactionalId, PatternType.LITERAL),
1840+
1,
1841+
true,
1842+
true)))
1843+
)).thenReturn(Collections.singletonList(AuthorizationResult.DENIED))
1844+
1845+
val capturedResponse = ArgumentCaptor.forClass(classOf[InitProducerIdResponse])
1846+
1847+
kafkaApis.handleInitProducerIdRequest(request, requestLocal)
1848+
1849+
verify(requestChannel).sendResponse(
1850+
ArgumentMatchers.eq(request),
1851+
capturedResponse.capture(),
1852+
ArgumentMatchers.eq(None)
1853+
)
1854+
1855+
assertEquals(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.code, capturedResponse.getValue.data.errorCode)
1856+
}
1857+
1858+
@Test
1859+
def testInitProducerIdWithEnable2PcSucceedsWithTwoPhaseCommitAcl(): Unit = {
1860+
val transactionalId = "txnId"
1861+
addTopicToMetadataCache("topic", numPartitions = 1)
1862+
1863+
val initProducerIdRequest = new InitProducerIdRequest.Builder(
1864+
new InitProducerIdRequestData()
1865+
.setTransactionalId(transactionalId)
1866+
.setTransactionTimeoutMs(TimeUnit.MINUTES.toMillis(15).toInt)
1867+
.setEnable2Pc(true)
1868+
.setProducerId(RecordBatch.NO_PRODUCER_ID)
1869+
.setProducerEpoch(RecordBatch.NO_PRODUCER_EPOCH)
1870+
).build(6.toShort) // Use version 6 which supports enable2Pc
1871+
1872+
val request = buildRequest(initProducerIdRequest)
1873+
val requestLocal = RequestLocal.withThreadConfinedCaching
1874+
val authorizer: Authorizer = mock(classOf[Authorizer])
1875+
kafkaApis = createKafkaApis(authorizer = Some(authorizer))
1876+
1877+
// Both permissions are allowed
1878+
when(authorizer.authorize(
1879+
any(),
1880+
ArgumentMatchers.eq(Collections.singletonList(new Action(
1881+
AclOperation.WRITE,
1882+
new ResourcePattern(ResourceType.TRANSACTIONAL_ID, transactionalId, PatternType.LITERAL),
1883+
1,
1884+
true,
1885+
true)))
1886+
)).thenReturn(Collections.singletonList(AuthorizationResult.ALLOWED))
1887+
1888+
when(authorizer.authorize(
1889+
any(),
1890+
ArgumentMatchers.eq(Collections.singletonList(new Action(
1891+
AclOperation.TWO_PHASE_COMMIT,
1892+
new ResourcePattern(ResourceType.TRANSACTIONAL_ID, transactionalId, PatternType.LITERAL),
1893+
1,
1894+
true,
1895+
true)))
1896+
)).thenReturn(Collections.singletonList(AuthorizationResult.ALLOWED))
1897+
1898+
val responseCallback = ArgumentCaptor.forClass(classOf[InitProducerIdResult => Unit])
1899+
1900+
when(txnCoordinator.handleInitProducerId(
1901+
ArgumentMatchers.eq(transactionalId),
1902+
anyInt(),
1903+
ArgumentMatchers.eq(true), // enable2Pc = true
1904+
anyBoolean(),
1905+
any(),
1906+
responseCallback.capture(),
1907+
ArgumentMatchers.eq(requestLocal)
1908+
)).thenAnswer(_ => responseCallback.getValue.apply(InitProducerIdResult(15L, 0.toShort, Errors.NONE)))
1909+
1910+
kafkaApis.handleInitProducerIdRequest(request, requestLocal)
1911+
1912+
// Verify coordinator was called with enable2Pc=true
1913+
verify(txnCoordinator).handleInitProducerId(
1914+
ArgumentMatchers.eq(transactionalId),
1915+
anyInt(),
1916+
ArgumentMatchers.eq(true), // enable2Pc = true
1917+
anyBoolean(),
1918+
any(),
1919+
any(),
1920+
ArgumentMatchers.eq(requestLocal)
1921+
)
1922+
1923+
val capturedResponse = ArgumentCaptor.forClass(classOf[InitProducerIdResponse])
1924+
verify(requestChannel).sendResponse(
1925+
ArgumentMatchers.eq(request),
1926+
capturedResponse.capture(),
1927+
ArgumentMatchers.eq(None)
1928+
)
1929+
1930+
assertEquals(Errors.NONE.code, capturedResponse.getValue.data.errorCode)
1931+
assertEquals(15L, capturedResponse.getValue.data.producerId)
1932+
assertEquals(0, capturedResponse.getValue.data.producerEpoch)
1933+
}
1934+
18051935
@Test
18061936
def testBatchedAddPartitionsToTxnRequest(): Unit = {
18071937
val topic = "topic"

Diff for: server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_TOKENS;
3939
import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE;
4040
import static org.apache.kafka.common.acl.AclOperation.READ;
41+
import static org.apache.kafka.common.acl.AclOperation.TWO_PHASE_COMMIT;
4142
import static org.apache.kafka.common.acl.AclOperation.WRITE;
4243

4344
public class AclEntry {
@@ -59,7 +60,7 @@ public static Set<AclOperation> supportedOperations(ResourceType resourceType) {
5960
case CLUSTER:
6061
return new HashSet<>(Arrays.asList(CREATE, CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, ALTER, DESCRIBE));
6162
case TRANSACTIONAL_ID:
62-
return new HashSet<>(Arrays.asList(DESCRIBE, WRITE));
63+
return new HashSet<>(Arrays.asList(DESCRIBE, WRITE, TWO_PHASE_COMMIT));
6364
case DELEGATION_TOKEN:
6465
return Set.of(DESCRIBE);
6566
case USER:

Diff for: tools/src/test/java/org/apache/kafka/tools/AclCommandTest.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_TOKENS;
6666
import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE;
6767
import static org.apache.kafka.common.acl.AclOperation.READ;
68+
import static org.apache.kafka.common.acl.AclOperation.TWO_PHASE_COMMIT;
6869
import static org.apache.kafka.common.acl.AclOperation.WRITE;
6970
import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
7071
import static org.apache.kafka.common.acl.AclPermissionType.DENY;
@@ -160,8 +161,8 @@ public class AclCommandTest {
160161
Set.of(READ, DESCRIBE, DELETE),
161162
List.of(OPERATION, "Read", OPERATION, "Describe", OPERATION, "Delete")),
162163
TRANSACTIONAL_ID_RESOURCES, Map.entry(
163-
Set.of(DESCRIBE, WRITE),
164-
List.of(OPERATION, "Describe", OPERATION, "Write")),
164+
Set.of(DESCRIBE, WRITE, TWO_PHASE_COMMIT),
165+
List.of(OPERATION, "Describe", OPERATION, "Write", OPERATION, "TwoPhaseCommit")),
165166
TOKEN_RESOURCES, Map.entry(
166167
Set.of(DESCRIBE),
167168
List.of(OPERATION, "Describe")),

0 commit comments

Comments
 (0)