Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.

Commit 936f849

Browse files
eolivelligaoran10
authored andcommitted
[transactions] Implement KIP-664 listTransactions (#76)
(cherry picked from commit 5ef4a85)
1 parent 844adb4 commit 936f849

File tree

9 files changed

+278
-29
lines changed

9 files changed

+278
-29
lines changed

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java

+6
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
325325
case LIST_GROUPS:
326326
handleListGroupsRequest(kafkaHeaderAndRequest, responseFuture);
327327
break;
328+
case LIST_TRANSACTIONS:
329+
handleListTransactionsRequest(kafkaHeaderAndRequest, responseFuture);
330+
break;
328331
case DELETE_GROUPS:
329332
handleDeleteGroupsRequest(kafkaHeaderAndRequest, responseFuture);
330333
break;
@@ -583,6 +586,9 @@ protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest,
583586
protected abstract void
584587
handleListGroupsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture<AbstractResponse> response);
585588

589+
protected abstract void
590+
handleListTransactionsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture<AbstractResponse> response);
591+
586592
protected abstract void
587593
handleDeleteGroupsRequest(KafkaHeaderAndRequest deleteGroups, CompletableFuture<AbstractResponse> response);
588594

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java

+24-2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import io.streamnative.pulsar.handlers.kop.exceptions.KoPTopicException;
3333
import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata;
3434
import io.streamnative.pulsar.handlers.kop.offset.OffsetMetadata;
35+
import io.streamnative.pulsar.handlers.kop.scala.Either;
3536
import io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator;
3637
import io.streamnative.pulsar.handlers.kop.security.Session;
3738
import io.streamnative.pulsar.handlers.kop.security.auth.Authorizer;
@@ -127,6 +128,7 @@
127128
import org.apache.kafka.common.message.LeaveGroupRequestData;
128129
import org.apache.kafka.common.message.ListOffsetsRequestData;
129130
import org.apache.kafka.common.message.ListOffsetsResponseData;
131+
import org.apache.kafka.common.message.ListTransactionsResponseData;
130132
import org.apache.kafka.common.message.OffsetCommitRequestData;
131133
import org.apache.kafka.common.message.ProduceRequestData;
132134
import org.apache.kafka.common.message.SaslAuthenticateResponseData;
@@ -175,6 +177,8 @@
175177
import org.apache.kafka.common.requests.ListOffsetRequestV0;
176178
import org.apache.kafka.common.requests.ListOffsetsRequest;
177179
import org.apache.kafka.common.requests.ListOffsetsResponse;
180+
import org.apache.kafka.common.requests.ListTransactionsRequest;
181+
import org.apache.kafka.common.requests.ListTransactionsResponse;
178182
import org.apache.kafka.common.requests.MetadataRequest;
179183
import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata;
180184
import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata;
@@ -2017,8 +2021,26 @@ protected void handleDescribeGroupRequest(KafkaHeaderAndRequest describeGroup,
20172021
protected void handleListGroupsRequest(KafkaHeaderAndRequest listGroups,
20182022
CompletableFuture<AbstractResponse> resultFuture) {
20192023
checkArgument(listGroups.getRequest() instanceof ListGroupsRequest);
2020-
KeyValue<Errors, List<GroupOverview>> listResult = getGroupCoordinator().handleListGroups();
2021-
resultFuture.complete(KafkaResponseUtils.newListGroups(listResult.getKey(), listResult.getValue()));
2024+
Either<Errors, List<GroupOverview>> listResult = getGroupCoordinator().handleListGroups();
2025+
resultFuture.complete(KafkaResponseUtils.newListGroups(listResult));
2026+
}
2027+
2028+
@Override
2029+
protected void handleListTransactionsRequest(KafkaHeaderAndRequest listTransactions,
2030+
CompletableFuture<AbstractResponse> resultFuture) {
2031+
checkArgument(listTransactions.getRequest() instanceof ListTransactionsRequest);
2032+
ListTransactionsRequest request = (ListTransactionsRequest) listTransactions.getRequest();
2033+
List<String> stateFilters = request.data().stateFilters();
2034+
if (stateFilters == null) {
2035+
stateFilters = Collections.emptyList();
2036+
}
2037+
List<Long> producerIdFilters = request.data().producerIdFilters();
2038+
if (producerIdFilters == null) {
2039+
producerIdFilters = Collections.emptyList();
2040+
}
2041+
ListTransactionsResponseData listResult = getTransactionCoordinator()
2042+
.handleListTransactions(stateFilters, producerIdFilters);
2043+
resultFuture.complete(new ListTransactionsResponse(listResult));
20222044
}
20232045

20242046
@Override

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java

+5-10
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupOverview;
3030
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupSummary;
3131
import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata;
32+
import io.streamnative.pulsar.handlers.kop.scala.Either;
3233
import io.streamnative.pulsar.handlers.kop.utils.CoreUtils;
3334
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey.GroupKey;
3435
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey.MemberKey;
@@ -832,22 +833,16 @@ public KeyValue<Errors, Map<TopicPartition, PartitionData>> handleFetchOffsets(
832833
);
833834
}
834835

