Skip to content

Commit 2f047be

Browse files
authored
KAFKA-20340 Remove GroupConfigManager dependency from SharePartition (#21861)
`SharePartition` held both a `GroupConfigManager` and a `ShareGroupConfigProvider` reference, but `ShareGroupConfigProvider` already wraps `GroupConfigManager`. This PR removes the redundant `GroupConfigManager` dependency so that `SharePartition` only uses `ShareGroupConfigProvider` for dynamic group configuration lookups, as suggested in [#21627.](#21627 (comment)) Reviewers: Sean Quah <squah@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
1 parent 153a458 commit 2f047be

7 files changed

Lines changed: 88 additions & 62 deletions

File tree

core/src/main/java/kafka/server/share/SharePartition.java

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,6 @@
3939
import org.apache.kafka.common.record.internal.Record;
4040
import org.apache.kafka.common.record.internal.RecordBatch;
4141
import org.apache.kafka.common.utils.Time;
42-
import org.apache.kafka.coordinator.group.GroupConfig;
43-
import org.apache.kafka.coordinator.group.GroupConfigManager;
4442
import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
4543
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider;
4644
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
@@ -208,11 +206,6 @@ enum SharePartitionState {
208206
*/
209207
private final int defaultMaxDeliveryCount;
210208

211-
/**
212-
* The group config manager is used to retrieve the values for dynamic group configurations
213-
*/
214-
private final GroupConfigManager groupConfigManager;
215-
216209
/**
217210
* The provider used to retrieve share group dynamic configuration values.
218211
*/
@@ -343,11 +336,11 @@ enum SharePartitionState {
343336
Time time,
344337
Persister persister,
345338
ReplicaManager replicaManager,
346-
GroupConfigManager groupConfigManager,
339+
ShareGroupConfigProvider configProvider,
347340
SharePartitionListener listener
348341
) {
349342
this(groupId, topicIdPartition, leaderEpoch, defaultMaxInFlightRecords, defaultMaxDeliveryCount, defaultRecordLockDurationMs,
350-
timer, time, persister, replicaManager, groupConfigManager, SharePartitionState.EMPTY, listener,
343+
timer, time, persister, replicaManager, configProvider, SharePartitionState.EMPTY, listener,
351344
new SharePartitionMetrics(groupId, topicIdPartition.topic(), topicIdPartition.partition()));
352345
}
353346

@@ -364,7 +357,7 @@ enum SharePartitionState {
364357
Time time,
365358
Persister persister,
366359
ReplicaManager replicaManager,
367-
GroupConfigManager groupConfigManager,
360+
ShareGroupConfigProvider configProvider,
368361
SharePartitionState sharePartitionState,
369362
SharePartitionListener listener,
370363
SharePartitionMetrics sharePartitionMetrics
@@ -385,8 +378,7 @@ enum SharePartitionState {
385378
this.persister = persister;
386379
this.partitionState = sharePartitionState;
387380
this.replicaManager = replicaManager;
388-
this.groupConfigManager = groupConfigManager;
389-
this.configProvider = new ShareGroupConfigProvider(groupConfigManager);
381+
this.configProvider = configProvider;
390382
this.fetchOffsetMetadata = new OffsetMetadata();
391383
this.delayedShareFetchKey = new DelayedShareFetchGroupKey(groupId, topicIdPartition);
392384
this.listener = listener;
@@ -3040,9 +3032,7 @@ private long startOffsetDuringInitialization(long partitionDataStartOffset) {
30403032
if (partitionDataStartOffset != PartitionFactory.UNINITIALIZED_START_OFFSET) {
30413033
return partitionDataStartOffset;
30423034
}
3043-
ShareGroupAutoOffsetResetStrategy offsetResetStrategy = groupConfigManager.groupConfig(groupId)
3044-
.map(GroupConfig::shareAutoOffsetReset)
3045-
.orElseGet(GroupConfig::defaultShareAutoOffsetReset);
3035+
ShareGroupAutoOffsetResetStrategy offsetResetStrategy = configProvider.autoOffsetReset(groupId);
30463036

30473037
if (offsetResetStrategy.type() == ShareGroupAutoOffsetResetStrategy.StrategyType.LATEST) {
30483038
return offsetForLatestTimestamp(topicIdPartition, replicaManager, leaderEpoch);

core/src/main/java/kafka/server/share/SharePartitionManager.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import org.apache.kafka.common.requests.ShareRequestMetadata;
3232
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
3333
import org.apache.kafka.common.utils.Time;
34-
import org.apache.kafka.coordinator.group.GroupConfigManager;
34+
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider;
3535
import org.apache.kafka.server.common.ShareVersion;
3636
import org.apache.kafka.server.partition.PartitionListener;
3737
import org.apache.kafka.server.share.CachedSharePartition;
@@ -104,9 +104,9 @@ public class SharePartitionManager implements AutoCloseable {
104104
private final ShareSessionCache cache;
105105

106106
/**
107-
* The group config manager is used to retrieve the values for dynamic group configurations
107+
* The provider used to retrieve share group dynamic configuration values.
108108
*/
109-
private final GroupConfigManager groupConfigManager;
109+
private final ShareGroupConfigProvider configProvider;
110110

111111
/**
112112
* The default record lock duration is the time in milliseconds that a record lock is held for.
@@ -157,7 +157,7 @@ public SharePartitionManager(
157157
int maxInFlightRecords,
158158
long remoteFetchMaxWaitMs,
159159
Persister persister,
160-
GroupConfigManager groupConfigManager,
160+
ShareGroupConfigProvider configProvider,
161161
BrokerTopicStats brokerTopicStats
162162
) {
163163
this(replicaManager,
@@ -169,7 +169,7 @@ public SharePartitionManager(
169169
maxInFlightRecords,
170170
remoteFetchMaxWaitMs,
171171
persister,
172-
groupConfigManager,
172+
configProvider,
173173
new ShareGroupMetrics(time),
174174
brokerTopicStats
175175
);
@@ -185,7 +185,7 @@ private SharePartitionManager(
185185
int maxInFlightRecords,
186186
long remoteFetchMaxWaitMs,
187187
Persister persister,
188-
GroupConfigManager groupConfigManager,
188+
ShareGroupConfigProvider configProvider,
189189
ShareGroupMetrics shareGroupMetrics,
190190
BrokerTopicStats brokerTopicStats
191191
) {
@@ -200,7 +200,7 @@ private SharePartitionManager(
200200
maxInFlightRecords,
201201
remoteFetchMaxWaitMs,
202202
persister,
203-
groupConfigManager,
203+
configProvider,
204204
shareGroupMetrics,
205205
brokerTopicStats
206206
);
@@ -218,7 +218,7 @@ private SharePartitionManager(
218218
int maxInFlightRecords,
219219
long remoteFetchMaxWaitMs,
220220
Persister persister,
221-
GroupConfigManager groupConfigManager,
221+
ShareGroupConfigProvider configProvider,
222222
ShareGroupMetrics shareGroupMetrics,
223223
BrokerTopicStats brokerTopicStats
224224
) {
@@ -232,7 +232,7 @@ private SharePartitionManager(
232232
this.maxInFlightRecords = maxInFlightRecords;
233233
this.remoteFetchMaxWaitMs = remoteFetchMaxWaitMs;
234234
this.persister = persister;
235-
this.groupConfigManager = groupConfigManager;
235+
this.configProvider = configProvider;
236236
this.shareGroupMetrics = shareGroupMetrics;
237237
this.brokerTopicStats = brokerTopicStats;
238238
this.cache.registerShareGroupListener(new ShareGroupListenerImpl());
@@ -718,7 +718,7 @@ private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitio
718718
time,
719719
persister,
720720
replicaManager,
721-
groupConfigManager,
721+
configProvider,
722722
listener
723723
);
724724
});

core/src/main/scala/kafka/server/BrokerServer.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import org.apache.kafka.common.{ClusterResource, TopicPartition, Uuid}
3636
import org.apache.kafka.coordinator.common.runtime.{CoordinatorLoaderImpl, CoordinatorRecord}
3737
import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics, GroupCoordinatorRuntimeMetrics}
3838
import org.apache.kafka.coordinator.group.{GroupConfigManager, GroupCoordinator, GroupCoordinatorRecordSerde, GroupCoordinatorService}
39+
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider
3940
import org.apache.kafka.coordinator.share.metrics.{ShareCoordinatorMetrics, ShareCoordinatorRuntimeMetrics}
4041
import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRecordSerde, ShareCoordinatorService}
4142
import org.apache.kafka.coordinator.transaction.ProducerIdManager
@@ -456,7 +457,7 @@ class BrokerServer(
456457
config.shareGroupConfig.shareGroupPartitionMaxRecordLocks,
457458
config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong,
458459
persister,
459-
groupConfigManager,
460+
new ShareGroupConfigProvider(groupConfigManager),
460461
brokerTopicStats
461462
)
462463

core/src/test/java/kafka/server/share/SharePartitionManagerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
5252
import org.apache.kafka.common.utils.Time;
5353
import org.apache.kafka.coordinator.group.GroupConfigManager;
54+
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider;
5455
import org.apache.kafka.server.common.ShareVersion;
5556
import org.apache.kafka.server.purgatory.DelayedOperationKey;
5657
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
@@ -3287,7 +3288,7 @@ public SharePartitionManager build() {
32873288
MAX_IN_FLIGHT_MESSAGES,
32883289
REMOTE_FETCH_MAX_WAIT_MS,
32893290
persister,
3290-
mock(GroupConfigManager.class),
3291+
new ShareGroupConfigProvider(mock(GroupConfigManager.class)),
32913292
shareGroupMetrics,
32923293
brokerTopicStats
32933294
);

0 commit comments

Comments
 (0)