diff --git a/cmd/airbyte-source/read.go b/cmd/airbyte-source/read.go index 2c72e1f..3f15235 100644 --- a/cmd/airbyte-source/read.go +++ b/cmd/airbyte-source/read.go @@ -64,7 +64,7 @@ func ReadCommand(ch *Helper) *cobra.Command { catalog, err := readCatalog(readSourceCatalogPath) if err != nil { - ch.Logger.Error("Unable to read catalog") + ch.Logger.Error(fmt.Sprintf("Unable to read catalog: %+v", err)) os.Exit(1) } diff --git a/cmd/airbyte-source/read_test.go b/cmd/airbyte-source/read_test.go index 10f94c8..3100225 100644 --- a/cmd/airbyte-source/read_test.go +++ b/cmd/airbyte-source/read_test.go @@ -28,7 +28,7 @@ func TestRead_StartingGtidsAndState(t *testing.T) { Type: "object", Properties: map[string]internal.PropertyType{ "id": { - Type: "number", + Type: []string{"number"}, AirbyteType: "integer", }, }, @@ -55,7 +55,7 @@ func TestRead_StartingGtidsAndState(t *testing.T) { Type: "object", Properties: map[string]internal.PropertyType{ "id": { - Type: "number", + Type: []string{"number"}, AirbyteType: "integer", }, }, diff --git a/cmd/internal/planetscale_edge_database.go b/cmd/internal/planetscale_edge_database.go index 33d90b9..97013e2 100644 --- a/cmd/internal/planetscale_edge_database.go +++ b/cmd/internal/planetscale_edge_database.go @@ -132,28 +132,37 @@ func (p PlanetScaleEdgeDatabase) getStreamForTable(ctx context.Context, psc Plan } // Convert columnType to Airbyte type. -func getJsonSchemaType(mysqlType string, treatTinyIntAsBoolean bool) PropertyType { +func getJsonSchemaType(mysqlType string, treatTinyIntAsBoolean bool, nullable string) PropertyType { // Support custom airbyte types documented here : // https://docs.airbyte.com/understanding-airbyte/supported-data-types/#the-types + var propertyType PropertyType + switch { case strings.HasPrefix(mysqlType, "tinyint(1)"): if treatTinyIntAsBoolean { - return PropertyType{Type: "boolean"} + propertyType = PropertyType{Type: []string{"boolean"}} + } else { + propertyType = PropertyType{Type: []string{"number"}, AirbyteType: "integer"} } - return PropertyType{Type: "number", AirbyteType: "integer"} case strings.HasPrefix(mysqlType, "int"), strings.HasPrefix(mysqlType, "smallint"), strings.HasPrefix(mysqlType, "mediumint"), strings.HasPrefix(mysqlType, "bigint"), strings.HasPrefix(mysqlType, "tinyint"): - return PropertyType{Type: "number", AirbyteType: "integer"} + propertyType = PropertyType{Type: []string{"number"}, AirbyteType: "integer"} case strings.HasPrefix(mysqlType, "decimal"), strings.HasPrefix(mysqlType, "double"), strings.HasPrefix(mysqlType, "float"): - return PropertyType{Type: "number"} + propertyType = PropertyType{Type: []string{"number"}} case strings.HasPrefix(mysqlType, "datetime"), strings.HasPrefix(mysqlType, "timestamp"): - return PropertyType{Type: "string", CustomFormat: "date-time", AirbyteType: "timestamp_without_timezone"} + propertyType = PropertyType{Type: []string{"string"}, CustomFormat: "date-time", AirbyteType: "timestamp_without_timezone"} case strings.HasPrefix(mysqlType, "date"): - return PropertyType{Type: "string", CustomFormat: "date", AirbyteType: "date"} + propertyType = PropertyType{Type: []string{"string"}, CustomFormat: "date", AirbyteType: "date"} case strings.HasPrefix(mysqlType, "time"): - return PropertyType{Type: "string", CustomFormat: "time", AirbyteType: "time_without_timezone"} + propertyType = PropertyType{Type: []string{"string"}, CustomFormat: "time", AirbyteType: "time_without_timezone"} default: - return PropertyType{Type: "string"} + propertyType = PropertyType{Type: []string{"string"}} + } + + if strings.ToLower(nullable) == "yes" { + propertyType.Type = append(propertyType.Type, "null") } + + return propertyType } func (p PlanetScaleEdgeDatabase) Close() error { diff --git a/cmd/internal/planetscale_edge_database_test.go b/cmd/internal/planetscale_edge_database_test.go index 41b947e..dafeb00 100644 --- a/cmd/internal/planetscale_edge_database_test.go +++ b/cmd/internal/planetscale_edge_database_test.go @@ -359,118 +359,125 @@ func TestRead_CanPickRdonlyForShardedKeyspaces(t *testing.T) { func TestDiscover_CanPickRightAirbyteType(t *testing.T) { var tests = []struct { MysqlType string - JSONSchemaType string + JSONSchemaType []string AirbyteType string TreatTinyIntAsBoolean bool + IsNullable string }{ { MysqlType: "int(11)", - JSONSchemaType: "number", + JSONSchemaType: []string{"number"}, AirbyteType: "integer", }, { MysqlType: "smallint(4)", - JSONSchemaType: "number", + JSONSchemaType: []string{"number"}, AirbyteType: "integer", }, { MysqlType: "mediumint(8)", - JSONSchemaType: "number", + JSONSchemaType: []string{"number"}, AirbyteType: "integer", }, { MysqlType: "tinyint", - JSONSchemaType: "number", + JSONSchemaType: []string{"number"}, AirbyteType: "integer", TreatTinyIntAsBoolean: true, }, { MysqlType: "tinyint(1)", - JSONSchemaType: "boolean", + JSONSchemaType: []string{"boolean"}, AirbyteType: "", TreatTinyIntAsBoolean: true, }, { MysqlType: "tinyint(1) unsigned", - JSONSchemaType: "boolean", + JSONSchemaType: []string{"boolean"}, AirbyteType: "", TreatTinyIntAsBoolean: true, }, { MysqlType: "tinyint(1)", - JSONSchemaType: "number", + JSONSchemaType: []string{"number"}, AirbyteType: "integer", TreatTinyIntAsBoolean: false, }, { MysqlType: "tinyint(1) unsigned", - JSONSchemaType: "number", + JSONSchemaType: []string{"number"}, AirbyteType: "integer", TreatTinyIntAsBoolean: false, }, { MysqlType: "bigint(16)", - JSONSchemaType: "number", + JSONSchemaType: []string{"number"}, AirbyteType: "integer", }, { MysqlType: "bigint unsigned", - JSONSchemaType: "number", + JSONSchemaType: []string{"number"}, AirbyteType: "integer", }, { MysqlType: "bigint zerofill", - JSONSchemaType: "number", + JSONSchemaType: []string{"number"}, AirbyteType: "integer", }, { MysqlType: "datetime", - JSONSchemaType: "string", + JSONSchemaType: []string{"string"}, AirbyteType: "timestamp_without_timezone", }, { MysqlType: "datetime(6)", - JSONSchemaType: "string", + JSONSchemaType: []string{"string"}, AirbyteType: "timestamp_without_timezone", }, { MysqlType: "time", - JSONSchemaType: "string", + JSONSchemaType: []string{"string"}, AirbyteType: "time_without_timezone", }, { MysqlType: "time(6)", - JSONSchemaType: "string", + JSONSchemaType: []string{"string"}, AirbyteType: "time_without_timezone", }, { MysqlType: "date", - JSONSchemaType: "string", + JSONSchemaType: []string{"string"}, AirbyteType: "date", }, { MysqlType: "text", - JSONSchemaType: "string", + JSONSchemaType: []string{"string"}, AirbyteType: "", }, { MysqlType: "varchar(256)", - JSONSchemaType: "string", + JSONSchemaType: []string{"string"}, AirbyteType: "", }, + { + MysqlType: "varchar(256)", + JSONSchemaType: []string{"string", "null"}, + AirbyteType: "", + IsNullable: "YES", + }, { MysqlType: "decimal(12,5)", - JSONSchemaType: "number", + JSONSchemaType: []string{"number"}, AirbyteType: "", }, { MysqlType: "double", - JSONSchemaType: "number", + JSONSchemaType: []string{"number"}, AirbyteType: "", }, { MysqlType: "float(30)", - JSONSchemaType: "number", + JSONSchemaType: []string{"number"}, AirbyteType: "", }, } @@ -478,7 +485,7 @@ func TestDiscover_CanPickRightAirbyteType(t *testing.T) { for _, typeTest := range tests { t.Run(fmt.Sprintf("mysql_type_%v", typeTest.MysqlType), func(t *testing.T) { - p := getJsonSchemaType(typeTest.MysqlType, typeTest.TreatTinyIntAsBoolean) + p := getJsonSchemaType(typeTest.MysqlType, typeTest.TreatTinyIntAsBoolean, typeTest.IsNullable) assert.Equal(t, typeTest.AirbyteType, p.AirbyteType) assert.Equal(t, typeTest.JSONSchemaType, p.Type) }) diff --git a/cmd/internal/planetscale_edge_mysql.go b/cmd/internal/planetscale_edge_mysql.go index b721656..bb3fffc 100644 --- a/cmd/internal/planetscale_edge_mysql.go +++ b/cmd/internal/planetscale_edge_mysql.go @@ -157,7 +157,7 @@ func (p planetScaleEdgeMySQLAccess) GetTableSchema(ctx context.Context, psc Plan columnNamesQR, err := p.db.QueryContext( ctx, - "select column_name, column_type from information_schema.columns where table_name=? AND table_schema=?;", + "select column_name, column_type, is_nullable from information_schema.columns where table_name=? AND table_schema=?;", tableName, psc.Database, ) if err != nil { @@ -168,12 +168,13 @@ func (p planetScaleEdgeMySQLAccess) GetTableSchema(ctx context.Context, psc Plan var ( name string columnType string + nullable string ) - if err = columnNamesQR.Scan(&name, &columnType); err != nil { + if err = columnNamesQR.Scan(&name, &columnType, &nullable); err != nil { return properties, errors.Wrapf(err, "Unable to scan row for column names & types of table %v", tableName) } - properties[name] = getJsonSchemaType(columnType, !psc.Options.DoNotTreatTinyIntAsBoolean) + properties[name] = getJsonSchemaType(columnType, !psc.Options.DoNotTreatTinyIntAsBoolean, nullable) } if err := columnNamesQR.Err(); err != nil { diff --git a/cmd/internal/types.go b/cmd/internal/types.go index e31f28b..cd5bc2e 100644 --- a/cmd/internal/types.go +++ b/cmd/internal/types.go @@ -54,9 +54,9 @@ const ( ) type PropertyType struct { - Type string `json:"type"` - CustomFormat string `json:"format,omitempty"` - AirbyteType string `json:"airbyte_type,omitempty"` + Type []string `json:"type"` + CustomFormat string `json:"format,omitempty"` + AirbyteType string `json:"airbyte_type,omitempty"` } type StreamSchema struct {