Skip to content

Commit 1eddca0

Browse files
committed
Added reading from offset to parquet file reader
1 parent f0b23d6 commit 1eddca0

File tree

1 file changed

+32
-1
lines changed

1 file changed

+32
-1
lines changed

acronis-db-bench/dataset-source/parquet.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@ type ParquetFileDataSource struct {
2121

2222
currentRecord arrow.Record
2323
currentOffset int
24+
globalOffset int64
25+
rowsToSkip int64
2426
}
2527

26-
func NewParquetFileDataSource(filePath string) (*ParquetFileDataSource, error) {
28+
func NewParquetFileDataSource(filePath string, offset int64) (*ParquetFileDataSource, error) {
2729
var rdr, err = file.OpenParquetFile(filePath, true)
2830
if err != nil {
2931
return nil, fmt.Errorf("error opening parquet file: %v", err)
@@ -62,6 +64,7 @@ func NewParquetFileDataSource(filePath string) (*ParquetFileDataSource, error) {
6264
columns: columnNames,
6365
fileReader: rdr,
6466
recordReader: recordReader,
67+
rowsToSkip: offset,
6568
}, nil
6669
}
6770

@@ -70,6 +73,27 @@ func (ds *ParquetFileDataSource) GetColumnNames() []string {
7073
}
7174

7275
func (ds *ParquetFileDataSource) GetNextRow() ([]interface{}, error) {
76+
for ds.globalOffset < ds.rowsToSkip {
77+
if ds.currentRecord == nil {
78+
if !ds.recordReader.Next() {
79+
return nil, nil
80+
}
81+
ds.currentRecord = ds.recordReader.Record()
82+
}
83+
84+
remainingInRecord := ds.currentRecord.NumRows() - int64(ds.currentOffset)
85+
skipCount := min(ds.rowsToSkip-ds.globalOffset, remainingInRecord)
86+
87+
ds.currentOffset += int(skipCount)
88+
ds.globalOffset += skipCount
89+
90+
if int64(ds.currentOffset) >= ds.currentRecord.NumRows() {
91+
ds.currentRecord.Release()
92+
ds.currentRecord = nil
93+
ds.currentOffset = 0
94+
}
95+
}
96+
7397
if ds.currentRecord == nil {
7498
if ds.recordReader.Next() {
7599
ds.currentRecord = ds.recordReader.Record()
@@ -118,3 +142,10 @@ func (ds *ParquetFileDataSource) Close() {
118142
ds.recordReader.Release()
119143
ds.fileReader.Close()
120144
}
145+
146+
func min(a, b int64) int64 {
147+
if a < b {
148+
return a
149+
}
150+
return b
151+
}

0 commit comments

Comments
 (0)