Skip to content

Commit 3db39a1

Browse files
authored
feat: warehouse transformer (#5205)
1 parent 93f30e1 commit 3db39a1

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

44 files changed

+13331
-12
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ require (
4242
github.com/databricks/databricks-sql-go v1.6.1
4343
github.com/denisenkom/go-mssqldb v0.12.3
4444
github.com/dgraph-io/badger/v4 v4.5.0
45+
github.com/dlclark/regexp2 v1.11.4
4546
github.com/docker/docker v27.5.0+incompatible
4647
github.com/go-chi/chi/v5 v5.2.0
4748
github.com/go-redis/redis v6.15.9+incompatible
@@ -193,7 +194,6 @@ require (
193194
github.com/danieljoos/wincred v1.2.2 // indirect
194195
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
195196
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
196-
github.com/dlclark/regexp2 v1.11.4 // indirect
197197
github.com/dnephin/pflag v1.0.7 // indirect
198198
github.com/docker/cli v27.2.1+incompatible // indirect
199199
github.com/docker/cli-docs-tool v0.8.0 // indirect

processor/processor.go

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/google/uuid"
1616

1717
"github.com/rudderlabs/rudder-server/enterprise/trackedusers"
18+
whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
1819

1920
"golang.org/x/sync/errgroup"
2021

@@ -57,6 +58,7 @@ import (
5758
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
5859
"github.com/rudderlabs/rudder-server/utils/types"
5960
"github.com/rudderlabs/rudder-server/utils/workerpool"
61+
wtrans "github.com/rudderlabs/rudder-server/warehouse/transformer"
6062
)
6163

6264
const (
@@ -84,12 +86,18 @@ type trackedUsersReporter interface {
8486
GenerateReportsFromJobs(jobs []*jobsdb.JobT, sourceIdFilter map[string]bool) []*trackedusers.UsersReport
8587
}
8688

89+
type warehouseTransformer interface {
90+
transformer.DestinationTransformer
91+
CompareAndLog(events []transformer.TransformerEvent, pResponse, wResponse transformer.Response, metadata *transformer.Metadata, eventsByMessageID map[string]types.SingularEventWithReceivedAt)
92+
}
93+
8794
// Handle is a handle to the processor module
8895
type Handle struct {
89-
conf *config.Config
90-
tracer stats.Tracer
91-
backendConfig backendconfig.BackendConfig
92-
transformer transformer.Transformer
96+
conf *config.Config
97+
tracer stats.Tracer
98+
backendConfig backendconfig.BackendConfig
99+
transformer transformer.Transformer
100+
warehouseTransformer warehouseTransformer
93101

94102
gatewayDB jobsdb.JobsDB
95103
routerDB jobsdb.JobsDB
@@ -158,6 +166,7 @@ type Handle struct {
158166
eventAuditEnabled map[string]bool
159167
credentialsMap map[string][]transformer.Credential
160168
nonEventStreamSources map[string]bool
169+
enableWarehouseTransformations config.ValueLoader[bool]
161170
}
162171

163172
drainConfig struct {
@@ -617,6 +626,9 @@ func (proc *Handle) Setup(
617626
"partition": partition,
618627
})
619628
}
629+
630+
proc.warehouseTransformer = wtrans.New(proc.conf, proc.logger, proc.statsFactory)
631+
620632
if proc.config.enableDedup {
621633
var err error
622634
proc.dedup, err = dedup.New(proc.conf, proc.statsFactory)
@@ -817,6 +829,7 @@ func (proc *Handle) loadReloadableConfig(defaultPayloadLimit int64, defaultMaxEv
817829
proc.config.archivalEnabled = config.GetReloadableBoolVar(true, "archival.Enabled")
818830
// Capture event name as a tag in event level stats
819831
proc.config.captureEventNameStats = config.GetReloadableBoolVar(false, "Processor.Stats.captureEventName")
832+
proc.config.enableWarehouseTransformations = config.GetReloadableBoolVar(false, "Processor.enableWarehouseTransformations")
820833
}
821834

822835
type connection struct {
@@ -2902,6 +2915,7 @@ func (proc *Handle) transformSrcDest(
29022915
proc.logger.Debug("Dest Transform input size", len(eventsToTransform))
29032916
s := time.Now()
29042917
response = proc.transformer.Transform(ctx, eventsToTransform, proc.config.transformBatchSize.Load())
2918+
proc.handleWarehouseTransformations(ctx, eventsToTransform, response, commonMetaData, eventsByMessageID)
29052919

29062920
destTransformationStat := proc.newDestinationTransformationStat(sourceID, workspaceID, transformAt, destination)
29072921
destTransformationStat.transformTime.Since(s)
@@ -3060,6 +3074,27 @@ func (proc *Handle) transformSrcDest(
30603074
}
30613075
}
30623076

3077+
func (proc *Handle) handleWarehouseTransformations(
3078+
ctx context.Context,
3079+
eventsToTransform []transformer.TransformerEvent,
3080+
pResponse transformer.Response,
3081+
commonMetaData *transformer.Metadata,
3082+
eventsByMessageID map[string]types.SingularEventWithReceivedAt,
3083+
) {
3084+
if len(eventsToTransform) == 0 {
3085+
return
3086+
}
3087+
if _, ok := whutils.WarehouseDestinationMap[commonMetaData.DestinationType]; !ok {
3088+
return
3089+
}
3090+
if !proc.config.enableWarehouseTransformations.Load() {
3091+
return
3092+
}
3093+
3094+
wResponse := proc.warehouseTransformer.Transform(ctx, eventsToTransform, proc.config.transformBatchSize.Load())
3095+
proc.warehouseTransformer.CompareAndLog(eventsToTransform, pResponse, wResponse, commonMetaData, eventsByMessageID)
3096+
}
3097+
30633098
func (proc *Handle) saveDroppedJobs(ctx context.Context, droppedJobs []*jobsdb.JobT, tx *Tx) error {
30643099
if len(droppedJobs) > 0 {
30653100
for i := range droppedJobs { // each dropped job should have a unique jobID in the scope of the batch

processor/transformer/transformer.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -164,13 +164,25 @@ func WithClient(client HTTPDoer) Opt {
164164
}
165165
}
166166

167-
// Transformer provides methods to transform events
168-
type Transformer interface {
169-
Transform(ctx context.Context, clientEvents []TransformerEvent, batchSize int) Response
167+
type UserTransformer interface {
170168
UserTransform(ctx context.Context, clientEvents []TransformerEvent, batchSize int) Response
169+
}
170+
171+
type DestinationTransformer interface {
172+
Transform(ctx context.Context, clientEvents []TransformerEvent, batchSize int) Response
173+
}
174+
175+
type TrackingPlanValidator interface {
171176
Validate(ctx context.Context, clientEvents []TransformerEvent, batchSize int) Response
172177
}
173178

179+
// Transformer provides methods to transform events
180+
type Transformer interface {
181+
UserTransformer
182+
DestinationTransformer
183+
TrackingPlanValidator
184+
}
185+
174186
type HTTPDoer interface {
175187
Do(req *http.Request) (*http.Response, error)
176188
}
@@ -591,7 +603,7 @@ func (trans *handle) destTransformURL(destType string) string {
591603
destinationEndPoint := fmt.Sprintf("%s/v0/destinations/%s", trans.config.destTransformationURL, strings.ToLower(destType))
592604

593605
if _, ok := warehouseutils.WarehouseDestinationMap[destType]; ok {
594-
whSchemaVersionQueryParam := fmt.Sprintf("whSchemaVersion=%s&whIDResolve=%v", trans.conf.GetString("Warehouse.schemaVersion", "v1"), warehouseutils.IDResolutionEnabled())
606+
whSchemaVersionQueryParam := fmt.Sprintf("whIDResolve=%t", trans.conf.GetBool("Warehouse.enableIDResolution", false))
595607
switch destType {
596608
case warehouseutils.RS:
597609
return destinationEndPoint + "?" + whSchemaVersionQueryParam
@@ -603,7 +615,7 @@ func (trans *handle) destTransformURL(destType string) string {
603615
}
604616
}
605617
if destType == warehouseutils.SnowpipeStreaming {
606-
return fmt.Sprintf("%s?whSchemaVersion=%s&whIDResolve=%t", destinationEndPoint, trans.conf.GetString("Warehouse.schemaVersion", "v1"), warehouseutils.IDResolutionEnabled())
618+
return fmt.Sprintf("%s?whIDResolve=%t", destinationEndPoint, trans.conf.GetBool("Warehouse.enableIDResolution", false))
607619
}
608620
return destinationEndPoint
609621
}

runner/buckets.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,5 +145,18 @@ var (
145145
// 1ms, 5ms, 10ms, 25ms, 50ms, 100ms, 250ms, 500ms, 1s
146146
0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1,
147147
},
148+
149+
"warehouse_dest_transform_mismatched_events": {
150+
0.1, 0.25, 0.5, 1, 2.5, 5, 7.5, 10, 12.5, 15, 30, 60, 120, 300, 600, 900, 1800, 3600,
151+
},
152+
"warehouse_dest_transform_input_events": {
153+
1, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000, 12500, 15000, 30000, 50000, 75000, 100000,
154+
},
155+
"warehouse_dest_transform_output_events": {
156+
1, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000, 12500, 15000, 30000, 50000, 75000, 100000,
157+
},
158+
"warehouse_dest_transform_output_failed_events": {
159+
1, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000, 12500, 15000, 30000, 50000, 75000, 100000,
160+
},
148161
}
149162
)

warehouse/internal/model/schema.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ const (
1717
JSONDataType SchemaType = "json"
1818
TextDataType SchemaType = "text"
1919
DateTimeDataType SchemaType = "datetime"
20-
ArrayOfBooleanDatatype SchemaType = "array(boolean)"
20+
ArrayOfBooleanDataType SchemaType = "array(boolean)"
2121
)
2222

2323
type WHSchema struct {

warehouse/slave/worker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ func (w *worker) processStagingFile(ctx context.Context, job payload) ([]uploadR
319319
}
320320

321321
columnVal = newColumnVal
322-
case model.ArrayOfBooleanDatatype:
322+
case model.ArrayOfBooleanDataType:
323323
if boolValue, ok := columnVal.([]interface{}); ok {
324324
newColumnVal := make([]interface{}, len(boolValue))
325325

warehouse/transformer/datatype.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package transformer
2+
3+
import (
4+
"reflect"
5+
6+
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
7+
"github.com/rudderlabs/rudder-server/warehouse/transformer/internal/utils"
8+
whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
9+
)
10+
11+
func dataTypeFor(destType, key string, val any, isJSONKey bool) string {
12+
if typeName := primitiveType(val); typeName != "" {
13+
return typeName
14+
}
15+
if strVal, ok := val.(string); ok && utils.ValidTimestamp(strVal) {
16+
return model.DateTimeDataType
17+
}
18+
if override := dataTypeOverride(destType, key, val, isJSONKey); override != "" {
19+
return override
20+
}
21+
return model.StringDataType
22+
}
23+
24+
func primitiveType(val any) string {
25+
switch v := val.(type) {
26+
case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64:
27+
return model.IntDataType
28+
case float64:
29+
return getFloatType(v)
30+
case float32:
31+
return getFloatType(float64(v))
32+
case bool:
33+
return model.BooleanDataType
34+
default:
35+
return ""
36+
}
37+
}
38+
39+
func getFloatType(v float64) string {
40+
// JSON unmarshalling treats all numbers as float64 by default, even if they are whole numbers
41+
// So, we need to check if the float is actually an integer
42+
if v == float64(int64(v)) {
43+
return model.IntDataType
44+
}
45+
return model.FloatDataType
46+
}
47+
48+
func dataTypeOverride(destType, key string, val any, isJSONKey bool) string {
49+
switch destType {
50+
case whutils.POSTGRES, whutils.SNOWFLAKE, whutils.SnowpipeStreaming:
51+
return overrideForPostgresSnowflake(key, isJSONKey)
52+
case whutils.RS:
53+
return overrideForRedshift(val, isJSONKey)
54+
default:
55+
return ""
56+
}
57+
}
58+
59+
func overrideForPostgresSnowflake(key string, isJSONKey bool) string {
60+
if isJSONKey || key == violationErrors {
61+
return model.JSONDataType
62+
}
63+
return model.StringDataType
64+
}
65+
66+
func overrideForRedshift(val any, isJSONKey bool) string {
67+
if isJSONKey {
68+
return model.JSONDataType
69+
}
70+
if val == nil {
71+
return model.StringDataType
72+
}
73+
switch reflect.TypeOf(val).Kind() {
74+
case reflect.Slice, reflect.Array:
75+
if jsonVal, _ := json.Marshal(val); len(jsonVal) > redshiftStringLimit {
76+
return model.TextDataType
77+
}
78+
return model.StringDataType
79+
case reflect.String:
80+
if len(val.(string)) > redshiftStringLimit {
81+
return model.TextDataType
82+
}
83+
return model.StringDataType
84+
default:
85+
return model.StringDataType
86+
}
87+
}
88+
89+
func convertValIfDateTime(val any, colType string) any {
90+
if colType == model.DateTimeDataType {
91+
return utils.ToTimestamp(val)
92+
}
93+
return val
94+
}

0 commit comments

Comments
 (0)