835-
public KeyValue<Errors, List<GroupOverview>> handleListGroups() {
836+
public Either<Errors, List<GroupOverview>> handleListGroups() {
836837
if (!isActive.get()) {
837-
return new KeyValue<>(Errors.COORDINATOR_NOT_AVAILABLE, new ArrayList<>());
838+
return Either.left(Errors.COORDINATOR_NOT_AVAILABLE);
838839
} else {
839-
Errors errors;
840840
if (groupManager.isLoading()) {
841-
errors = Errors.COORDINATOR_LOAD_IN_PROGRESS;
842-
} else {
843-
errors = Errors.NONE;
841+
return Either.left(Errors.COORDINATOR_LOAD_IN_PROGRESS);
844842
}
845843
List<GroupOverview> overviews = new ArrayList<>();
846844
groupManager.currentGroups().forEach(group -> overviews.add(group.overview()));
847-
return new KeyValue<>(
848-
errors,
849-
overviews
850-
);
845+
return Either.right(overviews);
851846
}
852847
}
853848

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java

+16
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import io.streamnative.pulsar.handlers.kop.storage.PulsarPartitionedTopicProducerStateManagerSnapshotBuffer;
3333
import io.streamnative.pulsar.handlers.kop.utils.MetadataUtils;
3434
import io.streamnative.pulsar.handlers.kop.utils.ProducerIdAndEpoch;
35+
import java.util.List;
3536
import java.util.Optional;
3637
import java.util.Set;
3738
import java.util.concurrent.CompletableFuture;
@@ -52,6 +53,7 @@
5253
import org.apache.commons.lang3.StringUtils;
5354
import org.apache.kafka.common.TopicPartition;
5455
import org.apache.kafka.common.internals.Topic;
56+
import org.apache.kafka.common.message.ListTransactionsResponseData;
5557
import org.apache.kafka.common.protocol.Errors;
5658
import org.apache.kafka.common.record.RecordBatch;
5759
import org.apache.kafka.common.requests.TransactionResult;
@@ -80,6 +82,8 @@ public class TransactionCoordinator {
8082

8183
private final Time time;
8284

85+
private final AtomicBoolean isActive = new AtomicBoolean(false);
86+
8387
private static final BiConsumer<TransactionStateManager.TransactionalIdAndProducerIdEpoch, Errors>
8488
onEndTransactionComplete =
8589
(txnIdAndPidEpoch, errors) -> {
@@ -218,6 +222,17 @@ public static String getTopicPartitionName(String topicPartitionName, int partit
218222
return topicPartitionName + PARTITIONED_TOPIC_SUFFIX + partitionId;
219223
}
220224

225+
public ListTransactionsResponseData handleListTransactions(List<String> filteredStates,
226+
List<Long> filteredProducerIds) {
227+
// https://github.com/apache/kafka/blob/915991445fde106d02e61a70425ae2601c813db0/core/
228+
// src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L259
229+
if (!isActive.get()) {
230+
log.warn("The transaction coordinator is not active, so it will reject list transaction request");
231+
return new ListTransactionsResponseData().setErrorCode(Errors.NOT_COORDINATOR.code());
232+
}
233+
return this.txnManager.listTransactionStates(filteredProducerIds, filteredStates);
234+
}
235+
221236
@Data
222237
@EqualsAndHashCode
223238
@AllArgsConstructor
@@ -956,6 +971,7 @@ public CompletableFuture<Void> startup(boolean enableTransactionalIdExpiration)
956971

957972
return this.producerIdManager.initialize().thenCompose(ignored -> {
958973
log.info("{} Startup transaction coordinator complete.", namespacePrefixForMetadata);
974+
isActive.set(true);
959975
return CompletableFuture.completedFuture(null);
960976
});
961977
}

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionState.java

+22
Original file line numberDiff line numberDiff line change
@@ -117,4 +117,26 @@ public boolean isExpirationAllowed() {
117117
return false;
118118
}
119119
}
120+
121+
public org.apache.kafka.clients.admin.TransactionState toAdminState() {
122+
switch (this) {
123+
case EMPTY:
124+
return org.apache.kafka.clients.admin.TransactionState.EMPTY;
125+
case ONGOING:
126+
return org.apache.kafka.clients.admin.TransactionState.ONGOING;
127+
case PREPARE_COMMIT:
128+
return org.apache.kafka.clients.admin.TransactionState.PREPARE_COMMIT;
129+
case PREPARE_ABORT:
130+
return org.apache.kafka.clients.admin.TransactionState.PREPARE_ABORT;
131+
case COMPLETE_COMMIT:
132+
return org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT;
133+
case COMPLETE_ABORT:
134+
return org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT;
135+
case PREPARE_EPOCH_FENCE:
136+
return org.apache.kafka.clients.admin.TransactionState.PREPARE_EPOCH_FENCE;
137+
case DEAD:
138+
default:
139+
return org.apache.kafka.clients.admin.TransactionState.UNKNOWN;
140+
}
141+
}
120142
}

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java

