11package com .salesforce .mirus .metrics ;
22
3+ import com .google .common .collect .Sets ;
34import com .salesforce .mirus .MirusSourceConnector ;
45import java .util .*;
6+ import java .util .concurrent .TimeUnit ;
57import java .util .stream .Collectors ;
68import org .apache .kafka .common .MetricNameTemplate ;
79import org .apache .kafka .common .TopicPartition ;
10+ import org .apache .kafka .common .metrics .MetricConfig ;
811import org .apache .kafka .common .metrics .Metrics ;
912import org .apache .kafka .common .metrics .Sensor ;
1013import org .apache .kafka .common .metrics .stats .*;
14+ import org .apache .kafka .common .utils .Time ;
1115import org .slf4j .Logger ;
1216import org .slf4j .LoggerFactory ;
1317
1418public class MirrorJmxReporter extends AbstractMirusJmxReporter {
1519
1620 private static final Logger logger = LoggerFactory .getLogger (MirrorJmxReporter .class );
1721
22+ public static final Map <Long , String > LATENCY_BUCKETS =
23+ Map .of (
24+ TimeUnit .MINUTES .toMillis (0 ),
25+ "0m" ,
26+ TimeUnit .MINUTES .toMillis (5 ),
27+ "5m" ,
28+ TimeUnit .MINUTES .toMillis (10 ),
29+ "10m" ,
30+ TimeUnit .MINUTES .toMillis (30 ),
31+ "30m" ,
32+ TimeUnit .MINUTES .toMillis (60 ),
33+ "60m" ,
34+ TimeUnit .HOURS .toMillis (12 ),
35+ "12h" );
36+
1837 private static MirrorJmxReporter instance = null ;
1938
2039 private static final String SOURCE_CONNECTOR_GROUP = MirusSourceConnector .class .getSimpleName ();
2140
2241 private static final Set <String > TOPIC_TAGS = new HashSet <>(Collections .singletonList ("topic" ));
42+ private static final Set <String > TOPIC_BUCKET_TAGS = Sets .newHashSet ("topic" , "bucket" );
2343
2444 private static final MetricNameTemplate REPLICATION_LATENCY =
2545 new MetricNameTemplate (
@@ -38,16 +58,25 @@ public class MirrorJmxReporter extends AbstractMirusJmxReporter {
3858 "replication-latency-ms-avg" , SOURCE_CONNECTOR_GROUP ,
3959 "Average time it takes records to replicate from source to target cluster." , TOPIC_TAGS );
4060
61+ protected static final MetricNameTemplate HISTOGRAM_LATENCY =
62+ new MetricNameTemplate (
63+ "replication-latency-histogram" ,
64+ SOURCE_CONNECTOR_GROUP ,
65+ "Cumulative histogram counting records delivered per second with latency exceeding a set of fixed bucket thresholds." ,
66+ TOPIC_BUCKET_TAGS );
67+
4168 // Map of topics to their metric objects
4269 private final Map <String , Sensor > topicSensors ;
4370 private final Set <TopicPartition > topicPartitionSet ;
71+ private final Map <String , TreeMap <Long , Sensor >> histogramLatencySensors ;
4472
/**
 * Builds the reporter with its own Kafka {@link Metrics} registry and empty per-topic
 * bookkeeping maps.
 *
 * <p>The constructor is private, so instances can only be created from inside this class.
 */
private MirrorJmxReporter() {
  // Only one super(...) call may exist. The registry is created with sensor expiration enabled
  // (last argument) so that sensors registered with an inactivity timeout can actually expire;
  // the reporter list is a mutable, initially empty list.
  super(new Metrics(new MetricConfig(), new ArrayList<>(0), Time.SYSTEM, true));
  metrics.sensor("replication-latency");

  topicSensors = new HashMap<>();
  topicPartitionSet = new HashSet<>();
  histogramLatencySensors = new HashMap<>();

  logger.info("Initialized MirrorJMXReporter");
}
@@ -73,6 +102,15 @@ public synchronized void addTopics(List<TopicPartition> topicPartitions) {
73102 .filter (topic -> !topicSensors .containsKey (topic ))
74103 .collect (Collectors .toMap (topic -> topic , this ::createTopicSensor )));
75104 topicPartitionSet .addAll (topicPartitions );
105+
106+ for (TopicPartition topicPartition : topicPartitions ) {
107+ TreeMap <Long , Sensor > bucketSensors = new TreeMap <>();
108+ String topic = topicPartition .topic ();
109+ LATENCY_BUCKETS .forEach (
110+ (edgeMillis , bucketName ) ->
111+ bucketSensors .put (edgeMillis , createHistogramSensor (topic , bucketName )));
112+ histogramLatencySensors .put (topic , bucketSensors );
113+ }
76114 }
77115
78116 /**
@@ -104,6 +142,7 @@ public synchronized void removeTopics(List<TopicPartition> topicPartitions) {
104142 topic -> {
105143 metrics .removeSensor (replicationLatencySensorName (topic ));
106144 topicSensors .remove (topic );
145+ histogramLatencySensors .remove (topic );
107146 });
108147 }
109148
@@ -112,6 +151,24 @@ public synchronized void recordMirrorLatency(String topic, long millis) {
112151 if (sensor != null ) {
113152 sensor .record ((double ) millis );
114153 }
154+
155+ TreeMap <Long , Sensor > bucketSensors = histogramLatencySensors .get (topic );
156+ for (Map .Entry <Long , Sensor > sensorEntry : bucketSensors .entrySet ()) {
157+ long edgeMillis = sensorEntry .getKey ();
158+ Sensor bucketSensor = sensorEntry .getValue ();
159+ if (millis >= edgeMillis ) {
160+ if (bucketSensor .hasExpired ()) {
161+ String bucket = LATENCY_BUCKETS .get (edgeMillis );
162+ // explicitly replace the expired sensor with a new one
163+ metrics .removeSensor (histogramLatencySensorName (topic , bucket ));
164+ bucketSensor = createHistogramSensor (topic , bucket );
165+ }
166+ bucketSensor .record (1 );
167+ } else {
168+ // bucket sensors are sorted by edgeMillis
169+ break ;
170+ }
171+ }
115172 }
116173
117174 private Sensor createTopicSensor (String topic ) {
@@ -127,7 +184,32 @@ private Sensor createTopicSensor(String topic) {
127184 return sensor ;
128185 }
129186
187+ private Sensor createHistogramSensor (String topic , String bucket ) {
188+ Map <String , String > tags = new LinkedHashMap <>();
189+ tags .put ("topic" , topic );
190+ tags .put ("bucket" , bucket );
191+
192+ // bucket sensor will be expired after 5 mins if inactive
193+ // this is to prevent inactive bucket sensors from reporting too many zero value metrics
194+ Sensor sensor =
195+ metrics .sensor (
196+ histogramLatencySensorName (topic , bucket ),
197+ null ,
198+ TimeUnit .MINUTES .toSeconds (5 ),
199+ Sensor .RecordingLevel .INFO ,
200+ null );
201+ sensor .add (
202+ metrics .metricInstance (HISTOGRAM_LATENCY , tags ),
203+ new Rate (TimeUnit .SECONDS , new WindowedSum ()));
204+
205+ return sensor ;
206+ }
207+
130208 private String replicationLatencySensorName (String topic ) {
131209 return topic + "-" + "replication-latency" ;
132210 }
211+
212+ private String histogramLatencySensorName (String topic , String bucket ) {
213+ return topic + "-" + bucket + "-" + "histogram-latency" ;
214+ }
133215}
0 commit comments