Skip to content

Commit f2f3110

Browse files
authored
KAFKA-20611: Stitch share group DLQ manager with the main code. (#22372)
* Add ShareGroupDLQManager instance creation code in BrokerServer and pass along the instance to SharePartitionManager to be handed over to SharePartition. NOTE: Merge after #22368 Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com> Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Andrew Schofield <aschofield@confluent.io>
1 parent 0a3bc5e commit f2f3110

7 files changed

Lines changed: 102 additions & 20 deletions

File tree

core/src/main/java/kafka/server/share/SharePartition.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
4343
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider;
4444
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
45-
import org.apache.kafka.server.share.dlq.NoOpShareGroupDLQManager;
4645
import org.apache.kafka.server.share.dlq.ShareGroupDLQManager;
4746
import org.apache.kafka.server.share.dlq.ShareGroupDLQRecordParameter;
4847
import org.apache.kafka.server.share.fetch.AcquisitionLockTimeoutHandler;
@@ -330,15 +329,16 @@ enum SharePartitionState {
330329
private long fetchLockIdleDurationMs;
331330

332331
/**
333-
* Reference to the dlq manager implementation.
332+
* Supplier to toggle DLQ support.
334333
*/
335-
private final ShareGroupDLQManager shareGroupDLQ = new NoOpShareGroupDLQManager();
334+
private final Supplier<Boolean> shareGroupDlqEnableSupplier;
336335

337336
/**
338-
* Supplier to toggle dlq support.
337+
* Reference to the DLQ manager implementation.
339338
*/
340-
private final Supplier<Boolean> shareGroupDlqEnableSupplier;
339+
private final ShareGroupDLQManager shareGroupDLQManager;
341340

341+
@SuppressWarnings("ParameterNumber")
342342
SharePartition(
343343
String groupId,
344344
TopicIdPartition topicIdPartition,
@@ -352,11 +352,13 @@ enum SharePartitionState {
352352
ReplicaManager replicaManager,
353353
ShareGroupConfigProvider configProvider,
354354
SharePartitionListener listener,
355-
Supplier<Boolean> shareGroupDlqEnableSupplier
355+
Supplier<Boolean> shareGroupDlqEnableSupplier,
356+
ShareGroupDLQManager shareGroupDLQManager
356357
) {
357358
this(groupId, topicIdPartition, leaderEpoch, defaultMaxInFlightRecords, defaultMaxDeliveryCount, defaultRecordLockDurationMs,
358359
timer, time, persister, replicaManager, configProvider, SharePartitionState.EMPTY, listener,
359-
new SharePartitionMetrics(groupId, topicIdPartition.topic(), topicIdPartition.partition()), shareGroupDlqEnableSupplier);
360+
new SharePartitionMetrics(groupId, topicIdPartition.topic(), topicIdPartition.partition()), shareGroupDlqEnableSupplier,
361+
shareGroupDLQManager);
360362
}
361363

362364
// Visible for testing
@@ -376,7 +378,8 @@ enum SharePartitionState {
376378
SharePartitionState sharePartitionState,
377379
SharePartitionListener listener,
378380
SharePartitionMetrics sharePartitionMetrics,
379-
Supplier<Boolean> shareGroupDlqEnableSupplier
381+
Supplier<Boolean> shareGroupDlqEnableSupplier,
382+
ShareGroupDLQManager shareGroupDLQManager
380383
) {
381384
this.groupId = groupId;
382385
this.topicIdPartition = topicIdPartition;
@@ -403,6 +406,7 @@ enum SharePartitionState {
403406
this.registerGaugeMetrics();
404407
this.deliveryCompleteCount = new AtomicInteger(0);
405408
this.shareGroupDlqEnableSupplier = shareGroupDlqEnableSupplier;
409+
this.shareGroupDLQManager = shareGroupDLQManager;
406410
}
407411

408412
/**
@@ -3337,7 +3341,7 @@ private RecordState recordStateWithDlq(byte ackType) {
33373341
void initiateDLQAndArchive(InFlightState updatedState, long firstOffset,
33383342
long lastOffset, short deliveryCount, Throwable dlqCause) {
33393343
// Step 1: Enqueue to DLQ
3340-
shareGroupDLQ.enqueue(new ShareGroupDLQRecordParameter(
3344+
shareGroupDLQManager.enqueue(new ShareGroupDLQRecordParameter(
33413345
groupId, topicIdPartition, firstOffset, lastOffset,
33423346
Optional.of(deliveryCount), Optional.ofNullable(dlqCause), false
33433347
)).whenComplete((v1, dlqException) -> {

core/src/main/java/kafka/server/share/SharePartitionManager.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.kafka.server.share.context.FinalContext;
4242
import org.apache.kafka.server.share.context.ShareFetchContext;
4343
import org.apache.kafka.server.share.context.ShareSessionContext;
44+
import org.apache.kafka.server.share.dlq.ShareGroupDLQManager;
4445
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
4546
import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
4647
import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey;
@@ -154,6 +155,11 @@ public class SharePartitionManager implements AutoCloseable {
154155
*/
155156
private final Supplier<Boolean> shareGroupDlqEnableSupplier;
156157

158+
/**
159+
* Reference to the DLQ manager implementation.
160+
*/
161+
private final ShareGroupDLQManager shareGroupDLQManager;
162+
157163
public SharePartitionManager(
158164
ReplicaManager replicaManager,
159165
Time time,
@@ -165,7 +171,8 @@ public SharePartitionManager(
165171
Persister persister,
166172
ShareGroupConfigProvider configProvider,
167173
BrokerTopicStats brokerTopicStats,
168-
Supplier<Boolean> shareGroupDlqEnableSupplier
174+
Supplier<Boolean> shareGroupDlqEnableSupplier,
175+
ShareGroupDLQManager shareGroupDLQManager
169176
) {
170177
this(replicaManager,
171178
time,
@@ -179,10 +186,12 @@ public SharePartitionManager(
179186
configProvider,
180187
new ShareGroupMetrics(time),
181188
brokerTopicStats,
182-
shareGroupDlqEnableSupplier
189+
shareGroupDlqEnableSupplier,
190+
shareGroupDLQManager
183191
);
184192
}
185193

194+
@SuppressWarnings("ParameterNumber")
186195
private SharePartitionManager(
187196
ReplicaManager replicaManager,
188197
Time time,
@@ -196,7 +205,8 @@ private SharePartitionManager(
196205
ShareGroupConfigProvider configProvider,
197206
ShareGroupMetrics shareGroupMetrics,
198207
BrokerTopicStats brokerTopicStats,
199-
Supplier<Boolean> shareGroupDlqEnableSupplier
208+
Supplier<Boolean> shareGroupDlqEnableSupplier,
209+
ShareGroupDLQManager shareGroupDLQManager
200210
) {
201211
this(replicaManager,
202212
time,
@@ -212,7 +222,8 @@ private SharePartitionManager(
212222
configProvider,
213223
shareGroupMetrics,
214224
brokerTopicStats,
215-
shareGroupDlqEnableSupplier
225+
shareGroupDlqEnableSupplier,
226+
shareGroupDLQManager
216227
);
217228
}
218229

@@ -232,7 +243,8 @@ private SharePartitionManager(
232243
ShareGroupConfigProvider configProvider,
233244
ShareGroupMetrics shareGroupMetrics,
234245
BrokerTopicStats brokerTopicStats,
235-
Supplier<Boolean> shareGroupDlqEnableSupplier
246+
Supplier<Boolean> shareGroupDlqEnableSupplier,
247+
ShareGroupDLQManager shareGroupDLQManager
236248
) {
237249
this.replicaManager = replicaManager;
238250
this.time = time;
@@ -249,6 +261,7 @@ private SharePartitionManager(
249261
this.brokerTopicStats = brokerTopicStats;
250262
this.cache.registerShareGroupListener(new ShareGroupListenerImpl());
251263
this.shareGroupDlqEnableSupplier = shareGroupDlqEnableSupplier;
264+
this.shareGroupDLQManager = shareGroupDLQManager;
252265
}
253266

254267
/**
@@ -733,7 +746,8 @@ private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitio
733746
replicaManager,
734747
configProvider,
735748
listener,
736-
shareGroupDlqEnableSupplier
749+
shareGroupDlqEnableSupplier,
750+
shareGroupDLQManager
737751
);
738752
});
739753
}

core/src/main/scala/kafka/server/BrokerServer.scala

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
6262
import org.apache.kafka.storage.internals.log.{LogDirFailureChannel, LogManager => JLogManager}
6363
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
6464
import org.apache.kafka.server.partition.{AlterPartitionManager, DefaultAlterPartitionManager}
65+
import org.apache.kafka.server.share.dlq.{DefaultShareGroupDLQManager, NoOpShareGroupDLQManager, ShareGroupDLQManager}
6566

6667
import java.time.Duration
6768
import java.util
@@ -170,6 +171,8 @@ class BrokerServer(
170171

171172
private var shareGroupTimer: Timer = _
172173

174+
private var shareGroupDLQManager: ShareGroupDLQManager = _
175+
173176
private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = {
174177
lock.lock()
175178
try {
@@ -386,6 +389,9 @@ class BrokerServer(
386389
/* create persister */
387390
persister = createShareStatePersister()
388391

392+
/* create share group DLQ manager */
393+
shareGroupDLQManager = createShareGroupDLQManager()
394+
389395
partitionMetadataClient = createPartitionMetadataClient(metadataCache)
390396

391397
groupCoordinator = createGroupCoordinator()
@@ -463,7 +469,8 @@ class BrokerServer(
463469
persister,
464470
new ShareGroupConfigProvider(groupConfigManager),
465471
brokerTopicStats,
466-
() => ShareVersion.fromFeatureLevel(metadataCache.features.finalizedFeatures.getOrDefault(ShareVersion.FEATURE_NAME, 0.toShort)).supportsShareGroupDLQ()
472+
() => ShareVersion.fromFeatureLevel(metadataCache.features.finalizedFeatures.getOrDefault(ShareVersion.FEATURE_NAME, 0.toShort)).supportsShareGroupDLQ(),
473+
shareGroupDLQManager
467474
)
468475

469476
dataPlaneRequestProcessor = new KafkaApis(
@@ -750,6 +757,30 @@ class BrokerServer(
750757
}
751758
}
752759

760+
private def createShareGroupDLQManager(): ShareGroupDLQManager = {
761+
if (config.shareGroupConfig.shareGroupDLQManagerClassName.nonEmpty) {
762+
val klass = Utils.loadClass(config.shareGroupConfig.shareGroupDLQManagerClassName, classOf[Object]).asInstanceOf[Class[ShareGroupDLQManager]]
763+
if (klass.getName.equals(classOf[DefaultShareGroupDLQManager].getName)) {
764+
DefaultShareGroupDLQManager.instance(
765+
NetworkUtils.buildNetworkClient("ShareGroupDLQManager", config, metrics, Time.SYSTEM, new LogContext(s"[ShareGroupDLQManager broker=${config.brokerId}]")),
766+
new ShareCoordinatorMetadataCacheHelperImpl(metadataCache, key => shareCoordinator.partitionFor(key), config.interBrokerListenerName, groupConfigManager),
767+
Time.SYSTEM,
768+
shareGroupTimer
769+
)
770+
} else if (klass.getName.equals(classOf[NoOpShareGroupDLQManager].getName)) {
771+
info("Using no-op share group DLQ manager")
772+
new NoOpShareGroupDLQManager()
773+
} else {
774+
error("Unknown share group DLQ manager specialization specified. ShareGroupDLQManager is only factory-pluggable!")
775+
throw new IllegalArgumentException("Unknown share group DLQ manager specified " + config.shareGroupConfig.shareGroupDLQManagerClassName)
776+
}
777+
} else {
778+
// in case share group DLQ manager class name deliberately empty (key=)
779+
info("Using no-op share group DLQ manager")
780+
new NoOpShareGroupDLQManager()
781+
}
782+
}
783+
753784
protected def createRemoteLogManager(listenerInfo: ListenerInfo): Option[RemoteLogManager] = {
754785
if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled) {
755786
val listenerName = config.remoteLogManagerConfig.remoteLogMetadataManagerListenerName()
@@ -883,6 +914,9 @@ class BrokerServer(
883914
if (persister != null)
884915
Utils.swallow(this.logger.underlying, () => persister.stop())
885916

917+
if (shareGroupDLQManager != null)
918+
Utils.swallow(this.logger.underlying, () => shareGroupDLQManager.stop())
919+
886920
Utils.closeQuietly(shareGroupTimer, "share group timer")
887921

888922
if (lifecycleManager != null)

core/src/test/java/kafka/server/share/SharePartitionManagerTest.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@
6464
import org.apache.kafka.server.share.context.FinalContext;
6565
import org.apache.kafka.server.share.context.ShareFetchContext;
6666
import org.apache.kafka.server.share.context.ShareSessionContext;
67+
import org.apache.kafka.server.share.dlq.NoOpShareGroupDLQManager;
68+
import org.apache.kafka.server.share.dlq.ShareGroupDLQManager;
6769
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
6870
import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
6971
import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy;
@@ -3249,6 +3251,7 @@ static class SharePartitionManagerBuilder {
32493251
private ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
32503252
private BrokerTopicStats brokerTopicStats;
32513253
private Supplier<Boolean> shareGroupDlqEnableSupplier = () -> false;
3254+
private ShareGroupDLQManager shareGroupDLQManager = new NoOpShareGroupDLQManager();
32523255

32533256
private SharePartitionManagerBuilder withReplicaManager(ReplicaManager replicaManager) {
32543257
this.replicaManager = replicaManager;
@@ -3290,6 +3293,11 @@ private SharePartitionManagerBuilder withShareGroupDlqEnableSupplier(Supplier<Bo
32903293
return this;
32913294
}
32923295

3296+
private SharePartitionManagerBuilder withShareGroupDlqManager(ShareGroupDLQManager shareGroupDLQManager) {
3297+
this.shareGroupDLQManager = shareGroupDLQManager;
3298+
return this;
3299+
}
3300+
32933301
public static SharePartitionManagerBuilder builder() {
32943302
return new SharePartitionManagerBuilder();
32953303
}
@@ -3308,7 +3316,8 @@ public SharePartitionManager build() {
33083316
new ShareGroupConfigProvider(mock(GroupConfigManager.class)),
33093317
shareGroupMetrics,
33103318
brokerTopicStats,
3311-
shareGroupDlqEnableSupplier
3319+
shareGroupDlqEnableSupplier,
3320+
shareGroupDLQManager
33123321
);
33133322
}
33143323
}

core/src/test/java/kafka/server/share/SharePartitionTest.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
5757
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider;
5858
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
59+
import org.apache.kafka.server.share.dlq.NoOpShareGroupDLQManager;
5960
import org.apache.kafka.server.share.dlq.ShareGroupDLQManager;
6061
import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
6162
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
@@ -13318,6 +13319,7 @@ private static class SharePartitionBuilder {
1331813319
private Time time = MOCK_TIME;
1331913320
private SharePartitionMetrics sharePartitionMetrics = Mockito.mock(SharePartitionMetrics.class);
1332013321
private Supplier<Boolean> shareGroupDlqEnableSupplier = () -> false;
13322+
private ShareGroupDLQManager shareGroupDLQManager = new NoOpShareGroupDLQManager();
1332113323

1332213324
private SharePartitionBuilder withMaxInflightRecords(int defaultMaxInflightRecords) {
1332313325
this.defaultMaxInflightRecords = defaultMaxInflightRecords;
@@ -13369,14 +13371,20 @@ private SharePartitionBuilder withShareGroupDlqEnableSupplier(Supplier<Boolean>
1336913371
return this;
1337013372
}
1337113373

13374+
private SharePartitionBuilder withShareGroupDlqManager(ShareGroupDLQManager shareGroupDLQManager) {
13375+
this.shareGroupDLQManager = shareGroupDLQManager;
13376+
return this;
13377+
}
13378+
1337213379
public static SharePartitionBuilder builder() {
1337313380
return new SharePartitionBuilder();
1337413381
}
1337513382

1337613383
public SharePartition build() {
1337713384
return new SharePartition(GROUP_ID, TOPIC_ID_PARTITION, 0, defaultMaxInflightRecords, defaultMaxDeliveryCount,
13378-
defaultAcquisitionLockTimeoutMs, mockTimer, time, persister, replicaManager, configProvider,
13379-
state, Mockito.mock(SharePartitionListener.class), sharePartitionMetrics, shareGroupDlqEnableSupplier);
13385+
defaultAcquisitionLockTimeoutMs, mockTimer, time, persister, replicaManager, configProvider,
13386+
state, Mockito.mock(SharePartitionListener.class), sharePartitionMetrics, shareGroupDlqEnableSupplier,
13387+
shareGroupDLQManager);
1338013388
}
1338113389
}
1338213390
}

core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1092,6 +1092,7 @@ class KafkaConfigTest {
10921092
case GroupCoordinatorConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG => // ignore string
10931093
case GroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG => // ignore string
10941094
case GroupConfig.ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
1095+
case ShareGroupConfig.SHARE_GROUP_DLQ_MANAGER_CLASS_NAME_CONFIG => //ignore string
10951096

10961097
/** Streams groups configs */
10971098
case GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,11 @@ public class ShareGroupConfig {
8282
public static final String SHARE_GROUP_PERSISTER_CLASS_NAME_DOC = "The fully qualified name of a class which implements " +
8383
"the <code>org.apache.kafka.server.share.Persister</code> interface.";
8484

85+
public static final String SHARE_GROUP_DLQ_MANAGER_CLASS_NAME_CONFIG = "group.share.dlq.manager.class.name";
86+
public static final String SHARE_GROUP_DLQ_MANAGER_CLASS_NAME_DEFAULT = "org.apache.kafka.server.share.dlq.DefaultShareGroupDLQManager";
87+
public static final String SHARE_GROUP_DLQ_MANAGER_CLASS_NAME_DOC = "The fully qualified name of a class which implements " +
88+
"the <code>org.apache.kafka.server.share.dlq.ShareGroupDLQManager</code> interface.";
89+
8590
public static final ConfigDef CONFIG_DEF = new ConfigDef()
8691
.define(SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG, INT, SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT, between(2, 10), MEDIUM, SHARE_GROUP_DELIVERY_COUNT_LIMIT_DOC)
8792
.define(SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG, INT, SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_DEFAULT, between(5, 25), MEDIUM, SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_DOC)
@@ -94,7 +99,8 @@ public class ShareGroupConfig {
9499
.define(SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_CONFIG, INT, SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_DEFAULT, between(100, 2000), MEDIUM, SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_DOC)
95100
.define(SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG, INT, SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DEFAULT, MEDIUM, SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DOC)
96101
.define(SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG, INT, SHARE_GROUP_MAX_SHARE_SESSIONS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_SHARE_SESSIONS_DOC)
97-
.defineInternal(SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG, STRING, SHARE_GROUP_PERSISTER_CLASS_NAME_DEFAULT, null, MEDIUM, SHARE_GROUP_PERSISTER_CLASS_NAME_DOC);
102+
.defineInternal(SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG, STRING, SHARE_GROUP_PERSISTER_CLASS_NAME_DEFAULT, null, MEDIUM, SHARE_GROUP_PERSISTER_CLASS_NAME_DOC)
103+
.defineInternal(SHARE_GROUP_DLQ_MANAGER_CLASS_NAME_CONFIG, STRING, SHARE_GROUP_DLQ_MANAGER_CLASS_NAME_DEFAULT, null, MEDIUM, SHARE_GROUP_DLQ_MANAGER_CLASS_NAME_DOC);
98104

99105
private final int shareGroupPartitionMaxRecordLocks;
100106
private final int shareGroupMaxPartitionMaxRecordLocks;
@@ -108,6 +114,7 @@ public class ShareGroupConfig {
108114
private final int shareFetchPurgatoryPurgeIntervalRequests;
109115
private final int shareGroupMaxShareSessions;
110116
private final String shareGroupPersisterClassName;
117+
private final String shareGroupDLQManagerClassName;
111118
private final AbstractConfig config;
112119

113120
public ShareGroupConfig(AbstractConfig config) {
@@ -124,6 +131,7 @@ public ShareGroupConfig(AbstractConfig config) {
124131
shareFetchPurgatoryPurgeIntervalRequests = config.getInt(ShareGroupConfig.SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG);
125132
shareGroupMaxShareSessions = config.getInt(ShareGroupConfig.SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG);
126133
shareGroupPersisterClassName = config.getString(ShareGroupConfig.SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG);
134+
shareGroupDLQManagerClassName = config.getString(ShareGroupConfig.SHARE_GROUP_DLQ_MANAGER_CLASS_NAME_CONFIG);
127135
validate();
128136
}
129137

@@ -186,6 +194,10 @@ public String shareGroupPersisterClassName() {
186194
return shareGroupPersisterClassName;
187195
}
188196

197+
public String shareGroupDLQManagerClassName() {
198+
return shareGroupDLQManagerClassName;
199+
}
200+
189201
private void validate() {
190202
Utils.require(shareGroupMaxDeliveryCountLimit >= shareGroupDeliveryCountLimit,
191203
String.format("%s must be greater than or equal to %s",

0 commit comments

Comments
 (0)