diff --git a/README.md b/README.md
index ee998fd..ce8b69c 100644
--- a/README.md
+++ b/README.md
@@ -19,6 +19,7 @@ COPY table FROM 's3://mybucket/data.parquet' WITH (format 'parquet');
 - [Installation From Source](#installation-from-source)
 - [Usage](#usage)
   - [Copy FROM/TO Parquet files TO/FROM Postgres tables](#copy-fromto-parquet-files-tofrom-postgres-tables)
+  - [Create Postgres tables from Parquet files](#create-postgres-tables-from-parquet-files)
   - [COPY FROM/TO Parquet stdin/stdout TO/FROM Postgres tables)](#copy-fromto-parquet-stdinstdout-tofrom-postgres-tables)
   - [COPY FROM/TO Parquet program stream TO/FROM Postgres tables)](#copy-fromto-parquet-program-stream-tofrom-postgres-tables)
   - [Inspect Parquet schema](#inspect-parquet-schema)
@@ -68,7 +69,7 @@ psql> "CREATE EXTENSION pg_parquet;"
 ## Usage
 There are mainly 3 things that you can do with `pg_parquet`:
 1. You can export Postgres tables/queries to Parquet files, stdin/stdout or a program's stream,
-2. You can ingest data from Parquet files to Postgres tables,
+2. You can ingest data from Parquet files to Postgres tables with type coercion and schema inference,
 3. You can inspect the schema and metadata of Parquet files.
 
 ### COPY from/to Parquet files to/from Postgres tables
@@ -110,6 +111,22 @@ COPY product_example FROM '/tmp/product_example.parquet';
 SELECT * FROM product_example;
 ```
 
+### Create Postgres tables from Parquet files
+You can use the `CREATE TABLE <table> () WITH (definition_from = <uri>)` command to **infer** the columns of a Postgres table from a Parquet file.
+You can even infer **and** populate the table in one step via `CREATE TABLE <table> () WITH (load_from = <uri>)`.
+
+```sql
+-- create table with inferred columns and populated rows
+CREATE TABLE product_inferred_example () WITH (load_from = '/tmp/product_example.parquet');
+
+-- show table
+SELECT * FROM product_inferred_example;
+```
+
+> [!NOTE]
+> If an inferred column is of composite type, a new type named `parquet_structs.struct_<hash>` will be created. The hash is determined by
+> the field types and names of the composite type.
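+
+If you only want the inferred schema without loading any rows, use `definition_from` (the table and file names below are illustrative):
+
+```sql
+-- create table with inferred columns only
+CREATE TABLE product_definition_example () WITH (definition_from = '/tmp/product_example.parquet');
+
+-- populate it later, e.g. with COPY
+COPY product_definition_example FROM '/tmp/product_example.parquet';
+```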
+
 ### COPY from/to Parquet stdin/stdout to/from Postgres tables
 You can use `COPY` command to read and write Parquet stream from/to standard input and output.
 Below is an example usage (you have to specify `format = parquet`):
diff --git a/sql/bootstrap.sql b/sql/bootstrap.sql
index 79c2f82..d0c8a5f 100644
--- a/sql/bootstrap.sql
+++ b/sql/bootstrap.sql
@@ -14,3 +14,6 @@ CREATE SCHEMA parquet;
 REVOKE ALL ON SCHEMA parquet FROM public;
 GRANT USAGE ON SCHEMA parquet TO public;
 GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA parquet TO public;
+
+-- error if the schema already exists
+CREATE SCHEMA parquet_structs;
diff --git a/sql/pg_parquet--0.5.1--0.5.2.sql b/sql/pg_parquet--0.5.1--0.5.2.sql
new file mode 100644
index 0000000..491642f
--- /dev/null
+++ b/sql/pg_parquet--0.5.1--0.5.2.sql
@@ -0,0 +1,2 @@
+-- error if the schema already exists
+CREATE SCHEMA parquet_structs;
diff --git a/sql/pg_parquet.sql b/sql/pg_parquet.sql
index cb473c6..1d4fb22 100644
--- a/sql/pg_parquet.sql
+++ b/sql/pg_parquet.sql
@@ -104,3 +104,6 @@ CREATE FUNCTION parquet."column_stats"("uri" TEXT) RETURNS TABLE (
 )
 STRICT
 LANGUAGE c AS 'MODULE_PATHNAME', 'column_stats_wrapper';
+
+-- error if the schema already exists
+CREATE SCHEMA parquet_structs;
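The new `parquet_structs` schema is where the composite types created by schema inference land. An illustrative way to inspect what inference produced (a standard catalog query, not part of this change):

```sql
-- list composite types created by pg_parquet's schema inference
SELECT typname
FROM pg_type
WHERE typnamespace = 'parquet_structs'::regnamespace;
```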
diff --git a/src/arrow_parquet.rs b/src/arrow_parquet.rs
index b89c52d..2ab1e3f 100644
--- a/src/arrow_parquet.rs
+++ b/src/arrow_parquet.rs
@@ -7,5 +7,5 @@ pub(crate) mod parquet_reader;
 pub(crate) mod parquet_version;
 pub(crate) mod parquet_writer;
 pub(crate) mod pg_to_arrow;
-pub(crate) mod schema_parser;
+pub(crate) mod schema;
 pub(crate) mod uri_utils;
diff --git a/src/arrow_parquet/parquet_reader.rs b/src/arrow_parquet/parquet_reader.rs
index 40b91b6..c027642 100644
--- a/src/arrow_parquet/parquet_reader.rs
+++ b/src/arrow_parquet/parquet_reader.rs
@@ -23,9 +23,14 @@ use crate::{
     arrow_parquet::{
         arrow_to_pg::{context::collect_arrow_to_pg_attribute_contexts, to_pg_datum},
         field_ids::FieldIds,
-        schema_parser::{
-            error_if_copy_from_match_by_position_with_generated_columns,
-            parquet_schema_string_from_attributes,
+        schema::{
+            coerce_schema::{
+                ensure_file_schema_match_tupledesc_schema,
+                error_if_copy_from_match_by_position_with_generated_columns,
+            },
+            parse_schema::{
+                parquet_schema_string_from_attributes, parse_arrow_schema_from_attributes,
+            },
         },
     },
     parquet_udfs::list::list_uri,
@@ -37,9 +42,6 @@ use crate::{
 use super::{
     arrow_to_pg::context::ArrowToPgAttributeContext,
     match_by::MatchBy,
-    schema_parser::{
-        ensure_file_schema_match_tupledesc_schema, parse_arrow_schema_from_attributes,
-    },
     uri_utils::{parquet_reader_from_uri, ParsedUriInfo},
 };
diff --git a/src/arrow_parquet/parquet_writer.rs b/src/arrow_parquet/parquet_writer.rs
index 90f2db4..ce1faab 100644
--- a/src/arrow_parquet/parquet_writer.rs
+++ b/src/arrow_parquet/parquet_writer.rs
@@ -16,7 +16,7 @@ use crate::{
         compression::PgParquetCompressionWithLevel,
         field_ids::validate_field_ids,
         pg_to_arrow::context::collect_pg_to_arrow_attribute_contexts,
-        schema_parser::{
+        schema::parse_schema::{
             parquet_schema_string_from_attributes, parse_arrow_schema_from_attributes,
         },
         uri_utils::parquet_writer_from_uri,
diff --git a/src/arrow_parquet/schema.rs b/src/arrow_parquet/schema.rs
new file mode 100644
index 0000000..b6805da
--- /dev/null
+++ b/src/arrow_parquet/schema.rs
@@ -0,0 +1,3 @@
+pub(crate) mod coerce_schema;
+pub(crate) mod infer_schema;
+pub(crate) mod parse_schema;
diff --git a/src/arrow_parquet/schema/coerce_schema.rs b/src/arrow_parquet/schema/coerce_schema.rs
new file mode 100644
index 0000000..2019e4e
--- /dev/null
+++ b/src/arrow_parquet/schema/coerce_schema.rs
@@ -0,0 +1,270 @@
+use std::sync::Arc;
+
+use arrow_cast::can_cast_types;
+use arrow_schema::{DataType, FieldRef, Schema};
+use pgrx::{
+    ereport,
+    pg_sys::{
+        can_coerce_type,
+        CoercionContext::{self, COERCION_EXPLICIT},
+        FormData_pg_attribute, InvalidOid, Oid, BOOLOID, BYTEAOID, DATEOID, FLOAT4OID, FLOAT8OID,
+        INT2OID, INT4OID, INT8OID, JSONOID, NUMERICOID, OIDOID, TEXTOID, TIMEOID, TIMESTAMPOID,
+        TIMESTAMPTZOID, UUIDOID,
+    },
+    PgLogLevel, PgSqlErrorCode, PgTupleDesc,
+};
+
+use crate::{
+    arrow_parquet::match_by::MatchBy,
+    pgrx_utils::{
+        array_element_typoid, collect_attributes_for, domain_array_base_elem_type,
+        is_generated_attribute, tuple_desc, CollectAttributesFor,
+    },
+    type_compat::pg_arrow_type_conversions::make_numeric_typmod,
+};
+
+// ensure_file_schema_match_tupledesc_schema throws an error if the file's schema does not match the table schema.
+// If the file's arrow schema is castable to the table's arrow schema, it returns a vector of Option<DataType>
+// to cast to for each field.
+pub(crate) fn ensure_file_schema_match_tupledesc_schema(
+    file_schema: Arc<Schema>,
+    tupledesc_schema: Arc<Schema>,
+    attributes: &[FormData_pg_attribute],
+    match_by: MatchBy,
+) -> Vec<Option<DataType>> {
+    let mut cast_to_types = Vec::new();
+
+    if match_by == MatchBy::Position
+        && tupledesc_schema.fields().len() != file_schema.fields().len()
+    {
+        panic!(
+            "column count mismatch between table and parquet file. \
+             parquet file has {} columns, but table has {} columns",
+            file_schema.fields().len(),
+            tupledesc_schema.fields().len()
+        );
+    }
+
+    for (tupledesc_schema_field, attribute) in
+        tupledesc_schema.fields().iter().zip(attributes.iter())
+    {
+        let field_name = tupledesc_schema_field.name();
+
+        let file_schema_field = match match_by {
+            MatchBy::Position => file_schema.field(attribute.attnum as usize - 1),
+
+            MatchBy::Name => {
+                let file_schema_field = file_schema.column_with_name(field_name);
+
+                if file_schema_field.is_none() {
+                    panic!("column \"{}\" is not found in parquet file", field_name);
+                }
+
+                let (_, file_schema_field) = file_schema_field.unwrap();
+
+                file_schema_field
+            }
+        };
+
+        let file_schema_field = Arc::new(file_schema_field.clone());
+
+        let from_type = file_schema_field.data_type();
+        let to_type = tupledesc_schema_field.data_type();
+
+        // no cast needed
+        if from_type == to_type {
+            cast_to_types.push(None);
+            continue;
+        }
+
+        if !is_coercible(
+            &file_schema_field,
+            tupledesc_schema_field,
+            attribute.atttypid,
+            attribute.atttypmod,
+        ) {
+            panic!(
+                "type mismatch for column \"{}\" between table and parquet file.\n\n\
+                 table has \"{}\"\n\nparquet file has \"{}\"",
+                field_name, to_type, from_type
+            );
+        }
+
+        pgrx::debug2!(
+            "column \"{}\" is being cast from \"{}\" to \"{}\"",
+            field_name,
+            from_type,
+            to_type
+        );
+
+        cast_to_types.push(Some(to_type.clone()));
+    }
+
+    cast_to_types
+}
+
+// is_coercible first checks if "from_type" can be cast to "to_type" by arrow-cast.
+// Then, it checks if the cast is meaningful at Postgres by seeing if there is
+// an explicit coercion from "from_typoid" to "to_typoid".
+//
+// Additionally, we need to be careful about struct rules for the cast:
+// Arrow supports casting struct fields by field position instead of field name,
+// which is not the intended behavior for pg_parquet. Hence, we make sure the field names
+// match for structs.
+fn is_coercible(
+    from_field: &FieldRef,
+    to_field: &FieldRef,
+    to_typoid: Oid,
+    to_typmod: i32,
+) -> bool {
+    match (from_field.data_type(), to_field.data_type()) {
+        (DataType::Struct(from_fields), DataType::Struct(to_fields)) => {
+            if from_fields.len() != to_fields.len() {
+                return false;
+            }
+
+            let tupledesc = tuple_desc(to_typoid, to_typmod);
+
+            let attributes = collect_attributes_for(CollectAttributesFor::Other, &tupledesc);
+
+            for (from_field, (to_field, to_attribute)) in from_fields
+                .iter()
+                .zip(to_fields.iter().zip(attributes.iter()))
+            {
+                if from_field.name() != to_field.name() {
+                    return false;
+                }
+
+                if !is_coercible(
+                    from_field,
+                    to_field,
+                    to_attribute.type_oid().value(),
+                    to_attribute.type_mod(),
+                ) {
+                    return false;
+                }
+            }
+
+            true
+        }
+        (DataType::List(from_field), DataType::List(to_field))
+        | (DataType::FixedSizeList(from_field, _), DataType::List(to_field))
+        | (DataType::LargeList(from_field), DataType::List(to_field)) => {
+            let element_oid = array_element_typoid(to_typoid);
+            let element_typmod = to_typmod;
+
+            is_coercible(from_field, to_field, element_oid, element_typmod)
+        }
+        (DataType::Map(from_entries_field, _), DataType::Map(to_entries_field, _)) => {
+            // entries field cannot be null
+            if from_entries_field.is_nullable() {
+                return false;
+            }
+
+            let (entries_typoid, entries_typmod) = domain_array_base_elem_type(to_typoid);
+
+            is_coercible(
+                from_entries_field,
+                to_entries_field,
+                entries_typoid,
+                entries_typmod,
+            )
+        }
+        _ => {
+            // check if arrow-cast can cast the types
+            if !can_cast_types(from_field.data_type(), to_field.data_type()) {
+                return false;
+            }
+
+            let (from_typoid, _) = pg_type_for_arrow_primitive_field(from_field);
+
+            // pg_parquet could not recognize that arrow type
+            if from_typoid == InvalidOid {
+                return false;
+            }
+
+            // check if coercion is meaningful at Postgres (it has a coercion path)
+            can_pg_coerce_types(from_typoid, to_typoid, COERCION_EXPLICIT)
+        }
+    }
+}
+
+// pg_type_for_arrow_primitive_field returns Postgres type for given
+// primitive arrow field. It returns InvalidOid if the arrow field's type is not recognized.
+pub(crate) fn pg_type_for_arrow_primitive_field(field: &FieldRef) -> (Oid, i32) {
+    match field.data_type() {
+        DataType::Float32 | DataType::Float16 => (FLOAT4OID, -1),
+        DataType::Float64 => (FLOAT8OID, -1),
+        DataType::Int16 | DataType::UInt16 | DataType::Int8 | DataType::UInt8 => (INT2OID, -1),
+        DataType::UInt32 => (OIDOID, -1),
+        DataType::Int32 => (INT4OID, -1),
+        DataType::Int64 | DataType::UInt64 => (INT8OID, -1),
+        DataType::Decimal128(precision, scale) => (
+            NUMERICOID,
+            make_numeric_typmod(*precision as _, *scale as _),
+        ),
+        DataType::Boolean => (BOOLOID, -1),
+        DataType::Date32 => (DATEOID, -1),
+        DataType::Time64(_) => (TIMEOID, -1),
+        DataType::Timestamp(_, None) => (TIMESTAMPOID, -1),
+        DataType::Timestamp(_, Some(_)) => (TIMESTAMPTZOID, -1),
+        DataType::Utf8 | DataType::LargeUtf8 if field.extension_type_name().is_none() => {
+            (TEXTOID, -1)
+        }
+        DataType::Utf8 | DataType::LargeUtf8
+            if field
+                .try_extension_type::<arrow_schema::extension::Json>()
+                .is_ok() =>
+        {
+            (JSONOID, -1)
+        }
+        DataType::Binary | DataType::LargeBinary => (BYTEAOID, -1),
+        DataType::FixedSizeBinary(16)
+            if field
+                .try_extension_type::<arrow_schema::extension::Uuid>()
+                .is_ok() =>
+        {
+            (UUIDOID, -1)
+        }
+        _ => (InvalidOid, -1),
+    }
+}
+
+fn can_pg_coerce_types(from_typoid: Oid, to_typoid: Oid, ccontext: CoercionContext::Type) -> bool {
+    let n_args = 1;
+    let input_typeids = [from_typoid];
+    let target_typeids = [to_typoid];
+
+    unsafe {
+        can_coerce_type(
+            n_args,
+            input_typeids.as_ptr(),
+            target_typeids.as_ptr(),
+            ccontext,
+        )
+    }
+}
+
+pub(crate) fn error_if_copy_from_match_by_position_with_generated_columns(
+    tupledesc: &PgTupleDesc,
+    match_by: MatchBy,
+) {
+    // match_by 'name' can handle generated columns
+    if let MatchBy::Name = match_by {
+        return;
+    }
+
+    let attributes = collect_attributes_for(CollectAttributesFor::Other, tupledesc);
+
+    for attribute in attributes {
+        if is_generated_attribute(&attribute) {
+            ereport!(
+                PgLogLevel::ERROR,
+                PgSqlErrorCode::ERRCODE_FEATURE_NOT_SUPPORTED,
+                "COPY FROM parquet with generated columns is not supported",
+                "Try COPY FROM parquet WITH (match_by 'name'). \
+                 It works only if the column names match with parquet file's.",
+            );
+        }
+    }
+}
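The net effect for users: COPY FROM accepts a file whose physical types differ from the table's, as long as both arrow-cast and Postgres agree on a cast. A hypothetical session (the file layout is assumed for illustration):

```sql
-- suppose /tmp/events.parquet stores "id" as int32 and "note" as string
CREATE TABLE events (id bigint, note text);

-- Int32 -> Int64 is castable in arrow-cast and int4 -> int8 has an
-- explicit coercion path in Postgres, so this load succeeds
COPY events FROM '/tmp/events.parquet';

-- a table declaring "id date" would instead fail with
-- 'type mismatch for column "id" between table and parquet file'
```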
\" + It works only if the column names match with parquet file's.", + ); + } + } +} diff --git a/src/arrow_parquet/schema/infer_schema.rs b/src/arrow_parquet/schema/infer_schema.rs new file mode 100644 index 0000000..23d0aa6 --- /dev/null +++ b/src/arrow_parquet/schema/infer_schema.rs @@ -0,0 +1,182 @@ +use std::hash::{Hash, Hasher}; + +use arrow_schema::{DataType, FieldRef, Fields}; +use pgrx::{ + pg_sys::{ + makeColumnDef, makeTypeName, typenameTypeIdAndMod, AsPgCStr, ColumnDef, + CommandCounterIncrement, InvalidOid, Oid, + }, + PgList, Spi, +}; + +use crate::{ + arrow_parquet::uri_utils::{parquet_reader_from_uri, ParsedUriInfo}, + pgrx_utils::{array_typoid, get_type_name, type_info_from_name}, +}; + +use super::coerce_schema::pg_type_for_arrow_primitive_field; + +pub(crate) fn infer_columns_from_uri(uri_info: &ParsedUriInfo) -> PgList { + let parquet_reader = parquet_reader_from_uri(uri_info).unwrap_or_else(|e| { + panic!( + "failed to create parquet reader for uri {}: {}", + uri_info.uri, e + ) + }); + + let parquet_schema = parquet_reader.schema(); + + let mut column_defs = PgList::new(); + + for field in parquet_schema.fields() { + let (typoid, typmod) = get_or_create_pg_type_from_arrow_type(field); + + let field_name = field.name(); + + let column_def = + unsafe { makeColumnDef(field_name.as_pg_cstr(), typoid, typmod, InvalidOid) }; + + column_defs.push(column_def); + } + + column_defs +} + +// get_or_create_pg_type_from_arrow_type returns the Postgres type for the given arrow field. +// If the arrow field is a struct or a map, it creates a new Postgres type in case it does not exist. +fn get_or_create_pg_type_from_arrow_type(field: &FieldRef) -> (Oid, i32) { + match field.data_type() { + DataType::Struct(fields) => { + let struct_attribute_oids = get_or_create_struct_field_types(fields); + + let struct_attribute_names = + fields.iter().map(|f| f.name().as_str()).collect::>(); + + let struct_type_name = + get_parquet_struct_typename(&struct_attribute_oids, &struct_attribute_names); + + let struct_schema_name = "parquet_structs"; + + let (typoid, typmod) = type_info_from_name(struct_schema_name, &struct_type_name); + + if typoid != InvalidOid { + return (typoid, typmod); + } + + create_struct_type( + struct_schema_name, + &struct_type_name, + &struct_attribute_oids, + &struct_attribute_names, + ) + } + DataType::List(element_field) => { + let (element_typoid, element_typmod) = + get_or_create_pg_type_from_arrow_type(element_field); + + (array_typoid(element_typoid), element_typmod) + } + DataType::Map(entries_field, _) => match entries_field.data_type() { + DataType::Struct(fields) => { + let key_field = fields.find("key").expect("expected key field").1; + let (key_typoid, _) = get_or_create_pg_type_from_arrow_type(key_field); + let key_typename = get_type_name(key_typoid); + + let value_field = fields.find("val").expect("expected val field").1; + let (value_typoid, _) = get_or_create_pg_type_from_arrow_type(value_field); + let value_typename = get_type_name(value_typoid); + + let create_map_command = format!( + "SELECT crunchy_map.create('{}', '{}');", + key_typename, value_typename + ); + + let map_typename = Spi::get_one::<&str>(create_map_command.as_str()) + .unwrap_or_else(|e| { + panic!("failed to create map type: {}", e); + }) + .unwrap_or_else(|| { + panic!("failed to create map type"); + }); + + let mut typoid = InvalidOid; + let mut typmod = -1; + + let typename = unsafe { makeTypeName(map_typename.as_pg_cstr()) }; + + unsafe { + typenameTypeIdAndMod(std::ptr::null_mut(), 
+
+// get_or_create_struct_field_types returns a vector of Postgres types for the fields of a struct.
+// If the field is a struct or a map, it creates a new Postgres type in case it does not exist.
+fn get_or_create_struct_field_types(fields: &Fields) -> Vec<Oid> {
+    let mut field_oids = vec![];
+
+    for field in fields {
+        let (typoid, _) = get_or_create_pg_type_from_arrow_type(field);
+
+        field_oids.push(typoid);
+    }
+
+    field_oids
+}
+
+// get_parquet_struct_typename returns the name of the Postgres type for the given attribute oids and names.
+fn get_parquet_struct_typename(attribute_oids: &[Oid], attribute_names: &[&str]) -> String {
+    let mut hasher = std::collections::hash_map::DefaultHasher::new();
+
+    for (oid, name) in attribute_oids.iter().zip(attribute_names.iter()) {
+        oid.hash(&mut hasher);
+        name.hash(&mut hasher);
+    }
+
+    format!("struct_{}", hasher.finish())
+}
+
+// create_struct_type creates a new Postgres type for the given attribute oids and names
+// by executing a CREATE TYPE command.
+fn create_struct_type(
+    schema_name: &str,
+    type_name: &str,
+    attribute_oids: &[Oid],
+    attribute_names: &[&str],
+) -> (Oid, i32) {
+    let mut create_type_command = String::new();
+
+    create_type_command
+        .push_str(format!("CREATE TYPE {}.{} AS (", schema_name, type_name).as_str());
+
+    for (att_idx, (oid, name)) in attribute_oids
+        .iter()
+        .zip(attribute_names.iter())
+        .enumerate()
+    {
+        let field_typename = get_type_name(*oid);
+
+        create_type_command.push_str(format!("{} {}", name, field_typename).as_str());
+
+        if att_idx < attribute_oids.len() - 1 {
+            create_type_command.push_str(", ");
+        }
+    }
+
+    create_type_command.push_str(");");
+
+    Spi::run(&create_type_command).unwrap_or_else(|e| {
+        panic!("failed to create struct type: {}", e);
+    });
+
+    // increment the command counter to make the type visible
+    unsafe { CommandCounterIncrement() };
+
+    type_info_from_name(schema_name, type_name)
+}
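Because the generated type name is a pure function of the struct's field names and type oids, re-importing a file with the same struct shape reuses the existing type instead of creating duplicates. A standalone sketch of the derivation (plain `u32` stands in for `pg_sys::Oid` here, and the oids are toy values, so the exact hash may differ from the extension's):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// mirrors get_parquet_struct_typename: hash the (type oid, field name)
// pairs in order, so the same struct shape always maps to the same name
fn struct_typename(attribute_oids: &[u32], attribute_names: &[&str]) -> String {
    let mut hasher = DefaultHasher::new();

    for (oid, name) in attribute_oids.iter().zip(attribute_names.iter()) {
        oid.hash(&mut hasher);
        name.hash(&mut hasher);
    }

    format!("struct_{}", hasher.finish())
}

fn main() {
    // hypothetical composite (id int4, name text); 23 and 25 are the
    // built-in oids of int4 and text
    println!("parquet_structs.{}", struct_typename(&[23, 25], &["id", "name"]));
}
```

One caveat of this design: `DefaultHasher`'s algorithm is not guaranteed stable across Rust releases, so the generated names are only stable within a given build of the extension.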
diff --git a/src/arrow_parquet/schema_parser.rs b/src/arrow_parquet/schema/parse_schema.rs
similarity index 63%
rename from src/arrow_parquet/schema_parser.rs
rename to src/arrow_parquet/schema/parse_schema.rs
index 8a2dc91..f3b0377 100644
--- a/src/arrow_parquet/schema_parser.rs
+++ b/src/arrow_parquet/schema/parse_schema.rs
@@ -5,22 +5,20 @@ use std::{
 };
 
 use arrow::datatypes::{Field, Fields, Schema};
-use arrow_cast::can_cast_types;
-use arrow_schema::{DataType, FieldRef};
+use arrow_schema::FieldRef;
 use parquet::arrow::{ArrowSchemaConverter, PARQUET_FIELD_ID_META_KEY};
 use pg_sys::{
-    can_coerce_type,
-    CoercionContext::{self, COERCION_EXPLICIT},
-    FormData_pg_attribute, InvalidOid, Oid, BOOLOID, BYTEAOID, CHAROID, DATEOID, FLOAT4OID,
-    FLOAT8OID, INT2OID, INT4OID, INT8OID, JSONBOID, JSONOID, NUMERICOID, OIDOID, TEXTOID, TIMEOID,
-    TIMESTAMPOID, TIMESTAMPTZOID, TIMETZOID, UUIDOID,
+    FormData_pg_attribute, Oid, BOOLOID, BYTEAOID, CHAROID, DATEOID, FLOAT4OID, FLOAT8OID, INT2OID,
+    INT4OID, INT8OID, JSONBOID, JSONOID, NUMERICOID, OIDOID, TEXTOID, TIMEOID, TIMESTAMPOID,
+    TIMESTAMPTZOID, TIMETZOID, UUIDOID,
 };
 use pgrx::{check_for_interrupts, prelude::*, PgTupleDesc};
 
 use crate::{
+    arrow_parquet::field_ids::FieldIds,
     pgrx_utils::{
         array_element_typoid, collect_attributes_for, domain_array_base_elem_type, is_array_type,
-        is_composite_type, is_generated_attribute, tuple_desc, CollectAttributesFor,
+        is_composite_type, tuple_desc, CollectAttributesFor,
     },
     type_compat::{
         geometry::is_postgis_geometry_type,
@@ -31,8 +29,6 @@ use crate::{
     },
 };
 
-use super::{field_ids::FieldIds, match_by::MatchBy};
-
 pub(crate) fn parquet_schema_string_from_attributes(
     attributes: &[FormData_pg_attribute],
     field_ids: FieldIds,
@@ -199,7 +195,7 @@ fn parse_struct_schema(
 
     let struct_field_id = field_id_mapping_context.next_root_field_id();
 
-    let mut child_fields: Vec<Arc<Field>> = vec![];
+    let mut child_fields = vec![];
 
     let attributes = collect_attributes_for(CollectAttributesFor::Other, &tupledesc);
 
@@ -424,6 +420,9 @@ fn parse_primitive_schema(
     field_id_mapping_context.field_with_id(field, primitive_field_id)
 }
 
+// adjust_map_entries_field adjusts the data type of the map entries field.
+// The key field is not nullable, while the value field is nullable.
+// The map entries field is not nullable.
 fn adjust_map_entries_field(field: FieldRef) -> FieldRef {
     let not_nullable_key_field;
     let nullable_value_field;
@@ -474,242 +473,3 @@ fn adjust_map_entries_field(field: FieldRef) -> FieldRef {
 
     Arc::new(entries_field)
 }
-
-pub(crate) fn error_if_copy_from_match_by_position_with_generated_columns(
-    tupledesc: &PgTupleDesc,
-    match_by: MatchBy,
-) {
-    // match_by 'name' can handle generated columns
-    if let MatchBy::Name = match_by {
-        return;
-    }
-
-    let attributes = collect_attributes_for(CollectAttributesFor::Other, tupledesc);
-
-    for attribute in attributes {
-        if is_generated_attribute(&attribute) {
-            ereport!(
-                PgLogLevel::ERROR,
-                PgSqlErrorCode::ERRCODE_FEATURE_NOT_SUPPORTED,
-                "COPY FROM parquet with generated columns is not supported",
-                "Try COPY FROM parquet WITH (match_by 'name'). \
-                 It works only if the column names match with parquet file's.",
-            );
-        }
-    }
-}
-
-// ensure_file_schema_match_tupledesc_schema throws an error if the file's schema does not match the table schema.
-// If the file's arrow schema is castable to the table's arrow schema, it returns a vector of Option<DataType>
-// to cast to for each field.
-pub(crate) fn ensure_file_schema_match_tupledesc_schema(
-    file_schema: Arc<Schema>,
-    tupledesc_schema: Arc<Schema>,
-    attributes: &[FormData_pg_attribute],
-    match_by: MatchBy,
-) -> Vec<Option<DataType>> {
-    let mut cast_to_types = Vec::new();
-
-    if match_by == MatchBy::Position
-        && tupledesc_schema.fields().len() != file_schema.fields().len()
-    {
-        panic!(
-            "column count mismatch between table and parquet file. \
-             parquet file has {} columns, but table has {} columns",
-            file_schema.fields().len(),
-            tupledesc_schema.fields().len()
-        );
-    }
-
-    for (tupledesc_schema_field, attribute) in
-        tupledesc_schema.fields().iter().zip(attributes.iter())
-    {
-        let field_name = tupledesc_schema_field.name();
-
-        let file_schema_field = match match_by {
-            MatchBy::Position => file_schema.field(attribute.attnum as usize - 1),
-
-            MatchBy::Name => {
-                let file_schema_field = file_schema.column_with_name(field_name);
-
-                if file_schema_field.is_none() {
-                    panic!("column \"{field_name}\" is not found in parquet file");
-                }
-
-                let (_, file_schema_field) = file_schema_field.unwrap();
-
-                file_schema_field
-            }
-        };
-
-        let file_schema_field = Arc::new(file_schema_field.clone());
-
-        let from_type = file_schema_field.data_type();
-        let to_type = tupledesc_schema_field.data_type();
-
-        // no cast needed
-        if from_type == to_type {
-            cast_to_types.push(None);
-            continue;
-        }
-
-        if !is_coercible(
-            &file_schema_field,
-            tupledesc_schema_field,
-            attribute.atttypid,
-            attribute.atttypmod,
-        ) {
-            panic!(
-                "type mismatch for column \"{field_name}\" between table and parquet file.\n\n\
-                 table has \"{to_type}\"\n\nparquet file has \"{from_type}\""
-            );
-        }
-
-        pgrx::debug2!(
-            "column \"{}\" is being cast from \"{}\" to \"{}\"",
-            field_name,
-            from_type,
-            to_type
-        );
-
-        cast_to_types.push(Some(to_type.clone()));
-    }
-
-    cast_to_types
-}
-
-// is_coercible first checks if "from_type" can be cast to "to_type" by arrow-cast.
-// Then, it checks if the cast is meaningful at Postgres by seeing if there is
-// an explicit coercion from "from_typoid" to "to_typoid".
-//
-// Additionally, we need to be careful about struct rules for the cast:
-// Arrow supports casting struct fields by field position instead of field name,
-// which is not the intended behavior for pg_parquet. Hence, we make sure the field names
-// match for structs.
-fn is_coercible(
-    from_field: &FieldRef,
-    to_field: &FieldRef,
-    to_typoid: Oid,
-    to_typmod: i32,
-) -> bool {
-    match (from_field.data_type(), to_field.data_type()) {
-        (DataType::Struct(from_fields), DataType::Struct(to_fields)) => {
-            if from_fields.len() != to_fields.len() {
-                return false;
-            }
-
-            let tupledesc = tuple_desc(to_typoid, to_typmod);
-
-            let attributes = collect_attributes_for(CollectAttributesFor::Other, &tupledesc);
-
-            for (from_field, (to_field, to_attribute)) in from_fields
-                .iter()
-                .zip(to_fields.iter().zip(attributes.iter()))
-            {
-                if from_field.name() != to_field.name() {
-                    return false;
-                }
-
-                if !is_coercible(
-                    from_field,
-                    to_field,
-                    to_attribute.type_oid().value(),
-                    to_attribute.type_mod(),
-                ) {
-                    return false;
-                }
-            }
-
-            true
-        }
-        (DataType::List(from_field), DataType::List(to_field))
-        | (DataType::FixedSizeList(from_field, _), DataType::List(to_field))
-        | (DataType::LargeList(from_field), DataType::List(to_field)) => {
-            let element_oid = array_element_typoid(to_typoid);
-            let element_typmod = to_typmod;
-
-            is_coercible(from_field, to_field, element_oid, element_typmod)
-        }
-        (DataType::Map(from_entries_field, _), DataType::Map(to_entries_field, _)) => {
-            // entries field cannot be null
-            if from_entries_field.is_nullable() {
-                return false;
-            }
-
-            let (entries_typoid, entries_typmod) = domain_array_base_elem_type(to_typoid);
-
-            is_coercible(
-                from_entries_field,
-                to_entries_field,
-                entries_typoid,
-                entries_typmod,
-            )
-        }
-        _ => {
-            // check if arrow-cast can cast the types
-            if !can_cast_types(from_field.data_type(), to_field.data_type()) {
-                return false;
-            }
-
-            let from_typoid = pg_type_for_arrow_primitive_field(from_field);
-
-            // pg_parquet could not recognize that arrow type
-            if from_typoid == InvalidOid {
-                return false;
-            }
-
-            // check if coercion is meaningful at Postgres (it has a coercion path)
-            can_pg_coerce_types(from_typoid, to_typoid, COERCION_EXPLICIT)
-        }
-    }
-}
-
-fn can_pg_coerce_types(from_typoid: Oid, to_typoid: Oid, ccontext: CoercionContext::Type) -> bool {
-    let n_args = 1;
-    let input_typeids = [from_typoid];
-    let target_typeids = [to_typoid];
-
-    unsafe {
-        can_coerce_type(
-            n_args,
-            input_typeids.as_ptr(),
-            target_typeids.as_ptr(),
-            ccontext,
-        )
-    }
-}
-
-// pg_type_for_arrow_primitive_field returns Postgres type for given
-// primitive arrow field. It returns InvalidOid if the arrow field's type is not recognized.
-fn pg_type_for_arrow_primitive_field(field: &FieldRef) -> Oid {
-    match field.data_type() {
-        DataType::Float32 | DataType::Float16 => FLOAT4OID,
-        DataType::Float64 => FLOAT8OID,
-        DataType::Int16 | DataType::UInt16 | DataType::Int8 | DataType::UInt8 => INT2OID,
-        DataType::Int32 | DataType::UInt32 => INT4OID,
-        DataType::Int64 | DataType::UInt64 => INT8OID,
-        DataType::Decimal128(_, _) => NUMERICOID,
-        DataType::Boolean => BOOLOID,
-        DataType::Date32 => DATEOID,
-        DataType::Time64(_) => TIMEOID,
-        DataType::Timestamp(_, None) => TIMESTAMPOID,
-        DataType::Timestamp(_, Some(_)) => TIMESTAMPTZOID,
-        DataType::Utf8 | DataType::LargeUtf8 if field.extension_type_name().is_none() => TEXTOID,
-        DataType::Utf8 | DataType::LargeUtf8
-            if field
-                .try_extension_type::<arrow_schema::extension::Json>()
-                .is_ok() =>
-        {
-            JSONOID
-        }
-        DataType::Binary | DataType::LargeBinary => BYTEAOID,
-        DataType::FixedSizeBinary(16)
-            if field
-                .try_extension_type::<arrow_schema::extension::Uuid>()
-                .is_ok() =>
-        {
-            UUIDOID
-        }
-        _ => InvalidOid,
-    }
-}
diff --git a/src/parquet_copy_hook.rs b/src/parquet_copy_hook.rs
index 6a58ca8..0bbc1af 100644
--- a/src/parquet_copy_hook.rs
+++ b/src/parquet_copy_hook.rs
@@ -7,5 +7,6 @@ pub(crate) mod copy_to_program;
 pub(crate) mod copy_to_split_dest_receiver;
 pub(crate) mod copy_to_stdout;
 pub(crate) mod copy_utils;
+pub(crate) mod create_table;
 pub(crate) mod hook;
 pub(crate) mod pg_compat;
diff --git a/src/parquet_copy_hook/copy_utils.rs b/src/parquet_copy_hook/copy_utils.rs
index 5cc7163..fe0d30d 100644
--- a/src/parquet_copy_hook/copy_utils.rs
+++ b/src/parquet_copy_hook/copy_utils.rs
@@ -402,15 +402,35 @@ pub(crate) fn copy_from_stmt_match_by(p_stmt: &PgBox<PlannedStmt>) -> MatchBy {
     }
 }
 
-pub(crate) fn copy_stmt_get_option(
-    p_stmt: &PgBox<PlannedStmt>,
-    option_name: &str,
-) -> PgBox<DefElem> {
+fn copy_stmt_get_option(p_stmt: &PgBox<PlannedStmt>, option_name: &str) -> PgBox<DefElem> {
     let copy_stmt = unsafe { PgBox::<CopyStmt>::from_pg(p_stmt.utilityStmt as _) };
 
+    get_option(copy_stmt.options, option_name)
+}
-
-    let copy_options = unsafe { PgList::<DefElem>::from_pg(copy_stmt.options) };
+
+pub(crate) fn has_option(options: *mut List, option_name: &str) -> bool {
+    let options = unsafe { PgList::<DefElem>::from_pg(options) };
+
+    for option in options.iter_ptr() {
+        let option = unsafe { PgBox::<DefElem>::from_pg(option) };
+
+        let current_option_name = unsafe {
+            CStr::from_ptr(option.defname)
+                .to_str()
+                .expect("option name is not a valid CString")
+        };
+
+        if current_option_name == option_name {
+            return true;
+        }
+    }
+
+    false
+}
+
+pub(crate) fn get_option(options: *mut List, option_name: &str) -> PgBox<DefElem> {
+    let options = unsafe { PgList::<DefElem>::from_pg(options) };
 
-    for current_option in copy_options.iter_ptr() {
+    for current_option in options.iter_ptr() {
         let current_option = unsafe { PgBox::<DefElem>::from_pg(current_option) };
 
         let current_option_name = unsafe {
@@ -455,7 +475,7 @@ fn is_copy_parquet_stmt(p_stmt: &PgBox<PlannedStmt>, copy_from: bool) -> bool {
     let uri_info = uri_info.unwrap();
 
     // only parquet format is supported
-    if !is_parquet_format_option(p_stmt) && !is_parquet_uri(uri_info.uri.clone()) {
+    if !has_parquet_format_option(p_stmt) && !is_parquet_uri(uri_info.uri.clone()) {
         return false;
     }
 
@@ -492,7 +512,7 @@ pub(crate) fn is_copy_from_parquet_stmt(p_stmt: &PgBox<PlannedStmt>) -> bool {
     is_copy_parquet_stmt(p_stmt, copy_from)
 }
 
-fn is_parquet_format_option(p_stmt: &PgBox<PlannedStmt>) -> bool {
+fn has_parquet_format_option(p_stmt: &PgBox<PlannedStmt>) -> bool {
     let format_option = copy_stmt_get_option(p_stmt, "format");
 
     if format_option.is_null() {
diff --git a/src/parquet_copy_hook/create_table.rs b/src/parquet_copy_hook/create_table.rs
new file mode 100644
index 0000000..d2b7bfc
--- /dev/null
+++ b/src/parquet_copy_hook/create_table.rs
@@ -0,0 +1,166 @@
+use std::ffi::CStr;
+
+use pgrx::{
+    is_a,
+    pg_sys::{
+        defGetString, makeDefElem, makeString, AsPgCStr, ColumnDef, CopyStmt, CreateStmt, DefElem,
+        NodeTag::{T_CopyStmt, T_CreateStmt},
+        PlannedStmt, RangeVar,
+    },
+    PgBox, PgList,
+};
+
+use crate::arrow_parquet::{
+    schema::infer_schema::infer_columns_from_uri,
+    uri_utils::{ensure_access_privilege_to_uri, uri_as_string, ParsedUriInfo},
+};
+
+use super::copy_utils::{get_option, has_option};
+
+pub(crate) fn is_create_table_from_parquet_stmt(p_stmt: &PgBox<PlannedStmt>) -> bool {
+    let is_create_stmt = unsafe { is_a(p_stmt.utilityStmt, T_CreateStmt) };
+
+    if !is_create_stmt {
+        return false;
+    }
+
+    let create_stmt = unsafe { PgBox::<CreateStmt>::from_pg(p_stmt.utilityStmt as _) };
+
+    has_option(create_stmt.options, "load_from")
+        || has_option(create_stmt.options, "definition_from")
+}
+
+pub(crate) fn validate_create_table_from_parquet_stmt(create_stmt: &PgBox<CreateStmt>) {
+    let definition_from = has_option(create_stmt.options, "definition_from");
+    let load_from = has_option(create_stmt.options, "load_from");
+
+    if load_from && definition_from {
+        panic!("cannot specify both 'load_from' and 'definition_from' options");
+    }
+
+    if !create_stmt.partbound.is_null() {
+        panic!("cannot create a partition table from a parquet file");
+    }
+
+    if !create_stmt.tableElts.is_null() {
+        panic!("cannot create a table from a parquet file when column definitions are provided");
+    }
+}
+
+pub(crate) fn infer_column_definitions(create_stmt: &PgBox<CreateStmt>) -> PgList<ColumnDef> {
+    let definition_from = has_option(create_stmt.options, "definition_from");
+    let load_from = has_option(create_stmt.options, "load_from");
+
+    if load_from && definition_from {
+        panic!("cannot specify both 'load_from' and 'definition_from' options");
+    }
+
+    if !create_stmt.partbound.is_null() {
+        panic!("cannot infer column definitions for a partition table");
+    }
+
+    if !create_stmt.tableElts.is_null() {
+        panic!("cannot infer column definitions when column definitions are provided");
+    }
+
+    let uri_info = create_stmt_get_uri(create_stmt);
+
+    let copy_from = true;
+    ensure_access_privilege_to_uri(&uri_info, copy_from);
+
+    infer_columns_from_uri(&uri_info)
+}
+
+pub(crate) fn create_stmt_get_uri(create_stmt: &PgBox<CreateStmt>) -> ParsedUriInfo {
+    let load_from_uri = create_stmt_get_load_from_uri(create_stmt);
+    let definition_from_uri = create_stmt_get_definition_from_uri(create_stmt);
+
+    if let Some(uri_info) = load_from_uri {
+        uri_info
+    } else if let Some(uri_info) = definition_from_uri {
+        uri_info
+    } else {
+        panic!("either 'load_from' or 'definition_from' option must be specified");
+    }
+}
+
+fn create_stmt_get_load_from_uri(create_stmt: &PgBox<CreateStmt>) -> Option<ParsedUriInfo> {
+    let load_from_option = get_option(create_stmt.options, "load_from");
+
+    if load_from_option.is_null() {
+        return None;
+    }
+
+    let uri = unsafe { defGetString(load_from_option.as_ptr()) };
+
+    let uri = unsafe {
+        CStr::from_ptr(uri)
+            .to_str()
+            .expect("load_from option is not a valid CString")
+    };
+
+    Some(ParsedUriInfo::try_from(uri).expect("invalid uri"))
+}
+
+fn create_stmt_get_definition_from_uri(create_stmt: &PgBox<CreateStmt>) -> Option<ParsedUriInfo> {
+    let definition_from_option = get_option(create_stmt.options, "definition_from");
+
+    if definition_from_option.is_null() {
+        return None;
+    }
+
+    let uri = unsafe { defGetString(definition_from_option.as_ptr()) };
+
+    let uri = unsafe {
+        CStr::from_ptr(uri)
+            .to_str()
+            .expect("definition_from option is not a valid CString")
+    };
+
+    Some(ParsedUriInfo::try_from(uri).expect("invalid uri"))
+}
CString") + }; + + Some(ParsedUriInfo::try_from(uri).expect("invalid uri")) +} + +pub(crate) fn create_stmt_remove_parquet_options(create_stmt: &mut PgBox) { + let options = unsafe { PgList::::from_pg(create_stmt.options) }; + + let mut new_options = PgList::::new(); + + for option in options.iter_ptr() { + let option = unsafe { PgBox::::from_pg(option) }; + + let option_name = unsafe { + CStr::from_ptr(option.defname) + .to_str() + .expect("option name is not a valid CString") + }; + + if option_name == "load_from" || option_name == "definition_from" { + continue; + } + + new_options.push(option.as_ptr()); + } + + create_stmt.options = new_options.into_pg(); +} + +pub(crate) fn create_copy_from_parquet_stmt_for_table( + table: *mut RangeVar, + uri_info: &ParsedUriInfo, +) -> PgBox { + let mut copy_from_stmt = unsafe { PgBox::::alloc_node(T_CopyStmt) }; + copy_from_stmt.relation = table; + copy_from_stmt.is_from = true; + copy_from_stmt.filename = uri_as_string(&uri_info.uri).as_pg_cstr(); + + let format_option_name = "format".as_pg_cstr(); + let format_option_val = unsafe { makeString("parquet".as_pg_cstr()) } as _; + let format_option = unsafe { makeDefElem(format_option_name, format_option_val, -1) }; + + let mut new_copy_options = PgList::::new(); + new_copy_options.push(format_option); + + copy_from_stmt.options = new_copy_options.into_pg(); + + copy_from_stmt.into_pg_boxed() +} diff --git a/src/parquet_copy_hook/hook.rs b/src/parquet_copy_hook/hook.rs index c3851d9..7ccba59 100644 --- a/src/parquet_copy_hook/hook.rs +++ b/src/parquet_copy_hook/hook.rs @@ -1,8 +1,9 @@ use std::ffi::{c_char, CStr}; use pg_sys::{ - standard_ProcessUtility, AsPgCStr, CommandTag, DestReceiver, ParamListInfoData, PlannedStmt, - ProcessUtility_hook, ProcessUtility_hook_type, QueryCompletion, QueryEnvironment, + nodeToString, standard_ProcessUtility, AsPgCStr, CommandTag, CreateStmt, DestReceiver, + ParamListInfoData, PlannedStmt, ProcessUtility_hook, ProcessUtility_hook_type, QueryCompletion, + QueryEnvironment, }; use pgrx::{prelude::*, GucSetting}; @@ -25,7 +26,13 @@ use super::{ copy_to_split_dest_receiver::free_copy_to_parquet_split_dest_receiver, copy_utils::{ copy_to_stmt_compression, copy_to_stmt_field_ids, copy_to_stmt_file_size_bytes, - copy_to_stmt_parquet_version, validate_copy_from_options, validate_copy_to_options, + copy_to_stmt_parquet_version, has_option, validate_copy_from_options, + validate_copy_to_options, + }, + create_table::{ + create_copy_from_parquet_stmt_for_table, create_stmt_get_uri, + create_stmt_remove_parquet_options, infer_column_definitions, + is_create_table_from_parquet_stmt, validate_create_table_from_parquet_stmt, }, }; @@ -130,6 +137,60 @@ fn process_copy_from_parquet( .execute() } +#[allow(clippy::too_many_arguments)] +fn process_create_table_from_parquet( + p_stmt: &mut PgBox, + query_string: &CStr, + read_only_tree: bool, + context: u32, + params: *mut ParamListInfoData, + query_env: *mut QueryEnvironment, + dest: *mut DestReceiver, + completion_tag: *mut QueryCompletion, +) { + let mut create_stmt = unsafe { PgBox::::from_pg(p_stmt.utilityStmt as _) }; + + validate_create_table_from_parquet_stmt(&create_stmt); + + let uri_info = create_stmt_get_uri(&create_stmt); + + let load_from = has_option(create_stmt.options, "load_from"); + + let column_defs = infer_column_definitions(&create_stmt); + create_stmt.tableElts = column_defs.into_pg(); + + // remove pg_parquet specific options to make PG happy + create_stmt_remove_parquet_options(&mut create_stmt); + + unsafe { + 
diff --git a/src/parquet_copy_hook/hook.rs b/src/parquet_copy_hook/hook.rs
index c3851d9..7ccba59 100644
--- a/src/parquet_copy_hook/hook.rs
+++ b/src/parquet_copy_hook/hook.rs
@@ -1,8 +1,9 @@
 use std::ffi::{c_char, CStr};
 
 use pg_sys::{
-    standard_ProcessUtility, AsPgCStr, CommandTag, DestReceiver, ParamListInfoData, PlannedStmt,
-    ProcessUtility_hook, ProcessUtility_hook_type, QueryCompletion, QueryEnvironment,
+    nodeToString, standard_ProcessUtility, AsPgCStr, CommandTag, CreateStmt, DestReceiver,
+    ParamListInfoData, PlannedStmt, ProcessUtility_hook, ProcessUtility_hook_type, QueryCompletion,
+    QueryEnvironment,
 };
 use pgrx::{prelude::*, GucSetting};
 
@@ -25,7 +26,13 @@ use super::{
     copy_to_split_dest_receiver::free_copy_to_parquet_split_dest_receiver,
     copy_utils::{
         copy_to_stmt_compression, copy_to_stmt_field_ids, copy_to_stmt_file_size_bytes,
-        copy_to_stmt_parquet_version, validate_copy_from_options, validate_copy_to_options,
+        copy_to_stmt_parquet_version, has_option, validate_copy_from_options,
+        validate_copy_to_options,
+    },
+    create_table::{
+        create_copy_from_parquet_stmt_for_table, create_stmt_get_uri,
+        create_stmt_remove_parquet_options, infer_column_definitions,
+        is_create_table_from_parquet_stmt, validate_create_table_from_parquet_stmt,
     },
 };
 
@@ -130,6 +137,60 @@ fn process_copy_from_parquet(
         .execute()
 }
 
+#[allow(clippy::too_many_arguments)]
+fn process_create_table_from_parquet(
+    p_stmt: &mut PgBox<PlannedStmt>,
+    query_string: &CStr,
+    read_only_tree: bool,
+    context: u32,
+    params: *mut ParamListInfoData,
+    query_env: *mut QueryEnvironment,
+    dest: *mut DestReceiver,
+    completion_tag: *mut QueryCompletion,
+) {
+    let mut create_stmt = unsafe { PgBox::<CreateStmt>::from_pg(p_stmt.utilityStmt as _) };
+
+    validate_create_table_from_parquet_stmt(&create_stmt);
+
+    let uri_info = create_stmt_get_uri(&create_stmt);
+
+    let load_from = has_option(create_stmt.options, "load_from");
+
+    let column_defs = infer_column_definitions(&create_stmt);
+    create_stmt.tableElts = column_defs.into_pg();
+
+    // remove pg_parquet specific options to make PG happy
+    create_stmt_remove_parquet_options(&mut create_stmt);
+
+    unsafe {
+        ProcessUtility_hook.expect("ProcessUtility_hook is None")(
+            p_stmt.as_ptr(),
+            query_string.as_ptr(),
+            read_only_tree,
+            context,
+            params,
+            query_env,
+            dest,
+            completion_tag,
+        );
+    }
+
+    if load_from {
+        let copy_from_stmt =
+            create_copy_from_parquet_stmt_for_table(create_stmt.relation, &uri_info);
+
+        let mut planned_stmt = p_stmt.clone();
+        planned_stmt.utilityStmt = copy_from_stmt.into_pg() as _;
+
+        let query_string = unsafe { nodeToString(planned_stmt.utilityStmt as _) };
+        let query_string = unsafe { CStr::from_ptr(query_string) };
+
+        process_copy_from_parquet(&planned_stmt, query_string, unsafe {
+            &PgBox::from_pg(query_env)
+        });
+    }
+}
+
 #[pg_guard]
 #[allow(clippy::too_many_arguments)]
 extern "C-unwind" fn parquet_copy_hook(
@@ -142,7 +203,7 @@ extern "C-unwind" fn parquet_copy_hook(
     dest: *mut DestReceiver,
     completion_tag: *mut QueryCompletion,
 ) {
-    let p_stmt = unsafe { PgBox::from_pg(p_stmt) };
+    let mut p_stmt = unsafe { PgBox::from_pg(p_stmt) };
     let query_string = unsafe { CStr::from_ptr(query_string) };
     let params = unsafe { PgBox::from_pg(params) };
     let query_env = unsafe { PgBox::from_pg(query_env) };
@@ -164,6 +225,18 @@ extern "C-unwind" fn parquet_copy_hook(
             completion_tag.commandTag = CommandTag::CMDTAG_COPY;
         }
         return;
+    } else if ENABLE_PARQUET_COPY_HOOK.get() && is_create_table_from_parquet_stmt(&p_stmt) {
+        process_create_table_from_parquet(
+            &mut p_stmt,
+            query_string,
+            read_only_tree,
+            context,
+            params.as_ptr(),
+            query_env.as_ptr(),
+            dest,
+            completion_tag.as_ptr(),
+        );
+        return;
     }
 
     unsafe {
diff --git a/src/pgrx_tests/common.rs b/src/pgrx_tests/common.rs
index a0ed99c..d6a2837 100644
--- a/src/pgrx_tests/common.rs
+++ b/src/pgrx_tests/common.rs
@@ -8,10 +8,10 @@ use crate::type_compat::map::Map;
 use arrow::array::RecordBatch;
 use arrow_schema::SchemaRef;
 use parquet::arrow::ArrowWriter;
-use pgrx::spi;
 use pgrx::{
     datum::{Time, TimeWithTimeZone},
-    FromDatum, IntoDatum, Spi,
+    pg_sys::Oid,
+    spi, FromDatum, IntoDatum, Spi,
 };
 use pgrx::{Json, JsonB};
 
@@ -380,3 +380,77 @@ pub(crate) fn create_crunchy_map_type(key_type: &str, val_type: &str) -> String
     let command = format!("SELECT crunchy_map.create('{key_type}','{val_type}')::text;",);
     Spi::get_one(&command).unwrap().unwrap()
 }
+
+pub(crate) fn ensure_table_attribute_type(
+    table_name: &str,
+    attribute_name: &str,
+    expected_typoid: Oid,
+    expected_typmod: i32,
+) {
+    let query = format!(
+        "select atttypid, atttypmod from pg_attribute
+         where attrelid = (select oid from pg_class where relname = '{}') and
+               attname = '{}'",
+        table_name, attribute_name
+    );
+
+    let (result_typoid, result_typmod) = Spi::get_two::<Oid, i32>(&query).unwrap();
+    assert_eq!(expected_typoid, result_typoid.unwrap());
+    assert_eq!(expected_typmod, result_typmod.unwrap());
+}
+
+pub(crate) fn copy_to_helper(uri: &str) {
+    let create_type = "CREATE TYPE child AS (id int);
+                       CREATE TYPE parent AS (id int, child child, children child[]);";
+    Spi::run(create_type).unwrap();
+
+    let copy_to = format!("COPY (SELECT 11::smallint AS a,
+        array[11::smallint, null] AS a_array,
+        232::int AS b,
+        array[232::int, null] AS b_array,
+        2342::bigint AS c,
+        array[2342::bigint, null] AS c_array,
+        12.34::float4 AS d,
+        array[12.34::float4, null] AS d_array,
+        123.325::float8 AS e,
+        array[123.325::float8, null] AS e_array,
+        123.24535::numeric AS f,
+        array[123.24535::numeric, null] AS f_array,
+        123.24535::numeric(8,5) AS f_with_typmod,
+        array[123.24535::numeric(8,5), null::numeric(8,5)] AS f_with_typmod_array,
+        false::bool AS g,
+        array[false, null] AS g_array,
+        '2022-05-05'::date AS h,
+        array['2022-05-05'::date, null] AS h_array,
+        '2022-05-05 13:00:00'::timestamp AS i,
+        array['2022-05-05 13:00:00'::timestamp, null] AS i_array,
+        '2022-05-05 13:00:00-05'::timestamptz AS j,
+        array['2022-05-05 13:00:00-05'::timestamptz, null] AS j_array,
+        '13:00:00'::time AS k,
+        array['13:00:00'::time, null] AS k_array,
+        '13:00:00-05'::timetz AS l,
+        array['13:00:00-05'::timetz, null] AS l_array,
+        '2 years 3 minutes 3 seconds'::interval AS m,
+        array['2 years 3 minutes 3 seconds'::interval, null] AS m_array,
+        'a'::\"char\" AS n,
+        array['a'::\"char\", null] AS n_array,
+        'hello'::text AS o,
+        array['hello'::text, null] AS o_array,
+        'hello'::bytea AS p,
+        array['hello'::bytea, null] AS p_array,
+        'hello'::varchar AS q,
+        array['hello'::varchar, null] AS q_array,
+        'hello'::bpchar AS r,
+        array['hello'::bpchar, null] AS r_array,
+        'hello'::name AS s,
+        array['hello'::name, null] AS s_array,
+        '{{\"id\": 12, \"name\": \"Doe\"}}'::json AS t,
+        array['{{\"id\": 12, \"name\": \"Doe\"}}'::json, null] AS t_array,
+        '{{\"id\": 12, \"name\": \"Doe\"}}'::jsonb AS u,
+        array['{{\"id\": 12, \"name\": \"Doe\"}}'::jsonb, null] AS u_array,
+        123::oid AS v,
+        array[123::oid, null] AS v_array,
+        row(1, row(10)::child, array[row(10), null]::child[])::parent AS y,
+        array[row(1, row(10)::child, array[row(10), null]::child[])::parent, null] AS y_array) TO '{}'", uri);
+    Spi::run(&copy_to).unwrap();
+}
diff --git a/src/pgrx_tests/create_table_from.rs b/src/pgrx_tests/create_table_from.rs
new file mode 100644
index 0000000..c8d6c81
--- /dev/null
+++ b/src/pgrx_tests/create_table_from.rs
@@ -0,0 +1,469 @@
+#[pgrx::pg_schema]
+mod tests {
+    use std::{str::FromStr, vec};
+
+    use pgrx::{
+        composite_type,
+        datum::{Date, Time, Timestamp, TimestampWithTimeZone},
+        pg_sys::{
+            Oid, BOOLARRAYOID, BOOLOID, BYTEAARRAYOID, BYTEAOID, DATEARRAYOID, DATEOID,
+            FLOAT4ARRAYOID, FLOAT4OID, FLOAT8ARRAYOID, FLOAT8OID, INT2ARRAYOID, INT2OID,
+            INT4ARRAYOID, INT4OID, INT8ARRAYOID, INT8OID, JSONARRAYOID, JSONOID, NUMERICARRAYOID,
+            NUMERICOID, OIDARRAYOID, OIDOID, TEXTARRAYOID, TEXTOID, TIMEARRAYOID, TIMEOID,
+            TIMESTAMPARRAYOID, TIMESTAMPOID, TIMESTAMPTZARRAYOID, TIMESTAMPTZOID,
+        },
+        pg_test, AnyNumeric, Json, Spi,
+    };
+
+    use crate::{
+        pgrx_tests::common::{copy_to_helper, ensure_table_attribute_type, LOCAL_TEST_FILE_PATH},
+        pgrx_utils::array_typoid,
+        type_compat::pg_arrow_type_conversions::{
+            make_numeric_typmod, DEFAULT_UNBOUNDED_NUMERIC_PRECISION,
+            DEFAULT_UNBOUNDED_NUMERIC_SCALE,
+        },
+    };
+
+    #[pg_test]
+    fn test_create_table_definition_from() {
+        copy_to_helper(LOCAL_TEST_FILE_PATH);
+
+        let create_table =
+            format!("CREATE TABLE test_table () WITH (definition_from = '{LOCAL_TEST_FILE_PATH}')");
+        Spi::run(&create_table).unwrap();
+
+        ensure_table_schema();
+    }
+
+    #[pg_test]
+    fn test_create_table_load_from() {
+        copy_to_helper(LOCAL_TEST_FILE_PATH);
+
+        let create_table =
+            format!("CREATE TABLE test_table () WITH (load_from = '{LOCAL_TEST_FILE_PATH}')");
+        Spi::run(&create_table).unwrap();
+
+        ensure_table_schema();
+
+        let query = "SELECT * FROM test_table";
+        let result = Spi::connect(|client| {
+            let mut results = Vec::new();
+            let tup_table = client.select(query, None, &[]).unwrap();
+
+            for row in tup_table {
+                let a = row["a"].value::<i16>().unwrap();
+                let a_array = row["a_array"].value::<Vec<Option<i16>>>().unwrap();
+                let b = row["b"].value::<i32>().unwrap();
+                let b_array = row["b_array"].value::<Vec<Option<i32>>>().unwrap();
+                let c = row["c"].value::<i64>().unwrap();
row["c_array"].value::>>().unwrap(); + let d = row["d"].value::().unwrap(); + let d_array = row["d_array"].value::>>().unwrap(); + let e = row["e"].value::().unwrap(); + let e_array = row["e_array"].value::>>().unwrap(); + let f = row["f"].value::().unwrap(); + let f_array = row["f_array"].value::>>().unwrap(); + let f_with_typmod = row["f_with_typmod"].value::().unwrap(); + let f_with_typmod_array = row["f_with_typmod_array"] + .value::>>() + .unwrap(); + let g = row["g"].value::().unwrap(); + let g_array = row["g_array"].value::>>().unwrap(); + let h = row["h"].value::().unwrap(); + let h_array = row["h_array"].value::>>().unwrap(); + let i = row["i"].value::().unwrap(); + let i_array = row["i_array"].value::>>().unwrap(); + let j = row["j"].value::().unwrap(); + let j_array = row["j_array"] + .value::>>() + .unwrap(); + let k = row["k"].value::