Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Bristol/mysql/conn_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,18 @@ func (mc *mysqlConn) DumpBinlog0(parser *eventParser, callbackFun callback) (dri
case WRITE_ROWS_EVENTv0, WRITE_ROWS_EVENTv1, WRITE_ROWS_EVENTv2, UPDATE_ROWS_EVENTv0, UPDATE_ROWS_EVENTv1, UPDATE_ROWS_EVENTv2, DELETE_ROWS_EVENTv0, DELETE_ROWS_EVENTv1, DELETE_ROWS_EVENTv2:
commitEventOk = true
break
case QUERY_EVENT:
// When a DML QUERY_EVENT (from STATEMENT/MIXED binlog format, e.g.
// on tables with triggers) has a table name, it contains data that
// needs to be committed. Keep commitEventOk = true so the
// subsequent COMMIT/XID event is properly propagated.
// For BEGIN, COMMIT, and other queries without a table name,
// reset commitEventOk.
if event.TableName != "" {
commitEventOk = true
} else {
commitEventOk = false
}
default:
commitEventOk = false
}
Expand Down
10 changes: 10 additions & 0 deletions plugin/clickhouse/src/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,16 @@ func (This *Conn) Query(data *pluginDriver.PluginDataType, retry bool) (LastSucc
default:
break
}
// Skip non-DDL queries (DML and transaction control statements) without
// triggering AutoCommit. When MySQL tables have triggers and binlog_format
// is STATEMENT or MIXED, DML operations appear as QUERY_EVENT. These cannot
// be converted to ClickHouse row operations. Transaction control statements
// (SAVEPOINT, RELEASE SAVEPOINT, ROLLBACK TO) may also appear mid-transaction
// when triggers are involved. Skipping them preserves pending row data in the
// buffer for the eventual COMMIT to flush.
if IsNonDDLQuery(data.Query) {
return nil, nil, nil
}
for {
LastSuccessCommitData, ErrData, err = This.AutoCommit()
if err != nil {
Expand Down
123 changes: 123 additions & 0 deletions plugin/clickhouse/src/clickhouse_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package src

import (
pluginDriver "github.com/brokercap/Bifrost/plugin/driver"
"testing"
)

Expand All @@ -13,6 +14,128 @@ func TestNewTableData(t *testing.T) {
t.Log("success")
}

func TestQuery_NonDDLStatements(t *testing.T) {
// Verify that non-DDL QUERY_EVENTs (DML and transaction control) don't
// trigger AutoCommit and don't lose pending data. This tests the fix for
// issue #285: MySQL trigger data loss when syncing to ClickHouse.
conn := &Conn{
p: &PluginParam{
AutoCreateTable: true,
BatchSize: 500,
Data: NewTableData(),
},
}

// Simulate pending row data in the buffer
conn.p.Data.Data = append(conn.p.Data.Data, &pluginDriver.PluginDataType{
EventType: "insert",
SchemaName: "test_db",
TableName: "test_table",
Rows: []map[string]interface{}{{"id": 1, "name": "foo"}},
})

nonDDLQueries := []string{
"INSERT INTO test VALUES (1, 'foo')",
"UPDATE test SET name = 'bar'",
"DELETE FROM test WHERE id = 1",
"REPLACE INTO test VALUES (1, 'foo')",
"SAVEPOINT sp1",
"RELEASE SAVEPOINT sp1",
"ROLLBACK TO sp1",
}

for _, query := range nonDDLQueries {
// Re-add pending data for each iteration (to verify it's preserved)
if len(conn.p.Data.Data) == 0 {
conn.p.Data.Data = append(conn.p.Data.Data, &pluginDriver.PluginDataType{
EventType: "insert",
SchemaName: "test_db",
TableName: "test_table",
Rows: []map[string]interface{}{{"id": 1, "name": "foo"}},
})
}

data := &pluginDriver.PluginDataType{
Query: query,
SchemaName: "test_db",
TableName: "test_table",
}

lastSuccess, errData, err := conn.Query(data, false)
if lastSuccess != nil {
t.Errorf("Query(%q): expected nil lastSuccess, got %v", query, lastSuccess)
}
if errData != nil {
t.Errorf("Query(%q): expected nil errData, got %v", query, errData)
}
if err != nil {
t.Errorf("Query(%q): expected nil err, got %v", query, err)
}

// Verify pending data was NOT flushed by AutoCommit
if len(conn.p.Data.Data) == 0 {
t.Errorf("Query(%q): pending data was flushed (AutoCommit triggered), expected data to be preserved", query)
}
}
}

func TestQuery_TransactionControlSkipped(t *testing.T) {
// Verify BEGIN and COMMIT are also properly skipped
conn := &Conn{
p: &PluginParam{
AutoCreateTable: true,
BatchSize: 500,
Data: NewTableData(),
},
}

conn.p.Data.Data = append(conn.p.Data.Data, &pluginDriver.PluginDataType{
EventType: "insert",
SchemaName: "test_db",
TableName: "test_table",
Rows: []map[string]interface{}{{"id": 1}},
})

for _, query := range []string{"BEGIN", "begin", "COMMIT", "commit"} {
data := &pluginDriver.PluginDataType{Query: query}
lastSuccess, errData, err := conn.Query(data, false)
if lastSuccess != nil || errData != nil || err != nil {
t.Errorf("Query(%q): expected all nil returns, got lastSuccess=%v errData=%v err=%v", query, lastSuccess, errData, err)
}
if len(conn.p.Data.Data) == 0 {
t.Errorf("Query(%q): pending data was flushed", query)
}
}
}

func TestQuery_AutoCreateTableFalse(t *testing.T) {
// When AutoCreateTable is false, ALL queries should return nil without
// affecting the data buffer.
conn := &Conn{
p: &PluginParam{
AutoCreateTable: false,
BatchSize: 500,
Data: NewTableData(),
},
}

conn.p.Data.Data = append(conn.p.Data.Data, &pluginDriver.PluginDataType{
EventType: "insert",
Rows: []map[string]interface{}{{"id": 1}},
})

for _, query := range []string{"INSERT INTO t VALUES (1)", "ALTER TABLE t ADD COLUMN c INT", "SAVEPOINT sp1"} {
data := &pluginDriver.PluginDataType{Query: query}
lastSuccess, _, err := conn.Query(data, false)
if lastSuccess != nil || err != nil {
t.Errorf("Query(%q) with AutoCreateTable=false: expected nil, got lastSuccess=%v err=%v", query, lastSuccess, err)
}
if len(conn.p.Data.Data) == 0 {
t.Errorf("Query(%q) with AutoCreateTable=false: data was flushed", query)
}
}
}

func TestConn_InitVersion0(t *testing.T) {
obj := &Conn{}
str := "19.13.3.26"
Expand Down
26 changes: 26 additions & 0 deletions plugin/clickhouse/src/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,32 @@ func (This *Conn) getAutoTableSqlSchemaAndTable(name string, DefaultSchemaName s
return
}

// IsNonDDLQuery returns true if the query is a DML statement or transaction
// control statement that should NOT be processed as DDL by the ClickHouse plugin.
// This prevents data loss when MySQL tables have triggers and binlog_format is
// STATEMENT or MIXED, causing DML to be logged as QUERY_EVENT.
func IsNonDDLQuery(query string) bool {
query = strings.TrimSpace(query)
if len(query) < 4 {
return false
}
upper := strings.ToUpper(query)
// Transaction control statements (may appear with triggers)
if strings.HasPrefix(upper, "SAVEPOINT") ||
strings.HasPrefix(upper, "RELEASE SAVEPOINT") ||
strings.HasPrefix(upper, "ROLLBACK TO") {
return true
}
// DML statements from STATEMENT/MIXED binlog format
if strings.HasPrefix(upper, "INSERT") ||
strings.HasPrefix(upper, "UPDATE") ||
strings.HasPrefix(upper, "DELETE") ||
strings.HasPrefix(upper, "REPLACE") {
return true
}
return false
}

func (This *Conn) TranferQuerySql(data *pluginDriver.PluginDataType) (SchemaName, TableName, newSql, newLocalSql, newDisSql, newViewSql string) {
Query := strings.Trim(data.Query, " ")
// 非 DDL ALTER 语句,直接过滤掉
Expand Down
84 changes: 83 additions & 1 deletion plugin/clickhouse/src/sql_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,88 @@
package src

import "testing"
import (
pluginDriver "github.com/brokercap/Bifrost/plugin/driver"
"testing"
)

func TestIsNonDDLQuery(t *testing.T) {
type testCase struct {
query string
expected bool
}
cases := []testCase{
// DDL statements - should return false
{"ALTER TABLE test ADD COLUMN name VARCHAR(50)", false},
{"RENAME TABLE old_table TO new_table", false},
{"DROP TABLE IF EXISTS test", false},
{"TRUNCATE TABLE test", false},
{"CREATE TABLE test (id INT)", false},

// Transaction control - should return true
{"BEGIN", false}, // BEGIN is handled separately in Query()
{"COMMIT", false}, // COMMIT is handled separately in Query()
{"SAVEPOINT sp1", true},
{"RELEASE SAVEPOINT sp1", true},
{"ROLLBACK TO sp1", true},
{"ROLLBACK TO SAVEPOINT sp1", true},

// DML statements from STATEMENT/MIXED binlog format - should return true
{"INSERT INTO test VALUES (1, 'foo')", true},
{"UPDATE test SET name = 'bar' WHERE id = 1", true},
{"DELETE FROM test WHERE id = 1", true},
{"REPLACE INTO test VALUES (1, 'foo')", true},

// Edge cases
{"", false},
{"AB", false},
{" INSERT INTO test VALUES (1)", true},
{" SAVEPOINT sp1 ", true},
{"insert into test values (1)", true},
{"update test set name = 'bar'", true},
{"delete from test where id = 1", true},
{"savepoint sp1", true},
}

for i, c := range cases {
result := IsNonDDLQuery(c.query)
if result != c.expected {
t.Errorf("case %d: IsNonDDLQuery(%q) = %v, want %v", i, c.query, result, c.expected)
}
}
}

func TestTranferQuerySql_DML(t *testing.T) {
// Verify that TranferQuerySql returns empty strings for DML statements.
// This confirms DML from STATEMENT/MIXED binlog format won't be
// mistakenly executed as DDL on ClickHouse.
ckObj := &Conn{
p: &PluginParam{
CkSchema: "",
},
}

dmlQueries := []string{
"INSERT INTO test VALUES (1, 'foo')",
"UPDATE test SET name = 'bar' WHERE id = 1",
"DELETE FROM test WHERE id = 1",
"REPLACE INTO test VALUES (1, 'foo')",
"SAVEPOINT sp1",
"RELEASE SAVEPOINT sp1",
"ROLLBACK TO sp1",
}

for i, query := range dmlQueries {
data := &pluginDriver.PluginDataType{
Query: query,
SchemaName: "test_db",
TableName: "test",
}
_, _, newSql, newLocalSql, newDisSql, newViewSql := ckObj.TranferQuerySql(data)
if newSql != "" || newLocalSql != "" || newDisSql != "" || newViewSql != "" {
t.Errorf("case %d: TranferQuerySql(%q) returned non-empty SQL, expected all empty for DML", i, query)
}
}
}

func TestConn_getAutoTableSqlSchemaAndTable(t *testing.T) {
type caseStruct struct {
Expand Down