Skip to content

Commit ad6da8c

Browse files
Make the pattern nullable
1 parent 98ae820 commit ad6da8c

File tree

5 files changed

+26
-7
lines changed

5 files changed

+26
-7
lines changed

Diff for: clients/src/main/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandler.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,9 @@ public ListTransactionsRequest.Builder buildBatchedRequest(
7575
.map(TransactionState::toString)
7676
.collect(Collectors.toList()));
7777
request.setDurationFilter(options.filteredDuration());
78-
request.setTransactionalIdPatternFilter(options.filteredTransactionalIdPattern());
78+
if (!options.filteredTransactionalIdPattern().isEmpty()) {
79+
request.setTransactionalIdPatternFilter(options.filteredTransactionalIdPattern());
80+
}
7981
return new ListTransactionsRequest.Builder(request);
8082
}
8183

Diff for: clients/src/main/resources/common/message/ListTransactionsRequest.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@
3333
{ "name": "DurationFilter", "type": "int64", "versions": "1+", "default": -1,
3434
"about": "Duration (in millis) to filter by: if < 0, all transactions will be returned; otherwise, only transactions running longer than this duration will be returned."
3535
},
36-
{ "name": "TransactionalIdPatternFilter", "type": "string", "versions": "2+",
37-
"about": "The transactional ID regular expression pattern to filter by: if empty, all transactions are returned; If non-empty, then only the transactions matching with the pattern will be returned."
36+
{ "name": "TransactionalIdPatternFilter", "type": "string", "versions": "2+", "nullableVersions": "2+", "default": "null",
37+
"about": "The transactional ID regular expression pattern to filter by: if null, all transactions are returned; otherwise, then only the transactions matching with the pattern will be returned."
3838
}
3939
]
4040
}

Diff for: clients/src/test/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandlerTest.java

+13
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import static java.util.Collections.singleton;
4444
import static org.junit.jupiter.api.Assertions.assertEquals;
4545
import static org.junit.jupiter.api.Assertions.assertNotNull;
46+
import static org.junit.jupiter.api.Assertions.assertNull;
4647
import static org.junit.jupiter.api.Assertions.assertThrows;
4748

4849
public class ListTransactionsHandlerTest {
@@ -100,6 +101,18 @@ public void testBuildRequestWithFilteredTransactionalIdPattern() {
100101
assertEquals(Collections.emptyList(), request.data().stateFilters());
101102
}
102103

104+
@Test
105+
public void testBuildRequestWithEmptyFilteredTransactionalIdPattern() {
106+
int brokerId = 1;
107+
BrokerKey brokerKey = new BrokerKey(OptionalInt.of(brokerId));
108+
String filteredTransactionalIdPattern = "";
109+
ListTransactionsOptions options = new ListTransactionsOptions()
110+
.filterOnTransactionalIdPattern(filteredTransactionalIdPattern);
111+
ListTransactionsHandler handler = new ListTransactionsHandler(options, logContext);
112+
ListTransactionsRequest request = handler.buildBatchedRequest(brokerId, singleton(brokerKey)).build();
113+
assertNull(request.data().transactionalIdPatternFilter());
114+
}
115+
103116
@Test
104117
public void testBuildRequestWithDurationFilter() {
105118
int brokerId = 1;

Diff for: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala

+3-3
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ class TransactionStateManager(brokerId: Int,
317317
filterProducerIds: Set[Long],
318318
filterStateNames: Set[String],
319319
filterDurationMs: Long,
320-
filteredTransactionalIdPattern: String
320+
filterTransactionalIdPattern: String
321321
): ListTransactionsResponseData = {
322322
inReadLock(stateLock) {
323323
val response = new ListTransactionsResponseData()
@@ -345,8 +345,8 @@ class TransactionStateManager(brokerId: Int,
345345
false
346346
} else if (filterDurationMs >= 0 && (now - txnMetadata.txnStartTimestamp) <= filterDurationMs) {
347347
false
348-
} else if (!filteredTransactionalIdPattern.isEmpty &&
349-
!Pattern.compile(filteredTransactionalIdPattern).matcher(txnMetadata.transactionalId).matches()) {
348+
} else if (!filterTransactionalIdPattern.isEmpty &&
349+
!Pattern.compile(filterTransactionalIdPattern).matcher(txnMetadata.transactionalId).matches()) {
350350
false
351351
} else {
352352
true

Diff for: core/src/main/scala/kafka/server/KafkaApis.scala

+5-1
Original file line numberDiff line numberDiff line change
@@ -2472,7 +2472,11 @@ class KafkaApis(val requestChannel: RequestChannel,
24722472
val filteredProducerIds = listTransactionsRequest.data.producerIdFilters.asScala.map(Long.unbox).toSet
24732473
val filteredStates = listTransactionsRequest.data.stateFilters.asScala.toSet
24742474
val durationFilter = listTransactionsRequest.data.durationFilter()
2475-
val transactionalIdPatternFilter = listTransactionsRequest.data.transactionalIdPatternFilter
2475+
val transactionalIdPatternFilter = if (listTransactionsRequest.data.transactionalIdPatternFilter == null) {
2476+
""
2477+
} else {
2478+
listTransactionsRequest.data.transactionalIdPatternFilter
2479+
}
24762480
val response = txnCoordinator.handleListTransactions(
24772481
filteredProducerIds,
24782482
filteredStates,

0 commit comments

Comments
 (0)