+67
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.nio.ByteBuffer;
2626
import java.util.ArrayList;
2727
import java.util.HashMap;
28+
import java.util.HashSet;
2829
import java.util.List;
2930
import java.util.Map;
3031
import java.util.Optional;
@@ -40,6 +41,7 @@
4041
import lombok.extern.slf4j.Slf4j;
4142
import org.apache.bookkeeper.common.concurrent.FutureUtils;
4243
import org.apache.kafka.common.TopicPartition;
44+
import org.apache.kafka.common.message.ListTransactionsResponseData;
4345
import org.apache.kafka.common.protocol.Errors;
4446
import org.apache.kafka.common.protocol.types.SchemaException;
4547
import org.apache.kafka.common.requests.ProduceResponse;
@@ -245,6 +247,71 @@ private boolean shouldExpire(TransactionMetadata txnMetadata, Long currentTimeMs
245247
<= (currentTimeMs - transactionConfig.getTransactionalIdExpirationMs());
246248
}
247249

250+
private static boolean shouldInclude(TransactionMetadata txnMetadata,
251+
List<Long> filterProducerIds, Set<String> filterStateNames) {
252+
if (txnMetadata.getState() == TransactionState.DEAD) {
253+
// We filter the `Dead` state since it is a transient state which
254+
// indicates that the transactionalId and its metadata are in the
255+
// process of expiration and removal.
256+
return false;
257+
} else if (!filterProducerIds.isEmpty() && !filterProducerIds.contains(txnMetadata.getProducerId())) {
258+
return false;
259+
} else if (!filterStateNames.isEmpty() && !filterStateNames.contains(
260+
txnMetadata.getState().toAdminState().toString())) {
261+
return false;
262+
} else {
263+
return true;
264+
}
265+
}
266+
267+
public ListTransactionsResponseData listTransactionStates(List<Long> filteredProducerIds,
268+
List<String> filteredStates) {
269+
return CoreUtils.inReadLock(stateLock, () -> {
270+
ListTransactionsResponseData response = new ListTransactionsResponseData();
271+
if (!loadingPartitions.isEmpty()) {
272+
response.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
273+
} else {
274+
Set<String> filterStates = new HashSet<>();
275+
for (TransactionState stateName : TransactionState.values()) {
276+
String nameForTheClient = stateName.toAdminState().toString();
277+
if (filteredStates.contains(nameForTheClient)) {
278+
filterStates.add(nameForTheClient);
279+
} else {
280+
response.unknownStateFilters().add(nameForTheClient);
281+
}
282+
}
283+
List<ListTransactionsResponseData.TransactionState> states = new ArrayList<>();
284+
transactionMetadataCache.forEach((__, cache) -> {
285+
cache.values().forEach(txnMetadata -> {
286+
txnMetadata.inLock(() -> {
287+
// use toString() to get the name of the state according to the protocol
288+
ListTransactionsResponseData.TransactionState transactionState =
289+
new ListTransactionsResponseData.TransactionState()
290+
.setTransactionalId(txnMetadata.getTransactionalId())
291+
.setProducerId(txnMetadata.getProducerId())
292+
.setTransactionState(txnMetadata.getState().toAdminState().toString());
293+
294+
if (shouldInclude(txnMetadata, filteredProducerIds, filterStates)) {
295+
if (log.isDebugEnabled()) {
296+
log.debug("add transaction state: {}", transactionState);
297+
}
298+
states.add(transactionState);
299+
} else {
300+
if (log.isDebugEnabled()) {
301+
log.debug("Skip transaction state: {}", transactionState);
302+
}
303+
}
304+
return null;
305+
});
306+
});
307+
});
308+
response.setErrorCode(Errors.NONE.code())
309+
.setTransactionStates(states);
310+
}
311+
return response;
312+
});
313+
}
314+
248315
@Data
249316
@AllArgsConstructor
250317
private static class TransactionalIdCoordinatorEpochAndMetadata {

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java

+12-7
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import io.streamnative.pulsar.handlers.kop.ApiVersion;
1717
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata;
18+
import io.streamnative.pulsar.handlers.kop.scala.Either;
1819
import java.util.ArrayList;
1920
import java.util.Collections;
2021
import java.util.List;
@@ -269,14 +270,18 @@ public static LeaveGroupResponse newLeaveGroup(Errors errors) {
269270
return new LeaveGroupResponse(data);
270271
}
271272

272-
public static ListGroupsResponse newListGroups(Errors errors,
273-
List<GroupMetadata.GroupOverview> groups) {
273+
public static ListGroupsResponse newListGroups(Either<Errors, List<GroupMetadata.GroupOverview>> results) {
274274
ListGroupsResponseData data = new ListGroupsResponseData();
275-
data.setErrorCode(errors.code());
276-
data.setGroups(groups.stream().map(overView -> new ListGroupsResponseData.ListedGroup()
277-
.setGroupId(overView.groupId())
278-
.setProtocolType(overView.protocolType()))
279-
.collect(Collectors.toList()));
275+
data.setErrorCode(results.isLeft() ? results.getLeft().code() : Errors.NONE.code());
276+
if (!results.isLeft()) {
277+
data.setGroups(results.getRight().stream().map(overView -> new ListGroupsResponseData.ListedGroup()
278+
.setGroupId(overView.groupId())
279+
.setProtocolType(overView.protocolType()))
280+
.collect(Collectors.toList()));
281+
282+
} else {
283+
data.setGroups(Collections.emptyList());
284+
}
280285
return new ListGroupsResponse(data);
281286
}
282287

tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinatorTest.java

+12-10
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import static org.mockito.Mockito.spy;
1717
import static org.testng.Assert.assertEquals;
18+
import static org.testng.Assert.assertFalse;
1819
import static org.testng.Assert.assertNotEquals;
1920
import static org.testng.Assert.assertTrue;
2021
import static org.testng.Assert.fail;
@@ -28,6 +29,7 @@
2829
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupSummary;
2930
import io.streamnative.pulsar.handlers.kop.coordinator.group.MemberMetadata.MemberSummary;
3031
import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata;
32+
import io.streamnative.pulsar.handlers.kop.scala.Either;
3133
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory;
3234
import io.streamnative.pulsar.handlers.kop.utils.timer.MockTimer;
3335
import java.util.ArrayList;
@@ -218,8 +220,8 @@ public void testRequestHandlingWhileLoadingInProgress() throws Exception {
218220
assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, describeGroupResult.getKey());
219221

220222
// ListGroups
221-
KeyValue<Errors, List<GroupOverview>> listGroupsResult = groupCoordinator.handleListGroups();
222-
assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, listGroupsResult.getKey());
223+
Either<Errors, List<GroupOverview>> listGroupsResult = groupCoordinator.handleListGroups();
224+
assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, listGroupsResult.getLeft());
223225

