Skip to content

Commit ddce7e4

Browse files
authored
fix: update table schema local repository (#6255)
# Description - When performing Operations like CreateTable, AddColumns, and AlterColumn on LocalSchemaRepository: - Read schema from PostgreSQL via GetLocalSchema - Modify the schema in memory - Write back to PostgreSQL via UpdateLocalSchema - Adding a lock to prevent these race conditions when multiple goroutines modify the schema concurrently. ## Linear Ticket - Resolves WAR-1046, WAR-1027 ## Security - [x] The code changed/added as part of this pull request won't create any security issues with how the software is being used.
1 parent 5e6e480 commit ddce7e4

File tree

2 files changed

+245
-161
lines changed

2 files changed

+245
-161
lines changed

warehouse/integrations/datalake/schema-repository/local.go

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,46 +3,63 @@ package schemarepository
33
import (
44
"context"
55
"fmt"
6+
"sync"
67

78
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
8-
9-
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
9+
whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
1010
)
1111

12+
type Uploader interface {
13+
GetLocalSchema(ctx context.Context) (model.Schema, error)
14+
UpdateLocalSchema(ctx context.Context, schema model.Schema) error
15+
}
1216
type LocalSchemaRepository struct {
1317
warehouse model.Warehouse
14-
uploader warehouseutils.Uploader
18+
uploader Uploader
19+
20+
// mu protects the read-modify-write pattern for schema operations
21+
// Operations like CreateTable, AddColumns, AlterColumn:
22+
// 1. Read schema from PostgreSQL via GetLocalSchema
23+
// 2. Modify the schema in memory
24+
// 3. Write back to PostgreSQL via UpdateLocalSchema
25+
// This prevents race conditions when multiple goroutines modify the schema concurrently
26+
mu sync.RWMutex
1527
}
1628

17-
func NewLocalSchemaRepository(wh model.Warehouse, uploader warehouseutils.Uploader) (*LocalSchemaRepository, error) {
29+
func NewLocalSchemaRepository(warehouse model.Warehouse, uploader Uploader) (*LocalSchemaRepository, error) {
1830
ls := LocalSchemaRepository{
19-
warehouse: wh,
31+
warehouse: warehouse,
2032
uploader: uploader,
2133
}
22-
2334
return &ls, nil
2435
}
2536

2637
func (ls *LocalSchemaRepository) FetchSchema(ctx context.Context, _ model.Warehouse) (model.Schema, error) {
38+
ls.mu.RLock()
39+
defer ls.mu.RUnlock()
40+
2741
schema, err := ls.uploader.GetLocalSchema(ctx)
2842
if err != nil {
2943
return model.Schema{}, fmt.Errorf("fetching local schema: %w", err)
3044
}
31-
3245
return schema, nil
3346
}
3447

35-
func (*LocalSchemaRepository) CreateSchema(context.Context) (err error) {
48+
func (*LocalSchemaRepository) CreateSchema(context.Context) error {
3649
return nil
3750
}
3851

39-
func (ls *LocalSchemaRepository) CreateTable(ctx context.Context, tableName string, columnMap model.TableSchema) (err error) {
52+
func (ls *LocalSchemaRepository) CreateTable(ctx context.Context, tableName string, columnMap model.TableSchema) error {
53+
ls.mu.Lock()
54+
defer ls.mu.Unlock()
55+
4056
// fetch schema from local db
4157
schema, err := ls.uploader.GetLocalSchema(ctx)
4258
if err != nil {
4359
return fmt.Errorf("fetching local schema: %w", err)
4460
}
4561

62+
// check if table already exists
4663
if _, ok := schema[tableName]; ok {
4764
return fmt.Errorf("failed to create table: table %s already exists", tableName)
4865
}
@@ -54,7 +71,10 @@ func (ls *LocalSchemaRepository) CreateTable(ctx context.Context, tableName stri
5471
return ls.uploader.UpdateLocalSchema(ctx, schema)
5572
}
5673

57-
func (ls *LocalSchemaRepository) AddColumns(ctx context.Context, tableName string, columnsInfo []warehouseutils.ColumnInfo) (err error) {
74+
func (ls *LocalSchemaRepository) AddColumns(ctx context.Context, tableName string, columnsInfo []whutils.ColumnInfo) error {
75+
ls.mu.Lock()
76+
defer ls.mu.Unlock()
77+
5878
// fetch schema from local db
5979
schema, err := ls.uploader.GetLocalSchema(ctx)
6080
if err != nil {
@@ -75,6 +95,9 @@ func (ls *LocalSchemaRepository) AddColumns(ctx context.Context, tableName strin
7595
}
7696

7797
func (ls *LocalSchemaRepository) AlterColumn(ctx context.Context, tableName, columnName, columnType string) (model.AlterTableResponse, error) {
98+
ls.mu.Lock()
99+
defer ls.mu.Unlock()
100+
78101
// fetch schema from local db
79102
schema, err := ls.uploader.GetLocalSchema(ctx)
80103
if err != nil {
@@ -91,12 +114,13 @@ func (ls *LocalSchemaRepository) AlterColumn(ctx context.Context, tableName, col
91114
return model.AlterTableResponse{}, fmt.Errorf("failed to alter column: column %s does not exist in table %s", columnName, tableName)
92115
}
93116

117+
// update column type
94118
schema[tableName][columnName] = columnType
95119

96120
// update schema
97121
return model.AlterTableResponse{}, ls.uploader.UpdateLocalSchema(ctx, schema)
98122
}
99123

100-
func (*LocalSchemaRepository) RefreshPartitions(context.Context, string, []warehouseutils.LoadFile) error {
124+
func (*LocalSchemaRepository) RefreshPartitions(context.Context, string, []whutils.LoadFile) error {
101125
return nil
102126
}

0 commit comments

Comments
 (0)