KAFKA-19183: Remove kafka.utils.Pool and refactor usages #19531
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Problem
The
kafka.utils.Pool
class is a thin Scala wrapper aroundjava.util.concurrent.ConcurrentHashMap
. While it provides a convenientgetAndMaybePut
method, the underlying functionality is largely replicated byConcurrentHashMap
's standard methods, particularlycomputeIfAbsent
.Analysis / Motivation
As discussed in KAFKA-19183, the
Pool
class adds an unnecessary layer of abstraction. The primary addition over rawConcurrentHashMap
is thegetAndMaybePut
method, which can be effectively replaced withConcurrentHashMap.computeIfAbsent
.While
Pool
allows defining a default value factory at construction, analysis showed this feature is only used in a limited number of locations. These usages can be easily refactored to pass the value creation logic explicitly tocomputeIfAbsent
at the call site.Removing the
Pool
class:Changes
Removal of Files
kafka/utils/Pool.scala
— Removedkafka/utils/PoolTest.scala
— Removed (tested the now-removedPool
class)Refactoring of Usages
All instances of
kafka.utils.Pool
have been replaced withjava.util.concurrent.ConcurrentHashMap
.Calls to
Pool.getAndMaybePut(key, createValue)
have been replaced with:concurrentHashMap.computeIfAbsent(key, _ => createValue)
Calls to
Pool.getAndMaybePut(key)
(which used the constructor factory) have been replaced with:concurrentHashMap.computeIfAbsent(key, k => valueFactory(k))
where
valueFactory
is now locally defined or accessible in scope.Calls accessing collection views (
.keys
,.values
,.iterator
) have been updated to use Java API methods (.keySet()
,.values()
,.entrySet().iterator()
) followed by.asScala
fromscala.jdk.CollectionConverters
to maintain the Scala collection interface where necessary.Other minor method name updates:
.size
→.size()
.putIfNotExists
→.putIfAbsent
The
addLoadedTransactionsToCache
method signature inTransactionStateManager
has been updated to acceptConcurrentHashMap
instead ofPool
.Testing
PoolTest.scala
was removed.Pool
(e.g.,FetcherLagStats
,TransactionStateManager
,ReplicaManager
) provide sufficient coverage for the refactored code paths.Compatibility
Checklist
PoolTest
, relying on existing component tests)