Skip to content

Commit 4309aa9

Browse files
authored
chore: varchar handling for mssql and azure synapse (#5557)
1 parent a1b19ba commit 4309aa9

File tree

6 files changed

+458
-28
lines changed

6 files changed

+458
-28
lines changed

warehouse/integrations/azure-synapse/azure-synapse.go

Lines changed: 73 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,10 @@ import (
3636
)
3737

3838
const (
39-
stringLengthLimit = 512
40-
provider = warehouseutils.AzureSynapse
41-
tableNameLimit = 127
39+
varcharDefaultLength = 512
40+
varcharMaxLength = -1
41+
provider = warehouseutils.AzureSynapse
42+
tableNameLimit = 127
4243
)
4344

4445
var errorsMappings []model.JobError
@@ -77,6 +78,10 @@ var azureSynapseDataTypesMapToRudder = map[string]string{
7778
"bit": "boolean",
7879
}
7980

81+
var stringColumns = lo.Keys(lo.PickBy(azureSynapseDataTypesMapToRudder, func(_, value string) bool {
82+
return value == "string"
83+
}))
84+
8085
type AzureSynapse struct {
8186
db *sqlmw.DB
8287
namespace string
@@ -295,12 +300,18 @@ func (as *AzureSynapse) loadTable(
295300
return nil, "", fmt.Errorf("preparing copyIn statement: %w", err)
296301
}
297302

303+
varcharLengthMap, err := as.getVarcharLengthMap(ctx, tableName)
304+
if err != nil {
305+
return nil, "", fmt.Errorf("getting varchar column length map: %w", err)
306+
}
307+
298308
log.Infow("loading data into staging table")
299309
for _, fileName := range fileNames {
300310
err = as.loadDataIntoStagingTable(
301311
ctx, log, stmt,
302312
fileName, sortedColumnKeys,
303313
extraColumns, tableSchemaInUpload,
314+
varcharLengthMap,
304315
)
305316
if err != nil {
306317
return nil, "", fmt.Errorf("loading data into staging table from file %s: %w", fileName, err)
@@ -341,6 +352,47 @@ func (as *AzureSynapse) loadTable(
341352
}, stagingTableName, nil
342353
}
343354

355+
// getVarcharLengthMap retrieves the maximum allowed length for varchar columns in a given table.
356+
// A `CHARACTER_MAXIMUM_LENGTH` of `-1` indicates that the column has the maximum possible length (i.e., `varchar(max)`).
357+
func (as *AzureSynapse) getVarcharLengthMap(ctx context.Context, tableName string) (map[string]int, error) {
358+
dataTypes := "'" + strings.Join(stringColumns, "', '") + "'"
359+
query := fmt.Sprintf(`
360+
SELECT column_name, CHARACTER_MAXIMUM_LENGTH
361+
FROM INFORMATION_SCHEMA.COLUMNS
362+
WHERE TABLE_SCHEMA = @schema
363+
AND TABLE_NAME = @tableName
364+
AND DATA_TYPE IN (%s);
365+
`,
366+
dataTypes,
367+
)
368+
369+
columnsMap := make(map[string]int)
370+
rows, err := as.db.QueryContext(ctx, query,
371+
sql.Named("schema", as.namespace),
372+
sql.Named("tableName", tableName),
373+
)
374+
if errors.Is(err, io.EOF) {
375+
return columnsMap, nil
376+
}
377+
if err != nil {
378+
return nil, fmt.Errorf("querying varchar columns length: %w", err)
379+
}
380+
defer func() { _ = rows.Close() }()
381+
382+
for rows.Next() {
383+
var columnName string
384+
var maxLength int
385+
if err := rows.Scan(&columnName, &maxLength); err != nil {
386+
return nil, err
387+
}
388+
columnsMap[columnName] = maxLength
389+
}
390+
if err := rows.Err(); err != nil {
391+
return nil, fmt.Errorf("iterating varchar columns length: %w", err)
392+
}
393+
return columnsMap, nil
394+
}
395+
344396
func (as *AzureSynapse) loadDataIntoStagingTable(
345397
ctx context.Context,
346398
log logger.Logger,
@@ -349,6 +401,7 @@ func (as *AzureSynapse) loadDataIntoStagingTable(
349401
sortedColumnKeys []string,
350402
extraColumns []string,
351403
tableSchemaInUpload model.TableSchema,
404+
varcharLengthMap map[string]int,
352405
) error {
353406
gzipFile, err := os.Open(fileName)
354407
if err != nil {
@@ -405,9 +458,10 @@ func (as *AzureSynapse) loadDataIntoStagingTable(
405458
continue
406459
}
407460

408-
processedVal, err := as.ProcessColumnValue(
461+
processedVal, err := ProcessColumnValue(
409462
value.(string),
410463
valueType,
464+
varcharLengthMap[sortedColumnKeys[index]],
411465
)
412466
if err != nil {
413467
log.Warnw("mismatch in datatype",
@@ -445,9 +499,13 @@ func (as *AzureSynapse) loadDataIntoStagingTable(
445499
return nil
446500
}
447501

448-
func (as *AzureSynapse) ProcessColumnValue(
502+
// ProcessColumnValue processes the input string `value` based on its specified `valueType`.
503+
// It converts the value to the appropriate type and ensures it adheres to the constraints
504+
// such as `varcharLength` for string types.
505+
func ProcessColumnValue(
449506
value string,
450507
valueType string,
508+
varcharLength int,
451509
) (interface{}, error) {
452510
switch valueType {
453511
case model.IntDataType:
@@ -459,15 +517,21 @@ func (as *AzureSynapse) ProcessColumnValue(
459517
case model.BooleanDataType:
460518
return strconv.ParseBool(value)
461519
case model.StringDataType:
462-
if len(value) > stringLengthLimit {
463-
value = value[:stringLengthLimit]
520+
// If the varchar length is set to the maximum allowed, return the string as is.
521+
if varcharLength == varcharMaxLength {
522+
return value, nil
523+
}
524+
525+
maxStringLength := max(varcharLength, varcharDefaultLength)
526+
if len(value) > maxStringLength {
527+
value = value[:maxStringLength]
464528
}
465529
if !hasDiacritics(value) {
466530
return value, nil
467531
} else {
468532
byteArr := str2ucs2(value)
469-
if len(byteArr) > stringLengthLimit {
470-
byteArr = byteArr[:stringLengthLimit]
533+
if len(byteArr) > maxStringLength {
534+
byteArr = byteArr[:maxStringLength]
471535
}
472536
return byteArr, nil
473537
}

warehouse/integrations/azure-synapse/azure_synapse_test.go

Lines changed: 156 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -629,14 +629,160 @@ func TestIntegration(t *testing.T) {
629629
)
630630
require.Equal(t, records, whth.DiscardTestRecords())
631631
})
632+
t.Run("varchar max length", func(t *testing.T) {
633+
tableName := "varchar_max_length_test_table"
634+
smallString, bigString, biggerString := strings.Repeat("a", 512), strings.Repeat("a", 8000), strings.Repeat("a", 65535)
635+
636+
uploadOutput := whth.UploadLoadFile(t, fm, "testdata/load.maxvarchar.csv.gz", tableName)
637+
638+
loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}}
639+
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)
640+
641+
az := azuresynapse.New(config.New(), logger.NOP, stats.NOP)
642+
require.NoError(t, az.Setup(ctx, warehouse, mockUploader))
643+
require.NoError(t, az.CreateSchema(ctx))
644+
require.NoError(t, az.CreateTable(ctx, tableName, schemaInWarehouse))
645+
646+
loadTableStat, err := az.LoadTable(ctx, tableName)
647+
require.NoError(t, err)
648+
require.Equal(t, loadTableStat.RowsInserted, int64(14))
649+
require.Equal(t, loadTableStat.RowsUpdated, int64(0))
650+
651+
records := whth.RetrieveRecordsFromWarehouse(t, db,
652+
fmt.Sprintf(`
653+
SELECT
654+
id,
655+
received_at,
656+
test_bool,
657+
test_datetime,
658+
cast(test_float AS float) AS test_float,
659+
test_int,
660+
test_string
661+
FROM
662+
%q.%q
663+
ORDER BY
664+
id;
665+
`,
666+
namespace,
667+
tableName,
668+
),
669+
)
670+
require.Equal(t, records, [][]string{
671+
{"6734e5db-f918-4efe-1421-872f66e235c5", "2022-12-15T06:53:49Z", "", "", "", "125", ""},
672+
{"6734e5db-f918-4efe-2314-872f66e235c5", "2022-12-15T06:53:49Z", "", "", "125.75", "", ""},
673+
{"6734e5db-f918-4efe-2352-872f66e235c5", "2022-12-15T06:53:49Z", "", "2022-12-15T06:53:49Z", "", "", ""},
674+
{"6734e5db-f918-4efe-2414-872f66e235c5", "2022-12-15T06:53:49Z", "false", "2022-12-15T06:53:49Z", "126.75", "126", smallString},
675+
{"6734e5db-f918-4efe-3555-872f66e235c5", "2022-12-15T06:53:49Z", "false", "", "", "", ""},
676+
{"6734e5db-f918-4efe-5152-872f66e235c5", "2022-12-15T06:53:49Z", "", "", "", "", smallString},
677+
{"6734e5db-f918-4efe-5323-872f66e235c5", "2022-12-15T06:53:49Z", "", "", "", "", ""},
678+
{"7274e5db-f918-4efe-1212-872f66e235c5", "2022-12-15T06:53:49Z", "true", "2022-12-15T06:53:49Z", "125.75", "125", smallString},
679+
{"7274e5db-f918-4efe-1454-872f66e235c5", "2022-12-15T06:53:49Z", "", "", "", "125", ""},
680+
{"7274e5db-f918-4efe-1511-872f66e235c5", "2022-12-15T06:53:49Z", "", "", "", "", ""},
681+
{"7274e5db-f918-4efe-2323-872f66e235c5", "2022-12-15T06:53:49Z", "", "", "125.75", "", ""},
682+
{"7274e5db-f918-4efe-4524-872f66e235c5", "2022-12-15T06:53:49Z", "true", "", "", "", ""},
683+
{"7274e5db-f918-4efe-5151-872f66e235c5", "2022-12-15T06:53:49Z", "", "", "", "", smallString},
684+
{"7274e5db-f918-4efe-5322-872f66e235c5", "2022-12-15T06:53:49Z", "", "2022-12-15T06:53:49Z", "", "", ""},
685+
})
686+
687+
t.Log("Test varchar for big string (8000)")
688+
_, err = db.ExecContext(ctx, "ALTER TABLE "+namespace+"."+tableName+" ALTER COLUMN test_string VARCHAR(8000);")
689+
require.NoError(t, err)
690+
691+
loadTableStat, err = az.LoadTable(ctx, tableName)
692+
require.NoError(t, err)
693+
require.Equal(t, loadTableStat.RowsInserted, int64(0))
694+
require.Equal(t, loadTableStat.RowsUpdated, int64(14))
695+
696+
records = whth.RetrieveRecordsFromWarehouse(t, db,
697+
fmt.Sprintf(`
698+
SELECT
699+
id,
700+
received_at,
701+
test_bool,
702+
test_datetime,
703+
cast(test_float AS float) AS test_float,
704+
test_int,
705+
test_string
706+
FROM
707+
%q.%q
708+
ORDER BY
709+
id;
710+
`,
711+
namespace,
712+
tableName,
713+
),
714+
)
715+
require.Equal(t, records, [][]string{
716+
{"6734e5db-f918-4efe-1421-872f66e235c5", "2022-12-15T06:53:49Z", "", "", "", "125", ""},
717+
{"6734e5db-f918-4efe-2314-872f66e235c5", "2022-12-15T06:53:49Z", "", "", "125.75", "", ""},
718+
{"6734e5db-f918-4efe-2352-872f66e235c5", "2022-12-15T06:53:49Z", "", "2022-12-15T06:53:49Z", "", "", ""},
719+
{"6734e5db-f918-4efe-2414-872f66e235c5", "2022-12-15T06:53:49Z", "false", "2022-12-15T06:53:49Z", "126.75", "126", bigString},
720+
{"6734e5db-f918-4efe-3555-872f66e235c5", "2022-12-15T06:53:49Z", "false", "", "", "", ""},
721+
{"6734e5db-f918-4efe-5152-872f66e235c5", "2022-12-15T06:53:49Z", "", "", "", "", bigString},
722+
{"6734e5db-f918-4efe-5323-872f66e235c5", "2022-12-15T06:53:49Z", "", "", "", "", ""},
723+
{"7274e5db-f918-4efe-1212-872f66e235c5", "2022-12-15T06:53:49Z", "true", "2022-12-15T06:53:49Z", "125.75", "125", bigString},
724+
{"7274e5db-f918-4efe-1454-872f66e235c5", "2022-12-15T06:53:49Z", "", "", "", "125", ""},
725+
{"7274e5db-f918-4efe-1511-872f66e235c5", "2022-12-15T06:53:49Z", "", "", "", "", ""},
726+
{"7274e5db-f918-4efe-2323-872f66e235c5", "2022-12-15T06:53:49Z", "", "", "125.75", "", ""},
727+
{"7274e5db-f918-4efe-4524-872f66e235c5", "2022-12-15T06:53:49Z", "true", "", "", "", ""},
728+
{"7274e5db-f918-4efe-5151-872f66e235c5", "2022-12-15T06:53:49Z", "", "", "", "", bigString},
729+
{"7274e5db-f918-4efe-5322-872f66e235c5", "2022-12-15T06:53:49Z", "", "2022-12-15T06:53:49Z", "", "", ""},
730+
})
731+
732+
t.Log("Test varchar for bigger string (65535)")
733+
_, err = db.ExecContext(ctx, "ALTER TABLE "+namespace+"."+tableName+" ALTER COLUMN test_string VARCHAR(MAX);")
734+
require.NoError(t, err)
735+
736+
loadTableStat, err = az.LoadTable(ctx, tableName)
737+
require.NoError(t, err)
738+
require.Equal(t, loadTableStat.RowsInserted, int64(0))
739+
require.Equal(t, loadTableStat.RowsUpdated, int64(14))
740+
741+
records = whth.RetrieveRecordsFromWarehouse(t, db,
742+
fmt.Sprintf(`
743+
SELECT
744+
id,
745+
received_at,
746+
test_bool,
747+
test_datetime,
748+
cast(test_float AS float) AS test_float,
749+
test_int,
750+
test_string
751+
FROM
752+
%q.%q
753+
ORDER BY
754+
id;
755+
`,
756+
namespace,
757+
tableName,
758+
),
759+
)
760+
require.Equal(t, records, [][]string{
761+
{"6734e5db-f918-4efe-1421-872f66e235c5", "2022-12-15T06:53:49Z", "", "", "", "125", ""},
762+
{"6734e5db-f918-4efe-2314-872f66e235c5", "2022-12-15T06:53:49Z", "", "", "125.75", "", ""},
763+
{"6734e5db-f918-4efe-2352-872f66e235c5", "2022-12-15T06:53:49Z", "", "2022-12-15T06:53:49Z", "", "", ""},
764+
{"6734e5db-f918-4efe-2414-872f66e235c5", "2022-12-15T06:53:49Z", "false", "2022-12-15T06:53:49Z", "126.75", "126", biggerString},
765+
{"6734e5db-f918-4efe-3555-872f66e235c5", "2022-12-15T06:53:49Z", "false", "", "", "", ""},
766+
{"6734e5db-f918-4efe-5152-872f66e235c5", "2022-12-15T06:53:49Z", "", "", "", "", biggerString},
767+
{"6734e5db-f918-4efe-5323-872f66e235c5", "2022-12-15T06:53:49Z", "", "", "", "", ""},
768+
{"7274e5db-f918-4efe-1212-872f66e235c5", "2022-12-15T06:53:49Z", "true", "2022-12-15T06:53:49Z", "125.75", "125", biggerString},
769+
{"7274e5db-f918-4efe-1454-872f66e235c5", "2022-12-15T06:53:49Z", "", "", "", "125", ""},
770+
{"7274e5db-f918-4efe-1511-872f66e235c5", "2022-12-15T06:53:49Z", "", "", "", "", ""},
771+
{"7274e5db-f918-4efe-2323-872f66e235c5", "2022-12-15T06:53:49Z", "", "", "125.75", "", ""},
772+
{"7274e5db-f918-4efe-4524-872f66e235c5", "2022-12-15T06:53:49Z", "true", "", "", "", ""},
773+
{"7274e5db-f918-4efe-5151-872f66e235c5", "2022-12-15T06:53:49Z", "", "", "", "", biggerString},
774+
{"7274e5db-f918-4efe-5322-872f66e235c5", "2022-12-15T06:53:49Z", "", "2022-12-15T06:53:49Z", "", "", ""},
775+
})
776+
})
632777
})
633778
}
634779

635-
func TestAzureSynapse_ProcessColumnValue(t *testing.T) {
780+
func TestProcessColumnValue(t *testing.T) {
636781
testCases := []struct {
637782
name string
638783
data string
639784
dataType string
785+
varcharLength int
640786
expectedValue interface{}
641787
wantError bool
642788
}{
@@ -695,11 +841,18 @@ func TestAzureSynapse_ProcessColumnValue(t *testing.T) {
695841
expectedValue: "test",
696842
},
697843
{
698-
name: "valid string exceeding max length",
844+
name: "valid string exceeding max length when varcharMaxlength is not set",
699845
data: strings.Repeat("test", 200),
700846
dataType: model.StringDataType,
701847
expectedValue: strings.Repeat("test", 128),
702848
},
849+
{
850+
name: "valid string exceeding max length when varcharMaxlength is set",
851+
data: strings.Repeat("test", 200),
852+
dataType: model.StringDataType,
853+
varcharLength: 1024,
854+
expectedValue: strings.Repeat("test", 200),
855+
},
703856
{
704857
name: "valid string with diacritics",
705858
data: "tést",
@@ -710,9 +863,7 @@ func TestAzureSynapse_ProcessColumnValue(t *testing.T) {
710863

711864
for _, tc := range testCases {
712865
t.Run(tc.name, func(t *testing.T) {
713-
az := azuresynapse.New(config.New(), logger.NOP, stats.NOP)
714-
715-
value, err := az.ProcessColumnValue(tc.data, tc.dataType)
866+
value, err := azuresynapse.ProcessColumnValue(tc.data, tc.dataType, tc.varcharLength)
716867
if tc.wantError {
717868
require.Error(t, err)
718869
return
Binary file not shown.

0 commit comments

Comments (0)