Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 22 additions & 23 deletions materialize-clickhouse/.snapshots/TestSQLGeneration
Original file line number Diff line number Diff line change
Expand Up @@ -108,34 +108,33 @@ SELECT 0::Int32, r.flow_document
--- Begin key_value queryLoadTableNoFlowDocument ---

SELECT 0::Int32,
toJSONString(map(
'key1', r.key1,
'key2', r.key2,
'key!binary', r.`key!binary`,
'array', r.`array`,
'binary', r.binary,
'boolean', r.boolean,
'field_with_projection', r.field_with_projection,
'integer', r.integer,
'integerGt64Bit', r.integerGt64Bit,
'integerWithUserDDL', r.integerWithUserDDL,
'multiple', r.multiple,
'nonAsciiκόσμε', r.`nonAsciiκόσμε`,
'number', r.number,
'numberCastToString', toString(r.numberCastToString),
'object', r.object,
'string', r.string,
'stringInteger', r.stringInteger,
'stringInteger39Chars', r.stringInteger39Chars,
'stringInteger66Chars', r.stringInteger66Chars,
'stringNumber', toString(r.stringNumber)
)) AS flow_document
concat('{',
'"key1":', ifNull(toJSONString(r.key1), 'null'), ',',
'"key2":', ifNull(toJSONString(r.key2), 'null'), ',',
'"key!binary":', ifNull(toJSONString(r.`key!binary`), 'null'), ',',
'"array":', ifNull(r.`array`, 'null'), ',',
'"binary":', ifNull(toJSONString(r.binary), 'null'), ',',
'"boolean":', ifNull(toJSONString(r.boolean), 'null'), ',',
'"field_with_projection":', ifNull(toJSONString(r.field_with_projection), 'null'), ',',
'"integer":', ifNull(toJSONString(r.integer), 'null'), ',',
'"integerGt64Bit":', ifNull(toJSONString(r.integerGt64Bit), 'null'), ',',
'"integerWithUserDDL":', ifNull(toJSONString(r.integerWithUserDDL), 'null'), ',',
'"multiple":', ifNull(r.multiple, 'null'), ',',
'"nonAsciiκόσμε":', ifNull(toJSONString(r.`nonAsciiκόσμε`), 'null'), ',',
'"number":', ifNull(toJSONString(r.number), 'null'), ',',
'"numberCastToString":', ifNull(toJSONString(toString(r.numberCastToString)), 'null'), ',',
'"object":', ifNull(r.object, 'null'), ',',
'"string":', ifNull(toJSONString(r.string), 'null'), ',',
'"stringInteger":', ifNull(toJSONString(r.stringInteger), 'null'), ',',
'"stringInteger39Chars":', ifNull(toJSONString(r.stringInteger39Chars), 'null'), ',',
'"stringInteger66Chars":', ifNull(toJSONString(r.stringInteger66Chars), 'null'), ',',
'"stringNumber":', ifNull(toJSONString(toString(r.stringNumber)), 'null')
, '}') AS flow_document
FROM key_value AS r FINAL
JOIN flow_temp_load_0_key_value AS l
ON l.key1 = r.key1
AND l.key2 = r.key2
AND l.`key!binary` = r.`key!binary`
SETTINGS use_variant_as_common_type = 1
--- End key_value queryLoadTableNoFlowDocument ---

--- Begin delta_updates queryLoadTableNoFlowDocument ---
Expand Down
45 changes: 22 additions & 23 deletions materialize-clickhouse/.snapshots/TestSQLGenerationQuotedTableNames
Original file line number Diff line number Diff line change
Expand Up @@ -108,34 +108,33 @@ SELECT 0::Int32, r.flow_document
--- Begin `key_value-@你好-``-"especiál` queryLoadTableNoFlowDocument ---

