-
Notifications
You must be signed in to change notification settings - Fork 33
/
Copy pathpostgres.rs
92 lines (84 loc) · 3.32 KB
/
postgres.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
use datafusion::prelude::SessionContext;
use datafusion::sql::TableReference;
use datafusion_table_providers::{
common::DatabaseCatalogProvider, postgres::PostgresTableFactory,
sql::db_connection_pool::postgrespool::PostgresConnectionPool, util::secrets::to_secret_map,
};
use std::collections::HashMap;
use std::sync::Arc;
/// This example demonstrates how to:
/// 1. Create a PostgreSQL connection pool
/// 2. Create and use PostgresTableFactory to generate TableProvider
/// 3. Register TableProvider with DataFusion
/// 4. Use SQL queries to access PostgreSQL table data
///
/// Prerequisites:
/// Start a PostgreSQL server using Docker:
/// ```bash
/// docker run --name postgres -e POSTGRES_PASSWORD=password -e POSTGRES_DB=postgres_db -p 5432:5432 -d postgres:16-alpine
/// # Wait for the Postgres server to start
/// sleep 30
///
/// # Create a table and insert sample data
/// docker exec -i postgres psql -U postgres test_db <<EOF
/// CREATE TABLE companies (
/// id SERIAL PRIMARY KEY,
/// name VARCHAR(100)
/// );
///
/// INSERT INTO companies (name) VALUES ('Example Corp');
/// EOF
/// ```
#[tokio::main]
async fn main() {
// Create PostgreSQL connection parameters
let postgres_params = to_secret_map(HashMap::from([
("host".to_string(), "localhost".to_string()),
("user".to_string(), "postgres".to_string()),
("db".to_string(), "postgres_db".to_string()),
("pass".to_string(), "password".to_string()),
("port".to_string(), "5432".to_string()),
("sslmode".to_string(), "disable".to_string()),
]));
// Create PostgreSQL connection pool
let postgres_pool = Arc::new(
PostgresConnectionPool::new(postgres_params)
.await
.expect("unable to create PostgreSQL connection pool"),
);
// Create PostgreSQL table provider factory
// Used to generate TableProvider instances that can read PostgreSQL table data
let table_factory = PostgresTableFactory::new(postgres_pool.clone());
// Create database catalog provider
// This allows us to access tables through catalog structure (catalog.schema.table)
let catalog = DatabaseCatalogProvider::try_new(postgres_pool)
.await
.unwrap();
// Create DataFusion session context
let ctx = SessionContext::new();
// Register PostgreSQL catalog, making it accessible via the "postgres" name
ctx.register_catalog("postgres", Arc::new(catalog));
// Demonstrate direct table provider registration
// This method registers the table in the default catalog
// Here we register the PostgreSQL "companies" table as "companies_v2"
ctx.register_table(
"companies_v2",
table_factory
.table_provider(TableReference::bare("companies"))
.await
.expect("failed to register table provider"),
)
.expect("failed to register table");
// Query Example 1: Query the renamed table through default catalog
let df = ctx
.sql("SELECT * FROM datafusion.public.companies_v2")
.await
.expect("select failed");
df.show().await.expect("show failed");
// Query Example 2: Query the original table through PostgreSQL catalog
let df = ctx
.sql("SELECT * FROM postgres.public.companies")
.await
.expect("select failed");
df.show().await.expect("show failed");
}