diff --git a/processor/internal/transformer/destination_transformer/destination_transformer.go b/processor/internal/transformer/destination_transformer/destination_transformer.go
index 2c209ffc55..a25cac172a 100644
--- a/processor/internal/transformer/destination_transformer/destination_transformer.go
+++ b/processor/internal/transformer/destination_transformer/destination_transformer.go
@@ -9,27 +9,26 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
-	backendconfig "github.com/rudderlabs/rudder-server/backend-config"
-	"github.com/rudderlabs/rudder-server/jsonrs"
-	transformerfs "github.com/rudderlabs/rudder-server/services/transformer"
-
-	obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
-
 	"github.com/cenkalti/backoff"
 	"github.com/samber/lo"
 
 	"github.com/rudderlabs/rudder-go-kit/config"
+	"github.com/rudderlabs/rudder-go-kit/filemanager"
 	"github.com/rudderlabs/rudder-go-kit/logger"
 	"github.com/rudderlabs/rudder-go-kit/stats"
-
+	obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
+	backendconfig "github.com/rudderlabs/rudder-server/backend-config"
 	transformerclient "github.com/rudderlabs/rudder-server/internal/transformer-client"
+	"github.com/rudderlabs/rudder-server/jsonrs"
 	"github.com/rudderlabs/rudder-server/processor/integrations"
 	transformerutils "github.com/rudderlabs/rudder-server/processor/internal/transformer"
 	"github.com/rudderlabs/rudder-server/processor/internal/transformer/destination_transformer/embedded/kafka"
 	"github.com/rudderlabs/rudder-server/processor/internal/transformer/destination_transformer/embedded/pubsub"
 	"github.com/rudderlabs/rudder-server/processor/types"
+	transformerfs "github.com/rudderlabs/rudder-server/services/transformer"
 	"github.com/rudderlabs/rudder-server/utils/httputil"
 	reportingtypes "github.com/rudderlabs/rudder-server/utils/types"
 	warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
@@ -78,9 +77,12 @@ func New(conf *config.Config, log logger.Logger, stat stats.Stats, opts ...Opt)
 	handle.stats.matchedEvents = handle.stat.NewStat("embedded_destination_transform_matched_events", stats.CountType)
 	handle.stats.mismatchedEvents = handle.stat.NewStat("embedded_destination_transform_mismatched_events", stats.CountType)
 
-	handle.loggedEvents = 0
-	handle.loggedEventsMu = sync.Mutex{}
-	handle.loggedFileName = generateLogFileName()
+	var err error
+	handle.samplingFileManager, err = getSamplingUploader(conf, log)
+	if err != nil {
+		log.Errorn("failed to create dt sampling file manager", obskit.Error(err))
+		handle.samplingFileManager = nil
+	}
 
 	handle.config.compactionEnabled = conf.GetReloadableBoolVar(false, "Processor.DestinationTransformer.compactionEnabled", "Transformer.compactionEnabled")
 
@@ -116,9 +118,8 @@ type Client struct {
 		mismatchedEvents stats.Counter
 	}
 
-	loggedEventsMu sync.Mutex
-	loggedEvents   int64
-	loggedFileName string
+	loggedEvents        atomic.Int64
+	samplingFileManager *filemanager.S3Manager
 }
 
 func (d *Client) transform(ctx context.Context, clientEvents []types.TransformerEvent) types.Response {
@@ -391,7 +392,13 @@ func (c *Client) Transform(ctx context.Context, clientEvents []types.Transformer
 	legacyTransformerResponse := c.transform(ctx, clientEvents)
 	embeddedTransformerResponse := impl(ctx, clientEvents)
 
-	c.CompareAndLog(embeddedTransformerResponse, legacyTransformerResponse)
+	// Compare in the background so diffing and uploading do not delay the caller.
+	// NOTE(review): this goroutine is untracked and shares ctx and both responses
+	// with the caller — confirm ctx outlives Transform and that the returned
+	// response is not mutated while the comparison runs.
+	go func() {
+		c.CompareAndLog(ctx, embeddedTransformerResponse, legacyTransformerResponse)
+	}()
 
 	return legacyTransformerResponse
 }
@@ -429,3 +436,35 @@ func (d *Client) getRequestPayload(data []types.TransformerEvent, compactRequest
 	}
 	return jsonrs.Marshal(data)
 }
