Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 04ba601

Browse files
authored
Skip invalid/incomplete types in CREATE TABLE (#1256)
When a user sends a JSON document with a `nil` field, we can't determine a ClickHouse type for that field (column). This is the case for `nil` (for which we used to fall back to String), an empty array, or an empty dictionary. This PR changes the logic in `NewType()`: when the type is determined to be invalid/incomplete, the function now returns an error. The `CREATE TABLE` code can now skip such a problematic column. If a user later sends that field with a complete type, it will be handled by a future `ALTER TABLE ADD COLUMN` statement (as Quesma will then have seen an example with a complete type).
1 parent 41c2a02 commit 04ba601

File tree

6 files changed

+237
-35
lines changed

6 files changed

+237
-35
lines changed

ci/it/configs/quesma-ingest.yml.template

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,9 @@ processors:
9999
encodings_test:
100100
target:
101101
- my-clickhouse-instance
102+
incomplete_types_test:
103+
target:
104+
- my-clickhouse-instance
102105
"*":
103106
target:
104107
- my-clickhouse-instance
@@ -177,6 +180,9 @@ processors:
177180
encodings_test:
178181
target:
179182
- my-clickhouse-instance
183+
incomplete_types_test:
184+
target:
185+
- my-clickhouse-instance
180186
"*":
181187
target:
182188
- my-clickhouse-instance

ci/it/testcases/test_ingest.go

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ func (a *IngestTestcase) RunTests(ctx context.Context, t *testing.T) error {
4141
t.Run("test ignored fields", func(t *testing.T) { a.testIgnoredFields(ctx, t) })
4242
t.Run("test nested fields", func(t *testing.T) { a.testNestedFields(ctx, t) })
4343
t.Run("test field encodings (mappings bug)", func(t *testing.T) { a.testFieldEncodingsMappingsBug(ctx, t) })
44+
t.Run("test incomplete types", func(t *testing.T) { a.testIncompleteTypes(ctx, t) })
4445
return nil
4546
}
4647

@@ -645,3 +646,165 @@ func (a *IngestTestcase) testFieldEncodingsMappingsBug(ctx context.Context, t *t
645646
assert.Equal(t, "quesmaMetadataV1:fieldName=Field1", comments["field1"])
646647
assert.Equal(t, "quesmaMetadataV1:fieldName=Field2", comments["field2"])
647648
}
649+
650+
// Test "incomplete" types (e.g. null, empty array, empty object) for which Quesma can't infer a ClickHouse type.
651+
func (a *IngestTestcase) testIncompleteTypes(ctx context.Context, t *testing.T) {
652+
doc := []byte(`
653+
{
654+
"field1": "abc",
655+
"field2": null,
656+
657+
"field3": ["def", "ghi"],
658+
"field4": [],
659+
"field5": [[]],
660+
661+
"field6": {"ijk": "klm"},
662+
"field7": {},
663+
"field8": {"cde": {}},
664+
665+
"field9": [[{"nop": "qrs"}]],
666+
"field10": [[{}]]
667+
}
668+
`)
669+
resp, body := a.RequestToQuesma(ctx, t, "POST", "/incomplete_types_test/_doc", doc)
670+
assert.Equal(t, http.StatusOK, resp.StatusCode)
671+
assert.NotContains(t, string(body), "error")
672+
673+
cols, err := a.FetchClickHouseColumns(ctx, "incomplete_types_test")
674+
assert.NoError(t, err, "error fetching clickhouse columns")
675+
676+
expectedCols := map[string]string{
677+
"@timestamp": "DateTime64(3)",
678+
"attributes_metadata": "Map(String, String)",
679+
"attributes_values": "Map(String, String)",
680+
681+
"field1": "Nullable(String)",
682+
// field2 is null; null can be a Nullable(String), Nullable(Int64), ...
683+
684+
"field3": "Array(String)",
685+
// field4 is an empty array; an empty array can be an Array(String), Array(Int64), ...
686+
// field5 is an array of empty array; it could be an Array(Array(String)), Array(Array(Int64)), ...
687+
688+
"field6_ijk": "Nullable(String)",
689+
// field7 is an empty object; it could be a Tuple(field1 String), Tuple(field1 Int64, field2 String), ...
690+
// field8 is an object with an empty object; it could be a Tuple(cde Tuple(subfield Int64)), Tuple(cde Tuple(subfield String)), ...
691+
692+
"field9": "Array(Array(Tuple(nop Nullable(String))))",
693+
// field10 is an array of arrays of empty objects; it could be an Array(Array(Tuple(subfield Int64))), Array(Array(Tuple(subfield String))), ...
694+
}
695+
assert.Equal(t, expectedCols, cols)
696+
697+
// Insert a similar document again (now that the table is already created)
698+
doc2 := []byte(`
699+
{
700+
"field1": "QUESMA_DOC2_1",
701+
"field2": null,
702+
703+
"field3": ["QUESMA_DOC2_2", "QUESMA_DOC2_3"],
704+
"field4": [],
705+
"field5": [[]],
706+
707+
"field6": {"ijk": "QUESMA_DOC2_4"},
708+
"field7": {},
709+
"field8": {"cde": {}},
710+
711+
"field9": [[{"nop": "QUESMA_DOC2_5"}]],
712+
"field10": [[{}]]
713+
}
714+
`)
715+
resp, body = a.RequestToQuesma(ctx, t, "POST", "/incomplete_types_test/_doc", doc2)
716+
assert.Equal(t, http.StatusOK, resp.StatusCode)
717+
assert.NotContains(t, string(body), "error")
718+
719+
cols, err = a.FetchClickHouseColumns(ctx, "incomplete_types_test")
720+
assert.NoError(t, err, "error fetching clickhouse columns")
721+
assert.Equal(t, expectedCols, cols)
722+
723+
// Insert a document with all fields with complete types, testing ALTER TABLE ADD COLUMN behavior.
724+
doc3 := []byte(`
725+
{
726+
"field1": "QUESMA_DOC3_1",
727+
"field2": "QUESMA_DOC3_2",
728+
729+
"field3": ["QUESMA_DOC3_3", "QUESMA_DOC3_4"],
730+
"field4": ["QUESMA_DOC3_5"],
731+
"field5": [["QUESMA_DOC3_6"]],
732+
733+
"field6": {"ijk": "QUESMA_DOC3_7"},
734+
"field7": {"klm": "QUESMA_DOC3_8"},
735+
"field8": {"cde": {"efg":"QUESMA_DOC3_9"}},
736+
737+
"field9": [[{"nop": "QUESMA_DOC3_10"}]],
738+
"field10": [[{"asd": "QUESMA_DOC3_11"}]]
739+
}
740+
`)
741+
resp, body = a.RequestToQuesma(ctx, t, "POST", "/incomplete_types_test/_doc", doc3)
742+
assert.Equal(t, http.StatusOK, resp.StatusCode)
743+
assert.NotContains(t, string(body), "error")
744+
745+
expectedCols = map[string]string{
746+
"@timestamp": "DateTime64(3)",
747+
"attributes_metadata": "Map(String, String)",
748+
"attributes_values": "Map(String, String)",
749+
750+
"field1": "Nullable(String)",
751+
"field2": "Nullable(String)",
752+
753+
"field3": "Array(String)",
754+
"field4": "Array(String)",
755+
"field5": "Array(Array(String))",
756+
757+
"field6_ijk": "Nullable(String)",
758+
"field7_klm": "Nullable(String)",
759+
"field8_cde_efg": "Nullable(String)",
760+
761+
"field9": "Array(Array(Tuple(nop Nullable(String))))",
762+
"field10": "Array(Array(Tuple(asd Nullable(String))))",
763+
}
764+
cols, err = a.FetchClickHouseColumns(ctx, "incomplete_types_test")
765+
assert.NoError(t, err, "error fetching clickhouse columns")
766+
assert.Equal(t, expectedCols, cols)
767+
768+
// Verify that DOC2 and DOC3 were correctly inserted.
769+
rows, err := a.ExecuteClickHouseQuery(ctx, "SELECT toString(field1), toString(field2), toString(field3), toString(field4), toString(field5), toString(field6_ijk), toString(field7_klm), toString(field8_cde_efg), toString(field9), toString(field10) FROM incomplete_types_test WHERE field1 IN ('QUESMA_DOC2_1', 'QUESMA_DOC3_1') ORDER BY field1")
770+
assert.NoError(t, err)
771+
defer rows.Close()
772+
773+
var results []struct {
774+
cols []interface{}
775+
}
776+
for rows.Next() {
777+
r := make([]interface{}, 10)
778+
valPtrs := make([]interface{}, 10)
779+
for i := range r {
780+
valPtrs[i] = &r[i]
781+
}
782+
err = rows.Scan(valPtrs...)
783+
assert.NoError(t, err)
784+
results = append(results, struct{ cols []interface{} }{cols: r})
785+
}
786+
assert.Equal(t, 2, len(results))
787+
788+
assert.Contains(t, *results[0].cols[0].(*string), "QUESMA_DOC2_1")
789+
assert.Contains(t, results[0].cols[2].(string), "QUESMA_DOC2_2")
790+
assert.Contains(t, results[0].cols[2].(string), "QUESMA_DOC2_3")
791+
assert.Contains(t, *results[0].cols[5].(*string), "QUESMA_DOC2_4")
792+
793+
// FIXME: this is not inserted correctly yet!
794+
// assert.Contains(t, results[0].cols[8].(string), "QUESMA_DOC2_5")
795+
796+
assert.Contains(t, *results[1].cols[0].(*string), "QUESMA_DOC3_1")
797+
assert.Contains(t, *results[1].cols[1].(*string), "QUESMA_DOC3_2")
798+
assert.Contains(t, results[1].cols[2].(string), "QUESMA_DOC3_3")
799+
assert.Contains(t, results[1].cols[2].(string), "QUESMA_DOC3_4")
800+
assert.Contains(t, results[1].cols[3].(string), "QUESMA_DOC3_5")
801+
assert.Contains(t, results[1].cols[4].(string), "QUESMA_DOC3_6")
802+
assert.Contains(t, *results[1].cols[5].(*string), "QUESMA_DOC3_7")
803+
assert.Contains(t, *results[1].cols[6].(*string), "QUESMA_DOC3_8")
804+
assert.Contains(t, *results[1].cols[7].(*string), "QUESMA_DOC3_9")
805+
806+
// FIXME: this is not inserted correctly yet!
807+
// assert.Contains(t, results[1].cols[8].(string), "QUESMA_DOC3_10")
808+
809+
assert.Contains(t, results[1].cols[9].(string), "QUESMA_DOC3_11")
810+
}

quesma/clickhouse/parser.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,13 @@ func BuildAttrsMap(m SchemaMap, config *ChTableConfig) (map[string][]interface{}
156156
if a.Type.CanConvert(value) {
157157
result[a.KeysArrayName] = append(result[a.KeysArrayName], name)
158158
result[a.ValuesArrayName] = append(result[a.ValuesArrayName], fmt.Sprintf("%v", value))
159-
result[a.TypesArrayName] = append(result[a.TypesArrayName], NewType(value, name).String())
159+
160+
valueType, err := NewType(value, name)
161+
if err != nil {
162+
result[a.TypesArrayName] = append(result[a.TypesArrayName], UndefinedType)
163+
} else {
164+
result[a.TypesArrayName] = append(result[a.TypesArrayName], valueType.String())
165+
}
160166

161167
matched = true
162168
break

quesma/clickhouse/schema.go

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ const (
2121
attributesColumnType = "Map(String, String)" // ClickHouse type of AttributesValuesColumn, AttributesMetadataColumn
2222
AttributesValuesColumn = "attributes_values"
2323
AttributesMetadataColumn = "attributes_metadata"
24+
25+
UndefinedType = "Undefined" // used for unknown types or incomplete types for which NewType can't infer a proper type
2426
)
2527

2628
type (
@@ -231,50 +233,63 @@ func ResolveType(clickHouseTypeName string) reflect.Type {
231233
}
232234

233235
// 'value': value of a field, from unmarshalled JSON
234-
func NewType(value any, valueOrigin string) Type {
236+
// 'valueOrigin': name of the field (for error messages)
237+
func NewType(value any, valueOrigin string) (Type, error) {
235238
isFloatInt := func(f float64) bool {
236239
return math.Mod(f, 1.0) == 0.0
237240
}
238241
switch valueCasted := value.(type) {
239242
case string:
240243
t, err := time.Parse(time.RFC3339Nano, valueCasted)
241244
if err == nil {
242-
return BaseType{Name: "DateTime64", GoType: reflect.TypeOf(t)}
245+
return BaseType{Name: "DateTime64", GoType: reflect.TypeOf(t)}, nil
243246
}
244247
t, err = time.Parse("2006-01-02T15:04:05", valueCasted)
245248
if err == nil {
246-
return BaseType{Name: "DateTime64", GoType: reflect.TypeOf(t)}
249+
return BaseType{Name: "DateTime64", GoType: reflect.TypeOf(t)}, nil
247250
}
248-
return BaseType{Name: "String", GoType: reflect.TypeOf("")}
251+
return BaseType{Name: "String", GoType: reflect.TypeOf("")}, nil
249252
case float64:
250253
if isFloatInt(valueCasted) {
251-
return BaseType{Name: "Int64", GoType: reflect.TypeOf(int64(0))}
254+
return BaseType{Name: "Int64", GoType: reflect.TypeOf(int64(0))}, nil
252255
} else {
253-
return BaseType{Name: "Float64", GoType: reflect.TypeOf(float64(0))}
256+
return BaseType{Name: "Float64", GoType: reflect.TypeOf(float64(0))}, nil
254257
}
255258
case bool:
256-
return BaseType{Name: "Bool", GoType: reflect.TypeOf(true)}
259+
return BaseType{Name: "Bool", GoType: reflect.TypeOf(true)}, nil
257260
case map[string]interface{}:
258261
cols := make([]*Column, len(valueCasted))
259262
for k, v := range valueCasted {
260-
if v != nil {
261-
cols = append(cols, &Column{Name: k, Type: NewType(v, fmt.Sprintf("%s.%s", valueOrigin, k)), Codec: Codec{Name: ""}})
263+
innerName := fmt.Sprintf("%s.%s", valueOrigin, k)
264+
innerType, err := NewType(v, innerName)
265+
if err != nil {
266+
return nil, err
262267
}
268+
cols = append(cols, &Column{Name: k, Type: innerType, Codec: Codec{Name: ""}})
269+
}
270+
if len(cols) == 0 {
271+
logger.Warn().Msgf("Empty map type (origin: %s).", valueOrigin)
272+
return nil, fmt.Errorf("empty map type (origin: %s)", valueOrigin)
263273
}
264-
return MultiValueType{Name: "Tuple", Cols: cols}
274+
return MultiValueType{Name: "Tuple", Cols: cols}, nil
265275
case []interface{}:
266276
if len(valueCasted) == 0 {
267-
// empty array defaults to string for now, maybe change needed or error returned
268-
return CompoundType{Name: "Array", BaseType: NewBaseType("String")}
277+
logger.Warn().Msgf("Empty array type (origin: %s).", valueOrigin)
278+
return nil, fmt.Errorf("empty array type (origin: %s)", valueOrigin)
279+
}
280+
innerName := fmt.Sprintf("%s[0]", valueOrigin)
281+
innerType, err := NewType(valueCasted[0], innerName)
282+
if err != nil {
283+
return nil, err
269284
}
270-
return CompoundType{Name: "Array", BaseType: NewType(valueCasted[0], fmt.Sprintf("%s[0]", valueOrigin))}
285+
return CompoundType{Name: "Array", BaseType: innerType}, nil
286+
case nil:
287+
logger.Warn().Msgf("Nil type (origin: %s).", valueOrigin)
288+
return nil, fmt.Errorf("nil type (origin: %s)", valueOrigin)
271289
}
272290

273291
logger.Warn().Msgf("Unsupported type '%T' of value: %v (origin: %s).", value, value, valueOrigin)
274-
275-
// value can be nil, so should return something reasonable here
276-
return BaseType{Name: "String", GoType: reflect.TypeOf("")}
277-
292+
return nil, fmt.Errorf("unsupported type '%T' of value: %v (origin: %s)", value, value, valueOrigin)
278293
}
279294

280295
func NewTable(createTableQuery string, config *ChTableConfig) (*Table, error) {

quesma/ingest/parser.go

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -90,23 +90,18 @@ func JsonToColumns(m SchemaMap, chConfig *clickhouse.ChTableConfig) []CreateTabl
9090
var resultColumns []CreateTableEntry
9191

9292
for name, value := range m {
93-
var fTypeString string
94-
if value == nil { // HACK ALERT -> We're treating null values as strings for now, so that we don't completely discard documents with empty values
95-
fTypeString = "Nullable(String)"
96-
} else {
97-
fType := clickhouse.NewType(value, name)
98-
99-
// handle "field":{} case (Elastic Agent sends such JSON fields) by ignoring them
100-
if multiValueType, ok := fType.(clickhouse.MultiValueType); ok && len(multiValueType.Cols) == 0 {
101-
logger.Warn().Msgf("Ignoring empty JSON object: \"%s\":%v", name, value)
102-
continue
103-
}
93+
fType, err := clickhouse.NewType(value, name)
94+
if err != nil {
95+
// Skip column with invalid/incomplete type
96+
logger.Warn().Msgf("Skipping field '%s' with invalid/incomplete type: %v", name, err)
97+
continue
98+
}
10499

105-
fTypeString = fType.String()
106-
if !strings.Contains(fTypeString, "Array") && !strings.Contains(fTypeString, "DateTime") {
107-
fTypeString = "Nullable(" + fTypeString + ")"
108-
}
100+
fTypeString := fType.String()
101+
if !strings.Contains(fTypeString, "Array") && !strings.Contains(fTypeString, "DateTime") {
102+
fTypeString = "Nullable(" + fTypeString + ")"
109103
}
104+
110105
// hack for now
111106
if name == timestampFieldName && chConfig.TimestampDefaultsNow {
112107
fTypeString += " DEFAULT now64()"
@@ -315,7 +310,13 @@ func BuildAttrsMap(m SchemaMap, config *clickhouse.ChTableConfig) (map[string][]
315310
if a.Type.CanConvert(value) {
316311
result[a.KeysArrayName] = append(result[a.KeysArrayName], name)
317312
result[a.ValuesArrayName] = append(result[a.ValuesArrayName], fmt.Sprintf("%v", value))
318-
result[a.TypesArrayName] = append(result[a.TypesArrayName], clickhouse.NewType(value, name).String())
313+
314+
valueType, err := clickhouse.NewType(value, name)
315+
if err != nil {
316+
result[a.TypesArrayName] = append(result[a.TypesArrayName], clickhouse.UndefinedType)
317+
} else {
318+
result[a.TypesArrayName] = append(result[a.TypesArrayName], valueType.String())
319+
}
319320

320321
matched = true
321322
break

quesma/ingest/processor.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,13 @@ func addInvalidJsonFieldsToAttributes(attrsMap map[string][]interface{}, invalid
258258
for k, v := range invalidJson {
259259
newAttrsMap[chLib.DeprecatedAttributesKeyColumn] = append(newAttrsMap[chLib.DeprecatedAttributesKeyColumn], k)
260260
newAttrsMap[chLib.DeprecatedAttributesValueColumn] = append(newAttrsMap[chLib.DeprecatedAttributesValueColumn], v)
261-
newAttrsMap[chLib.DeprecatedAttributesValueType] = append(newAttrsMap[chLib.DeprecatedAttributesValueType], chLib.NewType(v, k).String())
261+
262+
valueType, err := chLib.NewType(v, k)
263+
if err != nil {
264+
newAttrsMap[chLib.DeprecatedAttributesValueType] = append(newAttrsMap[chLib.DeprecatedAttributesValueType], chLib.UndefinedType)
265+
} else {
266+
newAttrsMap[chLib.DeprecatedAttributesValueType] = append(newAttrsMap[chLib.DeprecatedAttributesValueType], valueType.String())
267+
}
262268
}
263269
return newAttrsMap
264270
}
@@ -308,6 +314,11 @@ func (ip *IngestProcessor) generateNewColumns(
308314

309315
columnType := ""
310316
modifiers := ""
317+
318+
if attrTypes[i] == chLib.UndefinedType {
319+
continue
320+
}
321+
311322
// Array and Map are not Nullable
312323
if strings.Contains(attrTypes[i], "Array") || strings.Contains(attrTypes[i], "Map") {
313324
columnType = attrTypes[i]

0 commit comments

Comments
 (0)