Skip to content

Commit 1c8350e

Browse files
mihir20atzoum
authored andcommitted
fix: add source type to internalBatch request stats (#6030)
# Description - Add sourceType to batch request statistics for internalBatch requests - Update getSourceConfigFromSourceID method to return source type - Modify handle logic to populate sourceType in batch request stats ## Linear Ticket pipe-2160 ## Security - [x] The code changed/added as part of this pull request won't create any security issues with how the software is being used.
1 parent 3d0a05a commit 1c8350e

File tree

2 files changed

+15
-11
lines changed

2 files changed

+15
-11
lines changed

gateway/gateway_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1865,7 +1865,7 @@ var _ = Describe("Gateway", func() {
18651865
"reqType": "internalBatch",
18661866
"workspaceId": WorkspaceID,
18671867
"sourceID": SourceIDEnabled,
1868-
"sourceType": "",
1868+
"sourceType": sourceType2,
18691869
"sdkVersion": "",
18701870
"sourceDefName": SourceIDEnabled,
18711871
},
@@ -1881,7 +1881,7 @@ var _ = Describe("Gateway", func() {
18811881
"reqType": "internalBatch",
18821882
"workspaceId": WorkspaceID,
18831883
"sourceID": SourceIDEnabled,
1884-
"sourceType": "",
1884+
"sourceType": sourceType2,
18851885
"sdkVersion": "",
18861886
"sourceDefName": SourceIDEnabled,
18871887
},
@@ -1945,7 +1945,7 @@ var _ = Describe("Gateway", func() {
19451945
"reqType": "internalBatch",
19461946
"workspaceId": WorkspaceID,
19471947
"sourceID": SourceIDEnabled,
1948-
"sourceType": "",
1948+
"sourceType": sourceType2,
19491949
"sdkVersion": "",
19501950
"sourceDefName": SourceIDEnabled,
19511951
},
@@ -2073,7 +2073,7 @@ var _ = Describe("Gateway", func() {
20732073
"reqType": "internalBatch",
20742074
"workspaceId": WorkspaceID,
20752075
"sourceID": SourceIDEnabled,
2076-
"sourceType": "",
2076+
"sourceType": sourceType2,
20772077
"sdkVersion": "",
20782078
"source": "",
20792079
"sourceDefName": SourceIDEnabled,
@@ -2085,7 +2085,7 @@ var _ = Describe("Gateway", func() {
20852085
"reqType": "internalBatch",
20862086
"workspaceId": WorkspaceID,
20872087
"sourceID": SourceIDEnabled,
2088-
"sourceType": "",
2088+
"sourceType": sourceType2,
20892089
"sdkVersion": "",
20902090
"source": "",
20912091
"sourceDefName": SourceIDEnabled,
@@ -2120,7 +2120,7 @@ var _ = Describe("Gateway", func() {
21202120
"reqType": "internalBatch",
21212121
"workspaceId": WorkspaceID,
21222122
"sourceID": SourceIDEnabled,
2123-
"sourceType": "",
2123+
"sourceType": sourceType2,
21242124
"sdkVersion": "",
21252125
"source": "",
21262126
"reason": "storeFailed",
@@ -2133,7 +2133,7 @@ var _ = Describe("Gateway", func() {
21332133
"reqType": "internalBatch",
21342134
"workspaceId": WorkspaceID,
21352135
"sourceID": SourceIDEnabled,
2136-
"sourceType": "",
2136+
"sourceType": sourceType2,
21372137
"sdkVersion": "",
21382138
"source": "",
21392139
"sourceDefName": SourceIDEnabled,

gateway/handle.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -907,17 +907,21 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt
907907
}
908908
}
909909

910-
writeKey, sourceDefName, sourceName, ok := gw.getSourceConfigFromSourceID(msg.Properties.SourceID)
910+
writeKey, sourceDefName, sourceName, sourceType := "", "", "", ""
911+
src, ok := gw.getSourceConfigFromSourceID(msg.Properties.SourceID)
911912
if !ok {
912913
// only live-events will not work if writeKey is not found
913914
gw.logger.Errorn("unable to get source details from sourceID",
914915
logger.NewStringField("messageId", messageID),
915916
obskit.SourceID(msg.Properties.SourceID))
917+
} else {
918+
writeKey, sourceDefName, sourceName, sourceType = src.WriteKey, src.SourceDefinition.Name, src.Name, src.SourceDefinition.Category
916919
}
917920
stat.SourceID = msg.Properties.SourceID
918921
stat.WorkspaceID = msg.Properties.WorkspaceID
919922
stat.WriteKey = writeKey
920923
stat.SourceDefName = sourceDefName
924+
stat.SourceType = sourceType
921925

922926
if isUserSuppressed(msg.Properties.WorkspaceID, msg.Properties.UserID, msg.Properties.SourceID) {
923927
gw.logger.Infon("suppressed event",
@@ -1042,13 +1046,13 @@ func fillRequestIP(event []byte, ip string) ([]byte, error) {
10421046
return event, nil
10431047
}
10441048

1045-
func (gw *Handle) getSourceConfigFromSourceID(sourceID string) (writeKey, sourceDefName, sourceName string, ok bool) {
1049+
func (gw *Handle) getSourceConfigFromSourceID(sourceID string) (src backendconfig.SourceT, ok bool) {
10461050
gw.configSubscriberLock.RLock()
10471051
defer gw.configSubscriberLock.RUnlock()
10481052
if s, ok := gw.sourceIDSourceMap[sourceID]; ok {
1049-
return s.WriteKey, s.SourceDefinition.Name, s.Name, true
1053+
return s, true
10501054
}
1051-
return "", "", "", false
1055+
return backendconfig.SourceT{}, false
10521056
}
10531057

10541058
func (gw *Handle) storeJobs(ctx context.Context, jobs []*jobsdb.JobT) error {

0 commit comments

Comments
 (0)