224226
// DeleteGroups
225227
Map<String, Errors> deleteGroupsErrors = groupCoordinator.handleDeleteGroups(
@@ -1695,12 +1697,12 @@ groupId, memberId, protocolType, newProtocols()
16951697
).get();
16961698
assertEquals(Errors.NONE, syncGroupResult.getKey());
16971699

1698-
KeyValue<Errors, List<GroupOverview>> groups = groupCoordinator.handleListGroups();
1699-
assertEquals(Errors.NONE, groups.getKey());
1700-
assertEquals(1, groups.getValue().size());
1700+
Either<Errors, List<GroupOverview>> groups = groupCoordinator.handleListGroups();
1701+
assertFalse(groups.isLeft());
1702+
assertEquals(1, groups.getRight().size());
17011703
assertEquals(
17021704
new GroupOverview("groupId", "consumer"),
1703-
groups.getValue().get(0)
1705+
groups.getRight().get(0)
17041706
);
17051707
}
17061708

@@ -1712,12 +1714,12 @@ groupId, memberId, protocolType, newProtocols()
17121714
);
17131715
assertEquals(Errors.NONE, joinGroupResult.getError());
17141716

1715-
KeyValue<Errors, List<GroupOverview>> groups = groupCoordinator.handleListGroups();
1716-
assertEquals(Errors.NONE, groups.getKey());
1717-
assertEquals(1, groups.getValue().size());
1717+
Either<Errors, List<GroupOverview>> groups = groupCoordinator.handleListGroups();
1718+
assertFalse(groups.isLeft());
1719+
assertEquals(1, groups.getRight().size());
17181720
assertEquals(
17191721
new GroupOverview("groupId", "consumer"),
1720-
groups.getValue().get(0)
1722+
groups.getRight().get(0)
17211723
);
17221724
}
17231725

0 commit comments

Comments
 (0)