Skip to content

Commit 6055b84

Browse files
committed
feat: adding more datafusion pg-catalog tables
1 parent 9552038 commit 6055b84

8 files changed

Lines changed: 351 additions & 40 deletions

File tree

datafusion-pg-catalog/src/pg_catalog.rs

Lines changed: 126 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ use std::sync::atomic::AtomicU32;
44

55
use async_trait::async_trait;
66
use datafusion::arrow::array::{
7-
ArrayRef, AsArray, BooleanBuilder, Int32Builder, RecordBatch, StringArray, StringBuilder,
8-
as_boolean_array,
7+
ArrayRef, AsArray, BooleanArray, BooleanBuilder, Int32Builder, RecordBatch, StringArray,
8+
StringBuilder, as_boolean_array,
99
};
1010
use datafusion::arrow::datatypes::{DataType, Field, Int32Type, SchemaRef};
1111
use datafusion::arrow::ipc::reader::FileReader;
@@ -35,12 +35,16 @@ pub mod pg_attribute;
3535
pub mod pg_class;
3636
pub mod pg_database;
3737
pub mod pg_get_expr_udf;
38+
pub mod pg_locks;
3839
pub mod pg_namespace;
3940
pub mod pg_replication_slot;
4041
pub mod pg_roles;
4142
pub mod pg_settings;
4243
pub mod pg_stat_gssapi;
44+
pub mod pg_stat_ssl;
4345
pub mod pg_tables;
46+
pub mod pg_tablespace;
47+
pub mod pg_timezone;
4448
pub mod pg_views;
4549
pub mod quote_ident_udf;
4650

@@ -103,6 +107,9 @@ const PG_CATALOG_TABLE_PG_STATISTIC_EXT_DATA: &str = "pg_statistic_ext_data";
103107
const PG_CATALOG_TABLE_PG_SUBSCRIPTION: &str = "pg_subscription";
104108
const PG_CATALOG_TABLE_PG_SUBSCRIPTION_REL: &str = "pg_subscription_rel";
105109
const PG_CATALOG_TABLE_PG_TABLESPACE: &str = "pg_tablespace";
110+
const PG_CATALOG_TABLE_PG_LOCKS: &str = "pg_locks";
111+
const PG_CATALOG_VIEW_PG_TIMEZONE_NAMES: &str = "pg_timezone_names";
112+
const PG_CATALOG_VIEW_PG_TIMEZONE_ABBREVS: &str = "pg_timezone_abbrevs";
106113
const PG_CATALOG_TABLE_PG_TRIGGER: &str = "pg_trigger";
107114
const PG_CATALOG_TABLE_PG_USER_MAPPING: &str = "pg_user_mapping";
108115
const PG_CATALOG_VIEW_PG_SETTINGS: &str = "pg_settings";
@@ -111,6 +118,7 @@ const PG_CATALOG_VIEW_PG_MATVIEWS: &str = "pg_matviews";
111118
const PG_CATALOG_VIEW_PG_ROLES: &str = "pg_roles";
112119
const PG_CATALOG_VIEW_PG_TABLES: &str = "pg_tables";
113120
const PG_CATALOG_VIEW_PG_STAT_GSSAPI: &str = "pg_stat_gssapi";
121+
const PG_CATALOG_VIEW_PG_STAT_SSL: &str = "pg_stat_ssl";
114122
const PG_CATALOG_VIEW_PG_STAT_USER_TABLES: &str = "pg_stat_user_tables";
115123
const PG_CATALOG_VIEW_PG_REPLICATION_SLOTS: &str = "pg_replication_slots";
116124

