Skip to content

Commit 45574a7

Browse files
authored
Merge pull request #712 from datazip-inc/staging
chore: releasing version v0.3.9
2 parents ad6d0e9 + cdf6d5f commit 45574a7

4 files changed

Lines changed: 52 additions & 11 deletions

File tree

drivers/mysql/internal/mysql_test_util.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation
7676
is_active TINYINT(1),
7777
long_varchar MEDIUMTEXT,
7878
name_bool TINYINT(1) DEFAULT '1',
79+
status ENUM('active','inactive','pending') DEFAULT NULL,
80+
priority ENUM('low','medium','high') DEFAULT 'low',
7981
PRIMARY KEY (id)
8082
)`, integrationTestTable)
8183

@@ -100,7 +102,7 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation
100102
name_char, name_varchar, name_text, name_tinytext,
101103
name_mediumtext, name_longtext, created_date,
102104
created_timestamp, is_active,
103-
long_varchar, name_bool
105+
long_varchar, name_bool, status, priority
104106
) VALUES (
105107
6, 6, 123456789012345,
106108
100, 101, 102, 103,
@@ -111,7 +113,7 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation
111113
'c', 'varchar_val', 'text_val', 'tinytext_val',
112114
'mediumtext_val', 'longtext_val', '2023-01-01 12:00:00',
113115
'2023-01-01 12:00:00', 1,
114-
'long_varchar_val', 1
116+
'long_varchar_val', 1, 'active', 'high'
115117
)`, integrationTestTable)
116118

117119
case "update":
@@ -132,7 +134,8 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation
132134
name_mediumtext = 'upd medium', name_longtext = 'upd long',
133135
created_date = '2024-07-01 15:30:00',
134136
created_timestamp = '2024-07-01 15:30:00', is_active = 0,
135-
long_varchar = 'updated long...', name_bool = 0
137+
long_varchar = 'updated long...', name_bool = 0,
138+
status = 'pending', priority = 'low'
136139
WHERE id = 6`, integrationTestTable)
137140

138141
case "delete":
@@ -188,7 +191,7 @@ func insertTestData(t *testing.T, ctx context.Context, db *sqlx.DB, tableName st
188191
price_double_precision, price_float, price_numeric, price_real,
189192
name_char, name_varchar, name_text, name_tinytext,
190193
name_mediumtext, name_longtext, created_date,
191-
created_timestamp, is_active, long_varchar, name_bool
194+
created_timestamp, is_active, long_varchar, name_bool, status, priority
192195
) VALUES (
193196
%d, %d, 123456789012345,
194197
100, 101, 102, 103,
@@ -198,7 +201,7 @@ func insertTestData(t *testing.T, ctx context.Context, db *sqlx.DB, tableName st
198201
123.456, 123.45, 123.45, 123.456,
199202
'c', 'varchar_val', 'text_val', 'tinytext_val',
200203
'mediumtext_val', 'longtext_val', '2023-01-01 12:00:00',
201-
'2023-01-01 12:00:00', 1, 'long_varchar_val', 1
204+
'2023-01-01 12:00:00', 1, 'long_varchar_val', 1, 'active', 'high'
202205
)`, tableName, i, i)
203206

204207
_, err := db.ExecContext(ctx, query)
@@ -236,6 +239,8 @@ var ExpectedMySQLData = map[string]interface{}{
236239
"is_active": int32(1),
237240
"long_varchar": "long_varchar_val",
238241
"name_bool": int32(1),
242+
"status": "active",
243+
"priority": "high",
239244
}
240245

