7878import org.apache.parquet.io.LocalOutputFile;
7979import org.apache.parquet.io.MessageColumnIO;
8080import org.apache.parquet.io.RecordReader;
81+ import org.apache.parquet.schema.LogicalTypeAnnotation;
82+ import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
8183import org.apache.parquet.schema.MessageType;
82- import org.apache.parquet.schema.MessageTypeParser ;
84+ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName ;
8385import org.apache.parquet.schema.Type;
86+ import org.apache.parquet.schema.Types.MessageTypeBuilder;
8487import org.xml.sax.InputSource;
8588import org.xml.sax.XMLReader;
8689import ucar.ma2.*;
@@ -588,7 +591,7 @@ private static interface WithColumnNames {
588591 public static BitSet ncCFcc = null; // null=inactive, new BitSet() = active
589592
590593 /** An arrayList to hold 0 or more PrimitiveArray's with data. */
591- protected ArrayList<PrimitiveArray> columns = new ArrayList();
594+ protected ArrayList<PrimitiveArray> columns = new ArrayList<> ();
592595
593596 /** An arrayList to hold the column names. */
594597 protected StringArray columnNames = new StringArray();
@@ -605,7 +608,7 @@ private static interface WithColumnNames {
605608 * Although a HashTable is more appropriate for name=value pairs, this uses ArrayList to preserve
606609 * the order of the attributes. This may be null if not in use.
607610 */
608- protected ArrayList<Attributes> columnAttributes = new ArrayList();
611+ protected ArrayList<Attributes> columnAttributes = new ArrayList<> ();
609612
610613 /** The one known valid url for readIobis. */
611614 public static final String IOBIS_URL = "http://www.iobis.org/OBISWEB/ObisControllerServlet";
@@ -16089,52 +16092,91 @@ public void readParquet(
1608916092 }
1609016093 }
1609116094
16092- private MessageType getParquetSchemaForTable(String name) {
16093- String schemaProto = "message m {";
16095+ private boolean isTimeColumn(int col) {
16096+ return "time".equalsIgnoreCase(getColumnName(col))
16097+ && Calendar2.SECONDS_SINCE_1970.equals(columnAttributes.get(col).getString("units"));
16098+ }
16099+
16100+ private MessageType getParquetSchemaForTable() {
16101+ MessageTypeBuilder schemaBuilder = org.apache.parquet.schema.Types.buildMessage();
1609416102 for (int j = 0; j < nColumns(); j++) {
16095- String schemaType = "String";
16103+ String columnName = getColumnName(j);
16104+ if (isTimeColumn(j)) {
16105+ schemaBuilder
16106+ .optional(PrimitiveTypeName.INT64)
16107+ .as(LogicalTypeAnnotation.timestampType(true, TimeUnit.MILLIS))
16108+ .named(columnName);
16109+ continue;
16110+ }
1609616111 switch (getColumn(j).elementType()) {
1609716112 case BYTE:
16098- schemaType = " INT32" ;
16113+ schemaBuilder.optional(PrimitiveTypeName. INT32).named(columnName) ;
1609916114 break;
1610016115 case SHORT:
16101- schemaType = " INT32" ;
16116+ schemaBuilder.optional(PrimitiveTypeName. INT32).named(columnName) ;
1610216117 break;
1610316118 case CHAR:
16104- schemaType = "BINARY";
16119+ schemaBuilder
16120+ .optional(PrimitiveTypeName.BINARY)
16121+ .as(LogicalTypeAnnotation.stringType())
16122+ .named(columnName);
1610516123 break;
1610616124 case INT:
16107- schemaType = " INT32" ;
16125+ schemaBuilder.optional(PrimitiveTypeName. INT32).named(columnName) ;
1610816126 break;
1610916127 case LONG:
16110- schemaType = " INT64" ;
16128+ schemaBuilder.optional(PrimitiveTypeName. INT64).named(columnName) ;
1611116129 break;
1611216130 case FLOAT:
16113- schemaType = " FLOAT" ;
16131+ schemaBuilder.optional(PrimitiveTypeName. FLOAT).named(columnName) ;
1611416132 break;
1611516133 case DOUBLE:
16116- schemaType = " DOUBLE" ;
16134+ schemaBuilder.optional(PrimitiveTypeName. DOUBLE).named(columnName) ;
1611716135 break;
1611816136 case STRING:
16119- schemaType = "BINARY";
16137+ schemaBuilder
16138+ .optional(PrimitiveTypeName.BINARY)
16139+ .as(LogicalTypeAnnotation.stringType())
16140+ .named(columnName);
1612016141 break;
1612116142 case UBYTE:
16122- schemaType = " INT32" ;
16143+ schemaBuilder.optional(PrimitiveTypeName. INT32).named(columnName) ;
1612316144 break;
1612416145 case USHORT:
16125- schemaType = " INT32" ;
16146+ schemaBuilder.optional(PrimitiveTypeName. INT32).named(columnName) ;
1612616147 break;
1612716148 case UINT:
16128- schemaType = " INT64" ;
16149+ schemaBuilder.optional(PrimitiveTypeName. INT64).named(columnName) ;
1612916150 break;
1613016151 case ULONG:
16131- schemaType = " DOUBLE" ;
16152+ schemaBuilder.optional(PrimitiveTypeName. DOUBLE).named(columnName) ;
1613216153 break;
16154+ case BOOLEAN:
16155+ schemaBuilder.optional(PrimitiveTypeName.BOOLEAN).named(columnName);
16156+ break;
16157+ }
16158+ }
16159+ return schemaBuilder.named("m");
16160+ }
16161+
16162+ private void addMetadata(Map<String, String> metadata, Attributes attributes, String prefix) {
16163+ String names[] = attributes.getNames();
16164+ for (int ni = 0; ni < names.length; ni++) {
16165+ String tName = names[ni];
16166+ if (!String2.isSomething(tName)) {
16167+ continue;
16168+ }
16169+ PrimitiveArray tValue = attributes.get(tName);
16170+ if (tValue == null || tValue.size() == 0 || tValue.toString().length() == 0) {
16171+ continue; // do nothing
16172+ }
16173+ if ("time_".equalsIgnoreCase(prefix)
16174+ && Calendar2.SECONDS_SINCE_1970.equals(attributes.getString(tName))) {
16175+ metadata.put(prefix + tName, Calendar2.MILLISECONDS_SINCE_1970);
16176+ } else {
16177+ metadata.put(prefix + tName, tValue.toCSVString());
1613316178 }
16134- schemaProto += " optional " + schemaType + " " + getColumnName(j) + ";\n";
1613516179 }
16136- schemaProto += "}";
16137- return MessageTypeParser.parseMessageType(schemaProto);
1613816180 }
1613916181
1614016182 /**
@@ -16143,23 +16185,49 @@ private MessageType getParquetSchemaForTable(String name) {
1614316185 * @param fullFileName This is just used for error messages.
1614416186 * @throws Exception if trouble, including observed nItems != expected nItems.
1614516187 */
16146- public void writeParquet(String fullFileName) throws Exception {
16188+ public void writeParquet(String fullFileName, boolean fullMetadata ) throws Exception {
1614716189 String msg = " Table.writeParquet " + fullFileName;
1614816190 long time = System.currentTimeMillis();
1614916191
1615016192 int randomInt = Math2.random(Integer.MAX_VALUE);
16151-
16152- int nameStart = fullFileName.lastIndexOf('/');
16153- if (nameStart == -1) {
16154- nameStart = fullFileName.lastIndexOf('\\');
16193+ MessageType schema = getParquetSchemaForTable();
16194+
16195+ Map<String, String> metadata = new HashMap<>();
16196+ if (fullMetadata) {
16197+ addMetadata(metadata, globalAttributes, "");
16198+ for (int col = 0; col < nColumns(); col++) {
16199+ Attributes colAttributes = columnAttributes.get(col);
16200+ if (colAttributes == null) {
16201+ continue;
16202+ }
16203+ addMetadata(metadata, colAttributes, getColumnName(col) + "_");
16204+ }
1615516205 }
16156- int nameEnd = fullFileName.lastIndexOf('.');
16157- String name = fullFileName.substring(nameStart + 1, nameEnd);
16158- MessageType schema = getParquetSchemaForTable(name);
16159-
16206+ String columnNames = "";
16207+ String columnUnits = "";
16208+ for (int col = 0; col < nColumns(); col++) {
16209+ Attributes colAttributes = columnAttributes.get(col);
16210+ if (colAttributes == null) {
16211+ continue;
16212+ }
16213+ if (columnNames.length() > 0) {
16214+ columnNames += ",";
16215+ columnUnits += ",";
16216+ }
16217+ columnNames += getColumnName(col);
16218+ if (isTimeColumn(col)) {
16219+ columnUnits += Calendar2.MILLISECONDS_SINCE_1970;
16220+ } else {
16221+ columnUnits += colAttributes.getString("units");
16222+ }
16223+ }
16224+ metadata.put("column_names", columnNames);
16225+ metadata.put("column_units", columnUnits);
1616016226 try (ParquetWriter<List<PAOne>> writer =
1616116227 new ParquetWriterBuilder(
16162- schema, new LocalOutputFile(java.nio.file.Path.of(fullFileName + randomInt)))
16228+ schema,
16229+ new LocalOutputFile(java.nio.file.Path.of(fullFileName + randomInt)),
16230+ metadata)
1616316231 .withCompressionCodec(CompressionCodecName.SNAPPY)
1616416232 .withRowGroupSize(ParquetWriter.DEFAULT_BLOCK_SIZE)
1616516233 .withPageSize(ParquetWriter.DEFAULT_PAGE_SIZE)
@@ -16171,7 +16239,12 @@ schema, new LocalOutputFile(java.nio.file.Path.of(fullFileName + randomInt)))
1617116239 for (int row = 0; row < nRows(); row++) {
1617216240 ArrayList<PAOne> record = new ArrayList<>();
1617316241 for (int j = 0; j < nColumns(); j++) {
16174- record.add(getPAOneData(j, row));
16242+ if (isTimeColumn(j)) {
16243+ // Convert from seconds since epoch to millis since epoch.
16244+ record.add(getPAOneData(j, row).multiply(PAOne.fromInt(1000)));
16245+ } else {
16246+ record.add(getPAOneData(j, row));
16247+ }
1617516248 }
1617616249 writer.write(record);
1617716250 }
0 commit comments