Skip to content

Commit 2bbe140

Browse files
authored
fix: warehouse transformations for data_warehouse json paths (#5653)
1 parent ff799d4 commit 2bbe140

File tree

7 files changed

+175 additions, -10 deletions

warehouse/transformer/events.go

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -397,7 +397,9 @@ func (t *Transformer) usersResponse(tec *transformEventContext, commonData map[s
397397
}
398398

399399
func shouldSkipUsersTable(tec *transformEventContext) bool {
400-
return tec.event.Metadata.DestinationType == whutils.SnowpipeStreaming || tec.destOpts.skipUsersTable || tec.intrOpts.skipUsersTable
400+
return tec.event.Metadata.DestinationType == whutils.SnowpipeStreaming ||
401+
tec.destOpts.skipUsersTable ||
402+
tec.intrOpts.skipUsersTable
401403
}
402404

403405
func (t *Transformer) pageEvents(tec *transformEventContext) ([]map[string]any, error) {

warehouse/transformer/events_test.go

Lines changed: 162 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -626,7 +626,7 @@ func TestEvents(t *testing.T) {
626626
"userId": "",
627627
}
628628
}
629-
trackMergedefaultOutput := func() testhelper.OutputBuilder {
629+
trackMergeDefaultOutput := func() testhelper.OutputBuilder {
630630
return testhelper.OutputBuilder{
631631
"data": map[string]any{
632632
"merge_property_1_type": "anonymous_id",
@@ -1843,6 +1843,166 @@ func TestEvents(t *testing.T) {
18431843
},
18441844
},
18451845
},
1846+
{
1847+
name: "track (POSTGRES) jsonPaths (legacy destOpts for properties)",
1848+
eventPayload: `{"type":"track","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","event":"event","request_ip":"5.6.7.8","properties":{"location": {"city":"Palo Alto","state":"California","country":"USA","coordinates":{"latitude":37.4419,"longitude":-122.143,"geo":{"altitude":30.5,"accuracy":5,"details":{"altitudeUnits":"meters","accuracyUnits":"meters"}}}},"review_id":"86ac1cd43","product_id":"9578257311"},"userProperties":{"rating":3.0,"review_body":"OK for the price. It works but the material feels flimsy."},"context":{"traits":{"name":"Richard Hendricks","email":"rhedricks@example.com","logins":2},"ip":"1.2.3.4"}}`,
1849+
metadata: getTrackMetadata("POSTGRES", "webhook"),
1850+
destination: getDestination("POSTGRES", map[string]any{
1851+
"jsonPaths": "location",
1852+
}),
1853+
expectedResponse: types.Response{
1854+
Events: []types.TransformerResponse{
1855+
{
1856+
Output: trackDefaultOutput(),
1857+
Metadata: getTrackMetadata("POSTGRES", "webhook"),
1858+
StatusCode: http.StatusOK,
1859+
},
1860+
{
1861+
Output: trackEventDefaultOutput().
1862+
SetDataField("location", "{\"city\":\"Palo Alto\",\"coordinates\":{\"geo\":{\"accuracy\":5,\"altitude\":30.5,\"details\":{\"accuracyUnits\":\"meters\",\"altitudeUnits\":\"meters\"}},\"latitude\":37.4419,\"longitude\":-122.143},\"country\":\"USA\",\"state\":\"California\"}").
1863+
SetColumnField("location", "json"),
1864+
Metadata: getTrackMetadata("POSTGRES", "webhook"),
1865+
StatusCode: http.StatusOK,
1866+
},
1867+
},
1868+
},
1869+
},
1870+
{
1871+
name: "track (POSTGRES) jsonPaths (legacy destOpts for user properties)",
1872+
eventPayload: `{"type":"track","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","event":"event","request_ip":"5.6.7.8","properties":{"review_id":"86ac1cd43","product_id":"9578257311"},"userProperties":{"location": {"city":"Palo Alto","state":"California","country":"USA","coordinates":{"latitude":37.4419,"longitude":-122.143,"geo":{"altitude":30.5,"accuracy":5,"details":{"altitudeUnits":"meters","accuracyUnits":"meters"}}}}, "rating":3.0,"review_body":"OK for the price. It works but the material feels flimsy."},"context":{"traits":{"name":"Richard Hendricks","email":"rhedricks@example.com","logins":2},"ip":"1.2.3.4"}}`,
1873+
metadata: getTrackMetadata("POSTGRES", "webhook"),
1874+
destination: getDestination("POSTGRES", map[string]any{
1875+
"jsonPaths": "location",
1876+
}),
1877+
expectedResponse: types.Response{
1878+
Events: []types.TransformerResponse{
1879+
{
1880+
Output: trackDefaultOutput(),
1881+
Metadata: getTrackMetadata("POSTGRES", "webhook"),
1882+
StatusCode: http.StatusOK,
1883+
},
1884+
{
1885+
Output: trackEventDefaultOutput().
1886+
SetDataField("location", "{\"city\":\"Palo Alto\",\"coordinates\":{\"geo\":{\"accuracy\":5,\"altitude\":30.5,\"details\":{\"accuracyUnits\":\"meters\",\"altitudeUnits\":\"meters\"}},\"latitude\":37.4419,\"longitude\":-122.143},\"country\":\"USA\",\"state\":\"California\"}").
1887+
SetColumnField("location", "json"),
1888+
Metadata: getTrackMetadata("POSTGRES", "webhook"),
1889+
StatusCode: http.StatusOK,
1890+
},
1891+
},
1892+
},
1893+
},
1894+
{
1895+
name: "track (POSTGRES) jsonPaths (destOpts)",
1896+
eventPayload: `{"type":"track","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","event":"event","request_ip":"5.6.7.8","properties":{"review_id":"86ac1cd43","product_id":"9578257311", "location": {"city":"Palo Alto","state":"California","country":"USA","coordinates":{"latitude":37.4419,"longitude":-122.143,"geo":{"altitude":30.5,"accuracy":5,"details":{"altitudeUnits":"meters","accuracyUnits":"meters"}}}}},"userProperties":{"rating":3.0,"review_body":"OK for the price. It works but the material feels flimsy."},"context":{"traits":{"name":"Richard Hendricks","email":"rhedricks@example.com","logins":2},"ip":"1.2.3.4"}}`,
1897+
metadata: getTrackMetadata("POSTGRES", "webhook"),
1898+
destination: getDestination("POSTGRES", map[string]any{
1899+
"jsonPaths": "track.properties.location",
1900+
}),
1901+
expectedResponse: types.Response{
1902+
Events: []types.TransformerResponse{
1903+
{
1904+
Output: trackDefaultOutput(),
1905+
Metadata: getTrackMetadata("POSTGRES", "webhook"),
1906+
StatusCode: http.StatusOK,
1907+
},
1908+
{
1909+
Output: trackEventDefaultOutput().
1910+
SetDataField("location", "{\"city\":\"Palo Alto\",\"coordinates\":{\"geo\":{\"accuracy\":5,\"altitude\":30.5,\"details\":{\"accuracyUnits\":\"meters\",\"altitudeUnits\":\"meters\"}},\"latitude\":37.4419,\"longitude\":-122.143},\"country\":\"USA\",\"state\":\"California\"}").
1911+
SetColumnField("location", "json"),
1912+
Metadata: getTrackMetadata("POSTGRES", "webhook"),
1913+
StatusCode: http.StatusOK,
1914+
},
1915+
},
1916+
},
1917+
},
1918+
{
1919+
name: "track (POSTGRES) jsonPaths (intrOpts)",
1920+
eventPayload: `{"type":"track","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","event":"event","request_ip":"5.6.7.8","properties":{"review_id":"86ac1cd43","product_id":"9578257311", "location": {"city":"Palo Alto","state":"California","country":"USA","coordinates":{"latitude":37.4419,"longitude":-122.143,"geo":{"altitude":30.5,"accuracy":5,"details":{"altitudeUnits":"meters","accuracyUnits":"meters"}}}}},"userProperties":{"rating":3.0,"review_body":"OK for the price. It works but the material feels flimsy."},"context":{"traits":{"name":"Richard Hendricks","email":"rhedricks@example.com","logins":2},"ip":"1.2.3.4"},"integrations":{"POSTGRES":{"options":{"jsonPaths":["track.properties.location"]}}}}`,
1921+
metadata: getTrackMetadata("POSTGRES", "webhook"),
1922+
destination: getDestination("POSTGRES", map[string]any{}),
1923+
expectedResponse: types.Response{
1924+
Events: []types.TransformerResponse{
1925+
{
1926+
Output: trackDefaultOutput(),
1927+
Metadata: getTrackMetadata("POSTGRES", "webhook"),
1928+
StatusCode: http.StatusOK,
1929+
},
1930+
{
1931+
Output: trackEventDefaultOutput().
1932+
SetDataField("location", "{\"city\":\"Palo Alto\",\"coordinates\":{\"geo\":{\"accuracy\":5,\"altitude\":30.5,\"details\":{\"accuracyUnits\":\"meters\",\"altitudeUnits\":\"meters\"}},\"latitude\":37.4419,\"longitude\":-122.143},\"country\":\"USA\",\"state\":\"California\"}").
1933+
SetColumnField("location", "json"),
1934+
Metadata: getTrackMetadata("POSTGRES", "webhook"),
1935+
StatusCode: http.StatusOK,
1936+
},
1937+
},
1938+
},
1939+
},
1940+
{
1941+
name: "track (POSTGRES) jsonPaths (DATA_WAREHOUSE)",
1942+
eventPayload: `{"type":"track","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","event":"event","request_ip":"5.6.7.8","properties":{"review_id":"86ac1cd43","product_id":"9578257311", "location": {"city":"Palo Alto","state":"California","country":"USA","coordinates":{"latitude":37.4419,"longitude":-122.143,"geo":{"altitude":30.5,"accuracy":5,"details":{"altitudeUnits":"meters","accuracyUnits":"meters"}}}}},"userProperties":{"rating":3.0,"review_body":"OK for the price. It works but the material feels flimsy."},"context":{"traits":{"name":"Richard Hendricks","email":"rhedricks@example.com","logins":2},"ip":"1.2.3.4"},"integrations":{"DATA_WAREHOUSE":{"options":{"jsonPaths":["track.properties.location"]}}}}`,
1943+
metadata: getTrackMetadata("POSTGRES", "webhook"),
1944+
destination: getDestination("POSTGRES", map[string]any{}),
1945+
expectedResponse: types.Response{
1946+
Events: []types.TransformerResponse{
1947+
{
1948+
Output: trackDefaultOutput(),
1949+
Metadata: getTrackMetadata("POSTGRES", "webhook"),
1950+
StatusCode: http.StatusOK,
1951+
},
1952+
{
1953+
Output: trackEventDefaultOutput().
1954+
SetDataField("location", "{\"city\":\"Palo Alto\",\"coordinates\":{\"geo\":{\"accuracy\":5,\"altitude\":30.5,\"details\":{\"accuracyUnits\":\"meters\",\"altitudeUnits\":\"meters\"}},\"latitude\":37.4419,\"longitude\":-122.143},\"country\":\"USA\",\"state\":\"California\"}").
1955+
SetColumnField("location", "json"),
1956+
Metadata: getTrackMetadata("POSTGRES", "webhook"),
1957+
StatusCode: http.StatusOK,
1958+
},
1959+
},
1960+
},
1961+
},
1962+
{
1963+
name: "track (POSTGRES) jsonPaths (intrOpts with higher path)",
1964+
eventPayload: `{"type":"track","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","event":"event","request_ip":"5.6.7.8","properties":{"review_id":"86ac1cd43","product_id":"9578257311", "location": {"city":"Palo Alto","state":"California","country":"USA","coordinates":{"latitude":37.4419,"longitude":-122.143,"geo":{"altitude":30.5,"accuracy":5,"details":{"altitudeUnits":"meters","accuracyUnits":"meters"}}}}},"userProperties":{"rating":3.0,"review_body":"OK for the price. It works but the material feels flimsy."},"context":{"traits":{"name":"Richard Hendricks","email":"rhedricks@example.com","logins":2},"ip":"1.2.3.4"},"integrations":{"DATA_WAREHOUSE":{"options":{"jsonPaths":["track.properties.location"]}},"POSTGRES":{"options":{"jsonPaths":["track.properties.location.coordinates"]}}}}`,
1965+
metadata: getTrackMetadata("POSTGRES", "webhook"),
1966+
destination: getDestination("POSTGRES", map[string]any{}),
1967+
expectedResponse: types.Response{
1968+
Events: []types.TransformerResponse{
1969+
{
1970+
Output: trackDefaultOutput(),
1971+
Metadata: getTrackMetadata("POSTGRES", "webhook"),
1972+
StatusCode: http.StatusOK,
1973+
},
1974+
{
1975+
Output: trackEventDefaultOutput().
1976+
SetDataField("location", "{\"city\":\"Palo Alto\",\"coordinates\":{\"geo\":{\"accuracy\":5,\"altitude\":30.5,\"details\":{\"accuracyUnits\":\"meters\",\"altitudeUnits\":\"meters\"}},\"latitude\":37.4419,\"longitude\":-122.143},\"country\":\"USA\",\"state\":\"California\"}").
1977+
SetColumnField("location", "json"),
1978+
Metadata: getTrackMetadata("POSTGRES", "webhook"),
1979+
StatusCode: http.StatusOK,
1980+
},
1981+
},
1982+
},
1983+
},
1984+
{
1985+
name: "track (POSTGRES) jsonPaths (DATA_WAREHOUSE with higher path)",
1986+
eventPayload: `{"type":"track","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","event":"event","request_ip":"5.6.7.8","properties":{"review_id":"86ac1cd43","product_id":"9578257311", "location": {"city":"Palo Alto","state":"California","country":"USA","coordinates":{"latitude":37.4419,"longitude":-122.143,"geo":{"altitude":30.5,"accuracy":5,"details":{"altitudeUnits":"meters","accuracyUnits":"meters"}}}}},"userProperties":{"rating":3.0,"review_body":"OK for the price. It works but the material feels flimsy."},"context":{"traits":{"name":"Richard Hendricks","email":"rhedricks@example.com","logins":2},"ip":"1.2.3.4"},"integrations":{"DATA_WAREHOUSE":{"options":{"jsonPaths":["track.properties.location.coordinates"]}},"POSTGRES":{"options":{"jsonPaths":["track.properties.location"]}}}}`,
1987+
metadata: getTrackMetadata("POSTGRES", "webhook"),
1988+
destination: getDestination("POSTGRES", map[string]any{}),
1989+
expectedResponse: types.Response{
1990+
Events: []types.TransformerResponse{
1991+
{
1992+
Output: trackDefaultOutput(),
1993+
Metadata: getTrackMetadata("POSTGRES", "webhook"),
1994+
StatusCode: http.StatusOK,
1995+
},
1996+
{
1997+
Output: trackEventDefaultOutput().
1998+
SetDataField("location", "{\"city\":\"Palo Alto\",\"coordinates\":{\"geo\":{\"accuracy\":5,\"altitude\":30.5,\"details\":{\"accuracyUnits\":\"meters\",\"altitudeUnits\":\"meters\"}},\"latitude\":37.4419,\"longitude\":-122.143},\"country\":\"USA\",\"state\":\"California\"}").
1999+
SetColumnField("location", "json"),
2000+
Metadata: getTrackMetadata("POSTGRES", "webhook"),
2001+
StatusCode: http.StatusOK,
2002+
},
2003+
},
2004+
},
2005+
},
18462006
{
18472007
name: "track (BQ) merge event",
18482008
configOverride: map[string]any{
@@ -1868,7 +2028,7 @@ func TestEvents(t *testing.T) {
18682028
StatusCode: http.StatusOK,
18692029
},
18702030
{
1871-
Output: trackMergedefaultOutput(),
2031+
Output: trackMergeDefaultOutput(),
18722032
Metadata: getTrackMetadata("BQ", "webhook"),
18732033
StatusCode: http.StatusOK,
18742034
},

warehouse/transformer/logger.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -33,7 +33,7 @@ func (t *Transformer) CompareAndLog(
3333
return
3434
}
3535

36-
t.stats.comparisionTime.RecordDuration()()
36+
t.stats.comparisonTime.RecordDuration()()
3737

3838
differingEvents, sampleDiff := t.differingEvents(events, pResponse, wResponse, eventsByMessageID)
3939
if len(differingEvents) == 0 {

warehouse/transformer/options.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,14 @@ func mergeDataWarehouseIntrOpts(destType string, message map[string]any, opts in
4646

4747
setOption(srcMap, "jsonPaths", &jsonPaths)
4848
if len(jsonPaths) > 0 && utils.IsJSONPathSupportedAsPartOfConfig(destType) {
49-
for _, jp := range jsonPaths {
50-
if jpStr, ok := jp.(string); ok {
51-
opts.jsonPaths = append(opts.jsonPaths, jpStr)
49+
mergedJSONPaths := make([]string, 0, len(jsonPaths)+len(opts.jsonPaths))
50+
for _, jsonPath := range jsonPaths {
51+
if jsonPathStr, ok := jsonPath.(string); ok {
52+
mergedJSONPaths = append(mergedJSONPaths, jsonPathStr)
5253
}
5354
}
55+
mergedJSONPaths = append(mergedJSONPaths, opts.jsonPaths...)
56+
opts.jsonPaths = mergedJSONPaths
5457
}
5558
return opts
5659
}

warehouse/transformer/options_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ func TestIntegrationOptions(t *testing.T) {
136136
require.False(t, opts.useBlendoCasing)
137137
require.True(t, opts.skipTracksTable)
138138
require.False(t, opts.skipUsersTable)
139-
require.Equal(t, []string{"path1", "path2", "path3", "path4", "path5"}, opts.jsonPaths)
139+
require.Equal(t, []string{"path4", "path5", "path1", "path2", "path3"}, opts.jsonPaths)
140140
})
141141
}
142142

warehouse/transformer/transformer.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -37,7 +37,7 @@ func New(conf *config.Config, logger logger.Logger, statsFactory stats.Stats) *T
3737

3838
t.stats.matchedEvents = t.statsFactory.NewStat("warehouse_dest_transform_matched_events", stats.HistogramType)
3939
t.stats.mismatchedEvents = t.statsFactory.NewStat("warehouse_dest_transform_mismatched_events", stats.HistogramType)
40-
t.stats.comparisionTime = t.statsFactory.NewStat("warehouse_dest_transform_comparison_time", stats.TimerType)
40+
t.stats.comparisonTime = t.statsFactory.NewStat("warehouse_dest_transform_comparison_time", stats.TimerType)
4141

4242
t.config.enableIDResolution = conf.GetReloadableBoolVar(false, "Warehouse.enableIDResolution")
4343
t.config.populateSrcDestInfoInContext = conf.GetReloadableBoolVar(true, "WH_POPULATE_SRC_DEST_INFO_IN_CONTEXT")

warehouse/transformer/types.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,7 @@ type (
1919
statsFactory stats.Stats
2020

2121
stats struct {
22-
comparisionTime stats.Timer
22+
comparisonTime stats.Timer
2323
matchedEvents stats.Histogram
2424
mismatchedEvents stats.Histogram
2525
}

0 commit comments

Comments
 (0)