From 5334b4ebd4c5ce16e78b325256a1a5568f9f830c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Metehan=20Y=C4=B1ld=C4=B1r=C4=B1m?= <44446768+metegenez@users.noreply.github.com> Date: Thu, 20 Jun 2024 10:02:42 +0100 Subject: [PATCH 1/2] Solve the parquet conversion problem --- src/lib.rs | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 3998bf9..7fb4a02 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,7 +16,7 @@ use std::path::{Path, PathBuf}; use std::time::Instant; use async_trait::async_trait; -use datafusion::arrow::datatypes::Schema; +use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaBuilder}; use datafusion::error::DataFusionError; use datafusion::parquet::basic::Compression; use datafusion::parquet::file::properties::WriterProperties; @@ -49,11 +49,15 @@ pub async fn convert_to_parquet( ) -> datafusion::error::Result<()> { for table in benchmark.get_table_names() { println!("Converting table {}", table); - let schema = benchmark.get_schema(table); + + let mut schema_builder = SchemaBuilder::from(benchmark.get_schema(table).fields); + schema_builder.push(Field::new("__placeholder", DataType::Utf8, true)); + let schema = schema_builder.finish(); let file_ext = format!(".{}", benchmark.get_table_ext()); let options = CsvReadOptions::new() .schema(&schema) + .has_header(false) .delimiter(b'|') .file_extension(&file_ext); @@ -178,7 +182,19 @@ pub async fn convert_tbl( // build plan to read the TBL file let csv_filename = format!("{}", input_path.display()); - let df = ctx.read_csv(&csv_filename, options.clone()).await?; + let mut df = ctx.read_csv(&csv_filename, options.clone()).await?; + + let schema = df.schema(); + // Select all apart from the padding column + let selection = df + .schema() + .fields() + .iter() + .take(schema.fields().len() - 1) + .map(|d| Expr::Column(d.qualified_column())) + .collect(); + + df = df.select(selection)?; match file_format { "csv" => df.write_csv(&output_filename).await?, From e3de7e825176028f395253c2bacb95ec57bded7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Metehan=20Y=C4=B1ld=C4=B1r=C4=B1m?= <44446768+metegenez@users.noreply.github.com> Date: Sun, 23 Jun 2024 18:02:24 +0100 Subject: [PATCH 2/2] Update tpch schema --- src/tpch.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/tpch.rs b/src/tpch.rs index 2e16596..008f633 100644 --- a/src/tpch.rs +++ b/src/tpch.rs @@ -153,7 +153,6 @@ impl Tpc for TpcH { Field::new("p_container", DataType::Utf8, false), Field::new("p_retailprice", DataType::Decimal128(11, 2), false), Field::new("p_comment", DataType::Utf8, false), - Field::new("ignore", DataType::Utf8, true), ]), "supplier" => Schema::new(vec![ @@ -164,7 +163,6 @@ impl Tpc for TpcH { Field::new("s_phone", DataType::Utf8, false), Field::new("s_acctbal", DataType::Decimal128(11, 2), false), Field::new("s_comment", DataType::Utf8, false), - Field::new("ignore", DataType::Utf8, true), ]), "partsupp" => Schema::new(vec![ @@ -173,7 +171,6 @@ impl Tpc for TpcH { Field::new("ps_availqty", DataType::Int32, false), Field::new("ps_supplycost", DataType::Decimal128(11, 2), false), Field::new("ps_comment", DataType::Utf8, false), - Field::new("ignore", DataType::Utf8, true), ]), "customer" => Schema::new(vec![ @@ -185,7 +182,6 @@ impl Tpc for TpcH { Field::new("c_acctbal", DataType::Decimal128(11, 2), false), Field::new("c_mktsegment", DataType::Utf8, false), Field::new("c_comment", DataType::Utf8, false), - Field::new("ignore", DataType::Utf8, true), ]), "orders" => Schema::new(vec![ @@ -198,7 +194,6 @@ impl Tpc for TpcH { Field::new("o_clerk", DataType::Utf8, false), Field::new("o_shippriority", DataType::Int32, false), Field::new("o_comment", DataType::Utf8, false), - Field::new("ignore", DataType::Utf8, true), ]), "lineitem" => Schema::new(vec![ @@ -218,7 +213,6 @@ impl Tpc for TpcH { Field::new("l_shipinstruct", DataType::Utf8, false), Field::new("l_shipmode", DataType::Utf8, false), Field::new("l_comment", DataType::Utf8, false), - Field::new("ignore", DataType::Utf8, true), ]), "nation" => Schema::new(vec![ @@ -226,14 +220,12 @@ impl Tpc for TpcH { Field::new("n_name", DataType::Utf8, false), Field::new("n_regionkey", DataType::Int64, false), Field::new("n_comment", DataType::Utf8, false), - Field::new("ignore", DataType::Utf8, true), ]), "region" => Schema::new(vec![ Field::new("r_regionkey", DataType::Int64, false), Field::new("r_name", DataType::Utf8, false), Field::new("r_comment", DataType::Utf8, false), - Field::new("ignore", DataType::Utf8, true), ]), _ => unimplemented!(),