
Commit c65bf31

implement fix for primitive types in json shared data space
1 parent 70bd2d2 commit c65bf31

File tree

5 files changed: +391 −5 lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions

@@ -25,6 +25,9 @@ The supported method of passing ClickHouse server settings is to prefix such arg
 
 ### Bug Fixes
 - Fix issue with DROP table in client temp table test.
+- Fixed `StreamFailureError` when reading JSON columns with shared data by correctly decoding the binary variant values. Closes [#599](https://github.com/ClickHouse/clickhouse-connect/issues/599)
+- Fixed JSON column reconstruction to properly handle nested paths in shared data (keys beyond `max_dynamic_paths`).
+- Corrected the internal definition of `SHARED_DATA_TYPE` for JSON columns.
 
 ### Improvements
 - Add support for QBit data type. Closes [#570](https://github.com/ClickHouse/clickhouse-connect/issues/570)
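For context, a minimal repro sketch of the scenario these entries describe (illustrative only, not part of the commit; the host, table name, and max_dynamic_paths value are assumptions, and it requires a server recent enough that the JSON type is enabled):

# Hypothetical repro for issue #599: with a low max_dynamic_paths, extra JSON
# keys overflow into the column's shared data, where values are stored as
# variant-encoded binary. Before this commit such reads failed (or dropped the
# overflow keys); after it, the values decode back into the result dicts.
import clickhouse_connect

client = clickhouse_connect.get_client(host='localhost')  # assumed local server
client.command('DROP TABLE IF EXISTS json_shared_demo')
client.command('CREATE TABLE json_shared_demo (obj JSON(max_dynamic_paths=1)) '
               'ENGINE = MergeTree ORDER BY tuple()')
# With only one dynamic path available, the remaining keys spill into shared data
client.command('INSERT INTO json_shared_demo VALUES '
               '(\'{"a": 1, "b": 2}\'), (\'{"a": 3, "c": true}\')')
print(client.query('SELECT obj FROM json_shared_demo').result_rows)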

clickhouse_connect/datatypes/dynamic.py

Lines changed: 72 additions & 4 deletions
@@ -1,10 +1,13 @@
 from collections import namedtuple
-from typing import List, Sequence, Collection, Any
+import logging
+from typing import List, Sequence, Collection, Any, Union
 from urllib.parse import unquote
 
 from clickhouse_connect.datatypes.base import ClickHouseType, TypeDef
 from clickhouse_connect.datatypes.registry import get_from_name
 from clickhouse_connect.driver.common import unescape_identifier, first_value, write_uint64
+from clickhouse_connect.driver.bytesource import ByteArraySource
+from clickhouse_connect.datatypes.string import String
 from clickhouse_connect.driver.ctypes import data_conv
 from clickhouse_connect.driver.errors import handle_error
 from clickhouse_connect.driver.exceptions import DataError
@@ -18,6 +21,8 @@
 _JSON_NULL = b'null'
 _JSON_NULL_STR = 'null'
 
+logger = logging.getLogger(__name__)
+
 json_serialization_format = 0x1
 
 VariantState = namedtuple('VariantState', 'discriminator_node element_states')
@@ -30,6 +35,11 @@ def _json_path_segments(path: str) -> List[str]:
     return segments
 
 
+class SharedDataString(String):
+    def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext, _read_state: Any):
+        return source.read_str_col(num_rows, None)
+
+
 class Variant(ClickHouseType):
     _slots = 'element_types'
     python_type = object
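Passing `None` as the encoding to `read_str_col` is the point of this override: shared-data values come back as raw `bytes`, leaving the leading discriminator byte intact for `decode_shared_data_value` below rather than decoding the payload as UTF-8 text.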
@@ -169,7 +179,59 @@ def write_str_values(ch_type: ClickHouseType, column: Sequence, dest: bytearray,
     handle_error(data_conv.write_str_col(col, False, encoding, dest), ctx)
 
 
-JSONState = namedtuple('JSONState', 'serialize_version dynamic_paths typed_states dynamic_states')
+JSONState = namedtuple("JSONState", "serialize_version dynamic_paths typed_states dynamic_states shared_state")
+
+# Standard discriminator to type mapping for shared data
+# From https://github.com/ClickHouse/ClickHouse/src/DataTypes/DataTypesBinaryEncoding.cpp
+STANDARD_DISCRIMINATOR_TYPES = {
+    0x00: "Nothing",
+    0x01: "UInt8",
+    0x02: "UInt16",
+    0x03: "UInt32",
+    0x04: "UInt64",
+    0x05: "UInt128",
+    0x06: "UInt256",
+    0x07: "Int8",
+    0x08: "Int16",
+    0x09: "Int32",
+    0x0A: "Int64",
+    0x0B: "Int128",
+    0x0C: "Int256",
+    0x0D: "Float32",
+    0x0E: "Float64",
+    0x15: "String",
+    0x2D: "Bool",
+}
+
+
+def decode_shared_data_value(binary_data: Union[bytes, str], ctx: QueryContext):
+    """Decode a variant-encoded value from JSON shared data using discriminator byte."""
+    if binary_data is None:
+        return None
+
+    if len(binary_data) < 1:
+        return binary_data
+
+    discriminator = binary_data[0]
+    if discriminator == 255:
+        return None
+
+    type_name = STANDARD_DISCRIMINATOR_TYPES.get(discriminator)
+    if type_name is None:
+        return binary_data
+
+    value_type = get_from_name(type_name)
+
+    try:
+        byte_source = ByteArraySource(binary_data[1:])
+        read_state = value_type.read_column_prefix(byte_source, ctx)
+        result = value_type.read_column_data(byte_source, 1, ctx, read_state)
+        return result[0] if result else None
+
+    # pylint: disable=broad-exception-caught
+    except Exception as e:
+        logger.debug("Shared data decode failed: %s", e)
+        return binary_data
 
 
 class JSON(ClickHouseType):
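To make the variant framing concrete, a small sketch of the inputs the new function handles (illustrative, not from the commit; ctx is assumed to be the QueryContext of an active read):

import struct

# Layout: one discriminator byte, then the value's native little-endian encoding
encoded_i64 = bytes([0x0A]) + struct.pack('<q', -7)  # 0x0A = Int64 -> -7
encoded_str = bytes([0x15, 0x05]) + b'hello'         # 0x15 = String, LEB128 length 5 -> 'hello'
encoded_null = bytes([0xFF])                         # discriminator 255 -> None
encoded_misc = bytes([0x30]) + b'\x01'               # unmapped discriminator -> returned unchanged

# e.g. decode_shared_data_value(encoded_i64, ctx) == -7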
@@ -247,7 +309,8 @@ def read_column_prefix(self, source: ByteSource, ctx: QueryContext) -> JSONState
         dynamic_paths = [source.read_leb128_str() for _ in range(dynamic_path_cnt)]
         typed_states = [typed.read_column_prefix(source, ctx) for typed in self.typed_types]
         dynamic_states = [read_dynamic_prefix(self, source, ctx) for _ in range(dynamic_path_cnt)]
-        return JSONState(serialize_version, dynamic_paths, typed_states, dynamic_states)
+        shared_state = SHARED_DATA_TYPE.read_column_prefix(source, ctx)
+        return JSONState(serialize_version, dynamic_paths, typed_states, dynamic_states, shared_state)
 
     # pylint: disable=too-many-locals
     def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext, read_state: JSONState):
@@ -256,7 +319,7 @@ def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext, read_state: JSONState):
         dynamic_columns = [
             read_variant_column(source, num_rows, ctx, dynamic_state.variant_types, dynamic_state.variant_states)
             for dynamic_state in read_state.dynamic_states]
-        SHARED_DATA_TYPE.read_column_data(source, num_rows, ctx, None)
+        shared_columns = SHARED_DATA_TYPE.read_column_data(source, num_rows, ctx, read_state.shared_state)
         col = []
         for row_num in range(num_rows):
             top = {}
@@ -284,6 +347,11 @@ def _read_column_binary(self, source: ByteSource, num_rows: int, ctx: QueryContext, read_state: JSONState):
                     item[key] = child
                     item = child
                 item[chain[-1]] = value
+            if shared_columns and row_num < len(shared_columns):
+                shared_data = shared_columns[row_num]
+                if shared_data:
+                    decoded_shared = {key: decode_shared_data_value(value, ctx) for key, value in shared_data.items()}
+                    top.update(decoded_shared)
             col.append(top)
         if self.read_format(ctx) == 'string':
             return [any_to_json(v) for v in col]
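The removed line shows the old behavior: the shared-data column was read only to advance the stream and its return value was discarded, so keys beyond `max_dynamic_paths` never appeared in the reconstructed objects. Now each row's shared map is decoded value-by-value and merged into the top-level dict.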

clickhouse_connect/datatypes/postinit.py

Lines changed: 2 additions & 1 deletion
@@ -1,7 +1,8 @@
 from clickhouse_connect.datatypes import registry, dynamic, geometric
 
-dynamic.SHARED_DATA_TYPE = registry.get_from_name('Array(String, String)')
 dynamic.STRING_DATA_TYPE = registry.get_from_name('String')
+dynamic.SHARED_DATA_TYPE = registry.get_from_name('Map(String, String)')
+dynamic.SHARED_DATA_TYPE.value_type = dynamic.SharedDataString(dynamic.STRING_DATA_TYPE.type_def)
 
 point = 'Tuple(Float64, Float64)'
 ring = f'Array({point})'
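The corrected `SHARED_DATA_TYPE` treats shared data as a map from JSON path to variant-encoded value, replacing the previous `Array(String, String)` definition, and swapping the map's value type for `SharedDataString` keeps those values as raw bytes, discriminator included, on the way to `decode_shared_data_value`.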
clickhouse_connect/driver/bytesource.py

Lines changed: 160 additions & 0 deletions

@@ -0,0 +1,160 @@
+import struct
+
+from clickhouse_connect.driver.types import ByteSource
+
+
+class ByteArraySource(ByteSource):
+    """
+    ByteSource implementation for in-memory byte arrays.
+
+    This class wraps a byte array and provides the ByteSource interface,
+    allowing ClickHouse type decoders to read from in-memory data instead
+    of a network stream.
+
+    Used primarily for decoding variant-encoded values from JSON shared data
+    where each value is a complete serialized type instance.
+    """
+
+    def __init__(self, data: bytes, encoding: str = "utf-8"):
+        """
+        Initialize ByteArraySource with byte array data.
+
+        :param data: The byte array to read from
+        :param encoding: Character encoding for string operations (default: utf-8)
+        """
+        self.data = data
+        self.pos = 0
+        self.encoding = encoding
+
+    def read_byte(self) -> int:
+        """Read a single byte and advance position."""
+        if self.pos >= len(self.data):
+            raise EOFError("Attempted to read past end of byte array")
+        b = self.data[self.pos]
+        self.pos += 1
+        return b
+
+    def read_bytes(self, sz: int) -> bytes:
+        """Read specified number of bytes and advance position."""
+        if self.pos + sz > len(self.data):
+            raise EOFError(f"Attempted to read {sz} bytes, only {len(self.data) - self.pos} available")
+        result = self.data[self.pos : self.pos + sz]
+        self.pos += sz
+        return result
+
+    def read_leb128(self) -> int:
+        """Read a LEB128 (variable-length) encoded integer."""
+        sz = 0
+        shift = 0
+        while self.pos < len(self.data):
+            b = self.read_byte()
+            sz += (b & 0x7F) << shift
+            if (b & 0x80) == 0:
+                return sz
+            shift += 7
+        raise EOFError("Unexpected end while reading LEB128")
+
+    def read_leb128_str(self) -> str:
+        """Read a LEB128 length-prefixed string."""
+        sz = self.read_leb128()
+        return self.read_bytes(sz).decode(self.encoding)
+
+    def read_uint64(self) -> int:
+        """Read an unsigned 64-bit integer (little-endian)."""
+        return int.from_bytes(self.read_bytes(8), "little", signed=False)
+
+    def read_int64(self) -> int:
+        """Read a signed 64-bit integer (little-endian)."""
+        return int.from_bytes(self.read_bytes(8), "little", signed=True)
+
+    def read_uint32(self) -> int:
+        """Read an unsigned 32-bit integer (little-endian)."""
+        return int.from_bytes(self.read_bytes(4), "little", signed=False)
+
+    def read_int32(self) -> int:
+        """Read a signed 32-bit integer (little-endian)."""
+        return int.from_bytes(self.read_bytes(4), "little", signed=True)
+
+    def read_uint16(self) -> int:
+        """Read an unsigned 16-bit integer (little-endian)."""
+        return int.from_bytes(self.read_bytes(2), "little", signed=False)
+
+    def read_int16(self) -> int:
+        """Read a signed 16-bit integer (little-endian)."""
+        return int.from_bytes(self.read_bytes(2), "little", signed=True)
+
+    def read_float32(self) -> float:
+        """Read a 32-bit float (little-endian)."""
+        return struct.unpack("<f", self.read_bytes(4))[0]
+
+    def read_float64(self) -> float:
+        """Read a 64-bit float (double, little-endian)."""
+        return struct.unpack("<d", self.read_bytes(8))[0]
+
+    # pylint: disable=too-many-return-statements
+    def read_array(self, array_type: str, num_rows: int):  # type: ignore
+        """
+        Limited implementation of array reading for basic types.
+
+        Args:
+            array_type: Python struct format character
+                'B' = UInt8, 'H' = UInt16, 'I' = UInt32, 'Q' = UInt64
+                'b' = Int8, 'h' = Int16, 'i' = Int32, 'q' = Int64
+                'f' = Float32, 'd' = Float64
+            num_rows: Number of elements to read
+
+        Returns:
+            List of values
+        """
+        if array_type == "B":
+            return [self.read_byte() for _ in range(num_rows)]
+        elif array_type == "H":
+            return [self.read_uint16() for _ in range(num_rows)]
+        elif array_type == "I":
+            return [self.read_uint32() for _ in range(num_rows)]
+        elif array_type == "Q":
+            return [self.read_uint64() for _ in range(num_rows)]
+        elif array_type == "b":
+            return [int.from_bytes([self.read_byte()], "little", signed=True) for _ in range(num_rows)]
+        elif array_type == "h":
+            return [self.read_int16() for _ in range(num_rows)]
+        elif array_type == "i":
+            return [self.read_int32() for _ in range(num_rows)]
+        elif array_type == "q":
+            return [self.read_int64() for _ in range(num_rows)]
+        elif array_type == "f":
+            return [self.read_float32() for _ in range(num_rows)]
+        elif array_type == "d":
+            return [self.read_float64() for _ in range(num_rows)]
+        else:
+            raise NotImplementedError(f"Array type {array_type} not implemented for ByteArraySource")
+
+    # Minimal implementations for other ByteSource methods that aren't needed
+    # for single-value decoding but are required by the interface
+
+    def read_str_col(self, num_rows, encoding, nullable=False, null_obj=None):  # type: ignore
+        """
+        Read a column of strings.
+        For single-value decoding (num_rows=1), read one LEB128 length-prefixed string.
+        """
+        if num_rows != 1:
+            raise NotImplementedError("read_str_col only supports num_rows=1 for single-value decoding")
+
+        length = self.read_leb128()
+        string_bytes = self.read_bytes(length)
+
+        if encoding is None:
+            return [string_bytes]
+
+        return [string_bytes.decode(encoding)]
+
+    def read_bytes_col(self, sz, num_rows):
+        """Not used for single-value decoding."""
+        raise NotImplementedError("read_bytes_col not needed for single-value decoding")
+
+    def read_fixed_str_col(self, sz, num_rows, encoding):
+        """Not used for single-value decoding."""
+        raise NotImplementedError("read_fixed_str_col not needed for single-value decoding")
+
+    def close(self):
+        """No cleanup needed for byte arrays."""
