
Commit 19eff6f

Fixed tests with data sources (#39)

* Added reading from offset to parquet file reader (see the usage sketch after this commit header)
* Added end-to-end tests for parquet file reader
* Added ability to stop generating values by plugins to benchmark faker
* Moved system worker init logic from WorkerInitFunc to Init
* Returned support of PostgreSQL to vector search tests
* Fixed handling of empty generated values in generic insert worker
* Fixed configuring parquet reader offset for readonly tests
* Implemented ability to read parquet file in a circular way
* Added feature of reading parquet data sources in a circular way for readonly tests
* [Parquet Reader] Moved skip-until-offset logic to a dedicated function
* [Parquet Reader] Simplified internal logic by removing the rowsToSkip structure field
* [Parquet Reader] Fixed bug with circular reading with offset
* [Parquet Reader] Added test case for circular reading with offset
* [Parquet Reader] Added test cases
* [Vector Search] Minor fixes
* [Benchmark] Changed RandomizerPlugin interface by requiring GenCommonFakeValues to return all common generated values
* [DB Bench] Made DataSetSourcePlugin concurrent-friendly
1 parent 74f4e3f commit 19eff6f
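
For context, a minimal usage sketch of the reader API this commit introduces. This is not code from the commit: the example function, the "example.parquet" path, and the offset value are placeholders, and it is written inside the same package so no module import path has to be assumed.

package dataset_source

import "fmt"

// ExampleReadFromOffset sketches consuming the new offset/circular parameters.
func ExampleReadFromOffset() error {
	// Start reading at row 50; circular == false means EOF ends the stream.
	source, err := NewParquetFileDataSource("example.parquet", 50, false)
	if err != nil {
		return err
	}
	defer source.Close()

	for {
		row, err := source.GetNextRow()
		if err != nil {
			return err
		}
		if row == nil {
			break // end of file; only reachable when circular == false
		}
		fmt.Println(row) // one []interface{} value per column
	}
	return nil
}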

File tree: 10 files changed, +505 / -153 lines changed

Binary file (2.06 KB) not shown.

acronis-db-bench/dataset-source/parquet.go

Lines changed: 99 additions & 10 deletions
@@ -20,10 +20,11 @@ type ParquetFileDataSource struct {
 	recordReader pqarrow.RecordReader
 
 	currentRecord arrow.Record
-	currentOffset int
+	currentOffset int64
+	circular      bool
 }
 
-func NewParquetFileDataSource(filePath string) (*ParquetFileDataSource, error) {
+func NewParquetFileDataSource(filePath string, offset int64, circular bool) (*ParquetFileDataSource, error) {
 	var rdr, err = file.OpenParquetFile(filePath, true)
 	if err != nil {
 		return nil, fmt.Errorf("error opening parquet file: %v", err)
@@ -58,11 +59,59 @@ func NewParquetFileDataSource(filePath string) (*ParquetFileDataSource, error) {
 		return nil, fmt.Errorf("error creating record reader: %v", err)
 	}
 
-	return &ParquetFileDataSource{
+	var source = &ParquetFileDataSource{
 		columns:      columnNames,
 		fileReader:   rdr,
 		recordReader: recordReader,
-	}, nil
+		circular:     circular,
+	}
+
+	if skipErr := source.skipUntilOffset(offset); skipErr != nil {
+		return nil, skipErr
+	}
+
+	return source, nil
+}
+
+func min(a, b int64) int64 {
+	if a < b {
+		return a
+	}
+	return b
+}
+
+func (ds *ParquetFileDataSource) skipUntilOffset(rowsToSkip int64) error {
+	var skipCount int64 = 0
+
+	for ds.currentOffset < rowsToSkip {
+		if ds.currentRecord == nil {
+			if !ds.recordReader.Next() {
+				if ds.circular {
+					ds.resetReader()
+					if !ds.recordReader.Next() {
+						return fmt.Errorf("failed to read after reset")
+					}
+					rowsToSkip -= skipCount
+				} else {
+					return nil
+				}
+			}
+			ds.currentRecord = ds.recordReader.Record()
+		}
+
+		remainingInRecord := ds.currentRecord.NumRows() - ds.currentOffset
+		skipCount = min(rowsToSkip-ds.currentOffset, remainingInRecord)
+
+		ds.currentOffset += skipCount
+
+		if ds.currentOffset >= ds.currentRecord.NumRows() {
+			ds.currentRecord.Release()
+			ds.currentRecord = nil
+			ds.currentOffset = 0
+		}
+	}
+
+	return nil
 }
 
 func (ds *ParquetFileDataSource) GetColumnNames() []string {
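
The net effect of skipUntilOffset in circular mode is that an offset past the end of the file wraps around, which the test cases further down confirm: offsets 150 and 250 on a 100-row file both land on the row holding value 51. A tiny standalone sketch of that arithmetic (illustrative only, not part of the commit; the constant 100 mirrors the numbers.parquet test fixture):

package main

import "fmt"

func main() {
	const totalRows int64 = 100 // size of the numbers.parquet test fixture
	for _, offset := range []int64{50, 150, 250} {
		// Circular skipping is equivalent to taking the offset modulo the row count.
		fmt.Printf("offset %3d -> starts at row index %d (value %d)\n",
			offset, offset%totalRows, offset%totalRows+1)
	}
}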
@@ -73,6 +122,12 @@ func (ds *ParquetFileDataSource) GetNextRow() ([]interface{}, error) {
 	if ds.currentRecord == nil {
 		if ds.recordReader.Next() {
 			ds.currentRecord = ds.recordReader.Record()
+		} else if ds.circular {
+			ds.resetReader()
+			if !ds.recordReader.Next() {
+				return nil, fmt.Errorf("failed to read after reset")
+			}
+			ds.currentRecord = ds.recordReader.Record()
 		} else {
 			return nil, nil
 		}
@@ -84,15 +139,15 @@ func (ds *ParquetFileDataSource) GetNextRow() ([]interface{}, error) {
 		var colData interface{}
 		switch specificArray := col.(type) {
 		case *array.Int64:
-			colData = specificArray.Value(ds.currentOffset)
+			colData = specificArray.Value(int(ds.currentOffset))
 		case *array.Float64:
-			colData = specificArray.Value(ds.currentOffset)
+			colData = specificArray.Value(int(ds.currentOffset))
 		case *array.String:
-			colData = specificArray.Value(ds.currentOffset)
+			colData = specificArray.Value(int(ds.currentOffset))
 		case *array.Binary:
-			colData = specificArray.Value(ds.currentOffset)
+			colData = specificArray.Value(int(ds.currentOffset))
 		case *array.List:
-			var beg, end = specificArray.ValueOffsets(ds.currentOffset)
+			var beg, end = specificArray.ValueOffsets(int(ds.currentOffset))
 			var values = array.NewSlice(specificArray.ListValues(), beg, end)
 			switch specificNestedArray := values.(type) {
 			case *array.Float32:
@@ -105,7 +160,7 @@ func (ds *ParquetFileDataSource) GetNextRow() ([]interface{}, error) {
 		}
 		ds.currentOffset++
 
-		if int64(ds.currentOffset) >= ds.currentRecord.NumRows() {
+		if ds.currentOffset >= ds.currentRecord.NumRows() {
 			ds.currentRecord.Release()
 			ds.currentRecord = nil
 			ds.currentOffset = 0
@@ -118,3 +173,37 @@ func (ds *ParquetFileDataSource) Close() {
 	ds.recordReader.Release()
 	ds.fileReader.Close()
 }
+
+func (ds *ParquetFileDataSource) resetReader() {
+	if ds.currentRecord != nil {
+		ds.currentRecord.Release()
+		ds.currentRecord = nil
+	}
+	ds.recordReader.Release()
+
+	mem := memory.NewGoAllocator()
+	reader, err := pqarrow.NewFileReader(ds.fileReader, pqarrow.ArrowReadProperties{
+		BatchSize: defaultBatchSize,
+	}, mem)
+	if err != nil {
+		panic(fmt.Sprintf("error creating Arrow file reader: %v", err))
+	}
+
+	var columns []int
+	for i := 0; i < ds.fileReader.MetaData().Schema.NumColumns(); i++ {
+		columns = append(columns, i)
+	}
+
+	var rgrs []int
+	for r := 0; r < ds.fileReader.NumRowGroups(); r++ {
+		rgrs = append(rgrs, r)
+	}
+
+	recordReader, err := reader.GetRecordReader(context.Background(), columns, rgrs)
+	if err != nil {
+		panic(fmt.Sprintf("error creating record reader: %v", err))
+	}
+
+	ds.recordReader = recordReader
+	ds.currentOffset = 0
+}
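
Note the design choice here: resetReader rebuilds the Arrow record reader over the still-open file handle rather than seeking, which keeps the reader logic simple at the cost of re-scanning metadata on every wrap. One consequence for callers: in circular mode GetNextRow never returns a nil row, so the loop must be bounded by the caller. A hedged sketch under the same in-package assumption as the earlier example (readNCircular is a hypothetical helper, not part of the commit):

package dataset_source

// readNCircular reads exactly n rows, wrapping around the file as needed,
// because circular == true never signals end of file.
func readNCircular(path string, n int) ([][]interface{}, error) {
	source, err := NewParquetFileDataSource(path, 0, true)
	if err != nil {
		return nil, err
	}
	defer source.Close()

	rows := make([][]interface{}, 0, n)
	for i := 0; i < n; i++ {
		row, err := source.GetNextRow()
		if err != nil {
			return nil, err
		}
		rows = append(rows, row)
	}
	return rows, nil
}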
Lines changed: 231 additions & 0 deletions
@@ -0,0 +1,231 @@
+package dataset_source
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestReadParquetWithOffset(t *testing.T) {
+	// Test cases with different offset values
+	testCases := []struct {
+		name     string
+		offset   int64
+		expected int64 // Expected number of records to read
+	}{
+		{
+			name:     "Read from beginning",
+			offset:   0,
+			expected: 100, // File has 100 records
+		},
+		{
+			name:     "Read from middle",
+			offset:   50,
+			expected: 50,
+		},
+		{
+			name:     "Read from end",
+			offset:   99,
+			expected: 1,
+		},
+		{
+			name:     "Read beyond file size",
+			offset:   200,
+			expected: 0,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			// Create a new parquet reader
+			reader, err := NewParquetFileDataSource("numbers.parquet", tc.offset, false)
+			require.NoError(t, err)
+			defer reader.Close()
+
+			// Read all records
+			var count int64
+			for {
+				row, err := reader.GetNextRow()
+				if err != nil {
+					break
+				}
+				if row == nil {
+					break
+				}
+				count++
+			}
+
+			// Verify the number of records read
+			assert.Equal(t, tc.expected, count)
+		})
+	}
+}
+
+func TestReadParquetWithOffsetAndLimit(t *testing.T) {
+	// Test cases combining offset and limit
+	testCases := []struct {
+		name          string
+		offset        int64
+		limit         int64
+		expectedFirst int64
+		expected      int64
+	}{
+		{
+			name:          "Read first 10 records",
+			offset:        0,
+			limit:         10,
+			expectedFirst: 1,
+			expected:      10,
+		},
+		{
+			name:          "Read 20 records from middle",
+			offset:        40,
+			limit:         20,
+			expectedFirst: 41,
+			expected:      20,
+		},
+		{
+			name:          "Read beyond file size",
+			offset:        90,
+			limit:         20,
+			expectedFirst: 91,
+			expected:      10, // Only 10 records left from offset 90
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			reader, err := NewParquetFileDataSource("numbers.parquet", tc.offset, false)
+			require.NoError(t, err)
+			defer reader.Close()
+
+			var count int64
+			for {
+				row, err := reader.GetNextRow()
+				if err != nil {
+					break
+				}
+				if row == nil {
+					break
+				}
+				if count >= tc.limit {
+					break
+				}
+
+				assert.Equal(t, 1, len(row))
+
+				if castedRow, casted := row[0].(int64); casted {
+					if count == 0 {
+						assert.Equal(t, tc.expectedFirst, castedRow)
+					}
+				} else {
+					t.Error("wrong data type")
+					return
+				}
+
+				count++
+			}
+
+			assert.Equal(t, tc.expected, count)
+		})
+	}
+}
+
+func TestReadParquetCircular(t *testing.T) {
+	testCases := []struct {
+		name           string
+		offset         int64
+		expectedRounds int // Number of complete file reads to perform
+		expectedFirst  int64
+		expectedTotal  int64 // Total number of records to read
+	}{
+		{
+			name:           "Read file twice",
+			offset:         0,
+			expectedRounds: 2,
+			expectedFirst:  1,
+			expectedTotal:  201, // 100 records + 101 records (including the first record of the second round)
+		},
+		{
+			name:           "Read file twice from middle",
+			offset:         50,
+			expectedRounds: 2,
+			expectedFirst:  51,
+			expectedTotal:  201, // 50 records + 100 records + 51 records (including the first record of the second round)
+		},
+		{
+			name:           "Read file twice from middle, skipping file once",
+			offset:         150,
+			expectedRounds: 2,
+			expectedFirst:  51,
+			expectedTotal:  201, // 50 records + 100 records + 51 records (including the first record of the second round)
+		},
+		{
+			name:           "Read file twice from middle, skipping file twice",
+			offset:         250,
+			expectedRounds: 2,
+			expectedFirst:  51,
+			expectedTotal:  201, // 50 records + 100 records + 51 records (including the first record of the second round)
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			reader, err := NewParquetFileDataSource("numbers.parquet", tc.offset, true)
+			require.NoError(t, err)
+			defer reader.Close()
+
+			var count int64
+			var rounds int
+			var firstRow []interface{}
+			var isFirstRow = true
+
+			for {
+				row, err := reader.GetNextRow()
+				if err != nil {
+					t.Fatalf("Unexpected error: %v", err)
+				}
+				if row == nil {
+					t.Fatalf("Unexpected nil row")
+				}
+
+				// Store the first row we read
+				if isFirstRow {
+					firstRow = row
+					isFirstRow = false
+				}
+
+				// Check if we've completed a full round
+				if count > 0 && count%100 == 0 && !isFirstRow {
+					rounds++
+					// Verify we're back at the beginning by comparing with first row
+					if rounds > 1 {
+						assert.Equal(t, firstRow, row, "Row should match after completing a round")
+					}
+				}
+
+				assert.Equal(t, 1, len(row))
+
+				if castedRow, casted := row[0].(int64); casted {
+					if count == 0 {
+						assert.Equal(t, tc.expectedFirst, castedRow)
+					}
+				} else {
+					t.Error("wrong data type")
+					return
+				}
+
+				count++
+
+				// Stop after expected number of rounds
+				if rounds >= tc.expectedRounds {
+					break
+				}
+			}
+
+			assert.Equal(t, tc.expectedTotal, count, "Total number of records read")
+			assert.Equal(t, tc.expectedRounds, rounds, "Number of complete rounds")
+		})
+	}
+}
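
The tests above read a numbers.parquet fixture containing 100 int64 rows valued 1..100 (the binary file shown in the file tree is not rendered on this page). A hypothetical sketch of how such a fixture could be generated with the Arrow Go library; the module version in the import paths and the column name "number" are assumptions, not taken from the commit:

package main

import (
	"os"

	"github.com/apache/arrow/go/v14/arrow"
	"github.com/apache/arrow/go/v14/arrow/array"
	"github.com/apache/arrow/go/v14/arrow/memory"
	"github.com/apache/arrow/go/v14/parquet"
	"github.com/apache/arrow/go/v14/parquet/pqarrow"
)

func main() {
	mem := memory.NewGoAllocator()
	schema := arrow.NewSchema(
		[]arrow.Field{{Name: "number", Type: arrow.PrimitiveTypes.Int64}}, nil)

	// Build a single int64 column holding 1..100.
	builder := array.NewInt64Builder(mem)
	defer builder.Release()
	for i := int64(1); i <= 100; i++ {
		builder.Append(i)
	}
	col := builder.NewArray()
	defer col.Release()

	rec := array.NewRecord(schema, []arrow.Array{col}, 100)
	defer rec.Release()
	tbl := array.NewTableFromRecords(schema, []arrow.Record{rec})
	defer tbl.Release()

	f, err := os.Create("numbers.parquet")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	// Write the table as a single 100-row chunk with default properties.
	if err := pqarrow.WriteTable(tbl, f, 100,
		parquet.NewWriterProperties(), pqarrow.DefaultWriterProps()); err != nil {
		panic(err)
	}
}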
