-
Notifications
You must be signed in to change notification settings - Fork 14.3k
KAFKA-19073: add transactional ID pattern filter to ListTransactions #19355
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
KAFKA-19073: add transactional ID pattern filter to ListTransactions #19355
Conversation
A label of 'needs-attention' was automatically added to this PR in order to raise the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@CalvinConfluent thanks for this patch!
@@ -343,6 +345,9 @@ class TransactionStateManager(brokerId: Int, | |||
false | |||
} else if (filterDurationMs >= 0 && (now - txnMetadata.txnStartTimestamp) <= filterDurationMs) { | |||
false | |||
} else if (!filterTransactionalIdPattern.isEmpty && |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
filterTransactionalIdPattern.nonEmpty
@@ -30,6 +32,9 @@ | |||
}, | |||
{ "name": "DurationFilter", "type": "int64", "versions": "1+", "default": -1, | |||
"about": "Duration (in millis) to filter by: if < 0, all transactions will be returned; otherwise, only transactions running longer than this duration will be returned." | |||
}, | |||
{ "name": "TransactionalIdPatternFilter", "type": "string", "versions": "2+", "nullableVersions": "2+", "default": "null", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please add version check to ListTransactionsRequest.Builder#build
?
if (data.transactionalIdPatternFilter() != null && version < 2) {
throw new UnsupportedVersionException("xxxx");
}
@@ -343,6 +345,9 @@ class TransactionStateManager(brokerId: Int, | |||
false | |||
} else if (filterDurationMs >= 0 && (now - txnMetadata.txnStartTimestamp) <= filterDurationMs) { | |||
false | |||
} else if (!filterTransactionalIdPattern.isEmpty && | |||
!Pattern.compile(filterTransactionalIdPattern).matcher(txnMetadata.transactionalId).matches()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we avoid compiling the regex for each transaction id?
@@ -70,6 +71,19 @@ public ListTransactionsOptions filterOnDuration(long durationMs) { | |||
return this; | |||
} | |||
|
|||
/** | |||
* Filter only the transactions that match with the given transactional ID pattern. | |||
* If no filter is specified or if the passed string is empty, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The protocol says if null, all transactions are returned;
Should we align it with null
?
BTW, ConsumerGroupHeartbeatRequest
uses null
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @CalvinConfluent for this patch, a little comments
ListTransactionsHandler handler = new ListTransactionsHandler(options, logContext); | ||
ListTransactionsRequest request = handler.buildBatchedRequest(brokerId, singleton(brokerKey)).build(); | ||
assertEquals(filteredTransactionalIdPattern, request.data().transactionalIdPatternFilter()); | ||
assertEquals(Collections.emptyList(), request.data().stateFilters()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Collections.emptyList()
can instead of List.of()
ListTransactionsOptions options = new ListTransactionsOptions() | ||
.filterOnTransactionalIdPattern(filteredTransactionalIdPattern); | ||
ListTransactionsHandler handler = new ListTransactionsHandler(options, logContext); | ||
ListTransactionsRequest request = handler.buildBatchedRequest(brokerId, singleton(brokerKey)).build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
singleton()
can instead of Set.of()
transactions.put(0, asList( | ||
new TransactionListing("bar", 98765L, TransactionState.PREPARE_ABORT) | ||
)); | ||
transactions.put(1, singletonList( | ||
new TransactionListing("baz", 13579L, TransactionState.COMPLETE_COMMIT) | ||
)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
asList()
and singletonList
can instead of List.of()
List<String> expectedHeaders = TransactionsCommand.ListTransactionsCommand.HEADERS; | ||
assertEquals(expectedHeaders, table.get(0)); | ||
Set<List<String>> expectedRows = Set.of( | ||
asList("bar", "0", "98765", "PrepareAbort"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
asList()
can instead of List.of()
https://issues.apache.org/jira/browse/KAFKA-19073