+
+// getSamplingUploader builds the S3 file manager used to upload sanity-check
+// samples. Settings are read from the DTSampling.* config keys (several also
+// accept the standard AWS_* keys); the manager timeout defaults to 120 seconds.
+func getSamplingUploader(conf *config.Config, log logger.Logger) (*filemanager.S3Manager, error) {
+	var (
+		bucket           = conf.GetString("DTSampling.Bucket", "processor-dt-sampling")
+		endpoint         = conf.GetString("DTSampling.Endpoint", "")
+		accessKeyID      = conf.GetStringVar("", "DTSampling.AccessKeyId", "AWS_ACCESS_KEY_ID")
+		accessKey        = conf.GetStringVar("", "DTSampling.AccessKey", "AWS_SECRET_ACCESS_KEY")
+		s3ForcePathStyle = conf.GetBool("DTSampling.S3ForcePathStyle", false)
+		disableSSL       = conf.GetBool("DTSampling.DisableSsl", false)
+		enableSSE        = conf.GetBoolVar(false, "DTSampling.EnableSse", "AWS_ENABLE_SSE")
+		useGlue          = conf.GetBool("DTSampling.UseGlue", false)
+		region           = conf.GetStringVar("us-east-1", "DTSampling.Region", "AWS_DEFAULT_REGION")
+	)
+	s3Config := map[string]any{
+		"bucketName":       bucket,
+		"endpoint":         endpoint,
+		"accessKeyID":      accessKeyID,
+		"accessKey":        accessKey,
+		"s3ForcePathStyle": s3ForcePathStyle,
+		"disableSSL":       disableSSL,
+		"enableSSE":        enableSSE,
+		"useGlue":          useGlue,
+		"region":           region,
+	}
+
+	return filemanager.NewS3Manager(s3Config, log.Withn(logger.NewStringField("component", "dt-uploader")), func() time.Duration {
+		return conf.GetDuration("DTSampling.Timeout", 120, time.Second)
+	})
+}
diff --git a/processor/internal/transformer/destination_transformer/logger.go b/processor/internal/transformer/destination_transformer/logger.go
index 0ec0928fcd..88654a729a 100644
--- a/processor/internal/transformer/destination_transformer/logger.go
+++ b/processor/internal/transformer/destination_transformer/logger.go
@@ -1,27 +1,32 @@
 package destination_transformer
 
 import (
+	"bytes"
+	"context"
 	"fmt"
+	"path"
 
 	"github.com/google/go-cmp/cmp"
 	"github.com/google/uuid"
-	"github.com/samber/lo"
 
+	"github.com/rudderlabs/rudder-go-kit/config"
+	"github.com/rudderlabs/rudder-go-kit/logger"
 	obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
 
-	"github.com/rudderlabs/rudder-go-kit/stringify"
-
+	"github.com/rudderlabs/rudder-server/jsonrs"
 	"github.com/rudderlabs/rudder-server/processor/types"
-	"github.com/rudderlabs/rudder-server/utils/misc"
 )
 
+// CompareAndLog diffs the embedded and legacy transformer responses. When they
+// differ it uploads the sample diff and the differing responses through the
+// sampling file manager and logs the uploaded object's location; without a file
+// manager the mismatch is only logged. Nothing is reported once loggedEvents
+// reaches maxLoggedEvents.
 func (c *Client) CompareAndLog(
+	ctx context.Context,
 	embeddedResponse, legacyResponse types.Response,
 ) {
-	c.loggedEventsMu.Lock()
-	defer c.loggedEventsMu.Unlock()
-
-	if c.loggedEvents >= int64(c.config.maxLoggedEvents.Load()) {
+	if c.loggedEvents.Load() >= int64(c.config.maxLoggedEvents.Load()) {
 		return
 	}
 
@@ -29,20 +34,36 @@ func (c *Client) CompareAndLog(
 	differingResponse, sampleDiff := c.differingEvents(embeddedResponse, legacyResponse)
 
 	if len(differingResponse) == 0 && sampleDiff == "" {
-		c.log.Infof("Embedded and legacy responses are matches")
 		return
 	}
 
-	logEntries := lo.Map(differingResponse, func(item types.TransformerResponse, index int) string {
-		return stringify.Any(item)
-	})
-	if err := c.write(append([]string{sampleDiff}, logEntries...)); err != nil {
-		c.log.Warnn("Error logging events", obskit.Error(err))
+	if c.samplingFileManager == nil { // Cannot upload, we should just report the issue with no diff
+		c.log.Errorn("DestinationTransformer sanity check failed")
+		c.loggedEvents.Add(int64(len(differingResponse)))
+		return
+	}
+
+	objName := path.Join("embedded-dt-samples", config.GetKubeNamespace(), uuid.New().String())
+	differingResponseJSON, err := jsonrs.Marshal(differingResponse)
+	if err != nil {
+		c.log.Errorn("DestinationTransformer sanity check failed (cannot encode differingResponse)", obskit.Error(err))
+		return
+	}
+
+	// upload sample diff and differing response to s3
+	file, err := c.samplingFileManager.UploadReader(ctx, objName, bytes.NewReader(append([]byte(sampleDiff), differingResponseJSON...)))
+	if err != nil {
+		c.log.Errorn("Error uploading DestinationTransformer sanity check diff file", obskit.Error(err))
 		return
 	}
 
-	c.log.Infof("Successfully logged events: %d", len(logEntries))
-	c.loggedEvents += int64(len(logEntries))
+	c.log.Errorn("DestinationTransformer sanity check failed",
+		logger.NewStringField("location", file.Location),
+		logger.NewStringField("objectName", file.ObjectName),
+	)
+
+	// Count what was reported so sampling stops once maxLoggedEvents is reached.
+	c.loggedEvents.Add(int64(len(differingResponse)))
 }
 
 func (c *Client) differingEvents(
@@ -95,22 +116,3 @@ func (c *Client) differingEvents(
 	c.stats.mismatchedEvents.Count(differedEventsCount)
 	return differedSampleEvents, sampleDiff
 }
-
-func (c *Client) write(data []string) error {
-	writer, err := misc.CreateGZ(c.loggedFileName)
-	if err != nil {
-		return fmt.Errorf("creating buffered writer: %w", err)
-	}
-	defer func() { _ = writer.Close() }()
-
-	for _, entry := range data {
-		if _, err := writer.Write([]byte(entry + "\n")); err != nil {
-			return fmt.Errorf("writing log entry: %w", err)
-		}
-	}
-	return nil
-}
-
-func generateLogFileName() string {
-	return fmt.Sprintf("destination_transformations_debug_%s.log.gz", uuid.NewString())
-}