-
Notifications
You must be signed in to change notification settings - Fork 30
Expand file tree
/
Copy pathpg_namespace.rs
More file actions
122 lines (105 loc) · 4.38 KB
/
pg_namespace.rs
File metadata and controls
122 lines (105 loc) · 4.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use datafusion::arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::error::Result;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream;
use postgres_types::Oid;
use tokio::sync::RwLock;
use crate::pg_catalog::catalog_info::CatalogInfo;
use super::OidCacheKey;
/// Streaming table that exposes the PostgreSQL `pg_namespace` system catalog.
///
/// Every scan lists the schemas of each catalog reported by `catalog_list`.
/// OIDs are drawn from, and remembered in, the `Arc`-shared counter and cache
/// handed to [`PgNamespaceTable::new`].
#[derive(Clone, Debug)]
pub(crate) struct PgNamespaceTable<C> {
    /// Arrow schema of the rows this table produces.
    schema: SchemaRef,
    /// Provider of the catalog and schema names that become rows.
    catalog_list: C,
    /// Counter that hands out OIDs to schemas missing from `oid_cache`.
    oid_counter: Arc<AtomicU32>,
    /// Previously assigned OIDs, keyed by catalog object.
    oid_cache: Arc<RwLock<HashMap<OidCacheKey, Oid>>>,
}
impl<C: CatalogInfo> PgNamespaceTable<C> {
    /// Creates the `pg_namespace` table.
    ///
    /// * `catalog_list` - source of the catalog and schema names listed as rows.
    /// * `oid_counter` - shared counter from which OIDs for newly seen schemas are taken.
    /// * `oid_cache` - shared map of already-assigned OIDs; a schema found here keeps
    ///   its OID across scans.
    pub(crate) fn new(
        catalog_list: C,
        oid_counter: Arc<AtomicU32>,
        oid_cache: Arc<RwLock<HashMap<OidCacheKey, Oid>>>,
    ) -> Self {
        let schema = Arc::new(Schema::new(vec![
            Field::new("oid", DataType::Int32, false),
            Field::new("xmin", DataType::Int32, true),
            Field::new("nspname", DataType::Utf8, false),
            Field::new("nspowner", DataType::Int32, false),
            Field::new("nspacl", DataType::Utf8, true),
            Field::new("options", DataType::Utf8, true),
        ]));
        Self {
            schema,
            catalog_list,
            oid_counter,
            oid_cache,
        }
    }

    /// Builds the single record batch backing `pg_namespace`.
    ///
    /// The batch has one row per schema of every catalog reported by `catalog_list`.
    /// A schema already present in the shared OID cache keeps its OID; otherwise it
    /// takes the next value from `oid_counter`.
    ///
    /// Afterwards the cache is reconciled:
    /// * all schema entries are replaced by the set computed in this scan;
    /// * table entries whose schema was not seen are dropped;
    /// * catalog entries are left untouched.
    ///
    /// # Errors
    /// Propagates errors from catalog/schema enumeration and from building the
    /// `RecordBatch`.
    async fn get_data(this: Self) -> Result<RecordBatch> {
        // Enumerate every (catalog, schema) pair before touching the shared OID
        // cache, so its write lock is not held across these await points.
        let mut schemas = Vec::new();
        for catalog_name in this.catalog_list.catalog_names().await? {
            if let Some(schema_names) = this.catalog_list.schema_names(&catalog_name).await? {
                for schema_name in schema_names {
                    schemas.push((catalog_name.clone(), schema_name));
                }
            }
        }

        let row_count = schemas.len();
        let mut oids = Vec::with_capacity(row_count);
        let mut xmins = Vec::with_capacity(row_count);
        let mut nspnames = Vec::with_capacity(row_count);
        let mut nspowners = Vec::with_capacity(row_count);
        let mut nspacls: Vec<Option<String>> = Vec::with_capacity(row_count);
        let mut options: Vec<Option<String>> = Vec::with_capacity(row_count);

        {
            // Lookup, assignment and reconciliation happen under one write lock so
            // concurrent scans cannot interleave them.
            let mut oid_cache = this.oid_cache.write().await;
            // Schema -> OID mappings from this scan, merged into the shared cache below.
            let mut schema_oid_cache = HashMap::with_capacity(row_count);

            for (catalog_name, schema_name) in schemas {
                let cache_key = OidCacheKey::Schema(catalog_name, schema_name.clone());
                let schema_oid = match oid_cache.get(&cache_key) {
                    Some(oid) => *oid,
                    // `fetch_add` returns the previous value, which becomes the new OID.
                    None => this.oid_counter.fetch_add(1, Ordering::Relaxed),
                };
                schema_oid_cache.insert(cache_key, schema_oid);

                // NOTE: `Oid` is a u32 while the column is Int32; OIDs above
                // i32::MAX wrap to negative values here.
                oids.push(schema_oid as i32);
                xmins.push(Some(1i32));
                nspnames.push(schema_name);
                nspowners.push(10); // default owner
                nspacls.push(None);
                options.push(None);
            }

            // Drop every schema entry (re-added below) and every table entry whose
            // schema no longer exists; catalog entries are kept.
            oid_cache.retain(|key, _| match key {
                OidCacheKey::Catalog(..) => true,
                OidCacheKey::Schema(..) => false,
                OidCacheKey::Table(catalog, schema_name, _) => schema_oid_cache
                    .contains_key(&OidCacheKey::Schema(catalog.clone(), schema_name.clone())),
            });
            oid_cache.extend(schema_oid_cache);
        }

        let arrays: Vec<ArrayRef> = vec![
            Arc::new(Int32Array::from(oids)),
            Arc::new(Int32Array::from(xmins)),
            Arc::new(StringArray::from(nspnames)),
            Arc::new(Int32Array::from(nspowners)),
            Arc::new(StringArray::from_iter(nspacls)),
            Arc::new(StringArray::from_iter(options)),
        ];

        let batch = RecordBatch::try_new(this.schema.clone(), arrays)?;
        Ok(batch)
    }
}
impl<C: CatalogInfo> PartitionStream for PgNamespaceTable<C> {
    /// Arrow schema of the `pg_namespace` rows.
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Returns a stream that yields exactly one batch (or one error).
    ///
    /// The batch is built by `get_data` when the stream is first polled.
    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let table = self.clone();
        let output_schema = Arc::clone(&table.schema);
        // `get_data` is an async fn, so calling it here only creates the future.
        let single_batch = futures::stream::once(Self::get_data(table));
        Box::pin(RecordBatchStreamAdapter::new(output_schema, single_batch))
    }
}