diff --git a/flow/connectors/mysql/qvalue_convert.go b/flow/connectors/mysql/qvalue_convert.go
index 4113f149eb..0d826b91b3 100644
--- a/flow/connectors/mysql/qvalue_convert.go
+++ b/flow/connectors/mysql/qvalue_convert.go
@@ -92,7 +92,28 @@ func qkindFromMysql(field *mysql.Field) (qvalue.QValueKind, error) {
 	case mysql.MYSQL_TYPE_VAR_STRING, mysql.MYSQL_TYPE_STRING:
 		return qvalue.QValueKindString, nil
 	case mysql.MYSQL_TYPE_GEOMETRY:
-		return qvalue.QValueKindGeometry, nil
+		// NOTE(review): field.Name is the column's name, not its declared type, so this is
+		// only a naming heuristic; unmatched names fall back to the generic geometry kind.
+		// Longer names are tested first because e.g. "multipoint" also contains "point".
+		colType := strings.ToLower(string(field.Name))
+		switch {
+		case strings.Contains(colType, "multipoint"):
+			return qvalue.QValueKindMultiPoint, nil
+		case strings.Contains(colType, "multilinestring"):
+			return qvalue.QValueKindMultiLineString, nil
+		case strings.Contains(colType, "multipolygon"):
+			return qvalue.QValueKindMultiPolygon, nil
+		case strings.Contains(colType, "geometrycollection"):
+			return qvalue.QValueKindGeometryCollection, nil
+		case strings.Contains(colType, "point"):
+			return qvalue.QValueKindPoint, nil
+		case strings.Contains(colType, "linestring"):
+			return qvalue.QValueKindLineString, nil
+		case strings.Contains(colType, "polygon"):
+			return qvalue.QValueKindPolygon, nil
+		default:
+			return qvalue.QValueKindGeometry, nil
+		}
 	case mysql.MYSQL_TYPE_VECTOR:
 		return qvalue.QValueKindArrayFloat32, nil
 	default:
@@ -154,8 +175,23 @@ func qkindFromMysqlColumnType(ct string) (qvalue.QValueKind, error) {
 		}
 	case "vector":
 		return qvalue.QValueKindArrayFloat32, nil
-	case "geometry", "point", "polygon", "linestring", "multipoint", "multipolygon", "geomcollection":
+	case "geometry":
 		return qvalue.QValueKindGeometry, nil
+	case "point":
+		return qvalue.QValueKindPoint, nil
+	case "linestring":
+		return qvalue.QValueKindLineString, nil
+	case "polygon":
+		return qvalue.QValueKindPolygon, nil
+	case "multipoint":
+		return qvalue.QValueKindMultiPoint, nil
+	case "multilinestring":
+		return qvalue.QValueKindMultiLineString, nil
+	case "multipolygon":
+		return qvalue.QValueKindMultiPolygon, nil
+	// "geomcollection" was accepted before this change; keep it as an alias of "geometrycollection".
+	case "geometrycollection", "geomcollection":
+		return qvalue.QValueKindGeometryCollection, nil
 	default:
 		return qvalue.QValueKind(""), fmt.Errorf("unknown mysql type %s", ct)
 	}
@@ -213,17 +249,39 @@ func geometryValueFromBytes(wkbData []byte) (string, error) {
 	return wkt, nil
 }
 
