Skip to content

Commit 15faf6b

Browse files
authored
fix: warehouse transformations SourceDefinitionType as empty (#5510)
1 parent a8b3a00 commit 15faf6b

File tree

6 files changed

+33
-17
lines changed

6 files changed

+33
-17
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ require (
5151
github.com/golang-migrate/migrate/v4 v4.18.2
5252
github.com/golang/mock v1.6.0
5353
github.com/gomodule/redigo v1.9.2
54+
github.com/google/go-cmp v0.6.0
5455
github.com/google/uuid v1.6.0
5556
github.com/grpc-ecosystem/grpc-gateway/v2 v2.24.0
5657
github.com/hashicorp/go-retryablehttp v0.7.7
@@ -231,7 +232,6 @@ require (
231232
github.com/golang/protobuf v1.5.4 // indirect
232233
github.com/golang/snappy v0.0.4 // indirect
233234
github.com/google/flatbuffers v24.12.23+incompatible // indirect
234-
github.com/google/go-cmp v0.6.0 // indirect
235235
github.com/google/gofuzz v1.2.0 // indirect
236236
github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad // indirect
237237
github.com/google/s2a-go v0.1.8 // indirect

runner/buckets.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,9 @@ var (
146146
0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1,
147147
},
148148

149+
"warehouse_dest_transform_matched_events": {
150+
0.1, 0.25, 0.5, 1, 2.5, 5, 7.5, 10, 12.5, 15, 30, 60, 120, 300, 600, 900, 1800, 3600,
151+
},
149152
"warehouse_dest_transform_mismatched_events": {
150153
0.1, 0.25, 0.5, 1, 2.5, 5, 7.5, 10, 12.5, 15, 30, 60, 120, 300, 600, 900, 1800, 3600,
151154
},

warehouse/transformer/logger.go

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package transformer
22

33
import (
44
"fmt"
5-
"reflect"
65

76
"github.com/google/uuid"
87
"github.com/samber/lo"
@@ -15,6 +14,8 @@ import (
1514
ptrans "github.com/rudderlabs/rudder-server/processor/transformer"
1615
"github.com/rudderlabs/rudder-server/utils/misc"
1716
"github.com/rudderlabs/rudder-server/utils/types"
17+
18+
"github.com/google/go-cmp/cmp"
1819
)
1920

2021
func (t *Transformer) CompareAndLog(
@@ -36,7 +37,7 @@ func (t *Transformer) CompareAndLog(
3637

3738
t.stats.comparisionTime.RecordDuration()()
3839

39-
differingEvents := t.differingEvents(events, pResponse, wResponse, eventsByMessageID)
40+
differingEvents, sampleDiff := t.differingEvents(events, pResponse, wResponse, eventsByMessageID)
4041
if len(differingEvents) == 0 {
4142
return
4243
}
@@ -47,7 +48,7 @@ func (t *Transformer) CompareAndLog(
4748
Metadata: *metadata,
4849
})
4950
})
50-
if err := t.writeLogEntries(logEntries); err != nil {
51+
if err := t.write(append([]string{sampleDiff}, logEntries...)); err != nil {
5152
t.logger.Warnn("Error logging events", obskit.Error(err))
5253
return
5354
}
@@ -60,45 +61,50 @@ func (t *Transformer) differingEvents(
6061
eventsToTransform []ptrans.TransformerEvent,
6162
pResponse, wResponse ptrans.Response,
6263
eventsByMessageID map[string]types.SingularEventWithReceivedAt,
63-
) []types.SingularEventT {
64+
) ([]types.SingularEventT, string) {
6465
// If the event counts differ, return all events in the transformation
6566
if len(pResponse.Events) != len(wResponse.Events) || len(pResponse.FailedEvents) != len(wResponse.FailedEvents) {
6667
events := lo.Map(eventsToTransform, func(e ptrans.TransformerEvent, _ int) types.SingularEventT {
6768
return eventsByMessageID[e.Metadata.MessageID].SingularEvent
6869
})
6970
t.stats.mismatchedEvents.Observe(float64(len(events)))
70-
return events
71+
return events, ""
7172
}
7273

7374
var (
7475
differedSampleEvents []types.SingularEventT
7576
differedEventsCount int
77+
sampleDiff string
7678
)
7779

7880
for i := range pResponse.Events {
79-
if reflect.DeepEqual(pResponse.Events[i], wResponse.Events[i]) {
81+
diff := cmp.Diff(wResponse.Events[i], pResponse.Events[i])
82+
if len(diff) == 0 {
8083
continue
8184
}
85+
8286
if differedEventsCount == 0 {
8387
// Collect the messages of the first mismatched event only (sample); later mismatches are just counted
8488
differedSampleEvents = append(differedSampleEvents, lo.Map(pResponse.Events[i].Metadata.GetMessagesIDs(), func(msgID string, _ int) types.SingularEventT {
8589
return eventsByMessageID[msgID].SingularEvent
8690
})...)
91+
sampleDiff = diff
8792
}
8893
differedEventsCount++
8994
}
95+
t.stats.matchedEvents.Observe(float64(len(pResponse.Events) - differedEventsCount))
9096
t.stats.mismatchedEvents.Observe(float64(differedEventsCount))
91-
return differedSampleEvents
97+
return differedSampleEvents, sampleDiff
9298
}
9399

94-
func (t *Transformer) writeLogEntries(entries []string) error {
100+
func (t *Transformer) write(data []string) error {
95101
writer, err := misc.CreateGZ(t.loggedFileName)
96102
if err != nil {
97103
return fmt.Errorf("creating buffered writer: %w", err)
98104
}
99105
defer func() { _ = writer.Close() }()
100106

101-
for _, entry := range entries {
107+
for _, entry := range data {
102108
if _, err := writer.Write([]byte(entry + "\n")); err != nil {
103109
return fmt.Errorf("writing log entry: %w", err)
104110
}
@@ -107,5 +113,5 @@ func (t *Transformer) writeLogEntries(entries []string) error {
107113
}
108114

109115
func generateLogFileName() string {
110-
return fmt.Sprintf("warehouse_transformations_debug_%s.log", uuid.NewString())
116+
return fmt.Sprintf("warehouse_transformations_debug_%s.log.gz", uuid.NewString())
111117
}

warehouse/transformer/transformer.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ func New(conf *config.Config, logger logger.Logger, statsFactory stats.Stats) *T
3737
loggedFileName: generateLogFileName(),
3838
}
3939

40+
t.stats.matchedEvents = t.statsFactory.NewStat("warehouse_dest_transform_matched_events", stats.HistogramType)
4041
t.stats.mismatchedEvents = t.statsFactory.NewStat("warehouse_dest_transform_mismatched_events", stats.HistogramType)
4142
t.stats.comparisionTime = t.statsFactory.NewStat("warehouse_dest_transform_comparison_time", stats.TimerType)
4243

@@ -80,6 +81,7 @@ func (t *Transformer) Transform(_ context.Context, clientEvents []ptrans.Transfo
8081
}
8182

8283
res.Events = append(res.Events, lo.Map(r, func(item map[string]any, index int) ptrans.TransformerResponse {
84+
clientEvent.Metadata.SourceDefinitionType = "" // TODO: SourceDefinitionType is currently ignored during JSON marshalling, so it is cleared here. Remove this once we start using it.
8385
return ptrans.TransformerResponse{
8486
Output: item,
8587
Metadata: clientEvent.Metadata,

warehouse/transformer/transformer_test.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -626,7 +626,7 @@ func getTrackMetadata(destinationType, sourceCategory string) ptrans.Metadata {
626626
}
627627

628628
func TestTransformer_CompareAndLog(t *testing.T) {
629-
tmpFile, err := os.CreateTemp("", "transformer_compare_log.*.txt")
629+
tmpFile, err := os.CreateTemp("", "transformer_compare_log.*.txt.gz")
630630
require.NoError(t, err)
631631
require.NoError(t, tmpFile.Close())
632632

@@ -691,11 +691,12 @@ func TestTransformer_CompareAndLog(t *testing.T) {
691691
"event": "track" + strconv.Itoa(index+i+1),
692692
},
693693
Metadata: ptrans.Metadata{
694-
MessageID: strconv.Itoa(index + i + 1),
695-
SourceID: "sourceID",
696-
DestinationID: "destinationID",
697-
SourceType: "sourceType",
698-
DestinationType: "destinationType",
694+
MessageID: strconv.Itoa(index + i + 1),
695+
SourceID: "sourceID",
696+
DestinationID: "destinationID",
697+
SourceType: "sourceType",
698+
DestinationType: "destinationType",
699+
SourceDefinitionType: "sourceDefinitionType",
699700
},
700701
}
701702
}),
@@ -714,6 +715,9 @@ func TestTransformer_CompareAndLog(t *testing.T) {
714715
require.NoError(t, f.Close())
715716

716717
differingEvents := strings.Split(strings.Trim(string(data), "\n"), "\n")
718+
differingEvents = lo.Filter(differingEvents, func(item string, index int) bool {
719+
return strings.Contains(item, "message") // Keep only the raw event lines; the file also contains the sample diff
720+
})
717721
require.Len(t, differingEvents, maxLoggedEvents)
718722

719723
for i := 0; i < maxLoggedEvents; i++ {

warehouse/transformer/types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ type (
2020

2121
stats struct {
2222
comparisionTime stats.Timer
23+
matchedEvents stats.Histogram
2324
mismatchedEvents stats.Histogram
2425
}
2526
config struct {

0 commit comments

Comments
 (0)