Skip to content

Commit eda76fb

Browse files
authored
feat: add reporting for failed hydration events (#6521)
# Description add reporting for failed hydration events ## Linear Ticket pipe-2487 ## Security - [x] The code changed/added as part of this pull request won't create any security issues with how the software is being used.
1 parent 26b2661 commit eda76fb

File tree

8 files changed

+267
-30
lines changed

8 files changed

+267
-30
lines changed

integration_test/srchydration/src_hydration_test.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@ func TestSrcHydration(t *testing.T) {
278278
prepareExpectedReports(t, sourceID2, false, numEvents)...,
279279
)
280280
if tt.failOnHydrationFailure {
281+
expectedReports = append(expectedReports, prepareSrcHydrationFailedReports(t, sourceID3, numEvents)...)
281282
expectedReports = append(expectedReports, prepareExpectedReports(t, sourceID3, true, numEvents)...)
282283
}
283284
requireReports(t, ctx, postgresContainer.DB, expectedReports)
@@ -289,6 +290,27 @@ func TestSrcHydration(t *testing.T) {
289290
})
290291
}
291292

293+
func prepareSrcHydrationFailedReports(t *testing.T, sourceID string, numEvents int) []reportRow {
294+
t.Helper()
295+
return []reportRow{
296+
{
297+
WorkspaceID: workspaceID,
298+
InstanceID: "1",
299+
SourceID: sourceID,
300+
DestinationID: "",
301+
InPU: "destination_filter",
302+
PU: "source_hydration",
303+
StatusCode: 500,
304+
Status: "aborted",
305+
Count: int64(numEvents),
306+
TerminalState: false,
307+
InitialState: false,
308+
SourceCategory: "webhook",
309+
EventType: "identify",
310+
},
311+
}
312+
}
313+
292314
func prepareExpectedReports(t *testing.T, sourceId string, gwOnly bool, numEvents int) []reportRow {
293315
t.Helper()
294316
if gwOnly {
@@ -331,7 +353,7 @@ func prepareExpectedReports(t *testing.T, sourceId string, gwOnly bool, numEvent
331353
InstanceID: "1",
332354
SourceID: sourceId,
333355
DestinationID: "destination-1",
334-
InPU: "destination_filter",
356+
InPU: "source_hydration",
335357
PU: "event_filter",
336358
StatusCode: 200,
337359
Status: "succeeded",

processor/processor.go

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1653,6 +1653,7 @@ type preTransformationMessage struct {
16531653
jobList []*jobsdb.JobT
16541654
sourceDupStats map[dupStatKey]int
16551655
dedupKeys map[string]struct{}
1656+
srcHydrationEnabledMap map[SourceIDT]bool
16561657
}
16571658

16581659
func (proc *Handle) preprocessStage(partition string, subJobs subJob, delay time.Duration) (*srcHydrationMessage, error) {
@@ -2236,7 +2237,7 @@ func (proc *Handle) pretransformStage(partition string, preTrans *preTransformat
22362237
// Placing the trackingPlan validation filters here.
22372238
// Else further down events are duplicated by destId, so multiple validation takes places for same event
22382239
validateEventsStart := time.Now()
2239-
validatedEventsBySourceId, validatedReportMetrics, trackingPlanEnabledMap := proc.validateEvents(preTrans.groupedEventsBySourceId, preTrans.eventsByMessageID)
2240+
validatedEventsBySourceId, validatedReportMetrics, sourcePipelineSteps := proc.validateEvents(preTrans.groupedEventsBySourceId, preTrans.eventsByMessageID, preTrans.srcHydrationEnabledMap)
22402241
validateEventsTime := time.Since(validateEventsStart)
22412242
defer proc.stats.validateEventsTime(preTrans.partition).SendTiming(validateEventsTime)
22422243

@@ -2313,7 +2314,7 @@ func (proc *Handle) pretransformStage(partition string, preTrans *preTransformat
23132314
return &transformationMessage{
23142315
preTrans.subJobs.ctx,
23152316
groupedEvents,
2316-
trackingPlanEnabledMap,
2317+
sourcePipelineSteps,
23172318
preTrans.eventsByMessageID,
23182319
uniqueMessageIdsBySrcDestKey,
23192320
preTrans.reportMetrics,
@@ -2375,11 +2376,17 @@ func (proc *Handle) storeArchiveJobs(ctx context.Context, archivalJobs []*jobsdb
23752376
return nil
23762377
}
23772378

2379+
type SourcePipelineSteps struct {
2380+
srcHydration bool
2381+
trackingPlanValidation bool
2382+
}
2383+
type sourceIDPipelineSteps map[SourceIDT]SourcePipelineSteps
2384+
23782385
type transformationMessage struct {
23792386
ctx context.Context
23802387
groupedEvents map[string][]types.TransformerEvent
23812388

2382-
trackingPlanEnabledMap map[SourceIDT]bool
2389+
srcPipelineSteps sourceIDPipelineSteps
23832390
eventsByMessageID map[string]types.SingularEventWithReceivedAt
23842391
uniqueMessageIdsBySrcDestKey map[string]map[string]struct{}
23852392
reportMetrics []*reportingtypes.PUReportedMetric
@@ -2469,7 +2476,7 @@ func (proc *Handle) userTransformStage(partition string, in *transformationMessa
24692476
partition,
24702477
srcAndDestKey,
24712478
eventList,
2472-
in.trackingPlanEnabledMap,
2479+
in.srcPipelineSteps,
24732480
in.eventsByMessageID,
24742481
in.uniqueMessageIdsBySrcDestKey,
24752482
)
@@ -2938,15 +2945,7 @@ type userTransformAndFilterOutput struct {
29382945
transformAt string
29392946
}
29402947

2941-
func (proc *Handle) userTransformAndFilter(
2942-
ctx context.Context,
2943-
partition string,
2944-
srcAndDestKey string,
2945-
eventList []types.TransformerEvent,
2946-
trackingPlanEnabledMap map[SourceIDT]bool,
2947-
eventsByMessageID map[string]types.SingularEventWithReceivedAt,
2948-
uniqueMessageIdsBySrcDestKey map[string]map[string]struct{},
2949-
) userTransformAndFilterOutput {
2948+
func (proc *Handle) userTransformAndFilter(ctx context.Context, partition, srcAndDestKey string, eventList []types.TransformerEvent, srcPipelineSteps sourceIDPipelineSteps, eventsByMessageID map[string]types.SingularEventWithReceivedAt, uniqueMessageIdsBySrcDestKey map[string]map[string]struct{}) userTransformAndFilterOutput {
29502949
if len(eventList) == 0 {
29512950
return userTransformAndFilterOutput{
29522951
eventsToTransform: eventList,
@@ -2967,7 +2966,7 @@ func (proc *Handle) userTransformAndFilter(
29672966
transformationEnabled := len(destination.Transformations) > 0
29682967
proc.config.configSubscriberLock.RUnlock()
29692968

2970-
trackingPlanEnabled := trackingPlanEnabledMap[SourceIDT(sourceID)]
2969+
sourceSteps := srcPipelineSteps[SourceIDT(sourceID)]
29712970

29722971
var inCountMap map[string]int64
29732972
var inCountMetadataMap map[string]MetricMetadata
@@ -3013,9 +3012,12 @@ func (proc *Handle) userTransformAndFilter(
30133012
var response types.Response
30143013
var eventsToTransform []types.TransformerEvent
30153014
var inPU string
3016-
if trackingPlanEnabled {
3015+
switch {
3016+
case sourceSteps.trackingPlanValidation:
30173017
inPU = reportingtypes.TRACKINGPLAN_VALIDATOR
3018-
} else {
3018+
case sourceSteps.srcHydration:
3019+
inPU = reportingtypes.SOURCE_HYDRATION
3020+
default:
30193021
inPU = reportingtypes.DESTINATION_FILTER
30203022
}
30213023
// Send to custom transformer only if the destination has a transformer enabled

processor/processor_test.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -930,7 +930,8 @@ var sampleBackendConfig = backendconfig.ConfigT{
930930
{
931931
ID: fblaSourceId,
932932
SourceDefinition: backendconfig.SourceDefinitionT{
933-
Name: "fbla",
933+
Name: "fbla",
934+
Category: "webhook",
934935
Options: backendconfig.SourceDefinitionOptions{
935936
Hydration: struct {
936937
Enabled bool
@@ -942,7 +943,8 @@ var sampleBackendConfig = backendconfig.ConfigT{
942943
{
943944
ID: fblaSourceId2,
944945
SourceDefinition: backendconfig.SourceDefinitionT{
945-
Name: "fbla",
946+
Name: "fbla",
947+
Category: "webhook",
946948
Options: backendconfig.SourceDefinitionOptions{
947949
Hydration: struct {
948950
Enabled bool
@@ -954,7 +956,8 @@ var sampleBackendConfig = backendconfig.ConfigT{
954956
{
955957
ID: fblaSourceId5,
956958
SourceDefinition: backendconfig.SourceDefinitionT{
957-
Name: "fbla",
959+
Name: "fbla",
960+
Category: "webhook",
958961
Options: backendconfig.SourceDefinitionOptions{
959962
Hydration: struct {
960963
Enabled bool
@@ -966,7 +969,8 @@ var sampleBackendConfig = backendconfig.ConfigT{
966969
{
967970
ID: fblaSourceId3,
968971
SourceDefinition: backendconfig.SourceDefinitionT{
969-
Name: "fbla",
972+
Name: "fbla",
973+
Category: "webhook",
970974
Options: backendconfig.SourceDefinitionOptions{
971975
Hydration: struct {
972976
Enabled bool
@@ -978,7 +982,8 @@ var sampleBackendConfig = backendconfig.ConfigT{
978982
{
979983
ID: fblaSourceId4,
980984
SourceDefinition: backendconfig.SourceDefinitionT{
981-
Name: "fbla",
985+
Name: "fbla",
986+
Category: "webhook",
982987
Options: backendconfig.SourceDefinitionOptions{
983988
Hydration: struct {
984989
Enabled bool

processor/src_hydration_stage.go

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,12 @@ package processor
22

33
import (
44
"context"
5+
"net/http"
56
"strconv"
67
"sync"
78

9+
"github.com/rudderlabs/rudder-server/utils/misc"
10+
811
kitctx "github.com/rudderlabs/rudder-go-kit/context"
912

1013
"github.com/samber/lo"
@@ -65,9 +68,10 @@ func (proc *Handle) srcHydrationStage(partition string, message *srcHydrationMes
6568

6669
// Mutex to protect shared maps
6770
var sharedMapsMutex sync.Mutex
68-
71+
srcHydrationEnabledMap := make(map[SourceIDT]bool)
6972
for sourceId, jobs := range message.groupedEventsBySourceId {
7073
g.Go(func() error {
74+
var hydrationFailedReports []*reportingtypes.PUReportedMetric
7175
source, err := proc.getSourceBySourceID(string(sourceId))
7276
if err != nil {
7377
return err
@@ -82,12 +86,21 @@ func (proc *Handle) srcHydrationStage(partition string, message *srcHydrationMes
8286
obskit.SourceID(string(sourceId)),
8387
obskit.Error(err),
8488
)
89+
// report metrics for failed hydration
90+
hydrationFailedReports = proc.getHydrationFailedReports(source, jobs, err)
8591
}
8692

8793
// Update shared maps with mutex protection
8894
sharedMapsMutex.Lock()
8995
defer sharedMapsMutex.Unlock()
9096

97+
srcHydrationEnabledMap[sourceId] = true
98+
// Append hydration failed reports if any
99+
if hydrationFailedReports != nil {
100+
message.reportMetrics = append(message.reportMetrics, hydrationFailedReports...)
101+
}
102+
103+
// If no hydrated jobs, remove entry from groupedEventsBySourceId
91104
if len(hydratedJobs) == 0 {
92105
delete(message.groupedEventsBySourceId, sourceId)
93106
return nil
@@ -159,6 +172,7 @@ func (proc *Handle) srcHydrationStage(partition string, message *srcHydrationMes
159172
jobList: message.jobList,
160173
sourceDupStats: message.sourceDupStats,
161174
dedupKeys: message.dedupKeys,
175+
srcHydrationEnabledMap: srcHydrationEnabledMap,
162176
}, nil
163177
}
164178

@@ -195,3 +209,37 @@ func (proc *Handle) hydrate(ctx context.Context, source *backendconfig.SourceT,
195209
return originalEvent
196210
}), nil
197211
}
212+
213+
func (proc *Handle) getHydrationFailedReports(source *backendconfig.SourceT, jobs []types.TransformerEvent, err error) []*reportingtypes.PUReportedMetric {
214+
metricsMap := make(map[string]map[string]*reportingtypes.PUReportedMetric)
215+
return lo.FilterMap(jobs, func(job types.TransformerEvent, _ int) (*reportingtypes.PUReportedMetric, bool) {
216+
eventName, _ := misc.MapLookup(job.Message, "event").(string)
217+
eventType, _ := misc.MapLookup(job.Message, "type").(string)
218+
if _, ok := metricsMap[eventName]; !ok {
219+
metricsMap[eventName] = make(map[string]*reportingtypes.PUReportedMetric)
220+
}
221+
if _, ok := metricsMap[eventName][eventType]; !ok {
222+
sampleEvent, _ := jsonrs.Marshal(job.Message)
223+
metricsMap[eventName][eventType] = &reportingtypes.PUReportedMetric{
224+
ConnectionDetails: reportingtypes.ConnectionDetails{
225+
SourceID: source.ID,
226+
SourceDefinitionID: source.SourceDefinition.ID,
227+
SourceCategory: source.SourceDefinition.Category,
228+
},
229+
PUDetails: *reportingtypes.CreatePUDetails(reportingtypes.DESTINATION_FILTER, reportingtypes.SOURCE_HYDRATION, false, false),
230+
StatusDetail: &reportingtypes.StatusDetail{
231+
Status: jobsdb.Aborted.State,
232+
Count: 1,
233+
StatusCode: http.StatusInternalServerError,
234+
SampleResponse: err.Error(),
235+
SampleEvent: sampleEvent,
236+
EventName: eventName,
237+
EventType: eventType,
238+
},
239+
}
240+
return metricsMap[eventName][eventType], true
241+
}
242+
metricsMap[eventName][eventType].StatusDetail.Count += 1
243+
return nil, false
244+
})
245+
}

0 commit comments

Comments
 (0)