-// Helper function to process geometry data and return a QValueGeometry
-func processGeometryData(data []byte) qvalue.QValueGeometry {
+// processGeometryData wraps geometry bytes (as WKT, or raw bytes if conversion fails) in the QValue type for qkind.
+func processGeometryData(data []byte, qkind qvalue.QValueKind) qvalue.QValue {
+	var strVal string
 	// For geometry data, we need to convert from MySQL's binary format to WKT
 	if len(data) > 4 {
 		wkt, err := geometryValueFromBytes(data)
 		if err == nil {
-			return qvalue.QValueGeometry{Val: wkt}
+			strVal = wkt
+		} else {
+			strVal = string(data)
 		}
+	} else {
+		strVal = string(data)
+	}
+
+	switch qkind {
+	case qvalue.QValueKindPoint:
+		return qvalue.QValuePoint{Val: strVal}
+	case qvalue.QValueKindLineString:
+		return qvalue.QValueLineString{Val: strVal}
+	case qvalue.QValueKindPolygon:
+		return qvalue.QValuePolygon{Val: strVal}
+	case qvalue.QValueKindMultiPoint:
+		return qvalue.QValueMultiPoint{Val: strVal}
+	case qvalue.QValueKindMultiLineString:
+		return qvalue.QValueMultiLineString{Val: strVal}
+	case qvalue.QValueKindMultiPolygon:
+		return qvalue.QValueMultiPolygon{Val: strVal}
+	case qvalue.QValueKindGeometryCollection:
+		return qvalue.QValueGeometryCollection{Val: strVal}
+	default:
+		return qvalue.QValueGeometry{Val: strVal}
 	}
-	strVal := string(data)
-	return qvalue.QValueGeometry{Val: strVal}
 }
 
 func QValueFromMysqlFieldValue(qkind qvalue.QValueKind, fv mysql.FieldValue) (qvalue.QValue, error) {
@@ -306,8 +364,10 @@ func QValueFromMysqlFieldValue(qkind qvalue.QValueKind, fv mysql.FieldValue) (qv
 			return qvalue.QValueBytes{Val: slices.Clone(v)}, nil
 		case qvalue.QValueKindJSON:
 			return qvalue.QValueJSON{Val: string(v)}, nil
-		case qvalue.QValueKindGeometry:
-			return processGeometryData(v), nil
+		case qvalue.QValueKindGeometry, qvalue.QValueKindPoint, qvalue.QValueKindLineString,
+			qvalue.QValueKindPolygon, qvalue.QValueKindMultiPoint, qvalue.QValueKindMultiLineString,
+			qvalue.QValueKindMultiPolygon, qvalue.QValueKindGeometryCollection:
+			return processGeometryData(v, qkind), nil
 		case qvalue.QValueKindNumeric:
 			val, err := decimal.NewFromString(unsafeString)
 			if err != nil {
@@ -438,7 +498,9 @@ func QValueFromMysqlRowEvent(
 			return qvalue.QValueJSON{Val: string(val)}, nil
-		case qvalue.QValueKindGeometry:
+		case qvalue.QValueKindGeometry, qvalue.QValueKindPoint, qvalue.QValueKindLineString,
+			qvalue.QValueKindPolygon, qvalue.QValueKindMultiPoint, qvalue.QValueKindMultiLineString,
+			qvalue.QValueKindMultiPolygon, qvalue.QValueKindGeometryCollection:
 			// Handle geometry data as binary (WKB format)
-			return processGeometryData(val), nil
+			return processGeometryData(val, qkind), nil
 		case qvalue.QValueKindArrayFloat32:
 			floats := make([]float32, 0, len(val)/4)
 			for i := 0; i < len(val); i += 4 {
diff --git a/flow/e2e/clickhouse/peer_flow_ch_test.go b/flow/e2e/clickhouse/peer_flow_ch_test.go
index 33161610fa..06f3df776c 100644
--- a/flow/e2e/clickhouse/peer_flow_ch_test.go
+++ b/flow/e2e/clickhouse/peer_flow_ch_test.go
@@ -1067,8 +1067,8 @@ func (s ClickHouseSuite) Test_Geometric_Types() {
 	e2e.EnvWaitForCount(env, s, "waiting for initial snapshot count", dstTableName, "id", 2)
 
 	// Insert a third row to test CDC
-	_, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf(`
-	INSERT INTO %[1]s (
+	_, err = s.Conn().Exec(s.t.Context(), `
+	INSERT INTO `+srcFullName+`(
 		point_col, line_col, lseg_col, box_col, path_col, polygon_col, circle_col
 	) VALUES (
 		'(100,200)', -- POINT
@@ -1078,7 +1078,7 @@ func (s ClickHouseSuite) Test_Geometric_Types() {
 		'((100,200),(300,400),(500,600))', -- PATH
 		'((100,200),(300,400),(500,600),(100,200))', -- POLYGON
 		'<(100,200),300>' -- CIRCLE
-	);`, srcFullName))
+	);`)
 	require.NoError(s.t, err)
 
 	// Wait for CDC to replicate the new row
@@ -1176,25 +1176,52 @@ func (s ClickHouseSuite) Test_MySQL_Geometric_Types() {
 	srcTableName := "test_mysql_geometric_types"
 	srcFullName := s.attachSchemaSuffix(srcTableName)
 	dstTableName := "test_mysql_geometric_types"
+	dbName := "e2e_test_" + s.suffix
 
-	// Create a table with a geometry column that can store any geometric type
-	_, err := s.Conn().Exec(s.t.Context(), fmt.Sprintf(`
-	CREATE TABLE IF NOT EXISTS %[1]s(
+	// Set MySQL system variables and select database
+	err := s.source.Exec(s.t.Context(), fmt.Sprintf(`
+		SET SESSION sql_mode = '';
+		USE %s;
+	`, dbName))
+	require.NoError(s.t, err)
+
+	// Create a table with both generic geometry and specific geometry type columns
+	err = s.source.Exec(s.t.Context(), fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %[1]s(
 		id serial PRIMARY KEY,
-		geometry_col GEOMETRY
-	);
+		geometry_col GEOMETRY NOT NULL SRID 0
+	)`, srcTableName))
+	require.NoError(s.t, err)
 
-	-- Insert test data with various geometric types
-	INSERT INTO %[1]s (geometry_col) VALUES
-	(ST_GeomFromText('POINT(1 2)')),
-	(ST_GeomFromText('LINESTRING(1 2, 3 4)')),
-	(ST_GeomFromText('POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))')),
-	(ST_GeomFromText('MULTIPOINT((1 2), (3 4))')),
-	(ST_GeomFromText('MULTILINESTRING((1 2, 3 4), (5 6, 7 8))')),
-	(ST_GeomFromText('MULTIPOLYGON(((1 1, 3 1, 3 3, 1 3, 1 1)), ((4 4, 6 4, 6 6, 4 6, 4 4)))')),
-	(ST_GeomFromText('GEOMETRYCOLLECTION(POINT(1 2), LINESTRING(1 2, 3 4))'));`, srcFullName))
+	// Insert test data with various geometric types in the generic geometry column
+	err = s.source.Exec(s.t.Context(), fmt.Sprintf(`INSERT INTO %[1]s (geometry_col) VALUES
+		(ST_GeomFromWKT('POINT(1 2)', 0)),
+		(ST_GeomFromWKT('LINESTRING(1 2, 3 4)', 0)),
+		(ST_GeomFromWKT('POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))', 0)),
+		(ST_GeomFromWKT('MULTIPOINT((1 2), (3 4))', 0)),
+		(ST_GeomFromWKT('MULTILINESTRING((1 2, 3 4), (5 6, 7 8))', 0)),
+		(ST_GeomFromWKT('MULTIPOLYGON(((1 1, 3 1, 3 3, 1 3, 1 1)), ((4 4, 6 4, 6 6, 4 6, 4 4)))', 0)),
+		(ST_GeomFromWKT('GEOMETRYCOLLECTION(POINT(1 2), LINESTRING(1 2, 3 4))', 0))`, srcTableName))
 	require.NoError(s.t, err)
 
+	// Verify the data was inserted correctly in MySQL
+	rows, err := s.source.GetRows(s.t.Context(), s.suffix, srcTableName, `
+		id, ST_AsWKT(geometry_col) as wkt`)
+	require.NoError(s.t, err)
+
+	expectedGeometries := []string{
+		"POINT(1 2)",
+		"LINESTRING(1 2,3 4)",
+		"POLYGON((1 1,3 1,3 3,1 3,1 1))",
+		"MULTIPOINT((1 2),(3 4))",
+		"MULTILINESTRING((1 2,3 4),(5 6,7 8))",
+		"MULTIPOLYGON(((1 1,3 1,3 3,1 3,1 1)),((4 4,6 4,6 6,4 6,4 4)))",
+		"GEOMETRYCOLLECTION(POINT(1 2),LINESTRING(1 2,3 4))",
+	}
+
+	for i, record := range rows.Records {
+		require.Equal(s.t, expectedGeometries[i], record[1].Value(), "MySQL geometry value mismatch at row %d", i+1)
+	}
+
 	connectionGen := e2e.FlowConnectionGenerationConfig{
 		FlowJobName:      s.attachSuffix("clickhouse_test_mysql_geometric_types"),
 		TableNameMapping: map[string]string{srcFullName: dstTableName},
@@ -1202,6 +1229,10 @@ func (s ClickHouseSuite) Test_MySQL_Geometric_Types() {
 	}
 	flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
 	flowConnConfig.DoInitialSnapshot = true
+	flowConnConfig.Env = map[string]string{
+		"PEERDB_DEVELOPMENT": "true",
+		"PEERDB_LOG_LEVEL":   "debug",
+	}
 
 	tc := e2e.NewTemporalClient(s.t)
 	env := e2e.ExecutePeerflow(s.t.Context(), tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
@@ -1211,23 +1242,46 @@ func (s ClickHouseSuite) Test_MySQL_Geometric_Types() {
 	e2e.EnvWaitForCount(env, s, "waiting for initial snapshot count", dstTableName, "id", 7)
 
 	// Insert additional rows to test CDC
-	_, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf(`
-	INSERT INTO %[1]s (geometry_col) VALUES
-	(ST_GeomFromText('POINT(10 20)')),
-	(ST_GeomFromText('LINESTRING(10 20, 30 40)')),
-	(ST_GeomFromText('POLYGON((10 10, 30 10, 30 30, 10 30, 10 10))'));`, srcFullName))
+	err = s.source.Exec(s.t.Context(), fmt.Sprintf(`INSERT INTO %[1]s (geometry_col) VALUES
+		(ST_GeomFromWKT('POINT(10 20)', 0)),
+		(ST_GeomFromWKT('LINESTRING(10 20, 30 40)', 0)),
+		(ST_GeomFromWKT('POLYGON((10 10, 30 10, 30 30, 10 30, 10 10))', 0)),
+		(ST_GeomFromWKT('MULTIPOINT((10 20), (30 40))', 0)),
+		(ST_GeomFromWKT('MULTILINESTRING((10 20, 30 40), (50 60, 70 80))', 0)),
+		(ST_GeomFromWKT('MULTIPOLYGON(((10 10, 30 10, 30 30, 10 30, 10 10)), ((40 40, 60 40, 60 60, 40 60, 40 40)))', 0)),
+		(ST_GeomFromWKT('GEOMETRYCOLLECTION(POINT(10 20), LINESTRING(10 20, 30 40))', 0))`, srcTableName))
+	require.NoError(s.t, err)
+
+	// Verify the CDC data was inserted correctly in MySQL
+	rows, err = s.source.GetRows(s.t.Context(), s.suffix, srcTableName, `
+		id, ST_AsWKT(geometry_col) as wkt`)
 	require.NoError(s.t, err)
 
+	expectedGeometries = []string{
+		"POINT(10 20)",
+		"LINESTRING(10 20,30 40)",
+		"POLYGON((10 10,30 10,30 30,10 30,10 10))",
+		"MULTIPOINT((10 20),(30 40))",
+		"MULTILINESTRING((10 20,30 40),(50 60,70 80))",
+		"MULTIPOLYGON(((10 10,30 10,30 30,10 30,10 10)),((40 40,60 40,60 60,40 60,40 40)))",
+		"GEOMETRYCOLLECTION(POINT(10 20),LINESTRING(10 20,30 40))",
+	}
+
+	for i, record := range rows.Records[7:] {
+		require.Equal(s.t, expectedGeometries[i], record[1].Value(), "MySQL cdc geometry value mismatch at row %d", i+8)
+	}
+
 	// Wait for CDC to replicate the new rows
-	e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id", 10)
+	e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id", 14)
 
 	// Verify that the data was correctly replicated
-	rows, err := s.GetRows(dstTableName, "id, geometry_col")
+	rows, err = s.GetRows(dstTableName, "id, geometry_col")
 	require.NoError(s.t, err)
-	require.Len(s.t, rows.Records, 10, "expected 10 rows")
+	require.Len(s.t, rows.Records, 14, "expected 14 rows")
 
 	// Expected WKT format values for each geometric type
 	expectedValues := []string{
+		// Initial snapshot values
 		"POINT(1 2)",
 		"LINESTRING(1 2, 3 4)",
 		"POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))",
@@ -1235,9 +1289,14 @@ func (s ClickHouseSuite) Test_MySQL_Geometric_Types() {
 		"MULTILINESTRING((1 2, 3 4), (5 6, 7 8))",
 		"MULTIPOLYGON(((1 1, 3 1, 3 3, 1 3, 1 1)), ((4 4, 6 4, 6 6, 4 6, 4 4)))",
 		"GEOMETRYCOLLECTION(POINT(1 2), LINESTRING(1 2, 3 4))",
+		// CDC values
 		"POINT(10 20)",
 		"LINESTRING(10 20, 30 40)",
 		"POLYGON((10 10, 30 10, 30 30, 10 30, 10 10))",
+		"MULTIPOINT((10 20), (30 40))",
+		"MULTILINESTRING((10 20, 30 40), (50 60, 70 80))",
+		"MULTIPOLYGON(((10 10, 30 10, 30 30, 10 30, 10 10)), ((40 40, 60 40, 60 60, 40 60, 40 40)))",
+		"GEOMETRYCOLLECTION(POINT(10 20), LINESTRING(10 20, 30 40))",
 	}
 
 	for i, row := range rows.Records {
@@ -1321,3 +1380,327 @@ func (s ClickHouseSuite) Test_SchemaAsColumn() {
 	env.Cancel(s.t.Context())
 	e2e.RequireEnvCanceled(s.t, env)
 }
+
+func (s ClickHouseSuite) Test_MySQL_Generic_Geometric_Types() {
+	if _, ok := s.source.(*e2e.MySqlSource); !ok {
+		s.t.Skip("only applies to mysql")
+	}
+
+	srcTableName := "test_mysql_generic_geometric_types"
+	srcFullName := s.attachSchemaSuffix(srcTableName)
+	dstTableName := "test_mysql_generic_geometric_types"
+
+	// Create a table with a generic GEOMETRY column
+	err := s.source.Exec(s.t.Context(), fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %[1]s(
+		id serial PRIMARY KEY,
+		geometry_col GEOMETRY
+	)`, srcFullName))
+	require.NoError(s.t, err)
+
+	// Insert test data with various geometric types in the generic geometry column
+	err = s.source.Exec(s.t.Context(), fmt.Sprintf(`INSERT INTO %[1]s (geometry_col) VALUES
+		(ST_PointFromText('POINT(1 2)')),
+		(ST_LineFromText('LINESTRING(1 2, 3 4)')),
+		(ST_PolygonFromText('POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))')),
+		(ST_MPointFromText('MULTIPOINT((1 2), (3 4))')),
+		(ST_MLineFromText('MULTILINESTRING((1 2, 3 4), (5 6, 7 8))')),
+		(ST_MPolyFromText('MULTIPOLYGON(((1 1, 3 1, 3 3, 1 3, 1 1)), ((4 4, 6 4, 6 6, 4 6, 4 4)))')),
+		(ST_GeomCollFromText('GEOMETRYCOLLECTION(POINT(1 2), LINESTRING(1 2, 3 4))'))`, srcFullName))
+	require.NoError(s.t, err)
+
+	// Verify the data was inserted correctly in MySQL
+	rows, err := s.source.GetRows(s.t.Context(), s.suffix, srcTableName, `id, ST_AsWKT(geometry_col) as wkt`)
+	require.NoError(s.t, err)
+	require.Len(s.t, rows.Records, 7, "expected 7 rows")
+
+	expectedGeometries := []string{
+		"POINT(1 2)",
+		"LINESTRING(1 2,3 4)",
+		"POLYGON((1 1,3 1,3 3,1 3,1 1))",
+		"MULTIPOINT((1 2),(3 4))",
+		"MULTILINESTRING((1 2,3 4),(5 6,7 8))",
+		"MULTIPOLYGON(((1 1,3 1,3 3,1 3,1 1)),((4 4,6 4,6 6,4 6,4 4)))",
+		"GEOMETRYCOLLECTION(POINT(1 2),LINESTRING(1 2,3 4))",
+	}
+
+	for i, row := range rows.Records {
+		require.Equal(s.t, expectedGeometries[i], row[1].Value(), "MySQL geometry value mismatch at row %d", i+1)
+	}
+
+	connectionGen := e2e.FlowConnectionGenerationConfig{
+		FlowJobName:      s.attachSuffix("clickhouse_test_mysql_generic_geometric_types"),
+		TableNameMapping: map[string]string{srcFullName: dstTableName},
+		Destination:      s.Peer().Name,
+	}
+	flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
+	flowConnConfig.DoInitialSnapshot = true
+	flowConnConfig.Env = map[string]string{
+		"PEERDB_DEVELOPMENT": "true",
+		"PEERDB_LOG_LEVEL":   "debug",
+	}
+
+	tc := e2e.NewTemporalClient(s.t)
+	env := e2e.ExecutePeerflow(s.t.Context(), tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
+	e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)
+
+	// Wait for initial snapshot to complete
+	e2e.EnvWaitForCount(env, s, "waiting for initial snapshot count", dstTableName, "id", 7)
+
+	// Insert additional rows to test CDC
+	err = s.source.Exec(s.t.Context(), fmt.Sprintf(`INSERT INTO %[1]s (geometry_col) VALUES
+		(ST_PointFromText('POINT(10 20)')),
+		(ST_LineFromText('LINESTRING(10 20, 30 40)')),
+		(ST_PolygonFromText('POLYGON((10 10, 30 10, 30 30, 10 30, 10 10))')),
+		(ST_MPointFromText('MULTIPOINT((10 20), (30 40))')),
+		(ST_MLineFromText('MULTILINESTRING((10 20, 30 40), (50 60, 70 80))')),
+		(ST_MPolyFromText('MULTIPOLYGON(((10 10, 30 10, 30 30, 10 30, 10 10)), ((40 40, 60 40, 60 60, 40 60, 40 40)))')),
+		(ST_GeomCollFromText('GEOMETRYCOLLECTION(POINT(10 20), LINESTRING(10 20, 30 40))'))`, srcFullName))
+	require.NoError(s.t, err)
+
+	// Verify the CDC data was inserted correctly in MySQL
+	rows, err = s.source.GetRows(s.t.Context(), s.suffix, srcTableName, `id, ST_AsWKT(geometry_col) as wkt`)
+	require.NoError(s.t, err)
+	require.Len(s.t, rows.Records, 14, "expected 14 rows")
+
+	expectedGeometries = []string{
+		"POINT(10 20)",
+		"LINESTRING(10 20,30 40)",
+		"POLYGON((10 10,30 10,30 30,10 30,10 10))",
+		"MULTIPOINT((10 20),(30 40))",
+		"MULTILINESTRING((10 20,30 40),(50 60,70 80))",
+		"MULTIPOLYGON(((10 10,30 10,30 30,10 30,10 10)),((40 40,60 40,60 60,40 60,40 40)))",
+		"GEOMETRYCOLLECTION(POINT(10 20),LINESTRING(10 20,30 40))",
+	}
+
+	// Check only the new rows (index 7 onwards)
+	for i, row := range rows.Records[7:] {
+		require.Equal(s.t, expectedGeometries[i], row[1].Value(), "MySQL CDC geometry value mismatch at row %d", i+8)
+	}
+
+	// Wait for CDC to replicate the new rows
+	e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id", 14)
+
+	// Verify that the data was correctly replicated
+	rows, err = s.GetRows(dstTableName, "id, geometry_col")
+	require.NoError(s.t, err)
+	require.Len(s.t, rows.Records, 14, "expected 14 rows")
+
+	// Expected WKT format values for each geometric type
+	expectedValues := []string{
+		// Initial snapshot values
+		"POINT(1 2)",
+		"LINESTRING(1 2, 3 4)",
+		"POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))",
+		"MULTIPOINT((1 2), (3 4))",
+		"MULTILINESTRING((1 2, 3 4), (5 6, 7 8))",
+		"MULTIPOLYGON(((1 1, 3 1, 3 3, 1 3, 1 1)), ((4 4, 6 4, 6 6, 4 6, 4 4)))",
+		"GEOMETRYCOLLECTION(POINT(1 2), LINESTRING(1 2, 3 4))",
+		// CDC values
+		"POINT(10 20)",
+		"LINESTRING(10 20, 30 40)",
+		"POLYGON((10 10, 30 10, 30 30, 10 30, 10 10))",
+		"MULTIPOINT((10 20), (30 40))",
+		"MULTILINESTRING((10 20, 30 40), (50 60, 70 80))",
+		"MULTIPOLYGON(((10 10, 30 10, 30 30, 10 30, 10 10)), ((40 40, 60 40, 60 60, 40 60, 40 40)))",
+		"GEOMETRYCOLLECTION(POINT(10 20), LINESTRING(10 20, 30 40))",
+	}
+
+	for i, row := range rows.Records {
+		require.Len(s.t, row, 2, "expected 2 columns")
+		geometryVal := row[1].Value()
+		require.Equal(s.t, expectedValues[i], geometryVal, "geometry_col value mismatch at row %d", i+1)
+	}
+
+	// Clean up
+	env.Cancel(s.t.Context())
+	e2e.RequireEnvCanceled(s.t, env)
+}
+
+func (s ClickHouseSuite) Test_MySQL_Specific_Geometric_Types() {
+	if _, ok := s.source.(*e2e.MySqlSource); !ok {
+		s.t.Skip("only applies to mysql")
+	}
+
+	srcTableName := "test_mysql_specific_geometric_types"
+	srcFullName := s.attachSchemaSuffix(srcTableName)
+	dstTableName := "test_mysql_specific_geometric_types"
+	dbName := "e2e_test_" + s.suffix
+
+	// Set MySQL system variables and select database
+	err := s.source.Exec(s.t.Context(), fmt.Sprintf(`
+		SET SESSION sql_mode = '';
+		USE %s;
+	`, dbName))
+	require.NoError(s.t, err)
+
+	// Create a table with specific geometry type columns
+	err = s.source.Exec(s.t.Context(), `
+		CREATE TABLE IF NOT EXISTS `+srcTableName+`(
+		id serial PRIMARY KEY,
+		point_col POINT NOT NULL SRID 0,
+		linestring_col LINESTRING NOT NULL SRID 0,
+		polygon_col POLYGON NOT NULL SRID 0,
+		multipoint_col MULTIPOINT NOT NULL SRID 0,
+		multilinestring_col MULTILINESTRING NOT NULL SRID 0,
+		multipolygon_col MULTIPOLYGON NOT NULL SRID 0,
+		geometrycollection_col GEOMETRYCOLLECTION NOT NULL SRID 0
+	)`)
+	require.NoError(s.t, err)
+
+	// Insert test data with specific geometric types
+	err = s.source.Exec(s.t.Context(), fmt.Sprintf(`INSERT INTO %[1]s
+		(point_col, linestring_col, polygon_col, multipoint_col, multilinestring_col, multipolygon_col, geometrycollection_col) VALUES
+		(ST_GeomFromWKT('POINT(1 2)', 0),
+		ST_GeomFromWKT('LINESTRING(1 2, 3 4)', 0),
+		ST_GeomFromWKT('POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))', 0),
+		ST_GeomFromWKT('MULTIPOINT((1 2), (3 4))', 0),
+		ST_GeomFromWKT('MULTILINESTRING((1 2, 3 4), (5 6, 7 8))', 0),
+		ST_GeomFromWKT('MULTIPOLYGON(((1 1, 3 1, 3 3, 1 3, 1 1)), ((4 4, 6 4, 6 6, 4 6, 4 4)))', 0),
+		ST_GeomFromWKT('GEOMETRYCOLLECTION(POINT(1 2), LINESTRING(1 2, 3 4))', 0))`, srcTableName))
+	require.NoError(s.t, err)
+
+	// Verify the data was inserted correctly in MySQL
+	rows, err := s.source.GetRows(s.t.Context(), s.suffix, srcTableName, `
+		id,
+		ST_AsWKT(point_col) as point_wkt,
+		ST_AsWKT(linestring_col) as linestring_wkt,
+		ST_AsWKT(polygon_col) as polygon_wkt,
+		ST_AsWKT(multipoint_col) as multipoint_wkt,
+		ST_AsWKT(multilinestring_col) as multilinestring_wkt,
+		ST_AsWKT(multipolygon_col) as multipolygon_wkt,
+		ST_AsWKT(geometrycollection_col) as geometrycollection_wkt`)
+	require.NoError(s.t, err)
+	require.Len(s.t, rows.Records, 1, "expected 1 row")
+
+	row := rows.Records[0]
+	require.Equal(s.t, "POINT(1 2)", row[1].Value(), "MySQL point value mismatch")
+	require.Equal(s.t, "LINESTRING(1 2,3 4)", row[2].Value(), "MySQL linestring value mismatch")
+	require.Equal(s.t, "POLYGON((1 1,3 1,3 3,1 3,1 1))", row[3].Value(), "MySQL polygon value mismatch")
+	require.Equal(s.t, "MULTIPOINT((1 2),(3 4))", row[4].Value(), "MySQL multipoint value mismatch")
+	require.Equal(s.t, "MULTILINESTRING((1 2,3 4),(5 6,7 8))", row[5].Value(), "MySQL multilinestring value mismatch")
+	require.Equal(s.t, "MULTIPOLYGON(((1 1,3 1,3 3,1 3,1 1)),((4 4,6 4,6 6,4 6,4 4)))", row[6].Value(), "MySQL multipolygon value mismatch")
+	require.Equal(s.t, "GEOMETRYCOLLECTION(POINT(1 2),LINESTRING(1 2,3 4))", row[7].Value(), "MySQL geometrycollection value mismatch")
+
+	connectionGen := e2e.FlowConnectionGenerationConfig{
+		FlowJobName:      s.attachSuffix("clickhouse_test_mysql_specific_geometric_types"),
+		TableNameMapping: map[string]string{srcFullName: dstTableName},
+		Destination:      s.Peer().Name,
+	}
+	flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
+	flowConnConfig.DoInitialSnapshot = true
+	flowConnConfig.Env = map[string]string{
+		"PEERDB_DEVELOPMENT": "true",
+		"PEERDB_LOG_LEVEL":   "debug",
+	}
+
+	tc := e2e.NewTemporalClient(s.t)
+	env := e2e.ExecutePeerflow(s.t.Context(), tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
+	e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)
+
+	// Wait for initial snapshot to complete
+	e2e.EnvWaitForCount(env, s, "waiting for initial snapshot count", dstTableName, "id", 1)
+
+	// Insert additional rows to test CDC
+	err = s.source.Exec(s.t.Context(), fmt.Sprintf(`INSERT INTO %[1]s
+		(point_col, linestring_col, polygon_col, multipoint_col, multilinestring_col, multipolygon_col, geometrycollection_col) VALUES
+		(ST_GeomFromWKT('POINT(10 20)', 0),
+		ST_GeomFromWKT('LINESTRING(10 20, 30 40)', 0),
+		ST_GeomFromWKT('POLYGON((10 10, 30 10, 30 30, 10 30, 10 10))', 0),
+		ST_GeomFromWKT('MULTIPOINT((10 20), (30 40))', 0),
+		ST_GeomFromWKT('MULTILINESTRING((10 20, 30 40), (50 60, 70 80))', 0),
+		ST_GeomFromWKT('MULTIPOLYGON(((10 10, 30 10, 30 30, 10 30, 10 10)), ((40 40, 60 40, 60 60, 40 60, 40 40)))', 0),
+		ST_GeomFromWKT('GEOMETRYCOLLECTION(POINT(10 20), LINESTRING(10 20, 30 40))', 0))`, srcTableName))
+	require.NoError(s.t, err)
+
+	// Verify the CDC data was inserted correctly in MySQL
+	rows, err = s.source.GetRows(s.t.Context(), s.suffix, srcTableName, `
+		id,
+		ST_AsWKT(point_col) as point_wkt,
+		ST_AsWKT(linestring_col) as linestring_wkt,
+		ST_AsWKT(polygon_col) as polygon_wkt,
+		ST_AsWKT(multipoint_col) as multipoint_wkt,
+		ST_AsWKT(multilinestring_col) as multilinestring_wkt,
+		ST_AsWKT(multipolygon_col) as multipolygon_wkt,
+		ST_AsWKT(geometrycollection_col) as geometrycollection_wkt`)
+	require.NoError(s.t, err)
+	require.Len(s.t, rows.Records, 2, "expected 2 rows")
+
+	row = rows.Records[1] // Check the second row (CDC data)
+	require.Equal(s.t, "POINT(10 20)", row[1].Value(), "MySQL CDC point value mismatch")
+	require.Equal(s.t, "LINESTRING(10 20,30 40)", row[2].Value(), "MySQL CDC linestring value mismatch")
+	require.Equal(s.t, "POLYGON((10 10,30 10,30 30,10 30,10 10))", row[3].Value(), "MySQL CDC polygon value mismatch")
+	require.Equal(s.t, "MULTIPOINT((10 20),(30 40))", row[4].Value(), "MySQL CDC multipoint value mismatch")
+	require.Equal(s.t, "MULTILINESTRING((10 20,30 40),(50 60,70 80))", row[5].Value(), "MySQL CDC multilinestring value mismatch")
+	require.Equal(s.t, "MULTIPOLYGON(((10 10,30 10,30 30,10 30,10 10)),((40 40,60 40,60 60,40 60,40 40)))",
+		row[6].Value(), "MySQL CDC multipolygon value mismatch")
+	require.Equal(s.t, "GEOMETRYCOLLECTION(POINT(10 20),LINESTRING(10 20,30 40))",
+		row[7].Value(), "MySQL CDC geometrycollection value mismatch")
+
+	// Wait for CDC to replicate the new rows
+	e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id", 2)
+
+	// Verify that the data was correctly replicated
+	rows, err = s.GetRows(dstTableName, `id, point_col, linestring_col, polygon_col,
+		multipoint_col, multilinestring_col, multipolygon_col, geometrycollection_col`)
+	require.NoError(s.t, err)
+	require.Len(s.t, rows.Records, 2, "expected 2 rows")
+
+	// Expected WKT format values for each geometric type
+	expectedValues := []struct {
+		point              string
+		linestring         string
+		polygon            string
+		multipoint         string
+		multilinestring    string
+		multipolygon       string
+		geometrycollection string
+	}{
+		{
+			point:              "POINT(1 2)",
+			linestring:         "LINESTRING(1 2, 3 4)",
+			polygon:            "POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))",
+			multipoint:         "MULTIPOINT((1 2), (3 4))",
+			multilinestring:    "MULTILINESTRING((1 2, 3 4), (5 6, 7 8))",
+			multipolygon:       "MULTIPOLYGON(((1 1, 3 1, 3 3, 1 3, 1 1)), ((4 4, 6 4, 6 6, 4 6, 4 4)))",
+			geometrycollection: "GEOMETRYCOLLECTION(POINT(1 2), LINESTRING(1 2, 3 4))",
+		},
+		{
+			point:              "POINT(10 20)",
+			linestring:         "LINESTRING(10 20, 30 40)",
+			polygon:            "POLYGON((10 10, 30 10, 30 30, 10 30, 10 10))",
+			multipoint:         "MULTIPOINT((10 20), (30 40))",
+			multilinestring:    "MULTILINESTRING((10 20, 30 40), (50 60, 70 80))",
+			multipolygon:       "MULTIPOLYGON(((10 10, 30 10, 30 30, 10 30, 10 10)), ((40 40, 60 40, 60 60, 40 60, 40 40)))",
+			geometrycollection: "GEOMETRYCOLLECTION(POINT(10 20), LINESTRING(10 20, 30 40))",
+		},
+	}
+
+	for i, row := range rows.Records {
+		require.Len(s.t, row, 8, "expected 8 columns")
+
+		pointVal := row[1].Value()
+		require.Equal(s.t, expectedValues[i].point, pointVal, "point_col value mismatch at row %d", i+1)
+
+		linestringVal := row[2].Value()
+		require.Equal(s.t, expectedValues[i].linestring, linestringVal, "linestring_col value mismatch at row %d", i+1)
+
+		polygonVal := row[3].Value()
+		require.Equal(s.t, expectedValues[i].polygon, polygonVal, "polygon_col value mismatch at row %d", i+1)
+
+		multipointVal := row[4].Value()
+		require.Equal(s.t, expectedValues[i].multipoint, multipointVal, "multipoint_col value mismatch at row %d", i+1)
+
+		multilinestringVal := row[5].Value()
+		require.Equal(s.t, expectedValues[i].multilinestring, multilinestringVal, "multilinestring_col value mismatch at row %d", i+1)
+
+		multipolygonVal := row[6].Value()
+		require.Equal(s.t, expectedValues[i].multipolygon, multipolygonVal, "multipolygon_col value mismatch at row %d", i+1)
+
+		geometrycollectionVal := row[7].Value()
+		require.Equal(s.t, expectedValues[i].geometrycollection, geometrycollectionVal, "geometrycollection_col value mismatch at row %d", i+1)
+	}
+
+	// Clean up
+	env.Cancel(s.t.Context())
+	e2e.RequireEnvCanceled(s.t, env)
+}
diff --git a/flow/model/qvalue/equals.go b/flow/model/qvalue/equals.go
index 54ccfe0fdb..dff3179d4b 100644
--- a/flow/model/qvalue/equals.go
+++ b/flow/model/qvalue/equals.go
@@ -118,6 +118,20 @@ func Equals(qv QValue, other QValue) bool {
 		return compareGeometry(q.Val, otherValue)
 	case QValueGeography:
 		return compareGeometry(q.Val, otherValue)
+	case QValuePoint:
+		return compareGeometry(q.Val, otherValue)
+	case QValueLineString:
+		return compareGeometry(q.Val, otherValue)
+	case QValuePolygon:
+		return compareGeometry(q.Val, otherValue)
+	case QValueMultiPoint:
+		return compareGeometry(q.Val, otherValue)
+	case QValueMultiLineString:
+		return compareGeometry(q.Val, otherValue)
+	case QValueMultiPolygon:
+		return compareGeometry(q.Val, otherValue)
+	case QValueGeometryCollection:
+		return compareGeometry(q.Val, otherValue)
 	case QValueHStore:
 		return compareHStore(q.Val, otherValue)
 	case QValueArrayInt32, QValueArrayInt16, QValueArrayInt64, QValueArrayFloat32, QValueArrayFloat64:
diff --git a/flow/model/qvalue/kind.go b/flow/model/qvalue/kind.go
index 7727873b54..7dd66696e1 100644
--- a/flow/model/qvalue/kind.go
+++ b/flow/model/qvalue/kind.go
@@ -13,38 +13,44 @@ import (
 type QValueKind string
 
 const (
-	QValueKindInvalid     QValueKind = "invalid"
-	QValueKindFloat32     QValueKind = "float32"
-	QValueKindFloat64     QValueKind = "float64"
-	QValueKindInt8        QValueKind = "int8"
-	QValueKindInt16       QValueKind = "int16"
-	QValueKindInt32       QValueKind = "int32"
-	QValueKindInt64       QValueKind = "int64"
-	QValueKindUInt8       QValueKind = "uint8"
-	QValueKindUInt16      QValueKind = "uint16"
-	QValueKindUInt32      QValueKind = "uint32"
-	QValueKindUInt64      QValueKind = "uint64"
-	QValueKindBoolean     QValueKind = "bool"
-	QValueKindStruct      QValueKind = "struct"
-	QValueKindQChar       QValueKind = "qchar"
-	QValueKindString      QValueKind = "string"
-	QValueKindEnum        QValueKind = "enum"
-	QValueKindTimestamp   QValueKind = "timestamp"
-	QValueKindTimestampTZ QValueKind = "timestamptz"
-	QValueKindDate        QValueKind = "date"
-	QValueKindTime        QValueKind = "time"
-	QValueKindTimeTZ      QValueKind = "timetz"
-	QValueKindInterval    QValueKind = "interval"
-	QValueKindTSTZRange   QValueKind = "tstzrange"
-	QValueKindNumeric     QValueKind = "numeric"
-	QValueKindBytes       QValueKind = "bytes"
-	QValueKindUUID        QValueKind = "uuid"
-	QValueKindJSON        QValueKind = "json"
-	QValueKindJSONB       QValueKind = "jsonb"
-	QValueKindHStore      QValueKind = "hstore"
-	QValueKindGeography   QValueKind = "geography"
-	QValueKindGeometry    QValueKind = "geometry"
-	QValueKindPoint       QValueKind = "point"
+	QValueKindInvalid            QValueKind = "invalid"
+	QValueKindFloat32            QValueKind = "float32"
+	QValueKindFloat64            QValueKind = "float64"
+	QValueKindInt8               QValueKind = "int8"
+	QValueKindInt16              QValueKind = "int16"
+	QValueKindInt32              QValueKind = "int32"
+	QValueKindInt64              QValueKind = "int64"
+	QValueKindUInt8              QValueKind = "uint8"
+	QValueKindUInt16             QValueKind = "uint16"
+	QValueKindUInt32             QValueKind = "uint32"
+	QValueKindUInt64             QValueKind = "uint64"
+	QValueKindBoolean            QValueKind = "bool"
+	QValueKindStruct             QValueKind = "struct"
+	QValueKindQChar              QValueKind = "qchar"
+	QValueKindString             QValueKind = "string"
+	QValueKindEnum               QValueKind = "enum"
+	QValueKindTimestamp          QValueKind = "timestamp"
+	QValueKindTimestampTZ        QValueKind = "timestamptz"
+	QValueKindDate               QValueKind = "date"
+	QValueKindTime               QValueKind = "time"
+	QValueKindTimeTZ             QValueKind = "timetz"
+	QValueKindInterval           QValueKind = "interval"
+	QValueKindTSTZRange          QValueKind = "tstzrange"
+	QValueKindNumeric            QValueKind = "numeric"
+	QValueKindBytes              QValueKind = "bytes"
+	QValueKindUUID               QValueKind = "uuid"
+	QValueKindJSON               QValueKind = "json"
+	QValueKindJSONB              QValueKind = "jsonb"
+	QValueKindHStore             QValueKind = "hstore"
+	QValueKindGeography          QValueKind = "geography"
+	QValueKindGeometry           QValueKind = "geometry"
+	QValueKindPoint              QValueKind = "point"
+	QValueKindLineString         QValueKind = "linestring"
+	QValueKindPolygon            QValueKind = "polygon"
+	QValueKindMultiPoint         QValueKind = "multipoint"
+	QValueKindMultiLineString    QValueKind = "multilinestring"
+	QValueKindMultiPolygon       QValueKind = "multipolygon"
+	QValueKindGeometryCollection QValueKind = "geometrycollection"
 
 	// network types
 	QValueKindCIDR    QValueKind = "cidr"
diff --git a/flow/model/qvalue/qvalue.go b/flow/model/qvalue/qvalue.go
index c8ccb8963d..231dc72e56 100644
--- a/flow/model/qvalue/qvalue.go
+++ b/flow/model/qvalue/qvalue.go
@@ -798,3 +798,99 @@ func (v QValueArrayEnum) LValue(ls *lua.LState) lua.LValue {
 		return lua.LString(x)
 	})
 }
+
+type QValueLineString struct {
+	Val string
+}
+
+func (QValueLineString) Kind() QValueKind {
+	return QValueKindLineString
+}
+
+func (v QValueLineString) Value() any {
+	return v.Val
+}
+
+func (v QValueLineString) LValue(ls *lua.LState) lua.LValue {
+	return lua.LString(v.Val)
+}
+
+type QValuePolygon struct {
+	Val string
+}
+
+func (QValuePolygon) Kind() QValueKind {
+	return QValueKindPolygon
+}
+
+func (v QValuePolygon) Value() any {
+	return v.Val
+}
+
+func (v QValuePolygon) LValue(ls *lua.LState) lua.LValue {
+	return lua.LString(v.Val)
+}
+
+type QValueMultiPoint struct {
+	Val string
+}
+
+func (QValueMultiPoint) Kind() QValueKind {
+	return QValueKindMultiPoint
+}
+
+func (v QValueMultiPoint) Value() any {
+	return v.Val
+}
+
+func (v QValueMultiPoint) LValue(ls *lua.LState) lua.LValue {
+	return lua.LString(v.Val)
+}
+
+type QValueMultiLineString struct {
+	Val string
+}
+
+func (QValueMultiLineString) Kind() QValueKind {
+	return QValueKindMultiLineString
+}
+
+func (v QValueMultiLineString) Value() any {
+	return v.Val
+}
+
+func (v QValueMultiLineString) LValue(ls *lua.LState) lua.LValue {
+	return lua.LString(v.Val)
+}
+
+type QValueMultiPolygon struct {
+	Val string
+}
+
+func (QValueMultiPolygon) Kind() QValueKind {
+	return QValueKindMultiPolygon
+}
+
+func (v QValueMultiPolygon) Value() any {
+	return v.Val
+}
+
+func (v QValueMultiPolygon) LValue(ls *lua.LState) lua.LValue {
+	return lua.LString(v.Val)
+}
+
+type QValueGeometryCollection struct {
+	Val string
+}
+
+func (QValueGeometryCollection) Kind() QValueKind {
+	return QValueKindGeometryCollection
+}
+
+func (v QValueGeometryCollection) Value() any {
+	return v.Val
+}
+
+func (v QValueGeometryCollection) LValue(ls *lua.LState) lua.LValue {
+	return lua.LString(v.Val)
+}