@@ -174,16 +182,20 @@ pub const PG_CATALOG_TABLES: &[&str] = &[
174182
PG_CATALOG_TABLE_PG_SUBSCRIPTION,
175183
PG_CATALOG_TABLE_PG_SUBSCRIPTION_REL,
176184
PG_CATALOG_TABLE_PG_TABLESPACE,
185+
PG_CATALOG_TABLE_PG_LOCKS,
177186
PG_CATALOG_TABLE_PG_TRIGGER,
178187
PG_CATALOG_TABLE_PG_USER_MAPPING,
179188
PG_CATALOG_VIEW_PG_ROLES,
180189
PG_CATALOG_VIEW_PG_SETTINGS,
181190
PG_CATALOG_VIEW_PG_STAT_GSSAPI,
191+
PG_CATALOG_VIEW_PG_STAT_SSL,
182192
PG_CATALOG_VIEW_PG_TABLES,
183193
PG_CATALOG_VIEW_PG_VIEWS,
184194
PG_CATALOG_VIEW_PG_MATVIEWS,
185195
PG_CATALOG_VIEW_PG_STAT_USER_TABLES,
186196
PG_CATALOG_VIEW_PG_REPLICATION_SLOTS,
197+
PG_CATALOG_VIEW_PG_TIMEZONE_NAMES,
198+
PG_CATALOG_VIEW_PG_TIMEZONE_ABBREVS,
187199
];
188200

