Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 83609fd

Browse files
authored
Unify golang type to ClickHouse type conversion in CREATE TABLE and validation codepaths, improve Tuple(...) validation (#1259)
In the ingest processor we had two separate codepaths for converting a Golang type into ClickHouse column type: one used in `CREATE TABLE` codepath (`clickhouse.NewType`) and another in ingest validation (`getTypeName`). The first one had better support for (nested) arrays and maps, which were not fully supported in ingest validation. Therefore this PR unifies the ingest validation codepath to use the `clickhouse.NewType` from `CREATE TABLE` codepath. As a result, ingest validation now has a better support for nested arrays, maps (see `test_ingest.go`) and the code is now simpler.
1 parent de57bd5 commit 83609fd

File tree

4 files changed

+141
-150
lines changed

4 files changed

+141
-150
lines changed

ci/it/testcases/test_ingest.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -789,9 +789,7 @@ func (a *IngestTestcase) testIncompleteTypes(ctx context.Context, t *testing.T)
789789
assert.Contains(t, results[0].cols[2].(string), "QUESMA_DOC2_2")
790790
assert.Contains(t, results[0].cols[2].(string), "QUESMA_DOC2_3")
791791
assert.Contains(t, *results[0].cols[5].(*string), "QUESMA_DOC2_4")
792-
793-
// FIXME: this is not inserted correctly yet!
794-
// assert.Contains(t, results[0].cols[8].(string), "QUESMA_DOC2_5")
792+
assert.Contains(t, results[0].cols[8].(string), "QUESMA_DOC2_5")
795793

796794
assert.Contains(t, *results[1].cols[0].(*string), "QUESMA_DOC3_1")
797795
assert.Contains(t, *results[1].cols[1].(*string), "QUESMA_DOC3_2")
@@ -802,9 +800,6 @@ func (a *IngestTestcase) testIncompleteTypes(ctx context.Context, t *testing.T)
802800
assert.Contains(t, *results[1].cols[5].(*string), "QUESMA_DOC3_7")
803801
assert.Contains(t, *results[1].cols[6].(*string), "QUESMA_DOC3_8")
804802
assert.Contains(t, *results[1].cols[7].(*string), "QUESMA_DOC3_9")
805-
806-
// FIXME: this is not inserted correctly yet!
807-
// assert.Contains(t, results[1].cols[8].(string), "QUESMA_DOC3_10")
808-
803+
assert.Contains(t, results[1].cols[8].(string), "QUESMA_DOC3_10")
809804
assert.Contains(t, results[1].cols[9].(string), "QUESMA_DOC3_11")
810805
}

quesma/clickhouse/schema.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,16 @@ func (t MultiValueType) CanConvert(v interface{}) bool {
191191
return false // TODO for now. For sure can implement tuples easily, maybe some other too
192192
}
193193

