Skip to content

Commit c81001f

Browse files
authored
chore: upload embedded dt response difference samples to s3 (#5792)
1 parent 4656247 commit c81001f

File tree

5 files changed

+84
-49
lines changed

5 files changed

+84
-49
lines changed

processor/internal/transformer/destination_transformer/destination_transformer.go

Lines changed: 43 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -9,6 +9,7 @@ import (
99
"strconv"
1010
"strings"
1111
"sync"
12+
"sync/atomic"
1213
"time"
1314

1415
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
@@ -21,6 +22,7 @@ import (
2122
"github.com/samber/lo"
2223

2324
"github.com/rudderlabs/rudder-go-kit/config"
25+
"github.com/rudderlabs/rudder-go-kit/filemanager"
2426
"github.com/rudderlabs/rudder-go-kit/logger"
2527
"github.com/rudderlabs/rudder-go-kit/stats"
2628

@@ -72,15 +74,18 @@ func New(conf *config.Config, log logger.Logger, stat stats.Stats, opts ...Opt)
7274
handle.config.maxRetryBackoffInterval = conf.GetReloadableDurationVar(30, time.Second, "Processor.DestinationTransformer.maxRetryBackoffInterval", "Processor.maxRetryBackoffInterval")
7375
handle.config.batchSize = conf.GetReloadableIntVar(100, 1, "Processor.DestinationTransformer.batchSize", "Processor.transformBatchSize")
7476

75-
handle.config.maxLoggedEvents = conf.GetReloadableIntVar(10000, 1, "Processor.DestinationTransformer.maxLoggedEvents")
77+
handle.config.maxLoggedEvents = conf.GetReloadableIntVar(100, 1, "Processor.DestinationTransformer.maxLoggedEvents")
7678

7779
handle.stats.comparisonTime = handle.stat.NewStat("embedded_destination_transform_comparison_time", stats.TimerType)
7880
handle.stats.matchedEvents = handle.stat.NewStat("embedded_destination_transform_matched_events", stats.CountType)
7981
handle.stats.mismatchedEvents = handle.stat.NewStat("embedded_destination_transform_mismatched_events", stats.CountType)
8082

81-
handle.loggedEvents = 0
82-
handle.loggedEventsMu = sync.Mutex{}
83-
handle.loggedFileName = generateLogFileName()
83+
var err error
84+
handle.samplingFileManager, err = getSamplingUploader(conf, log)
85+
if err != nil {
86+
log.Errorn("failed to create dt sampling file manager", obskit.Error(err))
87+
handle.samplingFileManager = nil
88+
}
8489

8590
handle.config.compactionEnabled = conf.GetReloadableBoolVar(false, "Processor.DestinationTransformer.compactionEnabled", "Transformer.compactionEnabled")
8691

@@ -116,9 +121,8 @@ type Client struct {
116121
mismatchedEvents stats.Counter
117122
}
118123

119-
loggedEventsMu sync.Mutex
120-
loggedEvents int64
121-
loggedFileName string
124+
loggedEvents atomic.Int64
125+
samplingFileManager *filemanager.S3Manager
122126
}
123127

124128
func (d *Client) transform(ctx context.Context, clientEvents []types.TransformerEvent) types.Response {
@@ -391,7 +395,9 @@ func (c *Client) Transform(ctx context.Context, clientEvents []types.Transformer
391395
legacyTransformerResponse := c.transform(ctx, clientEvents)
392396
embeddedTransformerResponse := impl(ctx, clientEvents)
393397

394-
c.CompareAndLog(embeddedTransformerResponse, legacyTransformerResponse)
398+
go func() {
399+
c.CompareAndLog(ctx, embeddedTransformerResponse, legacyTransformerResponse)
400+
}()
395401

396402
return legacyTransformerResponse
397403
}
@@ -429,3 +435,32 @@ func (d *Client) getRequestPayload(data []types.TransformerEvent, compactRequest
429435
}
430436
return jsonrs.Marshal(data)
431437
}
438+
439+
// getSamplingUploader builds the S3 file manager that New stores in
// handle.samplingFileManager for uploading samples of differing
// destination-transformer responses.
//
// All settings are read once from conf under the "DTSampling." prefix. The
// access key id, secret key, SSE flag and region lookups additionally list the
// matching AWS_* keys (presumably as fallbacks -- TODO confirm against the
// GetStringVar/GetBoolVar semantics of rudder-go-kit config).
// Defaults visible here: bucket "processor-dt-sampling", region "us-east-1",
// empty endpoint and credentials, and false for every boolean option.
//
// The timeout closure handed to the manager re-reads "DTSampling.Timeout"
// (default 120 seconds) every time it is invoked. The manager's logger is
// tagged with component="dt-uploader". The manager and error produced by
// filemanager.NewS3Manager are returned unchanged.
func getSamplingUploader(conf *config.Config, log logger.Logger) (*filemanager.S3Manager, error) {
440+
var (
441+
bucket = conf.GetString("DTSampling.Bucket", "processor-dt-sampling")
442+
endpoint = conf.GetString("DTSampling.Endpoint", "")
443+
accessKeyID = conf.GetStringVar("", "DTSampling.AccessKeyId", "AWS_ACCESS_KEY_ID")
444+
accessKey = conf.GetStringVar("", "DTSampling.AccessKey", "AWS_SECRET_ACCESS_KEY")
445+
s3ForcePathStyle = conf.GetBool("DTSampling.S3ForcePathStyle", false)
446+
disableSSL = conf.GetBool("DTSampling.DisableSsl", false)
447+
enableSSE = conf.GetBoolVar(false, "DTSampling.EnableSse", "AWS_ENABLE_SSE")
448+
useGlue = conf.GetBool("DTSampling.UseGlue", false)
449+
region = conf.GetStringVar("us-east-1", "DTSampling.Region", "AWS_DEFAULT_REGION")
450+
)
451+
s3Config := map[string]any{
452+
"bucketName": bucket,
453+
"endpoint": endpoint,
454+
"accessKeyID": accessKeyID,
455+
"accessKey": accessKey,
456+
"s3ForcePathStyle": s3ForcePathStyle,
457+
"disableSSL": disableSSL,
458+
"enableSSE": enableSSE,
459+
"useGlue": useGlue,
460+
"region": region,
461+
}
462+
463+
return filemanager.NewS3Manager(s3Config, log.Withn(logger.NewStringField("component", "dt-uploader")), func() time.Duration {
464+
return conf.GetDuration("DTSampling.Timeout", 120, time.Second)
465+
})
466+
}

processor/internal/transformer/destination_transformer/embedded/pubsub/pubsub.go

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -110,7 +110,9 @@ func getTopic(event types.TransformerEvent, topicMap map[string]string) (string,
110110
return topic, nil
111111
}
112112

113-
return "", fmt.Errorf("no topic set for this event")
113+
// Capital "No" needed for mirroring/comparison; re-enable lint after mirroring ends.
114+
//nolint:staticcheck
115+
return "", fmt.Errorf("No topic set for this event")
114116
}
115117

116118
func getAttributeKeysFromEvent(event types.TransformerEvent, attributesMap map[string][]string) []string {

processor/internal/transformer/destination_transformer/embedded/pubsub/pubsub_test.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -370,7 +370,7 @@ func TestTransform(t *testing.T) {
370370
want: types.Response{
371371
FailedEvents: []types.TransformerResponse{
372372
{
373-
Error: "no topic set for this event",
373+
Error: "No topic set for this event",
374374
Metadata: types.Metadata{},
375375
StatusCode: http.StatusBadRequest,
376376
StatTags: destinationStatTags,
@@ -416,7 +416,7 @@ func TestTransform(t *testing.T) {
416416
want: types.Response{
417417
FailedEvents: []types.TransformerResponse{
418418
{
419-
Error: "no topic set for this event",
419+
Error: "No topic set for this event",
420420
Metadata: types.Metadata{},
421421
StatusCode: http.StatusBadRequest,
422422
StatTags: destinationStatTags,

processor/internal/transformer/destination_transformer/logger.go

Lines changed: 34 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -1,48 +1,65 @@
11
package destination_transformer
22

33
import (
4+
"bytes"
5+
"context"
46
"fmt"
7+
"path"
58

69
"github.com/google/go-cmp/cmp"
710
"github.com/google/uuid"
8-
"github.com/samber/lo"
911

12+
"github.com/rudderlabs/rudder-go-kit/config"
13+
"github.com/rudderlabs/rudder-go-kit/logger"
1014
obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
1115

12-
"github.com/rudderlabs/rudder-go-kit/stringify"
13-
16+
"github.com/rudderlabs/rudder-server/jsonrs"
1417
"github.com/rudderlabs/rudder-server/processor/types"
15-
"github.com/rudderlabs/rudder-server/utils/misc"
1618
)
1719

1820
// CompareAndLog compares the embedded and legacy transformer responses and,
// when they differ, uploads a sample of the differences through
// c.samplingFileManager. This description follows the added ("+") lines of
// the hunk; the removed ("-") lines show the previous mutex + local gzip file
// implementation.
//
// Flow of the added version:
//   - c.samplingFileManager is nil: log a warning and return without comparing.
//   - c.loggedEvents is already at or above config.maxLoggedEvents: return.
//   - differingEvents reports no differing responses and an empty sample
//     diff: return silently.
//   - otherwise add the number of differing responses to c.loggedEvents, then
//     upload sampleDiff immediately followed by the JSON encoding of the
//     differing responses to the object
//     path.Join("embedded-dt-samples", config.GetKubeNamespace(), <uuid>),
//     and log a warning carrying the uploaded file's location and object name.
//   - if JSON encoding or the upload fails, the counter increment is undone
//     and the error is logged.
//
// The comparisonTime timer is now deferred, so it covers the comparison and
// the upload; the removed line invoked the returned function immediately.
//
// NOTE(review): on the nil-uploader path the "sanity check failed" warning is
// emitted before differingEvents runs, i.e. regardless of whether the
// responses actually differ, and the matched/mismatched counters updated
// inside differingEvents are not touched -- confirm this is intended.
// NOTE(review): the limit check (Load) and the increment (Add) are separate
// atomic operations, and Transform now calls this method from a goroutine, so
// concurrent calls can presumably overshoot maxLoggedEvents -- confirm this
// is acceptable.
// NOTE(review): ctx is the caller's request context and this method runs in a
// detached goroutine; if that context is cancelled once Transform returns the
// upload may fail -- verify against the caller.
func (c *Client) CompareAndLog(
21+
ctx context.Context,
1922
embeddedResponse, legacyResponse types.Response,
2023
) {
21-
c.loggedEventsMu.Lock()
22-
defer c.loggedEventsMu.Unlock()
24+
if c.samplingFileManager == nil { // Cannot upload, we should just report the issue with no diff
25+
c.log.Warnn("DestinationTransformer sanity check failed")
26+
return
27+
}
2328

24-
if c.loggedEvents >= int64(c.config.maxLoggedEvents.Load()) {
29+
if c.loggedEvents.Load() >= int64(c.config.maxLoggedEvents.Load()) {
2530
return
2631
}
2732

28-
c.stats.comparisonTime.RecordDuration()()
33+
defer c.stats.comparisonTime.RecordDuration()()
2934

3035
differingResponse, sampleDiff := c.differingEvents(embeddedResponse, legacyResponse)
31-
if len(differingResponse) == 0 && sampleDiff == "" {
32-
c.log.Infof("Embedded and legacy responses are matches")
36+
noOfDifferences := int64(len(differingResponse))
37+
if noOfDifferences == 0 && sampleDiff == "" {
38+
return
39+
}
40+
41+
c.loggedEvents.Add(noOfDifferences)
42+
43+
objName := path.Join("embedded-dt-samples", config.GetKubeNamespace(), uuid.New().String())
44+
differingResponseJSON, err := jsonrs.Marshal(differingResponse)
45+
if err != nil {
46+
c.loggedEvents.Add(-noOfDifferences)
47+
c.log.Errorn("DestinationTransformer sanity check failed (cannot encode differingResponse)", obskit.Error(err))
3348
return
3449
}
3550

36-
logEntries := lo.Map(differingResponse, func(item types.TransformerResponse, index int) string {
37-
return stringify.Any(item)
38-
})
39-
if err := c.write(append([]string{sampleDiff}, logEntries...)); err != nil {
40-
c.log.Warnn("Error logging events", obskit.Error(err))
51+
// upload sample diff and differing response to s3
52+
file, err := c.samplingFileManager.UploadReader(ctx, objName, bytes.NewReader(append([]byte(sampleDiff), differingResponseJSON...)))
53+
if err != nil {
54+
c.loggedEvents.Add(-noOfDifferences)
55+
c.log.Errorn("Error uploading DestinationTransformer sanity check diff file", obskit.Error(err))
4156
return
4257
}
4358

44-
c.log.Infof("Successfully logged events: %d", len(logEntries))
45-
c.loggedEvents += int64(len(logEntries))
59+
c.log.Warnn("DestinationTransformer sanity check failed",
60+
logger.NewStringField("location", file.Location),
61+
logger.NewStringField("objectName", file.ObjectName),
62+
)
4663
}
4764

4865
func (c *Client) differingEvents(
@@ -95,22 +112,3 @@ func (c *Client) differingEvents(
95112
c.stats.mismatchedEvents.Count(differedEventsCount)
96113
return differedSampleEvents, sampleDiff
97114
}
98-
99-
func (c *Client) write(data []string) error {
100-
writer, err := misc.CreateGZ(c.loggedFileName)
101-
if err != nil {
102-
return fmt.Errorf("creating buffered writer: %w", err)
103-
}
104-
defer func() { _ = writer.Close() }()
105-
106-
for _, entry := range data {
107-
if _, err := writer.Write([]byte(entry + "\n")); err != nil {
108-
return fmt.Errorf("writing log entry: %w", err)
109-
}
110-
}
111-
return nil
112-
}
113-
114-
func generateLogFileName() string {
115-
return fmt.Sprintf("destination_transformations_debug_%s.log.gz", uuid.NewString())
116-
}

processor/processor.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -2939,7 +2939,7 @@ func (proc *Handle) userTransformAndFilter(
29392939
}
29402940

29412941
if proc.utSamplingFileManager == nil { // Cannot upload, we should just report the issue with no diff
2942-
log.Errorn("UserTransform sanity check failed")
2942+
log.Warnn("UserTransform sanity check failed")
29432943
return
29442944
}
29452945

@@ -2969,7 +2969,7 @@ func (proc *Handle) userTransformAndFilter(
29692969
return
29702970
}
29712971

2972-
log.Errorn("UserTransform sanity check failed",
2972+
log.Warnn("UserTransform sanity check failed",
29732973
logger.NewStringField("diffLocation", diffFile.Location),
29742974
logger.NewStringField("diffObjectName", diffFile.ObjectName),
29752975
logger.NewStringField("clientEventsLocation", clientEventsFile.Location),

0 commit comments

Comments
 (0)