From af14fb76f22077f16c5ad9ede028c6c2b54833bc Mon Sep 17 00:00:00 2001 From: LiuBo Date: Thu, 27 Mar 2025 13:14:23 +0800 Subject: [PATCH 1/4] [improvement] frontend: add sql to fatal log --- pkg/frontend/mysql_cmd_executor.go | 2 +- pkg/frontend/test/txn_mock.go | 8 +++---- pkg/frontend/txn_test.go | 26 +++++++++++------------ pkg/frontend/util_test.go | 4 ++-- pkg/sql/compile/sql_executor.go | 4 ++-- pkg/txn/client/types.go | 2 +- pkg/vm/engine/disttae/types.go | 7 ++++-- pkg/vm/engine/test/disttae_engine_test.go | 4 ++-- pkg/vm/engine/test/testutil/util.go | 2 +- pkg/vm/engine/test/workspace_test.go | 12 +++++------ 10 files changed, 37 insertions(+), 34 deletions(-) diff --git a/pkg/frontend/mysql_cmd_executor.go b/pkg/frontend/mysql_cmd_executor.go index 53f3636056e81..dc3a7e8f5ed7d 100644 --- a/pkg/frontend/mysql_cmd_executor.go +++ b/pkg/frontend/mysql_cmd_executor.go @@ -2689,7 +2689,7 @@ func executeStmtWithWorkspace(ses FeSession, defer ses.ExitFPrint(FPExecStmtWithWorkspaceBeforeStart) //!!!NOTE!!!: statement management //2. start statement on workspace - txnOp.GetWorkspace().StartStatement() + txnOp.GetWorkspace().StartStatement(execCtx.stmt.String()) //3. end statement on workspace // defer Start/End Statement management, called after finishTxnFunc() defer func() { diff --git a/pkg/frontend/test/txn_mock.go b/pkg/frontend/test/txn_mock.go index 822a07c284d24..df4fbef1725a4 100644 --- a/pkg/frontend/test/txn_mock.go +++ b/pkg/frontend/test/txn_mock.go @@ -1238,15 +1238,15 @@ func (mr *MockWorkspaceMockRecorder) SetHaveDDL(flag interface{}) *gomock.Call { } // StartStatement mocks base method. -func (m *MockWorkspace) StartStatement() { +func (m *MockWorkspace) StartStatement(arg0 string) { m.ctrl.T.Helper() - m.ctrl.Call(m, "StartStatement") + m.ctrl.Call(m, "StartStatement", arg0) } // StartStatement indicates an expected call of StartStatement. -func (mr *MockWorkspaceMockRecorder) StartStatement() *gomock.Call { +func (mr *MockWorkspaceMockRecorder) StartStatement(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartStatement", reflect.TypeOf((*MockWorkspace)(nil).StartStatement)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartStatement", reflect.TypeOf((*MockWorkspace)(nil).StartStatement), arg0) } // UpdateSnapshotWriteOffset mocks base method. diff --git a/pkg/frontend/txn_test.go b/pkg/frontend/txn_test.go index 6f7f9830db246..73adcb4bbf7c0 100644 --- a/pkg/frontend/txn_test.go +++ b/pkg/frontend/txn_test.go @@ -74,9 +74,9 @@ func newTestWorkspace() *testWorkspace { return &testWorkspace{} } -func (txn *testWorkspace) StartStatement() { +func (txn *testWorkspace) StartStatement(sql string) { if txn.start { - panic("BUG: StartStatement called twice") + panic(fmt.Sprintf("BUG: StartStatement called twice, sql: %s", sql)) } txn.start = true txn.incr = false @@ -177,7 +177,7 @@ func TestWorkspace(t *testing.T) { convey.So( func() { wsp := newTestWorkspace() - wsp.StartStatement() + wsp.StartStatement("") wsp.EndStatement() }, convey.ShouldNotPanic, @@ -196,8 +196,8 @@ func TestWorkspace(t *testing.T) { convey.So( func() { wsp := newTestWorkspace() - wsp.StartStatement() - wsp.StartStatement() + wsp.StartStatement("") + wsp.StartStatement("") }, convey.ShouldPanic, ) @@ -217,7 +217,7 @@ func TestWorkspace(t *testing.T) { convey.So( func() { wsp := newTestWorkspace() - wsp.StartStatement() + wsp.StartStatement("") err := wsp.IncrStatementID(context.TODO(), false) convey.So(err, convey.ShouldBeNil) //incr twice @@ -231,7 +231,7 @@ func TestWorkspace(t *testing.T) { convey.So( func() { wsp := newTestWorkspace() - wsp.StartStatement() + wsp.StartStatement("") err := wsp.RollbackLastStatement(context.TODO()) convey.So(err, convey.ShouldBeNil) }, @@ -242,7 +242,7 @@ func TestWorkspace(t *testing.T) { convey.So( func() { wsp := newTestWorkspace() - wsp.StartStatement() + wsp.StartStatement("") err := wsp.IncrStatementID(context.TODO(), false) convey.So(err, convey.ShouldBeNil) err = wsp.RollbackLastStatement(context.TODO()) @@ -484,7 +484,7 @@ func Test_rollbackStatement(t *testing.T) { NeedToBeCommittedInActiveTransaction(&tree.Insert{}), convey.ShouldBeFalse) convey.So(txnOp != nil && !ses.IsDerivedStmt(), convey.ShouldBeTrue) //called incrStatement - txnOp.GetWorkspace().StartStatement() + txnOp.GetWorkspace().StartStatement("") err = txnOp.GetWorkspace().IncrStatementID(ctx, false) convey.So(err, convey.ShouldBeNil) ec.stmt = &tree.Insert{} @@ -514,7 +514,7 @@ func Test_rollbackStatement(t *testing.T) { NeedToBeCommittedInActiveTransaction(&tree.Insert{}), convey.ShouldBeFalse) convey.So(txnOp != nil && !ses.IsDerivedStmt(), convey.ShouldBeTrue) //called incrStatement - txnOp.GetWorkspace().StartStatement() + txnOp.GetWorkspace().StartStatement("") err = txnOp.GetWorkspace().IncrStatementID(ctx, false) convey.So(err, convey.ShouldBeNil) ec.stmt = &tree.Insert{} @@ -671,7 +671,7 @@ func Test_rollbackStatement5(t *testing.T) { NeedToBeCommittedInActiveTransaction(&tree.Insert{}), convey.ShouldBeFalse) convey.So(txnOp != nil && !ses.IsDerivedStmt(), convey.ShouldBeTrue) //called incrStatement - txnOp.GetWorkspace().StartStatement() + txnOp.GetWorkspace().StartStatement("") err = txnOp.GetWorkspace().IncrStatementID(ctx, false) convey.So(err, convey.ShouldBeNil) ec.stmt = &tree.Insert{} @@ -710,7 +710,7 @@ func Test_rollbackStatement6(t *testing.T) { NeedToBeCommittedInActiveTransaction(&tree.Insert{}), convey.ShouldBeFalse) convey.So(txnOp != nil && !ses.IsDerivedStmt(), convey.ShouldBeTrue) //called incrStatement - txnOp.GetWorkspace().StartStatement() + txnOp.GetWorkspace().StartStatement("") err = txnOp.GetWorkspace().IncrStatementID(ctx, false) convey.So(err, convey.ShouldBeNil) ec.stmt = &tree.Insert{} @@ -745,7 +745,7 @@ func Test_rollbackStatement6(t *testing.T) { NeedToBeCommittedInActiveTransaction(&tree.Insert{}), convey.ShouldBeFalse) convey.So(txnOp != nil && !ses.IsDerivedStmt(), convey.ShouldBeTrue) //called incrStatement - txnOp.GetWorkspace().StartStatement() + txnOp.GetWorkspace().StartStatement("") err = txnOp.GetWorkspace().IncrStatementID(ctx, false) convey.So(err, convey.ShouldBeNil) ec.stmt = &tree.Insert{} diff --git a/pkg/frontend/util_test.go b/pkg/frontend/util_test.go index da56a38007be6..f01e763bbbe72 100644 --- a/pkg/frontend/util_test.go +++ b/pkg/frontend/util_test.go @@ -621,7 +621,7 @@ func TestGetExprValue(t *testing.T) { ws.EXPECT().IncrStatementID(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() ws.EXPECT().IncrSQLCount().AnyTimes() ws.EXPECT().GetSQLCount().AnyTimes() - ws.EXPECT().StartStatement().AnyTimes() + ws.EXPECT().StartStatement("").AnyTimes() ws.EXPECT().EndStatement().AnyTimes() ws.EXPECT().GetSnapshotWriteOffset().Return(0).AnyTimes() ws.EXPECT().UpdateSnapshotWriteOffset().AnyTimes() @@ -729,7 +729,7 @@ func TestGetExprValue(t *testing.T) { ws := mock_frontend.NewMockWorkspace(ctrl) ws.EXPECT().IncrStatementID(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - ws.EXPECT().StartStatement().AnyTimes() + ws.EXPECT().StartStatement("").AnyTimes() ws.EXPECT().EndStatement().AnyTimes() ws.EXPECT().GetSnapshotWriteOffset().Return(0).AnyTimes() ws.EXPECT().UpdateSnapshotWriteOffset().AnyTimes() diff --git a/pkg/sql/compile/sql_executor.go b/pkg/sql/compile/sql_executor.go index 643c46efd506f..1461882e645ba 100644 --- a/pkg/sql/compile/sql_executor.go +++ b/pkg/sql/compile/sql_executor.go @@ -99,7 +99,7 @@ func (s *sqlExecutor) NewTxnOperator(ctx context.Context) client.TxnOperator { return nil } } - opts.Txn().GetWorkspace().StartStatement() + opts.Txn().GetWorkspace().StartStatement("") opts.Txn().GetWorkspace().IncrStatementID(ctx, false) return opts.Txn() } @@ -282,7 +282,7 @@ func (exec *txnExecutor) Exec( // maybe we should fix it. txnOp := exec.opts.Txn() if txnOp != nil && !exec.opts.DisableIncrStatement() { - txnOp.GetWorkspace().StartStatement() + txnOp.GetWorkspace().StartStatement(sql) defer func() { txnOp.GetWorkspace().EndStatement() }() diff --git a/pkg/txn/client/types.go b/pkg/txn/client/types.go index 98da924eb6b30..9799ae4abf7f7 100644 --- a/pkg/txn/client/types.go +++ b/pkg/txn/client/types.go @@ -250,7 +250,7 @@ type Workspace interface { Readonly() bool // StartStatement tag a statement is running - StartStatement() + StartStatement(string) // EndStatement tag end a statement is completed EndStatement() diff --git a/pkg/vm/engine/disttae/types.go b/pkg/vm/engine/disttae/types.go index 160a32e2a4631..7d9b35accdad3 100644 --- a/pkg/vm/engine/disttae/types.go +++ b/pkg/vm/engine/disttae/types.go @@ -525,9 +525,12 @@ func (txn *Transaction) PPString() string { stringifySlice(txn.transfer.timestamps, func(a any) string { t := a.(timestamp.Timestamp); return t.DebugString() })) } -func (txn *Transaction) StartStatement() { +func (txn *Transaction) StartStatement(sql string) { if txn.startStatementCalled { - logutil.Fatal("BUG: StartStatement called twice", zap.String("txn", hex.EncodeToString(txn.op.Txn().ID))) + logutil.Fatal("BUG: StartStatement called twice", + zap.String("txn", hex.EncodeToString(txn.op.Txn().ID)), + zap.String("SQL", sql), + ) } txn.startStatementCalled = true txn.incrStatementCalled = false diff --git a/pkg/vm/engine/test/disttae_engine_test.go b/pkg/vm/engine/test/disttae_engine_test.go index 3138bda372e21..8d9429934e024 100644 --- a/pkg/vm/engine/test/disttae_engine_test.go +++ b/pkg/vm/engine/test/disttae_engine_test.go @@ -163,7 +163,7 @@ func TestSystemDB1(t *testing.T) { txnop = p.StartCNTxn() dbs, err = p.D.Engine.Databases(p.Ctx, txnop) require.NoError(t, err) - txnop.GetWorkspace().StartStatement() + txnop.GetWorkspace().StartStatement("") require.Equal(t, 2+1, len(dbs)) txn, err := p.T.StartTxn() @@ -561,7 +561,7 @@ func TestColumnsTransfer(t *testing.T) { require.NoError(t, txnop.Commit(p.Ctx)) txnop = p.StartCNTxn() - txnop.GetWorkspace().StartStatement() + txnop.GetWorkspace().StartStatement("") p.DeleteTableInDB(txnop, "db", schema2.Name) txn, _ := tae.StartTxn(nil) diff --git a/pkg/vm/engine/test/testutil/util.go b/pkg/vm/engine/test/testutil/util.go index 9accd6bcbb056..0c46f7b0dabaf 100644 --- a/pkg/vm/engine/test/testutil/util.go +++ b/pkg/vm/engine/test/testutil/util.go @@ -444,7 +444,7 @@ func WriteToRelation( bat *batch.Batch, isDelete, toEndStatement bool, ) (err error) { - txn.GetWorkspace().StartStatement() + txn.GetWorkspace().StartStatement("") if isDelete { err = relation.Delete(ctx, bat, catalog2.Row_ID) } else { diff --git a/pkg/vm/engine/test/workspace_test.go b/pkg/vm/engine/test/workspace_test.go index 4c90fd94b633f..7295fa6e9af01 100644 --- a/pkg/vm/engine/test/workspace_test.go +++ b/pkg/vm/engine/test/workspace_test.go @@ -809,7 +809,7 @@ func Test_BasicRollbackStatement(t *testing.T) { require.NoError(t, testutil.WriteToRelation(ctx, txn, relation, bat1, false, true)) - txn.GetWorkspace().StartStatement() + txn.GetWorkspace().StartStatement("") require.NoError(t, relation.Write(ctx, bat2)) require.NoError(t, txn.GetWorkspace().RollbackLastStatement(ctx)) require.NoError(t, txn.GetWorkspace().IncrStatementID(ctx, false)) @@ -921,7 +921,7 @@ func Test_BasicRollbackStatementS3(t *testing.T) { require.NoError(t, testutil.WriteToRelation(ctx, txn, relation, bat1, false, true)) - txn.GetWorkspace().StartStatement() + txn.GetWorkspace().StartStatement("") require.NoError(t, relation.Write(ctx, bat2)) require.NoError(t, txn.GetWorkspace().RollbackLastStatement(ctx)) require.NoError(t, txn.GetWorkspace().IncrStatementID(ctx, false)) @@ -1033,7 +1033,7 @@ func Test_RollbackDeleteAndDrop(t *testing.T) { txnop = p.StartCNTxn() exec := v.(executor.SQLExecutor) execopts := executor.Options{}.WithTxn(txnop).WithDisableIncrStatement() - txnop.GetWorkspace().StartStatement() + txnop.GetWorkspace().StartStatement("") txnop.GetWorkspace().IncrStatementID(p.Ctx, false) dropTable := func() { _, err := exec.Exec(p.Ctx, "delete from db.test3 where mock_1 = 0", execopts) @@ -1182,7 +1182,7 @@ func Test_MultiTxnRollbackStatement(t *testing.T) { { require.NoError(t, testutil.WriteToRelation(ctx, txn, relation, bat2, false, true)) - txn.GetWorkspace().StartStatement() + txn.GetWorkspace().StartStatement("") require.NoError(t, relation.Write(ctx, bat2)) require.NoError(t, txn.GetWorkspace().RollbackLastStatement(ctx)) require.NoError(t, txn.GetWorkspace().IncrStatementID(ctx, false)) @@ -1357,7 +1357,7 @@ func Test_MultiTxnRollbackStatementS3(t *testing.T) { // txn2 delete 5-15 { - txn.GetWorkspace().StartStatement() + txn.GetWorkspace().StartStatement("") require.NoError(t, relation.Write(ctx, bat2)) require.NoError(t, txn.GetWorkspace().RollbackLastStatement(ctx)) require.NoError(t, txn.GetWorkspace().IncrStatementID(ctx, false)) @@ -1662,7 +1662,7 @@ func Test_CNTransferTombstoneObjects(t *testing.T) { _, _, cnTxnOp, err = p.D.GetTable(ctx, databaseName, tableName) require.NoError(t, err) - cnTxnOp.GetWorkspace().StartStatement() + cnTxnOp.GetWorkspace().StartStatement("") err = cnTxnOp.GetWorkspace().IncrStatementID(ctx, false) require.NoError(t, err) } From 59e375ad8f9bd260944f28378a2f127bcaf422e3 Mon Sep 17 00:00:00 2001 From: LiuBo Date: Thu, 27 Mar 2025 13:42:30 +0800 Subject: [PATCH 2/4] fix function parameter --- pkg/frontend/mysql_cmd_executor.go | 2 +- pkg/frontend/test/txn_mock.go | 3 ++- pkg/frontend/txn_test.go | 28 +++++++++++++---------- pkg/frontend/util_test.go | 4 ++-- pkg/sql/compile/sql_executor.go | 4 ++-- pkg/txn/client/types.go | 3 ++- pkg/vm/engine/disttae/types.go | 7 +++++- pkg/vm/engine/test/disttae_engine_test.go | 4 ++-- pkg/vm/engine/test/testutil/util.go | 2 +- pkg/vm/engine/test/workspace_test.go | 12 +++++----- 10 files changed, 40 insertions(+), 29 deletions(-) diff --git a/pkg/frontend/mysql_cmd_executor.go b/pkg/frontend/mysql_cmd_executor.go index dc3a7e8f5ed7d..bad59a31ae13d 100644 --- a/pkg/frontend/mysql_cmd_executor.go +++ b/pkg/frontend/mysql_cmd_executor.go @@ -2689,7 +2689,7 @@ func executeStmtWithWorkspace(ses FeSession, defer ses.ExitFPrint(FPExecStmtWithWorkspaceBeforeStart) //!!!NOTE!!!: statement management //2. start statement on workspace - txnOp.GetWorkspace().StartStatement(execCtx.stmt.String()) + txnOp.GetWorkspace().StartStatement(execCtx.stmt) //3. end statement on workspace // defer Start/End Statement management, called after finishTxnFunc() defer func() { diff --git a/pkg/frontend/test/txn_mock.go b/pkg/frontend/test/txn_mock.go index df4fbef1725a4..8d13d2aa653af 100644 --- a/pkg/frontend/test/txn_mock.go +++ b/pkg/frontend/test/txn_mock.go @@ -13,6 +13,7 @@ import ( lock "github.com/matrixorigin/matrixone/pkg/pb/lock" timestamp "github.com/matrixorigin/matrixone/pkg/pb/timestamp" txn "github.com/matrixorigin/matrixone/pkg/pb/txn" + tree "github.com/matrixorigin/matrixone/pkg/sql/parsers/tree" client "github.com/matrixorigin/matrixone/pkg/txn/client" rpc "github.com/matrixorigin/matrixone/pkg/txn/rpc" ) @@ -1238,7 +1239,7 @@ func (mr *MockWorkspaceMockRecorder) SetHaveDDL(flag interface{}) *gomock.Call { } // StartStatement mocks base method. -func (m *MockWorkspace) StartStatement(arg0 string) { +func (m *MockWorkspace) StartStatement(arg0 tree.Statement) { m.ctrl.T.Helper() m.ctrl.Call(m, "StartStatement", arg0) } diff --git a/pkg/frontend/txn_test.go b/pkg/frontend/txn_test.go index 73adcb4bbf7c0..2f25463585cbe 100644 --- a/pkg/frontend/txn_test.go +++ b/pkg/frontend/txn_test.go @@ -74,8 +74,12 @@ func newTestWorkspace() *testWorkspace { return &testWorkspace{} } -func (txn *testWorkspace) StartStatement(sql string) { +func (txn *testWorkspace) StartStatement(stmt tree.Statement) { if txn.start { + var sql string + if stmt != nil { + sql = stmt.String() + } panic(fmt.Sprintf("BUG: StartStatement called twice, sql: %s", sql)) } txn.start = true @@ -177,7 +181,7 @@ func TestWorkspace(t *testing.T) { convey.So( func() { wsp := newTestWorkspace() - wsp.StartStatement("") + wsp.StartStatement(nil) wsp.EndStatement() }, convey.ShouldNotPanic, @@ -196,8 +200,8 @@ func TestWorkspace(t *testing.T) { convey.So( func() { wsp := newTestWorkspace() - wsp.StartStatement("") - wsp.StartStatement("") + wsp.StartStatement(nil) + wsp.StartStatement(nil) }, convey.ShouldPanic, ) @@ -217,7 +221,7 @@ func TestWorkspace(t *testing.T) { convey.So( func() { wsp := newTestWorkspace() - wsp.StartStatement("") + wsp.StartStatement(nil) err := wsp.IncrStatementID(context.TODO(), false) convey.So(err, convey.ShouldBeNil) //incr twice @@ -231,7 +235,7 @@ func TestWorkspace(t *testing.T) { convey.So( func() { wsp := newTestWorkspace() - wsp.StartStatement("") + wsp.StartStatement(nil) err := wsp.RollbackLastStatement(context.TODO()) convey.So(err, convey.ShouldBeNil) }, @@ -242,7 +246,7 @@ func TestWorkspace(t *testing.T) { convey.So( func() { wsp := newTestWorkspace() - wsp.StartStatement("") + wsp.StartStatement(nil) err := wsp.IncrStatementID(context.TODO(), false) convey.So(err, convey.ShouldBeNil) err = wsp.RollbackLastStatement(context.TODO()) @@ -484,7 +488,7 @@ func Test_rollbackStatement(t *testing.T) { NeedToBeCommittedInActiveTransaction(&tree.Insert{}), convey.ShouldBeFalse) convey.So(txnOp != nil && !ses.IsDerivedStmt(), convey.ShouldBeTrue) //called incrStatement - txnOp.GetWorkspace().StartStatement("") + txnOp.GetWorkspace().StartStatement(nil) err = txnOp.GetWorkspace().IncrStatementID(ctx, false) convey.So(err, convey.ShouldBeNil) ec.stmt = &tree.Insert{} @@ -514,7 +518,7 @@ func Test_rollbackStatement(t *testing.T) { NeedToBeCommittedInActiveTransaction(&tree.Insert{}), convey.ShouldBeFalse) convey.So(txnOp != nil && !ses.IsDerivedStmt(), convey.ShouldBeTrue) //called incrStatement - txnOp.GetWorkspace().StartStatement("") + txnOp.GetWorkspace().StartStatement(nil) err = txnOp.GetWorkspace().IncrStatementID(ctx, false) convey.So(err, convey.ShouldBeNil) ec.stmt = &tree.Insert{} @@ -671,7 +675,7 @@ func Test_rollbackStatement5(t *testing.T) { NeedToBeCommittedInActiveTransaction(&tree.Insert{}), convey.ShouldBeFalse) convey.So(txnOp != nil && !ses.IsDerivedStmt(), convey.ShouldBeTrue) //called incrStatement - txnOp.GetWorkspace().StartStatement("") + txnOp.GetWorkspace().StartStatement(nil) err = txnOp.GetWorkspace().IncrStatementID(ctx, false) convey.So(err, convey.ShouldBeNil) ec.stmt = &tree.Insert{} @@ -710,7 +714,7 @@ func Test_rollbackStatement6(t *testing.T) { NeedToBeCommittedInActiveTransaction(&tree.Insert{}), convey.ShouldBeFalse) convey.So(txnOp != nil && !ses.IsDerivedStmt(), convey.ShouldBeTrue) //called incrStatement - txnOp.GetWorkspace().StartStatement("") + txnOp.GetWorkspace().StartStatement(nil) err = txnOp.GetWorkspace().IncrStatementID(ctx, false) convey.So(err, convey.ShouldBeNil) ec.stmt = &tree.Insert{} @@ -745,7 +749,7 @@ func Test_rollbackStatement6(t *testing.T) { NeedToBeCommittedInActiveTransaction(&tree.Insert{}), convey.ShouldBeFalse) convey.So(txnOp != nil && !ses.IsDerivedStmt(), convey.ShouldBeTrue) //called incrStatement - txnOp.GetWorkspace().StartStatement("") + txnOp.GetWorkspace().StartStatement(nil) err = txnOp.GetWorkspace().IncrStatementID(ctx, false) convey.So(err, convey.ShouldBeNil) ec.stmt = &tree.Insert{} diff --git a/pkg/frontend/util_test.go b/pkg/frontend/util_test.go index f01e763bbbe72..6a1f5b95b2528 100644 --- a/pkg/frontend/util_test.go +++ b/pkg/frontend/util_test.go @@ -621,7 +621,7 @@ func TestGetExprValue(t *testing.T) { ws.EXPECT().IncrStatementID(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() ws.EXPECT().IncrSQLCount().AnyTimes() ws.EXPECT().GetSQLCount().AnyTimes() - ws.EXPECT().StartStatement("").AnyTimes() + ws.EXPECT().StartStatement(nil).AnyTimes() ws.EXPECT().EndStatement().AnyTimes() ws.EXPECT().GetSnapshotWriteOffset().Return(0).AnyTimes() ws.EXPECT().UpdateSnapshotWriteOffset().AnyTimes() @@ -729,7 +729,7 @@ func TestGetExprValue(t *testing.T) { ws := mock_frontend.NewMockWorkspace(ctrl) ws.EXPECT().IncrStatementID(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - ws.EXPECT().StartStatement("").AnyTimes() + ws.EXPECT().StartStatement(nil).AnyTimes() ws.EXPECT().EndStatement().AnyTimes() ws.EXPECT().GetSnapshotWriteOffset().Return(0).AnyTimes() ws.EXPECT().UpdateSnapshotWriteOffset().AnyTimes() diff --git a/pkg/sql/compile/sql_executor.go b/pkg/sql/compile/sql_executor.go index 1461882e645ba..f68a4ff895885 100644 --- a/pkg/sql/compile/sql_executor.go +++ b/pkg/sql/compile/sql_executor.go @@ -99,7 +99,7 @@ func (s *sqlExecutor) NewTxnOperator(ctx context.Context) client.TxnOperator { return nil } } - opts.Txn().GetWorkspace().StartStatement("") + opts.Txn().GetWorkspace().StartStatement(nil) opts.Txn().GetWorkspace().IncrStatementID(ctx, false) return opts.Txn() } @@ -282,7 +282,7 @@ func (exec *txnExecutor) Exec( // maybe we should fix it. txnOp := exec.opts.Txn() if txnOp != nil && !exec.opts.DisableIncrStatement() { - txnOp.GetWorkspace().StartStatement(sql) + txnOp.GetWorkspace().StartStatement(nil) defer func() { txnOp.GetWorkspace().EndStatement() }() diff --git a/pkg/txn/client/types.go b/pkg/txn/client/types.go index 9799ae4abf7f7..e11f55f0aafd9 100644 --- a/pkg/txn/client/types.go +++ b/pkg/txn/client/types.go @@ -22,6 +22,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/pb/lock" "github.com/matrixorigin/matrixone/pkg/pb/timestamp" "github.com/matrixorigin/matrixone/pkg/pb/txn" + "github.com/matrixorigin/matrixone/pkg/sql/parsers/tree" "github.com/matrixorigin/matrixone/pkg/txn/rpc" ) @@ -250,7 +251,7 @@ type Workspace interface { Readonly() bool // StartStatement tag a statement is running - StartStatement(string) + StartStatement(tree.Statement) // EndStatement tag end a statement is completed EndStatement() diff --git a/pkg/vm/engine/disttae/types.go b/pkg/vm/engine/disttae/types.go index 7d9b35accdad3..6961885c0c709 100644 --- a/pkg/vm/engine/disttae/types.go +++ b/pkg/vm/engine/disttae/types.go @@ -19,6 +19,7 @@ import ( "context" "encoding/hex" "fmt" + "github.com/matrixorigin/matrixone/pkg/sql/parsers/tree" "math" "strconv" "sync" @@ -525,8 +526,12 @@ func (txn *Transaction) PPString() string { stringifySlice(txn.transfer.timestamps, func(a any) string { t := a.(timestamp.Timestamp); return t.DebugString() })) } -func (txn *Transaction) StartStatement(sql string) { +func (txn *Transaction) StartStatement(stmt tree.Statement) { if txn.startStatementCalled { + var sql string + if stmt != nil { + sql = stmt.String() + } logutil.Fatal("BUG: StartStatement called twice", zap.String("txn", hex.EncodeToString(txn.op.Txn().ID)), zap.String("SQL", sql), diff --git a/pkg/vm/engine/test/disttae_engine_test.go b/pkg/vm/engine/test/disttae_engine_test.go index 8d9429934e024..7d8e79d4100ec 100644 --- a/pkg/vm/engine/test/disttae_engine_test.go +++ b/pkg/vm/engine/test/disttae_engine_test.go @@ -163,7 +163,7 @@ func TestSystemDB1(t *testing.T) { txnop = p.StartCNTxn() dbs, err = p.D.Engine.Databases(p.Ctx, txnop) require.NoError(t, err) - txnop.GetWorkspace().StartStatement("") + txnop.GetWorkspace().StartStatement(nil) require.Equal(t, 2+1, len(dbs)) txn, err := p.T.StartTxn() @@ -561,7 +561,7 @@ func TestColumnsTransfer(t *testing.T) { require.NoError(t, txnop.Commit(p.Ctx)) txnop = p.StartCNTxn() - txnop.GetWorkspace().StartStatement("") + txnop.GetWorkspace().StartStatement(nil) p.DeleteTableInDB(txnop, "db", schema2.Name) txn, _ := tae.StartTxn(nil) diff --git a/pkg/vm/engine/test/testutil/util.go b/pkg/vm/engine/test/testutil/util.go index 0c46f7b0dabaf..fff60016da1fb 100644 --- a/pkg/vm/engine/test/testutil/util.go +++ b/pkg/vm/engine/test/testutil/util.go @@ -444,7 +444,7 @@ func WriteToRelation( bat *batch.Batch, isDelete, toEndStatement bool, ) (err error) { - txn.GetWorkspace().StartStatement("") + txn.GetWorkspace().StartStatement(nil) if isDelete { err = relation.Delete(ctx, bat, catalog2.Row_ID) } else { diff --git a/pkg/vm/engine/test/workspace_test.go b/pkg/vm/engine/test/workspace_test.go index 7295fa6e9af01..934a348fc8595 100644 --- a/pkg/vm/engine/test/workspace_test.go +++ b/pkg/vm/engine/test/workspace_test.go @@ -809,7 +809,7 @@ func Test_BasicRollbackStatement(t *testing.T) { require.NoError(t, testutil.WriteToRelation(ctx, txn, relation, bat1, false, true)) - txn.GetWorkspace().StartStatement("") + txn.GetWorkspace().StartStatement(nil) require.NoError(t, relation.Write(ctx, bat2)) require.NoError(t, txn.GetWorkspace().RollbackLastStatement(ctx)) require.NoError(t, txn.GetWorkspace().IncrStatementID(ctx, false)) @@ -921,7 +921,7 @@ func Test_BasicRollbackStatementS3(t *testing.T) { require.NoError(t, testutil.WriteToRelation(ctx, txn, relation, bat1, false, true)) - txn.GetWorkspace().StartStatement("") + txn.GetWorkspace().StartStatement(nil) require.NoError(t, relation.Write(ctx, bat2)) require.NoError(t, txn.GetWorkspace().RollbackLastStatement(ctx)) require.NoError(t, txn.GetWorkspace().IncrStatementID(ctx, false)) @@ -1033,7 +1033,7 @@ func Test_RollbackDeleteAndDrop(t *testing.T) { txnop = p.StartCNTxn() exec := v.(executor.SQLExecutor) execopts := executor.Options{}.WithTxn(txnop).WithDisableIncrStatement() - txnop.GetWorkspace().StartStatement("") + txnop.GetWorkspace().StartStatement(nil) txnop.GetWorkspace().IncrStatementID(p.Ctx, false) dropTable := func() { _, err := exec.Exec(p.Ctx, "delete from db.test3 where mock_1 = 0", execopts) @@ -1182,7 +1182,7 @@ func Test_MultiTxnRollbackStatement(t *testing.T) { { require.NoError(t, testutil.WriteToRelation(ctx, txn, relation, bat2, false, true)) - txn.GetWorkspace().StartStatement("") + txn.GetWorkspace().StartStatement(nil) require.NoError(t, relation.Write(ctx, bat2)) require.NoError(t, txn.GetWorkspace().RollbackLastStatement(ctx)) require.NoError(t, txn.GetWorkspace().IncrStatementID(ctx, false)) @@ -1357,7 +1357,7 @@ func Test_MultiTxnRollbackStatementS3(t *testing.T) { // txn2 delete 5-15 { - txn.GetWorkspace().StartStatement("") + txn.GetWorkspace().StartStatement(nil) require.NoError(t, relation.Write(ctx, bat2)) require.NoError(t, txn.GetWorkspace().RollbackLastStatement(ctx)) require.NoError(t, txn.GetWorkspace().IncrStatementID(ctx, false)) @@ -1662,7 +1662,7 @@ func Test_CNTransferTombstoneObjects(t *testing.T) { _, _, cnTxnOp, err = p.D.GetTable(ctx, databaseName, tableName) require.NoError(t, err) - cnTxnOp.GetWorkspace().StartStatement("") + cnTxnOp.GetWorkspace().StartStatement(nil) err = cnTxnOp.GetWorkspace().IncrStatementID(ctx, false) require.NoError(t, err) } From 3661438c1431b20b03ea616a61af2578fe7dbdb1 Mon Sep 17 00:00:00 2001 From: LiuBo Date: Thu, 27 Mar 2025 14:15:49 +0800 Subject: [PATCH 3/4] fix ut --- pkg/sql/compile/compile_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/sql/compile/compile_test.go b/pkg/sql/compile/compile_test.go index e6bb2496efe6f..b820b2b985027 100644 --- a/pkg/sql/compile/compile_test.go +++ b/pkg/sql/compile/compile_test.go @@ -136,10 +136,10 @@ func (w *Ws) Adjust(_ uint64) error { return nil } -func (w *Ws) StartStatement() {} -func (w *Ws) EndStatement() {} -func (w *Ws) IncrSQLCount() {} -func (w *Ws) GetSQLCount() uint64 { return 0 } +func (w *Ws) StartStatement(statement tree.Statement) {} +func (w *Ws) EndStatement() {} +func (w *Ws) IncrSQLCount() {} +func (w *Ws) GetSQLCount() uint64 { return 0 } func (w *Ws) CloneSnapshotWS() client.Workspace { return nil From 0aac63ad5b7be9156557b7e6bcd816be448dae0d Mon Sep 17 00:00:00 2001 From: LiuBo Date: Thu, 27 Mar 2025 17:00:20 +0800 Subject: [PATCH 4/4] fix ut --- pkg/vm/engine/disttae/txn_test.go | 17 +++++++++++++++++ pkg/vm/engine/disttae/types.go | 14 +++++++++++--- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/pkg/vm/engine/disttae/txn_test.go b/pkg/vm/engine/disttae/txn_test.go index 61de4a5a40025..e1d9ec3a805d6 100644 --- a/pkg/vm/engine/disttae/txn_test.go +++ b/pkg/vm/engine/disttae/txn_test.go @@ -16,11 +16,15 @@ package disttae import ( "bytes" + "context" "sync" "testing" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/sql/parsers" + "github.com/matrixorigin/matrixone/pkg/sql/parsers/dialect" + "github.com/matrixorigin/matrixone/pkg/txn/client" "github.com/stretchr/testify/require" ) @@ -62,3 +66,16 @@ func Test_GetUncommittedS3Tombstone(t *testing.T) { return true }) } + +func TestTransaction_StartStatement(t *testing.T) { + var txn Transaction + var fn func() + ctx := context.Background() + txn.op, fn = client.NewTestTxnOperator(ctx) + defer fn() + txn.StartStatement(nil) + stmts, err := parsers.Parse(ctx, dialect.MYSQL, "show databases", 0) + require.NoError(t, err) + require.Equal(t, len(stmts), 1) + txn.StartStatement(stmts[0]) +} diff --git a/pkg/vm/engine/disttae/types.go b/pkg/vm/engine/disttae/types.go index 6961885c0c709..869c923706985 100644 --- a/pkg/vm/engine/disttae/types.go +++ b/pkg/vm/engine/disttae/types.go @@ -18,8 +18,8 @@ import ( "bytes" "context" "encoding/hex" + "flag" "fmt" - "github.com/matrixorigin/matrixone/pkg/sql/parsers/tree" "math" "strconv" "sync" @@ -45,6 +45,8 @@ import ( "github.com/matrixorigin/matrixone/pkg/pb/timestamp" qclient "github.com/matrixorigin/matrixone/pkg/queryservice/client" "github.com/matrixorigin/matrixone/pkg/sql/colexec" + "github.com/matrixorigin/matrixone/pkg/sql/parsers/dialect" + "github.com/matrixorigin/matrixone/pkg/sql/parsers/tree" "github.com/matrixorigin/matrixone/pkg/txn/client" "github.com/matrixorigin/matrixone/pkg/txn/trace" "github.com/matrixorigin/matrixone/pkg/udf" @@ -530,9 +532,15 @@ func (txn *Transaction) StartStatement(stmt tree.Statement) { if txn.startStatementCalled { var sql string if stmt != nil { - sql = stmt.String() + fmtCtx := tree.NewFmtCtx(dialect.MYSQL, tree.WithQuoteString(true)) + stmt.Format(fmtCtx) + sql = fmtCtx.String() + } + log := logutil.Fatal + if flag.Lookup("test.v") != nil { + log = logutil.Error } - logutil.Fatal("BUG: StartStatement called twice", + log("BUG: StartStatement called twice", zap.String("txn", hex.EncodeToString(txn.op.Txn().ID)), zap.String("SQL", sql), )