-
Notifications
You must be signed in to change notification settings - Fork 33
/
Copy pathsqlite.rs
77 lines (69 loc) · 2.72 KB
/
sqlite.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
use datafusion::{prelude::SessionContext, sql::TableReference};
use datafusion_table_providers::{
common::DatabaseCatalogProvider,
sql::db_connection_pool::{sqlitepool::SqliteConnectionPoolFactory, Mode},
sqlite::SqliteTableFactory,
};
use std::sync::Arc;
use std::time::Duration;
/// This example demonstrates how to:
/// 1. Create a SQLite connection pool
/// 2. Create and use SqliteTableFactory to generate TableProvider
/// 3. Register TableProvider with DataFusion
/// 4. Use SQL queries to access SQLite table data
#[tokio::main]
async fn main() {
// Create SQLite connection pool
// - arg1: SQLite database file path
// - arg2: Database mode (file mode)
// - arg3: Connection timeout duration
let sqlite_pool = Arc::new(
SqliteConnectionPoolFactory::new(
"examples/sqlite_example.db",
Mode::File,
Duration::default(),
)
.build()
.await
.expect("unable to create Sqlite connection pool"),
);
// Create SQLite table provider factory
// Used to generate TableProvider instances that can read SQLite table data
let table_factory = SqliteTableFactory::new(sqlite_pool.clone());
// Create database catalog provider
// This allows us to access tables through catalog structure (catalog.schema.table)
let catalog_provider = DatabaseCatalogProvider::try_new(sqlite_pool).await.unwrap();
// Create DataFusion session context
let ctx = SessionContext::new();
// Register SQLite catalog, making it accessible via the "sqlite" name
ctx.register_catalog("sqlite", Arc::new(catalog_provider));
// Demonstrate direct table provider registration
// This method registers the table in the default catalog
// Here we register the SQLite "companies" table as "companies_v2"
ctx.register_table(
"companies_v2",
table_factory
.table_provider(TableReference::bare("companies"))
.await
.expect("failed to register table provider"),
)
.expect("failed to register table");
// Query Example 1: Query the renamed table through default catalog
let df = ctx
.sql("SELECT * FROM datafusion.public.companies_v2")
.await
.expect("select failed");
df.show().await.expect("show failed");
// Query Example 2: Query the original table through SQLite catalog
let df = ctx
.sql("SELECT * FROM sqlite.main.companies")
.await
.expect("select failed");
df.show().await.expect("show failed");
// Query Example 3: Query the projects table in SQLite
let df = ctx
.sql("SELECT * FROM sqlite.main.projects")
.await
.expect("select failed");
df.show().await.expect("show failed");
}