Skip to content
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ instead of being passed as ClickHouse server settings. This is in conjunction wi
The supported method of passing ClickHouse server settings is to prefix such arguments/query parameters with `ch_`.

## UNRELEASED

### Bug Fixes

### Improvements
- Add support for QBit data type. Closes [#570](https://github.com/ClickHouse/clickhouse-connect/issues/570)

## 0.10.0, 2025-11-14

Expand Down
17 changes: 17 additions & 0 deletions clickhouse_connect/cc_sqlalchemy/datatypes/sqltypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,3 +497,20 @@ def __init__(self, *params, type_def: TypeDef = None):
values += (x,)
type_def = TypeDef(values=values)
super().__init__(type_def)


class QBit(ChSqlaType, UserDefinedType):
    """SQLAlchemy column type for QBit (bit-transposed vector) columns."""

    python_type = list

    def __init__(self, element_type: str = None, dimension: int = None, type_def: TypeDef = None):
        """
        Build a QBit column type from explicit parameters or from an already parsed TypeDef
        :param element_type: Element type (BFloat16, Float32, or Float64)
        :param dimension: Number of elements in the vector
        :param type_def: TypeDef from parse_name function (used during reflection); when given,
          element_type and dimension are ignored
        :raises ArgumentError: if no type_def is supplied and element_type or dimension is missing
        """
        if type_def:
            resolved = type_def
        elif element_type and dimension:
            resolved = TypeDef(values=(element_type, dimension))
        else:
            raise ArgumentError("QBit requires element_type and dimension parameters")
        super().__init__(resolved)
1 change: 1 addition & 0 deletions clickhouse_connect/datatypes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import clickhouse_connect.datatypes.string
import clickhouse_connect.datatypes.temporal
import clickhouse_connect.datatypes.geometric
import clickhouse_connect.datatypes.vector
import clickhouse_connect.datatypes.dynamic
import clickhouse_connect.datatypes.registry
import clickhouse_connect.datatypes.postinit
276 changes: 276 additions & 0 deletions clickhouse_connect/datatypes/vector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
import logging
from math import ceil, nan
from struct import pack, unpack
from typing import Any, Sequence

from clickhouse_connect.datatypes.base import ClickHouseType, TypeDef
from clickhouse_connect.datatypes.registry import get_from_name
from clickhouse_connect.driver.ctypes import data_conv
from clickhouse_connect.driver.insert import InsertContext
from clickhouse_connect.driver.options import np
from clickhouse_connect.driver.query import QueryContext
from clickhouse_connect.driver.types import ByteSource

# Module-level logger named after this module
logger = logging.getLogger(__name__)

# QBit conversion has both a NumPy path and a pure-Python fallback; when NumPy is not
# available (np is None) emit a hint at import time that the slower fallback will be used.
if np is None:
    logger.info("NumPy not detected. Install NumPy to see 10-30x performance gains with QBit columns.")


class QBit(ClickHouseType):
    """
    QBit type - represents bit-transposed vectors for efficient vector search operations.

    Syntax: QBit(element_type, dimension)
      - element_type: BFloat16, Float32, or Float64
      - dimension: Number of elements per vector

    Over the Native protocol, ClickHouse transmits QBit columns as bit-transposed Tuples.
    This class models that as a Tuple with one FixedString per bit of the element type,
    each FixedString being ceil(dimension / 8) bytes long.

    Requires:
      - SET allow_experimental_qbit_type = 1
      - Server version >=25.10
    """

    __slots__ = (
        "element_type",
        "dimension",
        "_bits_per_element",
        "_bytes_per_fixedstring",
        "_tuple_type",
    )

    # Python-side container type for a single QBit value
    python_type = list
    # Single-bit masks for bit positions 0..7 of a byte (index i -> 1 << i)
    _BIT_SHIFTS = [1 << i for i in range(8)]
    # Width in bits of each supported element type
    _ELEMENT_BITS = {"BFloat16": 16, "Float32": 32, "Float64": 64}

def __init__(self, type_def: TypeDef):
    """
    Initialise the QBit type from a parsed type definition.

    :param type_def: TypeDef whose values are (element_type, dimension)
    :raises ValueError: if the element type is not supported or the dimension is not positive
    """
    super().__init__(type_def)

    self.element_type = type_def.values[0]
    supported = self._ELEMENT_BITS
    if self.element_type not in supported:
        raise ValueError(f"Unsupported QBit element type '{self.element_type}'. Supported types: BFloat16, Float32, Float64.")

    self.dimension = type_def.values[1]
    if self.dimension <= 0:
        raise ValueError(f"QBit dimension must be greater than 0. Got: {self.dimension}.")

    self._name_suffix = f"({self.element_type}, {self.dimension})"

    # The element type was validated above, so a direct lookup is safe
    bit_count = supported[self.element_type]
    # Each bit plane stores one bit per element, rounded up to whole bytes
    plane_bytes = ceil(self.dimension / 8)
    self._bits_per_element = bit_count
    self._bytes_per_fixedstring = plane_bytes

    # Underlying bit-transposed representation: one FixedString per bit of the element type
    # E.g., for Float32 with dim=8: Tuple(FixedString(1), FixedString(1), ... x32)
    plane_type = f"FixedString({plane_bytes})"
    plane_list = ", ".join([plane_type] * bit_count)
    self._tuple_type = get_from_name(f"Tuple({plane_list})")
    self.byte_size = bit_count * plane_bytes

def read_column_prefix(self, source: ByteSource, ctx: QueryContext):
    """Read the column prefix by delegating to the underlying Tuple type and return its result."""
    return self._tuple_type.read_column_prefix(source, ctx)

def read_column_data(self, source: ByteSource, num_rows: int, ctx: QueryContext, read_state: Any) -> Sequence:
    """
    Read bit-transposed Tuple data and convert each row to a flat float vector.

    :param source: byte source positioned at the column data
    :param num_rows: number of rows to read; zero short-circuits to an empty list
    :param ctx: query context, used to choose the null replacement for nullable columns
    :param read_state: state passed through to the underlying Tuple type
    :return: one float vector per row, with nulls substituted for nullable columns
    """
    if num_rows == 0:
        return []

    # For nullable columns the null map (one byte per row) is read before the tuple payload
    null_map = source.read_bytes(num_rows) if self.nullable else None

    planes_per_row = self._tuple_type.read_column_data(source, num_rows, ctx, read_state)
    untranspose = self._untranspose_row
    vectors = [untranspose(planes) for planes in planes_per_row]

    if not self.nullable:
        return vectors
    return data_conv.build_nullable_column(vectors, null_map, self._active_null(ctx))

def write_column_prefix(self, dest: bytearray):
    """Write the column prefix to dest by delegating to the underlying Tuple type."""
    self._tuple_type.write_column_prefix(dest)

def write_column_data(self, column: Sequence, dest: bytearray, ctx: InsertContext):
    """
    Convert flat float vectors to bit-transposed Tuple data and append it to dest.

    :param column: sequence of vectors (None entries are written as all-zero bit planes)
    :param dest: output buffer, extended in place
    :param ctx: insert context passed through to the underlying Tuple type
    """
    if len(column) == 0:
        return

    if self.nullable:
        # Null map: one byte per row, 1 marks a null row
        dest += bytes(int(row is None) for row in column)

    # Placeholder written for null rows: every bit plane filled with zero bytes
    zero_planes = (bytes(self._bytes_per_fixedstring),) * self._bits_per_element
    transpose = self._transpose_row
    planes_per_row = [zero_planes if row is None else transpose(row) for row in column]

    self._tuple_type.write_column_data(planes_per_row, dest, ctx)

def _active_null(self, ctx: QueryContext):
    """
    Return context-appropriate null value for nullable QBit columns.

    NaN is used only when the context does not ask for None and extended dtypes are enabled;
    in every other case the null value is None.
    """
    wants_nan = not ctx.use_none and ctx.use_extended_dtypes
    return nan if wants_nan else None

def _values_to_words(self, values: list[float]) -> Sequence[int]:
    """
    Convert float values to their raw integer bit patterns ("words") using batch struct processing.

    BFloat16 words are the top 16 bits of the Float32 encoding (the low mantissa bits are
    truncated, not rounded).  Float32 and Float64 words are the full 32/64-bit encodings.

    For Float32 and BFloat16, a finite value that is too large for Float32 saturates to
    +/-infinity rather than propagating the OverflowError raised by struct.pack.  This keeps
    the pure-Python path consistent with a float32 narrowing cast, so the encoded result does
    not depend on whether NumPy is installed.

    :param values: the float values of one vector
    :return: list of unsigned integer words, one per input value
    """
    count = len(values)
    kind = self.element_type

    if kind not in ("BFloat16", "Float32"):
        # Float64: Python floats are already doubles, so packing cannot overflow
        return list(unpack(f"<{count}Q", pack(f"<{count}d", *values)))

    try:
        packed = pack(f"<{count}f", *values)
    except OverflowError:
        # Slow path, only taken when at least one value does not fit in Float32:
        # probe each value on its own and replace the offenders with a signed infinity.
        pos_inf, neg_inf = float("inf"), float("-inf")
        narrowed = []
        for value in values:
            try:
                pack("<f", value)
            except OverflowError:
                value = pos_inf if value > 0 else neg_inf
            narrowed.append(value)
        packed = pack(f"<{count}f", *narrowed)

    words = unpack(f"<{count}I", packed)
    if kind == "BFloat16":
        # BFloat16 is the top 16 bits of a Float32 (truncate mantissa)
        return [(word >> 16) & 0xFFFF for word in words]
    return list(words)

def _words_to_values(self, words: list[int]) -> list[float]:
    """
    Convert integer words back to float values using batch unpacking.

    :param words: raw bit patterns, one per vector element
    :return: the decoded floats, in the same order
    """
    count = len(words)
    kind = self.element_type

    if kind == "BFloat16":
        # Move the 16 stored bits into the top half of a 32-bit word; the low half stays zero,
        # which yields a valid Float32 encoding of the same value
        words = [(word & 0xFFFF) << 16 for word in words]
        int_code, float_code = "I", "f"
    elif kind == "Float32":
        int_code, float_code = "I", "f"
    else:
        # Float64
        int_code, float_code = "Q", "d"

    raw = pack(f"<{count}{int_code}", *words)
    return list(unpack(f"<{count}{float_code}", raw))

def _untranspose_row(self, bit_planes: tuple):
    """
    Convert one bit-transposed tuple back to a flat float vector.

    Uses the NumPy implementation when NumPy is available, otherwise a pure-Python walk over
    the planes.  Plane 0 carries the most significant bit of every element; within a plane,
    bit j of byte k belongs to element 8 * k + j.
    """
    if np is not None:
        return self._untranspose_row_numpy(bit_planes)

    dim = self.dimension
    top_bit = self._bits_per_element - 1
    byte_masks = self._BIT_SHIFTS
    words = [0] * dim

    for plane_no, plane in enumerate(bit_planes):
        # Planes arrive MSB first, so plane_no 0 maps to the highest bit of the word
        word_mask = 1 << (top_bit - plane_no)

        for byte_no, byte in enumerate(plane):
            if not byte:
                # None of the 8 elements covered by this byte has the bit set
                continue

            first_elem = byte_no * 8
            for offset, byte_mask in enumerate(byte_masks):
                if byte & byte_mask:
                    elem = first_elem + offset
                    # Bits past the dimension are padding in the last byte
                    if elem < dim:
                        words[elem] |= word_mask

    return self._words_to_values(words)

def _untranspose_row_numpy(self, bit_planes: tuple) -> list[float]:
    """
    Vectorized NumPy version of _untranspose_row.

    :param bit_planes: one bytes object per bit of the element type, most significant bit first
    :return: the decoded vector as a list of Python floats
    """
    bit_count = self._bits_per_element
    dim = self.dimension

    # One row per bit plane, one column per byte of the plane
    plane_bytes = np.frombuffer(b"".join(bit_planes), dtype=np.uint8).reshape(bit_count, -1)

    # Expand every byte into 8 bits (lowest bit first) so column j belongs to element j
    bit_rows: "np.ndarray" = np.unpackbits(plane_bytes, axis=1, bitorder="little")

    # Drop the padding bits of the final byte when the dimension is not a multiple of 8
    if bit_rows.shape[1] != dim:  # pylint: disable=no-member
        bit_rows = bit_rows[:, :dim]  # pylint: disable=invalid-sequence-index

    if self.element_type == "Float64":
        word_dtype, float_dtype = np.uint64, np.float64
    else:
        # Float32 and BFloat16 use 32-bit containers
        word_dtype, float_dtype = np.uint32, np.float32

    # OR each plane into its bit position; the row is cast to the word type before
    # shifting so the shift cannot overflow the 8-bit source values
    words = np.zeros(dim, dtype=word_dtype)
    for row_no, shift in enumerate(range(bit_count - 1, -1, -1)):
        words |= bit_rows[row_no].astype(word_dtype) << shift

    if self.element_type == "BFloat16":
        # The 16 stored bits are the top half of a Float32
        return (words.astype(np.uint32) << 16).view(np.float32).tolist()

    return words.view(float_dtype).tolist()

def _transpose_row(self, values: list[float]) -> tuple:
    """
    Convert one flat float vector to its bit-transposed tuple of planes.

    :param values: the vector; its length must equal the column dimension
    :return: tuple with one bytes object per bit of the element type, most significant bit first
    :raises ValueError: if the vector length does not match the dimension
    """
    if len(values) != self.dimension:
        raise ValueError(f"Vector dimension mismatch: expected {self.dimension}, got {len(values)}")

    if np is not None:
        # With NumPy available always take the vectorized path, converting plain Python
        # sequences to an array of the matching float width first
        if not isinstance(values, np.ndarray):
            target_dtype = np.float64 if self.element_type == "Float64" else np.float32
            values = np.array(values, dtype=target_dtype)
        return self._transpose_row_numpy(values)

    words = self._values_to_words(values)
    plane_size = self._bytes_per_fixedstring
    planes = []

    # Emit planes from the most significant bit down to bit 0
    for bit_pos in range(self._bits_per_element - 1, -1, -1):
        word_mask = 1 << bit_pos
        plane = bytearray(plane_size)

        for elem, word in enumerate(words):
            if word & word_mask:
                # Element `elem` lives in byte elem // 8, at bit elem % 8 of that byte
                plane[elem >> 3] |= 1 << (elem & 7)

        planes.append(bytes(plane))

    return tuple(planes)

def _transpose_row_numpy(self, vector: "np.ndarray") -> tuple:
    """
    Fast path for numpy arrays using vectorized operations.

    :param vector: the values of one vector as a NumPy array
    :return: tuple with one bytes object per bit of the element type, most significant bit first
    """
    if self.element_type in ("BFloat16", "Float32"):
        # Normalise to 32-bit floats (this also narrows float64 input), then reinterpret
        # the same memory as unsigned 32-bit words
        words = vector.astype(np.float32, copy=False).view(np.uint32)
        if self.element_type == "BFloat16":
            # Numpy doesn't have bfloat16: keep only the top 16 bits of each Float32 word
            words = (words >> 16).astype(np.uint16)
    else:  # Float64
        words = vector.astype(np.float64, copy=False).view(np.uint64)

    # Column vector of single-bit masks, highest bit first, in the same dtype as the words
    shifts = np.arange(self._bits_per_element - 1, -1, -1, dtype=words.dtype)
    masks = (1 << shifts).reshape(-1, 1)

    # Broadcasting gives a (bits, dim) boolean matrix: row i tells which elements have bit i set
    is_set = (words & masks) != 0

    # Pack each row 8 elements per byte, element 0 in the lowest bit
    planes = np.packbits(is_set.view(np.uint8), axis=1, bitorder="little")

    return tuple(plane.tobytes() for plane in planes)
2 changes: 1 addition & 1 deletion tests/integration_tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def test_temporary_tables(test_client: Client):
test_client.insert_df('temp_test_table', df, settings=session_settings)
df = test_client.query_df('SELECT * FROM temp_test_table', settings=session_settings)
assert len(df['field1']) == 4
test_client.command('DROP TABLE temp_test_table', settings=session_settings)
test_client.command('DROP TABLE IF EXISTS temp_test_table', settings=session_settings)


def test_str_as_bytes(test_client: Client, table_context: Callable):
Expand Down
33 changes: 32 additions & 1 deletion tests/integration_tests/test_sqlalchemy/test_ddl.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from clickhouse_connect import common
from clickhouse_connect.cc_sqlalchemy.datatypes.sqltypes import Int8, UInt16, Decimal, Enum16, Float64, Boolean, \
FixedString, String, UInt64, UUID, DateTime, DateTime64, LowCardinality, Nullable, Array, AggregateFunction, \
UInt32, IPv4
UInt32, IPv4, QBit
from clickhouse_connect.cc_sqlalchemy import final
from clickhouse_connect.cc_sqlalchemy.ddl.custom import CreateDatabase, DropDatabase
from clickhouse_connect.cc_sqlalchemy.ddl.tableengine import engine_map, ReplacingMergeTree
Expand Down Expand Up @@ -181,3 +181,34 @@ def test_final_modifier_error_cases(test_engine: Engine, test_db: str):

test_table.drop(conn)
other_table.drop(conn)


def test_qbit_table(test_engine: Engine, test_db: str, test_table_engine: str, test_config: TestConfig):
    """Create a table with QBit columns through SQLAlchemy and check the types in SHOW CREATE TABLE."""
    common.set_setting('invalid_setting_action', 'drop')
    with test_engine.begin() as conn:
        if test_config.cloud:
            pytest.skip('QBit type requires allow_experimental_qbit_type setting, but settings are locked in cloud')

        client = conn.connection.driver_connection.client
        if not client.min_version('25.10'):
            pytest.skip('QBit type requires ClickHouse version 25.10+')

        conn.execute(text('SET allow_experimental_qbit_type = 1'))

        engine_cls = engine_map[test_table_engine]
        metadata = MetaData(schema=test_db)
        # Start from a clean slate in case an earlier run left the table behind
        conn.execute(text('DROP TABLE IF EXISTS qbit_test'))

        columns = (
            db.Column('id', UInt32),
            db.Column('vector', QBit('Float32', 8)),
            db.Column('embedding', QBit('Float32', 128)),
        )
        qbit_table = db.Table('qbit_test', metadata, *columns, engine_cls('id'))
        qbit_table.create(conn)

        create_sql = conn.execute(text("SHOW CREATE TABLE qbit_test")).fetchone()[0]
        for rendered_type in ('QBit(Float32, 8)', 'QBit(Float32, 128)'):
            assert rendered_type in create_sql

        conn.execute(text('DROP TABLE IF EXISTS qbit_test'))
Loading