@@ -13,15 +13,17 @@ public static DataTable ParquetReaderToDataTable(ParquetReader parquetReader, Li
13
13
{
14
14
//Get list of data fields and construct the DataTable
15
15
DataTable dataTable = new DataTable ( ) ;
16
- List < Parquet . Data . DataField > fields = new List < Parquet . Data . DataField > ( ) ;
16
+ var fields = new List < ( Parquet . Thrift . SchemaElement , Parquet . Data . DataField ) > ( ) ;
17
17
var dataFields = parquetReader . Schema . GetDataFields ( ) ;
18
18
foreach ( string selectedField in selectedFields )
19
19
{
20
20
var dataField = dataFields . FirstOrDefault ( f => f . Name . Equals ( selectedField , StringComparison . InvariantCultureIgnoreCase ) ) ;
21
21
if ( dataField != null )
22
22
{
23
- fields . Add ( dataField ) ;
24
- DataColumn newColumn = new DataColumn ( dataField . Name , ParquetNetTypeToCSharpType ( dataField . DataType ) ) ;
23
+ var thriftSchema = parquetReader . ThriftMetadata . Schema . First ( f => f . Name . Equals ( selectedField , StringComparison . InvariantCultureIgnoreCase ) ) ;
24
+
25
+ fields . Add ( ( thriftSchema , dataField ) ) ;
26
+ DataColumn newColumn = new DataColumn ( dataField . Name , ParquetNetTypeToCSharpType ( thriftSchema , dataField . DataType ) ) ;
25
27
dataTable . Columns . Add ( newColumn ) ;
26
28
}
27
29
else
@@ -64,17 +66,20 @@ public static DataTable ParquetReaderToDataTable(ParquetReader parquetReader, Li
64
66
return dataTable ;
65
67
}
66
68
67
- private static void ProcessRowGroup ( DataTable dataTable , ParquetRowGroupReader groupReader , List < Parquet . Data . DataField > fields ,
69
+ private static void ProcessRowGroup ( DataTable dataTable , ParquetRowGroupReader groupReader , List < ( Parquet . Thrift . SchemaElement , Parquet . Data . DataField ) > fields ,
68
70
int skipRecords , int readRecords , CancellationToken cancellationToken )
69
71
{
70
72
int rowBeginIndex = dataTable . Rows . Count ;
71
73
bool isFirstColumn = true ;
72
74
73
- foreach ( var field in fields )
75
+ foreach ( var fieldTuple in fields )
74
76
{
75
77
if ( cancellationToken . IsCancellationRequested )
76
78
break ;
77
79
80
+ var logicalType = fieldTuple . Item1 . LogicalType ;
81
+ var field = fieldTuple . Item2 ;
82
+
78
83
int rowIndex = rowBeginIndex ;
79
84
80
85
int skippedRecords = 0 ;
@@ -101,7 +106,23 @@ private static void ProcessRowGroup(DataTable dataTable, ParquetRowGroupReader g
101
106
if ( value == null )
102
107
dataTable . Rows [ rowIndex ] [ field . Name ] = DBNull . Value ;
103
108
else if ( field . DataType == Parquet . Data . DataType . DateTimeOffset )
104
- dataTable . Rows [ rowIndex ] [ field . Name ] = ( ( DateTimeOffset ) value ) . DateTime ; //converts to local time!
109
+ dataTable . Rows [ rowIndex ] [ field . Name ] = ( ( DateTimeOffset ) value ) . DateTime ;
110
+ else if ( field . DataType == Parquet . Data . DataType . Int64
111
+ && logicalType . TIMESTAMP != null )
112
+ {
113
+ int divideBy = 0 ;
114
+ if ( logicalType . TIMESTAMP . Unit . NANOS != null )
115
+ divideBy = 1000 * 1000 ;
116
+ else if ( logicalType . TIMESTAMP . Unit . MICROS != null )
117
+ divideBy = 1000 ;
118
+ else if ( logicalType . TIMESTAMP . Unit . MILLIS != null )
119
+ divideBy = 1 ;
120
+
121
+ if ( divideBy > 0 )
122
+ dataTable . Rows [ rowIndex ] [ field . Name ] = DateTimeOffset . FromUnixTimeMilliseconds ( ( long ) value / divideBy ) . DateTime ;
123
+ else //Not sure if this 'else' is correct but adding just in case
124
+ dataTable . Rows [ rowIndex ] [ field . Name ] = DateTimeOffset . FromUnixTimeSeconds ( ( long ) value ) ;
125
+ }
105
126
else
106
127
dataTable . Rows [ rowIndex ] [ field . Name ] = value ;
107
128
@@ -113,7 +134,7 @@ private static void ProcessRowGroup(DataTable dataTable, ParquetRowGroupReader g
113
134
}
114
135
115
136
116
- public static Type ParquetNetTypeToCSharpType ( Parquet . Data . DataType type )
137
+ public static Type ParquetNetTypeToCSharpType ( Parquet . Thrift . SchemaElement thriftSchema , Parquet . Data . DataType type )
117
138
{
118
139
Type columnType = null ;
119
140
switch ( type )
@@ -147,7 +168,7 @@ public static Type ParquetNetTypeToCSharpType(Parquet.Data.DataType type)
147
168
columnType = typeof ( int ) ;
148
169
break ;
149
170
case Parquet . Data . DataType . Int64 :
150
- columnType = typeof ( long ) ;
171
+ columnType = thriftSchema . LogicalType . TIMESTAMP != null ? typeof ( DateTime ) : typeof ( long ) ;
151
172
break ;
152
173
case Parquet . Data . DataType . UnsignedByte :
153
174
columnType = typeof ( byte ) ;
0 commit comments