Skip to content

Commit 18f4bdf

Browse files
SankeerthSai Sankeerth
andauthored
chore: oauth v2 stats refactor (#5262)
* chore: oauth v2 stats refactor * chore: rename handler variable * chore: address comments --------- Co-authored-by: Sai Sankeerth <sanpj2292@github.com>
1 parent 8a186e5 commit 18f4bdf

File tree

3 files changed

+102
-80
lines changed

3 files changed

+102
-80
lines changed

services/oauth/v2/oauth.go

Lines changed: 58 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"net/http"
8+
"strings"
89
"time"
910

1011
"github.com/tidwall/gjson"
@@ -141,11 +142,11 @@ func (h *OAuthHandler) FetchToken(fetchTokenParams *RefreshTokenParams) (int, *A
141142
authErrCategory: "",
142143
errorMessage: "",
143144
destDefName: fetchTokenParams.DestDefName,
144-
isTokenFetch: true,
145145
flowType: h.RudderFlowType,
146146
action: "fetch_token",
147147
}
148-
return h.GetTokenInfo(fetchTokenParams, "Fetch token", authStats)
148+
statshandler := NewStatsHandlerFromOAuthStats(authStats)
149+
return h.GetTokenInfo(fetchTokenParams, "Fetch token", statshandler)
149150
}
150151

