@@ -90,8 +90,19 @@ await ReadPrimitiveField(dataTable, groupReader, rowBeginIndex, field, skipRecor
90
90
readRecords , isFirstColumn , cancellationToken , progress ) ;
91
91
break ;
92
92
case ParquetSchemaElement . FieldTypeId . List :
93
- await ReadListField ( dataTable , groupReader , rowBeginIndex , field , skipRecords ,
94
- readRecords , isFirstColumn , cancellationToken , progress ) ;
93
+ var listField = field . GetSingle ( "list" ) ;
94
+ ParquetSchemaElement itemField ;
95
+ try
96
+ {
97
+ itemField = listField . GetSingle ( "item" ) ;
98
+ }
99
+ catch ( Exception ex )
100
+ {
101
+ throw new UnsupportedFieldException ( $ "Cannot load field `{ field . Path } `. Invalid List type.", ex ) ;
102
+ }
103
+ var fieldIndex = dataTable . Columns [ field . Path ] ! . Ordinal ;
104
+ await ReadListField ( dataTable , groupReader , rowBeginIndex , itemField , fieldIndex ,
105
+ skipRecords , readRecords , isFirstColumn , cancellationToken , progress ) ;
95
106
break ;
96
107
case ParquetSchemaElement . FieldTypeId . Map :
97
108
await ReadMapField ( dataTable , groupReader , rowBeginIndex , field , skipRecords ,
@@ -115,64 +126,56 @@ private async Task ReadPrimitiveField(DataTableLite dataTable, ParquetRowGroupRe
115
126
int skippedRecords = 0 ;
116
127
var dataColumn = await groupReader . ReadColumnAsync ( field . DataField ?? throw new Exception ( $ "Pritimive field `{ field . Path } ` is missing its data field") , cancellationToken ) ;
117
128
129
+ bool doesFieldBelongToAList = dataColumn . RepetitionLevels ? . Any ( l => l > 0 ) ?? false ;
118
130
int fieldIndex = dataTable . Columns [ field . Path ] ? . Ordinal ?? throw new Exception ( $ "Column `{ field . Path } ` is missing") ;
119
- var fieldType = dataTable . Columns [ field . Path ] . Type ; var byteArrayValueType = typeof ( ByteArrayValue ) ;
120
- foreach ( var value in dataColumn . Data )
131
+ if ( doesFieldBelongToAList )
121
132
{
122
- cancellationToken . ThrowIfCancellationRequested ( ) ;
123
-
124
- if ( skipRecords > skippedRecords )
133
+ dataColumn = null ;
134
+ await ReadListField ( dataTable , groupReader , rowBeginIndex , field , fieldIndex , skipRecords , readRecords , isFirstColumn , cancellationToken , progress ) ;
135
+ }
136
+ else
137
+ {
138
+ var fieldType = dataTable . Columns [ field . Path ] . Type ; var byteArrayValueType = typeof ( ByteArrayValue ) ;
139
+ foreach ( var value in dataColumn . Data )
125
140
{
126
- skippedRecords ++ ;
127
- continue ;
128
- }
141
+ cancellationToken . ThrowIfCancellationRequested ( ) ;
129
142
130
- if ( rowIndex - rowBeginIndex >= readRecords )
131
- break ;
143
+ if ( skipRecords > skippedRecords )
144
+ {
145
+ skippedRecords ++ ;
146
+ continue ;
147
+ }
132
148
133
- if ( isFirstColumn )
134
- {
135
- dataTable . NewRow ( ) ;
136
- }
149
+ if ( rowIndex - rowBeginIndex >= readRecords )
150
+ break ;
137
151
138
- if ( value == DBNull . Value || value is null )
139
- {
140
- dataTable . Rows [ rowIndex ] ! [ fieldIndex ] = DBNull . Value ;
141
- }
142
- else if ( fieldType == byteArrayValueType )
143
- {
144
- dataTable . Rows [ rowIndex ] ! [ fieldIndex ] = new ByteArrayValue ( field . Path , ( byte [ ] ) value ) ;
145
- }
146
- else
147
- {
148
- dataTable . Rows [ rowIndex ] ! [ fieldIndex ] = FixDateTime ( value , field ) ;
149
- }
152
+ if ( isFirstColumn )
153
+ {
154
+ dataTable . NewRow ( ) ;
155
+ }
150
156
151
- rowIndex ++ ;
152
- progress ? . Report ( 1 ) ;
157
+ if ( value == DBNull . Value || value is null )
158
+ {
159
+ dataTable . Rows [ rowIndex ] ! [ fieldIndex ] = DBNull . Value ;
160
+ }
161
+ else if ( fieldType == byteArrayValueType )
162
+ {
163
+ dataTable . Rows [ rowIndex ] ! [ fieldIndex ] = new ByteArrayValue ( field . Path , ( byte [ ] ) value ) ;
164
+ }
165
+ else
166
+ {
167
+ dataTable . Rows [ rowIndex ] ! [ fieldIndex ] = FixDateTime ( value , field ) ;
168
+ }
169
+
170
+ rowIndex ++ ;
171
+ progress ? . Report ( 1 ) ;
172
+ }
153
173
}
154
174
}
155
175
156
- /// <summary>
157
- /// This is a patch fix to handle malformed datetime fields. We assume TIMESTAMP fields are DateTime values.
158
- /// </summary>
159
- /// <param name="value">Original value</param>
160
- /// <param name="field">Schema element</param>
161
- /// <returns>If the field is a timestamp, a DateTime object will be returned. Otherwise the value will not be changed.</returns>
162
- private async Task ReadListField ( DataTableLite dataTable , ParquetRowGroupReader groupReader , int rowBeginIndex , ParquetSchemaElement field ,
176
+ private async Task ReadListField ( DataTableLite dataTable , ParquetRowGroupReader groupReader , int rowBeginIndex , ParquetSchemaElement itemField , int fieldIndex ,
163
177
long skipRecords , long readRecords , bool isFirstColumn , CancellationToken cancellationToken , IProgress < int > ? progress )
164
178
{
165
- var listField = field . GetSingle ( "list" ) ;
166
- ParquetSchemaElement itemField ;
167
- try
168
- {
169
- itemField = listField . GetSingle ( "item" ) ;
170
- }
171
- catch ( Exception ex )
172
- {
173
- throw new UnsupportedFieldException ( $ "Cannot load field `{ field . Path } `. Invalid List type.", ex ) ;
174
- }
175
-
176
179
if ( itemField . FieldType ( ) == ParquetSchemaElement . FieldTypeId . Primitive )
177
180
{
178
181
int rowIndex = rowBeginIndex ;
@@ -181,7 +184,6 @@ private async Task ReadListField(DataTableLite dataTable, ParquetRowGroupReader
181
184
var dataColumn = await groupReader . ReadColumnAsync ( itemField . DataField ! , cancellationToken ) ;
182
185
183
186
ArrayList ? rowValue = null ;
184
- var fieldIndex = dataTable . Columns [ field . Path ] ! . Ordinal ;
185
187
for ( int i = 0 ; i < dataColumn . Data . Length ; i ++ )
186
188
{
187
189
cancellationToken . ThrowIfCancellationRequested ( ) ;
@@ -230,6 +232,58 @@ bool IsEndOfRow() => (i + 1) == dataColumn.RepetitionLevels!.Length
230
232
}
231
233
}
232
234
}
235
+ else if ( itemField . FieldType ( ) == ParquetSchemaElement . FieldTypeId . Struct )
236
+ {
237
+ //Read struct data as a new datatable
238
+ DataTableLite structFieldTable = BuildDataTable ( itemField , itemField . Children . Select ( f => f . Path ) . ToList ( ) , ( int ) readRecords ) ;
239
+
240
+ //Need to calculate progress differently for structs
241
+ var structFieldReadProgress = StructReadProgress ( progress , structFieldTable . Columns . Count ) ;
242
+
243
+ //Read the struct data and populate the datatable
244
+ await ProcessRowGroup ( structFieldTable , groupReader , skipRecords , readRecords , cancellationToken , structFieldReadProgress ) ;
245
+
246
+ //We need to pivot the data into a new data table (because we read it in columnar fashion above)
247
+ int rowIndex = rowBeginIndex ;
248
+ foreach ( var values in structFieldTable . Rows )
249
+ {
250
+ var newStructFieldTable = BuildDataTable ( itemField , itemField . Children . Select ( f => f . Path ) . ToList ( ) , ( int ) readRecords ) ;
251
+ for ( var columnOrdinal = 0 ; columnOrdinal < values . Length ; columnOrdinal ++ )
252
+ {
253
+ if ( values [ columnOrdinal ] == DBNull . Value )
254
+ {
255
+ //Empty array
256
+ continue ;
257
+ }
258
+
259
+ var columnValues = ( ListValue ) values [ columnOrdinal ] ;
260
+ for ( var rowValueIndex = 0 ; rowValueIndex < columnValues . Data . Count ; rowValueIndex ++ )
261
+ {
262
+ var columnValue = columnValues . Data [ rowValueIndex ] ?? throw new SystemException ( "This should never happen" ) ;
263
+ bool isFirstValueColumn = columnOrdinal == 0 ;
264
+ if ( isFirstValueColumn )
265
+ {
266
+ newStructFieldTable . NewRow ( ) ;
267
+ }
268
+ newStructFieldTable . Rows [ rowValueIndex ] [ columnOrdinal ] = columnValue ;
269
+ }
270
+ }
271
+
272
+ if ( isFirstColumn )
273
+ dataTable . NewRow ( ) ;
274
+
275
+ var listValuesDataTable = newStructFieldTable . ToDataTable ( cancellationToken ) ;
276
+ var listValues = new ArrayList ( listValuesDataTable . Rows . Count ) ;
277
+ foreach ( DataRow row in listValuesDataTable . Rows )
278
+ {
279
+ var newStructField = new StructValue ( itemField . Path , row ) ;
280
+ listValues . Add ( newStructField ) ;
281
+ }
282
+
283
+ dataTable . Rows [ rowIndex ] [ fieldIndex ] = new ListValue ( listValues , typeof ( StructValue ) ) ;
284
+ rowIndex ++ ;
285
+ }
286
+ }
233
287
else
234
288
{
235
289
throw new NotSupportedException ( $ "Lists of { itemField . FieldType ( ) } s are not currently supported") ;
0 commit comments