7878import org.apache.parquet.io.LocalOutputFile;
7979import org.apache.parquet.io.MessageColumnIO;
8080import org.apache.parquet.io.RecordReader;
81+ import org.apache.parquet.schema.LogicalTypeAnnotation;
82+ import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
8183import org.apache.parquet.schema.MessageType;
82- import org.apache.parquet.schema.MessageTypeParser ;
84+ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName ;
8385import org.apache.parquet.schema.Type;
86+ import org.apache.parquet.schema.Types.MessageTypeBuilder;
8487import org.xml.sax.InputSource;
8588import org.xml.sax.XMLReader;
8689import ucar.ma2.*;
@@ -16089,52 +16092,71 @@ public void readParquet(
1608916092 }
1609016093 }
1609116094
16095+ private boolean isTimeColumn(int col) {
16096+ return "time".equalsIgnoreCase(getColumnName(col))
16097+ && Calendar2.SECONDS_SINCE_1970.equals(columnAttributes.get(col).getString("units"));
16098+ }
16099+
1609216100 private MessageType getParquetSchemaForTable() {
16093- String schemaProto = "message m {" ;
16101+ MessageTypeBuilder schemaBuilder = org.apache.parquet.schema.Types.buildMessage() ;
1609416102 for (int j = 0; j < nColumns(); j++) {
16095- String schemaType = "String";
16103+ String columnName = getColumnName(j);
16104+ if (isTimeColumn(j)) {
16105+ schemaBuilder
16106+ .optional(PrimitiveTypeName.INT64)
16107+ .as(LogicalTypeAnnotation.timestampType(true, TimeUnit.MILLIS))
16108+ .named(columnName);
16109+ continue;
16110+ }
1609616111 switch (getColumn(j).elementType()) {
1609716112 case BYTE:
16098- schemaType = " INT32" ;
16113+ schemaBuilder.optional(PrimitiveTypeName. INT32).named(columnName) ;
1609916114 break;
1610016115 case SHORT:
16101- schemaType = " INT32" ;
16116+ schemaBuilder.optional(PrimitiveTypeName. INT32).named(columnName) ;
1610216117 break;
1610316118 case CHAR:
16104- schemaType = "BINARY";
16119+ schemaBuilder
16120+ .optional(PrimitiveTypeName.BINARY)
16121+ .as(LogicalTypeAnnotation.stringType())
16122+ .named(columnName);
1610516123 break;
1610616124 case INT:
16107- schemaType = " INT32" ;
16125+ schemaBuilder.optional(PrimitiveTypeName. INT32).named(columnName) ;
1610816126 break;
1610916127 case LONG:
16110- schemaType = " INT64" ;
16128+ schemaBuilder.optional(PrimitiveTypeName. INT64).named(columnName) ;
1611116129 break;
1611216130 case FLOAT:
16113- schemaType = " FLOAT" ;
16131+ schemaBuilder.optional(PrimitiveTypeName. FLOAT).named(columnName) ;
1611416132 break;
1611516133 case DOUBLE:
16116- schemaType = " DOUBLE" ;
16134+ schemaBuilder.optional(PrimitiveTypeName. DOUBLE).named(columnName) ;
1611716135 break;
1611816136 case STRING:
16119- schemaType = "BINARY";
16137+ schemaBuilder
16138+ .optional(PrimitiveTypeName.BINARY)
16139+ .as(LogicalTypeAnnotation.stringType())
16140+ .named(columnName);
1612016141 break;
1612116142 case UBYTE:
16122- schemaType = " INT32" ;
16143+ schemaBuilder.optional(PrimitiveTypeName. INT32).named(columnName) ;
1612316144 break;
1612416145 case USHORT:
16125- schemaType = " INT32" ;
16146+ schemaBuilder.optional(PrimitiveTypeName. INT32).named(columnName) ;
1612616147 break;
1612716148 case UINT:
16128- schemaType = " INT64" ;
16149+ schemaBuilder.optional(PrimitiveTypeName. INT64).named(columnName) ;
1612916150 break;
1613016151 case ULONG:
16131- schemaType = "DOUBLE";
16152+ schemaBuilder.optional(PrimitiveTypeName.DOUBLE).named(columnName);
16153+ break;
16154+ case BOOLEAN:
16155+ schemaBuilder.optional(PrimitiveTypeName.BOOLEAN).named(columnName);
1613216156 break;
1613316157 }
16134- schemaProto += " optional " + schemaType + " " + getColumnName(j) + ";\n";
1613516158 }
16136- schemaProto += "}";
16137- return MessageTypeParser.parseMessageType(schemaProto);
16159+ return schemaBuilder.named("m");
1613816160 }
1613916161
1614016162 private void addMetadata(Map<String, String> metadata, Attributes attributes, String prefix) {
@@ -16148,7 +16170,12 @@ private void addMetadata(Map<String, String> metadata, Attributes attributes, St
1614816170 if (tValue == null || tValue.size() == 0 || tValue.toString().length() == 0) {
1614916171 continue; // do nothing
1615016172 }
16151- metadata.put(prefix + tName, tValue.toCSVString());
16173+ if ("time_".equalsIgnoreCase(prefix)
16174+ && Calendar2.SECONDS_SINCE_1970.equals(attributes.getString(tName))) {
16175+ metadata.put(prefix + tName, Calendar2.MILLISECONDS_SINCE_1970);
16176+ } else {
16177+ metadata.put(prefix + tName, tValue.toCSVString());
16178+ }
1615216179 }
1615316180 }
1615416181
@@ -16188,7 +16215,11 @@ public void writeParquet(String fullFileName, boolean fullMetadata) throws Excep
1618816215 columnUnits += ",";
1618916216 }
1619016217 columnNames += getColumnName(col);
16191- columnUnits += colAttributes.getString("units");
16218+ if (isTimeColumn(col)) {
16219+ columnUnits += Calendar2.MILLISECONDS_SINCE_1970;
16220+ } else {
16221+ columnUnits += colAttributes.getString("units");
16222+ }
1619216223 }
1619316224 metadata.put("column_names", columnNames);
1619416225 metadata.put("column_units", columnUnits);
@@ -16208,7 +16239,12 @@ public void writeParquet(String fullFileName, boolean fullMetadata) throws Excep
1620816239 for (int row = 0; row < nRows(); row++) {
1620916240 ArrayList<PAOne> record = new ArrayList<>();
1621016241 for (int j = 0; j < nColumns(); j++) {
16211- record.add(getPAOneData(j, row));
16242+ if (isTimeColumn(j)) {
16243+ // Convert from seconds since epoch to millis since epoch.
16244+ record.add(getPAOneData(j, row).multiply(PAOne.fromInt(1000)));
16245+ } else {
16246+ record.add(getPAOneData(j, row));
16247+ }
1621216248 }
1621316249 writer.write(record);
1621416250 }
0 commit comments