Commit f5ab51a

devops-github-rudderstack, achettyiitr, and shekhar-rudder authored
chore: sync release v1.47.2 to main branch (#5760)
# Description

Syncing patch release v1.47.2 to main branch

**↓↓ Please review and edit commit overrides before merging ↓↓**

BEGIN_COMMIT_OVERRIDE
fix: race condition in table schema updates causing incorrect schema to be saved (#5756)
fix: warehouse extracting messageId and receivedAt from event payload (#5757)
END_COMMIT_OVERRIDE

---------

Co-authored-by: Akash Chetty <[email protected]>
Co-authored-by: shekhar-rudder <[email protected]>
1 parent 877da25 commit f5ab51a
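Background on the first override entry (#5756): the old `UpdateTableSchema` read the cached schema, copied it, mutated the copy, and saved it, with locking only inside the individual steps, so two concurrent table-schema updates could overwrite each other's changes. Below is a minimal, hypothetical Go sketch of that lost-update failure mode and of the fix shape (holding the lock across the whole read-modify-write, as the patched code now does); the types are illustrative, not rudder-server's own.

```go
package main

import (
	"fmt"
	"sync"
)

// schemaCache is a hypothetical stand-in for a cached warehouse schema.
type schemaCache struct {
	mu     sync.RWMutex
	tables map[string][]string // table name -> column names
}

// Buggy shape: the read and the write happen under separate lock acquisitions.
// Two goroutines can both copy the same snapshot; the second save wins and
// silently drops the first goroutine's table.
func (c *schemaCache) updateRacy(table string, cols []string) {
	c.mu.RLock()
	snapshot := make(map[string][]string, len(c.tables))
	for k, v := range c.tables {
		snapshot[k] = v
	}
	c.mu.RUnlock()

	snapshot[table] = cols

	c.mu.Lock()
	c.tables = snapshot // may overwrite a concurrent update
	c.mu.Unlock()
}

// Fixed shape: hold the write lock across the whole read-modify-write.
func (c *schemaCache) update(table string, cols []string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.tables[table] = cols
}

func main() {
	c := &schemaCache{tables: map[string][]string{}}
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			c.update(fmt.Sprintf("table_%d", i), []string{"id", "received_at"})
		}(i)
	}
	wg.Wait()
	fmt.Println(len(c.tables)) // always 10 with the fixed shape
}
```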

File tree

15 files changed: +466 −259 lines changed


CHANGELOG.md

Lines changed: 8 additions & 0 deletions
```diff
@@ -1,5 +1,13 @@
 # Changelog
 
+## [1.47.2](https://github.com/rudderlabs/rudder-server/compare/v1.47.1...v1.47.2) (2025-04-23)
+
+
+### Bug Fixes
+
+* race condition in table schema updates causing incorrect schema to be saved ([#5756](https://github.com/rudderlabs/rudder-server/issues/5756)) ([5ec12a0](https://github.com/rudderlabs/rudder-server/commit/5ec12a074a76ea28d3bdcb80482abed0c62c2e9c))
+* warehouse extracting messageId and receivedAt from event payload ([#5757](https://github.com/rudderlabs/rudder-server/issues/5757)) ([2532122](https://github.com/rudderlabs/rudder-server/commit/253212274862af9bf64eea4ee4d29bc987e5bde4))
+
 ## [1.47.1](https://github.com/rudderlabs/rudder-server/compare/v1.47.0...v1.47.1) (2025-04-16)
```

Lines changed: 1 addition & 1 deletion
```diff
@@ -1 +1 @@
-ALTER TABLE wh_staging_files ADD COLUMN bytes_per_table JSONB;
+ALTER TABLE wh_staging_files ADD COLUMN IF NOT EXISTS bytes_per_table JSONB;
```
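The only change here is the added `IF NOT EXISTS`, which makes the migration idempotent: it can safely run against a database where `bytes_per_table` already exists. A small illustrative sketch of that behaviour, assuming a Postgres connection via `database/sql` and the `lib/pq` driver (the DSN is a placeholder, not part of the patch):

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // Postgres driver (assumed for this sketch)
)

func main() {
	db, err := sql.Open("postgres", "postgres://localhost:5432/warehouse?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	const stmt = `ALTER TABLE wh_staging_files ADD COLUMN IF NOT EXISTS bytes_per_table JSONB;`

	// Running the migration twice succeeds both times: the second run is a
	// no-op because the column already exists.
	for i := 0; i < 2; i++ {
		if _, err := db.Exec(stmt); err != nil {
			log.Fatalf("migration run %d: %v", i+1, err)
		}
	}
}
```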

warehouse/router/upload.go

Lines changed: 19 additions & 13 deletions
```diff
@@ -167,19 +167,10 @@ func (f *UploadJobFactory) NewUploadJob(ctx context.Context, dto *model.UploadJo
         stagingFileRepo: repo.NewStagingFiles(f.db),
         loadFilesRepo:   repo.NewLoadFiles(f.db, f.conf),
         whSchemaRepo:    repo.NewWHSchemas(f.db),
-        schemaHandle: schema.New(
-            f.db,
-            dto.Warehouse,
-            f.conf,
-            f.logger.Child("warehouse"),
-            f.statsFactory,
-            whManager,
-        ),
-
-        upload:         dto.Upload,
-        warehouse:      dto.Warehouse,
-        stagingFiles:   dto.StagingFiles,
-        stagingFileIDs: repo.StagingFileIDs(dto.StagingFiles),
+        upload:         dto.Upload,
+        warehouse:      dto.Warehouse,
+        stagingFiles:   dto.StagingFiles,
+        stagingFileIDs: repo.StagingFileIDs(dto.StagingFiles),
 
         pendingTableUploadsRepo: repo.NewUploads(f.db),
         pendingTableUploads:     []model.PendingTableUpload{},
@@ -302,6 +293,21 @@ func (job *UploadJob) run() (err error) {
     }
     defer whManager.Cleanup(job.ctx)
 
+    job.schemaHandle, err = schema.New(
+        job.ctx,
+        job.warehouse,
+        job.conf,
+        job.logger.Child("warehouse"),
+        job.statsFactory,
+        whManager,
+        repo.NewWHSchemas(job.db),
+        repo.NewStagingFiles(job.db),
+    )
+    if err != nil {
+        _, _ = job.setUploadError(err, InternalProcessingFailed)
+        return err
+    }
+
     var (
         newStatus       string
         nextUploadState *state
```
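Two related changes in the router: the schema handle is no longer wired up in the factory, and `schema.New` now takes a context plus the schema and staging-file repositories and returns an error, since it loads the schema up front. Building it inside `run()` lets a failure surface as an upload error via `setUploadError` instead of going unnoticed at construction time. A rough, self-contained sketch of that construct-where-you-can-fail pattern (all names below are illustrative, not the actual rudder-server API):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// loader is a hypothetical dependency whose construction does I/O and can fail.
type loader struct{ schema map[string]string }

func newLoader(ctx context.Context, fetch func(context.Context) (map[string]string, error)) (*loader, error) {
	s, err := fetch(ctx) // e.g. read the stored schema from the database
	if err != nil {
		return nil, fmt.Errorf("loading schema: %w", err)
	}
	return &loader{schema: s}, nil
}

type job struct {
	handle *loader
	failed error
}

// run builds the dependency where the error can be turned into a job failure,
// analogous to job.setUploadError(err, InternalProcessingFailed) in the patch.
func (j *job) run(ctx context.Context, fetch func(context.Context) (map[string]string, error)) error {
	h, err := newLoader(ctx, fetch)
	if err != nil {
		j.failed = err
		return err
	}
	j.handle = h
	// ... continue through the upload states using j.handle
	return nil
}

func main() {
	j := &job{}
	err := j.run(context.Background(), func(context.Context) (map[string]string, error) {
		return nil, errors.New("schema repo unavailable")
	})
	fmt.Println(err, "| recorded as job failure:", j.failed != nil)
}
```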

warehouse/router/upload_test.go

Lines changed: 25 additions & 11 deletions
```diff
@@ -24,9 +24,20 @@ import (
     sqlmiddleware "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
     "github.com/rudderlabs/rudder-server/warehouse/integrations/redshift"
     "github.com/rudderlabs/rudder-server/warehouse/internal/model"
+    "github.com/rudderlabs/rudder-server/warehouse/schema"
     warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
 )
 
+type mockSchemaRepo struct{}
+
+func (m *mockSchemaRepo) GetForNamespace(ctx context.Context, sourceID, destinationID, namespace string) (model.WHSchema, error) {
+    return model.WHSchema{}, nil
+}
+
+func (m *mockSchemaRepo) Insert(ctx context.Context, schema *model.WHSchema) (int64, error) {
+    return 0, nil
+}
+
 func TestExtractUploadErrorsByState(t *testing.T) {
     input := []struct {
         InitialErrorState []byte
@@ -153,24 +164,27 @@ func TestColumnCountStat(t *testing.T) {
     whManager, err := manager.New(warehouseutils.POSTGRES, conf, logger.NOP, statsStore)
     require.NoError(t, err)
     ctx := context.Background()
+    warehouse := model.Warehouse{
+        Type: tc.destinationType,
+        Destination: backendconfig.DestinationT{
+            ID:   destinationID,
+            Name: destinationName,
+        },
+        Source: backendconfig.SourceT{
+            ID:   sourceID,
+            Name: sourceName,
+        },
+    }
     j := uploadJobFactory.NewUploadJob(ctx, &model.UploadJob{
         Upload: model.Upload{
             WorkspaceID:   workspaceID,
             DestinationID: destinationID,
             SourceID:      sourceID,
         },
-        Warehouse: model.Warehouse{
-            Type: tc.destinationType,
-            Destination: backendconfig.DestinationT{
-                ID:   destinationID,
-                Name: destinationName,
-            },
-            Source: backendconfig.SourceT{
-                ID:   sourceID,
-                Name: sourceName,
-            },
-        },
+        Warehouse: warehouse,
     }, whManager)
+    j.schemaHandle, err = schema.New(ctx, warehouse, conf, logger.NOP, statsStore, nil, &mockSchemaRepo{}, nil)
+    require.NoError(t, err)
     err = j.schemaHandle.UpdateTableSchema(ctx, tableName, model.TableSchema{
         "test-column-1": "string",
         "test-column-2": "string",
```

warehouse/schema/schema.go

Lines changed: 77 additions & 100 deletions
```diff
@@ -19,7 +19,6 @@ import (
 
     "github.com/rudderlabs/rudder-server/jsonrs"
     "github.com/rudderlabs/rudder-server/utils/timeutil"
-    "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
     "github.com/rudderlabs/rudder-server/warehouse/internal/model"
     "github.com/rudderlabs/rudder-server/warehouse/internal/repo"
     "github.com/rudderlabs/rudder-server/warehouse/logfield"
@@ -78,56 +77,89 @@ type schema struct {
     fetchSchemaRepo fetchSchemaRepo
     now             func() time.Time
     cachedSchema    model.Schema
-    cacheExpiry     time.Time
     cachedSchemaMu  sync.RWMutex
 }
 
 func New(
-    db *sqlquerywrapper.DB,
+    ctx context.Context,
     warehouse model.Warehouse,
     conf *config.Config,
-    logger logger.Logger,
+    slogger logger.Logger,
     statsFactory stats.Stats,
     fetchSchemaRepo fetchSchemaRepo,
-) Handler {
+    schemaRepo schemaRepo,
+    stagingFileRepo stagingFileRepo,
+) (Handler, error) {
     ttlInMinutes := conf.GetDurationVar(720, time.Minute, "Warehouse.schemaTTLInMinutes")
-    schema := &schema{
+    sh := &schema{
         warehouse: warehouse,
-        log:       logger.Child("schema"),
+        log:       slogger.Child("schema"),
         ttlInMinutes: ttlInMinutes,
-        schemaRepo: repo.NewWHSchemas(db),
+        schemaRepo: schemaRepo,
         stagingFilesSchemaPaginationSize: conf.GetInt("Warehouse.stagingFilesSchemaPaginationSize", 100),
-        stagingFileRepo: repo.NewStagingFiles(db),
+        stagingFileRepo: stagingFileRepo,
         fetchSchemaRepo: fetchSchemaRepo,
         enableIDResolution: conf.GetBool("Warehouse.enableIDResolution", false),
         now: timeutil.Now,
     }
-    schema.stats.schemaSize = statsFactory.NewTaggedStat("warehouse_schema_size", stats.HistogramType, stats.Tags{
+    sh.stats.schemaSize = statsFactory.NewTaggedStat("warehouse_schema_size", stats.HistogramType, stats.Tags{
         "module":        "warehouse",
         "workspaceId":   warehouse.WorkspaceID,
         "destType":      warehouse.Destination.DestinationDefinition.Name,
         "sourceId":      warehouse.Source.ID,
         "destinationId": warehouse.Destination.ID,
     })
-    return schema
+    // cachedSchema can be computed in the constructor
+    // we need not worry about it getting expired in the middle of the job
+    // since we need the schema to be the same for the entireduration of the job
+    whSchema, err := sh.schemaRepo.GetForNamespace(
+        ctx,
+        sh.warehouse.Source.ID,
+        sh.warehouse.Destination.ID,
+        sh.warehouse.Namespace,
+    )
+    if err != nil {
+        return nil, fmt.Errorf("getting schema for namespace: %w", err)
+    }
+    if whSchema.Schema == nil {
+        sh.cachedSchema = model.Schema{}
+        return sh, nil
+    }
+    if whSchema.ExpiresAt.After(sh.now()) {
+        sh.cachedSchema = whSchema.Schema
+        return sh, nil
+    }
+    sh.log.Infon("Schema expired", obskit.DestinationID(sh.warehouse.Destination.ID), obskit.Namespace(sh.warehouse.Namespace), logger.NewTimeField("expiresAt", whSchema.ExpiresAt))
+    return sh, sh.fetchSchemaFromWarehouse(ctx)
 }
 
-func (sh *schema) IsSchemaEmpty(ctx context.Context) bool {
-    schema, err := sh.getSchema(ctx)
+func (sh *schema) fetchSchemaFromWarehouse(ctx context.Context) error {
+    start := sh.now()
+    warehouseSchema, err := sh.fetchSchemaRepo.FetchSchema(ctx)
+    if err != nil {
+        return fmt.Errorf("fetching schema: %w", err)
+    }
+    duration := math.Round((sh.now().Sub(start).Minutes() * 1000)) / 1000
+    sh.log.Infon("Fetched schema from warehouse", obskit.DestinationID(sh.warehouse.Destination.ID), obskit.Namespace(sh.warehouse.Type), logger.NewFloatField("timeTakenInMinutes", duration))
+    removeDeprecatedColumns(warehouseSchema, sh.warehouse, sh.log)
+    err = sh.saveSchema(ctx, warehouseSchema)
     if err != nil {
-        sh.log.Warnn("error getting schema", obskit.Error(err))
-        return true
+        return fmt.Errorf("saving schema: %w", err)
     }
-    return len(schema) == 0
+    sh.cachedSchema = warehouseSchema
+    return nil
+}
+
+func (sh *schema) IsSchemaEmpty(ctx context.Context) bool {
+    sh.cachedSchemaMu.RLock()
+    defer sh.cachedSchemaMu.RUnlock()
+    return len(sh.cachedSchema) == 0
 }
 
 func (sh *schema) GetTableSchema(ctx context.Context, tableName string) model.TableSchema {
-    schema, err := sh.getSchema(ctx)
-    if err != nil {
-        sh.log.Warnn("error getting schema", obskit.Error(err))
-        return model.TableSchema{}
-    }
-    return schema[tableName]
+    sh.cachedSchemaMu.RLock()
+    defer sh.cachedSchemaMu.RUnlock()
+    return sh.cachedSchema[tableName]
 }
 
 func (sh *schema) UpdateSchema(ctx context.Context, updatedSchema model.Schema) error {
@@ -136,32 +168,31 @@ func (sh *schema) UpdateSchema(ctx context.Context, updatedSchema model.Schema)
         return fmt.Errorf("marshaling schema: %w", err)
     }
     sh.stats.schemaSize.Observe(float64(len(updatedSchemaInBytes)))
-    return sh.saveSchema(ctx, updatedSchema)
+    sh.cachedSchemaMu.Lock()
+    defer sh.cachedSchemaMu.Unlock()
+    err = sh.saveSchema(ctx, updatedSchema)
+    if err != nil {
+        return fmt.Errorf("saving schema: %w", err)
+    }
+    sh.cachedSchema = updatedSchema
+    return nil
 }
 
 func (sh *schema) UpdateTableSchema(ctx context.Context, tableName string, tableSchema model.TableSchema) error {
-    schema, err := sh.getSchema(ctx)
-    if err != nil {
-        return fmt.Errorf("getting schema: %w", err)
-    }
-    schemaCopy := make(model.Schema)
-    for k, v := range schema {
-        schemaCopy[k] = v
-    }
-    schemaCopy[tableName] = tableSchema
-    err = sh.saveSchema(ctx, schemaCopy)
+    sh.cachedSchemaMu.Lock()
+    defer sh.cachedSchemaMu.Unlock()
+    sh.cachedSchema[tableName] = tableSchema
+    err := sh.saveSchema(ctx, sh.cachedSchema)
     if err != nil {
         return fmt.Errorf("saving schema: %w", err)
     }
     return nil
 }
 
 func (sh *schema) GetColumnsCount(ctx context.Context, tableName string) (int, error) {
-    schema, err := sh.getSchema(ctx)
-    if err != nil {
-        return 0, fmt.Errorf("getting schema: %w", err)
-    }
-    return len(schema[tableName]), nil
+    sh.cachedSchemaMu.RLock()
+    defer sh.cachedSchemaMu.RUnlock()
+    return len(sh.cachedSchema[tableName]), nil
 }
 
 func (sh *schema) ConsolidateStagingFilesSchema(ctx context.Context, stagingFiles []*model.StagingFile) (model.Schema, error) {
@@ -175,12 +206,10 @@ func (sh *schema) ConsolidateStagingFilesSchema(ctx context.Context, stagingFile
 
         consolidatedSchema = consolidateStagingSchemas(consolidatedSchema, schemas)
     }
-    schema, err := sh.getSchema(ctx)
-    if err != nil {
-        return nil, fmt.Errorf("getting schema: %v", err)
-    }
-    consolidatedSchema = consolidateWarehouseSchema(consolidatedSchema, schema)
-    consolidatedSchema = overrideUsersWithIdentifiesSchema(consolidatedSchema, sh.warehouse.Type, schema)
+    sh.cachedSchemaMu.RLock()
+    defer sh.cachedSchemaMu.RUnlock()
+    consolidatedSchema = consolidateWarehouseSchema(consolidatedSchema, sh.cachedSchema)
+    consolidatedSchema = overrideUsersWithIdentifiesSchema(consolidatedSchema, sh.warehouse.Type, sh.cachedSchema)
     consolidatedSchema = enhanceDiscardsSchema(consolidatedSchema, sh.warehouse.Type)
     consolidatedSchema = enhanceSchemaWithIDResolution(consolidatedSchema, sh.isIDResolutionEnabled(), sh.warehouse.Type)
 
@@ -192,23 +221,9 @@ func (sh *schema) isIDResolutionEnabled() bool {
 }
 
 func (sh *schema) TableSchemaDiff(ctx context.Context, tableName string, tableSchema model.TableSchema) (whutils.TableSchemaDiff, error) {
-    schema, err := sh.getSchema(ctx)
-    if err != nil {
-        return whutils.TableSchemaDiff{}, fmt.Errorf("getting schema: %w", err)
-    }
-    return tableSchemaDiff(tableName, schema, tableSchema), nil
-}
-
-func (sh *schema) fetchSchemaFromWarehouse(ctx context.Context) (model.Schema, error) {
-    start := sh.now()
-    warehouseSchema, err := sh.fetchSchemaRepo.FetchSchema(ctx)
-    if err != nil {
-        return nil, fmt.Errorf("fetching schema: %w", err)
-    }
-    duration := math.Round((sh.now().Sub(start).Minutes() * 1000)) / 1000
-    sh.log.Infon("Fetched schema from warehouse", obskit.DestinationID(sh.warehouse.Destination.ID), obskit.Namespace(sh.warehouse.Type), logger.NewFloatField("timeTakenInMinutes", duration))
-    removeDeprecatedColumns(warehouseSchema, sh.warehouse, sh.log)
-    return warehouseSchema, sh.saveSchema(ctx, warehouseSchema)
+    sh.cachedSchemaMu.RLock()
+    defer sh.cachedSchemaMu.RUnlock()
+    return tableSchemaDiff(tableName, sh.cachedSchema, tableSchema), nil
 }
 
 func (sh *schema) saveSchema(ctx context.Context, newSchema model.Schema) error {
@@ -224,48 +239,10 @@ func (sh *schema) saveSchema(ctx context.Context, newSchema model.Schema) error
     if err != nil {
         return fmt.Errorf("inserting schema: %w", err)
    }
-    sh.cachedSchemaMu.Lock()
-    sh.cachedSchema = newSchema
-    sh.cachedSchemaMu.Unlock()
-    sh.cacheExpiry = expiresAt
+    sh.log.Infon("Saved schema", obskit.DestinationID(sh.warehouse.Destination.ID), obskit.Namespace(sh.warehouse.Namespace))
     return nil
 }
 
-func (sh *schema) getSchema(ctx context.Context) (model.Schema, error) {
-    sh.cachedSchemaMu.RLock()
-    if sh.cachedSchema != nil && sh.cacheExpiry.After(sh.now()) {
-        defer sh.cachedSchemaMu.RUnlock()
-        sh.log.Debugn("Returning cached schema", obskit.DestinationID(sh.warehouse.Destination.ID), obskit.Namespace(sh.warehouse.Type))
-        return sh.cachedSchema, nil
-    }
-    sh.cachedSchemaMu.RUnlock()
-    whSchema, err := sh.schemaRepo.GetForNamespace(
-        ctx,
-        sh.warehouse.Source.ID,
-        sh.warehouse.Destination.ID,
-        sh.warehouse.Namespace,
-    )
-    if err != nil {
-        return nil, fmt.Errorf("getting schema for namespace: %w", err)
-    }
-    if whSchema.Schema == nil {
-        sh.cachedSchemaMu.Lock()
-        defer sh.cachedSchemaMu.Unlock()
-        sh.cachedSchema = model.Schema{}
-        sh.cacheExpiry = sh.now().Add(sh.ttlInMinutes)
-        return sh.cachedSchema, nil
-    }
-    if whSchema.ExpiresAt.Before(sh.now()) {
-        sh.log.Infon("Schema expired", obskit.DestinationID(sh.warehouse.Destination.ID), obskit.Namespace(sh.warehouse.Namespace), logger.NewTimeField("expiresAt", whSchema.ExpiresAt))
-        return sh.fetchSchemaFromWarehouse(ctx)
-    }
-    sh.cachedSchemaMu.Lock()
-    defer sh.cachedSchemaMu.Unlock()
-    sh.cachedSchema = whSchema.Schema
-    sh.cacheExpiry = whSchema.ExpiresAt
-    return sh.cachedSchema, nil
-}
-
 // consolidateStagingSchemas merges multiple schemas into one
 // Prefer the type of the first schema, If the type is text, prefer text
 func consolidateStagingSchemas(consolidatedSchema model.Schema, schemas []model.Schema) model.Schema {
```
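The net effect in `schema.go`: the per-call `getSchema` path and its `cacheExpiry` bookkeeping are gone; the schema is loaded once in `New`, refetched from the warehouse immediately if the stored copy has expired, and every accessor then reads `cachedSchema` under the mutex for the rest of the job. A condensed, hypothetical sketch of that constructor decision tree (not the real repository interfaces):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// stored is a hypothetical persisted schema row with its expiry.
type stored struct {
	tables    map[string][]string
	expiresAt time.Time
}

type cache struct{ tables map[string][]string }

// newCache mirrors the constructor's decision tree in the patch:
// nothing stored -> start empty; stored and unexpired -> use it as-is;
// stored but expired -> refetch from the warehouse once, up front.
func newCache(
	ctx context.Context,
	load func(context.Context) (stored, error),
	refetch func(context.Context) (map[string][]string, error),
	now func() time.Time,
) (*cache, error) {
	s, err := load(ctx)
	if err != nil {
		return nil, fmt.Errorf("getting stored schema: %w", err)
	}
	switch {
	case s.tables == nil:
		return &cache{tables: map[string][]string{}}, nil
	case s.expiresAt.After(now()):
		return &cache{tables: s.tables}, nil
	default:
		fresh, err := refetch(ctx) // expired: one warehouse fetch at job start
		if err != nil {
			return nil, fmt.Errorf("fetching schema: %w", err)
		}
		return &cache{tables: fresh}, nil
	}
}

func main() {
	c, err := newCache(
		context.Background(),
		func(context.Context) (stored, error) {
			return stored{
				tables:    map[string][]string{"tracks": {"id"}},
				expiresAt: time.Now().Add(-time.Hour), // already expired
			}, nil
		},
		func(context.Context) (map[string][]string, error) {
			return map[string][]string{"tracks": {"id", "received_at"}}, nil
		},
		time.Now,
	)
	fmt.Println(c.tables, err)
}
```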
