From e3a6826903c6c75e9973e3735e5abece6af687ab Mon Sep 17 00:00:00 2001 From: ItsSudip Date: Wed, 23 Apr 2025 17:28:29 +0530 Subject: [PATCH 01/10] chore: remove oauth v1 code from router --- router/handle.go | 55 ----------- router/handle_lifecycle.go | 3 +- router/transformer/transformer.go | 131 ++++++++----------------- router/transformer/transformer_test.go | 33 ++----- router/types.go | 1 - router/worker.go | 81 +++++++-------- router/worker_test.go | 8 +- 7 files changed, 93 insertions(+), 219 deletions(-) diff --git a/router/handle.go b/router/handle.go index c05a0f4cab..6fbc9930bd 100644 --- a/router/handle.go +++ b/router/handle.go @@ -8,7 +8,6 @@ import ( "net/http" "sort" "strconv" - "strings" "sync" "sync/atomic" "time" @@ -732,60 +731,6 @@ func (*Handle) crashRecover() { // NO-OP } -func (rt *Handle) handleOAuthDestResponse(params *HandleDestOAuthRespParams, authErrorCategory string) (int, string, string) { - trRespStatusCode := params.trRespStCd - trRespBody := params.trRespBody - destinationJob := params.destinationJob - - workspaceID := destinationJob.JobMetadataArray[0].WorkspaceID - // Check the category - // Trigger the refresh endpoint/disable endpoint - rudderAccountID := oauth.GetAccountId(destinationJob.Destination.Config, oauth.DeliveryAccountIdKey) - if strings.TrimSpace(rudderAccountID) == "" { - return trRespStatusCode, trRespBody, params.contentType - } - switch authErrorCategory { - case oauth.AUTH_STATUS_INACTIVE: - authStatusStCd := rt.updateAuthStatusToInactive(&destinationJob.Destination, workspaceID, rudderAccountID) - authStatusMsg := gjson.Get(trRespBody, "message").Raw - return authStatusStCd, authStatusMsg, "text/plain; charset=utf-8" - case oauth.REFRESH_TOKEN: - refTokenParams := &oauth.RefreshTokenParams{ - Secret: params.secret, - WorkspaceId: workspaceID, - AccountId: rudderAccountID, - DestDefName: destinationJob.Destination.DestinationDefinition.Name, - WorkerId: params.workerID, - } - errCatStatusCode, refSecret := rt.oauth.RefreshToken(refTokenParams) - if routerutils.IsNotEmptyString(refSecret.Err) && refSecret.Err == oauth.REF_TOKEN_INVALID_GRANT { - // In-case the refresh token has been revoked, this error comes in - // Even trying to refresh the token also doesn't work here. Hence, this would be more ideal to Abort Events - // As well as to disable destination as well. - // Alert the user in this error as well, to check if the refresh token also has been revoked & fix it - authStatusInactiveStCode := rt.updateAuthStatusToInactive(&destinationJob.Destination, workspaceID, rudderAccountID) - stats.Default.NewTaggedStat(oauth.REF_TOKEN_INVALID_GRANT, stats.CountType, stats.Tags{ - "destinationId": destinationJob.Destination.ID, - "workspaceId": refTokenParams.WorkspaceId, - "accountId": refTokenParams.AccountId, - "destType": refTokenParams.DestDefName, - "flowType": string(oauth.RudderFlow_Delivery), - }).Increment() - rt.logger.Errorf(`[OAuth request] Aborting the event as %v`, oauth.REF_TOKEN_INVALID_GRANT) - return authStatusInactiveStCode, refSecret.ErrorMessage, "text/plain; charset=utf-8" - } - // Error while refreshing the token or Has an error while refreshing or sending empty access token - if errCatStatusCode != http.StatusOK || routerutils.IsNotEmptyString(refSecret.Err) { - return http.StatusTooManyRequests, refSecret.Err, "text/plain; charset=utf-8" - } - // Retry with Refreshed Token by failing with 5xx - return http.StatusInternalServerError, trRespBody, params.contentType - default: - // By default, send the status code & response from transformed response directly - return trRespStatusCode, trRespBody, params.contentType - } -} - func (rt *Handle) updateAuthStatusToInactive(destination *backendconfig.DestinationT, workspaceID, rudderAccountId string) int { inactiveAuthStatusStatTags := stats.Tags{ "id": destination.ID, diff --git a/router/handle_lifecycle.go b/router/handle_lifecycle.go index baaeba382b..9e00d8ccc4 100644 --- a/router/handle_lifecycle.go +++ b/router/handle_lifecycle.go @@ -126,7 +126,7 @@ func (rt *Handle) Setup( rt.throttlingErrorStat = stats.Default.NewTaggedStat("router_throttling_error", stats.CountType, statTags) rt.throttledStat = stats.Default.NewTaggedStat("router_throttled", stats.CountType, statTags) rt.transformer = transformer.NewTransformer(rt.netClientTimeout, rt.transformerTimeout, - backendConfig, rt.reloadableConfig.oauthV2Enabled, + backendConfig, rt.reloadableConfig.oauthV2ExpirationTimeDiff, rt.transformerFeaturesService, ) @@ -317,7 +317,6 @@ func (rt *Handle) setupReloadableVars() { rt.reloadableConfig.pickupFlushInterval = config.GetReloadableDurationVar(2, time.Second, "Router.pickupFlushInterval") rt.reloadableConfig.failingJobsPenaltySleep = config.GetReloadableDurationVar(2000, time.Millisecond, "Router.failingJobsPenaltySleep") rt.reloadableConfig.failingJobsPenaltyThreshold = config.GetReloadableFloat64Var(0.6, "Router.failingJobsPenaltyThreshold") - rt.reloadableConfig.oauthV2Enabled = config.GetReloadableBoolVar(false, "Router."+rt.destType+".oauthV2Enabled", "Router.oauthV2Enabled") rt.reloadableConfig.oauthV2ExpirationTimeDiff = config.GetReloadableDurationVar(5, time.Minute, "Router."+rt.destType+".oauth.expirationTimeDiff", "Router.oauth.expirationTimeDiff") rt.diagnosisTickerTime = config.GetDurationVar(60, time.Second, "Diagnostics.routerTimePeriod", "Diagnostics.routerTimePeriodInS") rt.netClientTimeout = config.GetDurationVar(10, time.Second, diff --git a/router/transformer/transformer.go b/router/transformer/transformer.go index e8cda72c70..057d80c60b 100644 --- a/router/transformer/transformer.go +++ b/router/transformer/transformer.go @@ -25,7 +25,6 @@ import ( "github.com/rudderlabs/rudder-go-kit/sync" backendconfig "github.com/rudderlabs/rudder-server/backend-config" - transformerclient "github.com/rudderlabs/rudder-server/internal/transformer-client" "github.com/rudderlabs/rudder-server/jsonrs" "github.com/rudderlabs/rudder-server/processor/integrations" "github.com/rudderlabs/rudder-server/router/types" @@ -49,10 +48,6 @@ const ( // handle is the handle for this class type handle struct { tr *http.Transport - // http client for router transformation request - client sysUtils.HTTPClientI - // Mockable http.client for transformer proxy request - proxyClient sysUtils.HTTPClientI // http client timeout for transformer proxy request destinationTimeout time.Duration // http client timeout for server-transformer request @@ -62,12 +57,10 @@ type handle struct { stats stats.Stats - // clientOAuthV2 is the HTTP client for router transformation requests using OAuth V2. - clientOAuthV2 *http.Client - // proxyClientOAuthV2 is the mockable HTTP client for transformer proxy requests using OAuth V2. - proxyClientOAuthV2 sysUtils.HTTPClientI - // oAuthV2EnabledLoader dynamically loads the OAuth V2 enabled status. - oAuthV2EnabledLoader config.ValueLoader[bool] + // clientOAuth is the HTTP client for router transformation requests using OAuth V2. + clientOAuth *http.Client + // proxyClientOAuth is the mockable HTTP client for transformer proxy requests using OAuth V2. + proxyClientOAuth sysUtils.HTTPClientI // expirationTimeDiff holds the configured time difference for token expiration. expirationTimeDiff config.ValueLoader[time.Duration] @@ -122,15 +115,13 @@ type Transformer interface { func NewTransformer( destinationTimeout, transformTimeout time.Duration, backendConfig backendconfig.BackendConfig, - oauthV2Enabled config.ValueLoader[bool], expirationTimeDiff config.ValueLoader[time.Duration], featuresService transformerfs.FeaturesService, ) Transformer { cache := oauthv2.NewCache() oauthLock := sync.NewPartitionRWLocker() handle := &handle{ - oAuthV2EnabledLoader: oauthV2Enabled, - expirationTimeDiff: expirationTimeDiff, + expirationTimeDiff: expirationTimeDiff, } handle.setup(destinationTimeout, transformTimeout, &cache, oauthLock, backendConfig, featuresService) return handle @@ -224,20 +215,15 @@ func (trans *handle) Transform(transformType string, transformMessage *types.Tra req.Header.Set("X-Feature-Gzip-Support", "?1") // Header to let transformer know that the client understands event filter code req.Header.Set("X-Feature-Filter-Code", "?1") - if trans.oAuthV2EnabledLoader.Load() { - trans.logger.Debugn("[router transform]", logger.NewBoolField("oauthV2Enabled", true)) - destinationInfo := &oauthv2.DestinationInfo{ - Config: transformMessageCopy.Data[0].Destination.Config, - DefinitionConfig: transformMessageCopy.Data[0].Destination.DestinationDefinition.Config, - WorkspaceID: transformMessageCopy.Data[0].JobMetadata.WorkspaceID, - DefinitionName: transformMessageCopy.Data[0].Destination.DestinationDefinition.Name, - ID: transformMessageCopy.Data[0].Destination.ID, - } - req = req.WithContext(cntx.CtxWithDestInfo(req.Context(), destinationInfo)) - resp, err = trans.clientOAuthV2.Do(req) - } else { - resp, err = trans.client.Do(req) + destinationInfo := &oauthv2.DestinationInfo{ + Config: transformMessageCopy.Data[0].Destination.Config, + DefinitionConfig: transformMessageCopy.Data[0].Destination.DestinationDefinition.Config, + WorkspaceID: transformMessageCopy.Data[0].JobMetadata.WorkspaceID, + DefinitionName: transformMessageCopy.Data[0].Destination.DestinationDefinition.Name, + ID: transformMessageCopy.Data[0].Destination.ID, } + req = req.WithContext(cntx.CtxWithDestInfo(req.Context(), destinationInfo)) + resp, err = trans.clientOAuth.Do(req) duration := time.Since(s) trans.stats.NewTaggedStat("transformer_client_total_durations_seconds", stats.CountType, labels.ToStatsTag()).Count(int(duration.Seconds())) @@ -285,13 +271,11 @@ func (trans *handle) Transform(transformType string, transformMessage *types.Tra } var transResp oauthv2.TransportResponse - if trans.oAuthV2EnabledLoader.Load() { - // We don't need to handle it, as we can receive a string response even before executing OAuth operations like Refresh Token or Auth Status Toggle. - // It's acceptable if the structure of respData doesn't match the oauthv2.TransportResponse struct. - err = jsonrs.Unmarshal(respData, &transResp) - if err == nil && transResp.OriginalResponse != "" { - respData = []byte(transResp.OriginalResponse) // re-assign originalResponse - } + // We don't need to handle it, as we can receive a string response even before executing OAuth operations like Refresh Token or Auth Status Toggle. + // It's acceptable if the structure of respData doesn't match the oauthv2.TransportResponse struct. + err = jsonrs.Unmarshal(respData, &transResp) + if err == nil && transResp.OriginalResponse != "" { + respData = []byte(transResp.OriginalResponse) // re-assign originalResponse } if resp.StatusCode == http.StatusOK { @@ -488,17 +472,15 @@ func (trans *handle) ProxyRequest(ctx context.Context, proxyReqParams *ProxyRequ respData will be in ProxyResponseV0 or ProxyResponseV1 */ var transportResponse oauthv2.TransportResponse // response that we get from oauth-interceptor in postRoundTrip - if trans.oAuthV2EnabledLoader.Load() { - _ = jsonrs.Unmarshal(respData, &transportResponse) - // unmarshal unsuccessful scenarios - // if respData is not a valid json - if transportResponse.OriginalResponse != "" { - respData = []byte(transportResponse.OriginalResponse) - } + _ = jsonrs.Unmarshal(respData, &transportResponse) + // unmarshal unsuccessful scenarios + // if respData is not a valid json + if transportResponse.OriginalResponse != "" { + respData = []byte(transportResponse.OriginalResponse) + } - if transportResponse.InterceptorResponse.StatusCode > 0 { - respCode = transportResponse.InterceptorResponse.StatusCode - } + if transportResponse.InterceptorResponse.StatusCode > 0 { + respCode = transportResponse.InterceptorResponse.StatusCode } /** @@ -527,24 +509,20 @@ func (trans *handle) ProxyRequest(ctx context.Context, proxyReqParams *ProxyRequ } } - if trans.oAuthV2EnabledLoader.Load() { - for _, metadata := range proxyReqParams.ResponseData.Metadata { - // Conditions for which InterceptorResponse.StatusCode/Response will not be empty - // 1. authErrorCategory == CategoryRefreshToken - // 2. authErrorCategory == CategoryAuthStatusInactive - // 3. Any error occurred while performing authStatusInactive / RefreshToken - // Under these conditions, we will have to propagate the response from interceptor to JobsDB - if transportResponse.InterceptorResponse.StatusCode > 0 { - transResp.routerJobResponseCodes[metadata.JobID] = transportResponse.InterceptorResponse.StatusCode - } - if transportResponse.InterceptorResponse.Response != "" { - transResp.routerJobResponseBodys[metadata.JobID] = transportResponse.InterceptorResponse.Response - } + for _, metadata := range proxyReqParams.ResponseData.Metadata { + // Conditions for which InterceptorResponse.StatusCode/Response will not be empty + // 1. authErrorCategory == CategoryRefreshToken + // 2. authErrorCategory == CategoryAuthStatusInactive + // 3. Any error occurred while performing authStatusInactive / RefreshToken + // Under these conditions, we will have to propagate the response from interceptor to JobsDB + if transportResponse.InterceptorResponse.StatusCode > 0 { + transResp.routerJobResponseCodes[metadata.JobID] = transportResponse.InterceptorResponse.StatusCode + } + if transportResponse.InterceptorResponse.Response != "" { + transResp.routerJobResponseBodys[metadata.JobID] = transportResponse.InterceptorResponse.Response } } - if trans.oAuthV2EnabledLoader.Load() && transportResponse.InterceptorResponse.Response != "" { - respData = []byte(transportResponse.InterceptorResponse.Response) - } + respData = []byte(transportResponse.InterceptorResponse.Response) trans.stats.NewTaggedStat("transformer_client_response_total_events", stats.CountType, labels).Count(len(transResp.routerJobResponseCodes)) @@ -578,8 +556,6 @@ func (trans *handle) setup(destinationTimeout, transformTimeout time.Duration, c // Destination API timeout // Basically this timeout we will configure when we make final call to destination to send event trans.destinationTimeout = destinationTimeout - // This client is used for Router Transformation - trans.client = transformerclient.NewClient(trans.transformerClientConfig()) optionalArgs := &oauthv2httpclient.HttpClientOptionalArgs{ Locker: locker, Augmenter: extensions.RouterHeaderAugmenter, @@ -587,17 +563,15 @@ func (trans *handle) setup(destinationTimeout, transformTimeout time.Duration, c Logger: logger.NewLogger().Child("TransformerHttpClient"), } // This client is used for Router Transformation using oauthV2 - trans.clientOAuthV2 = oauthv2httpclient.NewOAuthHttpClient(&http.Client{Transport: trans.tr, Timeout: trans.transformTimeout}, common.RudderFlowDelivery, cache, backendConfig, GetAuthErrorCategoryFromTransformResponse, optionalArgs) + trans.clientOAuth = oauthv2httpclient.NewOAuthHttpClient(&http.Client{Transport: trans.tr, Timeout: trans.transformTimeout}, common.RudderFlowDelivery, cache, backendConfig, GetAuthErrorCategoryFromTransformResponse, optionalArgs) proxyClientOptionalArgs := &oauthv2httpclient.HttpClientOptionalArgs{ Locker: locker, ExpirationTimeDiff: (trans.expirationTimeDiff).Load(), Logger: logger.NewLogger().Child("TransformerProxyHttpClient"), } - // This client is used for Transformer Proxy(delivered from transformer to destination) - trans.proxyClient = transformerclient.NewClient(trans.transformerClientConfig()) // This client is used for Transformer Proxy(delivered from transformer to destination) using oauthV2 - trans.proxyClientOAuthV2 = oauthv2httpclient.NewOAuthHttpClient(&http.Client{Transport: trans.tr, Timeout: trans.destinationTimeout + trans.transformTimeout}, common.RudderFlowDelivery, cache, backendConfig, GetAuthErrorCategoryFromTransformProxyResponse, proxyClientOptionalArgs) + trans.proxyClientOAuth = oauthv2httpclient.NewOAuthHttpClient(&http.Client{Transport: trans.tr, Timeout: trans.destinationTimeout + trans.transformTimeout}, common.RudderFlowDelivery, cache, backendConfig, GetAuthErrorCategoryFromTransformProxyResponse, proxyClientOptionalArgs) trans.stats = stats.Default trans.transformRequestTimerStat = stats.Default.NewStat("router.transformer_request_time", stats.TimerType) trans.compactionEnabled = config.GetReloadableBoolVar(true, "Router.DestinationTransformer.compactionEnabled", "Transformer.compactionEnabled") @@ -609,20 +583,6 @@ func (trans *handle) setup(destinationTimeout, transformTimeout time.Duration, c } } -func (trans *handle) transformerClientConfig() *transformerclient.ClientConfig { - transformerClientConfig := &transformerclient.ClientConfig{ - ClientTimeout: config.GetDurationVar(600, time.Second, "HttpClient.backendProxy.timeout", "HttpClient.routerTransformer.timeout"), - ClientTTL: config.GetDurationVar(10, time.Second, "Transformer.Client.ttl"), - ClientType: config.GetStringVar("stdlib", "Transformer.Client.type"), - PickerType: config.GetStringVar("power_of_two", "Transformer.Client.httplb.pickerType"), - } - transformerClientConfig.TransportConfig.DisableKeepAlives = config.GetBoolVar(true, "Transformer.Client.disableKeepAlives") - transformerClientConfig.TransportConfig.MaxConnsPerHost = config.GetIntVar(100, 1, "Transformer.Client.maxHTTPConnections") - transformerClientConfig.TransportConfig.MaxIdleConnsPerHost = config.GetIntVar(10, 1, "Transformer.Client.maxHTTPIdleConnections") - transformerClientConfig.TransportConfig.IdleConnTimeout = config.GetDurationVar(30, time.Second, "Transformer.Client.maxIdleConnDuration") - return transformerClientConfig -} - type httpProxyResponse struct { respData []byte statusCode int @@ -648,14 +608,9 @@ func (trans *handle) doProxyRequest(ctx context.Context, proxyUrl string, proxyR req.Header.Set("RdProxy-Timeout", strconv.FormatInt(trans.destinationTimeout.Milliseconds(), 10)) httpReqStTime := time.Now() var resp *http.Response - if trans.oAuthV2EnabledLoader.Load() { - trans.logger.Debugn("[router delivery]", logger.NewBoolField("oauthV2Enabled", true)) - req = req.WithContext(cntx.CtxWithDestInfo(req.Context(), proxyReqParams.DestInfo)) - req = req.WithContext(cntx.CtxWithSecret(req.Context(), proxyReqParams.ResponseData.Metadata[0].Secret)) - resp, err = trans.proxyClientOAuthV2.Do(req) - } else { - resp, err = trans.proxyClient.Do(req) - } + req = req.WithContext(cntx.CtxWithDestInfo(req.Context(), proxyReqParams.DestInfo)) + req = req.WithContext(cntx.CtxWithSecret(req.Context(), proxyReqParams.ResponseData.Metadata[0].Secret)) + resp, err = trans.proxyClientOAuth.Do(req) reqRoundTripTime := time.Since(httpReqStTime) // This stat will be useful in understanding the round trip time taken for the http req // between server and transformer diff --git a/router/transformer/transformer_test.go b/router/transformer/transformer_test.go index e81eb7b22d..0b8c1b3f59 100644 --- a/router/transformer/transformer_test.go +++ b/router/transformer/transformer_test.go @@ -338,9 +338,8 @@ func TestProxyRequest(t *testing.T) { srv := httptest.NewServer(mockProxyHandler(tc.proxy.timeout, tc.proxy.code, tc.proxy.response)) defer srv.Close() - isOAuthV2EnabledLoader := config.SingleValueLoader(false) expTimeDiff := config.SingleValueLoader(1 * time.Minute) - tr := NewTransformer(tc.rtTimeout, httpClientTimeout, nil, isOAuthV2EnabledLoader, expTimeDiff, nil) + tr := NewTransformer(tc.rtTimeout, httpClientTimeout, nil, expTimeDiff, nil) ctx := context.TODO() reqParams := &ProxyRequestParams{ ResponseData: tc.postParameters, @@ -362,14 +361,13 @@ func TestProxyRequest(t *testing.T) { srv := httptest.NewServer(mockProxyHandler(tc.proxy.timeout, tc.proxy.code, tc.proxy.response)) defer srv.Close() var tr Transformer - isOAuthV2EnabledLoader := config.SingleValueLoader(false) expTimeDiff := config.SingleValueLoader(1 * time.Minute) // Logic for executing test-cases not manipulating test-cases if tc.rtTimeout.Milliseconds() > 0 { - tr = NewTransformer(tc.rtTimeout, httpClientTimeout, nil, isOAuthV2EnabledLoader, expTimeDiff, nil) + tr = NewTransformer(tc.rtTimeout, httpClientTimeout, nil, expTimeDiff, nil) } else { // Just a default value - tr = NewTransformer(2*time.Millisecond, httpClientTimeout, nil, isOAuthV2EnabledLoader, expTimeDiff, nil) + tr = NewTransformer(2*time.Millisecond, httpClientTimeout, nil, expTimeDiff, nil) } // Logic to include context timing out ctx := context.TODO() @@ -632,7 +630,6 @@ func TestRouterTransformationWithOAuthV2(t *testing.T) { cfgBeSvr := httptest.NewServer(cpRespProducer.MockCpRequests()) - isOAuthV2EnabledLoader := config.SingleValueLoader(true) defer svr.Close() defer cfgBeSvr.Close() t.Setenv("DEST_TRANSFORM_URL", svr.URL) @@ -642,7 +639,7 @@ func TestRouterTransformationWithOAuthV2(t *testing.T) { backendconfig.Init() expTimeDiff := config.SingleValueLoader(1 * time.Minute) - tr := NewTransformer(time.Minute, time.Minute, mockBackendConfig, isOAuthV2EnabledLoader, expTimeDiff, nil) + tr := NewTransformer(time.Minute, time.Minute, mockBackendConfig, expTimeDiff, nil) transformMsg := types.TransformMessageT{ Data: tc.inputEvents, @@ -1671,7 +1668,6 @@ func TestProxyRequestWithOAuthV2(t *testing.T) { cfgBeSvr := httptest.NewServer(cpRespProducer.MockCpRequests()) - isOAuthV2EnabledLoader := config.SingleValueLoader(true) defer svr.Close() defer cfgBeSvr.Close() t.Setenv("DEST_TRANSFORM_URL", svr.URL) @@ -1680,7 +1676,7 @@ func TestProxyRequestWithOAuthV2(t *testing.T) { backendconfig.Init() expTimeDiff := config.SingleValueLoader(1 * time.Minute) - tr := NewTransformer(time.Minute, time.Minute, mockBackendConfig, isOAuthV2EnabledLoader, expTimeDiff, nil) + tr := NewTransformer(time.Minute, time.Minute, mockBackendConfig, expTimeDiff, nil) var adapter transformerProxyAdapter adapter = NewTransformerProxyAdapter("v1", loggerOverride) @@ -1741,11 +1737,10 @@ func TestTransformNoValidationErrors(t *testing.T) { _, err = w.Write(b) require.NoError(t, err) })) - isOAuthV2EnabledLoader := config.SingleValueLoader(false) defer svr.Close() t.Setenv("DEST_TRANSFORM_URL", svr.URL) expTimeDiff := config.SingleValueLoader(1 * time.Minute) - tr := NewTransformer(time.Minute, time.Minute, nil, isOAuthV2EnabledLoader, expTimeDiff, nil) + tr := NewTransformer(time.Minute, time.Minute, nil, expTimeDiff, nil) transformMessage := types.TransformMessageT{ Data: []types.RouterJobT{ @@ -1776,9 +1771,8 @@ func TestTransformValidationUnmarshallingError(t *testing.T) { })) defer svr.Close() t.Setenv("DEST_TRANSFORM_URL", svr.URL) - isOAuthV2EnabledLoader := config.SingleValueLoader(false) expTimeDiff := config.SingleValueLoader(1 * time.Minute) - tr := NewTransformer(time.Minute, time.Minute, nil, isOAuthV2EnabledLoader, expTimeDiff, nil) + tr := NewTransformer(time.Minute, time.Minute, nil, expTimeDiff, nil) transformMessage := types.TransformMessageT{ Data: []types.RouterJobT{ @@ -1818,9 +1812,8 @@ func TestTransformValidationInOutMismatchError(t *testing.T) { })) defer svr.Close() t.Setenv("DEST_TRANSFORM_URL", svr.URL) - isOAuthV2EnabledLoader := config.SingleValueLoader(false) expTimeDiff := config.SingleValueLoader(1 * time.Minute) - tr := NewTransformer(time.Minute, time.Minute, nil, isOAuthV2EnabledLoader, expTimeDiff, nil) + tr := NewTransformer(time.Minute, time.Minute, nil, expTimeDiff, nil) transformMessage := types.TransformMessageT{ Data: []types.RouterJobT{ @@ -1859,9 +1852,8 @@ func TestTransformValidationJobIDMismatchError(t *testing.T) { })) defer svr.Close() t.Setenv("DEST_TRANSFORM_URL", svr.URL) - isOAuthV2EnabledLoader := config.SingleValueLoader(false) expTimeDiff := config.SingleValueLoader(1 * time.Minute) - tr := NewTransformer(time.Minute, time.Minute, nil, isOAuthV2EnabledLoader, expTimeDiff, nil) + tr := NewTransformer(time.Minute, time.Minute, nil, expTimeDiff, nil) transformMessage := types.TransformMessageT{ Data: []types.RouterJobT{ @@ -1908,9 +1900,8 @@ func TestDehydrateHydrate(t *testing.T) { require.NoError(t, err) })) config.Set("DEST_TRANSFORM_URL", srv.URL) - isOAuthV2EnabledLoader := config.SingleValueLoader(false) expTimeDiff := config.SingleValueLoader(1 * time.Minute) - tr := NewTransformer(time.Minute, time.Minute, nil, isOAuthV2EnabledLoader, expTimeDiff, nil) + tr := NewTransformer(time.Minute, time.Minute, nil, expTimeDiff, nil) transformerResponse := tr.Transform(BATCH, &transformMessage) @@ -1968,7 +1959,6 @@ func TestTransformerMetrics(t *testing.T) { })) defer srv.Close() - isOAuthV2EnabledLoader := config.SingleValueLoader(false) expTimeDiff := config.SingleValueLoader(1 * time.Minute) t.Setenv("DEST_TRANSFORM_URL", srv.URL) @@ -1976,10 +1966,7 @@ func TestTransformerMetrics(t *testing.T) { tr := &handle{ stats: statsStore, logger: logger.NOP, - client: srv.Client(), - proxyClient: srv.Client(), tr: &http.Transport{}, - oAuthV2EnabledLoader: isOAuthV2EnabledLoader, expirationTimeDiff: expTimeDiff, transformRequestTimerStat: statsStore.NewStat("router.transformer_request_time", stats.TimerType), // Add this line } diff --git a/router/types.go b/router/types.go index 4ad3679b9e..1e985d706f 100644 --- a/router/types.go +++ b/router/types.go @@ -82,6 +82,5 @@ type reloadableConfig struct { transformerProxy config.ValueLoader[bool] skipRtAbortAlertForTransformation config.ValueLoader[bool] // represents if event delivery(via transformerProxy) should be alerted via router-aborted-count alert def skipRtAbortAlertForDelivery config.ValueLoader[bool] // represents if transformation(router or batch) should be alerted via router-aborted-count alert def - oauthV2Enabled config.ValueLoader[bool] oauthV2ExpirationTimeDiff config.ValueLoader[time.Duration] } diff --git a/router/worker.go b/router/worker.go index b40f87735d..67535ee56f 100644 --- a/router/worker.go +++ b/router/worker.go @@ -17,7 +17,6 @@ import ( "github.com/rudderlabs/rudder-go-kit/bytesize" "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats" - obskit "github.com/rudderlabs/rudder-observability-kit/go/labels" backendconfig "github.com/rudderlabs/rudder-server/backend-config" "github.com/rudderlabs/rudder-server/jobsdb" "github.com/rudderlabs/rudder-server/processor/integrations" @@ -197,31 +196,26 @@ func (w *worker) workLoop() { } destination := batchDestination.Destination connection := conn.Connection - oauthV2Enabled := w.rt.reloadableConfig.oauthV2Enabled.Load() - // TODO: Remove later - w.logger.Debugn("[router worker]", - logger.NewBoolField("oauthV2Enabled", oauthV2Enabled), - obskit.DestinationType(destination.DestinationDefinition.Name), - ) - if w.rt.isOAuthDestination && !oauthV2Enabled { - rudderAccountID := oauth.GetAccountId(destination.Config, oauth.DeliveryAccountIdKey) - - if routerutils.IsNotEmptyString(rudderAccountID) { - w.logger.Debugf(`[%s][FetchToken] Token Fetch Method to be called`, destination.DestinationDefinition.Name) - // Get Access Token Information to send it as part of the event - tokenStatusCode, accountSecretInfo := w.rt.oauth.FetchToken(&oauth.RefreshTokenParams{ - AccountId: rudderAccountID, - WorkspaceId: jobMetadata.WorkspaceID, - DestDefName: destination.DestinationDefinition.Name, - }) - w.logger.Debugf(`[%s][FetchToken] Token Fetch Method finished (statusCode, value): (%v, %+v)`, destination.DestinationDefinition.Name, tokenStatusCode, accountSecretInfo) - if tokenStatusCode == http.StatusOK { - jobMetadata.Secret = accountSecretInfo.Account.Secret - } else { - w.logger.Errorf(`[%s][FetchToken] Token Fetch Method error (statusCode, error): (%d, %s)`, destination.DestinationDefinition.Name, tokenStatusCode, accountSecretInfo.Err) - } - } - } + + // if w.rt.isOAuthDestination && !oauthV2Enabled { + // rudderAccountID := oauth.GetAccountId(destination.Config, oauth.DeliveryAccountIdKey) + + // if routerutils.IsNotEmptyString(rudderAccountID) { + // w.logger.Debugf(`[%s][FetchToken] Token Fetch Method to be called`, destination.DestinationDefinition.Name) + // // Get Access Token Information to send it as part of the event + // tokenStatusCode, accountSecretInfo := w.rt.oauth.FetchToken(&oauth.RefreshTokenParams{ + // AccountId: rudderAccountID, + // WorkspaceId: jobMetadata.WorkspaceID, + // DestDefName: destination.DestinationDefinition.Name, + // }) + // w.logger.Debugf(`[%s][FetchToken] Token Fetch Method finished (statusCode, value): (%v, %+v)`, destination.DestinationDefinition.Name, tokenStatusCode, accountSecretInfo) + // if tokenStatusCode == http.StatusOK { + // jobMetadata.Secret = accountSecretInfo.Account.Secret + // } else { + // w.logger.Errorf(`[%s][FetchToken] Token Fetch Method error (statusCode, error): (%d, %s)`, destination.DestinationDefinition.Name, tokenStatusCode, accountSecretInfo.Err) + // } + // } + // } if w.rt.enableBatching { w.routerJobs = append(w.routerJobs, types.RouterJobT{ @@ -330,7 +324,7 @@ func (w *worker) transform(routerJobs []types.RouterJobT) []types.DestinationJob } } - if w.rt.isOAuthDestination && w.rt.reloadableConfig.oauthV2Enabled.Load() { + if w.rt.isOAuthDestination { return w.transformJobsPerDestination(routerJobs) } return w.transformJobs(routerJobs) @@ -808,29 +802,28 @@ func (w *worker) proxyRequest(ctx context.Context, destinationJob types.Destinat Adapter: transformer.NewTransformerProxyAdapter(w.rt.transformerFeaturesService.TransformerProxyVersion(), w.rt.logger), } rtlTime := time.Now() - oauthV2Enabled := w.rt.reloadableConfig.oauthV2Enabled.Load() proxyRequestResponse := w.rt.transformer.ProxyRequest(ctx, proxyReqparams) w.routerProxyStat.SendTiming(time.Since(rtlTime)) w.logger.Debugf(`[TransformerProxy] (Dest-%[1]v) {Job - %[2]v} Request ended`, w.rt.destType, jobID) if !oauth.IsOAuthDestination(destinationJob.Destination.DestinationDefinition.Config) { return proxyRequestResponse } - if proxyRequestResponse.ProxyRequestStatusCode != http.StatusOK && !oauthV2Enabled { - w.logger.Debugn(`Sending for OAuth destination`) - // Token from header of the request - respStatusCode, respBodyTemp, contentType := w.rt.handleOAuthDestResponse(&HandleDestOAuthRespParams{ - ctx: ctx, - destinationJob: destinationJob, - workerID: w.id, - trRespStCd: proxyRequestResponse.ProxyRequestStatusCode, - trRespBody: proxyRequestResponse.ProxyRequestResponseBody, - secret: m[0].Secret, - contentType: proxyRequestResponse.RespContentType, - }, proxyRequestResponse.OAuthErrorCategory) - - proxyRequestResponse.RespStatusCodes, proxyRequestResponse.RespBodys = w.prepareResponsesForJobs(&destinationJob, respStatusCode, respBodyTemp) - proxyRequestResponse.RespContentType = contentType - } + // if proxyRequestResponse.ProxyRequestStatusCode != http.StatusOK && !oauthV2Enabled { + // w.logger.Debugn(`Sending for OAuth destination`) + // // Token from header of the request + // respStatusCode, respBodyTemp, contentType := w.rt.handleOAuthDestResponse(&HandleDestOAuthRespParams{ + // ctx: ctx, + // destinationJob: destinationJob, + // workerID: w.id, + // trRespStCd: proxyRequestResponse.ProxyRequestStatusCode, + // trRespBody: proxyRequestResponse.ProxyRequestResponseBody, + // secret: m[0].Secret, + // contentType: proxyRequestResponse.RespContentType, + // }, proxyRequestResponse.OAuthErrorCategory) + + // proxyRequestResponse.RespStatusCodes, proxyRequestResponse.RespBodys = w.prepareResponsesForJobs(&destinationJob, respStatusCode, respBodyTemp) + // proxyRequestResponse.RespContentType = contentType + // } return proxyRequestResponse } diff --git a/router/worker_test.go b/router/worker_test.go index 68d93bcef0..2d4f3b9a59 100644 --- a/router/worker_test.go +++ b/router/worker_test.go @@ -517,9 +517,7 @@ func TestTransformForOAuthV2Destination(t *testing.T) { routerTransformInputCountStat: stats.NOP.NewTaggedStat("router_transform_input_count", stats.CountType, stats.Tags{"destType": "some_dest_type"}), routerTransformOutputCountStat: stats.NOP.NewTaggedStat("router_transform_output_count", stats.CountType, stats.Tags{"destType": "some_dest_type"}), isOAuthDestination: true, - reloadableConfig: &reloadableConfig{ - oauthV2Enabled: config.GetReloadableBoolVar(true), - }, + reloadableConfig: &reloadableConfig{}, }, } var limiterWg sync.WaitGroup @@ -676,9 +674,7 @@ func TestTransformForNonOAuthDestination(t *testing.T) { routerTransformInputCountStat: stats.NOP.NewTaggedStat("router_transform_input_count", stats.CountType, stats.Tags{"destType": "some_dest_type"}), routerTransformOutputCountStat: stats.NOP.NewTaggedStat("router_transform_output_count", stats.CountType, stats.Tags{"destType": "some_dest_type"}), isOAuthDestination: false, - reloadableConfig: &reloadableConfig{ - oauthV2Enabled: config.GetReloadableBoolVar(true), - }, + reloadableConfig: &reloadableConfig{}, }, } var limiterWg sync.WaitGroup From c7e37cf4fab290364352e428d3a72ad83eeddd86 Mon Sep 17 00:00:00 2001 From: ItsSudip Date: Thu, 24 Apr 2025 08:15:40 +0530 Subject: [PATCH 02/10] chore: remove oauth v1 module --- mocks/services/oauth/mock_oauth.go | 86 --- regulation-worker/cmd/main.go | 17 +- regulation-worker/internal/delete/api/api.go | 121 +--- .../internal/delete/api/api_test.go | 45 +- router/handle.go | 25 - router/handle_lifecycle.go | 11 +- router/worker.go | 40 -- runner/runner.go | 2 - services/oauth/oauth.go | 621 ------------------ services/oauth/oauth_test.go | 191 ------ services/oauth/v2/common/constants.go | 24 +- services/oauth/v2/common/types.go | 1 + services/oauth/v2/destination_info.go | 3 +- services/rmetrics/pending_events.go | 153 ----- services/rmetrics/pending_events_test.go | 91 --- 15 files changed, 46 insertions(+), 1385 deletions(-) delete mode 100644 mocks/services/oauth/mock_oauth.go delete mode 100644 services/oauth/oauth.go delete mode 100644 services/oauth/oauth_test.go delete mode 100644 services/rmetrics/pending_events.go delete mode 100644 services/rmetrics/pending_events_test.go diff --git a/mocks/services/oauth/mock_oauth.go b/mocks/services/oauth/mock_oauth.go deleted file mode 100644 index 1c2a34e2f0..0000000000 --- a/mocks/services/oauth/mock_oauth.go +++ /dev/null @@ -1,86 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: github.com/rudderlabs/rudder-server/services/oauth (interfaces: Authorizer) -// -// Generated by this command: -// -// mockgen -destination=../../mocks/services/oauth/mock_oauth.go -package=mocks_oauth github.com/rudderlabs/rudder-server/services/oauth Authorizer -// - -// Package mocks_oauth is a generated GoMock package. -package mocks_oauth - -import ( - reflect "reflect" - - oauth "github.com/rudderlabs/rudder-server/services/oauth" - gomock "go.uber.org/mock/gomock" -) - -// MockAuthorizer is a mock of Authorizer interface. -type MockAuthorizer struct { - ctrl *gomock.Controller - recorder *MockAuthorizerMockRecorder - isgomock struct{} -} - -// MockAuthorizerMockRecorder is the mock recorder for MockAuthorizer. -type MockAuthorizerMockRecorder struct { - mock *MockAuthorizer -} - -// NewMockAuthorizer creates a new mock instance. -func NewMockAuthorizer(ctrl *gomock.Controller) *MockAuthorizer { - mock := &MockAuthorizer{ctrl: ctrl} - mock.recorder = &MockAuthorizerMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockAuthorizer) EXPECT() *MockAuthorizerMockRecorder { - return m.recorder -} - -// AuthStatusToggle mocks base method. -func (m *MockAuthorizer) AuthStatusToggle(arg0 *oauth.AuthStatusToggleParams) (int, string) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AuthStatusToggle", arg0) - ret0, _ := ret[0].(int) - ret1, _ := ret[1].(string) - return ret0, ret1 -} - -// AuthStatusToggle indicates an expected call of AuthStatusToggle. -func (mr *MockAuthorizerMockRecorder) AuthStatusToggle(arg0 any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AuthStatusToggle", reflect.TypeOf((*MockAuthorizer)(nil).AuthStatusToggle), arg0) -} - -// FetchToken mocks base method. -func (m *MockAuthorizer) FetchToken(fetchTokenParams *oauth.RefreshTokenParams) (int, *oauth.AuthResponse) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "FetchToken", fetchTokenParams) - ret0, _ := ret[0].(int) - ret1, _ := ret[1].(*oauth.AuthResponse) - return ret0, ret1 -} - -// FetchToken indicates an expected call of FetchToken. -func (mr *MockAuthorizerMockRecorder) FetchToken(fetchTokenParams any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchToken", reflect.TypeOf((*MockAuthorizer)(nil).FetchToken), fetchTokenParams) -} - -// RefreshToken mocks base method. -func (m *MockAuthorizer) RefreshToken(refTokenParams *oauth.RefreshTokenParams) (int, *oauth.AuthResponse) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "RefreshToken", refTokenParams) - ret0, _ := ret[0].(int) - ret1, _ := ret[1].(*oauth.AuthResponse) - return ret0, ret1 -} - -// RefreshToken indicates an expected call of RefreshToken. -func (mr *MockAuthorizerMockRecorder) RefreshToken(refTokenParams any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RefreshToken", reflect.TypeOf((*MockAuthorizer)(nil).RefreshToken), refTokenParams) -} diff --git a/regulation-worker/cmd/main.go b/regulation-worker/cmd/main.go index c130b45d17..8d23eb8d2e 100644 --- a/regulation-worker/cmd/main.go +++ b/regulation-worker/cmd/main.go @@ -26,7 +26,6 @@ import ( "github.com/rudderlabs/rudder-server/regulation-worker/internal/service" "github.com/rudderlabs/rudder-server/rruntime" "github.com/rudderlabs/rudder-server/services/diagnostics" - "github.com/rudderlabs/rudder-server/services/oauth" "github.com/rudderlabs/rudder-server/services/transformer" "github.com/rudderlabs/rudder-server/utils/crash" "github.com/rudderlabs/rudder-server/utils/misc" @@ -70,7 +69,6 @@ func Run(ctx context.Context) error { misc.Init() diagnostics.Init() backendconfig.Init() - oauth.Init() // initialise oauth if err := backendconfig.Setup(nil); err != nil { return fmt.Errorf("setting up backend config: %w", err) @@ -88,13 +86,9 @@ func Run(ctx context.Context) error { backendconfig.DefaultBackendConfig.WaitForConfig(ctx) identity := backendconfig.DefaultBackendConfig.Identity() dest.Start(ctx) - oauthV2Enabled := config.GetBoolVar(false, "RegulationWorker.oauthV2Enabled") - pkgLogger.Infon("[regulationApi]", logger.NewBoolField("oauthV2Enabled", oauthV2Enabled)) httpTimeout := config.GetDurationVar(60, time.Second, "HttpClient.regulationWorker.regulationManager.timeout") - // setting up oauth - OAuth := oauth.NewOAuthErrorHandler(backendconfig.DefaultBackendConfig, oauth.WithRudderFlow(oauth.RudderFlow_Delete)) - apiManagerHttpClient := createHTTPClient(config, httpTimeout, oauthV2Enabled) + apiManagerHttpClient := createHTTPClient(config, httpTimeout) svc := service.JobSvc{ API: &client.JobAPI{ @@ -112,8 +106,6 @@ func Run(ctx context.Context) error { &api.APIManager{ Client: apiManagerHttpClient, DestTransformURL: config.MustGetString("DEST_TRANSFORM_URL"), - OAuth: OAuth, - IsOAuthV2Enabled: oauthV2Enabled, MaxOAuthRefreshRetryAttempts: config.GetInt("RegulationWorker.oauth.maxRefreshRetryAttempts", 1), TransformerFeaturesService: transformer.NewFeaturesService(ctx, config, transformer.FeaturesServiceOptions{ PollInterval: config.GetDuration("Transformer.pollInterval", 10, time.Second), @@ -141,7 +133,7 @@ func withLoop(svc service.JobSvc) *service.Looper { } } -func createHTTPClient(conf *config.Config, httpTimeout time.Duration, oauthV2Enabled bool) *http.Client { +func createHTTPClient(conf *config.Config, httpTimeout time.Duration) *http.Client { cli := &http.Client{ Timeout: httpTimeout, Transport: &http.Transport{ @@ -151,9 +143,6 @@ func createHTTPClient(conf *config.Config, httpTimeout time.Duration, oauthV2Ena IdleConnTimeout: 300 * time.Second, }, } - if !oauthV2Enabled { - return cli - } cache := oauthv2.NewCache() oauthLock := kitsync.NewPartitionRWLocker() optionalArgs := oauthv2http.HttpClientOptionalArgs{ @@ -163,7 +152,7 @@ func createHTTPClient(conf *config.Config, httpTimeout time.Duration, oauthV2Ena } return oauthv2http.NewOAuthHttpClient( cli, - common.RudderFlow(oauth.RudderFlow_Delete), + common.RudderFlow(common.RudderFlowDelete), &cache, backendconfig.DefaultBackendConfig, api.GetAuthErrorCategoryFromResponse, &optionalArgs, ) diff --git a/regulation-worker/internal/delete/api/api.go b/regulation-worker/internal/delete/api/api.go index 5d66837d89..f58f1e3b9a 100644 --- a/regulation-worker/internal/delete/api/api.go +++ b/regulation-worker/internal/delete/api/api.go @@ -19,11 +19,8 @@ import ( "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats" - obskit "github.com/rudderlabs/rudder-observability-kit/go/labels" - backendconfig "github.com/rudderlabs/rudder-server/backend-config" "github.com/rudderlabs/rudder-server/jsonrs" "github.com/rudderlabs/rudder-server/regulation-worker/internal/model" - "github.com/rudderlabs/rudder-server/services/oauth" oauthv2 "github.com/rudderlabs/rudder-server/services/oauth/v2" "github.com/rudderlabs/rudder-server/services/oauth/v2/common" cntx "github.com/rudderlabs/rudder-server/services/oauth/v2/context" @@ -39,15 +36,12 @@ var ( type APIManager struct { Client *http.Client DestTransformURL string - OAuth oauth.Authorizer MaxOAuthRefreshRetryAttempts int TransformerFeaturesService transformer.FeaturesService - IsOAuthV2Enabled bool } type oauthDetail struct { - secretToken *oauth.AuthResponse - id string + id string } func GetAuthErrorCategoryFromResponse(bodyBytes []byte) (string, error) { @@ -109,22 +103,8 @@ func (m *APIManager) deleteWithRetry(ctx context.Context, job model.Job, destina pkgLogger.Error(err) return model.JobStatus{Status: model.JobStatusFailed, Error: err} } - var oAuthDetail oauthDetail - if isOAuth && !m.IsOAuthV2Enabled { - oAuthDetail, err = m.getOAuthDetail(&destination, job.WorkspaceID) - if err != nil { - pkgLogger.Error(err) - return model.JobStatus{Status: model.JobStatusFailed, Error: err} - } - err = setOAuthHeader(oAuthDetail.secretToken, req) - if err != nil { - pkgLogger.Errorf("[%v] error occurred while setting oauth header for workspace: %v, destination: %v", destination.Name, job.WorkspaceID, destination.DestinationID) - pkgLogger.Error(err) - return model.JobStatus{Status: model.JobStatusFailed, Error: err} - } - } - if isOAuth && m.IsOAuthV2Enabled { + if isOAuth { req = req.WithContext(cntx.CtxWithDestInfo(req.Context(), dest)) } @@ -161,7 +141,7 @@ func (m *APIManager) deleteWithRetry(ctx context.Context, job model.Job, destina respStatusCode := resp.StatusCode respBodyBytes := bodyBytes // Post response work to be done for OAuthV2 - if isOAuth && m.IsOAuthV2Enabled { + if isOAuth { var transportResponse oauthv2.TransportResponse // We don't need to handle it, as we can receive a string response even before executing OAuth operations like Refresh Token or Auth Status Toggle. // It's acceptable if the structure of bodyBytes doesn't match the oauthv2.TransportResponse struct. @@ -187,7 +167,6 @@ func (m *APIManager) deleteWithRetry(ctx context.Context, job model.Job, destina job: job, isOAuthEnabled: isOAuth, currentOAuthRetryAttempt: currentOauthRetryAttempt, - oAuthDetail: oAuthDetail, responseBodyBytes: respBodyBytes, responseStatusCode: respStatusCode, }) @@ -232,82 +211,8 @@ func mapJobToPayload(job model.Job, destName string, destConfig map[string]inter func getOAuthErrorJob(jobResponses []JobRespSchema) (JobRespSchema, bool) { return lo.Find(jobResponses, func(item JobRespSchema) bool { - return lo.Contains([]string{oauth.AUTH_STATUS_INACTIVE, oauth.REFRESH_TOKEN}, item.AuthErrorCategory) - }) -} - -func setOAuthHeader(secretToken *oauth.AuthResponse, req *http.Request) error { - payload, marshalErr := jsonrs.Marshal(secretToken.Account) - if marshalErr != nil { - marshalFailErr := fmt.Sprintf("error while marshalling account secret information: %v", marshalErr) - pkgLogger.Errorf(marshalFailErr) - return errors.New(marshalFailErr) - } - req.Header.Set("X-Rudder-Dest-Info", string(payload)) - return nil -} - -func (m *APIManager) getOAuthDetail(destDetail *model.Destination, workspaceId string) (oauthDetail, error) { - id := oauth.GetAccountId(destDetail.Config, oauth.DeleteAccountIdKey) - if strings.TrimSpace(id) == "" { - return oauthDetail{}, fmt.Errorf("[%v] Delete account ID key (%v) is not present for destination: %v", destDetail.Name, oauth.DeleteAccountIdKey, destDetail.DestinationID) - } - tokenStatusCode, secretToken := m.OAuth.FetchToken(&oauth.RefreshTokenParams{ - AccountId: id, - WorkspaceId: workspaceId, - DestDefName: destDetail.Name, - }) - if tokenStatusCode != http.StatusOK { - return oauthDetail{}, fmt.Errorf("[%s][FetchToken] Error in Token Fetch statusCode: %d\t error: %s", destDetail.Name, tokenStatusCode, secretToken.ErrorMessage) - } - return oauthDetail{ - id: id, - secretToken: secretToken, - }, nil -} - -func (m *APIManager) inactivateAuthStatus(destination *model.Destination, job model.Job, oAuthDetail oauthDetail) (jobStatus model.JobStatus) { - dest := &backendconfig.DestinationT{ - ID: destination.DestinationID, - Config: destination.Config, - DestinationDefinition: backendconfig.DestinationDefinitionT{ - Name: destination.Name, - Config: destination.DestDefConfig, - }, - } - _, resp := m.OAuth.AuthStatusToggle(&oauth.AuthStatusToggleParams{ - Destination: dest, - WorkspaceId: job.WorkspaceID, - RudderAccountId: oAuthDetail.id, - AuthStatus: oauth.AuthStatusInactive, + return lo.Contains([]string{common.AuthStatusInActive, common.CategoryRefreshToken}, item.AuthErrorCategory) }) - jobStatus.Status = model.JobStatusAborted - jobStatus.Error = errors.New(resp) - return jobStatus -} - -func (m *APIManager) refreshOAuthToken(destination *model.Destination, job model.Job, oAuthDetail oauthDetail) error { - refTokenParams := &oauth.RefreshTokenParams{ - Secret: oAuthDetail.secretToken.Account.Secret, - WorkspaceId: job.WorkspaceID, - AccountId: oAuthDetail.id, - DestDefName: destination.Name, - } - statusCode, refreshResponse := m.OAuth.RefreshToken(refTokenParams) - if statusCode != http.StatusOK { - if refreshResponse.Err == oauth.REF_TOKEN_INVALID_GRANT { - // authStatus should be made inactive - m.inactivateAuthStatus(destination, job, oAuthDetail) - return errors.New(refreshResponse.ErrorMessage) - } - - var refreshRespErr string - if refreshResponse != nil { - refreshRespErr = refreshResponse.ErrorMessage - } - return fmt.Errorf("[%v] Failed to refresh token for destination in workspace(%v) & account(%v) with %v", destination.Name, job.WorkspaceID, oAuthDetail.id, refreshRespErr) - } - return nil } type PostResponseParams struct { @@ -315,7 +220,6 @@ type PostResponseParams struct { isOAuthEnabled bool currentOAuthRetryAttempt int job model.Job - oAuthDetail oauthDetail responseBodyBytes []byte responseStatusCode int } @@ -334,23 +238,8 @@ func (m *APIManager) PostResponse(ctx context.Context, params PostResponseParams if oauthErrorJob, ok := getOAuthErrorJob(jobResp); ok { authErrorCategory = oauthErrorJob.AuthErrorCategory } - // old oauth handling - if authErrorCategory != "" && params.isOAuthEnabled && !m.IsOAuthV2Enabled { - if authErrorCategory == oauth.AUTH_STATUS_INACTIVE { - return m.inactivateAuthStatus(¶ms.destination, params.job, params.oAuthDetail) - } - if authErrorCategory == oauth.REFRESH_TOKEN && params.currentOAuthRetryAttempt < m.MaxOAuthRefreshRetryAttempts { - if err := m.refreshOAuthToken(¶ms.destination, params.job, params.oAuthDetail); err != nil { - pkgLogger.Errorn("Error while refreshing authToken", obskit.Error(err)) - return model.JobStatus{Status: model.JobStatusFailed, Error: err} - } - // retry the request - pkgLogger.Infon("[%v] Retrying deleteRequest job(id: %v) for the whole batch, RetryAttempt: %v", logger.NewStringField("destinationName", params.destination.Name), logger.NewIntField("jobId", int64(params.job.ID)), logger.NewIntField("retryAttempt", int64(params.currentOAuthRetryAttempt+1))) - return m.deleteWithRetry(ctx, params.job, params.destination, params.currentOAuthRetryAttempt+1) - } - } // new oauth handling - if params.isOAuthEnabled && m.IsOAuthV2Enabled { + if params.isOAuthEnabled { if authErrorCategory == common.CategoryRefreshToken { // All the handling related to OAuth has been done(inside api.Client.Do() itself)! // retry the request diff --git a/regulation-worker/internal/delete/api/api_test.go b/regulation-worker/internal/delete/api/api_test.go index cc3adf0f9d..7c898fe812 100644 --- a/regulation-worker/internal/delete/api/api_test.go +++ b/regulation-worker/internal/delete/api/api_test.go @@ -23,7 +23,6 @@ import ( mock_features "github.com/rudderlabs/rudder-server/mocks/services/transformer" "github.com/rudderlabs/rudder-server/regulation-worker/internal/delete/api" "github.com/rudderlabs/rudder-server/regulation-worker/internal/model" - "github.com/rudderlabs/rudder-server/services/oauth" "github.com/rudderlabs/rudder-server/services/transformer" testutils "github.com/rudderlabs/rudder-server/utils/tests" "github.com/rudderlabs/rudder-server/utils/types/deployment" @@ -269,7 +268,6 @@ type oauthTestCases struct { expectedDeleteStatus model.JobStatus expectedDeleteStatus_OAuthV2 model.JobStatus expectedPayload string - isOAuthV2Enabled bool } var defaultDestDefConfig = map[string]interface{}{ @@ -650,7 +648,7 @@ var oauthTests = []oauthTestCases{ deleteResponses: []deleteResponseParams{ { status: 400, - jobResponse: fmt.Sprintf(`[{"status":"failed","authErrorCategory": "%v", "error": "User does not have sufficient permissions"}]`, oauth.AUTH_STATUS_INACTIVE), + jobResponse: fmt.Sprintf(`[{"status":"failed","authErrorCategory": "%v", "error": "User does not have sufficient permissions"}]`, common.AuthStatusInActive), }, }, cpResponses: []testutils.CpResponseParams{ @@ -697,7 +695,7 @@ var oauthTests = []oauthTestCases{ deleteResponses: []deleteResponseParams{ { status: 400, - jobResponse: fmt.Sprintf(`[{"status":"failed","authErrorCategory": "%v", "error": "User does not have sufficient permissions"}]`, oauth.AUTH_STATUS_INACTIVE), + jobResponse: fmt.Sprintf(`[{"status":"failed","authErrorCategory": "%v", "error": "User does not have sufficient permissions"}]`, common.AuthStatusInActive), }, }, cpResponses: []testutils.CpResponseParams{ @@ -784,7 +782,6 @@ func (*mockIdentifier) Type() deployment.Type { return "mockType" } func TestOAuth(t *testing.T) { for _, tc := range oauthTests { tc.name = fmt.Sprintf("[OAuthV2] %s", tc.name) - tc.isOAuthV2Enabled = true oauthTests = append(oauthTests, tc) // oauthTests[i] = tc } @@ -821,41 +818,33 @@ func TestOAuth(t *testing.T) { }, } - oauth.Init() - OAuth := oauth.NewOAuthErrorHandler(mockBackendConfig, oauth.WithRudderFlow(oauth.RudderFlow_Delete), oauth.WithOAuthClientTimeout(tt.oauthHttpClientTimeout)) - if tt.isOAuthV2Enabled { - cache := oauthV2.NewCache() - oauthLock := rudderSync.NewPartitionRWLocker() - - if tt.oauthHttpClientTimeout.Seconds() > 0 { - config.Set("HttpClient.oauth.timeout", tt.oauthHttpClientTimeout.Seconds()) - } - optionalArgs := oauthv2_http.HttpClientOptionalArgs{ - Augmenter: extensions.HeaderAugmenter, - Locker: oauthLock, - } - cli = oauthv2_http.NewOAuthHttpClient( - cli, common.RudderFlow(oauth.RudderFlow_Delete), - &cache, mockBackendConfig, - api.GetAuthErrorCategoryFromResponse, &optionalArgs, - ) + cache := oauthV2.NewCache() + oauthLock := rudderSync.NewPartitionRWLocker() + + if tt.oauthHttpClientTimeout.Seconds() > 0 { + config.Set("HttpClient.oauth.timeout", tt.oauthHttpClientTimeout.Seconds()) + } + optionalArgs := oauthv2_http.HttpClientOptionalArgs{ + Augmenter: extensions.HeaderAugmenter, + Locker: oauthLock, } + cli = oauthv2_http.NewOAuthHttpClient( + cli, common.RudderFlow(common.RudderFlowDelete), + &cache, mockBackendConfig, + api.GetAuthErrorCategoryFromResponse, &optionalArgs, + ) api := api.APIManager{ Client: cli, DestTransformURL: svr.URL, - OAuth: OAuth, MaxOAuthRefreshRetryAttempts: 1, - IsOAuthV2Enabled: tt.isOAuthV2Enabled, } status := api.Delete(ctx, tt.job, tt.dest) require.Equal(t, tt.expectedDeleteStatus.Status, status.Status) if tt.expectedDeleteStatus.Status != model.JobStatusComplete { exp := tt.expectedDeleteStatus.Error.Error() - if tt.isOAuthV2Enabled { - exp = tt.expectedDeleteStatus_OAuthV2.Error.Error() - } + exp = tt.expectedDeleteStatus_OAuthV2.Error.Error() jobError := strings.Replace(exp, "__cfgBE_server__", cfgBeSrv.URL, 1) require.Contains(t, strings.ToLower(status.Error.Error()), strings.ToLower(jobError)) diff --git a/router/handle.go b/router/handle.go index 6fbc9930bd..b9992f8279 100644 --- a/router/handle.go +++ b/router/handle.go @@ -35,7 +35,6 @@ import ( routerutils "github.com/rudderlabs/rudder-server/router/utils" destinationdebugger "github.com/rudderlabs/rudder-server/services/debugger/destination" "github.com/rudderlabs/rudder-server/services/diagnostics" - "github.com/rudderlabs/rudder-server/services/oauth" "github.com/rudderlabs/rudder-server/services/rmetrics" "github.com/rudderlabs/rudder-server/services/rsources" transformerFeaturesService "github.com/rudderlabs/rudder-server/services/transformer" @@ -90,7 +89,6 @@ type Handle struct { customDestinationManager customDestinationManager.DestinationManager transformer transformer.Transformer isOAuthDestination bool - oauth oauth.Authorizer destinationsMapMu sync.RWMutex destinationsMap map[string]*routerutils.DestinationWithSources // destinationID -> destination connectionsMap map[types.SourceDest]types.ConnectionWithID @@ -730,26 +728,3 @@ func (rt *Handle) getThrottlingCost(job *jobsdb.JobT) (cost int64) { func (*Handle) crashRecover() { // NO-OP } - -func (rt *Handle) updateAuthStatusToInactive(destination *backendconfig.DestinationT, workspaceID, rudderAccountId string) int { - inactiveAuthStatusStatTags := stats.Tags{ - "id": destination.ID, - "destType": destination.DestinationDefinition.Name, - "workspaceId": workspaceID, - "success": "true", - "flowType": string(oauth.RudderFlow_Delivery), - } - errCatStatusCode, _ := rt.oauth.AuthStatusToggle(&oauth.AuthStatusToggleParams{ - Destination: destination, - WorkspaceId: workspaceID, - RudderAccountId: rudderAccountId, - AuthStatus: oauth.AuthStatusInactive, - }) - if errCatStatusCode != http.StatusOK { - // Error while inactivating authStatus - inactiveAuthStatusStatTags["success"] = "false" - } - stats.Default.NewTaggedStat("auth_status_inactive_category_count", stats.CountType, inactiveAuthStatusStatTags).Increment() - // Abort the jobs as the destination is disabled - return http.StatusBadRequest -} diff --git a/router/handle_lifecycle.go b/router/handle_lifecycle.go index 9e00d8ccc4..cdedf0a50f 100644 --- a/router/handle_lifecycle.go +++ b/router/handle_lifecycle.go @@ -30,7 +30,8 @@ import ( routerutils "github.com/rudderlabs/rudder-server/router/utils" "github.com/rudderlabs/rudder-server/rruntime" destinationdebugger "github.com/rudderlabs/rudder-server/services/debugger/destination" - "github.com/rudderlabs/rudder-server/services/oauth" + oauthv2 "github.com/rudderlabs/rudder-server/services/oauth/v2" + "github.com/rudderlabs/rudder-server/services/oauth/v2/common" "github.com/rudderlabs/rudder-server/services/rmetrics" "github.com/rudderlabs/rudder-server/services/rsources" transformerFeaturesService "github.com/rudderlabs/rudder-server/services/transformer" @@ -130,9 +131,11 @@ func (rt *Handle) Setup( rt.reloadableConfig.oauthV2ExpirationTimeDiff, rt.transformerFeaturesService, ) - rt.isOAuthDestination = oauth.IsOAuthDestination(destinationDefinition.Config) - rt.oauth = oauth.NewOAuthErrorHandler(backendConfig) - + destination := oauthv2.DestinationInfo{ + DefinitionName: destinationDefinition.Name, + Config: destinationDefinition.Config, + } + rt.isOAuthDestination, _ = destination.IsOAuthDestination(common.RudderFlowDelivery) rt.isBackendConfigInitialized = false rt.backendConfigInitialized = make(chan bool) diff --git a/router/worker.go b/router/worker.go index 67535ee56f..f7a2b0414b 100644 --- a/router/worker.go +++ b/router/worker.go @@ -26,7 +26,6 @@ import ( routerutils "github.com/rudderlabs/rudder-server/router/utils" "github.com/rudderlabs/rudder-server/rruntime" destinationdebugger "github.com/rudderlabs/rudder-server/services/debugger/destination" - "github.com/rudderlabs/rudder-server/services/oauth" oauthv2 "github.com/rudderlabs/rudder-server/services/oauth/v2" "github.com/rudderlabs/rudder-server/utils/misc" utilTypes "github.com/rudderlabs/rudder-server/utils/types" @@ -197,26 +196,6 @@ func (w *worker) workLoop() { destination := batchDestination.Destination connection := conn.Connection - // if w.rt.isOAuthDestination && !oauthV2Enabled { - // rudderAccountID := oauth.GetAccountId(destination.Config, oauth.DeliveryAccountIdKey) - - // if routerutils.IsNotEmptyString(rudderAccountID) { - // w.logger.Debugf(`[%s][FetchToken] Token Fetch Method to be called`, destination.DestinationDefinition.Name) - // // Get Access Token Information to send it as part of the event - // tokenStatusCode, accountSecretInfo := w.rt.oauth.FetchToken(&oauth.RefreshTokenParams{ - // AccountId: rudderAccountID, - // WorkspaceId: jobMetadata.WorkspaceID, - // DestDefName: destination.DestinationDefinition.Name, - // }) - // w.logger.Debugf(`[%s][FetchToken] Token Fetch Method finished (statusCode, value): (%v, %+v)`, destination.DestinationDefinition.Name, tokenStatusCode, accountSecretInfo) - // if tokenStatusCode == http.StatusOK { - // jobMetadata.Secret = accountSecretInfo.Account.Secret - // } else { - // w.logger.Errorf(`[%s][FetchToken] Token Fetch Method error (statusCode, error): (%d, %s)`, destination.DestinationDefinition.Name, tokenStatusCode, accountSecretInfo.Err) - // } - // } - // } - if w.rt.enableBatching { w.routerJobs = append(w.routerJobs, types.RouterJobT{ Message: job.EventPayload, @@ -805,25 +784,6 @@ func (w *worker) proxyRequest(ctx context.Context, destinationJob types.Destinat proxyRequestResponse := w.rt.transformer.ProxyRequest(ctx, proxyReqparams) w.routerProxyStat.SendTiming(time.Since(rtlTime)) w.logger.Debugf(`[TransformerProxy] (Dest-%[1]v) {Job - %[2]v} Request ended`, w.rt.destType, jobID) - if !oauth.IsOAuthDestination(destinationJob.Destination.DestinationDefinition.Config) { - return proxyRequestResponse - } - // if proxyRequestResponse.ProxyRequestStatusCode != http.StatusOK && !oauthV2Enabled { - // w.logger.Debugn(`Sending for OAuth destination`) - // // Token from header of the request - // respStatusCode, respBodyTemp, contentType := w.rt.handleOAuthDestResponse(&HandleDestOAuthRespParams{ - // ctx: ctx, - // destinationJob: destinationJob, - // workerID: w.id, - // trRespStCd: proxyRequestResponse.ProxyRequestStatusCode, - // trRespBody: proxyRequestResponse.ProxyRequestResponseBody, - // secret: m[0].Secret, - // contentType: proxyRequestResponse.RespContentType, - // }, proxyRequestResponse.OAuthErrorCategory) - - // proxyRequestResponse.RespStatusCodes, proxyRequestResponse.RespBodys = w.prepareResponsesForJobs(&destinationJob, respStatusCode, respBodyTemp) - // proxyRequestResponse.RespContentType = contentType - // } return proxyRequestResponse } diff --git a/runner/runner.go b/runner/runner.go index 604b4c9ee6..64c4e8fdde 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -31,7 +31,6 @@ import ( "github.com/rudderlabs/rudder-server/services/alert" "github.com/rudderlabs/rudder-server/services/controlplane" "github.com/rudderlabs/rudder-server/services/diagnostics" - "github.com/rudderlabs/rudder-server/services/oauth" "github.com/rudderlabs/rudder-server/services/streammanager/kafka" "github.com/rudderlabs/rudder-server/utils/crash" "github.com/rudderlabs/rudder-server/utils/misc" @@ -306,7 +305,6 @@ func runAllInit() { kafka.Init() customdestinationmanager.Init() alert.Init() - oauth.Init() } func (r *Runner) versionInfo() map[string]interface{} { diff --git a/services/oauth/oauth.go b/services/oauth/oauth.go deleted file mode 100644 index 18ec2375d7..0000000000 --- a/services/oauth/oauth.go +++ /dev/null @@ -1,621 +0,0 @@ -package oauth - -//go:generate mockgen -destination=../../mocks/services/oauth/mock_oauth.go -package=mocks_oauth github.com/rudderlabs/rudder-server/services/oauth Authorizer -import ( - "bytes" - "encoding/json" - "errors" - "fmt" - "io" - "net/http" - "os" - "strconv" - "strings" - "sync" - "time" - - "github.com/tidwall/gjson" - - "github.com/rudderlabs/rudder-go-kit/config" - "github.com/rudderlabs/rudder-go-kit/logger" - "github.com/rudderlabs/rudder-go-kit/stats" - backendconfig "github.com/rudderlabs/rudder-server/backend-config" - "github.com/rudderlabs/rudder-server/jsonrs" - router_utils "github.com/rudderlabs/rudder-server/router/utils" - "github.com/rudderlabs/rudder-server/utils/httputil" - "github.com/rudderlabs/rudder-server/utils/misc" -) - -type ( - AuthType string - RudderFlow string -) - -const ( - OAuth AuthType = "OAuth" - InvalidAuthType AuthType = "InvalidAuthType" - - RudderFlow_Delivery RudderFlow = "delivery" - RudderFlow_Delete RudderFlow = "delete" - - DeleteAccountIdKey = "rudderDeleteAccountId" - DeliveryAccountIdKey = "rudderAccountId" - - AuthStatusInactive = "inactive" -) - -type AccountSecret struct { - ExpirationDate string `json:"expirationDate"` - Secret json.RawMessage `json:"secret"` -} -type AuthResponse struct { - Account AccountSecret - Err string - ErrorMessage string -} - -type OAuthStats struct { - id string - workspaceId string - errorMessage string - rudderCategory string - statName string - isCallToCpApi bool - authErrCategory string - destDefName string - isTokenFetch bool // This stats field is used to identify if a request to get token is arising from processor - flowType RudderFlow - action string // refresh_token, fetch_token, auth_status_toggle -} - -type DisableDestinationResponse struct { - Enabled bool `json:"enabled"` - DestinationId string `json:"id"` -} - -type AuthStatusToggleResponse struct { - Message string `json:"message,omitempty"` -} - -type AuthStatusToggleParams struct { - Destination *backendconfig.DestinationT - WorkspaceId string - RudderAccountId string - AuthStatus string -} - -type RefreshTokenParams struct { - AccountId string - WorkspaceId string - DestDefName string - WorkerId int - Secret json.RawMessage -} - -// OAuthErrResHandler is the handle for this class -type OAuthErrResHandler struct { - tr *http.Transport - client *http.Client - logger logger.Logger - destLockMap map[string]*sync.RWMutex // This mutex map is used for disable destination locking - accountLockMap map[string]*sync.RWMutex // This mutex map is used for refresh token locking - lockMapWMutex *sync.RWMutex // This mutex is used to prevent concurrent writes in lockMap(s) mentioned in the struct - destAuthInfoMap map[string]*AuthResponse - refreshActiveMap map[string]bool // Used to check if a refresh request for an account is already InProgress - authStatusUpdateActiveMap map[string]bool // Used to check if a authStatusInactive request for a destination is already InProgress - tokenProvider tokenProvider - rudderFlowType RudderFlow -} - -type Authorizer interface { - AuthStatusToggle(*AuthStatusToggleParams) (int, string) - RefreshToken(refTokenParams *RefreshTokenParams) (int, *AuthResponse) - FetchToken(fetchTokenParams *RefreshTokenParams) (int, *AuthResponse) -} - -type ControlPlaneRequestT struct { - Body string - ContentType string - Url string - Method string - destName string - RequestType string // This is to add more refined stat tags -} - -var ( - configBEURL string - pkgLogger logger.Logger - loggerNm string -) - -const ( - REFRESH_TOKEN = "REFRESH_TOKEN" - // Identifier to be sent from destination(during transformation/delivery) - AUTH_STATUS_INACTIVE = "AUTH_STATUS_INACTIVE" - - // Identifier for invalid_grant or access_denied errors(during refreshing the token) - REF_TOKEN_INVALID_GRANT = "ref_token_invalid_grant" -) - -var ErrPermissionOrTokenRevoked = errors.New("problem with user permission or access/refresh token have been revoked") - -// This struct only exists for marshalling and sending payload to control-plane -type RefreshTokenBodyParams struct { - HasExpired bool `json:"hasExpired"` - ExpiredSecret json.RawMessage `json:"expiredSecret"` -} - -type tokenProvider interface { - AccessToken() string -} - -func Init() { - configBEURL = backendconfig.GetConfigBackendURL() - pkgLogger = logger.NewLogger().Child("router").Child("OAuthResponseHandler") - loggerNm = "OAuthResponseHandler" -} - -func GetAuthType(config map[string]interface{}) AuthType { - var lookupErr *misc.MapLookupError - var authValue interface{} - if authValue, lookupErr = misc.NestedMapLookup(config, "auth", "type"); lookupErr != nil { - return "" - } - authType, ok := authValue.(string) - if !ok { - return "" - } - return AuthType(authType) -} - -func IsOAuthDestination(config map[string]interface{}) bool { - authType := GetAuthType(config) - return authType == OAuth -} - -// This function creates a new OauthErrorResponseHandler -func NewOAuthErrorHandler(provider tokenProvider, options ...func(*OAuthErrResHandler)) *OAuthErrResHandler { - oAuthErrResHandler := &OAuthErrResHandler{ - tokenProvider: provider, - logger: pkgLogger, - tr: &http.Transport{}, - client: &http.Client{Timeout: config.GetDuration("HttpClient.oauth.timeout", 30, time.Second)}, - // This timeout is kind of modifiable & it seemed like 10 mins for this is too much! - destLockMap: make(map[string]*sync.RWMutex), - accountLockMap: make(map[string]*sync.RWMutex), - lockMapWMutex: &sync.RWMutex{}, - destAuthInfoMap: make(map[string]*AuthResponse), - refreshActiveMap: make(map[string]bool), - authStatusUpdateActiveMap: make(map[string]bool), - rudderFlowType: RudderFlow_Delivery, - } - for _, opt := range options { - opt(oAuthErrResHandler) - } - return oAuthErrResHandler -} - -func GetAccountId(config map[string]interface{}, idKey string) string { - if rudderAccountIdInterface, found := config[idKey]; found { - if rudderAccountId, ok := rudderAccountIdInterface.(string); ok { - return rudderAccountId - } - } - return "" -} - -func WithRudderFlow(rudderFlow RudderFlow) func(*OAuthErrResHandler) { - return func(authErrHandle *OAuthErrResHandler) { - authErrHandle.rudderFlowType = rudderFlow - } -} - -func WithOAuthClientTimeout(timeout time.Duration) func(*OAuthErrResHandler) { - return func(authErrHandle *OAuthErrResHandler) { - authErrHandle.client.Timeout = timeout - } -} - -func (authErrHandler *OAuthErrResHandler) RefreshToken(refTokenParams *RefreshTokenParams) (int, *AuthResponse) { - authStats := &OAuthStats{ - id: refTokenParams.AccountId, - workspaceId: refTokenParams.WorkspaceId, - rudderCategory: "destination", - statName: "", - isCallToCpApi: false, - authErrCategory: REFRESH_TOKEN, - errorMessage: "", - destDefName: refTokenParams.DestDefName, - flowType: authErrHandler.rudderFlowType, - action: "refresh_token", - } - return authErrHandler.GetTokenInfo(refTokenParams, "Refresh token", authStats) -} - -func getOAuthActionStatName(stat string) string { - return fmt.Sprintf("oauth_action_%v", stat) -} - -func (authErrHandler *OAuthErrResHandler) FetchToken(fetchTokenParams *RefreshTokenParams) (int, *AuthResponse) { - authStats := &OAuthStats{ - id: fetchTokenParams.AccountId, - workspaceId: fetchTokenParams.WorkspaceId, - rudderCategory: "destination", - statName: "", - isCallToCpApi: false, - authErrCategory: "", - errorMessage: "", - destDefName: fetchTokenParams.DestDefName, - isTokenFetch: true, - flowType: authErrHandler.rudderFlowType, - action: "fetch_token", - } - return authErrHandler.GetTokenInfo(fetchTokenParams, "Fetch token", authStats) -} - -func (authErrHandler *OAuthErrResHandler) GetTokenInfo(refTokenParams *RefreshTokenParams, logTypeName string, authStats *OAuthStats) (int, *AuthResponse) { - startTime := time.Now() - defer func() { - authStats.statName = getOAuthActionStatName("total_latency") - authStats.isCallToCpApi = false - authStats.SendTimerStats(startTime) - }() - - accountMutex := authErrHandler.getKeyMutex(authErrHandler.accountLockMap, refTokenParams.AccountId) - refTokenBody := RefreshTokenBodyParams{} - if router_utils.IsNotEmptyString(string(refTokenParams.Secret)) { - refTokenBody = RefreshTokenBodyParams{ - HasExpired: true, - ExpiredSecret: refTokenParams.Secret, - } - } - accountMutex.RLock() - refVal, ok := authErrHandler.destAuthInfoMap[refTokenParams.AccountId] - if ok { - isInvalidAccountSecretForRefresh := router_utils.IsNotEmptyString(string(refVal.Account.Secret)) && - !bytes.Equal(refVal.Account.Secret, refTokenParams.Secret) - if isInvalidAccountSecretForRefresh { - accountMutex.RUnlock() - authErrHandler.logger.Debugf("[%s request] [Cache] :: (Read) %s response received(rt-worker-%d): %s\n", loggerNm, logTypeName, refTokenParams.WorkerId, refVal.Account.Secret) - return http.StatusOK, refVal - } - } - accountMutex.RUnlock() - - accountMutex.Lock() - if isRefreshActive, isPresent := authErrHandler.refreshActiveMap[refTokenParams.AccountId]; isPresent && isRefreshActive { - accountMutex.Unlock() - if refVal != nil { - secret := refVal.Account.Secret - authErrHandler.logger.Debugf("[%s request] [Active] :: (Read) %s response received from cache(rt-worker-%d): %s\n", loggerNm, logTypeName, refTokenParams.WorkerId, string(secret)) - return http.StatusOK, refVal - } - // Empty Response(valid while many GetToken calls are happening) - return http.StatusOK, &AuthResponse{ - Account: AccountSecret{ - Secret: []byte(""), - }, - Err: "", - } - } - // Refresh will start - authErrHandler.refreshActiveMap[refTokenParams.AccountId] = true - authErrHandler.logger.Debugf("[%s request] [rt-worker-%v] :: %v request is active!", loggerNm, logTypeName, refTokenParams.WorkerId) - accountMutex.Unlock() - - defer func() { - accountMutex.Lock() - authErrHandler.refreshActiveMap[refTokenParams.AccountId] = false - authErrHandler.logger.Debugf("[%s request] [rt-worker-%v]:: %v request is inactive!", loggerNm, logTypeName, refTokenParams.WorkerId) - accountMutex.Unlock() - }() - - authErrHandler.logger.Debugf("[%s] [%v request] Lock Acquired by rt-worker-%d\n", loggerNm, logTypeName, refTokenParams.WorkerId) - - statusCode := authErrHandler.fetchAccountInfoFromCp(refTokenParams, refTokenBody, authStats, logTypeName) - return statusCode, authErrHandler.destAuthInfoMap[refTokenParams.AccountId] -} - -// This method hits the Control Plane to get the account information -// As well update the account information into the destAuthInfoMap(which acts as an in-memory cache) -func (authErrHandler *OAuthErrResHandler) fetchAccountInfoFromCp(refTokenParams *RefreshTokenParams, refTokenBody RefreshTokenBodyParams, - authStats *OAuthStats, logTypeName string, -) (statusCode int) { - refreshUrl := fmt.Sprintf("%s/destination/workspaces/%s/accounts/%s/token", configBEURL, refTokenParams.WorkspaceId, refTokenParams.AccountId) - res, err := jsonrs.Marshal(refTokenBody) - if err != nil { - panic(err) - } - refreshCpReq := &ControlPlaneRequestT{ - Method: http.MethodPost, - Url: refreshUrl, - ContentType: "application/json; charset=utf-8", - Body: string(res), - destName: refTokenParams.DestDefName, - RequestType: authStats.action, - } - var accountSecret AccountSecret - // Stat for counting number of Refresh Token endpoint calls - authStats.statName = getOAuthActionStatName(`request_sent`) - authStats.isCallToCpApi = true - authStats.errorMessage = "" - authStats.SendCountStat() - - cpiCallStartTime := time.Now() - statusCode, response := authErrHandler.cpApiCall(refreshCpReq) - authStats.statName = getOAuthActionStatName(`request_latency`) - authStats.SendTimerStats(cpiCallStartTime) - - authErrHandler.logger.Debugf("[%s] Got the response from Control-Plane: rt-worker-%d with statusCode: %d\n", loggerNm, refTokenParams.WorkerId, statusCode) - - // Empty Refresh token response - if !router_utils.IsNotEmptyString(response) { - authStats.statName = getOAuthActionStatName("failure") - authStats.errorMessage = "Empty secret" - authStats.SendCountStat() - // Setting empty accessToken value into in-memory auth info map(cache) - authErrHandler.destAuthInfoMap[refTokenParams.AccountId] = &AuthResponse{ - Account: AccountSecret{ - Secret: []byte(""), - }, - Err: "Empty secret", - } - authErrHandler.logger.Debugf("[%s request] :: Empty %s response received(rt-worker-%d) : %s\n", loggerNm, logTypeName, refTokenParams.WorkerId, response) - return http.StatusInternalServerError - } - - if errType, refErrMsg := authErrHandler.getRefreshTokenErrResp(response, &accountSecret); router_utils.IsNotEmptyString(refErrMsg) { - if _, ok := authErrHandler.destAuthInfoMap[refTokenParams.AccountId]; !ok { - authErrHandler.destAuthInfoMap[refTokenParams.AccountId] = &AuthResponse{ - Err: errType, - ErrorMessage: refErrMsg, - } - } else { - authErrHandler.destAuthInfoMap[refTokenParams.AccountId].Err = errType - authErrHandler.destAuthInfoMap[refTokenParams.AccountId].ErrorMessage = refErrMsg - } - authStats.statName = getOAuthActionStatName("failure") - authStats.errorMessage = refErrMsg - authStats.SendCountStat() - if refErrMsg == REF_TOKEN_INVALID_GRANT { - // Should abort the event as refresh is not going to work - // until we have new refresh token for the account - return http.StatusBadRequest - } - return http.StatusInternalServerError - } - // Update the refreshed account information into in-memory map(cache) - authErrHandler.destAuthInfoMap[refTokenParams.AccountId] = &AuthResponse{ - Account: accountSecret, - } - authStats.statName = getOAuthActionStatName("success") - authStats.errorMessage = "" - authStats.SendCountStat() - authErrHandler.logger.Debugf("[%s request] :: (Write) %s response received(rt-worker-%d): %s\n", loggerNm, logTypeName, refTokenParams.WorkerId, response) - return http.StatusOK -} - -func (authErrHandler *OAuthErrResHandler) getRefreshTokenErrResp(response string, accountSecret *AccountSecret) (errorType, message string) { - if err := jsonrs.Unmarshal([]byte(response), &accountSecret); err != nil { - // Some problem with AccountSecret unmarshalling - message = fmt.Sprintf("Unmarshal of response unsuccessful: %v", response) - errorType = "unmarshallableResponse" - } else if gjson.Get(response, "body.code").String() == REF_TOKEN_INVALID_GRANT { - // User (or) AccessToken (or) RefreshToken has been revoked - bodyMsg := gjson.Get(response, "body.message").String() - if bodyMsg == "" { - // Default message - authErrHandler.logger.Debugf("Failed with error response: %v\n", response) - message = ErrPermissionOrTokenRevoked.Error() - } else { - message = bodyMsg - } - errorType = REF_TOKEN_INVALID_GRANT - } - return errorType, message -} - -func (authStats *OAuthStats) SendTimerStats(startTime time.Time) { - statsTags := stats.Tags{ - "id": authStats.id, - "workspaceId": authStats.workspaceId, - "rudderCategory": authStats.rudderCategory, - "isCallToCpApi": strconv.FormatBool(authStats.isCallToCpApi), - "authErrCategory": authStats.authErrCategory, - "destType": authStats.destDefName, - "flowType": string(authStats.flowType), - "action": authStats.action, - } - stats.Default.NewTaggedStat(authStats.statName, stats.TimerType, statsTags).SendTiming(time.Since(startTime)) -} - -// Send count type stats related to OAuth(Destination) -func (refStats *OAuthStats) SendCountStat() { - statsTags := stats.Tags{ - "id": refStats.id, - "workspaceId": refStats.workspaceId, - "rudderCategory": refStats.rudderCategory, - "errorMessage": refStats.errorMessage, - "isCallToCpApi": strconv.FormatBool(refStats.isCallToCpApi), - "authErrCategory": refStats.authErrCategory, - "destType": refStats.destDefName, - "isTokenFetch": strconv.FormatBool(refStats.isTokenFetch), - "flowType": string(refStats.flowType), - "action": refStats.action, - } - stats.Default.NewTaggedStat(refStats.statName, stats.CountType, statsTags).Increment() -} - -func (authErrHandler *OAuthErrResHandler) AuthStatusToggle(params *AuthStatusToggleParams) (statusCode int, respBody string) { - authErrHandlerTimeStart := time.Now() - destinationId := params.Destination.ID - authStatusToggleMutex := authErrHandler.getKeyMutex(authErrHandler.destLockMap, destinationId) - action := fmt.Sprintf("auth_status_%v", params.AuthStatus) - - authStatusToggleStats := &OAuthStats{ - id: destinationId, - workspaceId: params.WorkspaceId, - rudderCategory: "destination", - statName: "", - isCallToCpApi: false, - authErrCategory: AUTH_STATUS_INACTIVE, - errorMessage: "", - destDefName: params.Destination.DestinationDefinition.Name, - flowType: authErrHandler.rudderFlowType, - action: action, - } - defer func() { - authStatusToggleStats.statName = getOAuthActionStatName("total_latency") - authStatusToggleStats.isCallToCpApi = false - authStatusToggleStats.SendTimerStats(authErrHandlerTimeStart) - }() - - authStatusToggleMutex.Lock() - isAuthStatusUpdateActive, isAuthStatusUpdateReqPresent := authErrHandler.authStatusUpdateActiveMap[destinationId] - authStatusUpdateActiveReq := strconv.FormatBool(isAuthStatusUpdateReqPresent && isAuthStatusUpdateActive) - if isAuthStatusUpdateReqPresent && isAuthStatusUpdateActive { - authStatusToggleMutex.Unlock() - authErrHandler.logger.Debugf("[%s request] :: AuthStatusInactive request Active : %s\n", loggerNm, authStatusUpdateActiveReq) - return http.StatusConflict, ErrPermissionOrTokenRevoked.Error() - } - - authErrHandler.authStatusUpdateActiveMap[destinationId] = true - authStatusToggleMutex.Unlock() - - defer func() { - authStatusToggleMutex.Lock() - authErrHandler.authStatusUpdateActiveMap[destinationId] = false - authErrHandler.logger.Debugf("[%s request] :: AuthStatusInactive request is inactive!", loggerNm) - authStatusToggleMutex.Unlock() - // After trying to inactivate authStatus for destination, need to remove existing accessToken(from in-memory cache) - // This is being done to obtain new token after an update such as re-authorisation is done - accountMutex := authErrHandler.getKeyMutex(authErrHandler.accountLockMap, params.RudderAccountId) - accountMutex.Lock() - delete(authErrHandler.destAuthInfoMap, params.RudderAccountId) - accountMutex.Unlock() - }() - - authStatusToggleUrl := fmt.Sprintf("%s/workspaces/%s/destinations/%s/authStatus/toggle", configBEURL, params.WorkspaceId, destinationId) - - authStatusInactiveCpReq := &ControlPlaneRequestT{ - Url: authStatusToggleUrl, - Method: http.MethodPut, - Body: fmt.Sprintf(`{"authStatus": "%v"}`, params.AuthStatus), - ContentType: "application/json", - destName: params.Destination.DestinationDefinition.Name, - RequestType: action, - } - - authStatusToggleStats.statName = getOAuthActionStatName("request_sent") - authStatusToggleStats.isCallToCpApi = true - authStatusToggleStats.SendCountStat() - - cpiCallStartTime := time.Now() - statusCode, respBody = authErrHandler.cpApiCall(authStatusInactiveCpReq) - authStatusToggleStats.statName = getOAuthActionStatName("request_latency") - defer authStatusToggleStats.SendTimerStats(cpiCallStartTime) - authErrHandler.logger.Errorf(`Response from CP(stCd: %v) for auth status inactive req: %v`, statusCode, respBody) - - var authStatusToggleRes *AuthStatusToggleResponse - unmarshalErr := jsonrs.Unmarshal([]byte(respBody), &authStatusToggleRes) - if router_utils.IsNotEmptyString(respBody) && (unmarshalErr != nil || !router_utils.IsNotEmptyString(authStatusToggleRes.Message) || statusCode != http.StatusOK) { - var msg string - if unmarshalErr != nil { - msg = unmarshalErr.Error() - } else { - msg = fmt.Sprintf("Could not update authStatus to inactive for destination: %v", authStatusToggleRes.Message) - } - authStatusToggleStats.statName = getOAuthActionStatName("failure") - authStatusToggleStats.errorMessage = msg - authStatusToggleStats.SendCountStat() - return http.StatusBadRequest, ErrPermissionOrTokenRevoked.Error() - } - - authErrHandler.logger.Errorf("[%s request] :: (Write) auth status inactive Response received : %s\n", loggerNm, respBody) - authStatusToggleStats.statName = getOAuthActionStatName("success") - authStatusToggleStats.errorMessage = "" - authStatusToggleStats.SendCountStat() - - return http.StatusBadRequest, ErrPermissionOrTokenRevoked.Error() -} - -func processResponse(resp *http.Response) (statusCode int, respBody string) { - var respData []byte - var ioUtilReadErr error - if resp != nil && resp.Body != nil { - respData, ioUtilReadErr = io.ReadAll(resp.Body) - defer func() { httputil.CloseResponse(resp) }() - if ioUtilReadErr != nil { - return http.StatusInternalServerError, ioUtilReadErr.Error() - } - } - // Detecting content type of the respData - contentTypeHeader := strings.ToLower(http.DetectContentType(respData)) - // If content type is not of type "*text*", overriding it with empty string - if !strings.Contains(contentTypeHeader, "text") && - !strings.Contains(contentTypeHeader, "application/json") && - !strings.Contains(contentTypeHeader, "application/xml") { - respData = []byte("") - } - - return resp.StatusCode, string(respData) -} - -func (authErrHandler *OAuthErrResHandler) cpApiCall(cpReq *ControlPlaneRequestT) (int, string) { - cpStatTags := stats.Tags{ - "url": cpReq.Url, - "requestType": cpReq.RequestType, - "destType": cpReq.destName, - "method": cpReq.Method, - "flowType": string(authErrHandler.rudderFlowType), - } - - var reqBody *bytes.Buffer - var req *http.Request - var err error - if router_utils.IsNotEmptyString(cpReq.Body) { - reqBody = bytes.NewBufferString(cpReq.Body) - req, err = http.NewRequest(cpReq.Method, cpReq.Url, reqBody) - } else { - req, err = http.NewRequest(cpReq.Method, cpReq.Url, http.NoBody) - } - if err != nil { - authErrHandler.logger.Errorf("[%s request] :: destination request failed: %+v\n", loggerNm, err) - // Abort on receiving an error in request formation - return http.StatusBadRequest, err.Error() - } - // Authorisation setting - req.SetBasicAuth(authErrHandler.tokenProvider.AccessToken(), "") - - // Set content-type in order to send the body in request correctly - if router_utils.IsNotEmptyString(cpReq.ContentType) { - req.Header.Set("Content-Type", cpReq.ContentType) - } - - cpApiDoTimeStart := time.Now() - res, doErr := authErrHandler.client.Do(req) - stats.Default.NewTaggedStat("cp_request_latency", stats.TimerType, cpStatTags).SendTiming(time.Since(cpApiDoTimeStart)) - authErrHandler.logger.Debugf("[%s request] :: destination request sent\n", loggerNm) - if doErr != nil { - // Abort on receiving an error - authErrHandler.logger.Errorf("[%s request] :: destination request failed: %+v\n", loggerNm, doErr) - if os.IsTimeout(doErr) { - stats.Default.NewTaggedStat("cp_request_timeout", stats.CountType, cpStatTags) - } - return http.StatusBadRequest, doErr.Error() - } - defer func() { httputil.CloseResponse(res) }() - statusCode, resp := processResponse(res) - return statusCode, resp -} - -func (resHandler *OAuthErrResHandler) getKeyMutex(mutexMap map[string]*sync.RWMutex, id string) *sync.RWMutex { - resHandler.lockMapWMutex.Lock() - defer resHandler.lockMapWMutex.Unlock() - // mutexMap will not be nil - if _, ok := mutexMap[id]; !ok { - resHandler.logger.Debugf("[%s request] :: Creating new mutex for %s\n", loggerNm, id) - mutexMap[id] = &sync.RWMutex{} - } - return mutexMap[id] -} diff --git a/services/oauth/oauth_test.go b/services/oauth/oauth_test.go deleted file mode 100644 index ef9924d1b6..0000000000 --- a/services/oauth/oauth_test.go +++ /dev/null @@ -1,191 +0,0 @@ -package oauth_test - -import ( - "fmt" - "net/http" - "net/http/httptest" - "sync" - "testing" - "time" - - "github.com/go-chi/chi/v5" - "github.com/samber/lo" - "go.uber.org/mock/gomock" - - "github.com/stretchr/testify/require" - - backendconfig "github.com/rudderlabs/rudder-server/backend-config" - mocksBackendConfig "github.com/rudderlabs/rudder-server/mocks/backend-config" - "github.com/rudderlabs/rudder-server/services/oauth" -) - -type cpResponseParams struct { - timeout time.Duration - code int - response string -} -type cpResponseProducer struct { - responses []cpResponseParams - callCount int -} - -func (s *cpResponseProducer) GetNext() cpResponseParams { - if s.callCount >= len(s.responses) { - panic("ran out of responses") - } - cpResp := s.responses[s.callCount] - s.callCount++ - return cpResp -} - -func (cpRespProducer *cpResponseProducer) mockCpRequests() *chi.Mux { - srvMux := chi.NewMux() - srvMux.HandleFunc("/destination/workspaces/{workspaceId}/accounts/{accountId}/token", func(w http.ResponseWriter, req *http.Request) { - // iterating over request parameters - for _, reqParam := range []string{"workspaceId", "accountId"} { - param := chi.URLParam(req, reqParam) - if param == "" { - // This case wouldn't occur I guess - http.Error(w, fmt.Sprintf("Wrong url being sent: %v", reqParam), http.StatusBadRequest) - return - } - } - - cpResp := cpRespProducer.GetNext() - // sleep is being used to mimic the waiting in actual transformer response - if cpResp.timeout > 0 { - time.Sleep(cpResp.timeout) - } - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(cpResp.code) - // Lint error fix - _, err := w.Write([]byte(cpResp.response)) - if err != nil { - http.Error(w, fmt.Sprintf("Provided response is faulty, please check it. Err: %v", err.Error()), http.StatusInternalServerError) - return - } - }) - - srvMux.HandleFunc("/workspaces/{workspaceId}/destinations/{destinationId}/authStatus/toggle", func(w http.ResponseWriter, req *http.Request) { - if req.Method != http.MethodPut { - w.WriteHeader(http.StatusMethodNotAllowed) - return - } - // iterating over request parameters - for _, reqParam := range []string{"workspaceId", "destinationId"} { - param := chi.URLParam(req, reqParam) - if param == "" { - // This case wouldn't occur I guess - http.Error(w, fmt.Sprintf("Wrong url being sent: %v", reqParam), http.StatusNotFound) - return - } - } - - cpResp := cpRespProducer.GetNext() - // sleep is being used to mimic the waiting in actual transformer response - if cpResp.timeout > 0 { - time.Sleep(cpResp.timeout) - } - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(cpResp.code) - // Lint error fix - _, err := w.Write([]byte(cpResp.response)) - if err != nil { - http.Error(w, fmt.Sprintf("Provided response is faulty, please check it. Err: %v", err.Error()), http.StatusInternalServerError) - return - } - }) - return srvMux -} - -func TestIsOAuthDestination(t *testing.T) { - testCases := []struct { - name string - config map[string]interface{} - expected bool - }{ - { - name: "should return true if destination is OAuth", - config: map[string]interface{}{"auth": map[string]interface{}{"type": "OAuth"}}, - expected: true, - }, - { - name: "should return false if destination is not OAuth", - config: map[string]interface{}{}, - expected: false, - }, - } - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - require.Equal(t, tc.expected, oauth.IsOAuthDestination(tc.config)) - }) - } -} - -func TestMultipleRequestsForOAuth(t *testing.T) { - mockCtrl := gomock.NewController(t) - mockBackendConfig := mocksBackendConfig.NewMockBackendConfig(mockCtrl) - mockBackendConfig.EXPECT().AccessToken().AnyTimes() - - t.Run("multiple authStatusInactive requests", func(t *testing.T) { - cpRespProducer := &cpResponseProducer{ - responses: []cpResponseParams{ - { - timeout: 1 * time.Second, - code: 200, - }, - }, - } - cfgBeSrv := httptest.NewServer(cpRespProducer.mockCpRequests()) - - defer cfgBeSrv.Close() - - t.Setenv("CONFIG_BACKEND_URL", cfgBeSrv.URL) - t.Setenv("CONFIG_BACKEND_TOKEN", "config_backend_token") - - backendconfig.Init() - oauth.Init() - OAuth := oauth.NewOAuthErrorHandler(mockBackendConfig, oauth.WithRudderFlow(oauth.RudderFlow_Delete)) - - totalGoRoutines := 5 - var wg sync.WaitGroup - var allJobStatus []int - var allJobStatusMu sync.Mutex - - dest := &backendconfig.DestinationT{ - ID: "dId", - Config: map[string]interface{}{ - "rudderDeleteAccountId": "accountId", - }, - DestinationDefinition: backendconfig.DestinationDefinitionT{ - Name: "GA", - Config: map[string]interface{}{ - "auth": map[string]interface{}{ - "type": "OAuth", - }, - }, - }, - } - - for i := 0; i < totalGoRoutines; i++ { - wg.Add(1) - go func() { - status, _ := OAuth.AuthStatusToggle(&oauth.AuthStatusToggleParams{ - Destination: dest, - WorkspaceId: "wspId", - RudderAccountId: "accountId", - AuthStatus: oauth.AuthStatusInactive, - }) - allJobStatusMu.Lock() - allJobStatus = append(allJobStatus, status) - allJobStatusMu.Unlock() - wg.Done() - }() - } - wg.Wait() - countMap := lo.CountValues(allJobStatus) - - require.Equal(t, countMap[http.StatusConflict], totalGoRoutines-1, countMap) - require.Equal(t, countMap[http.StatusBadRequest], 1, countMap) - }) -} diff --git a/services/oauth/v2/common/constants.go b/services/oauth/v2/common/constants.go index 38c680923f..c9f61c4e2f 100644 --- a/services/oauth/v2/common/constants.go +++ b/services/oauth/v2/common/constants.go @@ -5,18 +5,18 @@ const ( // CategoryAuthStatusInactive Identifier to be sent from destination(during transformation/delivery) CategoryAuthStatusInactive = "AUTH_STATUS_INACTIVE" // RefTokenInvalidGrant Identifier for invalid_grant or access_denied errors(during refreshing the token) - RefTokenInvalidGrant = "ref_token_invalid_grant" - RefTokenInvalidResponse = "INVALID_REFRESH_RESPONSE" - TimeOutError = "timeout" - NetworkError = "network_error" - None = "none" - - DestKey ContextKey = "destination" - SecretKey ContextKey = "secret" - RudderFlowDelivery RudderFlow = "delivery" - RudderFlowDelete RudderFlow = "delete" - DeleteAccountIDKey = "rudderDeleteAccountId" - DeliveryAccountIDKey = "rudderAccountId" + RefTokenInvalidGrant = "ref_token_invalid_grant" + RefTokenInvalidResponse = "INVALID_REFRESH_RESPONSE" + TimeOutError = "timeout" + NetworkError = "network_error" + None = "none" + OAuth AuthType = "OAuth" + DestKey ContextKey = "destination" + SecretKey ContextKey = "secret" + RudderFlowDelivery RudderFlow = "delivery" + RudderFlowDelete RudderFlow = "delete" + DeleteAccountIDKey = "rudderDeleteAccountId" + DeliveryAccountIDKey = "rudderAccountId" AuthStatusInActive = "inactive" diff --git a/services/oauth/v2/common/types.go b/services/oauth/v2/common/types.go index 40f7ace40f..1467005061 100644 --- a/services/oauth/v2/common/types.go +++ b/services/oauth/v2/common/types.go @@ -3,4 +3,5 @@ package common type ( RudderFlow string ContextKey string + AuthType string ) diff --git a/services/oauth/v2/destination_info.go b/services/oauth/v2/destination_info.go index 7e7d4b5b31..24d4ef0eb5 100644 --- a/services/oauth/v2/destination_info.go +++ b/services/oauth/v2/destination_info.go @@ -5,7 +5,6 @@ import ( "github.com/samber/lo" - "github.com/rudderlabs/rudder-server/services/oauth" "github.com/rudderlabs/rudder-server/services/oauth/v2/common" "github.com/rudderlabs/rudder-server/utils/misc" ) @@ -33,7 +32,7 @@ func (d *DestinationInfo) IsOAuthDestination(flow common.RudderFlow) (bool, erro if err != nil { return false, err } - return authType == string(oauth.OAuth) && isScopeSupported, nil + return authType == string(common.OAuth) && isScopeSupported, nil } func (d *DestinationInfo) IsOAuthSupportedForFlow(flow string) (bool, error) { diff --git a/services/rmetrics/pending_events.go b/services/rmetrics/pending_events.go deleted file mode 100644 index 400de5403c..0000000000 --- a/services/rmetrics/pending_events.go +++ /dev/null @@ -1,153 +0,0 @@ -package rmetrics - -import ( - "fmt" - "sync" - - "github.com/rudderlabs/rudder-go-kit/stats/metric" -) - -const ( - JobsdbPendingEventsCount = "jobsdb_%s_pending_events_count" - All = "ALL" -) - -type ( - DecreasePendingEventsFunc func(tablePrefix, workspace, destType string, value float64) - IncreasePendingEventsFunc func(tablePrefix, workspace, destType string, value float64) -) - -// PendingEventsRegistry is a registry for pending events metrics -type PendingEventsRegistry interface { - // IncreasePendingEvents increments three gauges, the dest & workspace-specific gauge, plus two aggregate (global) gauges - IncreasePendingEvents(tablePrefix, workspace, destType string, value float64) - // DecreasePendingEvents decrements three gauges, the dest & workspace-specific gauge, plus two aggregate (global) gauges - DecreasePendingEvents(tablePrefix, workspace, destType string, value float64) - // PendingEvents gets the measurement for pending events metric - PendingEvents(tablePrefix, workspace, destType string) metric.Gauge - // Publish publishes the metrics to the global published metrics registry - Publish() - // Reset resets the registry to a new, non published one and clears the global published metrics registry - Reset() -} - -type Option func(*pendingEventsRegistry) - -// WithPublished creates a registry that writes metrics to the global published metrics registry, without having to call Publish first. -func WithPublished() Option { - return func(per *pendingEventsRegistry) { - per.published = true - per.registry = metric.Instance.GetRegistry(metric.PublishedMetrics) - } -} - -// NewPendingEventsRegistry creates a new PendingEventsRegistry. By default, metrics are not published to the global published metrics registry, until [Publish] is called. -func NewPendingEventsRegistry(opts ...Option) PendingEventsRegistry { - per := &pendingEventsRegistry{ - registry: metric.NewRegistry(), - } - for _, opt := range opts { - opt(per) - } - return per -} - -type pendingEventsRegistry struct { - registryMu sync.RWMutex - published bool - registry metric.Registry -} - -// IncreasePendingEvents increments three gauges, the dest & workspace-specific gauge, plus two aggregate (global) gauges -func (pem *pendingEventsRegistry) IncreasePendingEvents(tablePrefix, workspace, destType string, value float64) { - pem.registryMu.RLock() - defer pem.registryMu.RUnlock() - - pem.PendingEvents(tablePrefix, workspace, destType).Add(value) - pem.PendingEvents(tablePrefix, All, destType).Add(value) - pem.PendingEvents(tablePrefix, All, All).Add(value) - pem.registry.MustGetGauge(pendingEventsMeasurementAll{tablePrefix, destType}).Add(value) - pem.registry.MustGetGauge(pendingEventsMeasurementAll{tablePrefix, All}).Add(value) -} - -// DecreasePendingEvents decrements three gauges, the dest & workspace-specific gauge, plus two aggregate (global) gauges -func (pem *pendingEventsRegistry) DecreasePendingEvents(tablePrefix, workspace, destType string, value float64) { - pem.registryMu.RLock() - defer pem.registryMu.RUnlock() - pem.PendingEvents(tablePrefix, workspace, destType).Sub(value) - pem.PendingEvents(tablePrefix, All, destType).Sub(value) - pem.PendingEvents(tablePrefix, All, All).Sub(value) - pem.registry.MustGetGauge(pendingEventsMeasurementAll{tablePrefix, destType}).Sub(value) - pem.registry.MustGetGauge(pendingEventsMeasurementAll{tablePrefix, All}).Sub(value) -} - -// PendingEvents gets the measurement for pending events metric -func (pem *pendingEventsRegistry) PendingEvents(tablePrefix, workspace, destType string) metric.Gauge { - return pem.registry.MustGetGauge(newPendingEventsMeasurement(tablePrefix, workspace, destType)) -} - -// Publish publishes the metrics to the global published metrics registry -func (pem *pendingEventsRegistry) Publish() { - pem.registryMu.Lock() - defer pem.registryMu.Unlock() - if pem.published { - return - } - pem.published = true - - publishedRegistry := metric.Instance.GetRegistry(metric.PublishedMetrics) - pem.registry.Range(func(key, value any) bool { // copy all gauge metrics to the published registry - m := key.(metric.Measurement) - switch value := value.(type) { - case metric.Gauge: - publishedRegistry.MustGetGauge(m).Set(value.Value()) - } - return true - }) - pem.registry = publishedRegistry -} - -// Reset resets the registry to a new, non published one and clears the global published metrics registry -func (pem *pendingEventsRegistry) Reset() { - pem.registryMu.Lock() - defer pem.registryMu.Unlock() - pem.registry = metric.NewRegistry() - pem.published = false - metric.Instance.Reset() -} - -func newPendingEventsMeasurement(tablePrefix, workspace, destType string) metric.Measurement { - return pendingEventsMeasurement{tablePrefix, workspace, destType} -} - -type pendingEventsMeasurement struct { - tablePrefix string - workspace string - destType string -} - -func (r pendingEventsMeasurement) GetName() string { - return fmt.Sprintf(JobsdbPendingEventsCount, r.tablePrefix) -} - -func (r pendingEventsMeasurement) GetTags() map[string]string { - return map[string]string{ - "workspaceId": r.workspace, - "destType": r.destType, - } -} - -type pendingEventsMeasurementAll struct { - tablePrefix string - destType string -} - -func (r pendingEventsMeasurementAll) GetName() string { - return fmt.Sprintf(JobsdbPendingEventsCount, r.tablePrefix) + "_all" -} - -func (r pendingEventsMeasurementAll) GetTags() map[string]string { - return map[string]string{ - "destType": r.destType, - } -} diff --git a/services/rmetrics/pending_events_test.go b/services/rmetrics/pending_events_test.go deleted file mode 100644 index eda9707fc1..0000000000 --- a/services/rmetrics/pending_events_test.go +++ /dev/null @@ -1,91 +0,0 @@ -package rmetrics_test - -import ( - "testing" - - "github.com/stretchr/testify/require" - - "github.com/rudderlabs/rudder-go-kit/stats/metric" - "github.com/rudderlabs/rudder-server/services/rmetrics" -) - -func TestPendingEventsRegistry(t *testing.T) { - mi := metric.Instance - mi.Reset() - defer mi.Reset() - const ( - tablePrefix = "tablePrefix" - workspace = "workspace" - destType = "destType" - ) - t.Run("default", func(t *testing.T) { - mi.Reset() - r := rmetrics.NewPendingEventsRegistry() - r.IncreasePendingEvents(tablePrefix, workspace, destType, 1) - require.EqualValues(t, 1, r.PendingEvents(tablePrefix, workspace, destType).IntValue()) - - mi.GetRegistry(metric.PublishedMetrics).Range(func(key, value interface{}) bool { - require.FailNow(t, "unexpected metric in published metrics") - return false - }) - r.Publish() - var metricsCount int - mi.GetRegistry(metric.PublishedMetrics).Range(func(key, value interface{}) bool { - metricsCount++ - return true - }) - require.Equal(t, 5, metricsCount, "for each pending event, 3 gauges are created, plus 2 aggregate gauges") - r.Publish() // should be a no-op - - r.Reset() - mi.GetRegistry(metric.PublishedMetrics).Range(func(key, value interface{}) bool { - require.FailNow(t, "unexpected metric in published metrics") - return false - }) - - r.IncreasePendingEvents(tablePrefix, workspace, destType, 1) - mi.GetRegistry(metric.PublishedMetrics).Range(func(key, value interface{}) bool { - require.FailNow(t, "unexpected metric in published metrics") - return false - }) - r.Publish() - metricsCount = 0 - mi.GetRegistry(metric.PublishedMetrics).Range(func(key, value interface{}) bool { - metricsCount++ - return true - }) - require.Equal(t, 5, metricsCount, "a publish after a reset should publish any pending events recorded after reset") - }) - - t.Run("published", func(t *testing.T) { - mi.Reset() - r := rmetrics.NewPendingEventsRegistry(rmetrics.WithPublished()) - r.IncreasePendingEvents(tablePrefix, workspace, destType, 1) - r.DecreasePendingEvents(tablePrefix, workspace, destType, 1) - require.EqualValues(t, 0, r.PendingEvents(tablePrefix, workspace, destType).IntValue()) - var metricsCount int - mi.GetRegistry(metric.PublishedMetrics).Range(func(key, value interface{}) bool { - metricsCount++ - return true - }) - require.Equal(t, 5, metricsCount, "for each pending event, 3 gauges are created, plus 2 aggregate gauges") - r.Publish() // should be a no-op - r.Reset() - mi.GetRegistry(metric.PublishedMetrics).Range(func(key, value interface{}) bool { - require.FailNow(t, "unexpected metric in published metrics") - return false - }) - r.IncreasePendingEvents(tablePrefix, workspace, destType, 1) - mi.GetRegistry(metric.PublishedMetrics).Range(func(key, value interface{}) bool { - require.FailNow(t, "unexpected metric in published metrics") - return false - }) - r.Publish() - metricsCount = 0 - mi.GetRegistry(metric.PublishedMetrics).Range(func(key, value interface{}) bool { - metricsCount++ - return true - }) - require.Equal(t, 5, metricsCount, "a publish after a reset should publish any pending events recorded after reset") - }) -} From 70ded57b3f25d3a4f46789ea003e96a93dcb2021 Mon Sep 17 00:00:00 2001 From: ItsSudip Date: Thu, 24 Apr 2025 08:31:20 +0530 Subject: [PATCH 03/10] chore: add deleted required files --- services/rmetrics/pending_events.go | 153 +++++++++++++++++++++++ services/rmetrics/pending_events_test.go | 91 ++++++++++++++ 2 files changed, 244 insertions(+) create mode 100644 services/rmetrics/pending_events.go create mode 100644 services/rmetrics/pending_events_test.go diff --git a/services/rmetrics/pending_events.go b/services/rmetrics/pending_events.go new file mode 100644 index 0000000000..400de5403c --- /dev/null +++ b/services/rmetrics/pending_events.go @@ -0,0 +1,153 @@ +package rmetrics + +import ( + "fmt" + "sync" + + "github.com/rudderlabs/rudder-go-kit/stats/metric" +) + +const ( + JobsdbPendingEventsCount = "jobsdb_%s_pending_events_count" + All = "ALL" +) + +type ( + DecreasePendingEventsFunc func(tablePrefix, workspace, destType string, value float64) + IncreasePendingEventsFunc func(tablePrefix, workspace, destType string, value float64) +) + +// PendingEventsRegistry is a registry for pending events metrics +type PendingEventsRegistry interface { + // IncreasePendingEvents increments three gauges, the dest & workspace-specific gauge, plus two aggregate (global) gauges + IncreasePendingEvents(tablePrefix, workspace, destType string, value float64) + // DecreasePendingEvents decrements three gauges, the dest & workspace-specific gauge, plus two aggregate (global) gauges + DecreasePendingEvents(tablePrefix, workspace, destType string, value float64) + // PendingEvents gets the measurement for pending events metric + PendingEvents(tablePrefix, workspace, destType string) metric.Gauge + // Publish publishes the metrics to the global published metrics registry + Publish() + // Reset resets the registry to a new, non published one and clears the global published metrics registry + Reset() +} + +type Option func(*pendingEventsRegistry) + +// WithPublished creates a registry that writes metrics to the global published metrics registry, without having to call Publish first. +func WithPublished() Option { + return func(per *pendingEventsRegistry) { + per.published = true + per.registry = metric.Instance.GetRegistry(metric.PublishedMetrics) + } +} + +// NewPendingEventsRegistry creates a new PendingEventsRegistry. By default, metrics are not published to the global published metrics registry, until [Publish] is called. +func NewPendingEventsRegistry(opts ...Option) PendingEventsRegistry { + per := &pendingEventsRegistry{ + registry: metric.NewRegistry(), + } + for _, opt := range opts { + opt(per) + } + return per +} + +type pendingEventsRegistry struct { + registryMu sync.RWMutex + published bool + registry metric.Registry +} + +// IncreasePendingEvents increments three gauges, the dest & workspace-specific gauge, plus two aggregate (global) gauges +func (pem *pendingEventsRegistry) IncreasePendingEvents(tablePrefix, workspace, destType string, value float64) { + pem.registryMu.RLock() + defer pem.registryMu.RUnlock() + + pem.PendingEvents(tablePrefix, workspace, destType).Add(value) + pem.PendingEvents(tablePrefix, All, destType).Add(value) + pem.PendingEvents(tablePrefix, All, All).Add(value) + pem.registry.MustGetGauge(pendingEventsMeasurementAll{tablePrefix, destType}).Add(value) + pem.registry.MustGetGauge(pendingEventsMeasurementAll{tablePrefix, All}).Add(value) +} + +// DecreasePendingEvents decrements three gauges, the dest & workspace-specific gauge, plus two aggregate (global) gauges +func (pem *pendingEventsRegistry) DecreasePendingEvents(tablePrefix, workspace, destType string, value float64) { + pem.registryMu.RLock() + defer pem.registryMu.RUnlock() + pem.PendingEvents(tablePrefix, workspace, destType).Sub(value) + pem.PendingEvents(tablePrefix, All, destType).Sub(value) + pem.PendingEvents(tablePrefix, All, All).Sub(value) + pem.registry.MustGetGauge(pendingEventsMeasurementAll{tablePrefix, destType}).Sub(value) + pem.registry.MustGetGauge(pendingEventsMeasurementAll{tablePrefix, All}).Sub(value) +} + +// PendingEvents gets the measurement for pending events metric +func (pem *pendingEventsRegistry) PendingEvents(tablePrefix, workspace, destType string) metric.Gauge { + return pem.registry.MustGetGauge(newPendingEventsMeasurement(tablePrefix, workspace, destType)) +} + +// Publish publishes the metrics to the global published metrics registry +func (pem *pendingEventsRegistry) Publish() { + pem.registryMu.Lock() + defer pem.registryMu.Unlock() + if pem.published { + return + } + pem.published = true + + publishedRegistry := metric.Instance.GetRegistry(metric.PublishedMetrics) + pem.registry.Range(func(key, value any) bool { // copy all gauge metrics to the published registry + m := key.(metric.Measurement) + switch value := value.(type) { + case metric.Gauge: + publishedRegistry.MustGetGauge(m).Set(value.Value()) + } + return true + }) + pem.registry = publishedRegistry +} + +// Reset resets the registry to a new, non published one and clears the global published metrics registry +func (pem *pendingEventsRegistry) Reset() { + pem.registryMu.Lock() + defer pem.registryMu.Unlock() + pem.registry = metric.NewRegistry() + pem.published = false + metric.Instance.Reset() +} + +func newPendingEventsMeasurement(tablePrefix, workspace, destType string) metric.Measurement { + return pendingEventsMeasurement{tablePrefix, workspace, destType} +} + +type pendingEventsMeasurement struct { + tablePrefix string + workspace string + destType string +} + +func (r pendingEventsMeasurement) GetName() string { + return fmt.Sprintf(JobsdbPendingEventsCount, r.tablePrefix) +} + +func (r pendingEventsMeasurement) GetTags() map[string]string { + return map[string]string{ + "workspaceId": r.workspace, + "destType": r.destType, + } +} + +type pendingEventsMeasurementAll struct { + tablePrefix string + destType string +} + +func (r pendingEventsMeasurementAll) GetName() string { + return fmt.Sprintf(JobsdbPendingEventsCount, r.tablePrefix) + "_all" +} + +func (r pendingEventsMeasurementAll) GetTags() map[string]string { + return map[string]string{ + "destType": r.destType, + } +} diff --git a/services/rmetrics/pending_events_test.go b/services/rmetrics/pending_events_test.go new file mode 100644 index 0000000000..eda9707fc1 --- /dev/null +++ b/services/rmetrics/pending_events_test.go @@ -0,0 +1,91 @@ +package rmetrics_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/rudderlabs/rudder-go-kit/stats/metric" + "github.com/rudderlabs/rudder-server/services/rmetrics" +) + +func TestPendingEventsRegistry(t *testing.T) { + mi := metric.Instance + mi.Reset() + defer mi.Reset() + const ( + tablePrefix = "tablePrefix" + workspace = "workspace" + destType = "destType" + ) + t.Run("default", func(t *testing.T) { + mi.Reset() + r := rmetrics.NewPendingEventsRegistry() + r.IncreasePendingEvents(tablePrefix, workspace, destType, 1) + require.EqualValues(t, 1, r.PendingEvents(tablePrefix, workspace, destType).IntValue()) + + mi.GetRegistry(metric.PublishedMetrics).Range(func(key, value interface{}) bool { + require.FailNow(t, "unexpected metric in published metrics") + return false + }) + r.Publish() + var metricsCount int + mi.GetRegistry(metric.PublishedMetrics).Range(func(key, value interface{}) bool { + metricsCount++ + return true + }) + require.Equal(t, 5, metricsCount, "for each pending event, 3 gauges are created, plus 2 aggregate gauges") + r.Publish() // should be a no-op + + r.Reset() + mi.GetRegistry(metric.PublishedMetrics).Range(func(key, value interface{}) bool { + require.FailNow(t, "unexpected metric in published metrics") + return false + }) + + r.IncreasePendingEvents(tablePrefix, workspace, destType, 1) + mi.GetRegistry(metric.PublishedMetrics).Range(func(key, value interface{}) bool { + require.FailNow(t, "unexpected metric in published metrics") + return false + }) + r.Publish() + metricsCount = 0 + mi.GetRegistry(metric.PublishedMetrics).Range(func(key, value interface{}) bool { + metricsCount++ + return true + }) + require.Equal(t, 5, metricsCount, "a publish after a reset should publish any pending events recorded after reset") + }) + + t.Run("published", func(t *testing.T) { + mi.Reset() + r := rmetrics.NewPendingEventsRegistry(rmetrics.WithPublished()) + r.IncreasePendingEvents(tablePrefix, workspace, destType, 1) + r.DecreasePendingEvents(tablePrefix, workspace, destType, 1) + require.EqualValues(t, 0, r.PendingEvents(tablePrefix, workspace, destType).IntValue()) + var metricsCount int + mi.GetRegistry(metric.PublishedMetrics).Range(func(key, value interface{}) bool { + metricsCount++ + return true + }) + require.Equal(t, 5, metricsCount, "for each pending event, 3 gauges are created, plus 2 aggregate gauges") + r.Publish() // should be a no-op + r.Reset() + mi.GetRegistry(metric.PublishedMetrics).Range(func(key, value interface{}) bool { + require.FailNow(t, "unexpected metric in published metrics") + return false + }) + r.IncreasePendingEvents(tablePrefix, workspace, destType, 1) + mi.GetRegistry(metric.PublishedMetrics).Range(func(key, value interface{}) bool { + require.FailNow(t, "unexpected metric in published metrics") + return false + }) + r.Publish() + metricsCount = 0 + mi.GetRegistry(metric.PublishedMetrics).Range(func(key, value interface{}) bool { + metricsCount++ + return true + }) + require.Equal(t, 5, metricsCount, "a publish after a reset should publish any pending events recorded after reset") + }) +} From 86d2d7a0cbfac0117e1703336db6c5ef8372aea8 Mon Sep 17 00:00:00 2001 From: ItsSudip Date: Thu, 24 Apr 2025 11:19:08 +0530 Subject: [PATCH 04/10] chore: fix router worker test --- router/worker_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/router/worker_test.go b/router/worker_test.go index 2d4f3b9a59..0e4b9fd046 100644 --- a/router/worker_test.go +++ b/router/worker_test.go @@ -418,8 +418,8 @@ var _ = Describe("Proxy Request", func() { ProxyRequestResponseBody: "Err", RespContentType: "application/json", RespStatusCodes: map[int64]int{ - 1: 500, - 2: 501, + 1: 400, + 2: 400, }, RespBodys: map[int64]string{ 1: "err1", From 7baa145a734a829601de1dafc147d0bcb8d48b98 Mon Sep 17 00:00:00 2001 From: ItsSudip Date: Thu, 24 Apr 2025 13:32:51 +0530 Subject: [PATCH 05/10] chore: fix api test for regulation worker --- regulation-worker/internal/delete/api/api.go | 2 +- .../internal/delete/api/api_test.go | 112 +++++++++--------- 2 files changed, 57 insertions(+), 57 deletions(-) diff --git a/regulation-worker/internal/delete/api/api.go b/regulation-worker/internal/delete/api/api.go index f58f1e3b9a..ae5f0b4088 100644 --- a/regulation-worker/internal/delete/api/api.go +++ b/regulation-worker/internal/delete/api/api.go @@ -211,7 +211,7 @@ func mapJobToPayload(job model.Job, destName string, destConfig map[string]inter func getOAuthErrorJob(jobResponses []JobRespSchema) (JobRespSchema, bool) { return lo.Find(jobResponses, func(item JobRespSchema) bool { - return lo.Contains([]string{common.AuthStatusInActive, common.CategoryRefreshToken}, item.AuthErrorCategory) + return lo.Contains([]string{common.CategoryAuthStatusInactive, common.CategoryRefreshToken}, item.AuthErrorCategory) }) } diff --git a/regulation-worker/internal/delete/api/api_test.go b/regulation-worker/internal/delete/api/api_test.go index 7c898fe812..37c59de626 100644 --- a/regulation-worker/internal/delete/api/api_test.go +++ b/regulation-worker/internal/delete/api/api_test.go @@ -648,7 +648,7 @@ var oauthTests = []oauthTestCases{ deleteResponses: []deleteResponseParams{ { status: 400, - jobResponse: fmt.Sprintf(`[{"status":"failed","authErrorCategory": "%v", "error": "User does not have sufficient permissions"}]`, common.AuthStatusInActive), + jobResponse: fmt.Sprintf(`[{"status":"failed","authErrorCategory": "%v", "error": "User does not have sufficient permissions"}]`, common.CategoryAuthStatusInactive), }, }, cpResponses: []testutils.CpResponseParams{ @@ -695,7 +695,7 @@ var oauthTests = []oauthTestCases{ deleteResponses: []deleteResponseParams{ { status: 400, - jobResponse: fmt.Sprintf(`[{"status":"failed","authErrorCategory": "%v", "error": "User does not have sufficient permissions"}]`, common.AuthStatusInActive), + jobResponse: fmt.Sprintf(`[{"status":"failed","authErrorCategory": "%v", "error": "User does not have sufficient permissions"}]`, common.CategoryAuthStatusInactive), }, }, cpResponses: []testutils.CpResponseParams{ @@ -714,60 +714,60 @@ var oauthTests = []oauthTestCases{ expectedDeleteStatus_OAuthV2: model.JobStatus{Status: model.JobStatusAborted, Error: fmt.Errorf(`[{"status":"failed","authErrorCategory": "%v", "error": "User does not have sufficient permissions"}]`, common.CategoryAuthStatusInactive)}, expectedPayload: `[{"jobId":"16","destType":"ga","config":{"authStatus":"active","rudderDeleteAccountId":"xyz"},"userAttributes":[{"email":"greymore@gmail.com","phone":"8463633841","userId":"203984798476"}]}]`, }, - { - name: "when REFRESH_TOKEN error happens but refreshing token fails due to token revocation, fail the job with Failed status", - job: model.Job{ - ID: 17, - WorkspaceID: "1001", - DestinationID: "1234", - Status: model.JobStatus{Status: model.JobStatusPending}, - Users: []model.User{ - { - ID: "203984798477", - Attributes: map[string]string{ - "phone": "8463633841", - "email": "greymore@gmail.com", - }, - }, - }, - }, - dest: model.Destination{ - DestinationID: "1234", - Config: map[string]interface{}{ - "rudderDeleteAccountId": "xyz", - "authStatus": "active", - }, - Name: "GA", - DestDefConfig: defaultDestDefConfig, - }, - deleteResponses: []deleteResponseParams{ - { - status: 500, - jobResponse: `[{"status":"failed","authErrorCategory":"REFRESH_TOKEN", "error": "[GA] invalid credentials"}]`, - }, - }, - - cpResponses: []testutils.CpResponseParams{ - // fetch token http request - { - Code: 200, - Response: `{"secret": {"access_token": "invalid_grant_access_token","refresh_token":"invalid_grant_refresh_token"}}`, - }, - // refresh token http request - { - Code: 403, - Response: `{"status":403,"body":{"message":"[google_analytics] \"invalid_grant\" error, refresh token has been revoked","status":403,"code":"ref_token_invalid_grant"},"code":"ref_token_invalid_grant","access_token":"invalid_grant_access_token","refresh_token":"invalid_grant_refresh_token","developer_token":"dev_token"}`, - }, - // authStatus inactive http request - { - Code: 200, - }, - }, - - expectedDeleteStatus: model.JobStatus{Status: model.JobStatusFailed, Error: fmt.Errorf("[google_analytics] \"invalid_grant\" error, refresh token has been revoked")}, - expectedDeleteStatus_OAuthV2: model.JobStatus{Status: model.JobStatusFailed, Error: fmt.Errorf("[google_analytics] \"invalid_grant\" error, refresh token has been revoked")}, - expectedPayload: `[{"jobId":"17","destType":"ga","config":{"authStatus":"active","rudderDeleteAccountId":"xyz"},"userAttributes":[{"email":"greymore@gmail.com","phone":"8463633841","userId":"203984798477"}]}]`, - }, + // { + // name: "when REFRESH_TOKEN error happens but refreshing token fails due to token revocation, fail the job with Failed status", + // job: model.Job{ + // ID: 17, + // WorkspaceID: "1001", + // DestinationID: "1234", + // Status: model.JobStatus{Status: model.JobStatusPending}, + // Users: []model.User{ + // { + // ID: "203984798477", + // Attributes: map[string]string{ + // "phone": "8463633841", + // "email": "greymore@gmail.com", + // }, + // }, + // }, + // }, + // dest: model.Destination{ + // DestinationID: "1234", + // Config: map[string]interface{}{ + // "rudderDeleteAccountId": "xyz", + // "authStatus": "active", + // }, + // Name: "GA", + // DestDefConfig: defaultDestDefConfig, + // }, + // deleteResponses: []deleteResponseParams{ + // { + // status: 500, + // jobResponse: `[{"status":"failed","authErrorCategory":"REFRESH_TOKEN", "error": "[GA] invalid credentials"}]`, + // }, + // }, + + // cpResponses: []testutils.CpResponseParams{ + // // fetch token http request + // { + // Code: 200, + // Response: `{"secret": {"access_token": "invalid_grant_access_token","refresh_token":"invalid_grant_refresh_token"}}`, + // }, + // // refresh token http request + // { + // Code: 403, + // Response: `{"status":403,"body":{"message":"[google_analytics] \"invalid_grant\" error, refresh token has been revoked","status":403,"code":"ref_token_invalid_grant"},"code":"ref_token_invalid_grant","access_token":"invalid_grant_access_token","refresh_token":"invalid_grant_refresh_token","developer_token":"dev_token"}`, + // }, + // // authStatus inactive http request + // { + // Code: 200, + // }, + // }, + + // expectedDeleteStatus: model.JobStatus{Status: model.JobStatusFailed, Error: fmt.Errorf("[google_analytics] \"invalid_grant\" error, refresh token has been revoked")}, + // expectedDeleteStatus_OAuthV2: model.JobStatus{Status: model.JobStatusFailed, Error: fmt.Errorf("[google_analytics] \"invalid_grant\" error, refresh token has been revoked")}, + // expectedPayload: `[{"jobId":"17","destType":"ga","config":{"authStatus":"active","rudderDeleteAccountId":"xyz"},"userAttributes":[{"email":"greymore@gmail.com","phone":"8463633841","userId":"203984798477"}]}]`, + // }, } type mockIdentifier struct { From 5177a8b424369217c15573661c2c243d4a6840e5 Mon Sep 17 00:00:00 2001 From: ItsSudip Date: Thu, 24 Apr 2025 13:35:37 +0530 Subject: [PATCH 06/10] chore: remove comment from test cases --- .../internal/delete/api/api_test.go | 108 +++++++++--------- 1 file changed, 54 insertions(+), 54 deletions(-) diff --git a/regulation-worker/internal/delete/api/api_test.go b/regulation-worker/internal/delete/api/api_test.go index 37c59de626..9b2ec92b14 100644 --- a/regulation-worker/internal/delete/api/api_test.go +++ b/regulation-worker/internal/delete/api/api_test.go @@ -714,60 +714,60 @@ var oauthTests = []oauthTestCases{ expectedDeleteStatus_OAuthV2: model.JobStatus{Status: model.JobStatusAborted, Error: fmt.Errorf(`[{"status":"failed","authErrorCategory": "%v", "error": "User does not have sufficient permissions"}]`, common.CategoryAuthStatusInactive)}, expectedPayload: `[{"jobId":"16","destType":"ga","config":{"authStatus":"active","rudderDeleteAccountId":"xyz"},"userAttributes":[{"email":"greymore@gmail.com","phone":"8463633841","userId":"203984798476"}]}]`, }, - // { - // name: "when REFRESH_TOKEN error happens but refreshing token fails due to token revocation, fail the job with Failed status", - // job: model.Job{ - // ID: 17, - // WorkspaceID: "1001", - // DestinationID: "1234", - // Status: model.JobStatus{Status: model.JobStatusPending}, - // Users: []model.User{ - // { - // ID: "203984798477", - // Attributes: map[string]string{ - // "phone": "8463633841", - // "email": "greymore@gmail.com", - // }, - // }, - // }, - // }, - // dest: model.Destination{ - // DestinationID: "1234", - // Config: map[string]interface{}{ - // "rudderDeleteAccountId": "xyz", - // "authStatus": "active", - // }, - // Name: "GA", - // DestDefConfig: defaultDestDefConfig, - // }, - // deleteResponses: []deleteResponseParams{ - // { - // status: 500, - // jobResponse: `[{"status":"failed","authErrorCategory":"REFRESH_TOKEN", "error": "[GA] invalid credentials"}]`, - // }, - // }, - - // cpResponses: []testutils.CpResponseParams{ - // // fetch token http request - // { - // Code: 200, - // Response: `{"secret": {"access_token": "invalid_grant_access_token","refresh_token":"invalid_grant_refresh_token"}}`, - // }, - // // refresh token http request - // { - // Code: 403, - // Response: `{"status":403,"body":{"message":"[google_analytics] \"invalid_grant\" error, refresh token has been revoked","status":403,"code":"ref_token_invalid_grant"},"code":"ref_token_invalid_grant","access_token":"invalid_grant_access_token","refresh_token":"invalid_grant_refresh_token","developer_token":"dev_token"}`, - // }, - // // authStatus inactive http request - // { - // Code: 200, - // }, - // }, - - // expectedDeleteStatus: model.JobStatus{Status: model.JobStatusFailed, Error: fmt.Errorf("[google_analytics] \"invalid_grant\" error, refresh token has been revoked")}, - // expectedDeleteStatus_OAuthV2: model.JobStatus{Status: model.JobStatusFailed, Error: fmt.Errorf("[google_analytics] \"invalid_grant\" error, refresh token has been revoked")}, - // expectedPayload: `[{"jobId":"17","destType":"ga","config":{"authStatus":"active","rudderDeleteAccountId":"xyz"},"userAttributes":[{"email":"greymore@gmail.com","phone":"8463633841","userId":"203984798477"}]}]`, - // }, + { + name: "when REFRESH_TOKEN error happens but refreshing token fails due to token revocation, fail the job with Failed status", + job: model.Job{ + ID: 17, + WorkspaceID: "1001", + DestinationID: "1234", + Status: model.JobStatus{Status: model.JobStatusPending}, + Users: []model.User{ + { + ID: "203984798477", + Attributes: map[string]string{ + "phone": "8463633841", + "email": "greymore@gmail.com", + }, + }, + }, + }, + dest: model.Destination{ + DestinationID: "1234", + Config: map[string]interface{}{ + "rudderDeleteAccountId": "xyz", + "authStatus": "active", + }, + Name: "GA", + DestDefConfig: defaultDestDefConfig, + }, + deleteResponses: []deleteResponseParams{ + { + status: 500, + jobResponse: `[{"status":"failed","authErrorCategory":"REFRESH_TOKEN", "error": "[GA] invalid credentials"}]`, + }, + }, + + cpResponses: []testutils.CpResponseParams{ + // fetch token http request + { + Code: 200, + Response: `{"secret": {"access_token": "invalid_grant_access_token","refresh_token":"invalid_grant_refresh_token"}}`, + }, + // refresh token http request + { + Code: 403, + Response: `{"status":403,"body":{"message":"[google_analytics] \"invalid_grant\" error, refresh token has been revoked","status":403,"code":"ref_token_invalid_grant"},"code":"ref_token_invalid_grant","access_token":"invalid_grant_access_token","refresh_token":"invalid_grant_refresh_token","developer_token":"dev_token"}`, + }, + // authStatus inactive http request + { + Code: 200, + }, + }, + + expectedDeleteStatus: model.JobStatus{Status: model.JobStatusFailed, Error: fmt.Errorf("[google_analytics] \"invalid_grant\" error, refresh token has been revoked")}, + expectedDeleteStatus_OAuthV2: model.JobStatus{Status: model.JobStatusFailed, Error: fmt.Errorf("[google_analytics] \"invalid_grant\" error, refresh token has been revoked")}, + expectedPayload: `[{"jobId":"17","destType":"ga","config":{"authStatus":"active","rudderDeleteAccountId":"xyz"},"userAttributes":[{"email":"greymore@gmail.com","phone":"8463633841","userId":"203984798477"}]}]`, + }, } type mockIdentifier struct { From 5343c36d53656717c5c9622fc3d9e9e9f8b7a6dc Mon Sep 17 00:00:00 2001 From: ItsSudip Date: Thu, 24 Apr 2025 18:03:21 +0530 Subject: [PATCH 07/10] chore: fix test cases and lint issues --- regulation-worker/cmd/main.go | 2 +- regulation-worker/internal/delete/api/api.go | 4 ---- regulation-worker/internal/delete/api/api_test.go | 5 ++--- router/transformer/transformer.go | 4 +++- router/types.go | 12 ------------ 5 files changed, 6 insertions(+), 21 deletions(-) diff --git a/regulation-worker/cmd/main.go b/regulation-worker/cmd/main.go index 8d23eb8d2e..eca7322642 100644 --- a/regulation-worker/cmd/main.go +++ b/regulation-worker/cmd/main.go @@ -152,7 +152,7 @@ func createHTTPClient(conf *config.Config, httpTimeout time.Duration) *http.Clie } return oauthv2http.NewOAuthHttpClient( cli, - common.RudderFlow(common.RudderFlowDelete), + common.RudderFlowDelete, &cache, backendconfig.DefaultBackendConfig, api.GetAuthErrorCategoryFromResponse, &optionalArgs, ) diff --git a/regulation-worker/internal/delete/api/api.go b/regulation-worker/internal/delete/api/api.go index ae5f0b4088..7156e77b0f 100644 --- a/regulation-worker/internal/delete/api/api.go +++ b/regulation-worker/internal/delete/api/api.go @@ -40,10 +40,6 @@ type APIManager struct { TransformerFeaturesService transformer.FeaturesService } -type oauthDetail struct { - id string -} - func GetAuthErrorCategoryFromResponse(bodyBytes []byte) (string, error) { var jobResp []JobRespSchema if err := jsonrs.Unmarshal(bodyBytes, &jobResp); err != nil { diff --git a/regulation-worker/internal/delete/api/api_test.go b/regulation-worker/internal/delete/api/api_test.go index 9b2ec92b14..64fd9a2ef2 100644 --- a/regulation-worker/internal/delete/api/api_test.go +++ b/regulation-worker/internal/delete/api/api_test.go @@ -829,7 +829,7 @@ func TestOAuth(t *testing.T) { Locker: oauthLock, } cli = oauthv2_http.NewOAuthHttpClient( - cli, common.RudderFlow(common.RudderFlowDelete), + cli, common.RudderFlowDelete, &cache, mockBackendConfig, api.GetAuthErrorCategoryFromResponse, &optionalArgs, ) @@ -843,8 +843,7 @@ func TestOAuth(t *testing.T) { status := api.Delete(ctx, tt.job, tt.dest) require.Equal(t, tt.expectedDeleteStatus.Status, status.Status) if tt.expectedDeleteStatus.Status != model.JobStatusComplete { - exp := tt.expectedDeleteStatus.Error.Error() - exp = tt.expectedDeleteStatus_OAuthV2.Error.Error() + exp := tt.expectedDeleteStatus_OAuthV2.Error.Error() jobError := strings.Replace(exp, "__cfgBE_server__", cfgBeSrv.URL, 1) require.Contains(t, strings.ToLower(status.Error.Error()), strings.ToLower(jobError)) diff --git a/router/transformer/transformer.go b/router/transformer/transformer.go index 057d80c60b..fb5e2332c9 100644 --- a/router/transformer/transformer.go +++ b/router/transformer/transformer.go @@ -522,7 +522,9 @@ func (trans *handle) ProxyRequest(ctx context.Context, proxyReqParams *ProxyRequ transResp.routerJobResponseBodys[metadata.JobID] = transportResponse.InterceptorResponse.Response } } - respData = []byte(transportResponse.InterceptorResponse.Response) + if transportResponse.InterceptorResponse.Response != "" { + respData = []byte(transportResponse.InterceptorResponse.Response) + } trans.stats.NewTaggedStat("transformer_client_response_total_events", stats.CountType, labels).Count(len(transResp.routerJobResponseCodes)) diff --git a/router/types.go b/router/types.go index 1e985d706f..441a626dbb 100644 --- a/router/types.go +++ b/router/types.go @@ -1,7 +1,6 @@ package router import ( - "context" "encoding/json" "sync" "time" @@ -22,17 +21,6 @@ type workerJobStatus struct { statTags map[string]string parameters routerutils.JobParameters } - -type HandleDestOAuthRespParams struct { - ctx context.Context - destinationJob types.DestinationJobT - workerID int - trRespStCd int - trRespBody string - secret json.RawMessage - contentType string -} - type Diagnostic struct { diagnosisTicker *time.Ticker requestsMetricLock sync.RWMutex From b02b4d05a2face4eedff5f0eebddc0f61243ebde Mon Sep 17 00:00:00 2001 From: ItsSudip Date: Fri, 25 Apr 2025 16:35:38 +0530 Subject: [PATCH 08/10] chore: fix test cases --- router/transformer/transformer_test.go | 3 +++ services/oauth/v2/controlplane/cp_connector_test.go | 2 +- services/oauth/v2/oauth_test.go | 8 ++++---- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/router/transformer/transformer_test.go b/router/transformer/transformer_test.go index 0b8c1b3f59..778d56c0a6 100644 --- a/router/transformer/transformer_test.go +++ b/router/transformer/transformer_test.go @@ -345,6 +345,7 @@ func TestProxyRequest(t *testing.T) { ResponseData: tc.postParameters, DestName: "not_found_dest", Adapter: &mockAdapter{url: srv.URL}, + DestInfo: &v2.DestinationInfo{}, } r := tr.ProxyRequest(ctx, reqParams) stCd := r.ProxyRequestStatusCode @@ -384,6 +385,7 @@ func TestProxyRequest(t *testing.T) { ResponseData: tc.postParameters, DestName: tc.destName, Adapter: &mockAdapter{url: srv.URL}, + DestInfo: &v2.DestinationInfo{}, } r := tr.ProxyRequest(ctx, reqParams) stCd := r.ProxyRequestStatusCode @@ -1967,6 +1969,7 @@ func TestTransformerMetrics(t *testing.T) { stats: statsStore, logger: logger.NOP, tr: &http.Transport{}, + clientOAuth: srv.Client(), expirationTimeDiff: expTimeDiff, transformRequestTimerStat: statsStore.NewStat("router.transformer_request_time", stats.TimerType), // Add this line } diff --git a/services/oauth/v2/controlplane/cp_connector_test.go b/services/oauth/v2/controlplane/cp_connector_test.go index 7001eba6a8..3bf309116c 100644 --- a/services/oauth/v2/controlplane/cp_connector_test.go +++ b/services/oauth/v2/controlplane/cp_connector_test.go @@ -64,7 +64,7 @@ var _ = Describe("CpConnector", func() { BasicAuthUser: &testutils.BasicAuthMock{}, }) Expect(statusCode).To(Equal(http.StatusInternalServerError)) - Expect(`{"errorType":"timeout","message":"mock mock 127.0.0.1:1234->127.0.0.1:12340: read: connection timed out"}`).To(MatchJSON(respBody)) + Expect(`{"errorType":"timeout","message":"mock mock 127.0.0.1:1234->127.0.0.1:12340: read: operation timed out"}`).To(MatchJSON(respBody)) }) It("Test CpApiCall function to test connection reset by peer", func() { diff --git a/services/oauth/v2/oauth_test.go b/services/oauth/v2/oauth_test.go index 604724f9ee..0fa9687ce3 100644 --- a/services/oauth/v2/oauth_test.go +++ b/services/oauth/v2/oauth_test.go @@ -354,10 +354,10 @@ var _ = Describe("Oauth", func() { Expect(statusCode).To(Equal(http.StatusInternalServerError)) expectedResponse := &v2.AuthResponse{ Err: "timeout", - ErrorMessage: "mock mock 127.0.0.1:1234->127.0.0.1:12340: read: connection timed out", + ErrorMessage: "mock mock 127.0.0.1:1234->127.0.0.1:12340: read: operation timed out", } Expect(response).To(Equal(expectedResponse)) - Expect(err).To(MatchError(fmt.Errorf("error occurred while fetching/refreshing account info from CP: mock mock 127.0.0.1:1234->127.0.0.1:12340: read: connection timed out"))) + Expect(err).To(MatchError(fmt.Errorf("error occurred while fetching/refreshing account info from CP: mock mock 127.0.0.1:1234->127.0.0.1:12340: read: operation timed out"))) }) }) @@ -573,10 +573,10 @@ var _ = Describe("Oauth", func() { Expect(statusCode).To(Equal(http.StatusInternalServerError)) expectedResponse := &v2.AuthResponse{ Err: "timeout", - ErrorMessage: "mock mock 127.0.0.1:1234->127.0.0.1:12340: read: connection timed out", + ErrorMessage: "mock mock 127.0.0.1:1234->127.0.0.1:12340: read: operation timed out", } Expect(response).To(Equal(expectedResponse)) - Expect(err).To(MatchError(fmt.Errorf("error occurred while fetching/refreshing account info from CP: mock mock 127.0.0.1:1234->127.0.0.1:12340: read: connection timed out"))) + Expect(err).To(MatchError(fmt.Errorf("error occurred while fetching/refreshing account info from CP: mock mock 127.0.0.1:1234->127.0.0.1:12340: read: operation timed out"))) }) It("refreshToken function call when stored cache is same as provided secret and cpApiCall returns a failed response because of faulty implementation in some downstream service", func() { From b9257aa4a2f1ab30477e24ddaf95dd4cb1cf4733 Mon Sep 17 00:00:00 2001 From: ItsSudip Date: Fri, 25 Apr 2025 16:54:40 +0530 Subject: [PATCH 09/10] chore: fix test cases for oauth_test.go --- services/oauth/v2/oauth_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/services/oauth/v2/oauth_test.go b/services/oauth/v2/oauth_test.go index 0fa9687ce3..604724f9ee 100644 --- a/services/oauth/v2/oauth_test.go +++ b/services/oauth/v2/oauth_test.go @@ -354,10 +354,10 @@ var _ = Describe("Oauth", func() { Expect(statusCode).To(Equal(http.StatusInternalServerError)) expectedResponse := &v2.AuthResponse{ Err: "timeout", - ErrorMessage: "mock mock 127.0.0.1:1234->127.0.0.1:12340: read: operation timed out", + ErrorMessage: "mock mock 127.0.0.1:1234->127.0.0.1:12340: read: connection timed out", } Expect(response).To(Equal(expectedResponse)) - Expect(err).To(MatchError(fmt.Errorf("error occurred while fetching/refreshing account info from CP: mock mock 127.0.0.1:1234->127.0.0.1:12340: read: operation timed out"))) + Expect(err).To(MatchError(fmt.Errorf("error occurred while fetching/refreshing account info from CP: mock mock 127.0.0.1:1234->127.0.0.1:12340: read: connection timed out"))) }) }) @@ -573,10 +573,10 @@ var _ = Describe("Oauth", func() { Expect(statusCode).To(Equal(http.StatusInternalServerError)) expectedResponse := &v2.AuthResponse{ Err: "timeout", - ErrorMessage: "mock mock 127.0.0.1:1234->127.0.0.1:12340: read: operation timed out", + ErrorMessage: "mock mock 127.0.0.1:1234->127.0.0.1:12340: read: connection timed out", } Expect(response).To(Equal(expectedResponse)) - Expect(err).To(MatchError(fmt.Errorf("error occurred while fetching/refreshing account info from CP: mock mock 127.0.0.1:1234->127.0.0.1:12340: read: operation timed out"))) + Expect(err).To(MatchError(fmt.Errorf("error occurred while fetching/refreshing account info from CP: mock mock 127.0.0.1:1234->127.0.0.1:12340: read: connection timed out"))) }) It("refreshToken function call when stored cache is same as provided secret and cpApiCall returns a failed response because of faulty implementation in some downstream service", func() { From d6a3657e3e3f2614cf0e9593459e0b28954f8143 Mon Sep 17 00:00:00 2001 From: ItsSudip Date: Fri, 25 Apr 2025 17:21:20 +0530 Subject: [PATCH 10/10] chore: fix test cases for cp_connector_test.go --- services/oauth/v2/controlplane/cp_connector_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/oauth/v2/controlplane/cp_connector_test.go b/services/oauth/v2/controlplane/cp_connector_test.go index 3bf309116c..7001eba6a8 100644 --- a/services/oauth/v2/controlplane/cp_connector_test.go +++ b/services/oauth/v2/controlplane/cp_connector_test.go @@ -64,7 +64,7 @@ var _ = Describe("CpConnector", func() { BasicAuthUser: &testutils.BasicAuthMock{}, }) Expect(statusCode).To(Equal(http.StatusInternalServerError)) - Expect(`{"errorType":"timeout","message":"mock mock 127.0.0.1:1234->127.0.0.1:12340: read: operation timed out"}`).To(MatchJSON(respBody)) + Expect(`{"errorType":"timeout","message":"mock mock 127.0.0.1:1234->127.0.0.1:12340: read: connection timed out"}`).To(MatchJSON(respBody)) }) It("Test CpApiCall function to test connection reset by peer", func() {