Skip to content

Commit 1ccaf09

Browse files
authored
ClickHouse: Add python module (#418)
* ClickHouse: Update python bindings * Update comment * Add clickhouse to default feature
1 parent 40aba79 commit 1ccaf09

8 files changed

Lines changed: 360 additions & 8 deletions

File tree

README.md

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ let ctx = SessionContext::with_state(state);
2323
- PostgreSQL
2424
- MySQL
2525
- SQLite
26+
- ClickHouse
2627
- DuckDB
2728
- Flight SQL
2829
- ODBC
@@ -86,6 +87,59 @@ cargo run -p datafusion-table-providers --example postgres --features postgres
8687

8788
```
8889

90+
### ClickHouse
91+
92+
In order to run the ClickHouse example, you need to have a ClickHouse server running. You can use the following commands to start a ClickHouse server in a Docker container that the example can use, wait for it to become ready, and create the sample tables and view:
93+
94+
```bash
95+
# 1. Start a ClickHouse server
docker run --name clickhouse \
  -e CLICKHOUSE_DB=default \
  -e CLICKHOUSE_USER=admin \
  -e CLICKHOUSE_PASSWORD=secret \
  -p 8123:8123 \
  -p 9000:9000 \
  -d clickhouse/clickhouse-server:24.8-alpine

# 2. Wait for readiness
echo "Waiting for ClickHouse to start..."
until curl -s http://localhost:8123/ping | grep -q 'Ok'; do
  sleep 2
done
echo

# 3. Create tables and a parameterized view
#    The view keeps the name `Users`; its backing table is named `UsersData`
#    so that the view does not collide with (or select from) itself.
docker exec -i clickhouse clickhouse-client \
  --user=admin --password=secret --multiquery <<EOF
CREATE TABLE IF NOT EXISTS Reports (
  id UInt32,
  name String
) ENGINE = Memory;

INSERT INTO Reports VALUES (1, 'Monthly Report'), (2, 'Quarterly Report');

CREATE TABLE IF NOT EXISTS UsersData (
  workspace_uid String,
  id UInt32,
  username String
) ENGINE = Memory;

INSERT INTO UsersData VALUES
  ('abc', 1, 'alice'),
  ('abc', 2, 'bob'),
  ('xyz', 3, 'charlie');

CREATE OR REPLACE VIEW Users AS
SELECT workspace_uid, id, username
FROM UsersData
WHERE workspace_uid = {workspace_uid:String};
EOF
136+
```
137+
138+
```bash
139+
# Run from repo folder
140+
cargo run -p datafusion-table-providers --example clickhouse --features clickhouse
141+
```
142+
89143
### MySQL
90144

91145
In order to run the MySQL example, you need to have a MySQL server running. You can use the following command to start a MySQL server in a Docker container the example can use:

core/src/sql/db_connection_pool/dbconnection/clickhouseconn.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ impl AsyncDbConnection<Client, ()> for Client {
4949
}
5050

5151
let tables: Vec<Row> = self
52-
.query("SHOW TABLES FROM ?")
52+
.query("SELECT name FROM system.tables WHERE database = ?")
5353
.bind(schema)
5454
.fetch_all()
5555
.await
@@ -65,7 +65,7 @@ impl AsyncDbConnection<Client, ()> for Client {
6565
name: String,
6666
}
6767
let tables: Vec<Row> = self
68-
.query("SHOW DATABASES")
68+
.query("SELECT name FROM system.databases WHERE name NOT IN ('system', 'information_schema', 'INFORMATION_SCHEMA')")
6969
.fetch_all()
7070
.await
7171
.boxed()

python/Cargo.toml

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,30 @@ doc = false
1515

1616
[dependencies]
1717
arrow = { workspace = true }
18-
arrow-flight = {workspace = true, optional = true}
18+
arrow-flight = { workspace = true, optional = true }
1919
datafusion = { workspace = true, features = ["pyarrow"] }
2020
datafusion-ffi = { workspace = true }
2121
datafusion-table-providers = { workspace = true }
2222
pyo3 = { version = "0.24.2" }
23-
tokio = { version = "1.46", features = ["macros", "rt", "rt-multi-thread", "sync"] }
24-
duckdb = { workspace = true, optional = true}
23+
tokio = { version = "1.46", features = [
24+
"macros",
25+
"rt",
26+
"rt-multi-thread",
27+
"sync",
28+
] }
29+
duckdb = { workspace = true, optional = true }
2530

2631
[features]
27-
default = ["duckdb", "sqlite", "mysql", "postgres", "odbc", "flight"]
32+
default = [
33+
"duckdb",
34+
"clickhouse",
35+
"sqlite",
36+
"mysql",
37+
"postgres",
38+
"odbc",
39+
"flight",
40+
]
41+
clickhouse = ["datafusion-table-providers/clickhouse-federation"]
2842
duckdb = ["dep:duckdb", "datafusion-table-providers/duckdb-federation"]
2943
sqlite = ["datafusion-table-providers/sqlite-federation"]
3044
mysql = ["datafusion-table-providers/mysql-federation"]

python/examples/clickhouse_demo.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
from datafusion import SessionContext
2+
from datafusion_table_providers import clickhouse
3+
4+
# Demo: register every ClickHouse table with a DataFusion session and print it.
session = SessionContext()

# Connection settings handed to the ClickHouse table factory.
factory = clickhouse.ClickHouseTableFactory(
    {
        "url": "http://localhost:8123",
        "database": "default",
        "user": "user",
        "password": "secret",
    }
)

# Fetch the table names once, then register and display each table in turn.
for table_name in factory.tables():
    provider = factory.get_table(table_name)
    session.register_table_provider(table_name, provider)
    print("Checking table:", table_name)
    session.table(table_name).show()
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
"""Python interface for the ClickHouse table provider."""
18+
19+
from typing import Any, List
20+
from . import _internal
21+
22+
23+
class ClickHouseTableFactory:
    """Thin Python wrapper around the native ClickHouse table factory."""

    def __init__(self, params: dict) -> None:
        """Build the factory from a dictionary of connection parameters.

        Args:
            params (dict): connection parameters, forwarded unchanged to the
                native ``RawClickHouseTableFactory``.
        """
        raw_factory_cls = _internal.clickhouse.RawClickHouseTableFactory
        self._raw = raw_factory_cls(params)

    def tables(self) -> List[str]:
        """Return the table names reported by the native factory."""
        names = self._raw.tables()
        return names

    def get_table(self, table_reference: str, args=None) -> Any:
        """Return the table provider for the table named `table_reference`.

        Args:
            table_reference (str): table name
            args: optional list of ``(name, value)`` parameter tuples,
                forwarded unchanged to the native factory
        """
        provider = self._raw.get_table(table_reference, args)
        return provider
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import subprocess
2+
import time
3+
from datafusion import SessionContext
4+
from datafusion_table_providers import clickhouse  # ClickHouse table provider bindings
5+
6+
def run_docker_container():
    """Start a ClickHouse server container and wait until it answers HTTP pings.

    The container is named ``clickhouse``, publishes the HTTP port 8123, and is
    started with the ``user`` / ``secret`` credentials.

    Instead of sleeping for a fixed time, the server's ``/ping`` endpoint is
    polled until it replies ``Ok``, so the function returns as soon as the
    server is reachable and fails loudly if it never comes up.

    Raises:
        subprocess.CalledProcessError: if ``docker run`` exits non-zero.
        TimeoutError: if the server does not answer within the deadline.
    """
    import http.client
    import urllib.request

    subprocess.run(
        ["docker", "run", "--name", "clickhouse",
         "-e", "CLICKHOUSE_USER=user",
         "-e", "CLICKHOUSE_PASSWORD=secret",
         "-p", "8123:8123",
         "-d", "clickhouse/clickhouse-server:latest"],
        check=True,
    )

    ping_url = "http://localhost:8123/ping"
    deadline = time.monotonic() + 60
    while True:
        try:
            with urllib.request.urlopen(ping_url, timeout=2) as response:
                if b"Ok" in response.read():
                    return
        except (OSError, http.client.HTTPException):
            # Server not accepting connections yet; retry until the deadline.
            pass
        if time.monotonic() >= deadline:
            raise TimeoutError("ClickHouse did not become ready within 60 seconds")
        time.sleep(1)
16+
17+
def create_schema():
    """Create and seed the ``companies`` table and define ``companies_param_view``.

    The statements are sent in one ``clickhouse-client --multiquery`` call
    executed inside the ``clickhouse`` container.

    Raises:
        subprocess.CalledProcessError: if the client exits non-zero.
    """
    statements = r"""
CREATE TABLE companies (
id UInt32,
name String,
founded Date,
revenue Decimal(18,2),
is_active Bool,
tags Array(String)
) ENGINE = MergeTree()
ORDER BY id;

INSERT INTO companies VALUES
(1, 'Acme Corporation', '1999-03-12', 12500000.50, 1, ['manufacturing', 'global']),
(2, 'Widget Inc.', '2005-07-01', 4500000.00, 1, ['gadgets', 'innovation']),
(3, 'Gizmo Corp.', '2010-11-23', 780000.75, 0, ['hardware']),
(4, 'Tech Solutions', '2018-01-15', 220000.00, 1, ['consulting','it']),
(5, 'Data Innovations', '2021-05-10', 98000.99, 1, ['analytics','startup']);

CREATE VIEW companies_param_view AS
SELECT id, name, founded, revenue, is_active, tags
FROM companies
WHERE name = {name:String};
"""
    client_command = [
        "docker", "exec", "-i", "clickhouse",
        "clickhouse-client", "--multiquery", "--query", statements,
    ]
    subprocess.run(client_command, check=True)
45+
46+
def stop_container():
    """Stop and then remove the ``clickhouse`` container.

    Raises:
        subprocess.CalledProcessError: if either docker command exits non-zero;
            a failed ``stop`` prevents the ``rm`` from running.
    """
    for action in ("stop", "rm"):
        subprocess.run(["docker", action, "clickhouse"], check=True)
49+
50+
class TestClickHouseParameterized:
    """Integration tests for the ClickHouse table factory against a Docker-hosted server."""

    @classmethod
    def setup_class(cls):
        """Start the server, load the schema, and build the shared context and factory."""
        run_docker_container()
        create_schema()
        cls.ctx = SessionContext()
        cls.pool = clickhouse.ClickHouseTableFactory(
            {
                "url": "http://localhost:8123",
                "database": "default",
                "user": "user",
                "password": "secret",
            }
        )

    @classmethod
    def teardown_class(cls):
        """Stop and remove the server container."""
        stop_container()

    def test_get_tables(self):
        """Both the base table and the parameterized view are listed."""
        table_names = self.pool.tables()
        for expected in ("companies", "companies_param_view"):
            assert expected in table_names

    def test_parameterized_view(self):
        """The view returns only the row matching the bound `name` parameter."""
        view_name = "companies_param_view"

        # Register the provider so DataFusion can access the view.
        provider = self.pool.get_table(view_name, [("name", "Gizmo Corp.")])
        self.ctx.register_table_provider(view_name, provider)

        # Query through the view; the parameter was bound at registration.
        result = self.ctx.sql("SELECT id, name, revenue FROM companies_param_view")
        batch = result.collect()[0]

        names = batch["name"]
        assert len(names) == 1
        assert names[0].as_py() == "Gizmo Corp."
        assert batch["revenue"][0].as_py() == 780000.75

python/src/clickhouse.rs

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
use std::sync::Arc;
2+
3+
use datafusion_table_providers::{
4+
clickhouse::{Arg, ClickHouseTableFactory},
5+
sql::db_connection_pool::{clickhousepool::ClickHouseConnectionPool, DbConnectionPool},
6+
util::secrets::to_secret_map,
7+
};
8+
use pyo3::{
9+
exceptions::PyTypeError,
10+
prelude::*,
11+
types::{PyDict, PyList},
12+
};
13+
14+
use crate::{
15+
utils::{pydict_to_hashmap, to_pyerr, wait_for_future},
16+
RawTableProvider,
17+
};
18+
19+
/// Python-visible handle that bundles a ClickHouse connection pool with the
/// table factory built on top of that same pool.
#[pyclass(module = "datafusion_table_providers._internal.clickhouse")]
struct RawClickHouseTableFactory {
    /// Shared connection pool; `tables` opens its connection from here.
    pool: Arc<ClickHouseConnectionPool>,
    /// Factory that `get_table` asks for table providers.
    factory: ClickHouseTableFactory,
}
24+
25+
#[pymethods]
26+
impl RawClickHouseTableFactory {
27+
#[new]
28+
#[pyo3(signature = (params))]
29+
pub fn new(py: Python, params: &Bound<'_, PyDict>) -> PyResult<Self> {
30+
let params = to_secret_map(pydict_to_hashmap(params)?);
31+
let pool =
32+
Arc::new(wait_for_future(py, ClickHouseConnectionPool::new(params)).map_err(to_pyerr)?);
33+
34+
Ok(Self {
35+
factory: ClickHouseTableFactory::new(Arc::clone(&pool)),
36+
pool,
37+
})
38+
}
39+
40+
pub fn tables(&self, py: Python) -> PyResult<Vec<String>> {
41+
wait_for_future(py, async {
42+
let conn = self.pool.connect().await.map_err(to_pyerr)?;
43+
let conn_async = conn.as_async().ok_or(to_pyerr(
44+
"Unable to create connection to Postgres db".to_string(),
45+
))?;
46+
let schemas = conn_async.schemas().await.map_err(to_pyerr)?;
47+
48+
let mut tables = Vec::default();
49+
for schema in schemas {
50+
let schema_tables = conn_async.tables(&schema).await.map_err(to_pyerr)?;
51+
tables.extend(schema_tables);
52+
}
53+
54+
Ok(tables)
55+
})
56+
}
57+
58+
#[pyo3(signature = (table_reference, args=None))]
59+
pub fn get_table(
60+
&self,
61+
py: Python,
62+
table_reference: &str,
63+
args: Option<Py<PyAny>>,
64+
) -> PyResult<RawTableProvider> {
65+
let args_vec = if let Some(args) = args {
66+
let seq = args.downcast_bound::<PyList>(py).map_err(|_| {
67+
PyTypeError::new_err("Argument must be list of int (signed/unsigned) or string")
68+
})?;
69+
70+
let arr: Result<Vec<_>, _> = seq
71+
.iter()
72+
.map(|val| {
73+
val.extract::<(String, u64)>()
74+
.map(|x| (x.0, Arg::Unsigned(x.1)))
75+
.or_else(|_| {
76+
val.extract::<(String, i64)>()
77+
.map(|x| (x.0, Arg::Signed(x.1)))
78+
})
79+
.or_else(|_| {
80+
val.extract::<(String, String)>()
81+
.map(|x| (x.0, Arg::String(x.1)))
82+
})
83+
})
84+
.collect();
85+
86+
let arr = arr.map_err(|_| {
87+
PyTypeError::new_err("Argument must be list of int (signed/unsigned) or string")
88+
})?;
89+
90+
Some(arr)
91+
} else {
92+
None
93+
};
94+
95+
let table = wait_for_future(
96+
py,
97+
self.factory
98+
.table_provider(table_reference.into(), args_vec),
99+
)
100+
.map_err(to_pyerr)?;
101+
102+
Ok(RawTableProvider {
103+
table,
104+
supports_pushdown_filters: true,
105+
})
106+
}
107+
}
108+
109+
/// Register the ClickHouse factory class on the Python module `m`.
pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_class::<RawClickHouseTableFactory>()
}

0 commit comments

Comments
 (0)