Skip to content

Commit 9c99724

Browse files
authored
pqarrow/arrowutils: Support merging nested list and structs (#950)
1 parent a422bde commit 9c99724

File tree

2 files changed

+95
-1
lines changed

2 files changed

+95
-1
lines changed

pqarrow/arrowutils/merge_test.go

+62
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,68 @@ func TestMerge(t *testing.T) {
279279
}
280280
}
281281

282+
func TestMergeNestedListStruct(t *testing.T) {
283+
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
284+
defer mem.AssertSize(t, 0)
285+
286+
rb := array.NewRecordBuilder(mem, arrow.NewSchema([]arrow.Field{
287+
{Name: "int64", Type: arrow.PrimitiveTypes.Int64},
288+
{Name: "list", Type: arrow.ListOf(arrow.StructOf([]arrow.Field{
289+
{Name: "int32", Type: arrow.PrimitiveTypes.Int32},
290+
{Name: "uint64", Type: arrow.PrimitiveTypes.Uint64},
291+
}...))},
292+
}, nil))
293+
defer rb.Release()
294+
295+
var recs []arrow.Record
296+
defer func() {
297+
for _, r := range recs {
298+
r.Release()
299+
}
300+
}()
301+
302+
int64Builder := rb.Field(0).(*array.Int64Builder)
303+
listBuilder := rb.Field(1).(*array.ListBuilder)
304+
listStructBuilder := listBuilder.ValueBuilder().(*array.StructBuilder)
305+
listStructInt32Builder := listStructBuilder.FieldBuilder(0).(*array.Int32Builder)
306+
listStructUint64Builder := listStructBuilder.FieldBuilder(1).(*array.Uint64Builder)
307+
308+
int64Builder.Append(-123)
309+
listBuilder.Append(true)
310+
listStructBuilder.Append(true)
311+
listStructInt32Builder.Append(123)
312+
listStructUint64Builder.Append(123 * 2)
313+
listStructBuilder.Append(true)
314+
listStructInt32Builder.Append(123 * 3)
315+
listStructUint64Builder.Append(123 * 4)
316+
recs = append(recs, rb.NewRecord())
317+
318+
int64Builder.Append(-123 * 2)
319+
listBuilder.Append(true)
320+
listStructBuilder.Append(true)
321+
listStructInt32Builder.Append(123 * 5)
322+
listStructUint64Builder.Append(123 * 6)
323+
listStructBuilder.Append(true)
324+
listStructInt32Builder.Append(123 * 7)
325+
listStructUint64Builder.Append(123 * 8)
326+
listStructBuilder.Append(true)
327+
listStructInt32Builder.Append(123 * 9)
328+
listStructUint64Builder.Append(123 * 10)
329+
recs = append(recs, rb.NewRecord())
330+
331+
mergeRecord, err := arrowutils.MergeRecords(mem, recs, []arrowutils.SortingColumn{
332+
{Index: 0, Direction: arrowutils.Ascending},
333+
}, 0)
334+
require.NoError(t, err)
335+
defer mergeRecord.Release()
336+
337+
require.Equal(t, int64(2), mergeRecord.NumCols())
338+
require.Equal(t, int64(2), mergeRecord.NumRows())
339+
340+
require.Equal(t, `[-246 -123]`, mergeRecord.Column(0).String())
341+
require.Equal(t, `[{[615 861 1107] [738 984 1230]} {[123 369] [246 492]}]`, mergeRecord.Column(1).String())
342+
}
343+
282344
func BenchmarkMergeRecords(b *testing.B) {
283345
ctx := context.Background()
284346
mem := memory.NewGoAllocator()

pqarrow/builder/utils.go

+33-1
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,15 @@ func AppendValue(cb ColumnBuilder, arr arrow.Array, i int) error {
8080
b.Append(arr.(*array.FixedSizeBinary).Value(i))
8181
case *array.BooleanBuilder:
8282
b.Append(arr.(*array.Boolean).Value(i))
83+
case *array.StructBuilder:
84+
arrStruct := arr.(*array.Struct)
85+
86+
b.Append(true)
87+
for j := 0; j < b.NumField(); j++ {
88+
if err := AppendValue(b.FieldBuilder(j), arrStruct.Field(j), i); err != nil {
89+
return fmt.Errorf("failed to append struct field: %w", err)
90+
}
91+
}
8392
case *array.BinaryDictionaryBuilder:
8493
switch a := arr.(type) {
8594
case *array.Dictionary:
@@ -119,6 +128,12 @@ func buildList(vb any, b ListLikeBuilder, arr arrow.Array, i int) error {
119128
defer values.Release()
120129

121130
switch v := values.(type) {
131+
case *array.Int64:
132+
int64Builder := vb.(*OptInt64Builder)
133+
b.Append(true)
134+
for j := 0; j < v.Len(); j++ {
135+
int64Builder.Append(v.Value(j))
136+
}
122137
case *array.Dictionary:
123138
switch dict := v.Dictionary().(type) {
124139
case *array.Binary:
@@ -130,10 +145,27 @@ func buildList(vb any, b ListLikeBuilder, arr arrow.Array, i int) error {
130145
return err
131146
}
132147
default:
133-
return fmt.Errorf("uknown value builder type %T", bldr)
148+
return fmt.Errorf("unknown value builder type %T", bldr)
134149
}
135150
}
136151
}
152+
case *array.Struct:
153+
structBuilder, ok := vb.(*array.StructBuilder)
154+
if !ok {
155+
return fmt.Errorf("unsupported type for ListLikeBuilder: %T", vb)
156+
}
157+
158+
b.Append(true)
159+
for j := 0; j < v.Len(); j++ {
160+
structBuilder.Append(true)
161+
for k := 0; k < v.NumField(); k++ {
162+
if err := AppendValue(structBuilder.FieldBuilder(k), v.Field(k), j); err != nil {
163+
return err
164+
}
165+
}
166+
}
167+
default:
168+
return fmt.Errorf("unsupported type for List builder %T", v)
137169
}
138170

139171
return nil

0 commit comments

Comments
 (0)