Skip to content

Commit 0b19968

Browse files
authored
fix: embedded transformations upload (#5848)
1 parent 1b4ea06 commit 0b19968

File tree

5 files changed

+129
-100
lines changed

5 files changed

+129
-100
lines changed

processor/internal/transformer/destination_transformer/destination_transformer.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -399,12 +399,10 @@ func (c *Client) Transform(ctx context.Context, clientEvents []types.Transformer
399399
}
400400

401401
destType := clientEvents[0].Destination.DestinationDefinition.Name
402-
if _, ok := warehouseutils.WarehouseDestinationMap[destType]; ok && c.config.warehouseTransformations.enable.Load() {
402+
if c.canRunWarehouseTransformations(destType) {
403403
if c.config.warehouseTransformations.verify.Load() {
404404
legacyResponse := c.transform(ctx, clientEvents)
405-
go func() {
406-
c.warehouseClient.CompareResponsesAndUpload(ctx, clientEvents, legacyResponse)
407-
}()
405+
c.warehouseClient.CompareResponsesAndUpload(ctx, clientEvents, legacyResponse)
408406
return legacyResponse
409407
}
410408
return c.warehouseClient.Transform(ctx, clientEvents)
@@ -420,16 +418,22 @@ func (c *Client) Transform(ctx context.Context, clientEvents []types.Transformer
420418
if c.conf.GetBoolVar(true, "Processor.Transformer.Embedded."+destType+".Verify") {
421419
legacyTransformerResponse := c.transform(ctx, clientEvents)
422420
embeddedTransformerResponse := impl(ctx, clientEvents)
423-
424-
go func() {
425-
c.CompareAndLog(ctx, embeddedTransformerResponse, legacyTransformerResponse)
426-
}()
427-
421+
c.CompareAndLog(ctx, embeddedTransformerResponse, legacyTransformerResponse)
428422
return legacyTransformerResponse
429423
}
430424
return impl(ctx, clientEvents)
431425
}
432426

427+
func (c *Client) canRunWarehouseTransformations(destType string) bool {
428+
if _, ok := warehouseutils.WarehouseDestinationMap[destType]; ok {
429+
return c.config.warehouseTransformations.enable.Load()
430+
}
431+
if destType == warehouseutils.SnowpipeStreaming {
432+
return c.config.warehouseTransformations.enable.Load()
433+
}
434+
return false
435+
}
436+
433437
func (d *Client) compactRequestPayloads() bool {
434438
return (d.config.compactionSupported && d.config.compactionEnabled.Load())
435439
}

processor/internal/transformer/destination_transformer/embedded/warehouse/transformer_test.go

Lines changed: 0 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,14 @@
11
package warehouse_test
22

33
import (
4-
"context"
54
"fmt"
65
"net/http"
7-
"strconv"
8-
"strings"
96
"testing"
107

118
"github.com/ory/dockertest/v3"
129
"github.com/samber/lo"
1310
"github.com/stretchr/testify/require"
1411

15-
"github.com/rudderlabs/rudder-go-kit/stats/memstats"
16-
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/minio"
17-
1812
"github.com/rudderlabs/rudder-go-kit/config"
1913
"github.com/rudderlabs/rudder-go-kit/logger"
2014
"github.com/rudderlabs/rudder-go-kit/stats"
@@ -673,83 +667,3 @@ func getTrackMetadata(destinationType, sourceCategory string) types.Metadata {
673667
metadata.SourceCategory = sourceCategory
674668
return metadata
675669
}
676-
677-
func TestTransformer_CompareResponsesAndUpload(t *testing.T) {
678-
pool, err := dockertest.NewPool("")
679-
require.NoError(t, err)
680-
minioResource, err := minio.Setup(pool, t)
681-
require.NoError(t, err)
682-
683-
ctx := context.Background()
684-
maxLoggedEvents := 10
685-
686-
c := config.New()
687-
c.Set("Warehouse.Transformer.Sampling.maxLoggedEvents", maxLoggedEvents)
688-
c.Set("Warehouse.Transformer.Sampling.Bucket", minioResource.BucketName)
689-
c.Set("Warehouse.Transformer.Sampling.Endpoint", minioResource.Endpoint)
690-
c.Set("Warehouse.Transformer.Sampling.AccessKey", minioResource.AccessKeyID)
691-
c.Set("Warehouse.Transformer.Sampling.SecretAccessKey", minioResource.AccessKeySecret)
692-
c.Set("Warehouse.Transformer.Sampling.S3ForcePathStyle", true)
693-
c.Set("Warehouse.Transformer.Sampling.DisableSsl", true)
694-
695-
statsStore, err := memstats.New()
696-
require.NoError(t, err)
697-
698-
trans := warehouse.New(c, logger.NOP, statsStore)
699-
700-
eventsByMessageID := make(map[string]types.SingularEventWithReceivedAt, 50)
701-
for index := 0; index < 50; index++ {
702-
eventsByMessageID[strconv.Itoa(index)] = types.SingularEventWithReceivedAt{
703-
SingularEvent: map[string]interface{}{
704-
"event": "track" + strconv.Itoa(index),
705-
},
706-
}
707-
}
708-
709-
events := []types.TransformerEvent{
710-
{
711-
Message: types.SingularEventT{
712-
"event": "track",
713-
"context": "context",
714-
"properties": "properties",
715-
},
716-
},
717-
}
718-
719-
for i := 0; i < 1000; i++ {
720-
legacyResponse := types.Response{
721-
Events: []types.TransformerResponse{
722-
{
723-
Output: types.SingularEventT{
724-
"event": "track",
725-
},
726-
Metadata: types.Metadata{
727-
MessageID: "messageID",
728-
SourceID: "sourceID",
729-
DestinationID: "destinationID",
730-
SourceType: "sourceType",
731-
DestinationType: "destinationType",
732-
},
733-
},
734-
},
735-
}
736-
trans.CompareResponsesAndUpload(ctx, events, legacyResponse)
737-
}
738-
739-
minioContents, err := minioResource.Contents(ctx, "")
740-
require.NoError(t, err)
741-
require.Len(t, minioContents, maxLoggedEvents)
742-
743-
differingEvents := lo.Map(minioContents, func(item minio.File, index int) string {
744-
return item.Content
745-
})
746-
differingEvents = lo.Filter(differingEvents, func(item string, index int) bool {
747-
return strings.Contains(item, "message") // Filtering raw events as the file contains sample diff as well
748-
})
749-
require.Len(t, differingEvents, maxLoggedEvents)
750-
751-
for i := 0; i < maxLoggedEvents; i++ {
752-
require.Contains(t, differingEvents[i], "track")
753-
}
754-
require.EqualValues(t, []float64{1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, statsStore.Get("warehouse_dest_transform_mismatched_events", stats.Tags{}).Values())
755-
}

processor/internal/transformer/destination_transformer/embedded/warehouse/uploader.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,10 @@ import (
1717

1818
"github.com/rudderlabs/rudder-go-kit/config"
1919
"github.com/rudderlabs/rudder-go-kit/filemanager"
20-
20+
"github.com/rudderlabs/rudder-go-kit/stringify"
2121
obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
2222

2323
"github.com/rudderlabs/rudder-go-kit/logger"
24-
"github.com/rudderlabs/rudder-go-kit/stringify"
2524

2625
wtypes "github.com/rudderlabs/rudder-server/processor/internal/transformer/destination_transformer/embedded/warehouse/internal/types"
2726
"github.com/rudderlabs/rudder-server/processor/types"
@@ -36,6 +35,12 @@ func (t *Transformer) CompareResponsesAndUpload(ctx context.Context, events []ty
3635
}
3736
defer t.stats.comparisonTime.RecordDuration()()
3837

38+
go func() {
39+
t.compareResponsesAndUpload(ctx, events, legacyResponse)
40+
}()
41+
}
42+
43+
func (t *Transformer) compareResponsesAndUpload(ctx context.Context, events []types.TransformerEvent, legacyResponse types.Response) {
3944
sampleDiff := t.sampleDiff(events, legacyResponse, t.Transform(ctx, events))
4045
if len(sampleDiff) == 0 {
4146
return
@@ -54,7 +59,7 @@ func (t *Transformer) CompareResponsesAndUpload(ctx context.Context, events []ty
5459
objName := path.Join("embedded-wt-samples", t.config.instanceID, uuid.NewString()) + ".log.gz"
5560
uploadFile, err := t.loggedSamplesUploader.UploadReader(ctx, objName, &b)
5661
if err != nil {
57-
t.logger.Warn("Unable to upload sample diff", obskit.Error(err))
62+
t.logger.Warnn("Unable to upload sample diff", obskit.Error(err))
5863
return
5964
}
6065

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package warehouse
2+
3+
import (
4+
"context"
5+
"strconv"
6+
"strings"
7+
"testing"
8+
9+
"github.com/ory/dockertest/v3"
10+
"github.com/samber/lo"
11+
"github.com/stretchr/testify/require"
12+
13+
"github.com/rudderlabs/rudder-go-kit/config"
14+
"github.com/rudderlabs/rudder-go-kit/logger"
15+
"github.com/rudderlabs/rudder-go-kit/stats"
16+
"github.com/rudderlabs/rudder-go-kit/stats/memstats"
17+
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/minio"
18+
19+
"github.com/rudderlabs/rudder-server/processor/types"
20+
)
21+
22+
func TestTransformer_CompareResponsesAndUpload(t *testing.T) {
23+
pool, err := dockertest.NewPool("")
24+
require.NoError(t, err)
25+
minioResource, err := minio.Setup(pool, t)
26+
require.NoError(t, err)
27+
28+
ctx := context.Background()
29+
maxLoggedEvents := 10
30+
31+
c := config.New()
32+
c.Set("Warehouse.Transformer.Sampling.maxLoggedEvents", maxLoggedEvents)
33+
c.Set("Warehouse.Transformer.Sampling.Bucket", minioResource.BucketName)
34+
c.Set("Warehouse.Transformer.Sampling.Endpoint", minioResource.Endpoint)
35+
c.Set("Warehouse.Transformer.Sampling.AccessKey", minioResource.AccessKeyID)
36+
c.Set("Warehouse.Transformer.Sampling.SecretAccessKey", minioResource.AccessKeySecret)
37+
c.Set("Warehouse.Transformer.Sampling.S3ForcePathStyle", true)
38+
c.Set("Warehouse.Transformer.Sampling.DisableSsl", true)
39+
40+
statsStore, err := memstats.New()
41+
require.NoError(t, err)
42+
43+
trans := New(c, logger.NOP, statsStore)
44+
45+
eventsByMessageID := make(map[string]types.SingularEventWithReceivedAt, 50)
46+
for index := 0; index < 50; index++ {
47+
eventsByMessageID[strconv.Itoa(index)] = types.SingularEventWithReceivedAt{
48+
SingularEvent: map[string]interface{}{
49+
"event": "track" + strconv.Itoa(index),
50+
},
51+
}
52+
}
53+
54+
events := []types.TransformerEvent{
55+
{
56+
Message: types.SingularEventT{
57+
"event": "track",
58+
"context": "context",
59+
"properties": "properties",
60+
},
61+
},
62+
}
63+
64+
for i := 0; i < maxLoggedEvents; i++ {
65+
legacyResponse := types.Response{
66+
Events: []types.TransformerResponse{
67+
{
68+
Output: types.SingularEventT{
69+
"event": "track",
70+
},
71+
Metadata: types.Metadata{
72+
MessageID: "messageID",
73+
SourceID: "sourceID",
74+
DestinationID: "destinationID",
75+
SourceType: "sourceType",
76+
DestinationType: "destinationType",
77+
},
78+
},
79+
},
80+
}
81+
trans.compareResponsesAndUpload(ctx, events, legacyResponse)
82+
}
83+
84+
minioContents, err := minioResource.Contents(ctx, "")
85+
require.NoError(t, err)
86+
require.Len(t, minioContents, maxLoggedEvents)
87+
88+
differingEvents := lo.Map(minioContents, func(item minio.File, index int) string {
89+
return item.Content
90+
})
91+
differingEvents = lo.Filter(differingEvents, func(item string, index int) bool {
92+
return strings.Contains(item, "message") // Filtering raw events as the file contains sample diff as well
93+
})
94+
require.Len(t, differingEvents, maxLoggedEvents)
95+
96+
for i := 0; i < maxLoggedEvents; i++ {
97+
require.Contains(t, differingEvents[i], "track")
98+
}
99+
require.EqualValues(t, []float64{1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, statsStore.Get("warehouse_dest_transform_mismatched_events", stats.Tags{}).Values())
100+
}

processor/internal/transformer/destination_transformer/logger.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,19 @@ func (c *Client) CompareAndLog(
2525
c.log.Warnn("DestinationTransformer sanity check failed")
2626
return
2727
}
28-
2928
if c.loggedEvents.Load() >= int64(c.config.maxLoggedEvents.Load()) {
3029
return
3130
}
32-
3331
defer c.stats.comparisonTime.RecordDuration()()
32+
go func() {
33+
c.compareAndLog(ctx, embeddedResponse, legacyResponse)
34+
}()
35+
}
3436

37+
func (c *Client) compareAndLog(
38+
ctx context.Context,
39+
embeddedResponse, legacyResponse types.Response,
40+
) {
3541
differingResponse, sampleDiff := c.differingEvents(embeddedResponse, legacyResponse)
3642
noOfDifferences := int64(len(differingResponse))
3743
if noOfDifferences == 0 && sampleDiff == "" {

0 commit comments

Comments
 (0)