Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ The supported method of passing ClickHouse server settings is to prefix such arg

### Improvements
- Add support for QBit data type. Closes [#570](https://github.com/ClickHouse/clickhouse-connect/issues/570)
- Add the ability to create table from PyArrow objects. Addresses [#588](https://github.com/ClickHouse/clickhouse-connect/issues/588)

## 0.10.0, 2025-11-14

Expand Down
108 changes: 108 additions & 0 deletions clickhouse_connect/driver/ddl.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import NamedTuple, Sequence

from clickhouse_connect.datatypes.base import ClickHouseType
from clickhouse_connect.driver.options import check_arrow


class TableColumnDef(NamedTuple):
Expand All @@ -26,3 +27,110 @@ def create_table(table_name: str, columns: Sequence[TableColumnDef], engine: str
for key, value in engine_params.items():
stmt += f' {key} {value}'
return stmt


def _arrow_type_to_ch(arrow_type: "pa.DataType") -> str:
    """
    Best-effort mapping from common PyArrow types to ClickHouse type names.

    Covers the core scalar families: signed/unsigned integers, floats
    (Float16 is widened to Float32), booleans, and (large) strings.
    Any Arrow type outside that set raises TypeError so the caller knows
    the automatic mapping is not implemented for it.
    """
    pa = check_arrow()

    pat = pa.types

    # Dispatch table: (pyarrow predicate, ClickHouse type name).
    # The first matching predicate wins; order groups the scalar families.
    dispatch = (
        # Signed ints
        (pat.is_int8, 'Int8'),
        (pat.is_int16, 'Int16'),
        (pat.is_int32, 'Int32'),
        (pat.is_int64, 'Int64'),
        # Unsigned ints
        (pat.is_uint8, 'UInt8'),
        (pat.is_uint16, 'UInt16'),
        (pat.is_uint32, 'UInt32'),
        (pat.is_uint64, 'UInt64'),
        # Floats -- ClickHouse has no 16-bit float here, so half floats widen
        (pat.is_float16, 'Float32'),
        (pat.is_float32, 'Float32'),
        (pat.is_float64, 'Float64'),
        # Boolean
        (pat.is_boolean, 'Bool'),
        # Strings (covers pa.string() and pa.large_string())
        (pat.is_string, 'String'),
        (pat.is_large_string, 'String'),
    )

    for predicate, ch_name in dispatch:
        if predicate(arrow_type):
            return ch_name

    # For any currently unsupported type, raise so it is clear that
    # this Arrow type is not supported by the helper yet.
    raise TypeError(f'Unsupported Arrow type for automatic mapping: {arrow_type!r}')


class _DDLType:
"""
Minimal helper used to satisfy TableColumnDef.ch_type.

create_table() only needs ch_type.name when building the DDL string,
so we'll wrap the ClickHouse type name in this tiny object instead of
constructing full ClickHouseType instances here.
"""
def __init__(self, name: str):
self.name = name


def arrow_schema_to_column_defs(schema: "pa.Schema") -> list[TableColumnDef]:
    """
    Convert a PyArrow Schema into a list of TableColumnDef objects.

    Uses an *optimistic non-null* strategy: every column is emitted as a
    non-nullable ClickHouse type, even though Arrow fields are nullable by
    default.

    If an Arrow table containing nulls is later inserted, ClickHouse will
    reject that insert; the user can then adjust the DDL (e.g. wrap the
    affected types in Nullable(...)) to match their data.
    """
    pa = check_arrow()

    if not isinstance(schema, pa.Schema):
        raise TypeError(f'Expected pyarrow.Schema, got {type(schema)!r}')

    # One TableColumnDef per schema field, in schema order.
    return [
        TableColumnDef(
            name=field.name,
            ch_type=_DDLType(_arrow_type_to_ch(field.type)),
        )
        for field in schema
    ]


def create_table_from_arrow_schema(
    table_name: str,
    schema: "pa.Schema",
    engine: str,
    engine_params: dict,
) -> str:
    """
    Build a CREATE TABLE statement from a PyArrow Schema.

    Thin composition of the two existing helpers:
    the schema is converted to column definitions via
    arrow_schema_to_column_defs, which are then handed to create_table.
    """
    return create_table(
        table_name,
        arrow_schema_to_column_defs(schema),
        engine,
        engine_params,
    )
105 changes: 105 additions & 0 deletions tests/integration_tests/test_pyarrow_ddl_integration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import pytest

from clickhouse_connect.driver import Client

pytest.importorskip("pyarrow")

import pyarrow as pa

from clickhouse_connect.driver.ddl import (
arrow_schema_to_column_defs,
create_table,
create_table_from_arrow_schema,
)


def test_arrow_create_table_and_insert(test_client: Client):
    """Round-trip: create a table from an Arrow schema, insert Arrow data, read it back."""
    if not test_client.min_version("20"):
        pytest.skip(
            f"Not supported server version {test_client.server_version}"
        )

    table_name = "test_arrow_basic_integration"
    test_client.command(f"DROP TABLE IF EXISTS {table_name}")

    arrow_schema = pa.schema(
        [
            pa.field("id", pa.int64()),
            pa.field("name", pa.string()),
            pa.field("score", pa.float32()),
            pa.field("flag", pa.bool_()),
        ]
    )

    # Create the table via the schema-driven helper.
    test_client.command(
        create_table_from_arrow_schema(
            table_name=table_name,
            schema=arrow_schema,
            engine="MergeTree",
            engine_params={"ORDER BY": "id"},
        )
    )

    data = pa.table(
        {
            "id": [1, 2],
            "name": ["a", "b"],
            "score": [1.5, 2.5],
            "flag": [True, False],
        },
        schema=arrow_schema,
    )
    test_client.insert_arrow(table=table_name, arrow_table=data)

    rows = test_client.query(
        f"SELECT id, name, score, flag FROM {table_name} ORDER BY id"
    ).result_rows
    assert rows == [
        (1, "a", 1.5, True),
        (2, "b", 2.5, False),
    ]

    test_client.command(f"DROP TABLE IF EXISTS {table_name}")


def test_arrow_schema_to_column_defs(test_client: Client):
    """Create a table via the explicit two-step path: column defs first, then DDL."""
    table_name = "test_arrow_manual_integration"
    test_client.command(f"DROP TABLE IF EXISTS {table_name}")

    arrow_schema = pa.schema(
        [
            pa.field("id", pa.int64()),
            pa.field("name", pa.string()),
        ]
    )

    # Exercise the explicit helper path rather than the wrapper.
    test_client.command(
        create_table(
            table_name=table_name,
            columns=arrow_schema_to_column_defs(arrow_schema),
            engine="MergeTree",
            engine_params={"ORDER BY": "id"},
        )
    )

    data = pa.table(
        {
            "id": [10, 20],
            "name": ["x", "y"],
        },
        schema=arrow_schema,
    )
    test_client.insert_arrow(table=table_name, arrow_table=data)

    rows = test_client.query(
        f"SELECT id, name FROM {table_name} ORDER BY id"
    ).result_rows
    assert rows == [
        (10, "x"),
        (20, "y"),
    ]

    test_client.command(f"DROP TABLE IF EXISTS {table_name}")
138 changes: 138 additions & 0 deletions tests/unit_tests/test_pyarrow_ddl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import pytest

pytest.importorskip("pyarrow")

import pyarrow as pa

from clickhouse_connect.driver.ddl import (
arrow_schema_to_column_defs,
create_table,
create_table_from_arrow_schema,
)


def test_arrow_schema_to_column_defs_basic_mappings():
    """Every supported Arrow scalar type maps to the expected ClickHouse type name."""
    # (column name, arrow type, expected ClickHouse type name)
    expected = [
        ("i8", pa.int8(), "Int8"),
        ("i16", pa.int16(), "Int16"),
        ("i32", pa.int32(), "Int32"),
        ("i64", pa.int64(), "Int64"),
        ("u8", pa.uint8(), "UInt8"),
        ("u16", pa.uint16(), "UInt16"),
        ("u32", pa.uint32(), "UInt32"),
        ("u64", pa.uint64(), "UInt64"),
        ("f16", pa.float16(), "Float32"),
        ("f32", pa.float32(), "Float32"),
        ("f64", pa.float64(), "Float64"),
        ("s", pa.string(), "String"),
        ("ls", pa.large_string(), "String"),
        ("b", pa.bool_(), "Bool"),
    ]

    schema = pa.schema([(name, arrow_type) for name, arrow_type, _ in expected])
    col_defs = arrow_schema_to_column_defs(schema)

    assert [c.name for c in col_defs] == [name for name, _, _ in expected]
    assert [c.ch_type.name for c in col_defs] == [ch for _, _, ch in expected]

def test_arrow_schema_to_column_defs_unsupported_type_raises():
    """Arrow types without a ClickHouse mapping are rejected with TypeError."""
    unsupported = pa.schema([("ts", pa.timestamp("ms"))])
    with pytest.raises(TypeError, match="Unsupported Arrow type"):
        arrow_schema_to_column_defs(unsupported)


def test_arrow_schema_to_column_defs_invalid_input_type():
    """Non-Schema input is rejected up front with a TypeError."""
    bad_input = "not a schema"
    with pytest.raises(TypeError, match="Expected pyarrow.Schema"):
        arrow_schema_to_column_defs(bad_input)


def test_create_table_from_arrow_schema_builds_expected_ddl():
    """The wrapper produces the exact DDL string for a mixed-type schema."""
    arrow_schema = pa.schema(
        [
            pa.field("id", pa.int64()),
            pa.field("name", pa.string()),
            pa.field("score", pa.float32()),
            pa.field("flag", pa.bool_()),
        ]
    )

    ddl = create_table_from_arrow_schema(
        table_name="arrow_basic_test",
        schema=arrow_schema,
        engine="MergeTree",
        engine_params={"ORDER BY": "id"},
    )

    expected = (
        "CREATE TABLE arrow_basic_test "
        "(id Int64, name String, score Float32, flag Bool) "
        "ENGINE MergeTree ORDER BY id"
    )
    assert ddl == expected


def test_create_table_from_arrow_schema_matches_manual_create_table():
    """The wrapper and the explicit two-step path emit identical DDL."""
    arrow_schema = pa.schema(
        [
            pa.field("id", pa.int64()),
            pa.field("name", pa.string()),
        ]
    )
    engine_params = {"ORDER BY": "id"}

    ddl_manual = create_table(
        table_name="arrow_compare_test",
        columns=arrow_schema_to_column_defs(arrow_schema),
        engine="MergeTree",
        engine_params=engine_params,
    )
    ddl_wrapper = create_table_from_arrow_schema(
        table_name="arrow_compare_test",
        schema=arrow_schema,
        engine="MergeTree",
        engine_params=engine_params,
    )

    assert ddl_manual == ddl_wrapper