151152
/*
@@ -176,10 +177,11 @@ func (h *OAuthHandler) RefreshToken(refTokenParams *RefreshTokenParams) (int, *A
176177
action: "refresh_token",
177178
stats: h.stats,
178179
}
179-
return h.GetTokenInfo(refTokenParams, "Refresh token", authStats)
180+
statsHandler := NewStatsHandlerFromOAuthStats(authStats)
181+
return h.GetTokenInfo(refTokenParams, "Refresh token", statsHandler)
180182
}
181183

182-
func (h *OAuthHandler) GetTokenInfo(refTokenParams *RefreshTokenParams, logTypeName string, authStats *OAuthStats) (int, *AuthResponse, error) {
184+
func (h *OAuthHandler) GetTokenInfo(refTokenParams *RefreshTokenParams, logTypeName string, statsHandler OAuthStatsHandler) (int, *AuthResponse, error) {
183185
log := h.Logger.Withn(
184186
logger.NewStringField("Call Type", logTypeName),
185187
logger.NewStringField("AccountId", refTokenParams.AccountID),
@@ -189,13 +191,13 @@ func (h *OAuthHandler) GetTokenInfo(refTokenParams *RefreshTokenParams, logTypeN
189191
)
190192
log.Debugn("[request] :: Get Token Info request received")
191193
startTime := time.Now()
192-
defer func() {
193-
authStats.statName = GetOAuthActionStatName("total_latency")
194-
authStats.isCallToCpApi = false
195-
authStats.SendTimerStats(startTime)
196-
}()
197194
h.CacheMutex.Lock(refTokenParams.AccountID)
198195
defer h.CacheMutex.Unlock(refTokenParams.AccountID)
196+
defer func() {
197+
statsHandler.SendTiming(startTime, "total_latency", stats.Tags{
198+
"isCallToCpApi": "false",
199+
})
200+
}()
199201
refTokenBody := RefreshTokenBodyParams{}
200202
storedCache, ok := h.Cache.Load(refTokenParams.AccountID)
201203
if ok {
@@ -205,7 +207,7 @@ func (h *OAuthHandler) GetTokenInfo(refTokenParams *RefreshTokenParams, logTypeN
205207
return http.StatusInternalServerError, nil, errors.New("failed to type assert the stored cache")
206208
}
207209
// TODO: verify if the storedCache is nil at this point
208-
if !checkIfTokenExpired(cachedSecret.Account, refTokenParams.Secret, h.ExpirationTimeDiff, authStats) {
210+
if !checkIfTokenExpired(cachedSecret.Account, refTokenParams.Secret, h.ExpirationTimeDiff, statsHandler) {
209211
return http.StatusOK, cachedSecret, nil
210212
}
211213
// Refresh token preparation
@@ -214,7 +216,7 @@ func (h *OAuthHandler) GetTokenInfo(refTokenParams *RefreshTokenParams, logTypeN
214216
ExpiredSecret: refTokenParams.Secret,
215217
}
216218
}
217-
statusCode, refSecret, refErr := h.fetchAccountInfoFromCp(refTokenParams, refTokenBody, authStats, logTypeName)
219+
statusCode, refSecret, refErr := h.fetchAccountInfoFromCp(refTokenParams, refTokenBody, statsHandler, logTypeName)
218220
// handling of refresh token response
219221
if statusCode == http.StatusOK {
220222
// fetching/refreshing through control plane was successful
@@ -241,11 +243,7 @@ func (h *OAuthHandler) AuthStatusToggle(params *AuthStatusToggleParams) (statusC
241243
action: action,
242244
stats: h.stats,
243245
}
244-
defer func() {
245-
authStatusToggleStats.statName = GetOAuthActionStatName("total_latency")
246-
authStatusToggleStats.isCallToCpApi = false
247-
authStatusToggleStats.SendTimerStats(authErrHandlerTimeStart)
248-
}()
246+
statsHandler := NewStatsHandlerFromOAuthStats(authStatusToggleStats)
249247
h.CacheMutex.Lock(params.RudderAccountID)
250248
isAuthStatusUpdateActive, isAuthStatusUpdateReqPresent := h.AuthStatusUpdateActiveMap[destinationId]
251249
if isAuthStatusUpdateReqPresent && isAuthStatusUpdateActive {
@@ -266,6 +264,11 @@ func (h *OAuthHandler) AuthStatusToggle(params *AuthStatusToggleParams) (statusC
266264
h.Cache.Delete(params.RudderAccountID)
267265
h.CacheMutex.Unlock(params.RudderAccountID)
268266
}()
267+
defer func() {
268+
statsHandler.SendTiming(authErrHandlerTimeStart, "total_latency", stats.Tags{
269+
"isCallToCpApi": "false",
270+
})
271+
}()
269272

270273
authStatusToggleUrl := fmt.Sprintf("%s/workspaces/%s/destinations/%s/authStatus/toggle", h.ConfigBEURL, params.WorkspaceID, destinationId)
271274

@@ -278,14 +281,15 @@ func (h *OAuthHandler) AuthStatusToggle(params *AuthStatusToggleParams) (statusC
278281
RequestType: action,
279282
BasicAuthUser: h.Identity(),
280283
}
281-
authStatusToggleStats.statName = GetOAuthActionStatName("request_sent")
282-
authStatusToggleStats.isCallToCpApi = true
283-
authStatusToggleStats.SendCountStat()
284+
statsHandler.Increment("request_sent", stats.Tags{
285+
"isCallToCpApi": "true",
286+
})
284287

285288
cpiCallStartTime := time.Now()
286289
statusCode, respBody = h.CpConn.CpApiCall(authStatusInactiveCpReq)
287-
authStatusToggleStats.statName = GetOAuthActionStatName("request_latency")
288-
authStatusToggleStats.SendTimerStats(cpiCallStartTime)
290+
statsHandler.SendTiming(cpiCallStartTime, "request_latency", stats.Tags{
291+
"isCallToCpApi": "true",
292+
})
289293
h.Logger.Debugn("[request] :: Response from CP for auth status inactive req",
290294
logger.NewIntField("StatusCode", int64(statusCode)),
291295
logger.NewStringField("Response", respBody))
@@ -299,18 +303,20 @@ func (h *OAuthHandler) AuthStatusToggle(params *AuthStatusToggleParams) (statusC
299303
} else {
300304
msg = fmt.Sprintf("Could not update authStatus to inactive for destination: %v", authStatusToggleRes.Message)
301305
}
302-
authStatusToggleStats.statName = GetOAuthActionStatName("request")
303-
authStatusToggleStats.errorMessage = msg
304-
authStatusToggleStats.SendCountStat()
306+
statsHandler.Increment("request", stats.Tags{
307+
"errorMessage": msg,
308+
"isCallToCpApi": "true",
309+
})
305310
return http.StatusBadRequest, ErrPermissionOrTokenRevoked.Error()
306311
}
307312
h.Logger.Debugn("[request] :: (Write) auth status inactive Response received",
308313
logger.NewIntField("StatusCode", int64(statusCode)),
309314
logger.NewStringField("Response", respBody))
310-
authStatusToggleStats.statName = GetOAuthActionStatName("request")
311-
authStatusToggleStats.errorMessage = ""
312-
authStatusToggleStats.SendCountStat()
313315

316+
statsHandler.Increment("request", stats.Tags{
317+
"errorMessage": "",
318+
"isCallToCpApi": "true",
319+
})
314320
return http.StatusBadRequest, ErrPermissionOrTokenRevoked.Error()
315321
}
316322

@@ -342,14 +348,15 @@ func (h *OAuthHandler) GetRefreshTokenErrResp(response string, accountSecret *Ac
342348
// This method hits the Control Plane to get the account information
343349
// As well update the account information into the destAuthInfoMap(which acts as an in-memory cache)
344350
func (h *OAuthHandler) fetchAccountInfoFromCp(refTokenParams *RefreshTokenParams, refTokenBody RefreshTokenBodyParams,
345-
authStats *OAuthStats, logTypeName string,
351+
statsHandler OAuthStatsHandler, logTypeName string,
346352
) (int, *AuthResponse, error) {
353+
actionType := strings.Join(strings.Fields(strings.ToLower(logTypeName)), "_")
347354
refreshUrl := fmt.Sprintf("%s/destination/workspaces/%s/accounts/%s/token", h.ConfigBEURL, refTokenParams.WorkspaceID, refTokenParams.AccountID)
348355
res, err := json.Marshal(refTokenBody)
349356
if err != nil {
350-
authStats.statName = GetOAuthActionStatName("request")
351-
authStats.errorMessage = "error in marshalling refresh token body"
352-
authStats.SendCountStat()
357+
statsHandler.Increment("request", stats.Tags{
358+
"errorMessage": "error in marshalling refresh token body",
359+
})
353360
return http.StatusInternalServerError, nil, err
354361
}
355362
refreshCpReq := &controlplane.Request{
@@ -358,20 +365,21 @@ func (h *OAuthHandler) fetchAccountInfoFromCp(refTokenParams *RefreshTokenParams
358365
ContentType: "application/json; charset=utf-8",
359366
Body: string(res),
360367
DestName: refTokenParams.DestDefName,
361-
RequestType: authStats.action,
368+
RequestType: actionType,
362369
BasicAuthUser: h.TokenProvider.Identity(),
363370
}
364371
var accountSecret AccountSecret
365372
// Stat for counting number of Refresh Token endpoint calls
366-
authStats.statName = GetOAuthActionStatName("request_sent")
367-
authStats.isCallToCpApi = true
368-
authStats.errorMessage = ""
369-
authStats.SendCountStat()
373+
statsHandler.Increment("request_sent", stats.Tags{
374+
"isCallToCpApi": "true",
375+
"errorMessage": "",
376+
})
370377

371378
cpiCallStartTime := time.Now()
372379
statusCode, response := h.CpConn.CpApiCall(refreshCpReq)
373-
authStats.statName = GetOAuthActionStatName("request_latency")
374-
authStats.SendTimerStats(cpiCallStartTime)
380+
statsHandler.SendTiming(cpiCallStartTime, "request_latency", stats.Tags{
381+
"isCallToCpApi": "true",
382+
})
375383

376384
log := h.Logger.Withn(logger.NewIntField("StatusCode", int64(statusCode)),
377385
logger.NewIntField("WorkerId", int64(refTokenParams.WorkerID)),
@@ -380,9 +388,10 @@ func (h *OAuthHandler) fetchAccountInfoFromCp(refTokenParams *RefreshTokenParams
380388

381389
// Empty Refresh token response
382390
if !routerutils.IsNotEmptyString(response) {
383-
authStats.statName = GetOAuthActionStatName("request")
384-
authStats.errorMessage = "Empty secret"
385-
authStats.SendCountStat()
391+
statsHandler.Increment("request", stats.Tags{
392+
"errorMessage": "Empty secret",
393+
"isCallToCpApi": "true",
394+
})
386395
// Setting empty accessToken value into in-memory auth info map(cache)
387396
h.Logger.Debugn("Empty response from Control-Plane",
388397
logger.NewStringField("Response", response),
@@ -398,19 +407,21 @@ func (h *OAuthHandler) fetchAccountInfoFromCp(refTokenParams *RefreshTokenParams
398407
Err: errType,
399408
ErrorMessage: refErrMsg,
400409
}
401-
authStats.statName = GetOAuthActionStatName("request")
402-
authStats.errorMessage = errType
403-
authStats.SendCountStat()
410+
statsHandler.Increment("request", stats.Tags{
411+
"errorMessage": errType,
412+
"isCallToCpApi": "true",
413+
})
404414
if authResponse.Err == common.RefTokenInvalidGrant {
405415
// Should abort the event as refresh is not going to work
406416
// until we have new refresh token for the account
407417
return http.StatusBadRequest, authResponse, fmt.Errorf("invalid grant")
408418
}
409419
return http.StatusInternalServerError, authResponse, fmt.Errorf("error occurred while fetching/refreshing account info from CP: %s", refErrMsg)
410420
}
411-
authStats.statName = GetOAuthActionStatName("request")
412-
authStats.errorMessage = ""
413-
authStats.SendCountStat()
421+
statsHandler.Increment("request", stats.Tags{
422+
"errorMessage": "",
423+
"isCallToCpApi": "true",
424+
})
414425
log.Debugn("[request] :: (Write) Account Secret received")
415426
// Store expirationDate information
416427
accountSecret.ExpirationDate = gjson.Get(response, "secret.expirationDate").String()

services/oauth/v2/stats.go

Lines changed: 37 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,17 @@ package v2
22

33
import (
44
"strconv"
5+
"strings"
56
"time"
67

8+
"github.com/samber/lo"
9+
710
"github.com/rudderlabs/rudder-go-kit/stats"
811
"github.com/rudderlabs/rudder-server/services/oauth/v2/common"
912
)
1013

14+
const OAUTH_V2_STAT_PREFIX = "oauth_action"
15+
1116
type OAuthStats struct {
1217
stats stats.Stats
1318
id string // destinationId -> for action == auth_status_inactive, accountId -> for action == refresh_token/fetch_token
@@ -18,40 +23,45 @@ type OAuthStats struct {
1823
isCallToCpApi bool // is a call being made to control-plane APIs
1924
authErrCategory string // for action=refresh_token -> REFRESH_TOKEN, for action=fetch_token -> "", for action=auth_status_inactive -> auth_status_inactive
2025
destDefName string
21-
isTokenFetch bool // This stats field is used to identify if a request to get token is arising from processor
2226
flowType common.RudderFlow // delivery, delete
2327
action string // refresh_token, fetch_token, auth_status_inactive
2428
}
2529

26-
func (s *OAuthStats) SendTimerStats(startTime time.Time) {
27-
statsTags := stats.Tags{
28-
"id": s.id,
29-
"workspaceId": s.workspaceID,
30-
"rudderCategory": s.rudderCategory,
31-
"isCallToCpApi": strconv.FormatBool(s.isCallToCpApi),
32-
"authErrCategory": s.authErrCategory,
33-
"destType": s.destDefName,
34-
"flowType": string(s.flowType),
35-
"action": s.action,
30+
type OAuthStatsHandler struct {
31+
stats stats.Stats
32+
defaultTags stats.Tags
33+
}
34+
35+
func GetDefaultTagsFromOAuthStats(oauthStats *OAuthStats) stats.Tags {
36+
return stats.Tags{
37+
"id": oauthStats.id,
38+
"workspaceId": oauthStats.workspaceID,
39+
"rudderCategory": "destination",
40+
"isCallToCpApi": strconv.FormatBool(oauthStats.isCallToCpApi),
41+
"authErrCategory": oauthStats.authErrCategory,
42+
"destType": oauthStats.destDefName,
43+
"flowType": string(oauthStats.flowType),
44+
"action": oauthStats.action,
3645
"oauthVersion": "v2",
3746
}
38-
s.stats.NewTaggedStat(s.statName, stats.TimerType, statsTags).SendTiming(time.Since(startTime))
3947
}
4048

41-
// SendCountStat Send count type stats related to OAuth(Destination)
42-
func (s *OAuthStats) SendCountStat() {
43-
statsTags := stats.Tags{
44-
"oauthVersion": "v2",
45-
"id": s.id,
46-
"workspaceId": s.workspaceID,
47-
"rudderCategory": s.rudderCategory,
48-
"errorMessage": s.errorMessage,
49-
"isCallToCpApi": strconv.FormatBool(s.isCallToCpApi),
50-
"authErrCategory": s.authErrCategory,
51-
"destType": s.destDefName,
52-
"isTokenFetch": strconv.FormatBool(s.isTokenFetch),
53-
"flowType": string(s.flowType),
54-
"action": s.action,
49+
func NewStatsHandlerFromOAuthStats(oauthStats *OAuthStats) OAuthStatsHandler {
50+
defaultTags := GetDefaultTagsFromOAuthStats(oauthStats)
51+
return OAuthStatsHandler{
52+
stats: oauthStats.stats,
53+
defaultTags: defaultTags,
5554
}
56-
s.stats.NewTaggedStat(s.statName, stats.CountType, statsTags).Increment()
55+
}
56+
57+
func (m *OAuthStatsHandler) Increment(statSuffix string, tags stats.Tags) {
58+
statName := strings.Join([]string{OAUTH_V2_STAT_PREFIX, statSuffix}, "_")
59+
allTags := lo.Assign(m.defaultTags, tags)
60+
m.stats.NewTaggedStat(statName, stats.CountType, allTags).Increment()
61+
}
62+
63+
func (m *OAuthStatsHandler) SendTiming(startTime time.Time, statSuffix string, tags stats.Tags) {
64+
statName := strings.Join([]string{OAUTH_V2_STAT_PREFIX, statSuffix}, "_")
65+
allTags := lo.Assign(m.defaultTags, tags)
66+
m.stats.NewTaggedStat(statName, stats.TimerType, allTags).SendTiming(time.Since(startTime))
5767
}

services/oauth/v2/utils.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"time"
99

10+
"github.com/rudderlabs/rudder-go-kit/stats"
1011
routerutils "github.com/rudderlabs/rudder-server/router/utils"
1112
"github.com/rudderlabs/rudder-server/services/oauth/v2/common"
1213
"github.com/rudderlabs/rudder-server/utils/misc"
@@ -21,8 +22,8 @@ func GetOAuthActionStatName(stat string) string {
2122
return fmt.Sprintf("oauth_action_%v", stat)
2223
}
2324

24-
func checkIfTokenExpired(secret AccountSecret, oldSecret json.RawMessage, expiryTimeDiff time.Duration, stats *OAuthStats) bool {
25-
if secret.ExpirationDate != "" && isTokenExpired(secret.ExpirationDate, expiryTimeDiff, stats) {
25+
func checkIfTokenExpired(secret AccountSecret, oldSecret json.RawMessage, expiryTimeDiff time.Duration, statsHandler OAuthStatsHandler) bool {
26+
if secret.ExpirationDate != "" && isTokenExpired(secret.ExpirationDate, expiryTimeDiff, &statsHandler) {
2627
return true
2728
}
2829
if !routerutils.IsNotEmptyString(string(oldSecret)) {
@@ -31,12 +32,12 @@ func checkIfTokenExpired(secret AccountSecret, oldSecret json.RawMessage, expiry
3132
return bytes.Equal(secret.Secret, oldSecret)
3233
}
3334

34-
func isTokenExpired(expirationDate string, expirationTimeDiff time.Duration, stats *OAuthStats) bool {
35+
func isTokenExpired(expirationDate string, expirationTimeDiff time.Duration, statsHandler *OAuthStatsHandler) bool {
3536
date, err := time.Parse(misc.RFC3339Milli, expirationDate)
3637
if err != nil {
37-
stats.errorMessage = "parsing failed"
38-
stats.statName = GetOAuthActionStatName("proactive_token_refresh")
39-
stats.SendCountStat()
38+
statsHandler.Increment("proactive_token_refresh", stats.Tags{
39+
"errorMessage": "parsing failed",
40+
})
4041
return false
4142
}
4243
return date.Before(time.Now().Add(expirationTimeDiff))

0 commit comments

Comments
 (0)