From 6040bc18bb76b6ec32a158321d277e7f97b30781 Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Mon, 12 May 2025 17:17:14 +0800 Subject: [PATCH 01/12] update 1 --- pkg/objectio/ioutil/reader.go | 14 -------------- pkg/objectio/reader.go | 16 ---------------- 2 files changed, 30 deletions(-) diff --git a/pkg/objectio/ioutil/reader.go b/pkg/objectio/ioutil/reader.go index 25aa7aeaef96a..5b9f7b5c252e5 100644 --- a/pkg/objectio/ioutil/reader.go +++ b/pkg/objectio/ioutil/reader.go @@ -288,20 +288,6 @@ func (r *BlockReader) LoadOneBF( return r.reader.ReadOneBF(ctx, blk) } -func (r *BlockReader) LoadAllBF( - ctx context.Context, -) (objectio.BloomFilter, uint32, error) { - return r.reader.ReadAllBF(ctx) -} - -func (r *BlockReader) GetObjectName() *objectio.ObjectName { - return r.reader.GetObjectName() -} - func (r *BlockReader) GetName() string { return r.reader.GetName() } - -func (r *BlockReader) GetObjectReader() *objectio.ObjectReader { - return r.reader -} diff --git a/pkg/objectio/reader.go b/pkg/objectio/reader.go index 3d2c40aaee1b4..61bab75e86caf 100644 --- a/pkg/objectio/reader.go +++ b/pkg/objectio/reader.go @@ -251,22 +251,6 @@ func (r *objectReaderV1) ReadOneBF( return bf, size, nil } -func (r *objectReaderV1) ReadAllBF( - ctx context.Context, -) (bfs BloomFilter, size uint32, err error) { - var metaHeader ObjectMeta - var buf []byte - if metaHeader, err = r.ReadMeta(ctx, nil); err != nil { - return - } - meta := metaHeader.MustDataMeta() - extent := meta.BlockHeader().BFExtent() - if buf, err = ReadBloomFilter(ctx, r.name, &extent, r.dataReadPolicy, r.fs); err != nil { - return - } - return buf, extent.OriginSize(), nil -} - func (r *objectReaderV1) ReadExtent( ctx context.Context, extent Extent, From 41a6d5997a3528f70a1128d732a7bb16b8a433b4 Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Mon, 12 May 2025 17:22:10 +0800 Subject: [PATCH 02/12] update 2 --- pkg/vm/engine/disttae/txn_table.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/vm/engine/disttae/txn_table.go b/pkg/vm/engine/disttae/txn_table.go index 0e7d7ef1fc714..81d63a8431244 100644 --- a/pkg/vm/engine/disttae/txn_table.go +++ b/pkg/vm/engine/disttae/txn_table.go @@ -1599,9 +1599,10 @@ func (tbl *txnTable) rewriteObjectByDeletion( fileName string ) - objectio.ForeachBlkInObjStatsList(true, nil, + objectio.ForeachBlkInObjStatsList( + true, + nil, func(blk objectio.BlockInfo, blkMeta objectio.BlockObject) bool { - del := deletes[blk.BlockID] slices.Sort(del) if bat, err = blockio.BlockCompactionRead( @@ -1628,7 +1629,9 @@ func (tbl *txnTable) rewriteObjectByDeletion( return true - }, obj) + }, + obj, + ) if err != nil { return nil, fileName, err From 0b11cdaca0bc526c1d99a2258f85b0e302fab075 Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Mon, 12 May 2025 17:30:59 +0800 Subject: [PATCH 03/12] update 3 --- pkg/vm/engine/disttae/txn_table.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/pkg/vm/engine/disttae/txn_table.go b/pkg/vm/engine/disttae/txn_table.go index 81d63a8431244..cd3cbbc1d48e4 100644 --- a/pkg/vm/engine/disttae/txn_table.go +++ b/pkg/vm/engine/disttae/txn_table.go @@ -1575,7 +1575,7 @@ func (tbl *txnTable) ensureSeqnumsAndTypesExpectRowid() { func (tbl *txnTable) rewriteObjectByDeletion( ctx context.Context, obj objectio.ObjectStats, - deletes map[objectio.Blockid][]int64, + blockDeletes map[objectio.Blockid][]int64, ) (*batch.Batch, string, error) { proc := tbl.proc.Load() @@ -1603,19 +1603,20 @@ func (tbl *txnTable) rewriteObjectByDeletion( true, nil, func(blk objectio.BlockInfo, blkMeta objectio.BlockObject) bool { - del := deletes[blk.BlockID] - slices.Sort(del) + deletes := blockDeletes[blk.BlockID] + slices.Sort(deletes) if bat, err = blockio.BlockCompactionRead( tbl.getTxn().proc.Ctx, blk.MetaLoc[:], - del, + deletes, tbl.seqnums, tbl.typs, tbl.getTxn().engine.fs, - tbl.getTxn().proc.GetMPool()); err != nil { - + tbl.getTxn().proc.GetMPool(), + ); err != nil { return false } + defer bat.Clean(tbl.getTxn().proc.GetMPool()) if bat.RowCount() == 0 { return true @@ -1625,10 +1626,7 @@ func (tbl *txnTable) rewriteObjectByDeletion( return false } - bat.Clean(tbl.getTxn().proc.GetMPool()) - return true - }, obj, ) From f5383d359f7bedf8add2bf7d984f181d20898ad9 Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Mon, 12 May 2025 17:41:57 +0800 Subject: [PATCH 04/12] update 4 --- pkg/vm/engine/tae/blockio/read.go | 35 ++++++++++++------------------- 1 file changed, 13 insertions(+), 22 deletions(-) diff --git a/pkg/vm/engine/tae/blockio/read.go b/pkg/vm/engine/tae/blockio/read.go index a18b53adcc014..e2d17bf22257c 100644 --- a/pkg/vm/engine/tae/blockio/read.go +++ b/pkg/vm/engine/tae/blockio/read.go @@ -16,9 +16,10 @@ package blockio import ( "context" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "time" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" + "go.uber.org/zap" "github.com/matrixorigin/matrixone/pkg/common/mpool" @@ -295,33 +296,23 @@ func BlockCompactionRead( return nil, err } defer release() - // why comment? the cacheVector will be free when exit this function - //if len(deletes) == 0 { - // result := batch.NewWithSize(len(seqnums)) - // for i := range cacheVectors { - // result.Vecs[i] = &cacheVectors[i] - // } - // result.SetRowCount(result.Vecs[0].Length()) - // return result, nil - //} + result := batch.NewWithSize(len(seqnums)) for i, col := range cacheVectors { - typ := *col.GetType() - result.Vecs[i] = vector.NewVec(typ) - if err = vector.GetUnionAllFunction(typ, mp)(result.Vecs[i], &col); err != nil { - break + result.Vecs[i] = vector.NewVec(*col.GetType()) + if err = result.Vecs[i].UnionBatch( + &col, + 0, + col.Length(), + nil, + mp, + ); err != nil { + result.Clean(mp) + return nil, err } result.Vecs[i].Shrink(deletes, true) } - if err != nil { - for _, col := range result.Vecs { - if col != nil { - col.Free(mp) - } - } - return nil, err - } result.SetRowCount(result.Vecs[0].Length()) return result, nil } From 0d42bdec50e6f7ddf0e691743492fb22294bec03 Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Mon, 12 May 2025 17:49:37 +0800 Subject: [PATCH 05/12] update 5 --- pkg/vm/engine/tae/blockio/read.go | 20 ++------------------ pkg/vm/engine/tae/containers/utils.go | 27 +++++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 18 deletions(-) diff --git a/pkg/vm/engine/tae/blockio/read.go b/pkg/vm/engine/tae/blockio/read.go index e2d17bf22257c..0f6cd43218e1f 100644 --- a/pkg/vm/engine/tae/blockio/read.go +++ b/pkg/vm/engine/tae/blockio/read.go @@ -297,24 +297,8 @@ func BlockCompactionRead( } defer release() - result := batch.NewWithSize(len(seqnums)) - for i, col := range cacheVectors { - result.Vecs[i] = vector.NewVec(*col.GetType()) - if err = result.Vecs[i].UnionBatch( - &col, - 0, - col.Length(), - nil, - mp, - ); err != nil { - result.Clean(mp) - return nil, err - } - result.Vecs[i].Shrink(deletes, true) - } - - result.SetRowCount(result.Vecs[0].Length()) - return result, nil + ret, err := containers.VectorsCopyToBatch(cacheVectors, mp) + return ret, err } func windowCNBatch(bat *batch.Batch, start, end uint64) error { diff --git a/pkg/vm/engine/tae/containers/utils.go b/pkg/vm/engine/tae/containers/utils.go index fc07f792a559a..e5cc8401bf0b6 100644 --- a/pkg/vm/engine/tae/containers/utils.go +++ b/pkg/vm/engine/tae/containers/utils.go @@ -24,6 +24,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/nulls" "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/container/vector" movec "github.com/matrixorigin/matrixone/pkg/container/vector" ) @@ -1145,3 +1146,29 @@ func DedupSortedBatches( } return nil } + +func VectorsCopyToBatch( + vecs Vectors, + mp *mpool.MPool, +) (bat *batch.Batch, err error) { + bat = batch.NewWithSize(len(vecs)) + if len(vecs) == 0 { + return + } + for i, vec := range vecs { + bat.Vecs[i] = vector.NewVec(*vec.GetType()) + if err = bat.Vecs[i].UnionBatch( + &vec, + 0, + vec.Length(), + nil, + mp, + ); err != nil { + bat.Clean(mp) + bat = nil + return + } + } + bat.SetRowCount(bat.Vecs[0].Length()) + return +} From 310b54c3d9aa01e1ca44a71c400f4f1345020293 Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Mon, 12 May 2025 18:26:13 +0800 Subject: [PATCH 06/12] update 6 --- pkg/vm/engine/disttae/txn_table.go | 15 +++++++++++++-- pkg/vm/engine/tae/blockio/read.go | 26 +++++++++++++++++--------- pkg/vm/engine/tae/containers/utils.go | 12 +++++------- 3 files changed, 35 insertions(+), 18 deletions(-) diff --git a/pkg/vm/engine/disttae/txn_table.go b/pkg/vm/engine/disttae/txn_table.go index cd3cbbc1d48e4..a7b7683800d63 100644 --- a/pkg/vm/engine/disttae/txn_table.go +++ b/pkg/vm/engine/disttae/txn_table.go @@ -1599,24 +1599,35 @@ func (tbl *txnTable) rewriteObjectByDeletion( fileName string ) + defer func() { + if bat != nil { + bat.Clean(proc.Mp()) + } + }() + objectio.ForeachBlkInObjStatsList( true, nil, func(blk objectio.BlockInfo, blkMeta objectio.BlockObject) bool { deletes := blockDeletes[blk.BlockID] slices.Sort(deletes) - if bat, err = blockio.BlockCompactionRead( + if bat == nil { + bat = batch.NewWithSize(len(tbl.seqnums)) + } + bat.CleanOnlyData() + + if err = blockio.CopyBlockData( tbl.getTxn().proc.Ctx, blk.MetaLoc[:], deletes, tbl.seqnums, tbl.typs, + bat, tbl.getTxn().engine.fs, tbl.getTxn().proc.GetMPool(), ); err != nil { return false } - defer bat.Clean(tbl.getTxn().proc.GetMPool()) if bat.RowCount() == 0 { return true diff --git a/pkg/vm/engine/tae/blockio/read.go b/pkg/vm/engine/tae/blockio/read.go index 0f6cd43218e1f..f5ecbd3ab1dc4 100644 --- a/pkg/vm/engine/tae/blockio/read.go +++ b/pkg/vm/engine/tae/blockio/read.go @@ -278,27 +278,35 @@ func BlockDataRead( return nil } -func BlockCompactionRead( +func CopyBlockData( ctx context.Context, location objectio.Location, deletes []int64, seqnums []uint16, colTypes []types.Type, + outputBat *batch.Batch, fs fileservice.FileService, mp *mpool.MPool, -) (*batch.Batch, error) { - cacheVectors := containers.NewVectors(len(seqnums)) +) (err error) { + var ( + release func() + cacheVectors = containers.NewVectors(len(seqnums)) + ) - release, err := ioutil.LoadColumns( + if release, err = ioutil.LoadColumns( ctx, seqnums, colTypes, fs, location, cacheVectors, mp, fileservice.Policy(0), - ) - if err != nil { - return nil, err + ); err != nil { + return } defer release() - ret, err := containers.VectorsCopyToBatch(cacheVectors, mp) - return ret, err + if err = containers.VectorsCopyToBatch( + cacheVectors, outputBat, mp, + ); err != nil { + return + } + outputBat.Shrink(deletes, true) + return } func windowCNBatch(bat *batch.Batch, start, end uint64) error { diff --git a/pkg/vm/engine/tae/containers/utils.go b/pkg/vm/engine/tae/containers/utils.go index e5cc8401bf0b6..73ff4fe7ba55d 100644 --- a/pkg/vm/engine/tae/containers/utils.go +++ b/pkg/vm/engine/tae/containers/utils.go @@ -1149,26 +1149,24 @@ func DedupSortedBatches( func VectorsCopyToBatch( vecs Vectors, + outputBat *batch.Batch, mp *mpool.MPool, -) (bat *batch.Batch, err error) { - bat = batch.NewWithSize(len(vecs)) +) (err error) { if len(vecs) == 0 { return } for i, vec := range vecs { - bat.Vecs[i] = vector.NewVec(*vec.GetType()) - if err = bat.Vecs[i].UnionBatch( + outputBat.Vecs[i] = vector.NewVec(*vec.GetType()) + if err = outputBat.Vecs[i].UnionBatch( &vec, 0, vec.Length(), nil, mp, ); err != nil { - bat.Clean(mp) - bat = nil return } } - bat.SetRowCount(bat.Vecs[0].Length()) + outputBat.SetRowCount(outputBat.Vecs[0].Length()) return } From 886d7b25b1c7321642e65b58ce8e983f7be82ceb Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Mon, 12 May 2025 19:07:56 +0800 Subject: [PATCH 07/12] update 7 --- pkg/frontend/test/engine_mock.go | 16 ----- pkg/vm/engine/disttae/txn_table.go | 74 -------------------- pkg/vm/engine/disttae/txn_table_delegate.go | 9 --- pkg/vm/engine/disttae/txn_table_partition.go | 4 -- pkg/vm/engine/memoryengine/table.go | 26 ------- pkg/vm/engine/types.go | 2 - 6 files changed, 131 deletions(-) diff --git a/pkg/frontend/test/engine_mock.go b/pkg/frontend/test/engine_mock.go index ac6a4746f1baf..5060d4099cd7b 100644 --- a/pkg/frontend/test/engine_mock.go +++ b/pkg/frontend/test/engine_mock.go @@ -1187,22 +1187,6 @@ func (mr *MockRelationMockRecorder) GetEngineType() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetEngineType", reflect.TypeOf((*MockRelation)(nil).GetEngineType)) } -// GetHideKeys mocks base method. -func (m *MockRelation) GetHideKeys(arg0 context.Context) ([]*engine.Attribute, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetHideKeys", arg0) - ret0, _ := ret[0].([]*engine.Attribute) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetHideKeys indicates an expected call of GetHideKeys. -func (mr *MockRelationMockRecorder) GetHideKeys(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetHideKeys", reflect.TypeOf((*MockRelation)(nil).GetHideKeys), arg0) -} - -// GetNonAppendableObjectStats mocks base method. func (m *MockRelation) GetNonAppendableObjectStats(ctx context.Context) ([]objectio.ObjectStats, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetNonAppendableObjectStats", ctx) diff --git a/pkg/vm/engine/disttae/txn_table.go b/pkg/vm/engine/disttae/txn_table.go index a7b7683800d63..6f23f723654c2 100644 --- a/pkg/vm/engine/disttae/txn_table.go +++ b/pkg/vm/engine/disttae/txn_table.go @@ -1006,68 +1006,6 @@ func (tbl *txnTable) collectUnCommittedDataObjs(txnOffset int) ([]objectio.Objec return unCommittedObjects, unCommittedObjNames } -//func (tbl *txnTable) collectDirtyBlocks( -// state *logtailreplay.PartitionState, -// uncommittedObjects []objectio.ObjectStats, -// txnOffset int, // Transaction writes offset used to specify the starting position for reading data. -//) map[types.Blockid]struct{} { -// dirtyBlks := make(map[types.Blockid]struct{}) -// //collect partitionState.dirtyBlocks which may be invisible to this txn into dirtyBlks. -// { -// iter := state.NewDirtyBlocksIter() -// for iter.Next() { -// entry := iter.Entry() -// //lazy load deletes for block. -// dirtyBlks[entry] = struct{}{} -// } -// iter.Close() -// -// } -// -// //only collect dirty blocks in PartitionState.blocks into dirtyBlks. -// for _, bid := range tbl.GetDirtyPersistedBlks(state) { -// dirtyBlks[bid] = struct{}{} -// } -// -// if tbl.getTxn().hasDeletesOnUncommitedObject() { -// ForeachBlkInObjStatsList(true, nil, func(blk objectio.BlockInfo, _ objectio.BlockObject) bool { -// if tbl.getTxn().hasUncommittedDeletesOnBlock(&blk.BlockID) { -// dirtyBlks[blk.BlockID] = struct{}{} -// } -// return true -// }, uncommittedObjects...) -// } -// -// if tbl.db.op.IsSnapOp() { -// txnOffset = tbl.getTxn().GetSnapshotWriteOffset() -// } -// -// tbl.getTxn().ForEachTableWrites( -// tbl.db.databaseId, -// tbl.tableId, -// txnOffset, -// func(entry Entry) { -// // the CN workspace can only handle `INSERT` and `DELETE` operations. Other operations will be skipped, -// // TODO Adjustments will be made here in the future -// if entry.typ == DELETE || entry.typ == DELETE_TXN { -// if entry.IsGeneratedByTruncate() { -// return -// } -// //deletes in tbl.writes maybe comes from PartitionState.rows or PartitionState.blocks. -// if entry.fileName == "" && -// entry.tableId != catalog.MO_DATABASE_ID && entry.tableId != catalog.MO_TABLES_ID && entry.tableId != catalog.MO_COLUMNS_ID { -// vs := vector.MustFixedColWithTypeCheck[types.Rowid](entry.bat.GetVector(0)) -// for _, v := range vs { -// id, _ := v.Decode() -// dirtyBlks[id] = struct{}{} -// } -// } -// } -// }) -// -// return dirtyBlks -//} - // the return defs has no rowid column func (tbl *txnTable) TableDefs(ctx context.Context) ([]engine.TableDef, error) { //return tbl.defs, nil @@ -1493,18 +1431,6 @@ func (tbl *txnTable) GetPrimaryKeys(ctx context.Context) ([]*engine.Attribute, e return attrs, nil } -func (tbl *txnTable) GetHideKeys(ctx context.Context) ([]*engine.Attribute, error) { - attrs := make([]*engine.Attribute, 0, 1) - attrs = append(attrs, &engine.Attribute{ - IsHidden: true, - IsRowId: true, - Name: objectio.PhysicalAddr_Attr, - Type: types.New(types.T_Rowid, 0, 0), - Primary: true, - }) - return attrs, nil -} - func (tbl *txnTable) Write(ctx context.Context, bat *batch.Batch) error { if tbl.db.op.IsSnapOp() { return moerr.NewInternalErrorNoCtx("write operation is not allowed in snapshot transaction") diff --git a/pkg/vm/engine/disttae/txn_table_delegate.go b/pkg/vm/engine/disttae/txn_table_delegate.go index 8331d7948e8b1..cb85a664393ac 100644 --- a/pkg/vm/engine/disttae/txn_table_delegate.go +++ b/pkg/vm/engine/disttae/txn_table_delegate.go @@ -903,15 +903,6 @@ func (tbl *txnTableDelegate) GetPrimaryKeys( return tbl.origin.GetPrimaryKeys(ctx) } -func (tbl *txnTableDelegate) GetHideKeys( - ctx context.Context, -) ([]*engine.Attribute, error) { - if tbl.partition.is { - return tbl.partition.tbl.GetHideKeys(ctx) - } - return tbl.origin.GetHideKeys(ctx) -} - func (tbl *txnTableDelegate) Write( ctx context.Context, bat *batch.Batch, diff --git a/pkg/vm/engine/disttae/txn_table_partition.go b/pkg/vm/engine/disttae/txn_table_partition.go index 83001b707fa1f..4474106e4212a 100644 --- a/pkg/vm/engine/disttae/txn_table_partition.go +++ b/pkg/vm/engine/disttae/txn_table_partition.go @@ -346,10 +346,6 @@ func (t *partitionTxnTable) GetPrimaryKeys(ctx context.Context) ([]*engine.Attri return t.primary.GetPrimaryKeys(ctx) } -func (t *partitionTxnTable) GetHideKeys(ctx context.Context) ([]*engine.Attribute, error) { - return t.primary.GetHideKeys(ctx) -} - func (t *partitionTxnTable) AddTableDef(context.Context, engine.TableDef) error { return nil } diff --git a/pkg/vm/engine/memoryengine/table.go b/pkg/vm/engine/memoryengine/table.go index 89777663decf4..2baddd4464593 100644 --- a/pkg/vm/engine/memoryengine/table.go +++ b/pkg/vm/engine/memoryengine/table.go @@ -487,32 +487,6 @@ func (t *Table) Write(ctx context.Context, data *batch.Batch) error { return nil } -func (t *Table) GetHideKeys(ctx context.Context) ([]*engine.Attribute, error) { - resps, err := DoTxnRequest[GetHiddenKeysResp]( - ctx, - t.txnOperator, - true, - t.engine.anyShard, - OpGetHiddenKeys, - &GetHiddenKeysReq{ - TableID: t.id, - }, - ) - if err != nil { - return nil, err - } - - resp := resps[0] - - // convert from []engine.Attribute to []*engine.Attribute - attrs := make([]*engine.Attribute, 0, len(resp.Attrs)) - for i := 0; i < len(resp.Attrs); i++ { - attrs = append(attrs, &resp.Attrs[i]) - } - - return attrs, nil -} - func (t *Table) GetTableID(ctx context.Context) uint64 { return uint64(t.id) } diff --git a/pkg/vm/engine/types.go b/pkg/vm/engine/types.go index a980c8aa85bc9..71fa925ad88d6 100644 --- a/pkg/vm/engine/types.go +++ b/pkg/vm/engine/types.go @@ -871,8 +871,6 @@ type Relation interface { GetPrimaryKeys(context.Context) ([]*Attribute, error) - GetHideKeys(context.Context) ([]*Attribute, error) - // Note: Write Will access Fileservice Write(context.Context, *batch.Batch) error From 4933b5adc2eefb126015e4d0b53bd6b782b17a02 Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Mon, 12 May 2025 19:46:10 +0800 Subject: [PATCH 08/12] update 8 --- pkg/frontend/test/engine_mock.go | 8 ----- pkg/vm/engine/disttae/txn_table.go | 7 ---- pkg/vm/engine/disttae/txn_table_delegate.go | 10 ------ pkg/vm/engine/disttae/txn_table_partition.go | 4 --- pkg/vm/engine/memoryengine/table.go | 35 -------------------- pkg/vm/engine/types.go | 2 -- 6 files changed, 66 deletions(-) diff --git a/pkg/frontend/test/engine_mock.go b/pkg/frontend/test/engine_mock.go index 5060d4099cd7b..f4a8cca66ad82 100644 --- a/pkg/frontend/test/engine_mock.go +++ b/pkg/frontend/test/engine_mock.go @@ -1451,14 +1451,6 @@ func (mr *MockRelationMockRecorder) TableRenameInTxn(ctx, constraint interface{} return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TableRenameInTxn", reflect.TypeOf((*MockRelation)(nil).TableRenameInTxn), ctx, constraint) } -// Update mocks base method. -func (m *MockRelation) Update(arg0 context.Context, arg1 *batch.Batch) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Update", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - // Update indicates an expected call of Update. func (mr *MockRelationMockRecorder) Update(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() diff --git a/pkg/vm/engine/disttae/txn_table.go b/pkg/vm/engine/disttae/txn_table.go index 6f23f723654c2..c62b5830030b9 100644 --- a/pkg/vm/engine/disttae/txn_table.go +++ b/pkg/vm/engine/disttae/txn_table.go @@ -1475,13 +1475,6 @@ func (tbl *txnTable) Write(ctx context.Context, bat *batch.Batch) error { return tbl.getTxn().dumpBatch(ctx, tbl.getTxn().GetSnapshotWriteOffset()) } -func (tbl *txnTable) Update(ctx context.Context, bat *batch.Batch) error { - if tbl.db.op.IsSnapOp() { - return moerr.NewInternalErrorNoCtx("update operation is not allowed in snapshot transaction") - } - return nil -} - func (tbl *txnTable) ensureSeqnumsAndTypesExpectRowid() { if tbl.seqnums != nil && tbl.typs != nil { return diff --git a/pkg/vm/engine/disttae/txn_table_delegate.go b/pkg/vm/engine/disttae/txn_table_delegate.go index cb85a664393ac..9e91681f0de51 100644 --- a/pkg/vm/engine/disttae/txn_table_delegate.go +++ b/pkg/vm/engine/disttae/txn_table_delegate.go @@ -913,16 +913,6 @@ func (tbl *txnTableDelegate) Write( return tbl.origin.Write(ctx, bat) } -func (tbl *txnTableDelegate) Update( - ctx context.Context, - bat *batch.Batch, -) error { - if tbl.partition.is { - return tbl.partition.tbl.Update(ctx, bat) - } - return tbl.origin.Update(ctx, bat) -} - func (tbl *txnTableDelegate) Delete( ctx context.Context, bat *batch.Batch, diff --git a/pkg/vm/engine/disttae/txn_table_partition.go b/pkg/vm/engine/disttae/txn_table_partition.go index 4474106e4212a..3a7a5e6213e41 100644 --- a/pkg/vm/engine/disttae/txn_table_partition.go +++ b/pkg/vm/engine/disttae/txn_table_partition.go @@ -428,10 +428,6 @@ func (t *partitionTxnTable) Write(context.Context, *batch.Batch) error { panic("BUG: cannot write data to partition primary table") } -func (t *partitionTxnTable) Update(context.Context, *batch.Batch) error { - panic("BUG: cannot update data to partition primary table") -} - func (t *partitionTxnTable) Delete(context.Context, *batch.Batch, string) error { panic("BUG: cannot delete data to partition primary table") } diff --git a/pkg/vm/engine/memoryengine/table.go b/pkg/vm/engine/memoryengine/table.go index 2baddd4464593..092d79f4311dc 100644 --- a/pkg/vm/engine/memoryengine/table.go +++ b/pkg/vm/engine/memoryengine/table.go @@ -417,41 +417,6 @@ func (t *Table) TableRenameInTxn(ctx context.Context, constraint [][]byte) error return nil } -func (t *Table) Update(ctx context.Context, data *batch.Batch) error { - data.SetRowCount(data.RowCount()) - shards, err := t.engine.shardPolicy.Batch( - ctx, - t.id, - t.TableDefs, - data, - getTNServices(t.engine.cluster), - ) - if err != nil { - return err - } - - for _, shard := range shards { - _, err := DoTxnRequest[UpdateResp]( - ctx, - t.txnOperator, - false, - thisShard(shard.Shard), - OpUpdate, - &UpdateReq{ - TableID: t.id, - DatabaseName: t.databaseName, - TableName: t.tableName, - Batch: shard.Batch, - }, - ) - if err != nil { - return err - } - } - - return nil -} - func (t *Table) Write(ctx context.Context, data *batch.Batch) error { data.SetRowCount(data.RowCount()) shards, err := t.engine.shardPolicy.Batch( diff --git a/pkg/vm/engine/types.go b/pkg/vm/engine/types.go index 71fa925ad88d6..6c9e2a8bb98c6 100644 --- a/pkg/vm/engine/types.go +++ b/pkg/vm/engine/types.go @@ -874,8 +874,6 @@ type Relation interface { // Note: Write Will access Fileservice Write(context.Context, *batch.Batch) error - Update(context.Context, *batch.Batch) error - // Delete(context.Context, *vector.Vector, string) error Delete(context.Context, *batch.Batch, string) error From 970847c88badf9ff35d2ec587dd3c2180b629d2c Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Mon, 12 May 2025 20:06:55 +0800 Subject: [PATCH 09/12] update 9 --- pkg/frontend/test/engine_mock.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pkg/frontend/test/engine_mock.go b/pkg/frontend/test/engine_mock.go index f4a8cca66ad82..516959fbc6e65 100644 --- a/pkg/frontend/test/engine_mock.go +++ b/pkg/frontend/test/engine_mock.go @@ -1451,12 +1451,6 @@ func (mr *MockRelationMockRecorder) TableRenameInTxn(ctx, constraint interface{} return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TableRenameInTxn", reflect.TypeOf((*MockRelation)(nil).TableRenameInTxn), ctx, constraint) } -// Update indicates an expected call of Update. -func (mr *MockRelationMockRecorder) Update(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*MockRelation)(nil).Update), arg0, arg1) -} - // UpdateConstraint mocks base method. func (m *MockRelation) UpdateConstraint(arg0 context.Context, arg1 *engine.ConstraintDef) error { m.ctrl.T.Helper() From ad949c2676e32cc16f28ab8556cee8462b00c67c Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Mon, 12 May 2025 20:43:57 +0800 Subject: [PATCH 10/12] update 10 --- pkg/frontend/session_test.go | 1 - pkg/util/test/cron_task_test.go | 1 - 2 files changed, 2 deletions(-) diff --git a/pkg/frontend/session_test.go b/pkg/frontend/session_test.go index e6e38a1143795..abe3553292bf0 100644 --- a/pkg/frontend/session_test.go +++ b/pkg/frontend/session_test.go @@ -355,7 +355,6 @@ func TestSession_TxnCompilerContext(t *testing.T) { table.EXPECT().GetTableDef(gomock.Any()).Return(&plan.TableDef{}).AnyTimes() table.EXPECT().CopyTableDef(gomock.Any()).Return(&plan.TableDef{}).AnyTimes() table.EXPECT().GetPrimaryKeys(gomock.Any()).Return(nil, nil).AnyTimes() - table.EXPECT().GetHideKeys(gomock.Any()).Return(nil, nil).AnyTimes() table.EXPECT().Stats(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() table.EXPECT().TableColumns(gomock.Any()).Return(nil, nil).AnyTimes() table.EXPECT().GetTableID(gomock.Any()).Return(uint64(10)).AnyTimes() diff --git a/pkg/util/test/cron_task_test.go b/pkg/util/test/cron_task_test.go index 56b04719db3ff..4476be3a26e3c 100644 --- a/pkg/util/test/cron_task_test.go +++ b/pkg/util/test/cron_task_test.go @@ -81,7 +81,6 @@ func TestCalculateStorageUsage(t *testing.T) { table.EXPECT().Ranges(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() table.EXPECT().TableDefs(gomock.Any()).Return(nil, nil).AnyTimes() table.EXPECT().GetPrimaryKeys(gomock.Any()).Return(nil, nil).AnyTimes() - table.EXPECT().GetHideKeys(gomock.Any()).Return(nil, nil).AnyTimes() table.EXPECT().TableColumns(gomock.Any()).Return(nil, nil).AnyTimes() table.EXPECT().GetTableID(gomock.Any()).Return(uint64(10)).AnyTimes() db := mock_frontend.NewMockDatabase(ctrl) From 591b2ef0fbd42ce901b36467072839325509715d Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Mon, 12 May 2025 21:24:46 +0800 Subject: [PATCH 11/12] update 11 --- pkg/vm/engine/tae/containers/utils.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/vm/engine/tae/containers/utils.go b/pkg/vm/engine/tae/containers/utils.go index 73ff4fe7ba55d..373d8c7ccd695 100644 --- a/pkg/vm/engine/tae/containers/utils.go +++ b/pkg/vm/engine/tae/containers/utils.go @@ -24,7 +24,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/nulls" "github.com/matrixorigin/matrixone/pkg/container/types" - "github.com/matrixorigin/matrixone/pkg/container/vector" movec "github.com/matrixorigin/matrixone/pkg/container/vector" ) @@ -1156,7 +1155,7 @@ func VectorsCopyToBatch( return } for i, vec := range vecs { - outputBat.Vecs[i] = vector.NewVec(*vec.GetType()) + outputBat.Vecs[i] = movec.NewVec(*vec.GetType()) if err = outputBat.Vecs[i].UnionBatch( &vec, 0, From 871b3340530fbb065df196b2e18bde027ab02c05 Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Tue, 13 May 2025 12:23:57 +0800 Subject: [PATCH 12/12] update 12 --- pkg/vm/engine/readutil/expr_filter.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/vm/engine/readutil/expr_filter.go b/pkg/vm/engine/readutil/expr_filter.go index 7b4d38a4d48b7..5e6f186121ab6 100644 --- a/pkg/vm/engine/readutil/expr_filter.go +++ b/pkg/vm/engine/readutil/expr_filter.go @@ -237,11 +237,11 @@ func CompileFilterExpr( switch exprImpl.F.Func.ObjName { case "or": highSelectivityHint = true - fastOps := make([]FastFilterOp, len(exprImpl.F.Args)) - loadOps := make([]LoadOp, len(exprImpl.F.Args)) - objectOps := make([]ObjectFilterOp, len(exprImpl.F.Args)) - blockOps := make([]BlockFilterOp, len(exprImpl.F.Args)) - seekOps := make([]SeekFirstBlockOp, len(exprImpl.F.Args)) + fastOps := make([]FastFilterOp, 0, len(exprImpl.F.Args)) + loadOps := make([]LoadOp, 0, len(exprImpl.F.Args)) + objectOps := make([]ObjectFilterOp, 0, len(exprImpl.F.Args)) + blockOps := make([]BlockFilterOp, 0, len(exprImpl.F.Args)) + seekOps := make([]SeekFirstBlockOp, 0, len(exprImpl.F.Args)) for idx := range exprImpl.F.Args { op1, op2, op3, op4, op5, can, hsh := CompileFilterExpr(exprImpl.F.Args[idx], tableDef, fs) @@ -330,11 +330,11 @@ func CompileFilterExpr( case "and": highSelectivityHint = true - fastOps := make([]FastFilterOp, len(exprImpl.F.Args)) - loadOps := make([]LoadOp, len(exprImpl.F.Args)) - objectOps := make([]ObjectFilterOp, len(exprImpl.F.Args)) - blockOps := make([]BlockFilterOp, len(exprImpl.F.Args)) - seekOps := make([]SeekFirstBlockOp, len(exprImpl.F.Args)) + fastOps := make([]FastFilterOp, 0, len(exprImpl.F.Args)) + loadOps := make([]LoadOp, 0, len(exprImpl.F.Args)) + objectOps := make([]ObjectFilterOp, 0, len(exprImpl.F.Args)) + blockOps := make([]BlockFilterOp, 0, len(exprImpl.F.Args)) + seekOps := make([]SeekFirstBlockOp, 0, len(exprImpl.F.Args)) for idx := range exprImpl.F.Args { op1, op2, op3, op4, op5, can, hsh := CompileFilterExpr(exprImpl.F.Args[idx], tableDef, fs)