Skip to content

Commit 68e6309

Browse files
authored
fix: warehouse transformations concurrent writes (#5724)
1 parent 8c35d0a commit 68e6309

File tree

6 files changed

+87
-49
lines changed

6 files changed

+87
-49
lines changed

warehouse/transformer/benchmark/main.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"time"
1414

1515
"github.com/google/uuid"
16+
"github.com/samber/lo"
1617
"golang.org/x/sync/errgroup"
1718

1819
"github.com/rudderlabs/rudder-go-kit/config"
@@ -58,16 +59,16 @@ func run(ctx context.Context) error {
5859
logFactory := logger.Default
5960
l := logFactory.NewLogger().Child("warehouse-transformer-benchmark").Child(mode)
6061

61-
clientEvents := make([]types.TransformerEvent, eventsInBatch)
62-
for i := 0; i < len(clientEvents); i++ {
63-
var transformerEvent types.TransformerEvent
64-
err := jsonrs.Unmarshal([]byte(sampleEvent), &transformerEvent)
65-
if err != nil {
66-
return fmt.Errorf("could not unmarshal sample client event: %w", err)
67-
}
68-
clientEvents[i] = transformerEvent
62+
var te types.TransformerEvent
63+
err := jsonrs.Unmarshal([]byte(sampleEvent), &te)
64+
if err != nil {
65+
return fmt.Errorf("could not unmarshal sample client event: %w", err)
6966
}
7067

68+
clientEvents := lo.RepeatBy(eventsInBatch, func(int) types.TransformerEvent {
69+
return te
70+
})
71+
7172
t, err := selectTransformer(mode, conf, l)
7273
if err != nil {
7374
return fmt.Errorf("could not select transformer: %w", err)

warehouse/transformer/events.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func (t *Transformer) trackCommonProps(tec *transformEventContext) (map[string]a
3636
commonData := make(map[string]any)
3737
commonMetadata := make(map[string]string)
3838

39-
if err := setDataAndMetadataFromInput(tec, tec.event.Message["context"], commonData, commonMetadata, &prefixInfo{
39+
if err := setDataAndMetadataFromInput(tec, t.eventContext(tec), commonData, commonMetadata, &prefixInfo{
4040
completePrefix: "track_context_",
4141
completeLevel: 2,
4242
prefix: "context_",
@@ -83,7 +83,7 @@ func (t *Transformer) tracksResponse(tec *transformEventContext, commonData map[
8383
if err := setDataAndMetadataFromRules(tec, data, metadata, rules.TrackTableRules); err != nil {
8484
return nil, fmt.Errorf("tracks response: setting data and column types from rules: %w", err)
8585
}
86-
if err := storeRudderEvent(tec, data, metadata); err != nil {
86+
if err := t.storeRudderEvent(tec, data, metadata); err != nil {
8787
return nil, fmt.Errorf("tracks response: storing rudder event: %w", err)
8888
}
8989

@@ -167,7 +167,7 @@ func (t *Transformer) extractEvents(tec *transformEventContext) ([]map[string]an
167167
data := make(map[string]any)
168168
metadata := make(map[string]string)
169169

170-
if err := setDataAndMetadataFromInput(tec, tec.event.Message["context"], data, metadata, &prefixInfo{
170+
if err := setDataAndMetadataFromInput(tec, t.eventContext(tec), data, metadata, &prefixInfo{
171171
completePrefix: "extract_context_",
172172
completeLevel: 2,
173173
prefix: "context_",
@@ -279,7 +279,7 @@ func (t *Transformer) identifyCommonProps(tec *transformEventContext) (map[strin
279279
}); err != nil {
280280
return nil, nil, fmt.Errorf("identify common props: setting data and column types from message: %w", err)
281281
}
282-
if err := setDataAndMetadataFromInput(tec, tec.event.Message["context"], commonData, commonMetadata, &prefixInfo{
282+
if err := setDataAndMetadataFromInput(tec, t.eventContext(tec), commonData, commonMetadata, &prefixInfo{
283283
completePrefix: "identify_context_",
284284
completeLevel: 2,
285285
prefix: "context_",
@@ -310,7 +310,7 @@ func (t *Transformer) identifiesResponse(tec *transformEventContext, commonData
310310
if err := setDataAndMetadataFromRules(tec, data, metadata, rules.DefaultRules); err != nil {
311311
return nil, fmt.Errorf("identifies response: setting data and column types from rules: %w", err)
312312
}
313-
if err := storeRudderEvent(tec, data, metadata); err != nil {
313+
if err := t.storeRudderEvent(tec, data, metadata); err != nil {
314314
return nil, fmt.Errorf("identifies response: storing rudder event: %w", err)
315315
}
316316

@@ -412,7 +412,7 @@ func (t *Transformer) pageEvents(tec *transformEventContext) ([]map[string]any,
412412
}); err != nil {
413413
return nil, fmt.Errorf("page: setting data and column types from input: %w", err)
414414
}
415-
if err := setDataAndMetadataFromInput(tec, tec.event.Message["context"], data, metadata, &prefixInfo{
415+
if err := setDataAndMetadataFromInput(tec, t.eventContext(tec), data, metadata, &prefixInfo{
416416
completePrefix: "page_context_",
417417
completeLevel: 2,
418418
prefix: "context_",
@@ -426,7 +426,7 @@ func (t *Transformer) pageEvents(tec *transformEventContext) ([]map[string]any,
426426
return nil, fmt.Errorf("page: setting data and column types from rules: %w", err)
427427
}
428428

429-
if err := storeRudderEvent(tec, data, metadata); err != nil {
429+
if err := t.storeRudderEvent(tec, data, metadata); err != nil {
430430
return nil, fmt.Errorf("page: storing rudder event: %w", err)
431431
}
432432

@@ -466,7 +466,7 @@ func (t *Transformer) screenEvents(tec *transformEventContext) ([]map[string]any
466466
}); err != nil {
467467
return nil, fmt.Errorf("screen: setting data and column types from input: %w", err)
468468
}
469-
if err := setDataAndMetadataFromInput(tec, tec.event.Message["context"], data, metadata, &prefixInfo{
469+
if err := setDataAndMetadataFromInput(tec, t.eventContext(tec), data, metadata, &prefixInfo{
470470
completePrefix: "screen_context_",
471471
completeLevel: 2,
472472
prefix: "context_",
@@ -480,7 +480,7 @@ func (t *Transformer) screenEvents(tec *transformEventContext) ([]map[string]any
480480
return nil, fmt.Errorf("screen: setting data and column types from rules: %w", err)
481481
}
482482

483-
if err := storeRudderEvent(tec, data, metadata); err != nil {
483+
if err := t.storeRudderEvent(tec, data, metadata); err != nil {
484484
return nil, fmt.Errorf("screen: storing rudder event: %w", err)
485485
}
486486

@@ -520,7 +520,7 @@ func (t *Transformer) groupEvents(tec *transformEventContext) ([]map[string]any,
520520
}); err != nil {
521521
return nil, fmt.Errorf("group: setting data and column types from input: %w", err)
522522
}
523-
if err := setDataAndMetadataFromInput(tec, tec.event.Message["context"], data, metadata, &prefixInfo{
523+
if err := setDataAndMetadataFromInput(tec, t.eventContext(tec), data, metadata, &prefixInfo{
524524
completePrefix: "group_context_",
525525
completeLevel: 2,
526526
prefix: "context_",
@@ -534,7 +534,7 @@ func (t *Transformer) groupEvents(tec *transformEventContext) ([]map[string]any,
534534
return nil, fmt.Errorf("group: setting data and column types from rules: %w", err)
535535
}
536536

537-
if err := storeRudderEvent(tec, data, metadata); err != nil {
537+
if err := t.storeRudderEvent(tec, data, metadata); err != nil {
538538
return nil, fmt.Errorf("group: storing rudder event: %w", err)
539539
}
540540

@@ -574,7 +574,7 @@ func (t *Transformer) aliasEvents(tec *transformEventContext) ([]map[string]any,
574574
}); err != nil {
575575
return nil, fmt.Errorf("alias: setting data and column types from input: %w", err)
576576
}
577-
if err := setDataAndMetadataFromInput(tec, tec.event.Message["context"], data, metadata, &prefixInfo{
577+
if err := setDataAndMetadataFromInput(tec, t.eventContext(tec), data, metadata, &prefixInfo{
578578
completePrefix: "alias_context_",
579579
completeLevel: 2,
580580
prefix: "context_",
@@ -588,7 +588,7 @@ func (t *Transformer) aliasEvents(tec *transformEventContext) ([]map[string]any,
588588
return nil, fmt.Errorf("alias: setting data and column types from rules: %w", err)
589589
}
590590

591-
if err := storeRudderEvent(tec, data, metadata); err != nil {
591+
if err := t.storeRudderEvent(tec, data, metadata); err != nil {
592592
return nil, fmt.Errorf("alias: storing rudder event: %w", err)
593593
}
594594

warehouse/transformer/internal/rules/rules.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@ type Rules func(event *types.TransformerEvent) (any, error)
1616

1717
var (
1818
DefaultRules = map[string]Rules{
19-
"id": staticRule("messageId"),
19+
"id": messageIDFromMetadata(), // Use metadata, since we are not modifying Message
2020
"anonymous_id": staticRule("anonymousId"),
2121
"user_id": staticRule("userId"),
2222
"sent_at": staticRule("sentAt"),
2323
"timestamp": staticRule("timestamp"),
24-
"received_at": staticRule("receivedAt"),
24+
"received_at": receivedAtFromMetadata(), // Use metadata, since we are not modifying Message
2525
"original_timestamp": staticRule("originalTimestamp"),
2626
"channel": staticRule("channel"),
2727
"context_ip": func(event *types.TransformerEvent) (any, error) {
@@ -101,7 +101,7 @@ var (
101101
"id": func(event *types.TransformerEvent) (any, error) {
102102
return extractRecordID(&event.Metadata)
103103
},
104-
"received_at": staticRule("receivedAt"),
104+
"received_at": receivedAtFromMetadata(), // Use metadata, since we are not modifying Message
105105
"event": staticRule("event"),
106106
}
107107
)
@@ -112,6 +112,18 @@ func staticRule(key string) Rules {
112112
}
113113
}
114114

115+
func messageIDFromMetadata() Rules {
116+
return func(event *types.TransformerEvent) (any, error) {
117+
return event.Metadata.MessageID, nil
118+
}
119+
}
120+
121+
func receivedAtFromMetadata() Rules {
122+
return func(event *types.TransformerEvent) (any, error) {
123+
return event.Metadata.ReceivedAt, nil
124+
}
125+
}
126+
115127
var rudderReservedColumns = map[string]map[string]struct{}{
116128
"track": createReservedColumns(DefaultRules, TrackRules, TrackTableRules, TrackEventTableRules),
117129
"identify": createReservedColumns(DefaultRules, IdentifyRules),

warehouse/transformer/set.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"strings"
66

77
"github.com/samber/lo"
8+
"github.com/tidwall/sjson"
89

910
"github.com/rudderlabs/rudder-server/jsonrs"
1011
"github.com/rudderlabs/rudder-server/warehouse/transformer/internal/rules"
@@ -187,7 +188,7 @@ func setDataAndMetadataFromRules(
187188
return nil
188189
}
189190

190-
func storeRudderEvent(
191+
func (t *Transformer) storeRudderEvent(
191192
tec *transformEventContext,
192193
data map[string]any, metadata map[string]string,
193194
) error {
@@ -204,6 +205,24 @@ func storeRudderEvent(
204205
if err != nil {
205206
return fmt.Errorf("marshalling event: %w", err)
206207
}
208+
if t.config.populateSrcDestInfoInContext.Load() {
209+
eventJSON, err = sjson.SetBytes(eventJSON, "context.sourceId", tec.event.Metadata.SourceID)
210+
if err != nil {
211+
return fmt.Errorf("setting source id: %w", err)
212+
}
213+
eventJSON, err = sjson.SetBytes(eventJSON, "context.sourceType", tec.event.Metadata.SourceType)
214+
if err != nil {
215+
return fmt.Errorf("setting source type: %w", err)
216+
}
217+
eventJSON, err = sjson.SetBytes(eventJSON, "context.destinationId", tec.event.Metadata.DestinationID)
218+
if err != nil {
219+
return fmt.Errorf("setting destination id: %w", err)
220+
}
221+
eventJSON, err = sjson.SetBytes(eventJSON, "context.destinationType", tec.event.Metadata.DestinationType)
222+
if err != nil {
223+
return fmt.Errorf("setting destination type: %w", err)
224+
}
225+
}
207226

208227
data[columnName] = string(eventJSON)
209228
metadata[columnName] = utils.GetFullEventColumnTypeByDestType(tec.event.Metadata.DestinationType)

warehouse/transformer/transformer.go

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"maps"
78
"net/http"
89
"sort"
910
"strings"
@@ -133,35 +134,43 @@ func (t *Transformer) Transform(_ context.Context, clientEvents []types.Transfor
133134
}
134135

135136
func (t *Transformer) processWarehouseMessage(cache *cache, event *types.TransformerEvent) ([]map[string]any, error) {
136-
if err := t.enhanceContextWithSourceDestInfo(event); err != nil {
137-
return nil, fmt.Errorf("enhancing context with source and destination info: %w", err)
137+
if err := t.checkValidContext(event); err != nil {
138+
return nil, fmt.Errorf("checking valid context: %w", err)
138139
}
139140
t.addMandatoryFields(event)
140141
return t.handleEvent(event, cache)
141142
}
142143

143-
func (t *Transformer) enhanceContextWithSourceDestInfo(event *types.TransformerEvent) error {
144+
func (t *Transformer) checkValidContext(event *types.TransformerEvent) error {
144145
if !t.config.populateSrcDestInfoInContext.Load() {
145146
return nil
146147
}
147-
148-
messageContext, ok := event.Message["context"]
149-
if !ok || messageContext == nil {
150-
messageContext = map[string]any{}
148+
contextVal, exists := event.Message["context"]
149+
if !exists || contextVal == nil {
150+
return nil
151151
}
152-
messageContextMap, ok := messageContext.(map[string]any)
153-
if !ok {
152+
if !utils.IsObject(contextVal) {
154153
return response.ErrContextNotMap
155154
}
156-
messageContextMap["sourceId"] = event.Metadata.SourceID
157-
messageContextMap["sourceType"] = event.Metadata.SourceType
158-
messageContextMap["destinationId"] = event.Metadata.DestinationID
159-
messageContextMap["destinationType"] = event.Metadata.DestinationType
160-
161-
event.Message["context"] = messageContextMap
162155
return nil
163156
}
164157

158+
func (t *Transformer) eventContext(tec *transformEventContext) any {
159+
contextVal, exists := tec.event.Message["context"]
160+
if !t.config.populateSrcDestInfoInContext.Load() {
161+
return contextVal
162+
}
163+
if !exists || contextVal == nil {
164+
contextVal = map[string]interface{}{}
165+
}
166+
clonedContext := maps.Clone(contextVal.(map[string]any))
167+
clonedContext["sourceId"] = tec.event.Metadata.SourceID
168+
clonedContext["sourceType"] = tec.event.Metadata.SourceType
169+
clonedContext["destinationId"] = tec.event.Metadata.DestinationID
170+
clonedContext["destinationType"] = tec.event.Metadata.DestinationType
171+
return clonedContext
172+
}
173+
165174
func (t *Transformer) addMandatoryFields(event *types.TransformerEvent) {
166175
t.ensureMessageID(event)
167176
t.ensureReceivedAt(event)
@@ -171,7 +180,6 @@ func (t *Transformer) ensureMessageID(event *types.TransformerEvent) {
171180
messageID, exists := event.Message["messageId"]
172181
if !exists || utils.IsBlank(messageID) {
173182
event.Metadata.MessageID = "auto-" + t.uuidGenerator()
174-
event.Message["messageId"] = event.Metadata.MessageID
175183
}
176184
}
177185

@@ -192,7 +200,6 @@ func (t *Transformer) setDefaultReceivedAt(event *types.TransformerEvent) {
192200
if !utils.ValidTimestamp(event.Metadata.ReceivedAt) {
193201
event.Metadata.ReceivedAt = t.now().Format(misc.RFC3339Milli)
194202
}
195-
event.Message["receivedAt"] = event.Metadata.ReceivedAt
196203
}
197204

198205
func (t *Transformer) handleEvent(event *types.TransformerEvent, cache *cache) ([]map[string]any, error) {

warehouse/transformer/transformer_benchmark_test.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,18 @@ import (
1010
"github.com/rudderlabs/rudder-go-kit/config"
1111
"github.com/rudderlabs/rudder-go-kit/logger"
1212
"github.com/rudderlabs/rudder-go-kit/stats"
13+
1314
"github.com/rudderlabs/rudder-server/jsonrs"
1415
"github.com/rudderlabs/rudder-server/processor/types"
1516
)
1617

1718
/*
18-
Benchmark_Transformer/Identify_(POSTGRES)_-_1000_events
19-
Benchmark_Transformer/Identify_(POSTGRES)_-_1000_events-12 9 112701398 ns/op
20-
21-
Benchmark_Transformer/Identify_(POSTGRES)_-_1000_events
22-
Benchmark_Transformer/Identify_(POSTGRES)_-_1000_events-12 28 37071064 ns/op
19+
Benchmark_Transformer
20+
Benchmark_Transformer/Identify_(POSTGRES)_-_10000_events
21+
Benchmark_Transformer/Identify_(POSTGRES)_-_10000_events-12 12 85358024 ns/op
2322
*/
2423
func Benchmark_Transformer(b *testing.B) {
25-
b.Run("Identify (POSTGRES) - 1000 events", func(t *testing.B) {
24+
b.Run("Identify (POSTGRES) - 10000 events", func(t *testing.B) {
2625
b.StopTimer()
2726
eventPayload := `{"type":"identify","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","request_ip":"5.6.7.8","traits":{"review_id":"86ac1cd43","product_id":"9578257311"},"userProperties":{"rating":3.0,"review_body":"OK for the price. It works but the material feels flimsy."},"context":{"traits":{"name":"Richard Hendricks","email":"rhedricks@example.com","logins":2},"ip":"1.2.3.4"}}`
2827
metadata := getMetadata("identify", "POSTGRES")
@@ -34,7 +33,7 @@ func Benchmark_Transformer(b *testing.B) {
3433
err := jsonrs.Unmarshal([]byte(eventPayload), &singularEvent)
3534
require.NoError(t, err)
3635

37-
batchSize := 1000
36+
batchSize := 10000
3837
events := lo.Times(batchSize, func(int) types.TransformerEvent {
3938
return types.TransformerEvent{
4039
Message: singularEvent,

0 commit comments

Comments
 (0)