Skip to content

Commit 399c4c6

Browse files
authored
chore: emit filtered metrics from all non-ut stages and clean up diff metrics (#6485)
# Description ## Summary This PR standardizes metric collection across non-UT (User Transformation) stages by consistently capturing `succeeded`, `aborted`, and `filtered` metrics, while removing redundant `diff` metrics. ## Current State **DESTINATION_FILTER stage:** - Currently captures `diff` metrics (calculated as `succeeded + aborted + filtered - input`) to track dropped events **EVENT_FILTER and DEST_TRANSFORMER stages:** - Capture `filtered` metrics to track dropped events - Also capture `diff` metrics (redundant, since we explicitly track `succeeded`, `aborted`, and `filtered` states) ## Changes This PR makes metric collection consistent across all non-UT stages: 1. **DESTINATION_FILTER**: - ❌ Stop capturing `diff` metrics - ✅ Start capturing `filtered` metrics 2. **EVENT_FILTER and DEST_TRANSFORMER**: - ❌ Stop capturing `diff` metrics - ✅ Continue capturing `filtered` metrics (no change) ## Result All non-UT stages now consistently capture three explicit metric types: - `succeeded` - `aborted` - `filtered` The `diff` metrics have been completely removed as they are redundant when we explicitly track all three states. ## Linear Ticket Fixes [PIPE-2507](https://linear.app/rudderstack/issue/PIPE-2507/populated-diff-counts-in-hourly-and-daily-agg) ## Security - [ ] The code changed/added as part of this pull request won't create any security issues with how the software is being used.
1 parent 45e5dba commit 399c4c6

File tree

2 files changed

+45
-90
lines changed

2 files changed

+45
-90
lines changed

integration_test/reporting_dropped_events/reporting_dropped_events_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import (
3333
)
3434

3535
func TestReportingDroppedEvents(t *testing.T) {
36-
// FIXME: destination filter should drop events using a [filtered] status instead of a [diff] status with negative count
3736
t.Run("Events dropped in destination filter stage", func(t *testing.T) {
3837
config.Reset()
3938
defer config.Reset()
@@ -83,11 +82,11 @@ func TestReportingDroppedEvents(t *testing.T) {
8382
}, 20*time.Second, 1*time.Second, "all gw events should be successfully processed")
8483

8584
require.Eventually(t, func() bool {
86-
var droppedCount sql.NullInt64
87-
require.NoError(t, postgresContainer.DB.QueryRow("SELECT sum(count) FROM reports WHERE source_id = 'source-1' and destination_id = '' AND pu = 'destination_filter' and status = 'diff' and error_type = ''").Scan(&droppedCount))
88-
t.Logf("destination_filter diff count: %d", droppedCount.Int64)
85+
var filteredCount sql.NullInt64
86+
require.NoError(t, postgresContainer.DB.QueryRow("SELECT sum(count) FROM reports WHERE source_id = 'source-1' and destination_id = '' AND pu = 'destination_filter' and status = 'filtered' and error_type = ''").Scan(&filteredCount))
87+
t.Logf("destination_filter filtered count: %d", filteredCount.Int64)
8988
logRows(t, postgresContainer.DB, "SELECT * FROM reports")
90-
return droppedCount.Int64 == -10
89+
return filteredCount.Int64 == 10
9190
}, 10*time.Second, 1*time.Second, "all events should be dropped in destination_filter stage")
9291

9392
cancel()

processor/processor.go

Lines changed: 41 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -1657,9 +1657,6 @@ type preTransformationMessage struct {
16571657
eventBlockingStatusDetailsMap map[string]map[string]*reportingtypes.StatusDetail
16581658
destFilterStatusDetailMap map[string]map[string]*reportingtypes.StatusDetail
16591659
reportMetrics []*reportingtypes.PUReportedMetric
1660-
inCountMetadataMap map[string]MetricMetadata
1661-
inCountMap map[string]int64
1662-
outCountMap map[string]int64
16631660
totalEvents int
16641661
marshalStart time.Time
16651662
groupedEventsBySourceId map[SourceIDT][]types.TransformerEvent
@@ -1717,12 +1714,8 @@ func (proc *Handle) preprocessStage(partition string, subJobs subJob, delay time
17171714
sourceDupStats := make(map[dupStatKey]int)
17181715

17191716
reportMetrics := make([]*reportingtypes.PUReportedMetric, 0)
1720-
inCountMap := make(map[string]int64)
1721-
inCountMetadataMap := make(map[string]MetricMetadata)
17221717
connectionDetailsMap := make(map[string]*reportingtypes.ConnectionDetails)
17231718
statusDetailsMap := make(map[string]map[string]*reportingtypes.StatusDetail)
1724-
1725-
outCountMap := make(map[string]int64) // destinations enabled
17261719
destFilterStatusDetailMap := make(map[string]map[string]*reportingtypes.StatusDetail)
17271720
enricherStatusDetailsMap := make(map[string]map[string]*reportingtypes.StatusDetail)
17281721
botManagementStatusDetailsMap := make(map[string]map[string]*reportingtypes.StatusDetail)
@@ -2016,8 +2009,8 @@ func (proc *Handle) preprocessStage(partition string, subJobs subJob, delay time
20162009
// REPORTING - GATEWAY metrics - START
20172010
if proc.isReportingEnabled() {
20182011
proc.updateMetricMaps(
2019-
inCountMetadataMap,
2020-
inCountMap,
2012+
nil,
2013+
nil,
20212014
connectionDetailsMap,
20222015
statusDetailsMap,
20232016
transformerEvent,
@@ -2063,6 +2056,25 @@ func (proc *Handle) preprocessStage(partition string, subJobs subJob, delay time
20632056
// if empty destinationID is passed in this fn all the destinations for the source are validated
20642057
// else only passed destinationID will be validated
20652058
if !proc.isDestinationAvailable(event.singularEvent, sourceId, event.eventParams.DestinationID) {
2059+
// REPORTING - DESTINATION_FILTER filtered metrics - START
2060+
if proc.isReportingEnabled() {
2061+
transformerEvent.StatusCode = reportingtypes.FilterEventCode
2062+
proc.updateMetricMaps(
2063+
nil,
2064+
nil,
2065+
connectionDetailsMap,
2066+
destFilterStatusDetailMap,
2067+
transformerEvent,
2068+
jobsdb.Filtered.State,
2069+
reportingtypes.DESTINATION_FILTER,
2070+
func() json.RawMessage {
2071+
return nil
2072+
},
2073+
nil,
2074+
)
2075+
transformerEvent.StatusCode = 0
2076+
}
2077+
// REPORTING - DESTINATION_FILTER filtered metrics - END
20662078
continue
20672079
}
20682080

@@ -2093,10 +2105,6 @@ func (proc *Handle) preprocessStage(partition string, subJobs subJob, delay time
20932105
shallowEventCopy.Metadata.MergedTpConfig = event.source.DgSourceTrackingPlanConfig.GetMergedConfig(commonMetadataFromSingularEvent.EventType)
20942106

20952107
groupedEventsBySourceId[SourceIDT(sourceId)] = append(groupedEventsBySourceId[SourceIDT(sourceId)], shallowEventCopy)
2096-
2097-
if proc.isReportingEnabled() {
2098-
proc.updateMetricMaps(inCountMetadataMap, outCountMap, connectionDetailsMap, destFilterStatusDetailMap, transformerEvent, jobsdb.Succeeded.State, reportingtypes.DESTINATION_FILTER, func() json.RawMessage { return nil }, nil)
2099-
}
21002108
}
21012109

21022110
if len(statusList) != len(jobList) {
@@ -2122,9 +2130,6 @@ func (proc *Handle) preprocessStage(partition string, subJobs subJob, delay time
21222130
eventBlockingStatusDetailsMap: eventBlockingStatusDetailsMap,
21232131
reportMetrics: reportMetrics,
21242132
destFilterStatusDetailMap: destFilterStatusDetailMap,
2125-
inCountMetadataMap: inCountMetadataMap,
2126-
inCountMap: inCountMap,
2127-
outCountMap: outCountMap,
21282133
totalEvents: totalEvents,
21292134
marshalStart: marshalStart,
21302135
groupedEventsBySourceId: groupedEventsBySourceId,
@@ -2222,21 +2227,6 @@ func (proc *Handle) pretransformStage(partition string, preTrans *preTransformat
22222227
preTrans.reportMetrics = append(preTrans.reportMetrics, destFilterMetric)
22232228
}
22242229
}
2225-
2226-
// empty failedCountMap because no failures,
2227-
// events are just dropped at this point if no destination is found to route the events
2228-
diffMetrics := getDiffMetrics(
2229-
reportingtypes.GATEWAY,
2230-
reportingtypes.DESTINATION_FILTER,
2231-
preTrans.inCountMetadataMap,
2232-
map[string]MetricMetadata{},
2233-
preTrans.inCountMap,
2234-
preTrans.outCountMap,
2235-
map[string]int64{},
2236-
map[string]int64{},
2237-
proc.statsFactory,
2238-
)
2239-
preTrans.reportMetrics = append(preTrans.reportMetrics, diffMetrics...)
22402230
}
22412231
// REPORTING - END
22422232

@@ -2960,18 +2950,15 @@ type destTransformOutput struct {
29602950

29612951
// userTransformAndFilterOutput holds the data passed between preprocessing and postprocessing steps
29622952
type userTransformAndFilterOutput struct {
2963-
eventsToTransform []types.TransformerEvent
2964-
commonMetaData *types.Metadata
2965-
reportMetrics []*reportingtypes.PUReportedMetric
2966-
procErrorJobsByDestID map[string][]*jobsdb.JobT
2967-
droppedJobs []*jobsdb.JobT
2968-
inCountMap map[string]int64
2969-
inCountMetadataMap map[string]MetricMetadata
2970-
successCountMetadataMap map[string]MetricMetadata
2971-
eventsByMessageID map[string]types.SingularEventWithReceivedAt
2972-
srcAndDestKey string
2973-
response types.Response
2974-
transformAt string
2953+
eventsToTransform []types.TransformerEvent
2954+
commonMetaData *types.Metadata
2955+
reportMetrics []*reportingtypes.PUReportedMetric
2956+
procErrorJobsByDestID map[string][]*jobsdb.JobT
2957+
droppedJobs []*jobsdb.JobT
2958+
eventsByMessageID map[string]types.SingularEventWithReceivedAt
2959+
srcAndDestKey string
2960+
response types.Response
2961+
transformAt string
29752962
}
29762963

29772964
func (proc *Handle) userTransformAndFilter(
@@ -3218,7 +3205,7 @@ func (proc *Handle) userTransformAndFilter(
32183205
// REPORTING - START
32193206
if proc.isReportingEnabled() {
32203207
diffMetrics := getDiffMetrics(
3221-
reportingtypes.DESTINATION_FILTER,
3208+
inPU,
32223209
reportingtypes.USER_TRANSFORMER,
32233210
inCountMetadataMap,
32243211
successCountMetadataMap,
@@ -3253,8 +3240,6 @@ func (proc *Handle) userTransformAndFilter(
32533240
reportMetrics: reportMetrics,
32543241
procErrorJobsByDestID: procErrorJobsByDestID,
32553242
droppedJobs: droppedJobs,
3256-
inCountMap: inCountMap,
3257-
inCountMetadataMap: inCountMetadataMap,
32583243
eventsByMessageID: eventsByMessageID,
32593244
srcAndDestKey: srcAndDestKey,
32603245
}
@@ -3305,21 +3290,9 @@ func (proc *Handle) userTransformAndFilter(
33053290

33063291
// REPORTING - START
33073292
if proc.isReportingEnabled() {
3308-
diffMetrics := getDiffMetrics(
3309-
inPU,
3310-
reportingtypes.EVENT_FILTER,
3311-
inCountMetadataMap,
3312-
successCountMetadataMap,
3313-
inCountMap,
3314-
successCountMap,
3315-
nonSuccessMetrics.failedCountMap,
3316-
nonSuccessMetrics.filteredCountMap,
3317-
proc.statsFactory,
3318-
)
33193293
reportMetrics = append(reportMetrics, successMetrics...)
33203294
reportMetrics = append(reportMetrics, nonSuccessMetrics.failedMetrics...)
33213295
reportMetrics = append(reportMetrics, nonSuccessMetrics.filteredMetrics...)
3322-
reportMetrics = append(reportMetrics, diffMetrics...)
33233296

33243297
// successCountMap will be inCountMap for destination transform
33253298
inCountMap = successCountMap
@@ -3335,18 +3308,15 @@ func (proc *Handle) userTransformAndFilter(
33353308
// Filtering events based on the supported message types - END
33363309

33373310
return userTransformAndFilterOutput{
3338-
eventsToTransform: eventsToTransform,
3339-
commonMetaData: commonMetaData,
3340-
reportMetrics: reportMetrics,
3341-
procErrorJobsByDestID: procErrorJobsByDestID,
3342-
droppedJobs: droppedJobs,
3343-
inCountMap: inCountMap,
3344-
inCountMetadataMap: inCountMetadataMap,
3345-
eventsByMessageID: eventsByMessageID,
3346-
srcAndDestKey: srcAndDestKey,
3347-
response: response,
3348-
successCountMetadataMap: successCountMetadataMap,
3349-
transformAt: transformAt,
3311+
eventsToTransform: eventsToTransform,
3312+
commonMetaData: commonMetaData,
3313+
reportMetrics: reportMetrics,
3314+
procErrorJobsByDestID: procErrorJobsByDestID,
3315+
droppedJobs: droppedJobs,
3316+
eventsByMessageID: eventsByMessageID,
3317+
srcAndDestKey: srcAndDestKey,
3318+
response: response,
3319+
transformAt: transformAt,
33503320
}
33513321
}
33523322

@@ -3419,10 +3389,9 @@ func (proc *Handle) destTransform(
34193389
successMetrics := make([]*reportingtypes.PUReportedMetric, 0)
34203390
connectionDetailsMap := make(map[string]*reportingtypes.ConnectionDetails)
34213391
statusDetailsMap := make(map[string]map[string]*reportingtypes.StatusDetail)
3422-
successCountMap := make(map[string]int64)
34233392
for i := range response.Events {
34243393
// Update metrics maps
3425-
proc.updateMetricMaps(nil, successCountMap, connectionDetailsMap, statusDetailsMap, &response.Events[i], jobsdb.Succeeded.State, reportingtypes.DEST_TRANSFORMER, func() json.RawMessage { return nil }, nil)
3394+
proc.updateMetricMaps(nil, nil, connectionDetailsMap, statusDetailsMap, &response.Events[i], jobsdb.Succeeded.State, reportingtypes.DEST_TRANSFORMER, func() json.RawMessage { return nil }, nil)
34263395
}
34273396
reportingtypes.AssertSameKeys(connectionDetailsMap, statusDetailsMap)
34283397

@@ -3437,22 +3406,9 @@ func (proc *Handle) destTransform(
34373406
}
34383407
}
34393408

3440-
diffMetrics := getDiffMetrics(
3441-
reportingtypes.EVENT_FILTER,
3442-
reportingtypes.DEST_TRANSFORMER,
3443-
data.inCountMetadataMap,
3444-
data.successCountMetadataMap,
3445-
data.inCountMap,
3446-
successCountMap,
3447-
nonSuccessMetrics.failedCountMap,
3448-
nonSuccessMetrics.filteredCountMap,
3449-
proc.statsFactory,
3450-
)
3451-
34523409
data.reportMetrics = append(data.reportMetrics, nonSuccessMetrics.failedMetrics...)
34533410
data.reportMetrics = append(data.reportMetrics, nonSuccessMetrics.filteredMetrics...)
34543411
data.reportMetrics = append(data.reportMetrics, successMetrics...)
3455-
data.reportMetrics = append(data.reportMetrics, diffMetrics...)
34563412
}
34573413
// REPORTING - PROCESSOR metrics - END
34583414
})

0 commit comments

Comments
 (0)