From 9552038331fb2f9b442fba04e9cb85bdcd55e80c Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Sun, 14 Sep 2025 19:44:53 +0800 Subject: [PATCH 1/2] test: datagrip startup queries --- datafusion-postgres/tests/datagrip.rs | 92 +++++++++++++++++++++++++++ 1 file changed, 92 insertions(+) create mode 100644 datafusion-postgres/tests/datagrip.rs diff --git a/datafusion-postgres/tests/datagrip.rs b/datafusion-postgres/tests/datagrip.rs new file mode 100644 index 0000000..de75701 --- /dev/null +++ b/datafusion-postgres/tests/datagrip.rs @@ -0,0 +1,92 @@ +mod common; + +use common::*; +use pgwire::api::query::SimpleQueryHandler; + +const DATAGRIP_QUERIES: &[&str] = &[ + "SET extra_float_digits = 3", + "SET application_name = ''", + "select version()", + "SET application_name = 'DataGrip 2025.2.3'", + "select current_database() as a, current_schemas(false) as b", + "SHOW TRANSACTION ISOLATION LEVEL", + "select round(extract(epoch from pg_postmaster_start_time() at time zone 'UTC')) as startup_time", + "select L.transactionid::varchar::bigint as transaction_id + from pg_catalog.pg_locks L + where L.transactionid is not null + order by pg_catalog.age(L.transactionid) desc + limit 1", + "select case + when pg_catalog.pg_is_in_recovery() + then null + else + (pg_catalog.txid_current() % 4294967296)::varchar::bigint + end as current_txid", + r#"select N.oid::bigint as id, + datname as name, + D.description, + datistemplate as is_template, + datallowconn as allow_connections, + pg_catalog.pg_get_userbyid(N.datdba) as "owner" + from pg_catalog.pg_database N + left join pg_catalog.pg_shdescription D on N.oid = D.objoid + order by case when datname = pg_catalog.current_database() then -1::bigint else N.oid::bigint end"#, + r#"select N.oid::bigint as id, + N.xmin as state_number, + nspname as name, + D.description, + pg_catalog.pg_get_userbyid(N.nspowner) as "owner" + from pg_catalog.pg_namespace N + left join pg_catalog.pg_description D on N.oid = D.objoid + order by case when nspname = pg_catalog.current_schema() then -1::bigint else N.oid::bigint end"#, + r#"SELECT typinput='pg_catalog.array_in'::regproc as is_array, typtype, typname, pg_type.oid FROM pg_catalog.pg_type LEFT JOIN (select ns.oid as nspoid, ns.nspname, r.r from pg_namespace as ns join ( select s.r, (current_schemas(false))[s.r] as nspname from generate_series(1, array_upper(current_schemas(false), 1)) as s(r) ) as r using ( nspname ) ) as sp ON sp.nspoid = typnamespace WHERE pg_type.oid = '28' ORDER BY sp.r, pg_type.oid DESC"#, + r#"show DateStyle"#, + r#"select name, is_dst from pg_catalog.pg_timezone_names + union distinct + select abbrev as name, is_dst from pg_catalog.pg_timezone_abbrevs"#, + r#"select R.oid::bigint as role_id, rolname as role_name, + rolsuper is_super, rolinherit is_inherit, + rolcreaterole can_createrole, rolcreatedb can_createdb, + rolcanlogin can_login, rolreplication /* false */ is_replication, + rolconnlimit conn_limit, rolvaliduntil valid_until, + rolbypassrls /* false */ bypass_rls, rolconfig config, + D.description + from pg_catalog.pg_roles R + left join pg_catalog.pg_shdescription D on D.objoid = R.oid"#, + r#"select member id, roleid role_id, admin_option + from pg_catalog.pg_auth_members order by id, roleid::text"#, + r#"select T.oid::bigint as id, T.spcname as name, + T.xmin as state_number, pg_catalog.pg_get_userbyid(T.spcowner) as owner, + pg_catalog.pg_tablespace_location(T.oid) /* null */ as location, + T.spcoptions /* null */ as options, + D.description as comment + from pg_catalog.pg_tablespace T + left join pg_catalog.pg_shdescription D on D.objoid = T.oid + -- where pg_catalog.age(T.xmin) <= #TXAGE"#, + r#"select T.oid as object_id, + T.spcacl as acl + from pg_catalog.pg_tablespace T + union all + select T.oid as object_id, + T.datacl as acl + from pg_catalog.pg_database T"#, + r#" SELECT typinput='pg_catalog.array_in'::regproc as is_array, typtype, typname, pg_type.oid FROM pg_catalog.pg_type LEFT JOIN (select ns.oid as nspoid, ns.nspname, r.r from pg_namespace as ns join ( select s.r, (current_schemas(false))[s.r] as nspname from generate_series(1, array_upper(current_schemas(false), 1)) as s(r) ) as r using ( nspname ) ) as sp ON sp.nspoid = typnamespace WHERE pg_type.oid = '1034' ORDER BY sp.r, pg_type.oid DESC"#, + r#"SELECT e.typdelim FROM pg_catalog.pg_type t, pg_catalog.pg_type e WHERE t.oid = '1034' and t.typelem = e.oid"#, + r#" SELECT e.oid, n.nspname = ANY(current_schemas(true)), n.nspname, e.typname FROM pg_catalog.pg_type t JOIN pg_catalog.pg_type e ON t.typelem = e.oid JOIN pg_catalog.pg_namespace n ON t.typnamespace = n.oid WHERE t.oid = '1034'"#, + r#"SELECT typinput='pg_catalog.array_in'::regproc as is_array, typtype, typname, pg_type.oid FROM pg_catalog.pg_type LEFT JOIN (select ns.oid as nspoid, ns.nspname, r.r from pg_namespace as ns join ( select s.r, (current_schemas(false))[s.r] as nspname from generate_series(1, array_upper(current_schemas(false), 1)) as s(r) ) as r using ( nspname ) ) as sp ON sp.nspoid = typnamespace WHERE pg_type.oid = '1033' ORDER BY sp.r, pg_type.oid DESC"#, +]; + +#[tokio::test] +pub async fn test_datagrip_startup_sql() { + env_logger::init(); + let service = setup_handlers(); + let mut client = MockClient::new(); + + for query in DATAGRIP_QUERIES { + SimpleQueryHandler::do_query(&service, &mut client, query) + .await + .unwrap_or_else(|e| { + panic!("failed to run sql:\n-----------------\n {query}\n-----------------\n {e}") + }); + } +} From 6055b840b6a45648a374d02582b605667a75ef02 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Fri, 22 May 2026 07:54:17 +0800 Subject: [PATCH 2/2] feat: adding more datafusion pg-catalog tables --- datafusion-pg-catalog/src/pg_catalog.rs | 145 +++++++++++++++--- .../src/pg_catalog/pg_database.rs | 28 +++- .../src/pg_catalog/pg_locks.rs | 33 ++++ .../src/pg_catalog/pg_namespace.rs | 19 ++- .../src/pg_catalog/pg_stat_ssl.rs | 70 +++++++++ .../src/pg_catalog/pg_tablespace.rs | 66 ++++++++ .../src/pg_catalog/pg_timezone.rs | 24 +++ datafusion-postgres/tests/datagrip.rs | 6 +- 8 files changed, 351 insertions(+), 40 deletions(-) create mode 100644 datafusion-pg-catalog/src/pg_catalog/pg_locks.rs create mode 100644 datafusion-pg-catalog/src/pg_catalog/pg_stat_ssl.rs create mode 100644 datafusion-pg-catalog/src/pg_catalog/pg_tablespace.rs create mode 100644 datafusion-pg-catalog/src/pg_catalog/pg_timezone.rs diff --git a/datafusion-pg-catalog/src/pg_catalog.rs b/datafusion-pg-catalog/src/pg_catalog.rs index 6749f30..7f00df3 100644 --- a/datafusion-pg-catalog/src/pg_catalog.rs +++ b/datafusion-pg-catalog/src/pg_catalog.rs @@ -4,8 +4,8 @@ use std::sync::atomic::AtomicU32; use async_trait::async_trait; use datafusion::arrow::array::{ - ArrayRef, AsArray, BooleanBuilder, Int32Builder, RecordBatch, StringArray, StringBuilder, - as_boolean_array, + ArrayRef, AsArray, BooleanArray, BooleanBuilder, Int32Builder, RecordBatch, StringArray, + StringBuilder, as_boolean_array, }; use datafusion::arrow::datatypes::{DataType, Field, Int32Type, SchemaRef}; use datafusion::arrow::ipc::reader::FileReader; @@ -35,12 +35,16 @@ pub mod pg_attribute; pub mod pg_class; pub mod pg_database; pub mod pg_get_expr_udf; +pub mod pg_locks; pub mod pg_namespace; pub mod pg_replication_slot; pub mod pg_roles; pub mod pg_settings; pub mod pg_stat_gssapi; +pub mod pg_stat_ssl; pub mod pg_tables; +pub mod pg_tablespace; +pub mod pg_timezone; pub mod pg_views; pub mod quote_ident_udf; @@ -103,6 +107,9 @@ const PG_CATALOG_TABLE_PG_STATISTIC_EXT_DATA: &str = "pg_statistic_ext_data"; const PG_CATALOG_TABLE_PG_SUBSCRIPTION: &str = "pg_subscription"; const PG_CATALOG_TABLE_PG_SUBSCRIPTION_REL: &str = "pg_subscription_rel"; const PG_CATALOG_TABLE_PG_TABLESPACE: &str = "pg_tablespace"; +const PG_CATALOG_TABLE_PG_LOCKS: &str = "pg_locks"; +const PG_CATALOG_VIEW_PG_TIMEZONE_NAMES: &str = "pg_timezone_names"; +const PG_CATALOG_VIEW_PG_TIMEZONE_ABBREVS: &str = "pg_timezone_abbrevs"; const PG_CATALOG_TABLE_PG_TRIGGER: &str = "pg_trigger"; const PG_CATALOG_TABLE_PG_USER_MAPPING: &str = "pg_user_mapping"; const PG_CATALOG_VIEW_PG_SETTINGS: &str = "pg_settings"; @@ -111,6 +118,7 @@ const PG_CATALOG_VIEW_PG_MATVIEWS: &str = "pg_matviews"; const PG_CATALOG_VIEW_PG_ROLES: &str = "pg_roles"; const PG_CATALOG_VIEW_PG_TABLES: &str = "pg_tables"; const PG_CATALOG_VIEW_PG_STAT_GSSAPI: &str = "pg_stat_gssapi"; +const PG_CATALOG_VIEW_PG_STAT_SSL: &str = "pg_stat_ssl"; const PG_CATALOG_VIEW_PG_STAT_USER_TABLES: &str = "pg_stat_user_tables"; const PG_CATALOG_VIEW_PG_REPLICATION_SLOTS: &str = "pg_replication_slots"; @@ -174,16 +182,20 @@ pub const PG_CATALOG_TABLES: &[&str] = &[ PG_CATALOG_TABLE_PG_SUBSCRIPTION, PG_CATALOG_TABLE_PG_SUBSCRIPTION_REL, PG_CATALOG_TABLE_PG_TABLESPACE, + PG_CATALOG_TABLE_PG_LOCKS, PG_CATALOG_TABLE_PG_TRIGGER, PG_CATALOG_TABLE_PG_USER_MAPPING, PG_CATALOG_VIEW_PG_ROLES, PG_CATALOG_VIEW_PG_SETTINGS, PG_CATALOG_VIEW_PG_STAT_GSSAPI, + PG_CATALOG_VIEW_PG_STAT_SSL, PG_CATALOG_VIEW_PG_TABLES, PG_CATALOG_VIEW_PG_VIEWS, PG_CATALOG_VIEW_PG_MATVIEWS, PG_CATALOG_VIEW_PG_STAT_USER_TABLES, PG_CATALOG_VIEW_PG_REPLICATION_SLOTS, + PG_CATALOG_VIEW_PG_TIMEZONE_NAMES, + PG_CATALOG_VIEW_PG_TIMEZONE_ABBREVS, ]; #[derive(Debug, Hash, Eq, PartialEq, PartialOrd, Ord)] @@ -364,7 +376,8 @@ impl PgCatalogSchemaProvider Ok(Some(self.static_tables.pg_subscription_rel.clone().into())) } PG_CATALOG_TABLE_PG_TABLESPACE => { - Ok(Some(self.static_tables.pg_tablespace.clone().into())) + let table = Arc::new(pg_tablespace::PgTablespaceTable::new()); + Ok(Some(PgCatalogTable::Dynamic(table))) } PG_CATALOG_TABLE_PG_TRIGGER => Ok(Some(self.static_tables.pg_trigger.clone().into())), PG_CATALOG_TABLE_PG_USER_MAPPING => { @@ -416,6 +429,10 @@ impl PgCatalogSchemaProvider let table = Arc::new(pg_stat_gssapi::PgStatGssApiTable::new()); Ok(Some(PgCatalogTable::Dynamic(table))) } + PG_CATALOG_VIEW_PG_STAT_SSL => { + let table = Arc::new(pg_stat_ssl::PgStatSslTable::new()); + Ok(Some(PgCatalogTable::Dynamic(table))) + } PG_CATALOG_VIEW_PG_ROLES => { let table = Arc::new(pg_roles::PgRolesTable::new(self.context_provider.clone())); Ok(Some(PgCatalogTable::Dynamic(table))) @@ -427,6 +444,11 @@ impl PgCatalogSchemaProvider PG_CATALOG_VIEW_PG_REPLICATION_SLOTS => { Ok(Some(pg_replication_slot::pg_replication_slots().into())) } + PG_CATALOG_TABLE_PG_LOCKS => Ok(Some(pg_locks::pg_locks().into())), + PG_CATALOG_VIEW_PG_TIMEZONE_NAMES => Ok(Some(pg_timezone::pg_timezone_names().into())), + PG_CATALOG_VIEW_PG_TIMEZONE_ABBREVS => { + Ok(Some(pg_timezone::pg_timezone_abbrevs().into())) + } _ => Ok(None), } @@ -588,7 +610,6 @@ pub struct PgCatalogStaticTables { pub pg_statistic_ext_data: Arc, pub pg_subscription: Arc, pub pg_subscription_rel: Arc, - pub pg_tablespace: Arc, pub pg_trigger: Arc, pub pg_user_mapping: Arc, @@ -977,13 +998,6 @@ impl PgCatalogStaticTables { )) .to_vec(), )?, - pg_tablespace: Self::create_arrow_table( - include_bytes!(concat!( - env!("CARGO_MANIFEST_DIR"), - "/pg_catalog_arrow_exports/pg_tablespace.feather" - )) - .to_vec(), - )?, pg_trigger: Self::create_arrow_table( include_bytes!(concat!( env!("CARGO_MANIFEST_DIR"), @@ -1426,6 +1440,102 @@ pub fn create_pg_get_partition_ancestors_udf() -> ScalarUDF { ) } +pub fn create_pg_postmaster_start_time_udf() -> ScalarUDF { + let func = move |_args: &[ColumnarValue]| { + let mut builder = datafusion::arrow::array::TimestampNanosecondBuilder::new() + .with_data_type(DataType::Timestamp( + datafusion::arrow::datatypes::TimeUnit::Nanosecond, + Some(Arc::from("UTC")), + )); + builder.append_value(1_700_000_000_000_000_000); + let array: ArrayRef = Arc::new(builder.finish()); + Ok(ColumnarValue::Array(array)) + }; + + create_udf( + "pg_postmaster_start_time", + vec![], + DataType::Timestamp( + datafusion::arrow::datatypes::TimeUnit::Nanosecond, + Some(Arc::from("UTC")), + ), + Volatility::Stable, + Arc::new(func), + ) +} + +pub fn create_pg_is_in_recovery_udf() -> ScalarUDF { + let func = move |_args: &[ColumnarValue]| { + let array: ArrayRef = Arc::new(BooleanArray::from(vec![false])); + Ok(ColumnarValue::Array(array)) + }; + + create_udf( + "pg_is_in_recovery", + vec![], + DataType::Boolean, + Volatility::Stable, + Arc::new(func), + ) +} + +pub fn create_txid_current_udf() -> ScalarUDF { + let func = move |_args: &[ColumnarValue]| { + let array: ArrayRef = Arc::new(datafusion::arrow::array::Int64Array::from(vec![1i64])); + Ok(ColumnarValue::Array(array)) + }; + + create_udf( + "txid_current", + vec![], + DataType::Int64, + Volatility::Stable, + Arc::new(func), + ) +} + +pub fn create_pg_tablespace_location_udf() -> ScalarUDF { + let func = move |args: &[ColumnarValue]| { + let args = ColumnarValue::values_to_arrays(args)?; + let input = &args[0]; + let mut builder = StringBuilder::new(); + for _ in 0..input.len() { + builder.append_null(); + } + let array: ArrayRef = Arc::new(builder.finish()); + Ok(ColumnarValue::Array(array)) + }; + + create_udf( + "pg_tablespace_location", + vec![DataType::Int32], + DataType::Utf8, + Volatility::Stable, + Arc::new(func), + ) +} + +pub fn create_age_udf() -> ScalarUDF { + let func = move |args: &[ColumnarValue]| { + let args = ColumnarValue::values_to_arrays(args)?; + let input = &args[0]; + let mut builder = datafusion::arrow::array::Int64Builder::new(); + for _ in 0..input.len() { + builder.append_value(0); + } + let array: ArrayRef = Arc::new(builder.finish()); + Ok(ColumnarValue::Array(array)) + }; + + create_udf( + "age", + vec![DataType::Utf8], + DataType::Int64, + Volatility::Stable, + Arc::new(func), + ) +} + /// Install pg_catalog and postgres UDFs to current `SessionContext` pub fn setup_pg_catalog

( session_context: &SessionContext, @@ -1484,6 +1594,11 @@ where session_context.register_udf(create_pg_get_partition_ancestors_udf()); session_context.register_udf(quote_ident_udf::create_quote_ident_udf()); session_context.register_udf(quote_ident_udf::create_parse_ident_udf()); + session_context.register_udf(create_pg_postmaster_start_time_udf()); + session_context.register_udf(create_pg_is_in_recovery_udf()); + session_context.register_udf(create_txid_current_udf()); + session_context.register_udf(create_pg_tablespace_location_udf()); + session_context.register_udf(create_age_udf()); Ok(()) } @@ -1942,14 +2057,6 @@ mod test { .to_vec(), ) .expect("Failed to load ipc data"); - let _ = ArrowTable::from_ipc_data( - include_bytes!(concat!( - env!("CARGO_MANIFEST_DIR"), - "/pg_catalog_arrow_exports/pg_tablespace.feather" - )) - .to_vec(), - ) - .expect("Failed to load ipc data"); let _ = ArrowTable::from_ipc_data( include_bytes!(concat!( env!("CARGO_MANIFEST_DIR"), diff --git a/datafusion-pg-catalog/src/pg_catalog/pg_database.rs b/datafusion-pg-catalog/src/pg_catalog/pg_database.rs index 98e8643..8be98fd 100644 --- a/datafusion-pg-catalog/src/pg_catalog/pg_database.rs +++ b/datafusion-pg-catalog/src/pg_catalog/pg_database.rs @@ -3,9 +3,9 @@ use std::sync::Arc; use std::sync::atomic::{AtomicU32, Ordering}; use datafusion::arrow::array::{ - ArrayRef, BooleanArray, Int32Array, ListArray, RecordBatch, StringArray, + ArrayRef, BooleanArray, Int32Array, ListBuilder, RecordBatch, StringArray, StringBuilder, }; -use datafusion::arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef}; +use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::error::Result; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; @@ -52,9 +52,9 @@ impl PgDatabaseTable { Field::new("daticurules", DataType::Utf8, true), Field::new( "datacl", - DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), + DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))), true, - ), // Access privileges + ), ])); Self { @@ -84,7 +84,7 @@ impl PgDatabaseTable { let mut dattablespaces = Vec::new(); let mut daticulocales: Vec> = Vec::new(); let mut daticurules: Vec> = Vec::new(); - let mut datacls: Vec>>> = Vec::new(); + let mut datacls: Vec>>> = Vec::new(); // to store all schema-oid mapping temporarily before adding to global oid cache let mut catalog_oid_cache = HashMap::new(); @@ -169,9 +169,21 @@ impl PgDatabaseTable { Arc::new(Int32Array::from(dattablespaces)), Arc::new(StringArray::from(daticulocales)), Arc::new(StringArray::from(daticurules)), - Arc::new(ListArray::from_iter_primitive::( - datacls.into_iter(), - )), + Arc::new({ + let mut builder = ListBuilder::new(StringBuilder::new()); + for acl in &datacls { + match acl { + Some(items) => { + for item in items { + builder.values().append_option(item.as_deref()); + } + builder.append(true); + } + None => builder.append(false), + } + } + builder.finish() + }), ]; // Create a full record batch diff --git a/datafusion-pg-catalog/src/pg_catalog/pg_locks.rs b/datafusion-pg-catalog/src/pg_catalog/pg_locks.rs new file mode 100644 index 0000000..1ef7f6d --- /dev/null +++ b/datafusion-pg-catalog/src/pg_catalog/pg_locks.rs @@ -0,0 +1,33 @@ +use crate::pg_catalog::empty_table::EmptyTable; +use datafusion::arrow::datatypes::{DataType, Field, Schema}; +use std::sync::Arc; + +pub(crate) fn pg_locks() -> EmptyTable { + let schema = Arc::new(Schema::new(vec![ + Field::new("locktype", DataType::Utf8, true), + Field::new("database", DataType::Int32, true), + Field::new("relation", DataType::Int32, true), + Field::new("page", DataType::Int32, true), + Field::new("tuple", DataType::Int16, true), + Field::new("virtualxid", DataType::Utf8, true), + Field::new("transactionid", DataType::Utf8, true), + Field::new("classid", DataType::Int32, true), + Field::new("objid", DataType::Int32, true), + Field::new("objsubid", DataType::Int16, true), + Field::new("virtualtransaction", DataType::Utf8, true), + Field::new("pid", DataType::Int32, true), + Field::new("mode", DataType::Utf8, true), + Field::new("granted", DataType::Boolean, true), + Field::new("fastpath", DataType::Boolean, true), + Field::new( + "waitstart", + DataType::Timestamp( + datafusion::arrow::datatypes::TimeUnit::Microsecond, + Some(Arc::from("UTC")), + ), + true, + ), + ])); + + EmptyTable::new(schema) +} diff --git a/datafusion-pg-catalog/src/pg_catalog/pg_namespace.rs b/datafusion-pg-catalog/src/pg_catalog/pg_namespace.rs index 923ca9b..05d5d71 100644 --- a/datafusion-pg-catalog/src/pg_catalog/pg_namespace.rs +++ b/datafusion-pg-catalog/src/pg_catalog/pg_namespace.rs @@ -29,14 +29,13 @@ impl PgNamespaceTable { oid_counter: Arc, oid_cache: Arc>>, ) -> Self { - // Define the schema for pg_namespace - // This matches the columns from PostgreSQL's pg_namespace let schema = Arc::new(Schema::new(vec![ - Field::new("oid", DataType::Int32, false), // Object identifier - Field::new("nspname", DataType::Utf8, false), // Name of the namespace (schema) - Field::new("nspowner", DataType::Int32, false), // Owner of the namespace - Field::new("nspacl", DataType::Utf8, true), // Access privileges - Field::new("options", DataType::Utf8, true), // Schema-level options + Field::new("oid", DataType::Int32, false), + Field::new("xmin", DataType::Int32, true), + Field::new("nspname", DataType::Utf8, false), + Field::new("nspowner", DataType::Int32, false), + Field::new("nspacl", DataType::Utf8, true), + Field::new("options", DataType::Utf8, true), ])); Self { @@ -47,10 +46,9 @@ impl PgNamespaceTable { } } - /// Generate record batches based on the current state of the catalog async fn get_data(this: Self) -> Result { - // Vectors to store column data let mut oids = Vec::new(); + let mut xmins = Vec::new(); let mut nspnames = Vec::new(); let mut nspowners = Vec::new(); let mut nspacls: Vec> = Vec::new(); @@ -74,6 +72,7 @@ impl PgNamespaceTable { schema_oid_cache.insert(cache_key, schema_oid); oids.push(schema_oid as i32); + xmins.push(Some(1i32)); nspnames.push(schema_name.clone()); nspowners.push(10); // Default owner nspacls.push(None); @@ -92,9 +91,9 @@ impl PgNamespaceTable { // add new schema cache oid_cache.extend(schema_oid_cache); - // Create Arrow arrays from the collected data let arrays: Vec = vec![ Arc::new(Int32Array::from(oids)), + Arc::new(Int32Array::from(xmins)), Arc::new(StringArray::from(nspnames)), Arc::new(Int32Array::from(nspowners)), Arc::new(StringArray::from_iter(nspacls.into_iter())), diff --git a/datafusion-pg-catalog/src/pg_catalog/pg_stat_ssl.rs b/datafusion-pg-catalog/src/pg_catalog/pg_stat_ssl.rs new file mode 100644 index 0000000..550888e --- /dev/null +++ b/datafusion-pg-catalog/src/pg_catalog/pg_stat_ssl.rs @@ -0,0 +1,70 @@ +use datafusion::arrow::array::{ArrayRef, BooleanArray, Int32Array, RecordBatch}; +use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::error::Result; +use datafusion::execution::{SendableRecordBatchStream, TaskContext}; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::streaming::PartitionStream; +use std::sync::Arc; + +use crate::pg_catalog::BACKEND_PID; + +#[derive(Debug, Clone)] +pub(crate) struct PgStatSslTable { + schema: SchemaRef, +} + +impl PgStatSslTable { + pub(crate) fn new() -> Self { + let schema = Arc::new(Schema::new(vec![ + Field::new("pid", DataType::Int32, true), + Field::new("ssl", DataType::Boolean, false), + Field::new("version", DataType::Utf8, true), + Field::new("cipher", DataType::Utf8, true), + Field::new("bits", DataType::Int32, true), + Field::new("client_dn", DataType::Utf8, true), + Field::new("client_serial", DataType::Utf8, true), + Field::new("issuer_dn", DataType::Utf8, true), + ])); + + Self { schema } + } + + async fn get_data(this: Self) -> Result { + let pid = vec![BACKEND_PID]; + let ssl = vec![false]; + let version: Vec> = vec![None]; + let cipher: Vec> = vec![None]; + let bits: Vec> = vec![None]; + let client_dn: Vec> = vec![None]; + let client_serial: Vec> = vec![None]; + let issuer_dn: Vec> = vec![None]; + + let arrays: Vec = vec![ + Arc::new(Int32Array::from(pid)), + Arc::new(BooleanArray::from(ssl)), + Arc::new(datafusion::arrow::array::StringArray::from(version)), + Arc::new(datafusion::arrow::array::StringArray::from(cipher)), + Arc::new(Int32Array::from(bits)), + Arc::new(datafusion::arrow::array::StringArray::from(client_dn)), + Arc::new(datafusion::arrow::array::StringArray::from(client_serial)), + Arc::new(datafusion::arrow::array::StringArray::from(issuer_dn)), + ]; + + let batch = RecordBatch::try_new(this.schema.clone(), arrays)?; + Ok(batch) + } +} + +impl PartitionStream for PgStatSslTable { + fn schema(&self) -> &SchemaRef { + &self.schema + } + + fn execute(&self, _ctx: Arc) -> SendableRecordBatchStream { + let this = self.clone(); + Box::pin(RecordBatchStreamAdapter::new( + this.schema.clone(), + futures::stream::once(async move { PgStatSslTable::get_data(this).await }), + )) + } +} diff --git a/datafusion-pg-catalog/src/pg_catalog/pg_tablespace.rs b/datafusion-pg-catalog/src/pg_catalog/pg_tablespace.rs new file mode 100644 index 0000000..9787d21 --- /dev/null +++ b/datafusion-pg-catalog/src/pg_catalog/pg_tablespace.rs @@ -0,0 +1,66 @@ +use std::sync::Arc; + +use datafusion::arrow::array::{ + ArrayRef, Int32Array, ListBuilder, RecordBatch, StringArray, StringBuilder, +}; +use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::error::Result; +use datafusion::execution::{SendableRecordBatchStream, TaskContext}; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::streaming::PartitionStream; + +#[derive(Debug, Clone)] +pub(crate) struct PgTablespaceTable { + schema: SchemaRef, +} + +impl PgTablespaceTable { + pub(crate) fn new() -> Self { + let schema = Arc::new(Schema::new(vec![ + Field::new("oid", DataType::Int32, false), + Field::new("xmin", DataType::Int32, true), + Field::new("spcname", DataType::Utf8, false), + Field::new("spcowner", DataType::Int32, false), + Field::new( + "spcacl", + DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))), + true, + ), + Field::new("spcoptions", DataType::Utf8, true), + ])); + + Self { schema } + } + + fn get_data(schema: SchemaRef) -> Result { + let arrays: Vec = vec![ + Arc::new(Int32Array::from(vec![1663, 1664])), + Arc::new(Int32Array::from(vec![Some(1), Some(1)])), + Arc::new(StringArray::from(vec!["pg_default", "pg_global"])), + Arc::new(Int32Array::from(vec![10, 10])), + Arc::new({ + let mut builder = ListBuilder::new(StringBuilder::new()); + builder.append(false); + builder.append(false); + builder.finish() + }), + Arc::new(StringArray::from(vec![None::, None::])), + ]; + + Ok(RecordBatch::try_new(schema, arrays)?) + } +} + +impl PartitionStream for PgTablespaceTable { + fn schema(&self) -> &SchemaRef { + &self.schema + } + + fn execute(&self, _ctx: Arc) -> SendableRecordBatchStream { + let schema = self.schema.clone(); + Box::pin(RecordBatchStreamAdapter::new( + self.schema.clone(), + futures::stream::once(async move { Self::get_data(schema) }), + )) + } +} diff --git a/datafusion-pg-catalog/src/pg_catalog/pg_timezone.rs b/datafusion-pg-catalog/src/pg_catalog/pg_timezone.rs new file mode 100644 index 0000000..3620faa --- /dev/null +++ b/datafusion-pg-catalog/src/pg_catalog/pg_timezone.rs @@ -0,0 +1,24 @@ +use crate::pg_catalog::empty_table::EmptyTable; +use datafusion::arrow::datatypes::{DataType, Field, Schema}; +use std::sync::Arc; + +pub(crate) fn pg_timezone_names() -> EmptyTable { + let schema = Arc::new(Schema::new(vec![ + Field::new("name", DataType::Utf8, true), + Field::new("abbrev", DataType::Utf8, true), + Field::new("utc_offset", DataType::Utf8, true), + Field::new("is_dst", DataType::Boolean, true), + ])); + + EmptyTable::new(schema) +} + +pub(crate) fn pg_timezone_abbrevs() -> EmptyTable { + let schema = Arc::new(Schema::new(vec![ + Field::new("abbrev", DataType::Utf8, true), + Field::new("utc_offset", DataType::Utf8, true), + Field::new("is_dst", DataType::Boolean, true), + ])); + + EmptyTable::new(schema) +} diff --git a/datafusion-postgres/tests/datagrip.rs b/datafusion-postgres/tests/datagrip.rs index de75701..b7afce0 100644 --- a/datafusion-postgres/tests/datagrip.rs +++ b/datafusion-postgres/tests/datagrip.rs @@ -1,8 +1,7 @@ -mod common; - -use common::*; use pgwire::api::query::SimpleQueryHandler; +use datafusion_postgres::testing::*; + const DATAGRIP_QUERIES: &[&str] = &[ "SET extra_float_digits = 3", "SET application_name = ''", @@ -74,6 +73,7 @@ const DATAGRIP_QUERIES: &[&str] = &[ r#"SELECT e.typdelim FROM pg_catalog.pg_type t, pg_catalog.pg_type e WHERE t.oid = '1034' and t.typelem = e.oid"#, r#" SELECT e.oid, n.nspname = ANY(current_schemas(true)), n.nspname, e.typname FROM pg_catalog.pg_type t JOIN pg_catalog.pg_type e ON t.typelem = e.oid JOIN pg_catalog.pg_namespace n ON t.typnamespace = n.oid WHERE t.oid = '1034'"#, r#"SELECT typinput='pg_catalog.array_in'::regproc as is_array, typtype, typname, pg_type.oid FROM pg_catalog.pg_type LEFT JOIN (select ns.oid as nspoid, ns.nspname, r.r from pg_namespace as ns join ( select s.r, (current_schemas(false))[s.r] as nspname from generate_series(1, array_upper(current_schemas(false), 1)) as s(r) ) as r using ( nspname ) ) as sp ON sp.nspoid = typnamespace WHERE pg_type.oid = '1033' ORDER BY sp.r, pg_type.oid DESC"#, + r#"select ssl from pg_stat_ssl where pid = pg_backend_pid()"#, ]; #[tokio::test]