241246
var ExpectedUpdatedData = map[string]interface{}{
@@ -268,6 +273,8 @@ var ExpectedUpdatedData = map[string]interface{}{
268273
"is_active": int32(0),
269274
"long_varchar": "updated long...",
270275
"name_bool": int32(0),
276+
"status": "pending",
277+
"priority": "low",
271278
}
272279

273280
var MySQLToDestinationSchema = map[string]string{
@@ -300,6 +307,8 @@ var MySQLToDestinationSchema = map[string]string{
300307
"is_active": "tinyint",
301308
"long_varchar": "mediumtext",
302309
"name_bool": "tinyint",
310+
"status": "enum",
311+
"priority": "enum",
303312
}
304313

305314
var EvolvedMySQLToDestinationSchema = map[string]string{
@@ -333,4 +342,6 @@ var EvolvedMySQLToDestinationSchema = map[string]string{
333342
"is_active": "tinyint",
334343
"long_varchar": "mediumtext",
335344
"name_bool": "tinyint",
345+
"status": "enum",
346+
"priority": "enum",
336347
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
{"selected_streams":{"olake_mysql_test":[{"partition_regex":"","stream_name":"mysql_test_table_olake","normalization":true}]},"streams":[{"stream":{"name":"mysql_test_table_olake","namespace":"olake_mysql_test","type_schema":{"properties":{"_cdc_timestamp":{"type":["timestamp_micro","null"],"destination_column_name":"_cdc_timestamp"},"_olake_id":{"type":["null","string"],"destination_column_name":"_olake_id"},"_olake_timestamp":{"type":["timestamp_micro","null"],"destination_column_name":"_olake_timestamp"},"_op_type":{"type":["string","null"],"destination_column_name":"_op_type"},"amount_decimal_9_2":{"type":["number","null"],"destination_column_name":"amount_decimal_9_2"},"created_date":{"type":["timestamp","null"],"destination_column_name":"created_date"},"created_timestamp":{"type":["timestamp","null"],"destination_column_name":"created_timestamp"},"id":{"type":["integer_small"],"destination_column_name":"id"},"id_bigint":{"type":["null","integer"],"destination_column_name":"id_bigint"},"id_cursor":{"type":["null","integer_small"],"destination_column_name":"id_cursor"},"id_int":{"type":["null","integer_small"],"destination_column_name":"id_int"},"id_int_unsigned":{"type":["integer_small","null"],"destination_column_name":"id_int_unsigned"},"id_integer":{"type":["integer_small","null"],"destination_column_name":"id_integer"},"id_integer_unsigned":{"type":["integer_small","null"],"destination_column_name":"id_integer_unsigned"},"id_mediumint":{"type":["null","integer_small"],"destination_column_name":"id_mediumint"},"id_mediumint_unsigned":{"type":["integer_small","null"],"destination_column_name":"id_mediumint_unsigned"},"id_smallint":{"type":["integer_small","null"],"destination_column_name":"id_smallint"},"id_smallint_unsigned":{"type":["integer_small","null"],"destination_column_name":"id_smallint_unsigned"},"id_tinyint":{"type":["integer_small","null"],"destination_column_name":"id_tinyint"},"id_tinyint_unsigned":{"type":["integer_small","null"],"destination_
column_name":"id_tinyint_unsigned"},"is_active":{"type":["integer_small","null"],"destination_column_name":"is_active"},"long_varchar":{"type":["string","null"],"destination_column_name":"long_varchar"},"name_bool":{"type":["integer_small","null"],"destination_column_name":"name_bool"},"name_char":{"type":["string","null"],"destination_column_name":"name_char"},"name_longtext":{"type":["string","null"],"destination_column_name":"name_longtext"},"name_mediumtext":{"type":["string","null"],"destination_column_name":"name_mediumtext"},"name_text":{"type":["string","null"],"destination_column_name":"name_text"},"name_tinytext":{"type":["null","string"],"destination_column_name":"name_tinytext"},"name_varchar":{"type":["string","null"],"destination_column_name":"name_varchar"},"price_decimal":{"type":["number","null"],"destination_column_name":"price_decimal"},"price_double":{"type":["number","null"],"destination_column_name":"price_double"},"price_double_precision":{"type":["null","number"],"destination_column_name":"price_double_precision"},"price_float":{"type":["number_small","null"],"destination_column_name":"price_float"},"price_numeric":{"type":["number","null"],"destination_column_name":"price_numeric"},"price_real":{"type":["null","number"],"destination_column_name":"price_real"}}},"supported_sync_modes":["strict_cdc","full_refresh","incremental","cdc"],"source_defined_primary_key":["id"],"available_cursor_fields":["price_numeric","name_longtext","long_varchar","name_text","id_bigint","id_mediumint_unsigned","price_real","name_varchar","id_integer","id_mediumint","name_mediumtext","id_integer_unsigned","id_smallint","id_tinyint_unsigned","price_double","created_timestamp","name_tinytext","created_date","id_int","id_smallint_unsigned","id_tinyint","price_double_precision","id","id_cursor","price_decimal","amount_decimal_9_2","name_char","is_active","name_bool","id_int_unsigned","price_float"],"sync_mode":"cdc","destination_database":"mysql:olake_mysql_test","dest
ination_table":"mysql_test_table_olake"}}]}
1+
{"selected_streams":{"olake_mysql_test":[{"partition_regex":"","stream_name":"mysql_test_table_olake","normalization":true}]},"streams":[{"stream":{"name":"mysql_test_table_olake","namespace":"olake_mysql_test","type_schema":{"properties":{"_cdc_timestamp":{"type":["timestamp_micro","null"],"destination_column_name":"_cdc_timestamp"},"_olake_id":{"type":["null","string"],"destination_column_name":"_olake_id"},"_olake_timestamp":{"type":["timestamp_micro","null"],"destination_column_name":"_olake_timestamp"},"_op_type":{"type":["string","null"],"destination_column_name":"_op_type"},"amount_decimal_9_2":{"type":["number","null"],"destination_column_name":"amount_decimal_9_2"},"created_date":{"type":["timestamp","null"],"destination_column_name":"created_date"},"created_timestamp":{"type":["timestamp","null"],"destination_column_name":"created_timestamp"},"id":{"type":["integer_small"],"destination_column_name":"id"},"id_bigint":{"type":["null","integer"],"destination_column_name":"id_bigint"},"id_cursor":{"type":["null","integer_small"],"destination_column_name":"id_cursor"},"id_int":{"type":["null","integer_small"],"destination_column_name":"id_int"},"id_int_unsigned":{"type":["integer_small","null"],"destination_column_name":"id_int_unsigned"},"id_integer":{"type":["integer_small","null"],"destination_column_name":"id_integer"},"id_integer_unsigned":{"type":["integer_small","null"],"destination_column_name":"id_integer_unsigned"},"id_mediumint":{"type":["null","integer_small"],"destination_column_name":"id_mediumint"},"id_mediumint_unsigned":{"type":["integer_small","null"],"destination_column_name":"id_mediumint_unsigned"},"id_smallint":{"type":["integer_small","null"],"destination_column_name":"id_smallint"},"id_smallint_unsigned":{"type":["integer_small","null"],"destination_column_name":"id_smallint_unsigned"},"id_tinyint":{"type":["integer_small","null"],"destination_column_name":"id_tinyint"},"id_tinyint_unsigned":{"type":["integer_small","null"],"destination_
column_name":"id_tinyint_unsigned"},"is_active":{"type":["integer_small","null"],"destination_column_name":"is_active"},"long_varchar":{"type":["string","null"],"destination_column_name":"long_varchar"},"name_bool":{"type":["integer_small","null"],"destination_column_name":"name_bool"},"name_char":{"type":["string","null"],"destination_column_name":"name_char"},"name_longtext":{"type":["string","null"],"destination_column_name":"name_longtext"},"name_mediumtext":{"type":["string","null"],"destination_column_name":"name_mediumtext"},"name_text":{"type":["string","null"],"destination_column_name":"name_text"},"name_tinytext":{"type":["null","string"],"destination_column_name":"name_tinytext"},"name_varchar":{"type":["string","null"],"destination_column_name":"name_varchar"},"price_decimal":{"type":["number","null"],"destination_column_name":"price_decimal"},"price_double":{"type":["number","null"],"destination_column_name":"price_double"},"price_double_precision":{"type":["null","number"],"destination_column_name":"price_double_precision"},"price_float":{"type":["number_small","null"],"destination_column_name":"price_float"},"price_numeric":{"type":["number","null"],"destination_column_name":"price_numeric"},"price_real":{"type":["null","number"],"destination_column_name":"price_real"},"priority":{"type":["null","string"],"destination_column_name":"priority"},"status":{"type":["string","null"],"destination_column_name":"status"}}},"supported_sync_modes":["strict_cdc","full_refresh","incremental","cdc"],"source_defined_primary_key":["id"],"available_cursor_fields":["price_numeric","name_longtext","long_varchar","name_text","id_bigint","id_mediumint_unsigned","price_real","name_varchar","id_integer","id_mediumint","name_mediumtext","id_integer_unsigned","id_smallint","id_tinyint_unsigned","price_double","created_timestamp","name_tinytext","created_date","id_int","id_smallint_unsigned","id_tinyint","price_double_precision","id","id_cursor","price_decimal","amount_decimal
_9_2","name_char","is_active","name_bool","id_int_unsigned","price_float","priority","status"],"sync_mode":"cdc","destination_database":"mysql:olake_mysql_test","destination_table":"mysql_test_table_olake"}}]}

pkg/binlog/binlog.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,13 @@ func (c *Connection) StreamMessages(ctx context.Context, client *sqlx.DB, callba
111111
c.CurrentPos.Pos = uint32(e.Position)
112112
logger.Infof("Binlog rotated to %s:%d", c.CurrentPos.Name, c.CurrentPos.Pos)
113113

114+
case *replication.GTIDEvent:
115+
if e.OriginalCommitTimestamp > 0 {
116+
c.changeFilter.lastGTIDEvent = time.UnixMicro(int64(e.OriginalCommitTimestamp)) // #nosec G115 - timestamp value is always within int64 range
117+
}
118+
119+
// TODO: Investigate MariaDB GTID event structure for microsecond timestamp support.
120+
114121
case *replication.RowsEvent:
115122
messageReceived = true
116123
if err := c.changeFilter.FilterRowsEvent(ctx, e, ev, callback); err != nil {

pkg/binlog/filter.go

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,17 @@ import (
77

88
"github.com/datazip-inc/olake/drivers/abstract"
99
"github.com/datazip-inc/olake/types"
10+
"github.com/datazip-inc/olake/utils"
1011
"github.com/datazip-inc/olake/utils/typeutils"
1112
"github.com/go-mysql-org/go-mysql/mysql"
1213
"github.com/go-mysql-org/go-mysql/replication"
1314
)
1415

1516
// ChangeFilter filters binlog events based on the specified streams.
1617
type ChangeFilter struct {
17-
streams map[string]types.StreamInterface // Keyed by "schema.table"
18-
converter func(value interface{}, columnType string) (interface{}, error)
18+
streams map[string]types.StreamInterface // Keyed by "schema.table"
19+
converter func(value interface{}, columnType string) (interface{}, error)
20+
lastGTIDEvent time.Time
1921
}
2022

2123
// NewChangeFilter creates a filter for the given streams.
@@ -68,16 +70,21 @@ func (f ChangeFilter) FilterRowsEvent(ctx context.Context, e *replication.RowsEv
6870
}
6971

7072
for _, row := range rowsToProcess {
71-
record, err := convertRowToMap(row, e.Table.ColumnNameString(), columnTypes, f.converter)
73+
record, err := convertRowToMap(row, e.Table, columnTypes, f.converter)
7274
if err != nil {
7375
return err
7476
}
7577
if record == nil {
7678
continue
7779
}
80+
81+
// Use microsecond-precision timestamp from GTID event (MySQL 8.0.1+) if available,
82+
// otherwise fall back to second-precision header timestamp
83+
timestamp := utils.Ternary(!f.lastGTIDEvent.IsZero(), f.lastGTIDEvent, time.Unix(int64(ev.Header.Timestamp), 0)).(time.Time)
84+
7885
change := abstract.CDCChange{
7986
Stream: stream,
80-
Timestamp: time.Unix(int64(ev.Header.Timestamp), 0),
87+
Timestamp: timestamp,
8188
Kind: operationType,
8289
Data: record,
8390
}
@@ -89,15 +96,31 @@ func (f ChangeFilter) FilterRowsEvent(ctx context.Context, e *replication.RowsEv
8996
}
9097

9198
// convertRowToMap converts a binlog row to a map.
92-
func convertRowToMap(row []interface{}, columns []string, columnTypes []string, converter func(value interface{}, columnType string) (interface{}, error)) (map[string]interface{}, error) {
99+
func convertRowToMap(row []interface{}, tableMap *replication.TableMapEvent, columnTypes []string, converter func(value interface{}, columnType string) (interface{}, error)) (map[string]interface{}, error) {
100+
columns := tableMap.ColumnNameString()
93101
if len(columns) != len(row) {
94102
return nil, fmt.Errorf("column count mismatch: expected %d, got %d", len(columns), len(row))
95103
}
96104

105+
enumMap := tableMap.EnumStrValueMap()
106+
97107
// TODO: float values from the binlog are not always the same as the output of select * from db;
98108
// they need to be typecast to the datatype of the column for consistency with the db.
99109
record := make(map[string]interface{})
100110
for i, val := range row {
111+
if tableMap.IsEnumColumn(i) && val != nil {
112+
if enumValues, ok := enumMap[i]; ok {
113+
// For an update CDC event, the binlog carries an ENUM value as its index (always an int64) rather than its string label.
114+
// In that case, resolve the label from the column's enum value list using that 1-based index.
115+
if idx, isInt64 := val.(int64); isInt64 {
116+
if idx == 0 {
117+
return nil, fmt.Errorf("invalid ENUM value detected for column %s: index 0 indicates an error value (empty string) from MySQL", columns[i])
118+
}
119+
val = enumValues[idx-1]
120+
}
121+
}
122+
}
123+
101124
convertedVal, err := converter(val, columnTypes[i])
102125
if err != nil && err != typeutils.ErrNullValue {
103126
return nil, err

0 commit comments

Comments
 (0)