Skip to content

Commit bf53e66

Browse files
committed
KAFKA:20612: Stitch dlq metrics with ShareGroupDLQStateManager.
1 parent 9c618ee commit bf53e66

6 files changed

Lines changed: 559 additions & 16 deletions

File tree

core/src/main/java/kafka/server/share/SharePartitionManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ public SharePartitionManager(
170170
long remoteFetchMaxWaitMs,
171171
Persister persister,
172172
ShareGroupConfigProvider configProvider,
173+
ShareGroupMetrics metrics,
173174
BrokerTopicStats brokerTopicStats,
174175
Supplier<Boolean> shareGroupDlqEnableSupplier,
175176
ShareGroupDLQManager shareGroupDLQManager
@@ -184,7 +185,7 @@ public SharePartitionManager(
184185
remoteFetchMaxWaitMs,
185186
persister,
186187
configProvider,
187-
new ShareGroupMetrics(time),
188+
metrics,
188189
brokerTopicStats,
189190
shareGroupDlqEnableSupplier,
190191
shareGroupDLQManager
@@ -634,7 +635,6 @@ private void addDelayedShareFetch(DelayedShareFetch delayedShareFetch, List<Dela
634635
@Override
635636
public void close() throws Exception {
636637
this.timer.close();
637-
this.shareGroupMetrics.close();
638638
}
639639

640640
private ShareSessionKey shareSessionKey(String groupId, String memberId) {

core/src/main/scala/kafka/server/BrokerServer.scala

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ import org.apache.kafka.storage.internals.log.{LogDirFailureChannel, LogManager
6363
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
6464
import org.apache.kafka.server.partition.{AlterPartitionManager, DefaultAlterPartitionManager}
6565
import org.apache.kafka.server.share.dlq.{DefaultShareGroupDLQManager, NoOpShareGroupDLQManager, ShareGroupDLQManager}
66+
import org.apache.kafka.server.share.metrics.ShareGroupMetrics
6667

6768
import java.time.Duration
6869
import java.util
@@ -165,6 +166,8 @@ class BrokerServer(
165166

166167
var clientMetricsManager: ClientMetricsManager = _
167168

169+
var shareGroupMetrics: ShareGroupMetrics = _
170+
168171
var sharePartitionManager: SharePartitionManager = _
169172

170173
var persister: Persister = _
@@ -389,6 +392,9 @@ class BrokerServer(
389392
/* create persister */
390393
persister = createShareStatePersister()
391394

395+
/* create metrics object to be shared with share DLQ manager share partition manager*/
396+
shareGroupMetrics = new ShareGroupMetrics(time)
397+
392398
/* create share group DLQ manager */
393399
shareGroupDLQManager = createShareGroupDLQManager()
394400

@@ -468,6 +474,7 @@ class BrokerServer(
468474
config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong,
469475
persister,
470476
new ShareGroupConfigProvider(groupConfigManager),
477+
shareGroupMetrics,
471478
brokerTopicStats,
472479
() => ShareVersion.fromFeatureLevel(metadataCache.features.finalizedFeatures.getOrDefault(ShareVersion.FEATURE_NAME, 0.toShort)).supportsShareGroupDLQ(),
473480
shareGroupDLQManager
@@ -765,7 +772,8 @@ class BrokerServer(
765772
NetworkUtils.buildNetworkClient("ShareGroupDLQManager", config, metrics, Time.SYSTEM, new LogContext(s"[ShareGroupDLQManager broker=${config.brokerId}]")),
766773
new ShareCoordinatorMetadataCacheHelperImpl(metadataCache, key => shareCoordinator.partitionFor(key), config.interBrokerListenerName, groupConfigManager),
767774
Time.SYSTEM,
768-
shareGroupTimer
775+
shareGroupTimer,
776+
shareGroupMetrics
769777
)
770778
} else if (klass.getName.equals(classOf[NoOpShareGroupDLQManager].getName)) {
771779
info("Using no-op share group DLQ manager")
@@ -917,6 +925,9 @@ class BrokerServer(
917925
if (shareGroupDLQManager != null)
918926
Utils.swallow(this.logger.underlying, () => shareGroupDLQManager.stop())
919927

928+
if (shareGroupMetrics != null)
929+
Utils.swallow(this.logger.underlying, () => shareGroupMetrics.close())
930+
920931
Utils.closeQuietly(shareGroupTimer, "share group timer")
921932

922933
if (lifecycleManager != null)

core/src/test/java/kafka/server/share/SharePartitionManagerTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1266,7 +1266,6 @@ public void testCloseSharePartitionManager() throws Exception {
12661266
sharePartitionManager.close();
12671267
// Verify that the timer object in sharePartitionManager is closed by checking the calls to timer.close() and shareGroupMetrics.close().
12681268
Mockito.verify(timer, times(1)).close();
1269-
Mockito.verify(shareGroupMetrics, times(1)).close();
12701269
}
12711270

12721271
@Test

server/src/main/java/org/apache/kafka/server/share/dlq/DefaultShareGroupDLQManager.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.kafka.clients.KafkaClient;
2121
import org.apache.kafka.common.utils.Time;
22+
import org.apache.kafka.server.share.metrics.ShareGroupMetrics;
2223
import org.apache.kafka.server.util.timer.Timer;
2324

2425
import org.slf4j.Logger;
@@ -39,14 +40,26 @@ public class DefaultShareGroupDLQManager implements ShareGroupDLQManager {
3940

4041
private static final Logger log = LoggerFactory.getLogger(DefaultShareGroupDLQManager.class);
4142

42-
public static ShareGroupDLQManager instance(KafkaClient client, ShareGroupDLQMetadataCacheHelper cacheHelper, Time time, Timer timer) {
43-
DefaultShareGroupDLQManager instance = new DefaultShareGroupDLQManager(client, cacheHelper, time, timer);
43+
public static ShareGroupDLQManager instance(
44+
KafkaClient client,
45+
ShareGroupDLQMetadataCacheHelper cacheHelper,
46+
Time time,
47+
Timer timer,
48+
ShareGroupMetrics metrics
49+
) {
50+
DefaultShareGroupDLQManager instance = new DefaultShareGroupDLQManager(client, cacheHelper, time, timer, metrics);
4451
instance.start();
4552
return instance;
4653
}
4754

48-
private DefaultShareGroupDLQManager(KafkaClient client, ShareGroupDLQMetadataCacheHelper cacheHelper, Time time, Timer timer) {
49-
this.stateManager = new ShareGroupDLQStateManager(client, cacheHelper, time, timer);
55+
private DefaultShareGroupDLQManager(
56+
KafkaClient client,
57+
ShareGroupDLQMetadataCacheHelper cacheHelper,
58+
Time time,
59+
Timer timer,
60+
ShareGroupMetrics shareGroupMetrics
61+
) {
62+
this.stateManager = new ShareGroupDLQStateManager(client, cacheHelper, time, timer, shareGroupMetrics);
5063
}
5164

5265
private void start() {

server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.kafka.common.utils.Time;
4646
import org.apache.kafka.common.utils.internals.ExponentialBackoffManager;
4747
import org.apache.kafka.server.config.ServerConfigs;
48+
import org.apache.kafka.server.share.metrics.ShareGroupMetrics;
4849
import org.apache.kafka.server.util.InterBrokerSendThread;
4950
import org.apache.kafka.server.util.RequestAndCompletionHandler;
5051
import org.apache.kafka.server.util.timer.Timer;
@@ -80,6 +81,7 @@ public class ShareGroupDLQStateManager {
8081
private final Time time;
8182
private final Timer timer;
8283
private final ShareGroupDLQMetadataCacheHelper cacheHelper;
84+
private final ShareGroupMetrics shareGroupMetrics;
8385
public static final long REQUEST_BACKOFF_MS = 1_000L;
8486
public static final long REQUEST_BACKOFF_MAX_MS = 30_000L;
8587
private static final int MAX_REQUEST_ATTEMPTS = 5;
@@ -91,7 +93,18 @@ public class ShareGroupDLQStateManager {
9193
private final Map<Node, List<ProduceRequestHandler>> nodeRPCMap = new HashMap<>();
9294
private final Object nodeMapLock = new Object();
9395

94-
public ShareGroupDLQStateManager(KafkaClient client, ShareGroupDLQMetadataCacheHelper cacheHelper, Time time, Timer timer) {
96+
// Called when the generateRequests method is executed by InterBrokerSendThread, returning requests.
97+
// Mainly for testing and introspection purpose to inspect the state of the nodeRPC map
98+
// when generateRequests is called.
99+
private Runnable generateCallback;
100+
101+
public ShareGroupDLQStateManager(
102+
KafkaClient client,
103+
ShareGroupDLQMetadataCacheHelper cacheHelper,
104+
Time time,
105+
Timer timer,
106+
ShareGroupMetrics shareGroupMetrics
107+
) {
95108
if (client == null) {
96109
throw new IllegalArgumentException("Kafkaclient must not be null.");
97110
}
@@ -108,9 +121,14 @@ public ShareGroupDLQStateManager(KafkaClient client, ShareGroupDLQMetadataCacheH
108121
throw new IllegalArgumentException("Timer must not be null.");
109122
}
110123

124+
if (shareGroupMetrics == null) {
125+
throw new IllegalArgumentException("ShareGroupMetrics must not be null.");
126+
}
127+
111128
this.time = time;
112129
this.timer = timer;
113130
this.cacheHelper = cacheHelper;
131+
this.shareGroupMetrics = shareGroupMetrics;
114132
this.sender = new SendThread(
115133
"ShareGroupDLQSendThread",
116134
client,
@@ -154,6 +172,16 @@ CompletableFuture<Void> dlq(ShareGroupDLQRecordParameter param, long requestBack
154172
return future;
155173
}
156174

175+
// Visibility for tests
176+
void setGenerateCallback(Runnable generateCallback) {
177+
this.generateCallback = generateCallback;
178+
}
179+
180+
// Visibility for tests
181+
Map<Node, List<ShareGroupDLQStateManager.ProduceRequestHandler>> nodeRPCMap() {
182+
return nodeRPCMap;
183+
}
184+
157185
private void enqueue(ProduceRequestHandler requestHandler) {
158186
sender.enqueue(requestHandler);
159187
}
@@ -333,6 +361,10 @@ public ProduceRequestData.TopicProduceData topicProduceData() {
333361
simpleRecords.toArray(new SimpleRecord[]{})
334362
);
335363

364+
// Update the metric to say a new request is created to se sent. This might not be the
365+
// actual RPC count as we coalesce the requests before sending.
366+
shareGroupMetrics.recordDLQProduce(param.groupId());
367+
336368
return new ProduceRequestData.TopicProduceData()
337369
.setName(dlqTopicPartitionData.topicName())
338370
.setTopicId(dlqTopicPartitionData.topicId().get())
@@ -580,6 +612,7 @@ private void handleProduceResponse(ClientResponse response) {
580612
switch (error) {
581613
case NONE:
582614
LOG.debug("Successfully produced records {} to dlq topic node {}.", this, dlqPartitionLeaderNode());
615+
shareGroupMetrics.recordDLQRecordWrite(param.groupId(), (int) (param.lastOffset() - param.firstOffset() + 1));
583616
produceRequestBackoff.resetAttempts();
584617
this.result.complete(null);
585618
break;
@@ -588,6 +621,7 @@ private void handleProduceResponse(ClientResponse response) {
588621
LOG.debug("Received retriable error produce response for {} to dlq topic node {} - {}.", this, dlqPartitionLeaderNode(), errorMessage);
589622
if (!produceRequestBackoff.canAttempt()) {
590623
LOG.error("Exhausted max retries to produce {} to DLQ topic node {}.", this, dlqPartitionLeaderNode());
624+
shareGroupMetrics.recordDLQProduceFailed(param.groupId());
591625
requestErrorResponse(new Exception("Exhausted max retries to produce to DLQ topic without success."));
592626
break;
593627
}
@@ -598,6 +632,7 @@ private void handleProduceResponse(ClientResponse response) {
598632
LOG.error("Unable to produce {} to DLQ topic node {} - {}.", this, dlqPartitionLeaderNode(), errorMessage);
599633
partitionResponse.recordErrors().forEach(recordError ->
600634
LOG.error("Records with errors {} - {}.", recordError.batchIndex(), recordError.batchIndexErrorMessage()));
635+
shareGroupMetrics.recordDLQProduceFailed(param.groupId());
601636
requestErrorResponse(error.exception());
602637
}
603638
break;
@@ -609,6 +644,7 @@ private void handleProduceResponse(ClientResponse response) {
609644
if (!produceRequestBackoff.canAttempt()) {
610645
LOG.error("Exhausted max retries to produce {} to DLQ topic node {} due to client response error {}.",
611646
param, dlqPartitionLeaderNode(), clientResponseErrorMessage);
647+
shareGroupMetrics.recordDLQProduceFailed(param.groupId());
612648
requestErrorResponse(clientResponseError.exception());
613649
break;
614650
}
@@ -618,6 +654,7 @@ private void handleProduceResponse(ClientResponse response) {
618654
default:
619655
LOG.error("Unable to produce {} to DLQ topic node {} due to client response error {}.",
620656
param, dlqPartitionLeaderNode(), clientResponseErrorMessage);
657+
shareGroupMetrics.recordDLQProduceFailed(param.groupId());
621658
requestErrorResponse(clientResponseError.exception());
622659
}
623660
}
@@ -634,6 +671,11 @@ private class SendThread extends InterBrokerSendThread {
634671

635672
@Override
636673
public Collection<RequestAndCompletionHandler> generateRequests() {
674+
// Introspection for testing - will be null in prod
675+
if (generateCallback != null) {
676+
generateCallback.run();
677+
}
678+
637679
List<RequestAndCompletionHandler> requests = new ArrayList<>();
638680

639681
if (!queue.isEmpty()) {
@@ -772,13 +814,19 @@ public void run() {
772814
}
773815
}
774816

775-
private record CoalesceResults(
817+
// Visibility for tests
818+
record CoalesceResults(
776819
AbstractRequest.Builder<? extends AbstractRequest> request,
777820
List<ProduceRequestHandler> liveHandlers
778821
) {
779822
}
780823

781-
private static CoalesceResults coalesceProduceRequests(List<ProduceRequestHandler> handlers) {
824+
// Visibility for tests
825+
static CoalesceResults coalesceProduceRequests(List<ProduceRequestHandler> handlers) {
826+
// Above handlers are destined for the same broker node - it could be for different DLQ topics and partitions
827+
// but the same broker node. Now the produce request requires each topic data request to be
828+
// scoped to a specific topic/topicId and the partition data could have all the record information
829+
// and the destination DLQ partition. To accomplish this, we will map handlers by DLQ topic id.
782830
Map<Uuid, ProduceRequestData.TopicProduceData> produceHandlerMap = new HashMap<>();
783831
List<ProduceRequestHandler> liveHandlers = new ArrayList<>(handlers.size());
784832
handlers.forEach(handler -> {

0 commit comments

Comments
 (0)