SELECT 0::Int32,
toJSONString(map(
'key1', r.key1,
'key2', r.key2,
'key!binary', r.`key!binary`,
'array', r.`array`,
'binary', r.binary,
'boolean', r.boolean,
'field_with_projection', r.field_with_projection,
'integer', r.integer,
'integerGt64Bit', r.integerGt64Bit,
'integerWithUserDDL', r.integerWithUserDDL,
'multiple', r.multiple,
'nonAsciiκόσμε', r.`nonAsciiκόσμε`,
'number', r.number,
'numberCastToString', toString(r.numberCastToString),
'object', r.object,
'string', r.string,
'stringInteger', r.stringInteger,
'stringInteger39Chars', r.stringInteger39Chars,
'stringInteger66Chars', r.stringInteger66Chars,
'stringNumber', toString(r.stringNumber)
)) AS flow_document
concat('{',
'"key1":', ifNull(toJSONString(r.key1), 'null'), ',',
'"key2":', ifNull(toJSONString(r.key2), 'null'), ',',
'"key!binary":', ifNull(toJSONString(r.`key!binary`), 'null'), ',',
'"array":', ifNull(r.`array`, 'null'), ',',
'"binary":', ifNull(toJSONString(r.binary), 'null'), ',',
'"boolean":', ifNull(toJSONString(r.boolean), 'null'), ',',
'"field_with_projection":', ifNull(toJSONString(r.field_with_projection), 'null'), ',',
'"integer":', ifNull(toJSONString(r.integer), 'null'), ',',
'"integerGt64Bit":', ifNull(toJSONString(r.integerGt64Bit), 'null'), ',',
'"integerWithUserDDL":', ifNull(toJSONString(r.integerWithUserDDL), 'null'), ',',
'"multiple":', ifNull(r.multiple, 'null'), ',',
'"nonAsciiκόσμε":', ifNull(toJSONString(r.`nonAsciiκόσμε`), 'null'), ',',
'"number":', ifNull(toJSONString(r.number), 'null'), ',',
'"numberCastToString":', ifNull(toJSONString(toString(r.numberCastToString)), 'null'), ',',
'"object":', ifNull(r.object, 'null'), ',',
'"string":', ifNull(toJSONString(r.string), 'null'), ',',
'"stringInteger":', ifNull(toJSONString(r.stringInteger), 'null'), ',',
'"stringInteger39Chars":', ifNull(toJSONString(r.stringInteger39Chars), 'null'), ',',
'"stringInteger66Chars":', ifNull(toJSONString(r.stringInteger66Chars), 'null'), ',',
'"stringNumber":', ifNull(toJSONString(toString(r.stringNumber)), 'null')
, '}') AS flow_document
FROM `key_value-@你好-``-"especiál` AS r FINAL
JOIN `flow_temp_load_0_key_value-@你好-``-"especiál` AS l
ON l.key1 = r.key1
AND l.key2 = r.key2
AND l.`key!binary` = r.`key!binary`
SETTINGS use_variant_as_common_type = 1
--- End `key_value-@你好-``-"especiál` queryLoadTableNoFlowDocument ---

--- Begin `delta_updates-@你好-``-"especiál` queryLoadTableNoFlowDocument ---
Expand Down
184 changes: 184 additions & 0 deletions materialize-clickhouse/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1100,3 +1100,187 @@ func TestMovePartitionMissingTarget(t *testing.T) {
require.ErrorAs(t, moveErr, &exc)
require.EqualValues(t, chproto.ErrUnknownTable, exc.Code)
}

// noFlowDocShape returns the TableShape shared by the NoFlowDocument tests,
// bound to the given table name as binding 0. It has a single required string
// key ("id") and six values:
//   - _meta: required object
//   - tags: optional array-or-null
//   - extra: optional multi-type value (integer, object, boolean, or null)
//   - note: optional string-or-null
//   - score: required number-or-string with string format "number"
//   - flow_published_at: required string with format "date-time"
func noFlowDocShape(tableName string) sql.TableShape {
	// column builds one projection whose JSON pointer is "/" + field.
	// required selects Inference_MUST over Inference_MAY; a nil str leaves
	// the String_ inference unset.
	column := func(field string, required bool, str *pf.Inference_String, types ...string) sql.Projection {
		exists := pf.Inference_MAY
		if required {
			exists = pf.Inference_MUST
		}
		return sql.Projection{
			Projection: pf.Projection{
				Ptr:   "/" + field,
				Field: field,
				Inference: pf.Inference{
					Types:   types,
					Exists:  exists,
					String_: str,
				},
			},
		}
	}

	keys := []sql.Projection{
		column("id", true, &pf.Inference_String{}, "string"),
	}

	values := []sql.Projection{
		column("_meta", true, nil, "object"),
		column("tags", false, nil, "array", "null"),
		column("extra", false, nil, "integer", "object", "boolean", "null"),
		column("note", false, &pf.Inference_String{}, "string", "null"),
		column("score", true, &pf.Inference_String{Format: "number"}, "number", "string"),
		column("flow_published_at", true, &pf.Inference_String{Format: "date-time"}, "string"),
	}

	return sql.TableShape{
		Path:    sql.TablePath{tableName},
		Binding: 0,
		Keys:    keys,
		Values:  values,
	}
}

