@@ -4,15 +4,15 @@ import (
44 pb "buf.build/gen/go/gotocompany/proton/protocolbuffers/go/gotocompany/raccoon/v1beta1"
55 "context"
66 "fmt"
7- "github.com/goto/raccoon/clients/go/log"
8- "github.com/goto/raccoon/serialization"
97 "time"
108
119 "github.com/gojek/courier-go"
10+ "github.com/goto/raccoon/clients/go/log"
1211 "github.com/goto/raccoon/collection"
1312 "github.com/goto/raccoon/config"
1413 "github.com/goto/raccoon/identification"
1514 "github.com/goto/raccoon/metrics"
15+ "github.com/goto/raccoon/serialization"
1616 "google.golang.org/protobuf/proto"
1717)
1818
@@ -25,69 +25,81 @@ type Handler struct {
2525// and sends them to the Collector.
2626func (h * Handler ) MQTTHandler (ctx context.Context , c courier.PubSub , message * courier.Message ) {
2727 start := time .Now ()
28-
29- identifier := identification.Identifier {
30- Group : config .ServerMQTT .ConnGroup ,
31- }
28+ group := config .ServerMQTT .ConnGroup
3229 log .Infof ("MQTT message received with content %v" , message )
3330
3431 var req pb.SendEventRequest
3532 if err := message .DecodePayload (& req ); err != nil {
36- metrics .Increment (
37- "batches_read_total" ,
38- fmt .Sprintf ("status=failed,conn_group=%s,reason=serde" , identifier .Group ),
39- )
40- log .Errorf ("mqtt message decoding failed due to : %v" , err )
33+ h .recordMetrics ("request" , fmt .Sprintf ("status=failed,conn_group=%s,reason=serde" , group ), nil )
34+ log .Errorf ("mqtt message decoding failed: %v" , err )
4135 return
4236 }
4337
4438 if proto .Equal (& req , & pb.SendEventRequest {}) {
45- metrics .Increment (
46- "batches_read_total" ,
47- fmt .Sprintf ("status=failed,conn_group=%s,reason=empty" , identifier .Group ),
48- )
39+ h .recordMetrics ("request" , fmt .Sprintf ("status=failed,conn_group=%s,reason=empty" , group ), nil )
4940 log .Errorf ("mqtt request message according proto format is empty" )
5041 return
5142 }
5243
53- //to be removed post end-to-end test
44+ // Debug — can be removed after E2E verification
5445 for _ , event := range req .Events {
55- log .Infof ("MQTT message content post deserialization event : %v" , event )
46+ log .Infof ("MQTT message content post deserialization event: %v" , event )
5647 }
57- log .Infof ("MQTT message request id %v" , req .ReqGuid )
48+ log .Infof ("MQTT message request id: %v" , req .ReqGuid )
5849
59- //instrument the request number
60- metrics .Increment (
61- "batches_read_total" ,
62- fmt .Sprintf ("status=success,conn_group=%s" , identifier .Group ),
63- )
64- //instrument the request size
50+ // Serialize to compute request size
6551 reqBytes , err := serialization .SerializeProto (& req )
6652 if err != nil {
67- log .Errorf ("mqtt message serialization failed : %v" , err )
68- } else {
69- metrics .Count ("request_bytes_total" , len (reqBytes ),
70- fmt .Sprintf ("conn_group=%s" , identifier .Group ))
53+ log .Errorf ("mqtt message serialization failed: %v" , err )
7154 }
7255
73- h .recordEventMetrics (req .Events , identifier .Group )
56+ // Record all metrics via generic function
57+ h .recordMetrics ("request" , fmt .Sprintf ("status=success,conn_group=%s" , group ), reqBytes )
58+ h .recordMetrics ("event" , fmt .Sprintf ("conn_group=%s" , group ), req .Events )
7459
7560 h .Collector .Collect (ctx , & collection.CollectRequest {
76- ConnectionIdentifier : identifier ,
61+ ConnectionIdentifier : identification. Identifier { Group : group } ,
7762 TimeConsumed : start ,
7863 SendEventRequest : & req ,
7964 AckFunc : nil ,
8065 })
8166}
8267
83- // recordEventMetrics updates per-event metrics like byte size and event count.
84- func (h * Handler ) recordEventMetrics (events []* pb.Event , group string ) {
68+ // recordMetrics is a generic entry function that routes metric recording
69+ // based on metricName.
70+ func (h * Handler ) recordMetrics (metricName string , tags string , data any ) {
71+ switch metricName {
72+ case "request" :
73+ h .recordRequestMetrics (tags , data )
74+ case "event" :
75+ h .recordEventMetrics (tags , data )
76+ default :
77+ log .Errorf ("unknown metricName=%s ignored" , metricName )
78+ }
79+ }
80+
81+ // recordRequestMetrics captures request-level metrics (success/failure, bytes).
82+ func (h * Handler ) recordRequestMetrics (tags string , data any ) {
83+ metrics .Increment ("batches_read_total" , tags )
84+
85+ if reqBytes , ok := data .([]byte ); ok && len (reqBytes ) > 0 {
86+ metrics .Count ("request_bytes_total" , len (reqBytes ), tags )
87+ }
88+ }
89+
90+ // recordEventMetrics captures per-event metrics like count and size.
91+ func (h * Handler ) recordEventMetrics (tags string , data any ) {
92+ events , ok := data .([]* pb.Event )
93+ if ! ok {
94+ return
95+ }
96+
8597 for _ , e := range events {
8698 if e == nil {
8799 continue
88100 }
89- tags := fmt .Sprintf ("conn_group= %s,event_type=%s" , group , e .Type )
90- metrics .Count ( "events_rx_bytes_total " , len ( e . EventBytes ), tags )
91- metrics .Increment ( "events_rx_total " , tags )
101+ eventTags := fmt .Sprintf ("%s,event_type=%s" , tags , e .Type )
102+ metrics .Increment ( "events_rx_total " , eventTags )
103+ metrics .Count ( "events_rx_bytes_total " , len ( e . EventBytes ), eventTags )
92104 }
93105}
0 commit comments