Skip to content

Commit d800f4c

Browse files
authored
Merge pull request #123 from planetscale/add-null-to-schema
**Breaking change**: Supports multiple column types (mostly to support nullable column types)
2 parents 0c0cb75 + 7c823df commit d800f4c

6 files changed

+58
-41
lines changed

cmd/airbyte-source/read.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func ReadCommand(ch *Helper) *cobra.Command {
6464

6565
catalog, err := readCatalog(readSourceCatalogPath)
6666
if err != nil {
67-
ch.Logger.Error("Unable to read catalog")
67+
ch.Logger.Error(fmt.Sprintf("Unable to read catalog: %+v", err))
6868
os.Exit(1)
6969
}
7070

cmd/airbyte-source/read_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func TestRead_StartingGtidsAndState(t *testing.T) {
2828
Type: "object",
2929
Properties: map[string]internal.PropertyType{
3030
"id": {
31-
Type: "number",
31+
Type: []string{"number"},
3232
AirbyteType: "integer",
3333
},
3434
},
@@ -55,7 +55,7 @@ func TestRead_StartingGtidsAndState(t *testing.T) {
5555
Type: "object",
5656
Properties: map[string]internal.PropertyType{
5757
"id": {
58-
Type: "number",
58+
Type: []string{"number"},
5959
AirbyteType: "integer",
6060
},
6161
},

cmd/internal/planetscale_edge_database.go

+18-9
Original file line numberDiff line numberDiff line change
@@ -132,28 +132,37 @@ func (p PlanetScaleEdgeDatabase) getStreamForTable(ctx context.Context, psc Plan
132132
}
133133

134134
// Convert columnType to Airbyte type.
135-
func getJsonSchemaType(mysqlType string, treatTinyIntAsBoolean bool) PropertyType {
135+
func getJsonSchemaType(mysqlType string, treatTinyIntAsBoolean bool, nullable string) PropertyType {
136136
// Support custom airbyte types documented here :
137137
// https://docs.airbyte.com/understanding-airbyte/supported-data-types/#the-types
138+
var propertyType PropertyType
139+
138140
switch {
139141
case strings.HasPrefix(mysqlType, "tinyint(1)"):
140142
if treatTinyIntAsBoolean {
141-
return PropertyType{Type: "boolean"}
143+
propertyType = PropertyType{Type: []string{"boolean"}}
144+
} else {
145+
propertyType = PropertyType{Type: []string{"number"}, AirbyteType: "integer"}
142146
}
143-
return PropertyType{Type: "number", AirbyteType: "integer"}
144147
case strings.HasPrefix(mysqlType, "int"), strings.HasPrefix(mysqlType, "smallint"), strings.HasPrefix(mysqlType, "mediumint"), strings.HasPrefix(mysqlType, "bigint"), strings.HasPrefix(mysqlType, "tinyint"):
145-
return PropertyType{Type: "number", AirbyteType: "integer"}
148+
propertyType = PropertyType{Type: []string{"number"}, AirbyteType: "integer"}
146149
case strings.HasPrefix(mysqlType, "decimal"), strings.HasPrefix(mysqlType, "double"), strings.HasPrefix(mysqlType, "float"):
147-
return PropertyType{Type: "number"}
150+
propertyType = PropertyType{Type: []string{"number"}}
148151
case strings.HasPrefix(mysqlType, "datetime"), strings.HasPrefix(mysqlType, "timestamp"):
149-
return PropertyType{Type: "string", CustomFormat: "date-time", AirbyteType: "timestamp_without_timezone"}
152+
propertyType = PropertyType{Type: []string{"string"}, CustomFormat: "date-time", AirbyteType: "timestamp_without_timezone"}
150153
case strings.HasPrefix(mysqlType, "date"):
151-
return PropertyType{Type: "string", CustomFormat: "date", AirbyteType: "date"}
154+
propertyType = PropertyType{Type: []string{"string"}, CustomFormat: "date", AirbyteType: "date"}
152155
case strings.HasPrefix(mysqlType, "time"):
153-
return PropertyType{Type: "string", CustomFormat: "time", AirbyteType: "time_without_timezone"}
156+
propertyType = PropertyType{Type: []string{"string"}, CustomFormat: "time", AirbyteType: "time_without_timezone"}
154157
default:
155-
return PropertyType{Type: "string"}
158+
propertyType = PropertyType{Type: []string{"string"}}
159+
}
160+
161+
if strings.ToLower(nullable) == "yes" {
162+
propertyType.Type = append(propertyType.Type, "null")
156163
}
164+
165+
return propertyType
157166
}
158167

159168
func (p PlanetScaleEdgeDatabase) Close() error {

cmd/internal/planetscale_edge_database_test.go

+30-23
Original file line numberDiff line numberDiff line change
@@ -359,126 +359,133 @@ func TestRead_CanPickRdonlyForShardedKeyspaces(t *testing.T) {
359359
func TestDiscover_CanPickRightAirbyteType(t *testing.T) {
360360
var tests = []struct {
361361
MysqlType string
362-
JSONSchemaType string
362+
JSONSchemaType []string
363363
AirbyteType string
364364
TreatTinyIntAsBoolean bool
365+
IsNullable string
365366
}{
366367
{
367368
MysqlType: "int(11)",
368-
JSONSchemaType: "number",
369+
JSONSchemaType: []string{"number"},
369370
AirbyteType: "integer",
370371
},
371372
{
372373
MysqlType: "smallint(4)",
373-
JSONSchemaType: "number",
374+
JSONSchemaType: []string{"number"},
374375
AirbyteType: "integer",
375376
},
376377
{
377378
MysqlType: "mediumint(8)",
378-
JSONSchemaType: "number",
379+
JSONSchemaType: []string{"number"},
379380
AirbyteType: "integer",
380381
},
381382
{
382383
MysqlType: "tinyint",
383-
JSONSchemaType: "number",
384+
JSONSchemaType: []string{"number"},
384385
AirbyteType: "integer",
385386
TreatTinyIntAsBoolean: true,
386387
},
387388
{
388389
MysqlType: "tinyint(1)",
389-
JSONSchemaType: "boolean",
390+
JSONSchemaType: []string{"boolean"},
390391
AirbyteType: "",
391392
TreatTinyIntAsBoolean: true,
392393
},
393394
{
394395
MysqlType: "tinyint(1) unsigned",
395-
JSONSchemaType: "boolean",
396+
JSONSchemaType: []string{"boolean"},
396397
AirbyteType: "",
397398
TreatTinyIntAsBoolean: true,
398399
},
399400
{
400401
MysqlType: "tinyint(1)",
401-
JSONSchemaType: "number",
402+
JSONSchemaType: []string{"number"},
402403
AirbyteType: "integer",
403404
TreatTinyIntAsBoolean: false,
404405
},
405406
{
406407
MysqlType: "tinyint(1) unsigned",
407-
JSONSchemaType: "number",
408+
JSONSchemaType: []string{"number"},
408409
AirbyteType: "integer",
409410
TreatTinyIntAsBoolean: false,
410411
},
411412
{
412413
MysqlType: "bigint(16)",
413-
JSONSchemaType: "number",
414+
JSONSchemaType: []string{"number"},
414415
AirbyteType: "integer",
415416
},
416417
{
417418
MysqlType: "bigint unsigned",
418-
JSONSchemaType: "number",
419+
JSONSchemaType: []string{"number"},
419420
AirbyteType: "integer",
420421
},
421422
{
422423
MysqlType: "bigint zerofill",
423-
JSONSchemaType: "number",
424+
JSONSchemaType: []string{"number"},
424425
AirbyteType: "integer",
425426
},
426427
{
427428
MysqlType: "datetime",
428-
JSONSchemaType: "string",
429+
JSONSchemaType: []string{"string"},
429430
AirbyteType: "timestamp_without_timezone",
430431
},
431432
{
432433
MysqlType: "datetime(6)",
433-
JSONSchemaType: "string",
434+
JSONSchemaType: []string{"string"},
434435
AirbyteType: "timestamp_without_timezone",
435436
},
436437
{
437438
MysqlType: "time",
438-
JSONSchemaType: "string",
439+
JSONSchemaType: []string{"string"},
439440
AirbyteType: "time_without_timezone",
440441
},
441442
{
442443
MysqlType: "time(6)",
443-
JSONSchemaType: "string",
444+
JSONSchemaType: []string{"string"},
444445
AirbyteType: "time_without_timezone",
445446
},
446447
{
447448
MysqlType: "date",
448-
JSONSchemaType: "string",
449+
JSONSchemaType: []string{"string"},
449450
AirbyteType: "date",
450451
},
451452
{
452453
MysqlType: "text",
453-
JSONSchemaType: "string",
454+
JSONSchemaType: []string{"string"},
454455
AirbyteType: "",
455456
},
456457
{
457458
MysqlType: "varchar(256)",
458-
JSONSchemaType: "string",
459+
JSONSchemaType: []string{"string"},
459460
AirbyteType: "",
460461
},
462+
{
463+
MysqlType: "varchar(256)",
464+
JSONSchemaType: []string{"string", "null"},
465+
AirbyteType: "",
466+
IsNullable: "YES",
467+
},
461468
{
462469
MysqlType: "decimal(12,5)",
463-
JSONSchemaType: "number",
470+
JSONSchemaType: []string{"number"},
464471
AirbyteType: "",
465472
},
466473
{
467474
MysqlType: "double",
468-
JSONSchemaType: "number",
475+
JSONSchemaType: []string{"number"},
469476
AirbyteType: "",
470477
},
471478
{
472479
MysqlType: "float(30)",
473-
JSONSchemaType: "number",
480+
JSONSchemaType: []string{"number"},
474481
AirbyteType: "",
475482
},
476483
}
477484

478485
for _, typeTest := range tests {
479486

480487
t.Run(fmt.Sprintf("mysql_type_%v", typeTest.MysqlType), func(t *testing.T) {
481-
p := getJsonSchemaType(typeTest.MysqlType, typeTest.TreatTinyIntAsBoolean)
488+
p := getJsonSchemaType(typeTest.MysqlType, typeTest.TreatTinyIntAsBoolean, typeTest.IsNullable)
482489
assert.Equal(t, typeTest.AirbyteType, p.AirbyteType)
483490
assert.Equal(t, typeTest.JSONSchemaType, p.Type)
484491
})

cmd/internal/planetscale_edge_mysql.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ func (p planetScaleEdgeMySQLAccess) GetTableSchema(ctx context.Context, psc Plan
157157

158158
columnNamesQR, err := p.db.QueryContext(
159159
ctx,
160-
"select column_name, column_type from information_schema.columns where table_name=? AND table_schema=?;",
160+
"select column_name, column_type, is_nullable from information_schema.columns where table_name=? AND table_schema=?;",
161161
tableName, psc.Database,
162162
)
163163
if err != nil {
@@ -168,12 +168,13 @@ func (p planetScaleEdgeMySQLAccess) GetTableSchema(ctx context.Context, psc Plan
168168
var (
169169
name string
170170
columnType string
171+
nullable string
171172
)
172-
if err = columnNamesQR.Scan(&name, &columnType); err != nil {
173+
if err = columnNamesQR.Scan(&name, &columnType, &nullable); err != nil {
173174
return properties, errors.Wrapf(err, "Unable to scan row for column names & types of table %v", tableName)
174175
}
175176

176-
properties[name] = getJsonSchemaType(columnType, !psc.Options.DoNotTreatTinyIntAsBoolean)
177+
properties[name] = getJsonSchemaType(columnType, !psc.Options.DoNotTreatTinyIntAsBoolean, nullable)
177178
}
178179

179180
if err := columnNamesQR.Err(); err != nil {

cmd/internal/types.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,9 @@ const (
5454
)
5555

5656
type PropertyType struct {
57-
Type string `json:"type"`
58-
CustomFormat string `json:"format,omitempty"`
59-
AirbyteType string `json:"airbyte_type,omitempty"`
57+
Type []string `json:"type"`
58+
CustomFormat string `json:"format,omitempty"`
59+
AirbyteType string `json:"airbyte_type,omitempty"`
6060
}
6161

6262
type StreamSchema struct {

0 commit comments

Comments
 (0)