// TestNoFlowDocumentObjectColumns stores one fully-populated row in a table
// built from noFlowDocShape with NoFlowDocument enabled, loads it back, and
// checks each field of the reconstructed document: object, array, and
// multiple-typed columns must come back as embedded JSON values rather than
// double-encoded strings, while string, string_number, and date-time columns
// must come back as quoted JSON strings.
func TestNoFlowDocumentObjectColumns(t *testing.T) {
	cfg := testConfig()
	cfg.Advanced.NoFlowDocument = true

	ctx := t.Context()
	dialect := clickHouseDialect(cfg.Database)
	tpls := renderTemplates(dialect, cfg.HardDelete)
	tableName := "test_no_flow_doc_object"

	table, err := sql.ResolveTable(noFlowDocShape(tableName), dialect)
	require.NoError(t, err)

	b, storeConn, loadConn := setupTable(t, ctx, cfg, dialect, tpls, table, tableName)

	// One value per column, in key-then-values order of noFlowDocShape.
	row := []any{
		"k1",                         // id
		`{"op":"c","extra":"stuff"}`, // _meta (object)
		`["tag1","tag2"]`,            // tags (nullable array)
		`{"nested":true}`,            // extra (nullable multiple)
		"hello",                      // note (nullable string)
		float64(3.14),                // score (string_number, passed as a float64)
		testTime,                     // flow_published_at (date-time)
	}
	storeRows(t, ctx, storeConn, b, cfg.Database, row)

	loaded := loadDocuments(t, ctx, loadConn, b, []any{"k1"})
	require.Len(t, loaded, 1)

	// Keep every field as raw JSON so the exact encoding can be inspected.
	var fields map[string]json.RawMessage
	require.NoError(t, json.Unmarshal([]byte(loaded[0]), &fields))

	// OBJECT, ARRAY, and MULTIPLE columns: compared structurally, so they must
	// be real JSON objects/arrays and not strings containing JSON.
	embedded := []struct{ field, want string }{
		{"_meta", `{"op":"c","extra":"stuff"}`},
		{"tags", `["tag1","tag2"]`},
		{"extra", `{"nested":true}`},
	}
	for _, c := range embedded {
		require.JSONEq(t, c.want, string(fields[c.field]))
	}

	// STRING, STRING_NUMBER, and DATE-TIME columns: compared byte-for-byte, so
	// the surrounding quotes are part of the expectation. The date-time value
	// presumes testTime is 2024-01-01T00:00:00Z (defined outside this test).
	quoted := []struct{ field, want string }{
		{"note", `"hello"`},
		{"score", `"3.14"`},
		{"flow_published_at", `"2024-01-01T00:00:00.000000Z"`},
	}
	for _, c := range quoted {
		require.Equal(t, c.want, string(fields[c.field]))
	}
}

// TestNoFlowDocumentNullValues stores one row whose nullable columns are all
// NULL, loads it back with NoFlowDocument enabled, and checks that each of
// those columns appears in the reconstructed document as the bare JSON token
// null. A missing key would yield an empty raw message and a quoted "null"
// string would keep its quotes, so both fail the exact comparison.
func TestNoFlowDocumentNullValues(t *testing.T) {
	cfg := testConfig()
	cfg.Advanced.NoFlowDocument = true

	ctx := t.Context()
	dialect := clickHouseDialect(cfg.Database)
	tpls := renderTemplates(dialect, cfg.HardDelete)
	tableName := "test_no_flow_doc_nulls"

	table, err := sql.ResolveTable(noFlowDocShape(tableName), dialect)
	require.NoError(t, err)

	b, storeConn, loadConn := setupTable(t, ctx, cfg, dialect, tpls, table, tableName)

	// Every nullable column of noFlowDocShape is left nil; the rest are set.
	row := []any{
		"k1",         // id
		`{"op":"c"}`, // _meta (non-nullable object)
		nil,          // tags (nullable array, stored as NULL)
		nil,          // extra (nullable multiple, stored as NULL)
		nil,          // note (nullable string, stored as NULL)
		float64(1.0), // score (non-nullable string_number)
		testTime,     // flow_published_at
	}
	storeRows(t, ctx, storeConn, b, cfg.Database, row)

	loaded := loadDocuments(t, ctx, loadConn, b, []any{"k1"})
	require.Len(t, loaded, 1)

	// Keep every field as raw JSON so null can be told apart from "null".
	var fields map[string]json.RawMessage
	require.NoError(t, json.Unmarshal([]byte(loaded[0]), &fields))

	// Nullable columns holding NULL must serialize as JSON null.
	for _, field := range []string{"tags", "extra", "note"} {
		require.Equal(t, "null", string(fields[field]))
	}

	// Non-nullable columns are still present with their stored values.
	require.JSONEq(t, `{"op":"c"}`, string(fields["_meta"]))
	require.Equal(t, `"1"`, string(fields["score"]))
}
45 changes: 24 additions & 21 deletions materialize-clickhouse/sqlgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,44 +257,47 @@ SELECT {{ $.Binding }}::Int32, r.{{$.Document.Identifier}}
{{ end -}}
{{ end }}

