@@ -39,7 +39,12 @@ public class ShareGroupMetrics implements AutoCloseable {
3939 private static final String PARTITION_LOAD_TIME_MS = "PartitionLoadTimeMs" ;
4040 private static final String TOPIC_PARTITIONS_FETCH_RATIO = "RequestTopicPartitionsFetchRatio" ;
4141 private static final String TOPIC_PARTITIONS_ACQUIRE_TIME_MS = "TopicPartitionsAcquireTimeMs" ;
42+ private static final String DEAD_LETTER_QUEUE_RECORD_COUNT = "DeadLetterQueueRecordCount" ;
43+ private static final String DEAD_LETTER_QUEUE_TOTAL_PRODUCE_REQ_PER_SEC = "DeadLetterQueueTotalProduceRequestsPerSec" ;
44+ private static final String DEAD_LETTER_QUEUE_FAILED_PRODUCE_REQ_PER_SEC = "DeadLetterQueueFailedProduceRequestsPerSec" ;
45+
4246 private static final String ACK_TYPE_TAG = "ackType" ;
47+ private static final String GROUP_ID_TAG = "group" ;
4348
4449 /**
4550 * Metric for the rate of records acknowledged per acknowledgement type.
@@ -57,6 +62,19 @@ public class ShareGroupMetrics implements AutoCloseable {
5762 * Metric for the time taken to acquire topic partitions for a group.
5863 */
5964 private final Map <String , Histogram > topicPartitionsAcquireTimeMs ;
65+ /**
66+ * Total records written to DLQ topic per group. We are using meter here
67+ * as it will provide count and per sec attributes.
68+ */
69+ private final Map <String , Meter > dlqRecordCountPerGroup ;
70+ /**
71+ * Total produce requests to the DLQ topic per group.
72+ */
73+ private final Map <String , Meter > dlqProduceTotalPerGroup ;
74+ /**
75+ * Failed produce requests to the DLQ topic per group.
76+ */
77+ private final Map <String , Meter > dlqProduceFailedPerGroup ;
6078
6179 private final KafkaMetricsGroup metricsGroup ;
6280 private final Time time ;
@@ -78,6 +96,9 @@ public ShareGroupMetrics(Time time) {
7896 this .partitionLoadTimeMs = metricsGroup .newHistogram (PARTITION_LOAD_TIME_MS );
7997 this .topicPartitionsFetchRatio = new ConcurrentHashMap <>();
8098 this .topicPartitionsAcquireTimeMs = new ConcurrentHashMap <>();
99+ this .dlqRecordCountPerGroup = new ConcurrentHashMap <>();
100+ this .dlqProduceTotalPerGroup = new ConcurrentHashMap <>();
101+ this .dlqProduceFailedPerGroup = new ConcurrentHashMap <>();
81102 }
82103
83104 public void recordAcknowledgement (byte ackType ) {
@@ -97,13 +118,13 @@ public void partitionLoadTime(long start) {
97118
98119 public void recordTopicPartitionsFetchRatio (String groupId , long value ) {
99120 topicPartitionsFetchRatio .computeIfAbsent (groupId ,
100- k -> metricsGroup .newHistogram (TOPIC_PARTITIONS_FETCH_RATIO , true , Map .of ("group" , groupId )));
121+ k -> metricsGroup .newHistogram (TOPIC_PARTITIONS_FETCH_RATIO , true , Map .of (GROUP_ID_TAG , groupId )));
101122 topicPartitionsFetchRatio .get (groupId ).update (value );
102123 }
103124
104125 public void recordTopicPartitionsAcquireTimeMs (String groupId , long timeMs ) {
105126 topicPartitionsAcquireTimeMs .computeIfAbsent (groupId ,
106- k -> metricsGroup .newHistogram (TOPIC_PARTITIONS_ACQUIRE_TIME_MS , true , Map .of ("group" , groupId )));
127+ k -> metricsGroup .newHistogram (TOPIC_PARTITIONS_ACQUIRE_TIME_MS , true , Map .of (GROUP_ID_TAG , groupId )));
107128 topicPartitionsAcquireTimeMs .get (groupId ).update (timeMs );
108129 }
109130
@@ -127,13 +148,43 @@ public Histogram topicPartitionsAcquireTimeMs(String groupId) {
127148 return topicPartitionsAcquireTimeMs .get (groupId );
128149 }
129150
151+ public void recordDLQRecordWrite (String shareGroupId , int count ) {
152+ dlqRecordCountPerGroup .computeIfAbsent (shareGroupId , k -> metricsGroup .newMeter (
153+ DEAD_LETTER_QUEUE_RECORD_COUNT ,
154+ "requests" ,
155+ TimeUnit .SECONDS ,
156+ Map .of (GROUP_ID_TAG , k )
157+ )).mark (count );
158+ }
159+
160+ public void recordDLQProduce (String shareGroupId ) {
161+ dlqProduceTotalPerGroup .computeIfAbsent (shareGroupId , k -> metricsGroup .newMeter (
162+ DEAD_LETTER_QUEUE_TOTAL_PRODUCE_REQ_PER_SEC ,
163+ "errors" ,
164+ TimeUnit .SECONDS ,
165+ Map .of (GROUP_ID_TAG , k )
166+ )).mark ();
167+ }
168+
169+ public void recordDLQProduceFailed (String shareGroupId ) {
170+ dlqProduceFailedPerGroup .computeIfAbsent (shareGroupId , k -> metricsGroup .newMeter (
171+ DEAD_LETTER_QUEUE_FAILED_PRODUCE_REQ_PER_SEC ,
172+ "errors" ,
173+ TimeUnit .SECONDS ,
174+ Map .of (GROUP_ID_TAG , k )
175+ )).mark ();
176+ }
177+
130178 @ Override
131179 public void close () throws Exception {
132180 Arrays .stream (AcknowledgeType .values ()).forEach (
133181 m -> metricsGroup .removeMetric (RECORD_ACKNOWLEDGEMENTS_PER_SEC , Map .of (ACK_TYPE_TAG , m .toString ())));
134182 metricsGroup .removeMetric (PARTITION_LOAD_TIME_MS );
135- topicPartitionsFetchRatio .forEach ((k , v ) -> metricsGroup .removeMetric (TOPIC_PARTITIONS_FETCH_RATIO , Map .of ("group" , k )));
136- topicPartitionsAcquireTimeMs .forEach ((k , v ) -> metricsGroup .removeMetric (TOPIC_PARTITIONS_ACQUIRE_TIME_MS , Map .of ("group" , k )));
183+ topicPartitionsFetchRatio .forEach ((k , v ) -> metricsGroup .removeMetric (TOPIC_PARTITIONS_FETCH_RATIO , Map .of (GROUP_ID_TAG , k )));
184+ topicPartitionsAcquireTimeMs .forEach ((k , v ) -> metricsGroup .removeMetric (TOPIC_PARTITIONS_ACQUIRE_TIME_MS , Map .of (GROUP_ID_TAG , k )));
185+ dlqRecordCountPerGroup .forEach ((k , v ) -> metricsGroup .removeMetric (DEAD_LETTER_QUEUE_RECORD_COUNT , Map .of (GROUP_ID_TAG , k )));
186+ dlqProduceTotalPerGroup .forEach ((k , v ) -> metricsGroup .removeMetric (DEAD_LETTER_QUEUE_TOTAL_PRODUCE_REQ_PER_SEC , Map .of (GROUP_ID_TAG , k )));
187+ dlqProduceFailedPerGroup .forEach ((k , v ) -> metricsGroup .removeMetric (DEAD_LETTER_QUEUE_FAILED_PRODUCE_REQ_PER_SEC , Map .of (GROUP_ID_TAG , k )));
137188 }
138189
139190 private static String capitalize (String string ) {
0 commit comments