Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit d46cf78

Browse files
authored
Improve validation of Array types (#1248)
Before this change, `validateValueAgainstType` would only compare the type name strings when validating arrays. This could cause problems with arrays of numbers, for example if the user had an `Array(Float64)` ClickHouse column but tried to insert an array of ints (e.g. `[3, 5, 7]`). After this fix, `validateValueAgainstType` works recursively: it takes the element type of the `Array` (e.g. the `Float64` in `Array(Float64)`) and validates an element of the incoming array from the Elastic API against it.
1 parent f553c27 commit d46cf78

File tree

2 files changed

+48
-5
lines changed

2 files changed

+48
-5
lines changed

quesma/ingest/ingest_validator.go

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,11 +169,41 @@ func validateNumericType(columnType string, incomingValueType string, value inte
169169
return false
170170
}
171171

172-
func validateValueAgainstType(fieldName string, value interface{}, column *clickhouse.Column) types.JSON {
172+
func validateValueAgainstType(fieldName string, value interface{}, targetColumnType clickhouse.Type) types.JSON {
173+
// Validate Array() types by recursing on the inner type:
174+
if compoundType, isCompound := targetColumnType.(clickhouse.CompoundType); isCompound && compoundType.Name == "Array" {
175+
if valueAsArray, isArray := value.([]interface{}); isArray && len(valueAsArray) > 0 {
176+
innerTypesAreCompatible := true
177+
innerTypeName := getTypeName(valueAsArray[0])
178+
179+
// Make sure that all elements of the array have the same type
180+
for _, e := range valueAsArray {
181+
eTypeName := getTypeName(e)
182+
// Check if the type names are exactly the same. However, if it's a numeric type, perform more advanced
183+
// validation (to handle an array like [3.5, 4, 4.5]).
184+
if isNumericType(eTypeName) && isNumericType(innerTypeName) {
185+
if !validateNumericType(innerTypeName, eTypeName, e) {
186+
innerTypesAreCompatible = false
187+
break
188+
}
189+
} else {
190+
if getTypeName(e) != innerTypeName {
191+
innerTypesAreCompatible = false
192+
break
193+
}
194+
}
195+
}
196+
197+
if innerTypesAreCompatible {
198+
return validateValueAgainstType(fieldName, valueAsArray[0], compoundType.BaseType)
199+
}
200+
}
201+
}
202+
173203
const DateTimeType = "DateTime64"
174204
const StringType = "String"
175205
deletedFields := make(types.JSON, 0)
176-
columnType := column.Type.String()
206+
columnType := targetColumnType.String()
177207
columnType = removeLowCardinality(columnType)
178208
incomingValueType := getTypeName(value)
179209

@@ -217,7 +247,7 @@ func (ip *IngestProcessor) validateIngest(tableName string, document types.JSON)
217247
if value == nil {
218248
continue
219249
}
220-
for k, v := range validateValueAgainstType(columnName, value, column) {
250+
for k, v := range validateValueAgainstType(columnName, value, column.Type) {
221251
deletedFields[k] = v
222252
}
223253
}

quesma/ingest/ingest_validator_test.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,14 @@ func TestValidateIngest(t *testing.T) {
4646
GoType: clickhouse.NewBaseType("float64").GoType,
4747
}}
4848

49-
invalidJson := validateValueAgainstType("float", 1, floatCol)
49+
invalidJson := validateValueAgainstType("float", 1, floatCol.Type)
5050
assert.Equal(t, 0, len(invalidJson))
5151
StringCol := &clickhouse.Column{Name: "float_field", Type: clickhouse.BaseType{
5252
Name: "String",
5353
GoType: clickhouse.NewBaseType("string").GoType,
5454
}}
5555

56-
invalidJson = validateValueAgainstType("string", 1, StringCol)
56+
invalidJson = validateValueAgainstType("string", 1, StringCol.Type)
5757
assert.Equal(t, 1, len(invalidJson))
5858

5959
}
@@ -97,6 +97,9 @@ func TestIngestValidation(t *testing.T) {
9797
`{"uint8_field":-1}`,
9898
`{"uint8_field":255}`,
9999
`{"uint8_field":1000}`,
100+
101+
`{"float_array_field":[3.14, 6.28, 0.99]}`,
102+
`{"float_array_field":[1, 2, 3]}`,
100103
}
101104
expectedInsertJsons := []string{
102105
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"attributes_values":{"string_field":"10"},"attributes_metadata":{"string_field":"v1;Int64"}}`, tableName),
@@ -124,6 +127,9 @@ func TestIngestValidation(t *testing.T) {
124127
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"attributes_values":{"uint8_field":"-1"},"attributes_metadata":{"uint8_field":"v1;Int64"}}`, tableName),
125128
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"uint8_field":255}`, tableName),
126129
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"attributes_values":{"uint8_field":"1000"},"attributes_metadata":{"uint8_field":"v1;Int64"}}`, tableName),
130+
131+
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"float_array_field":[3.14,6.28,0.99]}`, tableName),
132+
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"float_array_field":[1,2,3]}`, tableName),
127133
}
128134
tableMap := util.NewSyncMapWith(tableName, &clickhouse.Table{
129135
Name: tableName,
@@ -163,6 +169,13 @@ func TestIngestValidation(t *testing.T) {
163169
GoType: clickhouse.NewBaseType("Int64").GoType,
164170
},
165171
}},
172+
"float_array_field": {Name: "float_array_field", Type: clickhouse.CompoundType{
173+
Name: "Array",
174+
BaseType: clickhouse.BaseType{
175+
Name: "Float64",
176+
GoType: clickhouse.NewBaseType("Float64").GoType,
177+
},
178+
}},
166179
},
167180
Created: true,
168181
})

0 commit comments

Comments
 (0)