189201
#[derive(Debug, Hash, Eq, PartialEq, PartialOrd, Ord)]
@@ -364,7 +376,8 @@ impl<C: CatalogInfo, P: PgCatalogContextProvider> PgCatalogSchemaProvider<C, P>
364376
Ok(Some(self.static_tables.pg_subscription_rel.clone().into()))
365377
}
366378
PG_CATALOG_TABLE_PG_TABLESPACE => {
367-
Ok(Some(self.static_tables.pg_tablespace.clone().into()))
379+
let table = Arc::new(pg_tablespace::PgTablespaceTable::new());
380+
Ok(Some(PgCatalogTable::Dynamic(table)))
368381
}
369382
PG_CATALOG_TABLE_PG_TRIGGER => Ok(Some(self.static_tables.pg_trigger.clone().into())),
370383
PG_CATALOG_TABLE_PG_USER_MAPPING => {
@@ -416,6 +429,10 @@ impl<C: CatalogInfo, P: PgCatalogContextProvider> PgCatalogSchemaProvider<C, P>
416429
let table = Arc::new(pg_stat_gssapi::PgStatGssApiTable::new());
417430
Ok(Some(PgCatalogTable::Dynamic(table)))
418431
}
432+
PG_CATALOG_VIEW_PG_STAT_SSL => {
433+
let table = Arc::new(pg_stat_ssl::PgStatSslTable::new());
434+
Ok(Some(PgCatalogTable::Dynamic(table)))
435+
}
419436
PG_CATALOG_VIEW_PG_ROLES => {
420437
let table = Arc::new(pg_roles::PgRolesTable::new(self.context_provider.clone()));
421438
Ok(Some(PgCatalogTable::Dynamic(table)))
@@ -427,6 +444,11 @@ impl<C: CatalogInfo, P: PgCatalogContextProvider> PgCatalogSchemaProvider<C, P>
427444
PG_CATALOG_VIEW_PG_REPLICATION_SLOTS => {
428445
Ok(Some(pg_replication_slot::pg_replication_slots().into()))
429446
}
447+
PG_CATALOG_TABLE_PG_LOCKS => Ok(Some(pg_locks::pg_locks().into())),
448+
PG_CATALOG_VIEW_PG_TIMEZONE_NAMES => Ok(Some(pg_timezone::pg_timezone_names().into())),
449+
PG_CATALOG_VIEW_PG_TIMEZONE_ABBREVS => {
450+
Ok(Some(pg_timezone::pg_timezone_abbrevs().into()))
451+
}
430452

431453
_ => Ok(None),
432454
}
@@ -588,7 +610,6 @@ pub struct PgCatalogStaticTables {
588610
pub pg_statistic_ext_data: Arc<ArrowTable>,
589611
pub pg_subscription: Arc<ArrowTable>,
590612
pub pg_subscription_rel: Arc<ArrowTable>,
591-
pub pg_tablespace: Arc<ArrowTable>,
592613
pub pg_trigger: Arc<ArrowTable>,
593614
pub pg_user_mapping: Arc<ArrowTable>,
594615

@@ -977,13 +998,6 @@ impl PgCatalogStaticTables {
977998
))
978999
.to_vec(),
9791000
)?,
980-
pg_tablespace: Self::create_arrow_table(
981-
include_bytes!(concat!(
982-
env!("CARGO_MANIFEST_DIR"),
983-
"/pg_catalog_arrow_exports/pg_tablespace.feather"
984-
))
985-
.to_vec(),
986-
)?,
9871001
pg_trigger: Self::create_arrow_table(
9881002
include_bytes!(concat!(
9891003
env!("CARGO_MANIFEST_DIR"),
@@ -1426,6 +1440,102 @@ pub fn create_pg_get_partition_ancestors_udf() -> ScalarUDF {
14261440
)
14271441
}
14281442

1443+
pub fn create_pg_postmaster_start_time_udf() -> ScalarUDF {
1444+
let func = move |_args: &[ColumnarValue]| {
1445+
let mut builder = datafusion::arrow::array::TimestampNanosecondBuilder::new()
1446+
.with_data_type(DataType::Timestamp(
1447+
datafusion::arrow::datatypes::TimeUnit::Nanosecond,
1448+
Some(Arc::from("UTC")),
1449+
));
1450+
builder.append_value(1_700_000_000_000_000_000);
1451+
let array: ArrayRef = Arc::new(builder.finish());
1452+
Ok(ColumnarValue::Array(array))
1453+
};
1454+
1455+
create_udf(
1456+
"pg_postmaster_start_time",
1457+
vec![],
1458+
DataType::Timestamp(
1459+
datafusion::arrow::datatypes::TimeUnit::Nanosecond,
1460+
Some(Arc::from("UTC")),
1461+
),
1462+
Volatility::Stable,
1463+
Arc::new(func),
1464+
)
1465+
}
1466+
1467+
pub fn create_pg_is_in_recovery_udf() -> ScalarUDF {
1468+
let func = move |_args: &[ColumnarValue]| {
1469+
let array: ArrayRef = Arc::new(BooleanArray::from(vec![false]));
1470+
Ok(ColumnarValue::Array(array))
1471+
};
1472+
1473+
create_udf(
1474+
"pg_is_in_recovery",
1475+
vec![],
1476+
DataType::Boolean,
1477+
Volatility::Stable,
1478+
Arc::new(func),
1479+
)
1480+
}
1481+
1482+
pub fn create_txid_current_udf() -> ScalarUDF {
1483+
let func = move |_args: &[ColumnarValue]| {
1484+
let array: ArrayRef = Arc::new(datafusion::arrow::array::Int64Array::from(vec![1i64]));
1485+
Ok(ColumnarValue::Array(array))
1486+
};
1487+
1488+
create_udf(
1489+
"txid_current",
1490+
vec![],
1491+
DataType::Int64,
1492+
Volatility::Stable,
1493+
Arc::new(func),
1494+
)
1495+
}
1496+
1497+
pub fn create_pg_tablespace_location_udf() -> ScalarUDF {
1498+
let func = move |args: &[ColumnarValue]| {
1499+
let args = ColumnarValue::values_to_arrays(args)?;
1500+
let input = &args[0];
1501+
let mut builder = StringBuilder::new();
1502+
for _ in 0..input.len() {
1503+
builder.append_null();
1504+
}
1505+
let array: ArrayRef = Arc::new(builder.finish());
1506+
Ok(ColumnarValue::Array(array))
1507+
};
1508+
1509+
create_udf(
1510+
"pg_tablespace_location",
1511+
vec![DataType::Int32],
1512+
DataType::Utf8,
1513+
Volatility::Stable,
1514+
Arc::new(func),
1515+
)
1516+
}
1517+
1518+
pub fn create_age_udf() -> ScalarUDF {
1519+
let func = move |args: &[ColumnarValue]| {
1520+
let args = ColumnarValue::values_to_arrays(args)?;
1521+
let input = &args[0];
1522+
let mut builder = datafusion::arrow::array::Int64Builder::new();
1523+
for _ in 0..input.len() {
1524+
builder.append_value(0);
1525+
}
1526+
let array: ArrayRef = Arc::new(builder.finish());
1527+
Ok(ColumnarValue::Array(array))
1528+
};
1529+
1530+
create_udf(
1531+
"age",
1532+
vec![DataType::Utf8],
1533+
DataType::Int64,
1534+
Volatility::Stable,
1535+
Arc::new(func),
1536+
)
1537+
}
1538+
14291539
/// Install pg_catalog and postgres UDFs to current `SessionContext`
14301540
pub fn setup_pg_catalog<P>(
14311541
session_context: &SessionContext,
@@ -1484,6 +1594,11 @@ where
14841594
session_context.register_udf(create_pg_get_partition_ancestors_udf());
14851595
session_context.register_udf(quote_ident_udf::create_quote_ident_udf());
14861596
session_context.register_udf(quote_ident_udf::create_parse_ident_udf());
1597+
session_context.register_udf(create_pg_postmaster_start_time_udf());
1598+
session_context.register_udf(create_pg_is_in_recovery_udf());
1599+
session_context.register_udf(create_txid_current_udf());
1600+
session_context.register_udf(create_pg_tablespace_location_udf());
1601+
session_context.register_udf(create_age_udf());
14871602

14881603
Ok(())
14891604
}
@@ -1942,14 +2057,6 @@ mod test {
19422057
.to_vec(),
19432058
)
19442059
.expect("Failed to load ipc data");
1945-
let _ = ArrowTable::from_ipc_data(
1946-
include_bytes!(concat!(
1947-
env!("CARGO_MANIFEST_DIR"),
1948-
"/pg_catalog_arrow_exports/pg_tablespace.feather"
1949-
))
1950-
.to_vec(),
1951-
)
1952-
.expect("Failed to load ipc data");
19532060
let _ = ArrowTable::from_ipc_data(
19542061
include_bytes!(concat!(
19552062
env!("CARGO_MANIFEST_DIR"),

datafusion-pg-catalog/src/pg_catalog/pg_database.rs

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ use std::sync::Arc;
33
use std::sync::atomic::{AtomicU32, Ordering};
44

55
use datafusion::arrow::array::{
6-
ArrayRef, BooleanArray, Int32Array, ListArray, RecordBatch, StringArray,
6+
ArrayRef, BooleanArray, Int32Array, ListBuilder, RecordBatch, StringArray, StringBuilder,
77
};
8-
use datafusion::arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef};
8+
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
99
use datafusion::error::Result;
1010
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
1111
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
@@ -52,9 +52,9 @@ impl<C: CatalogInfo> PgDatabaseTable<C> {
5252
Field::new("daticurules", DataType::Utf8, true),
5353
Field::new(
5454
"datacl",
55-
DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
55+
DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
5656
true,
57-
), // Access privileges
57+
),
5858
]));
5959

6060
Self {
@@ -84,7 +84,7 @@ impl<C: CatalogInfo> PgDatabaseTable<C> {
8484
let mut dattablespaces = Vec::new();
8585
let mut daticulocales: Vec<Option<String>> = Vec::new();
8686
let mut daticurules: Vec<Option<String>> = Vec::new();
87-
let mut datacls: Vec<Option<Vec<Option<i32>>>> = Vec::new();
87+
let mut datacls: Vec<Option<Vec<Option<String>>>> = Vec::new();
8888

8989
// to store all schema-oid mapping temporarily before adding to global oid cache
9090
let mut catalog_oid_cache = HashMap::new();
@@ -169,9 +169,21 @@ impl<C: CatalogInfo> PgDatabaseTable<C> {
169169
Arc::new(Int32Array::from(dattablespaces)),
170170
Arc::new(StringArray::from(daticulocales)),
171171
Arc::new(StringArray::from(daticurules)),
172-
Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(
173-
datacls.into_iter(),
174-
)),
172+
Arc::new({
173+
let mut builder = ListBuilder::new(StringBuilder::new());
174+
for acl in &datacls {
175+
match acl {
176+
Some(items) => {
177+
for item in items {
178+
builder.values().append_option(item.as_deref());
179+
}
180+
builder.append(true);
181+
}
182+
None => builder.append(false),
183+
}
184+
}
185+
builder.finish()
186+
}),
175187
];
176188

177189
// Create a full record batch
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
use crate::pg_catalog::empty_table::EmptyTable;
2+
use datafusion::arrow::datatypes::{DataType, Field, Schema};
3+
use std::sync::Arc;
4+
5+
pub(crate) fn pg_locks() -> EmptyTable {
6+
let schema = Arc::new(Schema::new(vec![
7+
Field::new("locktype", DataType::Utf8, true),
8+
Field::new("database", DataType::Int32, true),
9+
Field::new("relation", DataType::Int32, true),
10+
Field::new("page", DataType::Int32, true),
11+
Field::new("tuple", DataType::Int16, true),
12+
Field::new("virtualxid", DataType::Utf8, true),
13+
Field::new("transactionid", DataType::Utf8, true),
14+
Field::new("classid", DataType::Int32, true),
15+
Field::new("objid", DataType::Int32, true),
16+
Field::new("objsubid", DataType::Int16, true),
17+
Field::new("virtualtransaction", DataType::Utf8, true),
18+
Field::new("pid", DataType::Int32, true),
19+
Field::new("mode", DataType::Utf8, true),
20+
Field::new("granted", DataType::Boolean, true),
21+
Field::new("fastpath", DataType::Boolean, true),
22+
Field::new(
23+
"waitstart",
24+
DataType::Timestamp(
25+
datafusion::arrow::datatypes::TimeUnit::Microsecond,
26+
Some(Arc::from("UTC")),
27+
),
28+
true,
29+
),
30+
]));
31+
32+
EmptyTable::new(schema)
33+
}

datafusion-pg-catalog/src/pg_catalog/pg_namespace.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,13 @@ impl<C: CatalogInfo> PgNamespaceTable<C> {
2929
oid_counter: Arc<AtomicU32>,
3030
oid_cache: Arc<RwLock<HashMap<OidCacheKey, Oid>>>,
3131
) -> Self {
32-
// Define the schema for pg_namespace
33-
// This matches the columns from PostgreSQL's pg_namespace
3432
let schema = Arc::new(Schema::new(vec![
35-
Field::new("oid", DataType::Int32, false), // Object identifier
36-
Field::new("nspname", DataType::Utf8, false), // Name of the namespace (schema)
37-
Field::new("nspowner", DataType::Int32, false), // Owner of the namespace
38-
Field::new("nspacl", DataType::Utf8, true), // Access privileges
39-
Field::new("options", DataType::Utf8, true), // Schema-level options
33+
Field::new("oid", DataType::Int32, false),
34+
Field::new("xmin", DataType::Int32, true),
35+
Field::new("nspname", DataType::Utf8, false),
36+
Field::new("nspowner", DataType::Int32, false),
37+
Field::new("nspacl", DataType::Utf8, true),
38+
Field::new("options", DataType::Utf8, true),
4039
]));
4140

4241
Self {
@@ -47,10 +46,9 @@ impl<C: CatalogInfo> PgNamespaceTable<C> {
4746
}
4847
}
4948

50-
/// Generate record batches based on the current state of the catalog
5149
async fn get_data(this: Self) -> Result<RecordBatch> {
52-
// Vectors to store column data
5350
let mut oids = Vec::new();
51+
let mut xmins = Vec::new();
5452
let mut nspnames = Vec::new();
5553
let mut nspowners = Vec::new();
5654
let mut nspacls: Vec<Option<String>> = Vec::new();
@@ -74,6 +72,7 @@ impl<C: CatalogInfo> PgNamespaceTable<C> {
7472
schema_oid_cache.insert(cache_key, schema_oid);
7573

7674
oids.push(schema_oid as i32);
75+
xmins.push(Some(1i32));
7776
nspnames.push(schema_name.clone());
7877
nspowners.push(10); // Default owner
7978
nspacls.push(None);
@@ -92,9 +91,9 @@ impl<C: CatalogInfo> PgNamespaceTable<C> {
9291
// add new schema cache
9392
oid_cache.extend(schema_oid_cache);
9493

95-
// Create Arrow arrays from the collected data
9694
let arrays: Vec<ArrayRef> = vec![
9795
Arc::new(Int32Array::from(oids)),
96+
Arc::new(Int32Array::from(xmins)),
9897
Arc::new(StringArray::from(nspnames)),
9998
Arc::new(Int32Array::from(nspowners)),
10099
Arc::new(StringArray::from_iter(nspacls.into_iter())),

0 commit comments

Comments
 (0)