Skip to content

Commit 4a3918b

Browse files
authored
Merge branch 'main' into add-ut-for-disttae-1
2 parents 7719e8a + 7605955 commit 4a3918b

32 files changed

+2402
-728
lines changed

pkg/datasync/consumer_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -551,7 +551,7 @@ func TestConsumeEntries(t *testing.T) {
551551
err := c.consumeEntries(ctx, []logservice.LogRecord{
552552
{
553553
Lsn: 10,
554-
Data: make([]byte, 100),
554+
Data: dataWithValidVersion(make([]byte, 100)),
555555
},
556556
}, false)
557557
assert.NoError(t, err)
@@ -573,7 +573,7 @@ func TestConsumeEntries(t *testing.T) {
573573
err := c.consumeEntries(ctx, []logservice.LogRecord{
574574
{
575575
Lsn: 10,
576-
Data: make([]byte, 100),
576+
Data: dataWithValidVersion(make([]byte, 100)),
577577
},
578578
}, true)
579579
assert.Error(t, err)

pkg/datasync/entry.go

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
package datasync
1616

1717
import (
18-
"bytes"
1918
"strings"
2019

2120
"github.com/matrixorigin/matrixone/pkg/container/types"
@@ -51,31 +50,19 @@ func getLocations(rec logservice.LogRecord, tag string) []string {
5150
logutil.Errorf("invalid data size %d", len(data))
5251
return nil
5352
}
54-
buffer := bytes.NewBuffer(data[dataHeaderSize:])
55-
m := &logservicedriver.Meta{}
56-
_, err := m.ReadFrom(buffer)
57-
if err != nil {
58-
logutil.Errorf("failed to read data from buffer: %v", err)
59-
return nil
60-
}
61-
if m.GetType() != logservicedriver.TNormal {
62-
return nil
63-
}
6453
var locations []string
65-
for range m.GetAddr() {
66-
e := entry.NewEmptyEntry()
67-
_, err := e.ReadFrom(buffer)
68-
if err != nil {
69-
logutil.Errorf("failed to read data from buffer: %v", err)
70-
return nil
71-
}
72-
ei := e.Entry.GetInfo().(*entry2.Info)
73-
payload := e.Entry.GetPayload()
54+
_, err := logservicedriver.DecodeLogEntry(data[headerSize+replicaIDSize:], func(en *entry.Entry) {
55+
ei := en.Entry.GetInfo().(*entry2.Info)
56+
payload := en.Entry.GetPayload()
7457
if ei.Group == wal.GroupPrepare {
7558
locations = append(locations, parseCommonFiles(payload, tag)...)
7659
} else if ei.Group == store.GroupFiles {
7760
locations = append(locations, parseMetaFiles(payload, tag)...)
7861
}
62+
})
63+
if err != nil {
64+
logutil.Errorf("decode logentry error %s", err.Error())
65+
return nil
7966
}
8067
return locations
8168
}

pkg/datasync/entry_test.go

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -40,23 +40,24 @@ import (
4040
)
4141

4242
type mockEntry struct {
43-
entryType uint16
44-
version uint16
45-
meta logservicedriver.Meta
46-
entries []*driverEntry.Entry
47-
payloadSize uint64
43+
logservicedriver.V1Meta
44+
entryType uint16
45+
version uint16
46+
entries []*driverEntry.Entry
4847
}
4948

5049
func newMockEntry() *mockEntry {
5150
return &mockEntry{
5251
entryType: objectio.IOET_ObjMeta,
52+
version: logservicedriver.IOET_WALRecord_V1,
5353
}
5454
}
5555

5656
func (m *mockEntry) appendEntry(e *driverEntry.Entry) {
5757
m.entries = append(m.entries, e)
58-
m.meta.AddAddr(e.DSN, m.payloadSize)
59-
m.payloadSize += uint64(e.GetSize())
58+
sz := m.V1Meta.GetPayloadSize()
59+
m.V1Meta.AddAddr(e.DSN, sz)
60+
m.V1Meta.SetPayloadSize(sz + uint64(e.GetSize()))
6061
}
6162

6263
func (m *mockEntry) WriteTo(w io.Writer) (int64, error) {
@@ -69,12 +70,12 @@ func (m *mockEntry) WriteTo(w io.Writer) (int64, error) {
6970
return 0, err
7071
}
7172
n += 2
72-
nn, err := m.meta.WriteTo(w)
73+
nn, err := m.V1Meta.WriteTo(w)
7374
if err != nil {
7475
return 0, err
7576
}
7677
n += nn
77-
if m.meta.GetType() == logservicedriver.TNormal {
78+
if m.V1Meta.GetType() == logservicedriver.Cmd_Normal {
7879
for _, e := range m.entries {
7980
nn, err := e.WriteTo(w)
8081
if err != nil {
@@ -101,13 +102,13 @@ func runWithBaseEnv(fn func(cat *catalog.Catalog, txn txnif.AsyncTxn) error) err
101102
}
102103

103104
type parameter struct {
104-
metaType logservicedriver.MetaType
105+
cmdType logservicedriver.CmdType
105106
groupType uint32
106107
}
107108

108109
func newParameter() *parameter {
109110
return &parameter{
110-
metaType: logservicedriver.TNormal,
111+
cmdType: logservicedriver.Cmd_Normal,
111112
groupType: wal.GroupPrepare,
112113
}
113114
}
@@ -117,8 +118,8 @@ func (p *parameter) withGroupType(t uint32) *parameter {
117118
return p
118119
}
119120

120-
func (p *parameter) withMetaType(t logservicedriver.MetaType) *parameter {
121-
p.metaType = t
121+
func (p *parameter) withCmdType(t logservicedriver.CmdType) *parameter {
122+
p.cmdType = t
122123
return p
123124
}
124125

@@ -182,8 +183,10 @@ func generateCmdPayload(param parameter, loc objectio.Location) ([]byte, error)
182183
drEntry.Entry.PrepareWrite()
183184

184185
me := newMockEntry()
185-
me.meta.SetType(param.metaType)
186-
me.appendEntry(drEntry)
186+
me.V1Meta.SetType(param.cmdType)
187+
if param.cmdType == logservicedriver.Cmd_Normal {
188+
me.appendEntry(drEntry)
189+
}
187190
var buf bytes.Buffer
188191
_, err = me.WriteTo(&buf)
189192
if err != nil {
@@ -214,7 +217,7 @@ func generateCkpPayload(data []byte) ([]byte, error) {
214217
drEntry.Entry.PrepareWrite()
215218

216219
me := newMockEntry()
217-
me.meta.SetType(logservicedriver.TNormal)
220+
me.V1Meta.SetType(logservicedriver.Cmd_Normal)
218221
me.appendEntry(drEntry)
219222
var buf bytes.Buffer
220223
_, err = me.WriteTo(&buf)
@@ -244,6 +247,14 @@ func genRecord(payload []byte, upstreamLsn uint64) logservice.LogRecord {
244247
return rec
245248
}
246249

250+
func dataWithValidVersion(p []byte) []byte {
251+
if len(p) >= 16 {
252+
p[12] = 1
253+
p[14] = 1
254+
}
255+
return p
256+
}
257+
247258
func TestEntry_ParseLocation(t *testing.T) {
248259
t.Run("invalid record type", func(t *testing.T) {
249260
rec := logservice.LogRecord{
@@ -261,14 +272,14 @@ func TestEntry_ParseLocation(t *testing.T) {
261272

262273
t.Run("read buffer error", func(t *testing.T) {
263274
rec := logservice.LogRecord{
264-
Data: make([]byte, 22),
275+
Data: dataWithValidVersion(make([]byte, 22)),
265276
}
266277
assert.Equal(t, []string(nil), getLocations(rec, ""))
267278
})
268279

269280
t.Run("invalid meta type", func(t *testing.T) {
270281
p, err := generateCmdPayload(
271-
*newParameter().withMetaType(logservicedriver.TReplay),
282+
*newParameter().withCmdType(logservicedriver.Cmd_SkipDSN),
272283
genLocation(uuid.New(), 0, 0, 0, 0, 0),
273284
)
274285
assert.NoError(t, err)

pkg/datasync/producer_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ func TestProducer_Start(t *testing.T) {
173173
withProducerStarted(t, rt, c, true, true, func(ctx context.Context, p Producer) {
174174
var wg sync.WaitGroup
175175
rec := c.GetLogRecord(16)
176+
dataWithValidVersion(rec.Data)
176177
w := newWrappedData(rec.Data, 0, &wg)
177178
wg.Add(1)
178179
p.Enqueue(ctx, w)
@@ -202,6 +203,7 @@ func TestProducer_Start(t *testing.T) {
202203
withProducerStarted(t, rt, c, true, false, func(ctx context.Context, p Producer) {
203204
var wg sync.WaitGroup
204205
rec := c.GetLogRecord(16)
206+
dataWithValidVersion(rec.Data)
205207
w := newWrappedData(rec.Data, 2, &wg)
206208
wg.Add(1)
207209
p.Enqueue(ctx, w)

pkg/fulltext/fulltext.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -826,6 +826,10 @@ func ParsePatternInNLMode(pattern string) ([]*Pattern, error) {
826826
}
827827
}
828828

829+
if len(list) == 0 {
830+
return nil, moerr.NewInternalErrorNoCtx("Invalid input search string. search string onverted to empty pattern")
831+
}
832+
829833
// assign index
830834
idx := int32(0)
831835
for _, p := range list {

pkg/fulltext/fulltext_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,23 @@ func TestPatternFail(t *testing.T) {
195195
}
196196
}
197197

198+
func TestPatternNLFail(t *testing.T) {
199+
200+
tests := []TestCase{
201+
{
202+
pattern: "+[[[",
203+
},
204+
{
205+
pattern: "+''",
206+
},
207+
}
208+
209+
for _, c := range tests {
210+
_, err := PatternToString(c.pattern, int64(tree.FULLTEXT_NL))
211+
require.NotNil(t, err)
212+
}
213+
}
214+
198215
func TestFullTextNL(t *testing.T) {
199216

200217
pattern := "apple banana"

pkg/sql/plan/shuffle.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -659,9 +659,6 @@ func getShuffleDop(ncpu int, lencn int, hashmapSize float64) (dop int) {
659659
ncpu = 4
660660
}
661661
maxret := ncpu * 4
662-
if maxret > 64 {
663-
maxret = 64 // to avoid a hang bug, fix this in the future
664-
}
665662
// these magic number comes from hashmap resize factor. see hashtable/common.go, in maxElemCnt function
666663
ret1 := int(hashmapSize/float64(lencn)/12800000) + 1
667664
if ret1 >= maxret {

pkg/sql/plan/shuffle_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ func TestGetShuffleDop(t *testing.T) {
209209
n = getShuffleDop(4, 2, 100000000000)
210210
require.Equal(t, 16, n)
211211
n = getShuffleDop(64, 1, 10000000000000)
212-
require.Equal(t, 64, n)
212+
require.Equal(t, 256, n)
213213

214214
n = getShuffleDop(16, 3, 1500000000)
215215
require.Equal(t, 40, n)
@@ -248,11 +248,11 @@ func TestGetShuffleDop(t *testing.T) {
248248
require.Equal(t, 16, n)
249249

250250
n = getShuffleDop(64, 1, 1500000000)
251-
require.Equal(t, 64, n)
251+
require.Equal(t, 256, n)
252252
n = getShuffleDop(64, 1, 150000000)
253-
require.Equal(t, 64, n)
253+
require.Equal(t, 256, n)
254254
n = getShuffleDop(64, 1, 15000000)
255-
require.Equal(t, 64, n)
255+
require.Equal(t, 128, n)
256256
n = getShuffleDop(64, 1, 1500000)
257257
require.Equal(t, 64, n)
258258

0 commit comments

Comments
 (0)