Skip to content

Commit ea0dab3

Browse files
authored
fix: use source-id for getting cached schemas (#6268)
# Description

- Order cached schemas by `source_id` instead of `id` when reading them, so that the result is deterministic in cases where several schemas are available for the same destination ID and namespace.

## Linear Ticket

- Resolves WAR-1064

## Security

- [x] The code changed/added as part of this pull request won't create any security issues with how the software is being used.
1 parent f51f6ad commit ea0dab3

File tree

2 files changed

+65
-8
lines changed

2 files changed

+65
-8
lines changed

warehouse/internal/repo/schema.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@ import (
55
"database/sql"
66
"errors"
77
"fmt"
8-
"reflect"
98
"strings"
109
"time"
1110

11+
"github.com/google/go-cmp/cmp"
12+
1213
"github.com/rudderlabs/rudder-go-kit/config"
1314
"github.com/rudderlabs/rudder-go-kit/jsonrs"
1415

@@ -234,8 +235,9 @@ func (sh *WHSchema) GetForNamespace(ctx context.Context, destID, namespace strin
234235
if err != nil {
235236
return model.WHSchema{}, err
236237
}
237-
if !reflect.DeepEqual(originalSchema.Schema, tableLevelSchemas) {
238-
return model.WHSchema{}, errors.New("parent schema does not match parent schema")
238+
diff := cmp.Diff(originalSchema.Schema, tableLevelSchemas)
239+
if len(diff) > 0 {
240+
return model.WHSchema{}, fmt.Errorf("parent schema does not match: %s", diff)
239241
}
240242
return originalSchema, nil
241243
}
@@ -247,7 +249,7 @@ func (sh *WHSchema) getForNamespace(ctx context.Context, destID, namespace strin
247249
namespace = $2 AND
248250
table_name = ''
249251
ORDER BY
250-
id DESC;
252+
source_id DESC;
251253
`
252254

253255
rows, err := sh.db.QueryContext(
@@ -329,13 +331,13 @@ func (sh *WHSchema) getTableLevelSchemasForNamespaceWithTx(ctx context.Context,
329331
schema,
330332
ROW_NUMBER() OVER (
331333
PARTITION BY destination_id, namespace, table_name
332-
ORDER BY id DESC
334+
ORDER BY source_id DESC
333335
) as rn
334336
FROM ` + whSchemaTableName + `
335337
WHERE
336-
destination_id = $1
337-
AND namespace = $2
338-
AND table_name != ''
338+
destination_id = $1 AND
339+
namespace = $2 AND
340+
table_name != ''
339341
) t
340342
WHERE rn = 1;
341343
`

warehouse/internal/repo/schema_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,61 @@ func TestWHSchemasRepo(t *testing.T) {
443443
}
444444
}
445445

446+
// TestWHSchemasRepo_GetForNamespace checks which stored schema GetForNamespace
// returns when two different sources have written an identical schema for the
// same destination and namespace.
func TestWHSchemasRepo_GetForNamespace(t *testing.T) {
447+
t.Run("SourceID ordering", func(t *testing.T) {
448+
db, ctx := setupDB(t), context.Background()
449+
// Truncated to whole seconds and converted to UTC; the same value is handed
// to the repo via WithNow and compared against the returned timestamps below.
now := time.Now().Truncate(time.Second).UTC()
450+
r := repo.NewWHSchemas(db, config.New(), repo.WithNow(func() time.Time {
451+
return now
452+
}))
453+
454+
// First insert: source_id_2. CreatedAt/UpdatedAt are not set here, yet the
// assertions below expect them to equal now — presumably Insert fills them
// from the WithNow clock; confirm against the repo implementation.
require.NoError(t, r.Insert(ctx, &model.WHSchema{
455+
SourceID: "source_id_2",
456+
Namespace: "namespace_1",
457+
DestinationID: "destination_id_1",
458+
DestinationType: "destination_type_1",
459+
Schema: model.Schema{
460+
"table_name_1": {
461+
"column_name_1": "string",
462+
"column_name_2": "int",
463+
"column_name_3": "boolean",
464+
},
465+
},
466+
}))
467+
// Second insert: source_id_1, same destination, namespace and schema.
require.NoError(t, r.Insert(ctx, &model.WHSchema{
468+
SourceID: "source_id_1",
469+
Namespace: "namespace_1",
470+
DestinationID: "destination_id_1",
471+
DestinationType: "destination_type_1",
472+
Schema: model.Schema{
473+
"table_name_1": {
474+
"column_name_1": "string",
475+
"column_name_2": "int",
476+
"column_name_3": "boolean",
477+
},
478+
},
479+
CreatedAt: now,
480+
UpdatedAt: now,
481+
}))
482+
483+
// The schema written by source_id_2 is expected back even though source_id_1
// was inserted later, i.e. the result must not depend on insertion order.
// NOTE(review): despite its name, expectedSchema holds the value returned by
// GetForNamespace (the actual result), not the expectation.
expectedSchema, err := r.GetForNamespace(ctx, "destination_id_1", "namespace_1")
484+
require.NoError(t, err)
485+
require.Equal(t, "source_id_2", expectedSchema.SourceID)
486+
require.Equal(t, "namespace_1", expectedSchema.Namespace)
487+
require.Equal(t, "destination_id_1", expectedSchema.DestinationID)
488+
require.Equal(t, "destination_type_1", expectedSchema.DestinationType)
489+
require.Equal(t, model.Schema{
490+
"table_name_1": {
491+
"column_name_1": "string",
492+
"column_name_2": "int",
493+
"column_name_3": "boolean",
494+
},
495+
}, expectedSchema.Schema)
496+
require.Equal(t, now, expectedSchema.CreatedAt)
497+
require.Equal(t, now, expectedSchema.UpdatedAt)
498+
})
499+
}
500+
446501
func TestWHSchemasRepo_GetDestinationNamespaces(t *testing.T) {
447502
destinationID := "test_destination_id"
448503
conf := config.New()

0 commit comments

Comments
 (0)