-
Notifications
You must be signed in to change notification settings - Fork 33
/
Copy pathschema.rs
136 lines (119 loc) · 4.26 KB
/
schema.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use datafusion::catalog::TableProviderFactory;
use datafusion::common::Constraints;
use datafusion::common::ToDFSchema;
use datafusion::logical_expr::CreateExternalTable;
use datafusion::prelude::SessionContext;
use datafusion::sql::TableReference;
use std::collections::HashMap;
use std::sync::Arc;
use crate::postgres::common;
use crate::postgres::PostgresTableProviderFactory;
use datafusion_table_providers::postgres::PostgresTableFactory;
use datafusion_table_providers::sql::db_connection_pool::postgrespool::PostgresConnectionPool;
use datafusion_table_providers::util::secrets::to_secret_map;
/// SQL script embedded at compile time from `scripts/complex_table.sql`
/// (the path is resolved relative to this source file).
///
/// `test_postgres_schema_inference_complex_types` splits this text on `;` and
/// executes the pieces before the schema of `example_table` is inferred.
const COMPLEX_TABLE_SQL: &str = include_str!("scripts/complex_table.sql");
/// Builds the Arrow schema shared by `test_postgres_schema_inference`.
///
/// The same schema is used twice in that test: once to describe the table in
/// the `CreateExternalTable` command, and once as the expected value that the
/// inferred schema is compared against.
///
/// Only `id` is non-nullable; every other column is nullable.
fn get_schema() -> SchemaRef {
    // Nanosecond-precision timestamp without a time zone.
    let created_at = Field::new(
        "created_at",
        DataType::Timestamp(TimeUnit::Nanosecond, None),
        true,
    );

    // List column whose elements are nullable UTF-8 strings named "item".
    let tag_item = Field::new("item", DataType::Utf8, true);
    let tags = Field::new("tags", DataType::List(Arc::new(tag_item)), true);

    let columns = vec![
        Field::new("id", DataType::Int32, false),
        Field::new("large_id", DataType::Int64, true),
        Field::new("name", DataType::Utf8, true),
        Field::new("age", DataType::Int16, true),
        Field::new("height", DataType::Float64, true),
        Field::new("is_active", DataType::Boolean, true),
        created_at,
        Field::new("data", DataType::Binary, true),
        tags,
    ];

    Arc::new(Schema::new(columns))
}
/// Round-trips the schema from `get_schema()` through Postgres.
///
/// Steps:
/// 1. Start a Postgres Docker container on a random port.
/// 2. Issue a `CreateExternalTable` command for `test_table` through
///    `PostgresTableProviderFactory`, using `get_schema()` as the table schema.
/// 3. Open a separate connection pool, build a table provider for the same
///    table via `PostgresTableFactory`, and assert that the schema it reports
///    equals `get_schema()`.
/// 4. Remove the container.
///
/// # Panics
///
/// Panics (failing the test) if any step errors or the schemas differ.
#[tokio::test]
async fn test_postgres_schema_inference() {
    let pg_port = crate::get_random_port();
    let pg_container = common::start_postgres_docker_container(pg_port)
        .await
        .expect("Postgres container to start");

    let provider_factory = PostgresTableProviderFactory::new();
    let session = SessionContext::new();
    let table = "test_table";
    let declared_schema = get_schema();

    // Only the schema, table name and connection options carry information;
    // the remaining fields are left empty / at their neutral values.
    let create_table = CreateExternalTable {
        schema: declared_schema.to_dfschema_ref().expect("to df schema"),
        name: table.into(),
        location: String::new(),
        file_type: String::new(),
        table_partition_cols: Vec::new(),
        if_not_exists: false,
        definition: None,
        order_exprs: Vec::new(),
        unbounded: false,
        options: common::get_pg_params(pg_port),
        constraints: Constraints::empty(),
        column_defaults: HashMap::new(),
        temporary: false,
    };

    // The provider returned here is not needed; the table is read back below
    // through an independent pool and factory.
    let _ = provider_factory
        .create(&session.state(), &create_table)
        .await
        .expect("table provider created");

    let connection_params = to_secret_map(common::get_pg_params(pg_port));
    let pool = PostgresConnectionPool::new(connection_params)
        .await
        .expect("unable to create Postgres connection pool");

    let inferred_provider = PostgresTableFactory::new(Arc::new(pool))
        .table_provider(TableReference::bare(table))
        .await
        .expect("to create table provider");

    assert_eq!(inferred_provider.schema(), get_schema());

    // Tear down
    pg_container
        .remove()
        .await
        .expect("to stop postgres container");
}
/// Snapshot-tests schema inference for the table defined by `COMPLEX_TABLE_SQL`.
///
/// Steps:
/// 1. Start a Postgres Docker container on a random port.
/// 2. Open a connection pool, take a direct connection, and execute every
///    non-empty statement of `COMPLEX_TABLE_SQL` on it.
/// 3. Build a table provider for `example_table` via `PostgresTableFactory`
///    and compare the pretty-printed (`{:#?}`) schema against the stored
///    `insta` snapshot.
/// 4. Remove the container.
///
/// # Panics
///
/// Panics (failing the test) if any step errors or the snapshot differs. A
/// failing SQL statement is included in the panic message.
#[tokio::test]
async fn test_postgres_schema_inference_complex_types() {
    let port = crate::get_random_port();
    let container = common::start_postgres_docker_container(port)
        .await
        .expect("Postgres container to start");

    let table_name = "example_table";

    let postgres_pool = Arc::new(
        PostgresConnectionPool::new(to_secret_map(common::get_pg_params(port)))
            .await
            .expect("unable to create Postgres connection pool"),
    );
    let pg_conn = postgres_pool
        .connect_direct()
        .await
        .expect("to connect to postgres");

    // The script is split naively on `;`. That leaves blank fragments (for
    // example after the final terminator), so trim each piece and skip the
    // empty ones instead of sending empty queries to the server.
    let statements = COMPLEX_TABLE_SQL
        .split(';')
        .map(str::trim)
        .filter(|statement| !statement.is_empty());

    for statement in statements {
        pg_conn
            .conn
            .execute(statement, &[])
            .await
            .unwrap_or_else(|err| {
                panic!("to create table: statement `{}` failed: {:?}", statement, err)
            });
    }

    let table_factory = PostgresTableFactory::new(postgres_pool);
    let table_provider = table_factory
        .table_provider(TableReference::bare(table_name))
        .await
        .expect("to create table provider");

    // Keep the binding name: `insta` records the asserted expression in the
    // snapshot metadata.
    let pretty_schema = format!("{:#?}", table_provider.schema());
    insta::assert_snapshot!(pretty_schema);

    // Tear down
    container
        .remove()
        .await
        .expect("to stop postgres container");
}