Skip to content

Draft: Do not review, add support for specific geometry types #2803

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 23 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 71 additions & 11 deletions flow/connectors/mysql/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,26 @@ func qkindFromMysql(field *mysql.Field) (qvalue.QValueKind, error) {
case mysql.MYSQL_TYPE_VAR_STRING, mysql.MYSQL_TYPE_STRING:
return qvalue.QValueKindString, nil
case mysql.MYSQL_TYPE_GEOMETRY:
return qvalue.QValueKindGeometry, nil
// Check the column type name to determine specific geometry type
colType := strings.ToLower(string(field.Name))
switch {
case strings.Contains(colType, "point"):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HasPrefix?

Copy link
Member

@serprex serprex Apr 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

field.Name is column's name, not column's type. We don't have enough information here to figure it out. May need to make geometry all fall under one QValueKind & put SRID as a field for QValueGeometry

return qvalue.QValueKindPoint, nil
case strings.Contains(colType, "linestring"):
return qvalue.QValueKindLineString, nil
case strings.Contains(colType, "polygon"):
return qvalue.QValueKindPolygon, nil
case strings.Contains(colType, "multipoint"):
return qvalue.QValueKindMultiPoint, nil
case strings.Contains(colType, "multilinestring"):
return qvalue.QValueKindMultiLineString, nil
case strings.Contains(colType, "multipolygon"):
return qvalue.QValueKindMultiPolygon, nil
case strings.Contains(colType, "geometrycollection"):
return qvalue.QValueKindGeometryCollection, nil
default:
return qvalue.QValueKindGeometry, nil
}
case mysql.MYSQL_TYPE_VECTOR:
return qvalue.QValueKindArrayFloat32, nil
default:
Expand Down Expand Up @@ -154,8 +173,25 @@ func qkindFromMysqlColumnType(ct string) (qvalue.QValueKind, error) {
}
case "vector":
return qvalue.QValueKindArrayFloat32, nil
case "geometry", "point", "polygon", "linestring", "multipoint", "multipolygon", "geomcollection":
return qvalue.QValueKindGeometry, nil
case "geometry", "point", "linestring", "polygon", "multipoint", "multilinestring", "multipolygon", "geometrycollection":
switch ct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Earlier check is on strings.ToLower(ct) this check is on ct,
maybe we should instead merge with parent switch?

case "point":
return qvalue.QValueKindPoint, nil
case "linestring":
return qvalue.QValueKindLineString, nil
case "polygon":
return qvalue.QValueKindPolygon, nil
case "multipoint":
return qvalue.QValueKindMultiPoint, nil
case "multilinestring":
return qvalue.QValueKindMultiLineString, nil
case "multipolygon":
return qvalue.QValueKindMultiPolygon, nil
case "geometrycollection":
return qvalue.QValueKindGeometryCollection, nil
default:
return qvalue.QValueKindGeometry, nil
}
default:
return qvalue.QValueKind(""), fmt.Errorf("unknown mysql type %s", ct)
}
Expand Down Expand Up @@ -213,17 +249,39 @@ func geometryValueFromBytes(wkbData []byte) (string, error) {
return wkt, nil
}

// Helper function to process geometry data and return a QValueGeometry
func processGeometryData(data []byte) qvalue.QValueGeometry {
// Helper function to process geometry data and return a QValue
func processGeometryData(data []byte, qkind qvalue.QValueKind) qvalue.QValue {
var strVal string
// For geometry data, we need to convert from MySQL's binary format to WKT
if len(data) > 4 {
wkt, err := geometryValueFromBytes(data)
if err == nil {
return qvalue.QValueGeometry{Val: wkt}
strVal = wkt
} else {
strVal = string(data)
}
} else {
strVal = string(data)
}

switch qkind {
case qvalue.QValueKindPoint:
return qvalue.QValuePoint{Val: strVal}
case qvalue.QValueKindLineString:
return qvalue.QValueLineString{Val: strVal}
case qvalue.QValueKindPolygon:
return qvalue.QValuePolygon{Val: strVal}
case qvalue.QValueKindMultiPoint:
return qvalue.QValueMultiPoint{Val: strVal}
case qvalue.QValueKindMultiLineString:
return qvalue.QValueMultiLineString{Val: strVal}
case qvalue.QValueKindMultiPolygon:
return qvalue.QValueMultiPolygon{Val: strVal}
case qvalue.QValueKindGeometryCollection:
return qvalue.QValueGeometryCollection{Val: strVal}
default:
return qvalue.QValueGeometry{Val: strVal}
}
strVal := string(data)
return qvalue.QValueGeometry{Val: strVal}
}

func QValueFromMysqlFieldValue(qkind qvalue.QValueKind, fv mysql.FieldValue) (qvalue.QValue, error) {
Expand Down Expand Up @@ -306,8 +364,10 @@ func QValueFromMysqlFieldValue(qkind qvalue.QValueKind, fv mysql.FieldValue) (qv
return qvalue.QValueBytes{Val: slices.Clone(v)}, nil
case qvalue.QValueKindJSON:
return qvalue.QValueJSON{Val: string(v)}, nil
case qvalue.QValueKindGeometry:
return processGeometryData(v), nil
case qvalue.QValueKindGeometry, qvalue.QValueKindPoint, qvalue.QValueKindLineString,
qvalue.QValueKindPolygon, qvalue.QValueKindMultiPoint, qvalue.QValueKindMultiLineString,
qvalue.QValueKindMultiPolygon, qvalue.QValueKindGeometryCollection:
return processGeometryData(v, qkind), nil
case qvalue.QValueKindNumeric:
val, err := decimal.NewFromString(unsafeString)
if err != nil {
Expand Down Expand Up @@ -438,7 +498,7 @@ func QValueFromMysqlRowEvent(
return qvalue.QValueJSON{Val: string(val)}, nil
case qvalue.QValueKindGeometry:
// Handle geometry data as binary (WKB format)
return processGeometryData(val), nil
return processGeometryData(val, qkind), nil
case qvalue.QValueKindArrayFloat32:
floats := make([]float32, 0, len(val)/4)
for i := 0; i < len(val); i += 4 {
Expand Down
Loading
Loading