194+
func (t MultiValueType) GetColumn(name string) *Column {
195+
// TODO: linear scan, but this will suffice for now (Tuples aren't typically large)
196+
for _, col := range t.Cols {
197+
if col.Name == name {
198+
return col
199+
}
200+
}
201+
return nil
202+
}
203+
194204
func NewBaseType(clickHouseTypeName string) BaseType {
195205
var GoType = ResolveType(clickHouseTypeName)
196206
if GoType == nil {
@@ -255,6 +265,8 @@ func NewType(value any, valueOrigin string) (Type, error) {
255265
} else {
256266
return BaseType{Name: "Float64", GoType: reflect.TypeOf(float64(0))}, nil
257267
}
268+
case int:
269+
return BaseType{Name: "Int64", GoType: reflect.TypeOf(int64(0))}, nil
258270
case bool:
259271
return BaseType{Name: "Bool", GoType: reflect.TypeOf(true)}, nil
260272
case map[string]interface{}:

quesma/ingest/ingest_validator.go

Lines changed: 51 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -9,80 +9,8 @@ import (
99
"github.com/QuesmaOrg/quesma/quesma/logger"
1010
"github.com/QuesmaOrg/quesma/quesma/quesma/types"
1111
"math"
12-
"reflect"
1312
)
1413

15-
func isInt(f float64) bool {
16-
return f == float64(int64(f))
17-
}
18-
19-
func isUnsignedInt(f float64) bool {
20-
if f < 0 {
21-
return false
22-
}
23-
return f == float64(uint64(f))
24-
}
25-
26-
func getTypeName(v interface{}) string {
27-
const unknownLiteral = "unknown"
28-
const arrayLiteral = "Array"
29-
primitiveTypes := map[string]string{
30-
"string": "String",
31-
"bool": "Bool",
32-
"int": "Int64",
33-
"float64": "Float64",
34-
"uint": "UInt64",
35-
}
36-
if v == nil {
37-
return unknownLiteral
38-
}
39-
GoType := reflect.TypeOf(v).String()
40-
switch GoType {
41-
case "string", "bool":
42-
return primitiveTypes[GoType]
43-
case "int":
44-
if v.(int) < 0 {
45-
return primitiveTypes["int"]
46-
} else {
47-
return primitiveTypes["uint"]
48-
}
49-
case "float64":
50-
if isInt(v.(float64)) {
51-
return primitiveTypes["int"]
52-
} else if isUnsignedInt(v.(float64)) {
53-
return primitiveTypes["uint"]
54-
}
55-
return primitiveTypes[GoType]
56-
}
57-
switch elem := v.(type) {
58-
case []interface{}:
59-
if len(elem) == 0 {
60-
return arrayLiteral + "(unknown)"
61-
} else {
62-
innerTypeName := getTypeName(elem[0])
63-
// Make sure that all elements of the array have the same type
64-
for _, e := range elem {
65-
if getTypeName(e) != innerTypeName {
66-
return arrayLiteral + "(unknown)"
67-
}
68-
}
69-
return arrayLiteral + "(" + innerTypeName + ")"
70-
}
71-
case interface{}:
72-
if e := reflect.ValueOf(elem); e.Kind() == reflect.Slice {
73-
innerTypeName := getTypeName(e.Index(0).Interface())
74-
// Make sure that all elements of the slice have the same type
75-
for i := 1; i < e.Len(); i++ {
76-
if getTypeName(e.Index(i).Interface()) != innerTypeName {
77-
return arrayLiteral + "(unknown)"
78-
}
79-
}
80-
return arrayLiteral + "(" + innerTypeName + ")"
81-
}
82-
}
83-
return GoType
84-
}
85-
8614
func removeLowCardinality(columnType string) string {
8715
if columnType == "LowCardinality(String)" {
8816
return "String"
@@ -169,63 +97,66 @@ func validateNumericType(columnType string, incomingValueType string, value inte
16997
return false
17098
}
17199

172-
func validateValueAgainstType(fieldName string, value interface{}, targetColumnType clickhouse.Type) types.JSON {
173-
// Validate Array() types by recursing on the inner type:
174-
if compoundType, isCompound := targetColumnType.(clickhouse.CompoundType); isCompound && compoundType.Name == "Array" {
175-
if valueAsArray, isArray := value.([]interface{}); isArray && len(valueAsArray) > 0 {
176-
innerTypesAreCompatible := true
177-
innerTypeName := getTypeName(valueAsArray[0])
100+
func validateValueAgainstType(fieldName string, value interface{}, columnType clickhouse.Type) (isValid bool) {
101+
switch columnType := columnType.(type) {
102+
case clickhouse.BaseType:
103+
incomingValueType, err := clickhouse.NewType(value, fieldName)
104+
if err != nil {
105+
return false
106+
}
107+
108+
columnTypeName := removeLowCardinality(columnType.Name)
109+
110+
if isNumericType(columnTypeName) {
111+
if incomingValueType, isBaseType := incomingValueType.(clickhouse.BaseType); isBaseType && validateNumericType(columnTypeName, incomingValueType.Name, value) {
112+
// Numeric types match!
113+
return true
114+
}
115+
}
178116

179-
// Make sure that all elements of the array have the same type
180-
for _, e := range valueAsArray {
181-
eTypeName := getTypeName(e)
182-
// Check if the type names are exactly the same. However, if it's a numeric type, perform more advanced
183-
// validation (to handle an array like [3.5, 4, 4.5]).
184-
if isNumericType(eTypeName) && isNumericType(innerTypeName) {
185-
if !validateNumericType(innerTypeName, eTypeName, e) {
186-
innerTypesAreCompatible = false
187-
break
117+
if incomingValueType, isBaseType := incomingValueType.(clickhouse.BaseType); isBaseType && incomingValueType.Name == columnTypeName {
118+
// Types match exactly!
119+
return true
120+
}
121+
122+
return false
123+
case clickhouse.MultiValueType:
124+
if columnType.Name == "Tuple" {
125+
if value, isMap := value.(map[string]interface{}); isMap {
126+
for key, elem := range value {
127+
subtype := columnType.GetColumn(key)
128+
if subtype == nil {
129+
return false
188130
}
189-
} else {
190-
if getTypeName(e) != innerTypeName {
191-
innerTypesAreCompatible = false
192-
break
131+
if !validateValueAgainstType(fmt.Sprintf("%s.%s", fieldName, key), elem, subtype.Type) {
132+
return false
193133
}
194134
}
135+
return true
195136
}
137+
} else {
138+
logger.Error().Msgf("MultiValueType validation is not yet supported for type: %v", columnType)
139+
}
196140

197-
if innerTypesAreCompatible {
198-
return validateValueAgainstType(fieldName, valueAsArray[0], compoundType.BaseType)
141+
return false
142+
case clickhouse.CompoundType:
143+
if columnType.Name == "Array" {
144+
if value, isArray := value.([]interface{}); isArray {
145+
for i, elem := range value {
146+
if !validateValueAgainstType(fmt.Sprintf("%s[%d]", fieldName, i), elem, columnType.BaseType) {
147+
return false
148+
}
149+
}
150+
return true
199151
}
152+
} else {
153+
logger.Error().Msgf("CompoundType validation is not yet supported for type: %v", columnType)
200154
}
201-
}
202-
203-
const DateTimeType = "DateTime64"
204-
const StringType = "String"
205-
deletedFields := make(types.JSON, 0)
206-
columnType := targetColumnType.String()
207-
columnType = removeLowCardinality(columnType)
208-
incomingValueType := getTypeName(value)
209155

210-
// Hot path
211-
if columnType == incomingValueType {
212-
return deletedFields
156+
return false
213157
}
214158

215-
if columnType == DateTimeType {
216-
// TODO validate date format
217-
// For now we store dates as strings
218-
if incomingValueType != StringType {
219-
deletedFields[fieldName] = value
220-
}
221-
} else if isNumericType(columnType) {
222-
if !validateNumericType(columnType, incomingValueType, value) {
223-
deletedFields[fieldName] = value
224-
}
225-
} else if columnType != incomingValueType {
226-
deletedFields[fieldName] = value
227-
}
228-
return deletedFields
159+
return false
229160
}
230161

231162
// validateIngest validates the document against the table schema
@@ -247,8 +178,8 @@ func (ip *IngestProcessor) validateIngest(tableName string, document types.JSON)
247178
if value == nil {
248179
continue
249180
}
250-
for k, v := range validateValueAgainstType(columnName, value, column.Type) {
251-
deletedFields[k] = v
181+
if !validateValueAgainstType(columnName, value, column.Type) {
182+
deletedFields[columnName] = value
252183
}
253184
}
254185
}

quesma/ingest/ingest_validator_test.go

Lines changed: 76 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,42 +19,21 @@ import (
1919
"testing"
2020
)
2121

22-
func TestGetTypeName(t *testing.T) {
23-
values := make(map[string][]interface{})
24-
values["UInt64"] = []interface{}{1}
25-
values["Float64"] = []interface{}{1.1}
26-
values["Int64"] = []interface{}{-1}
27-
values["String"] = []interface{}{"string"}
28-
values["Bool"] = []interface{}{true}
29-
values["Array(UInt64)"] = []interface{}{[]interface{}{1}}
30-
values["Array(Int64)"] = []interface{}{[]interface{}{-1}}
31-
values["Array(Array(Int64))"] = []interface{}{[][]interface{}{{-1}}}
32-
values["Array(Array(Array(Int64)))"] = []interface{}{[][][]interface{}{{{-1}}}}
33-
for typeName, values := range values {
34-
for _, value := range values {
35-
t.Run(typeName, func(t *testing.T) {
36-
assert.NotNil(t, value)
37-
assert.Equal(t, typeName, getTypeName(value))
38-
})
39-
}
40-
}
41-
}
42-
4322
func TestValidateIngest(t *testing.T) {
4423
floatCol := &clickhouse.Column{Name: "float_field", Type: clickhouse.BaseType{
4524
Name: "Float64",
4625
GoType: clickhouse.NewBaseType("float64").GoType,
4726
}}
4827

4928
invalidJson := validateValueAgainstType("float", 1, floatCol.Type)
50-
assert.Equal(t, 0, len(invalidJson))
29+
assert.True(t, invalidJson)
5130
StringCol := &clickhouse.Column{Name: "float_field", Type: clickhouse.BaseType{
5231
Name: "String",
5332
GoType: clickhouse.NewBaseType("string").GoType,
5433
}}
5534

5635
invalidJson = validateValueAgainstType("string", 1, StringCol.Type)
57-
assert.Equal(t, 1, len(invalidJson))
36+
assert.False(t, invalidJson)
5837

5938
}
6039

@@ -100,6 +79,45 @@ func TestIngestValidation(t *testing.T) {
10079

10180
`{"float_array_field":[3.14, 6.28, 0.99]}`,
10281
`{"float_array_field":[1, 2, 3]}`,
82+
83+
`{"nested_array_map_field": [
84+
[
85+
[
86+
{"field1": "value1", "field2": [1, 2, 3]},
87+
{"field1": "value2", "field2": [4, 5, 6]}
88+
],
89+
[
90+
{"field1": "value3", "field2": [7, 8, 9]},
91+
{"field1": "value4", "field2": [10, 11, 12]}
92+
]
93+
],
94+
[
95+
[
96+
{"field1": "value1", "field2": [1, 2, 3]}
97+
]
98+
],
99+
[]
100+
]}`,
101+
`{"nested_array_map_field": [
102+
[],
103+
[
104+
[],
105+
[{}],
106+
[
107+
{"field1": "value1", "field2": [1, 2, 3]},
108+
{"field1": "value2", "field2": [4, 5, 6]}
109+
],
110+
[
111+
{"field1": "value3", "field2": [7, 8, 9]},
112+
{"field1": "value4", "field2": [10, 11, 12]}
113+
]
114+
],
115+
[
116+
[
117+
{"field1": "value1", "field2": [1, 2, 3]}
118+
]
119+
],
120+
]}`,
103121
}
104122
expectedInsertJsons := []string{
105123
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"attributes_values":{"string_field":"10"},"attributes_metadata":{"string_field":"v1;Int64"}}`, tableName),
@@ -130,6 +148,9 @@ func TestIngestValidation(t *testing.T) {
130148

131149
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"float_array_field":[3.14,6.28,0.99]}`, tableName),
132150
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"float_array_field":[1,2,3]}`, tableName),
151+
152+
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"nested_array_map_field":[[[{"field1":"value1","field2":[1,2,3]},{"field1":"value2","field2":[4,5,6]}],[{"field1":"value3","field2":[7,8,9]},{"field1":"value4","field2":[10,11,12]}]],[[{"field1":"value1","field2":[1,2,3]}]],[]]}`, tableName),
153+
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"nested_array_map_field":[[],[[],[{}],[{"field1":"value1","field2":[1,2,3]},{"field1":"value2","field2":[4,5,6]}],[{"field1":"value3","field2":[7,8,9]},{"field1":"value4","field2":[10,11,12]}]],[[{"field1":"value1","field2":[1,2,3]}]]]}`, tableName),
133154
}
134155
tableMap := util.NewSyncMapWith(tableName, &clickhouse.Table{
135156
Name: tableName,
@@ -176,6 +197,38 @@ func TestIngestValidation(t *testing.T) {
176197
GoType: clickhouse.NewBaseType("Float64").GoType,
177198
},
178199
}},
200+
// Array(Array(Array(Tuple(field1 String, field2 Array(Int64)))))
201+
"nested_array_map_field": {Name: "nested_array_map_field", Type: clickhouse.CompoundType{
202+
Name: "Array",
203+
BaseType: clickhouse.CompoundType{
204+
Name: "Array",
205+
BaseType: clickhouse.CompoundType{
206+
Name: "Array",
207+
BaseType: clickhouse.MultiValueType{
208+
Name: "Tuple",
209+
Cols: []*clickhouse.Column{
210+
{
211+
Name: "field1",
212+
Type: clickhouse.BaseType{
213+
Name: "String",
214+
GoType: clickhouse.NewBaseType("String").GoType,
215+
},
216+
},
217+
{
218+
Name: "field2",
219+
Type: clickhouse.CompoundType{
220+
Name: "Array",
221+
BaseType: clickhouse.BaseType{
222+
Name: "Int64",
223+
GoType: clickhouse.NewBaseType("Int64").GoType,
224+
},
225+
},
226+
},
227+
},
228+
},
229+
},
230+
},
231+
}},
179232
},
180233
Created: true,
181234
})

0 commit comments

Comments
 (0)