4 changes: 2 additions & 2 deletions LinuxPackage.nuspec
@@ -2,14 +2,14 @@
<package >
<metadata>
<id>pq2json_linux</id>
<version>0.1.15</version>
<version>0.1.16</version>
<authors>Evgeney Ryzhyk</authors>
<owners>Evgeney Ryzhyk</owners>
<license type="expression">MIT</license>
<projectUrl>https://github.com/Azure/azure-kusto-parquet-conv</projectUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>Parquet to JSON (line delimited) converter tool.</description>
<releaseNotes>Updated cslschema handling of Int32, JSON and Decimal types.</releaseNotes>
<releaseNotes>Updated Arrow Parquet library version</releaseNotes>
<copyright>Copyright 2020</copyright>
<tags></tags>
<dependencies></dependencies>
4 changes: 2 additions & 2 deletions Package.nuspec
@@ -2,14 +2,14 @@
<package >
<metadata>
<id>pq2json</id>
<version>0.1.15</version>
<version>0.1.16</version>
<authors>Evgeney Ryzhyk</authors>
<owners>Evgeney Ryzhyk</owners>
<license type="expression">MIT</license>
<projectUrl>https://github.com/Azure/azure-kusto-parquet-conv</projectUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>Parquet to JSON (line delimited) converter tool.</description>
<releaseNotes>Updated cslschema handling of Int32, JSON and Decimal types.</releaseNotes>
<releaseNotes>Updated Arrow Parquet library version</releaseNotes>
<copyright>Copyright 2020</copyright>
<tags></tags>
<dependencies></dependencies>
2 changes: 1 addition & 1 deletion pq2json/Cargo.toml
@@ -6,7 +6,7 @@ edition = "2018"

[dependencies]
clap = "2"
parquet = { git = "https://github.com/rzheka/arrow.git", branch = "dev" }
parquet = { git = "https://github.com/apache/arrow-rs.git", branch = "master", features = ["cli"] }
itertools = "0.8"
serde = "1"
serde_json = "1"
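For orientation, the dependency now tracks the upstream apache/arrow-rs repository instead of the rzheka fork. Below is a minimal sketch of how the crate's record API reads a Parquet file row by row, roughly the pattern the converter relies on; the file name, the printing, and the error handling are illustrative assumptions, not code from this PR.

use std::fs::File;
use std::path::Path;

use parquet::file::reader::{FileReader, SerializedFileReader};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder input path, for illustration only.
    let file = File::open(Path::new("sample.parquet"))?;
    let reader = SerializedFileReader::new(file)?;

    // Iterate fully materialized rows; each row exposes (column name, Field) pairs.
    for row in reader.get_row_iter(None)? {
        for (name, field) in row.get_column_iter() {
            // to_json_value comes from the crate's record API (feature-gated in some versions).
            println!("{} = {}", name, field.to_json_value());
        }
    }
    Ok(())
}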
136 changes: 60 additions & 76 deletions pq2json/src/converter.rs
@@ -4,18 +4,18 @@ use std::fs::File;
use std::io::{self, BufWriter, Write};
use std::path::Path;

use crate::settings::Settings;
use crate::TimestampRendering;
use chrono::Duration;
use csv::Terminator;
use num_bigint::{BigInt, Sign};
use parquet::data_type::Decimal;
use parquet::data_type::{AsBytes, Decimal};
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::record::{FieldType, List, ListAccessor, Map, MapAccessor, Row, RowAccessor};
use parquet::record::reader::RowIter;
use parquet::record::{Field, List, Map, Row};
use parquet::schema::types::Type as SchemaType;
use serde_json::{Number, Value};

use crate::settings::{Settings, TimestampRendering};
use chrono::Duration;
use csv::Terminator;
use parquet::record::reader::RowIter;

const WRITER_BUF_CAP: usize = 256 * 1024;

/// Writes Parquet file as text, either as JSONL (every line contains single JSON record)
@@ -92,37 +92,21 @@ fn projected_schema(
)
}

macro_rules! element_to_value {
($ft:expr, $obj:ident, $i:ident, $settings:ident) => {
match $ft {
FieldType::Null => Value::Null,
FieldType::Bool => Value::Bool($obj.get_bool($i)?),
FieldType::Byte => Value::Number($obj.get_byte($i)?.into()),
FieldType::Short => Value::Number($obj.get_short($i)?.into()),
FieldType::Int => Value::Number($obj.get_int($i)?.into()),
FieldType::Long => Value::Number($obj.get_long($i)?.into()),
FieldType::UByte => Value::Number($obj.get_ubyte($i)?.into()),
FieldType::UShort => Value::Number($obj.get_ushort($i)?.into()),
FieldType::UInt => Value::Number($obj.get_uint($i)?.into()),
FieldType::ULong => ulong_to_value($obj.get_ulong($i)?, &$settings),
FieldType::Float => float_to_value($obj.get_float($i)? as f64),
FieldType::Double => float_to_value($obj.get_double($i)?),
FieldType::Decimal => Value::String(decimal_to_string($obj.get_decimal($i)?)),
FieldType::Str => Value::String($obj.get_string($i)?.to_string()),
FieldType::Bytes => bytes_to_value($obj.get_bytes($i)?.data()),
FieldType::Date => date_to_value($obj.get_date($i)?)?,
FieldType::TimestampMillis => {
timestamp_to_value($settings, $obj.get_timestamp_millis($i)?)?
}
FieldType::TimestampMicros => timestamp_to_value(
$settings,
$obj.get_timestamp_micros($i).map(|ts| ts / 1000)?,
)?,
FieldType::Group => row_to_value($settings, $obj.get_group($i)?)?,
FieldType::List => list_to_value($settings, $obj.get_list($i)?)?,
FieldType::Map => map_to_value($settings, $obj.get_map($i)?)?,
}
};
fn element_to_value(settings: &Settings, field: &Field) -> Value {
Contributor comment: I don't remember what changes Evgeney made in his fork of Apache Arrow. Are we sure we aren't missing those changes in the latest official Apache Arrow version?

match field {
Field::ULong(ulong) => ulong_to_value(*ulong, settings),
Field::Bytes(byte_array) => bytes_to_value(byte_array.as_bytes()),
Field::Float(float) => float_to_value(*float as f64),
Field::Double(double) => float_to_value(*double),
Field::Decimal(decimal) => Value::String(decimal_to_string(decimal)),
Field::Date(date) => date_to_value(*date).unwrap(),
Field::TimestampMillis(ts) => timestamp_to_value(settings, *ts).unwrap(),
Field::TimestampMicros(ts) => timestamp_to_value(settings, ts / 1000).unwrap(),
Field::Group(row) => row_to_value(settings, row).unwrap(),
Field::ListInternal(list) => list_to_value(settings, list).unwrap(),
Field::MapInternal(map) => map_to_value(settings, map).unwrap(),
_ => field.to_json_value(),
}
}

fn top_level_rows_to_json(
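The element_to_value! macro, which dispatched on FieldType and the accessor traits removed upstream, is replaced by a function that matches directly on the Field enum; scalar variants not handled explicitly fall through to the crate's Field::to_json_value. A standalone sanity check of that fallback, with arbitrary example values (an assumption about the upstream behaviour, not code from this PR):

use parquet::record::Field;
use serde_json::json;

fn main() {
    // Variants not matched explicitly in element_to_value go through `field.to_json_value()`.
    assert_eq!(Field::Int(7).to_json_value(), json!(7));
    assert_eq!(Field::Bool(true).to_json_value(), json!(true));
    assert_eq!(Field::Str("abc".to_string()).to_json_value(), json!("abc"));
}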
@@ -139,6 +123,7 @@ fn top_level_rows_to_json(
};
writeln!(writer, "{}", serde_json::to_string(&value)?)?;
}

Ok(())
}

@@ -152,30 +137,31 @@ fn top_level_rows_to_csv(
let mut csv_writer = csv::WriterBuilder::new()
.terminator(Terminator::Any(b'\r'))
.from_writer(vec![]);
let mut column_idx = 0;
let columns = settings.columns.as_ref();

match columns {
Some(cols) => {
let mut row_columns_map: HashMap<&String, &Field> = HashMap::new();
for (name, field) in row.get_column_iter() {
row_columns_map.insert(name, field);
}

// Produce empty values for columns specified by --columns argument, but missing in the file
for col in cols {
let value = if missing_columns.contains(col) {
Value::Null
} else {
let field_type = row.get_field_type(column_idx);
let val = element_to_value!(field_type, row, column_idx, settings);
column_idx += 1;
val
let field = row_columns_map.get(col).unwrap();
element_to_value(settings, field)
};

csv_writer.write_field(value_to_csv(&value))?;
}
}
None => {
// No columns specified by --columns argument
for i in 0..row.len() {
let field_type = row.get_field_type(i);
let value = element_to_value!(field_type, row, i, settings);
for (_, field) in row.get_column_iter() {
let value = element_to_value(settings, field);
csv_writer.write_field(value_to_csv(&value))?;
}
}
@@ -209,10 +195,8 @@ fn value_to_csv(value: &Value) -> String {

fn row_to_value(settings: &Settings, row: &Row) -> Result<Value, Box<dyn Error>> {
let mut map = serde_json::Map::with_capacity(row.len());
for i in 0..row.len() {
let name = row.get_field_name(i);
let field_type = row.get_field_type(i);
let value = element_to_value!(field_type, row, i, settings);
for (name, field) in row.get_column_iter() {
let value = element_to_value(settings, field);
if !(settings.omit_nulls && value.is_null()) {
map.insert(name.to_string(), value);
}
@@ -227,9 +211,8 @@ fn row_to_value(settings: &Settings, row: &Row) -> Result<Value, Box<dyn Error>>

fn list_to_value(settings: &Settings, list: &List) -> Result<Value, Box<dyn Error>> {
let mut arr = Vec::<Value>::with_capacity(list.len());
for i in 0..list.len() {
let elt_ty = list.get_element_type(i);
let value = element_to_value!(elt_ty, list, i, settings);
for field in list.elements().iter() {
let value = element_to_value(settings, field);
arr.push(value);
}

@@ -242,28 +225,24 @@ fn list_to_value(settings: &Settings, list: &List) -> Result<Value, Box<dyn Erro

fn map_to_value(settings: &Settings, map: &Map) -> Result<Value, Box<dyn Error>> {
let mut jsmap = serde_json::Map::with_capacity(map.len());
let keys = map.get_keys();
let values = map.get_values();
for i in 0..map.len() {
let key_ty = keys.get_element_type(i);
let key = match key_ty {
FieldType::Null => String::from("null"),
FieldType::Bool => keys.get_bool(i)?.to_string(),
FieldType::Byte => keys.get_byte(i)?.to_string(),
FieldType::Short => keys.get_short(i)?.to_string(),
FieldType::Int => keys.get_int(i)?.to_string(),
FieldType::Long => keys.get_long(i)?.to_string(),
FieldType::UByte => keys.get_ubyte(i)?.to_string(),
FieldType::UShort => keys.get_ushort(i)?.to_string(),
FieldType::UInt => keys.get_uint(i)?.to_string(),
FieldType::ULong => keys.get_ulong(i)?.to_string(),
FieldType::Str => keys.get_string(i)?.to_string(),
for (key_type, value_type) in map.entries() {
let key = match key_type {
Field::Null => String::from("null"),
Field::Bool(_)
| Field::Byte(_)
| Field::Short(_)
| Field::Int(_)
| Field::Long(_)
| Field::UByte(_)
| Field::UShort(_)
| Field::UInt(_)
| Field::ULong(_) => element_to_value(settings, key_type).to_string(),
Field::Str(string) => String::from(string),
// TODO: return error here
_ => panic!("Unsupported map key"),
};

let val_ty = values.get_element_type(i);
let value = element_to_value!(val_ty, values, i, settings);
let value = element_to_value(settings, value_type);
if !(settings.omit_nulls && value.is_null()) {
jsmap.insert(key, value);
}
@@ -301,19 +280,24 @@ fn ulong_to_value(l: u64, settings: &Settings) -> Value {
const TICKS_TILL_UNIX_TIME: u64 = 621355968000000000u64;

fn timestamp_to_value(settings: &Settings, ts: u64) -> Result<Value, Box<dyn Error>> {
let mut timestamp = ts as i64;
if timestamp < 0 {
timestamp = 0;
}

match settings.timestamp_rendering {
TimestampRendering::Ticks => {
let ticks = ts
let ticks = timestamp
.checked_mul(10000)
.and_then(|t| t.checked_add(TICKS_TILL_UNIX_TIME));
.and_then(|t| t.checked_add(TICKS_TILL_UNIX_TIME as i64));
let v = ticks
.map(|t| Value::Number(t.into()))
.unwrap_or(Value::Null);
Ok(v)
}
TimestampRendering::IsoStr => {
let seconds = (ts / 1000) as i64;
let nanos = ((ts % 1000) * 1000000) as u32;
let seconds = (timestamp / 1000) as i64;
let nanos = ((timestamp % 1000) * 1000000) as u32;
let datetime =
if let Some(dt) = chrono::NaiveDateTime::from_timestamp_opt(seconds, nanos) {
dt
@@ -323,7 +307,7 @@ fn timestamp_to_value(settings: &Settings, ts: u64) -> Result<Value, Box<dyn Err
let iso_str = datetime.format("%Y-%m-%dT%H:%M:%S.%6fZ").to_string();
Ok(Value::String(iso_str))
}
TimestampRendering::UnixMs => Ok(Value::Number(ts.into())),
TimestampRendering::UnixMs => Ok(Value::Number(timestamp.into())),
}
}
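For reference, the Ticks rendering converts Unix epoch milliseconds to .NET ticks (100-nanosecond intervals counted from 0001-01-01), which is the offset TICKS_TILL_UNIX_TIME supplies. A quick arithmetic check with an arbitrary timestamp (not code from this PR):

fn main() {
    const TICKS_TILL_UNIX_TIME: i64 = 621_355_968_000_000_000;
    // 2020-01-01T00:00:00Z expressed in Unix epoch milliseconds.
    let unix_ms: i64 = 1_577_836_800_000;
    // One millisecond equals 10_000 ticks of 100 ns each.
    let ticks = unix_ms * 10_000 + TICKS_TILL_UNIX_TIME;
    // Matches .NET's DateTime(2020, 1, 1, 0, 0, 0, DateTimeKind.Utc).Ticks.
    assert_eq!(ticks, 637_134_336_000_000_000);
}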

52 changes: 28 additions & 24 deletions pq2json/src/schema.rs
@@ -3,7 +3,7 @@ use std::fs::File;
use std::path::Path;

use itertools::Itertools;
use parquet::basic::{LogicalType, Type as PhysicalType};
use parquet::basic::{ConvertedType, Type as PhysicalType};
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::schema::printer::{print_file_metadata, print_parquet_metadata};
use parquet::schema::types::Type;
@@ -78,36 +78,40 @@ fn field_csl_schema(field_type: &Type) -> (&str, &str) {
physical_type,
..
} => {
let csl_type = match physical_type {
PhysicalType::BOOLEAN => "bool",
PhysicalType::BYTE_ARRAY => match basic_info.logical_type() {
LogicalType::UTF8 | LogicalType::ENUM => "string",
LogicalType::DECIMAL => "decimal",
_ => "dynamic",
},
PhysicalType::FIXED_LEN_BYTE_ARRAY => match basic_info.logical_type() {
LogicalType::DECIMAL => "decimal",
_ => "dynamic",
},
PhysicalType::DOUBLE | PhysicalType::FLOAT => "real",
PhysicalType::INT32 => match basic_info.logical_type() {
LogicalType::DATE => "datetime",
LogicalType::DECIMAL => "real",
_ => "int",
},
PhysicalType::INT64 => match basic_info.logical_type() {
LogicalType::TIMESTAMP_MILLIS | LogicalType::TIMESTAMP_MICROS => "datetime",
LogicalType::DECIMAL => "real",
_ => "long",
},
PhysicalType::INT96 => "datetime",
let csl_type = match basic_info.converted_type() {
ConvertedType::UTF8 => "string",
ConvertedType::TIMESTAMP_MICROS
| ConvertedType::TIMESTAMP_MILLIS
| ConvertedType::DATE => "datetime",
ConvertedType::INTERVAL => "timespan",
ConvertedType::DECIMAL => "decimal",
ConvertedType::LIST
| ConvertedType::MAP
| ConvertedType::MAP_KEY_VALUE
| ConvertedType::JSON => "dynamic",
ConvertedType::UINT_32 => "long",
ConvertedType::UINT_64 => "decimal",
_ => parquet_physical_to_csl_type(physical_type),
};
(basic_info.name(), csl_type)
}
Type::GroupType { ref basic_info, .. } => (basic_info.name(), "dynamic"),
}
}

fn parquet_physical_to_csl_type(parquet_type: &PhysicalType) -> &str {
match parquet_type {
PhysicalType::BOOLEAN => "bool",
PhysicalType::INT32 => "int",
PhysicalType::INT64 => "long",
PhysicalType::INT96 => "datetime",
PhysicalType::FLOAT => "real",
PhysicalType::DOUBLE => "real",
PhysicalType::BYTE_ARRAY => "string",
PhysicalType::FIXED_LEN_BYTE_ARRAY => "decimal",
}
}

/// Prints limited row groups metadata of a specified Parquet file as JSON,
/// for each row group its size in bytes and the number of rows.
///
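To make the two-level lookup above concrete: the converted (annotated) type is consulted first, and only when it carries no Kusto-relevant information does the physical type decide. The condensed mirror below is a simplified sketch of that idea, not the tool's exact code, and it omits several converted types handled in the real mapping.

use parquet::basic::{ConvertedType, Type as PhysicalType};

fn csl_type(converted: ConvertedType, physical: PhysicalType) -> &'static str {
    match converted {
        ConvertedType::UTF8 => "string",
        ConvertedType::TIMESTAMP_MICROS
        | ConvertedType::TIMESTAMP_MILLIS
        | ConvertedType::DATE => "datetime",
        ConvertedType::DECIMAL => "decimal",
        ConvertedType::UINT_64 => "decimal",
        // Everything else falls back to the physical type, as in the function above.
        _ => match physical {
            PhysicalType::BOOLEAN => "bool",
            PhysicalType::INT32 => "int",
            PhysicalType::INT64 => "long",
            PhysicalType::INT96 => "datetime",
            PhysicalType::FLOAT | PhysicalType::DOUBLE => "real",
            PhysicalType::BYTE_ARRAY => "string",
            PhysicalType::FIXED_LEN_BYTE_ARRAY => "decimal",
        },
    }
}

fn main() {
    // An INT64 column annotated as TIMESTAMP_MILLIS becomes a Kusto datetime;
    // a bare INT64 column stays a long.
    assert_eq!(csl_type(ConvertedType::TIMESTAMP_MILLIS, PhysicalType::INT64), "datetime");
    assert_eq!(csl_type(ConvertedType::NONE, PhysicalType::INT64), "long");
}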