-- Templated query for no_flow_document mode - reconstructs JSON from root-level columns
-- using toJSONString(map(...)) with SETTINGS use_variant_as_common_type = 1 to allow
-- mixed-type map values.
--
-- toJSONString(map(...)) handles most types correctly, including Nullable columns
-- (serialized as null). Four types need pre-processing before going into the map:
-- - DateTime64: toJSONString formats as "YYYY-MM-DD HH:MM:SS.ffffff" which isn't
-- RFC3339; we formatDateTime and append 'Z' to produce a proper string value.
-- - STRING_NUMBER: stored as Float64 but must appear as a JSON string; toString()
-- converts it so the map serializes it quoted.

{{ define "mapValue" -}}
-- Templated query for no_flow_document mode - reconstructs JSON from root-level
-- columns using concat(). Each column value is individually serialized:
-- - OBJECT, ARRAY, MULTIPLE: stored as String containing valid JSON, embedded
-- directly via ifNull(col, 'null') so they appear as proper JSON
-- objects/arrays rather than double-encoded strings.
-- - DateTime64: formatDateTime produces RFC3339, wrapped in toJSONString for
-- proper quoting.
-- - STRING_NUMBER: stored as Float64 but must appear as a JSON string;
-- toString() converts before toJSONString quotes it.
-- - All other types: toJSONString handles correct JSON serialization
-- (quoting strings, bare numbers/bools).
-- Every branch wraps its value in ifNull(..., 'null'), so a NULL column is emitted as JSON null.

{{ define "concatValue" -}}
{{ $ident := printf "%s.%s" $.Alias $.Identifier }}
{{- if and (eq $.AsFlatType "string") (eq $.Format "date-time") -}}
formatDateTime({{ $ident }}, '%Y-%m-%dT%H:%i:%S.%f', 'UTC') || 'Z'
{{- if or (eq $.AsFlatType "object") (eq $.AsFlatType "array") (eq $.AsFlatType "multiple") -}}
ifNull({{ $ident }}, 'null')
{{- else if and (eq $.AsFlatType "string") (eq $.Format "date-time") -}}
ifNull(toJSONString(formatDateTime({{ $ident }}, '%Y-%m-%dT%H:%i:%S.%f', 'UTC') || 'Z'), 'null')
{{- else if eq $.AsFlatType "string_number" -}}
toString({{ $ident }})
ifNull(toJSONString(toString({{ $ident }})), 'null')
{{- else -}}
{{ $ident }}
ifNull(toJSONString({{ $ident }}), 'null')
{{- end -}}
{{- end }}

{{ define "queryLoadTableNoFlowDocument" }}
{{ if not $.DeltaUpdates -}}
SELECT {{ $.Binding }}::Int32,
toJSONString(map(
concat('{',
{{- range $i, $col := $.RootLevelColumns -}}
{{- if $i }},{{ end }}
{{ Literal $col.Field }}, {{ template "mapValue" (ColumnWithAlias $col "r") }}
{{- if $i }}, ',',{{ end }}
'"{{ $col.Field }}":', {{ template "concatValue" (ColumnWithAlias $col "r") }}
{{- end }}
)) AS flow_document
, '}') AS flow_document
FROM {{$.Identifier}} AS r FINAL
JOIN {{ template "loadTableName" . }} AS l
{{- range $ind, $key := $.Keys }}
{{ if $ind }} AND {{ else }} ON {{ end -}}
l.{{$key.Identifier}} = r.{{$key.Identifier}}
{{- end }}
SETTINGS use_variant_as_common_type = 1
{{ else -}}
SELECT * FROM (SELECT -1::Int32, ''::String LIMIT 0) as nodoc
{{ end -}}
Expand Down
Loading