@@ -10,6 +10,7 @@ import (
1010 "fmt"
1111 "io"
1212 "net/http"
13+ "net/url"
1314 "os"
1415 "strconv"
1516 "strings"
@@ -61,6 +62,8 @@ type handle struct {
6162 transformRequestTimerStat stats.Measurement
6263 logger logger.Logger
6364
65+ stats stats.Stats
66+
6467 // clientOAuthV2 is the HTTP client for router transformation requests using OAuth V2.
6568 clientOAuthV2 * http.Client
6669 // proxyClientOAuthV2 is the mockable HTTP client for transformer proxy requests using OAuth V2.
@@ -127,6 +130,34 @@ func NewTransformer(destinationTimeout, transformTimeout time.Duration, backendC
127130
128131var loggerOverride logger.Logger
129132
133+ // Add transformerMetricLabels struct and methods
134+ type transformerMetricLabels struct {
135+ Endpoint string // hostname of the service
136+ DestinationType string // BQ, etc.
137+ SourceType string // webhook
138+ Stage string // processor, router, gateway
139+ WorkspaceID string // workspace identifier
140+ SourceID string // source identifier
141+ DestinationID string // destination identifier
142+ }
143+
144+ // ToStatsTag converts transformerMetricLabels to stats.Tags
145+ func (t transformerMetricLabels ) ToStatsTag () stats.Tags {
146+ tags := stats.Tags {
147+ "endpoint" : t .Endpoint ,
148+ "destinationType" : t .DestinationType ,
149+ "sourceType" : t .SourceType ,
150+ "stage" : t .Stage ,
151+ "workspaceId" : t .WorkspaceID ,
152+ "destinationId" : t .DestinationID ,
153+ "sourceId" : t .SourceID ,
154+
155+ // Legacy tags: to be removed
156+ "destType" : t .DestinationType ,
157+ }
158+ return tags
159+ }
160+
130161// Transform transforms router jobs to destination jobs
131162func (trans * handle ) Transform (transformType string , transformMessage * types.TransformMessageT ) []types.DestinationJobT {
132163 var destinationJobs types.DestinationJobs
@@ -138,24 +169,37 @@ func (trans *handle) Transform(transformType string, transformMessage *types.Tra
138169 trans .logger .Errorf ("problematic input for marshalling: %#v" , transformMessage )
139170 panic (err )
140171 }
141- trans .logger .Debugf ("[Router Transformer] :: input payload : %s" , string (rawJSON ))
142-
143- retryCount := 0
144- var resp * http.Response
145- var respData []byte
146- // We should rarely have error communicating with our JS
147- reqFailed := false
148172
149173 var url string
150174 if transformType == BATCH {
151175 url = getBatchURL ()
152176 } else if transformType == ROUTER_TRANSFORM {
153177 url = getRouterTransformURL ()
154178 } else {
155- // Unexpected transformType returning empty
156179 return []types.DestinationJobT {}
157180 }
158181
182+ // Create metric labels
183+ labels := transformerMetricLabels {
184+ Endpoint : getEndpointFromURL (url ),
185+ Stage : "router" ,
186+ DestinationType : transformMessage .Data [0 ].Destination .DestinationDefinition .Name ,
187+ SourceType : transformMessage .Data [0 ].JobMetadata .SourceCategory ,
188+ WorkspaceID : transformMessage .Data [0 ].JobMetadata .WorkspaceID ,
189+ SourceID : transformMessage .Data [0 ].JobMetadata .SourceID ,
190+ DestinationID : transformMessage .Data [0 ].Destination .ID ,
191+ }
192+
193+ // Record request metrics
194+ trans .stats .NewTaggedStat ("transformer_client_request_total_events" , stats .CountType , labels .ToStatsTag ()).Count (len (transformMessage .Data ))
195+ trans .stats .NewTaggedStat ("transformer_client_request_total_bytes" , stats .CountType , labels .ToStatsTag ()).Count (len (rawJSON ))
196+
197+ retryCount := 0
198+ var resp * http.Response
199+ var respData []byte
200+ // We should rarely have error communicating with our JS
201+ reqFailed := false
202+
159203 for {
160204 s := time .Now ()
161205 req , err := http .NewRequest ("POST" , url , bytes .NewBuffer (rawJSON ))
@@ -183,14 +227,19 @@ func (trans *handle) Transform(transformType string, transformMessage *types.Tra
183227 resp , err = trans .client .Do (req )
184228 }
185229
230+ duration := time .Since (s )
231+ trans .stats .NewTaggedStat ("transformer_client_total_durations_seconds" , stats .CountType , labels .ToStatsTag ()).Count (int (duration .Seconds ()))
232+
186233 if err == nil {
187234 // If no err returned by client.Post, reading body.
188235 // If reading body fails, retrying.
189236 respData , err = io .ReadAll (resp .Body )
237+ trans .stats .NewTaggedStat ("transformer_client_response_total_bytes" , stats .CountType , labels .ToStatsTag ()).Count (len (respData ))
238+
190239 }
191240
192241 if err != nil {
193- trans .transformRequestTimerStat .SendTiming (time . Since ( s ) )
242+ trans .transformRequestTimerStat .SendTiming (duration )
194243 reqFailed = true
195244 trans .logger .Errorn (
196245 "JS HTTP connection error" ,
@@ -215,7 +264,7 @@ func (trans *handle) Transform(transformType string, transformMessage *types.Tra
215264 )
216265 }
217266
218- trans .transformRequestTimerStat .SendTiming (time . Since ( s ) )
267+ trans .transformRequestTimerStat .SendTiming (duration )
219268 break
220269 }
221270
@@ -254,6 +303,9 @@ func (trans *handle) Transform(transformType string, transformMessage *types.Tra
254303 err = jsonfast .Unmarshal (rawResp , & destinationJobs )
255304 }
256305
306+ // Record response events metric
307+ trans .stats .NewTaggedStat ("transformer_client_response_total_events" , stats .CountType , labels .ToStatsTag ()).Count (len (destinationJobs ))
308+
257309 // Validate the response received from the transformer
258310 in := transformMessage .JobIDs ()
259311 var out []int64
@@ -359,11 +411,6 @@ func (trans *handle) ProxyRequest(ctx context.Context, proxyReqParams *ProxyRequ
359411 routerJobDontBatchDirectives [m .JobID ] = m .DontBatch
360412 }
361413
362- stats .Default .NewTaggedStat ("transformer_proxy.delivery_request" , stats .CountType , stats.Tags {
363- "destType" : proxyReqParams .DestName ,
364- "workspaceId" : proxyReqParams .ResponseData .Metadata [0 ].WorkspaceID ,
365- "destinationId" : proxyReqParams .ResponseData .Metadata [0 ].DestinationID ,
366- }).Increment ()
367414 trans .logger .Debugf (`[TransformerProxy] (Dest-%[1]v) Proxy Request starts - %[1]v` , proxyReqParams .DestName )
368415
369416 payload , err := proxyReqParams .Adapter .getPayload (proxyReqParams )
@@ -377,6 +424,7 @@ func (trans *handle) ProxyRequest(ctx context.Context, proxyReqParams *ProxyRequ
377424 DontBatchDirectives : routerJobDontBatchDirectives ,
378425 }
379426 }
427+
380428 proxyURL , err := proxyReqParams .Adapter .getProxyURL (proxyReqParams .DestName )
381429 if err != nil {
382430 return ProxyRequestResponse {
@@ -389,13 +437,32 @@ func (trans *handle) ProxyRequest(ctx context.Context, proxyReqParams *ProxyRequ
389437 }
390438 }
391439
440+ // Create metric labels
441+ labels := transformerMetricLabels {
442+ Endpoint : getEndpointFromURL (proxyURL ),
443+ Stage : "router_proxy" ,
444+ DestinationType : proxyReqParams .DestName ,
445+ WorkspaceID : proxyReqParams .ResponseData .Metadata [0 ].WorkspaceID ,
446+ DestinationID : proxyReqParams .ResponseData .Metadata [0 ].DestinationID ,
447+ }.ToStatsTag ()
448+
449+ // Record request metrics
450+ trans .stats .NewTaggedStat ("transformer_proxy.delivery_request" , stats .CountType , labels ).Increment ()
451+ trans .stats .NewTaggedStat ("transformer_client_request_total_bytes" , stats .CountType , labels ).Count (len (payload ))
452+ trans .stats .NewTaggedStat ("transformer_client_request_total_events" , stats .CountType , labels ).Count (len (proxyReqParams .ResponseData .Metadata ))
453+
392454 rdlTime := time .Now ()
393455 httpPrxResp := trans .doProxyRequest (ctx , proxyURL , proxyReqParams , payload )
394456 respData , respCode , requestError := httpPrxResp .respData , httpPrxResp .statusCode , httpPrxResp .err
395457
396- reqSuccessStr := strconv .FormatBool (requestError == nil )
397- stats .Default .NewTaggedStat ("transformer_proxy.request_latency" , stats .TimerType , stats.Tags {"requestSuccess" : reqSuccessStr , "destType" : proxyReqParams .DestName }).SendTiming (time .Since (rdlTime ))
398- stats .Default .NewTaggedStat ("transformer_proxy.request_result" , stats .CountType , stats.Tags {"requestSuccess" : reqSuccessStr , "destType" : proxyReqParams .DestName }).Increment ()
458+ duration := time .Since (rdlTime )
459+
460+ trans .stats .NewTaggedStat ("transformer_client_total_durations_seconds" , stats .CountType , labels ).Count (int (duration .Seconds ()))
461+ trans .stats .NewTaggedStat ("transformer_client_response_total_bytes" , stats .CountType , labels ).Count (len (respData ))
462+
463+ labelsWithSuccess := lo .Assign (labels , stats.Tags {"requestSuccess" : strconv .FormatBool (requestError == nil )})
464+ trans .stats .NewTaggedStat ("transformer_proxy.request_latency" , stats .TimerType , labelsWithSuccess ).SendTiming (duration )
465+ trans .stats .NewTaggedStat ("transformer_proxy.request_result" , stats .CountType , labelsWithSuccess ).Increment ()
399466
400467 if requestError != nil {
401468 return ProxyRequestResponse {
@@ -471,6 +538,8 @@ func (trans *handle) ProxyRequest(ctx context.Context, proxyReqParams *ProxyRequ
471538 respData = []byte (transportResponse .InterceptorResponse .Response )
472539 }
473540
541+ trans .stats .NewTaggedStat ("transformer_client_response_total_events" , stats .CountType , labels ).Count (len (transResp .routerJobResponseCodes ))
542+
474543 return ProxyRequestResponse {
475544 ProxyRequestStatusCode : respCode ,
476545 ProxyRequestResponseBody : string (respData ),
@@ -532,6 +601,7 @@ func (trans *handle) setup(destinationTimeout, transformTimeout time.Duration, c
532601 trans .proxyClient = transformerclient .NewClient (transformerClientConfig )
533602 // This client is used for Transformer Proxy(delivered from transformer to destination) using oauthV2
534603 trans .proxyClientOAuthV2 = oauthv2httpclient .NewOAuthHttpClient (& http.Client {Transport : trans .tr , Timeout : trans .destinationTimeout + trans .transformTimeout }, common .RudderFlowDelivery , cache , backendConfig , GetAuthErrorCategoryFromTransformProxyResponse , proxyClientOptionalArgs )
604+ trans .stats = stats .Default
535605 trans .transformRequestTimerStat = stats .Default .NewStat ("router.transformer_request_time" , stats .TimerType )
536606}
537607
@@ -677,3 +747,11 @@ func GetAuthErrorCategoryFromTransformProxyResponse(respData []byte) (string, er
677747 }
678748 return transformedJobs .AuthErrorCategory , nil
679749}
750+
751+ // Helper function to get endpoint from URL
752+ func getEndpointFromURL (urlStr string ) string {
753+ if parsedURL , err := url .Parse (urlStr ); err == nil {
754+ return parsedURL .Host
755+ }
756+ return ""
757+ }
0 commit comments