Skip to content

Commit 68671df

Browse files
committed
Avoid using map based buffer
1 parent f524f61 commit 68671df

File tree

10 files changed

+208
-49
lines changed

10 files changed

+208
-49
lines changed

arrow/json/writer.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@ import (
44
"encoding/json"
55
"errors"
66
"fmt"
7+
"io"
8+
79
"github.com/apache/arrow/go/arrow"
810
"github.com/apache/arrow/go/arrow/array"
9-
"io"
1011
)
1112

1213
var (

arrow/json/writer_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -211,15 +211,15 @@ func TestToGo(t *testing.T) {
211211
// uint8 TODO support this case
212212
// []uint8 will be converted to a base64-encoded string
213213
/*
214-
{
215-
data: func() *array.Data {
216-
b := array.NewUint8Builder(pool)
217-
b.AppendValues([]uint8{0, 1, 2}, nil)
218-
return b.NewUint8Array().Data()
219-
}(),
220-
expected: []uint8{0, 1, 2},
221-
err: nil,
222-
},
214+
{
215+
data: func() *array.Data {
216+
b := array.NewUint8Builder(pool)
217+
b.AppendValues([]uint8{0, 1, 2}, nil)
218+
return b.NewUint8Array().Data()
219+
}(),
220+
expected: []uint8{0, 1, 2},
221+
err: nil,
222+
},
223223
*/
224224

225225
// uint16

columnifier/parquet.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@ package columnifier
22

33
import (
44
"bytes"
5+
"io/ioutil"
6+
57
"github.com/reproio/columnify/arrow/json"
68
"github.com/reproio/columnify/record"
79
"github.com/xitongsys/parquet-go/marshal"
8-
"io/ioutil"
910

1011
"github.com/reproio/columnify/parquet"
1112
"github.com/reproio/columnify/schema"

record/arrow.go

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@ package record
22

33
import (
44
"fmt"
5-
"github.com/apache/arrow/go/arrow"
6-
"github.com/apache/arrow/go/arrow/array"
7-
"github.com/apache/arrow/go/arrow/memory"
85
"strconv"
96
"time"
7+
8+
"github.com/apache/arrow/go/arrow"
9+
"github.com/apache/arrow/go/arrow/array"
1010
)
1111

1212
type WrappedRecord struct {
@@ -19,24 +19,18 @@ func NewWrappedRecord(b *array.RecordBuilder) *WrappedRecord {
1919
}
2020
}
2121

22-
func formatMapToArrowRecord(s *arrow.Schema, maps []map[string]interface{}) (*WrappedRecord, error) {
23-
pool := memory.NewGoAllocator()
24-
b := array.NewRecordBuilder(pool, s)
25-
defer b.Release()
26-
27-
for _, m := range maps {
28-
for i, f := range s.Fields() {
29-
if v, ok := m[f.Name]; ok {
30-
if _, err := formatMapToArrowField(b.Field(i), f.Type, f.Nullable, v); err != nil {
31-
return nil, err
32-
}
33-
} else {
34-
b.Field(i).AppendNull()
22+
func formatMapToArrowRecord(b *array.RecordBuilder, m map[string]interface{}) (*array.RecordBuilder, error) {
23+
for i, f := range b.Schema().Fields() {
24+
if v, ok := m[f.Name]; ok {
25+
if _, err := formatMapToArrowField(b.Field(i), f.Type, f.Nullable, v); err != nil {
26+
return nil, err
3527
}
28+
} else {
29+
b.Field(i).AppendNull()
3630
}
3731
}
3832

39-
return NewWrappedRecord(b), nil
33+
return b, nil
4034
}
4135

4236
func formatMapToArrowStruct(b *array.StructBuilder, s *arrow.StructType, m map[string]interface{}) (*array.StructBuilder, error) {

record/arrow_test.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -430,17 +430,23 @@ func TestNewArrowSchemaFromAvroSchema(t *testing.T) {
430430
},
431431
}
432432

433+
pool := memory.NewGoAllocator()
433434
for _, c := range cases {
434435
expectedRecord := c.expected(c.schema)
435436

436-
actual, err := formatMapToArrowRecord(c.schema.ArrowSchema, c.input)
437+
b := array.NewRecordBuilder(pool, c.schema.ArrowSchema)
438+
defer b.Release()
437439

438-
if err != c.err {
439-
t.Errorf("expected: %v, but actual: %v\n", c.err, err)
440+
for _, v := range c.input {
441+
_, err := formatMapToArrowRecord(b, v)
442+
if err != c.err {
443+
t.Errorf("expected: %v, but actual: %v\n", c.err, err)
444+
}
440445
}
441446

442-
if !reflect.DeepEqual(actual, expectedRecord) {
443-
t.Errorf("values: expected: %v, but actual: %v\n", expectedRecord, actual)
447+
r := NewWrappedRecord(b)
448+
if !reflect.DeepEqual(r, expectedRecord) {
449+
t.Errorf("values: expected: %v, but actual: %v\n", expectedRecord, r)
444450
}
445451
}
446452
}

record/avro.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"bytes"
55
"fmt"
66

7+
"github.com/apache/arrow/go/arrow/array"
8+
"github.com/apache/arrow/go/arrow/memory"
79
"github.com/reproio/columnify/schema"
810

911
"github.com/linkedin/goavro/v2"
@@ -57,10 +59,30 @@ func FormatAvroToMap(data []byte) ([]map[string]interface{}, error) {
5759
}
5860

5961
func FormatAvroToArrow(s *schema.IntermediateSchema, data []byte) (*WrappedRecord, error) {
60-
maps, err := FormatAvroToMap(data)
62+
pool := memory.NewGoAllocator()
63+
b := array.NewRecordBuilder(pool, s.ArrowSchema)
64+
defer b.Release()
65+
66+
r, err := goavro.NewOCFReader(bytes.NewReader(data))
6167
if err != nil {
6268
return nil, err
6369
}
6470

65-
return formatMapToArrowRecord(s.ArrowSchema, maps)
71+
for r.Scan() {
72+
v, err := r.Read()
73+
if err != nil {
74+
return nil, err
75+
}
76+
77+
m, mapOk := v.(map[string]interface{})
78+
if !mapOk {
79+
return nil, fmt.Errorf("invalid value %v: %w", v, ErrUnconvertibleRecord)
80+
}
81+
82+
if _, err = formatMapToArrowRecord(b, flattenAvroUnion(m)); err != nil {
83+
return nil, err
84+
}
85+
}
86+
87+
return NewWrappedRecord(b), nil
6688
}

record/csv.go

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ import (
77
"strconv"
88
"strings"
99

10+
"github.com/apache/arrow/go/arrow/array"
11+
"github.com/apache/arrow/go/arrow/memory"
12+
1013
"github.com/reproio/columnify/schema"
1114
)
1215

@@ -89,10 +92,62 @@ func FormatCsvToMap(s *schema.IntermediateSchema, data []byte, delimiter delimit
8992
}
9093

9194
func FormatCsvToArrow(s *schema.IntermediateSchema, data []byte, delimiter delimiter) (*WrappedRecord, error) {
92-
maps, err := FormatCsvToMap(s, data, delimiter)
95+
pool := memory.NewGoAllocator()
96+
b := array.NewRecordBuilder(pool, s.ArrowSchema)
97+
defer b.Release()
98+
99+
names, err := getFieldNamesFromSchema(s)
93100
if err != nil {
94101
return nil, err
95102
}
96103

97-
return formatMapToArrowRecord(s.ArrowSchema, maps)
104+
reader := csv.NewReader(strings.NewReader(string(data)))
105+
reader.Comma = rune(delimiter)
106+
107+
numFields := len(names)
108+
for {
109+
values, err := reader.Read()
110+
if err == io.EOF {
111+
break
112+
}
113+
if err != nil {
114+
return nil, err
115+
}
116+
117+
if numFields != len(values) {
118+
return nil, fmt.Errorf("incompleted value %v: %w", values, ErrUnconvertibleRecord)
119+
}
120+
121+
e := make(map[string]interface{})
122+
for i, v := range values {
123+
// bool
124+
if v != "0" && v != "1" {
125+
if vv, err := strconv.ParseBool(v); err == nil {
126+
e[names[i]] = vv
127+
continue
128+
}
129+
}
130+
131+
// int
132+
if vv, err := strconv.ParseInt(v, 10, 64); err == nil {
133+
e[names[i]] = vv
134+
continue
135+
}
136+
137+
// float
138+
if vv, err := strconv.ParseFloat(v, 64); err == nil {
139+
e[names[i]] = vv
140+
continue
141+
}
142+
143+
// others; to string
144+
e[names[i]] = v
145+
}
146+
147+
if _, err := formatMapToArrowRecord(b, e); err != nil {
148+
return nil, err
149+
}
150+
}
151+
152+
return NewWrappedRecord(b), nil
98153
}

record/jsonl.go

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ import (
44
"encoding/json"
55
"strings"
66

7+
"github.com/apache/arrow/go/arrow/array"
8+
"github.com/apache/arrow/go/arrow/memory"
9+
710
"github.com/reproio/columnify/schema"
811
)
912

@@ -29,10 +32,25 @@ func FormatJsonlToMap(data []byte) ([]map[string]interface{}, error) {
2932
}
3033

3134
func FormatJsonlToArrow(s *schema.IntermediateSchema, data []byte) (*WrappedRecord, error) {
32-
maps, err := FormatJsonlToMap(data)
33-
if err != nil {
34-
return nil, err
35+
pool := memory.NewGoAllocator()
36+
b := array.NewRecordBuilder(pool, s.ArrowSchema)
37+
defer b.Release()
38+
39+
for _, l := range strings.Split(string(data), "\n") {
40+
if l == "" {
41+
// skip blank line
42+
continue
43+
}
44+
45+
var e map[string]interface{}
46+
if err := json.Unmarshal([]byte(l), &e); err != nil {
47+
return nil, err
48+
}
49+
50+
if _, err := formatMapToArrowRecord(b, e); err != nil {
51+
return nil, err
52+
}
3553
}
3654

37-
return formatMapToArrowRecord(s.ArrowSchema, maps)
55+
return NewWrappedRecord(b), nil
3856
}

record/ltsv.go

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ import (
44
"strconv"
55
"strings"
66

7+
"github.com/apache/arrow/go/arrow/array"
8+
"github.com/apache/arrow/go/arrow/memory"
9+
710
"github.com/reproio/columnify/schema"
811

912
"github.com/Songmu/go-ltsv"
@@ -54,10 +57,48 @@ func FormatLtsvToMap(data []byte) ([]map[string]interface{}, error) {
5457
}
5558

5659
func FormatLtsvToArrow(s *schema.IntermediateSchema, data []byte) (*WrappedRecord, error) {
57-
maps, err := FormatLtsvToMap(data)
58-
if err != nil {
59-
return nil, err
60+
pool := memory.NewGoAllocator()
61+
b := array.NewRecordBuilder(pool, s.ArrowSchema)
62+
defer b.Release()
63+
64+
for _, l := range strings.Split(string(data), "\n") {
65+
v := map[string]string{}
66+
67+
err := ltsv.Unmarshal([]byte(l), &v)
68+
if err != nil {
69+
return nil, err
70+
}
71+
72+
m := make(map[string]interface{})
73+
for k, v := range v {
74+
// bool
75+
if v != "0" && v != "1" {
76+
if vv, err := strconv.ParseBool(v); err == nil {
77+
m[k] = vv
78+
continue
79+
}
80+
}
81+
82+
// int
83+
if vv, err := strconv.ParseInt(v, 10, 64); err == nil {
84+
m[k] = vv
85+
continue
86+
}
87+
88+
// float
89+
if vv, err := strconv.ParseFloat(v, 64); err == nil {
90+
m[k] = vv
91+
continue
92+
}
93+
94+
// others; to string
95+
m[k] = v
96+
}
97+
98+
if _, err := formatMapToArrowRecord(b, m); err != nil {
99+
return nil, err
100+
}
60101
}
61102

62-
return formatMapToArrowRecord(s.ArrowSchema, maps)
103+
return NewWrappedRecord(b), nil
63104
}

record/msgpack.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ import (
55
"fmt"
66
"io"
77

8+
"github.com/apache/arrow/go/arrow/array"
9+
"github.com/apache/arrow/go/arrow/memory"
10+
811
"github.com/reproio/columnify/schema"
912

1013
"github.com/vmihailenco/msgpack/v4"
@@ -34,10 +37,28 @@ func FormatMsgpackToMap(data []byte) ([]map[string]interface{}, error) {
3437
}
3538

3639
func FormatMsgpackToArrow(s *schema.IntermediateSchema, data []byte) (*WrappedRecord, error) {
37-
maps, err := FormatMsgpackToMap(data)
38-
if err != nil {
39-
return nil, err
40+
pool := memory.NewGoAllocator()
41+
b := array.NewRecordBuilder(pool, s.ArrowSchema)
42+
defer b.Release()
43+
44+
d := msgpack.NewDecoder(bytes.NewReader(data))
45+
for {
46+
arr, err := d.DecodeInterface()
47+
if err == io.EOF {
48+
break
49+
} else if err != nil {
50+
return nil, err
51+
}
52+
53+
m, mapOk := arr.(map[string]interface{})
54+
if !mapOk {
55+
return nil, fmt.Errorf("invalid input %v: %w", arr, ErrUnconvertibleRecord)
56+
}
57+
58+
if _, err = formatMapToArrowRecord(b, m); err != nil {
59+
return nil, err
60+
}
4061
}
4162

42-
return formatMapToArrowRecord(s.ArrowSchema, maps)
63+
return NewWrappedRecord(b), nil
4364
}

0 commit comments

Comments
 (0)