Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit c6ee5bd

Browse files
authored
Make JSON in a structural way (#1470)
This PR addresses the way we construct the JSON sent to the database. Instead of hand-crafting JSON, we use JSON marshaller. JSON marshaller handles quoting better. Some tests are fixed, either.
1 parent 26cf26d commit c6ee5bd

File tree

2 files changed

+100
-43
lines changed

2 files changed

+100
-43
lines changed

platform/ingest/ingest_validator_test.go

Lines changed: 72 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/QuesmaOrg/quesma/platform/util"
1616
mux "github.com/QuesmaOrg/quesma/platform/v2/core"
1717
"github.com/stretchr/testify/assert"
18+
"reflect"
1819
"strings"
1920
"testing"
2021
)
@@ -122,33 +123,32 @@ func TestIngestValidation(t *testing.T) {
122123
]}`,
123124
}
124125
expectedInsertJsons := []string{
125-
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"attributes_values":{"string_field":"10"},"attributes_metadata":{"string_field":"v1;Int64"}}`, tableName),
126+
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"attributes_metadata":{"string_field":"v1;Int64"},"attributes_values":{"string_field":"10"}}`, tableName),
126127
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"string_field":"10"}`, tableName),
127128

128129
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"int_field":15}`, tableName),
129130
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"int_field":15}`, tableName),
130131

131132
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"int_field":"15"}`, tableName),
132-
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"attributes_values":{"int_field":"1.5"},"attributes_metadata":{"int_field":"v1;String"}}`, tableName),
133-
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"attributes_values":{"string_field":"15"},"attributes_metadata":{"string_field":"v1;Int64"}}`, tableName),
134-
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"attributes_values":{"string_field":"1.5"},"attributes_metadata":{"string_field":"v1;Float64"}}`, tableName),
133+
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"attributes_metadata":{"int_field":"v1;String"},"attributes_values":{"int_field":"1.5"}}`, tableName),
134+
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"attributes_metadata":{"string_field":"v1;Int64"},"attributes_values":{"string_field":"15"}}`, tableName),
135+
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"attributes_metadata":{"string_field":"v1;Float64"},"attributes_values":{"string_field":"1.5"}}`, tableName),
135136
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"int_array_field":[81,85,69,83,77,65]}`, tableName),
136137
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"string_array_field":["DHRFZN","HLVJDR"]}`, tableName),
137-
138-
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"attributes_values":{"int_array_field":"[81,\\"oops\\",69,83,77,65]"},"attributes_metadata":{"int_array_field":"v1;Array(Int64)"}}`, tableName),
139-
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"attributes_values":{"string_array_field":"[\\"DHRFZN\\",15,\\"HLVJDR\\"]"},"attributes_metadata":{"string_array_field":"v1;Array(String)"}}`, tableName),
138+
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"attributes_metadata":{"int_array_field":"v1;Array(Int64)"},"attributes_values":{"int_array_field":"[81,\\\"oops\\\",69,83,77,65]"}}`, tableName),
139+
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"attributes_metadata":{"string_array_field":"v1;Array(String)"},"attributes_values":{"string_array_field":"[\\\"DHRFZN\\\",15,\\\"HLVJDR\\\"]"}}`, tableName),
140140
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"int32_field":15}`, tableName),
141141
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"float_field":7.5}`, tableName),
142142
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"float_field":15}`, tableName),
143143
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"float_field":"15"}`, tableName),
144144
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"float_field":"15.55"}`, tableName),
145145

146146
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"int32_field":2147483647}`, tableName),
147-
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"attributes_values":{"int32_field":"2147483648"},"attributes_metadata":{"int32_field":"v1;Int64"}}`, tableName),
147+
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"attributes_metadata":{"int32_field":"v1;Int64"},"attributes_values":{"int32_field":"2147483648"}}`, tableName),
148148

149-
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"attributes_values":{"uint8_field":"-1"},"attributes_metadata":{"uint8_field":"v1;Int64"}}`, tableName),
149+
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"attributes_metadata":{"uint8_field":"v1;Int64"},"attributes_values":{"uint8_field":"-1"}}`, tableName),
150150
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"uint8_field":255}`, tableName),
151-
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"attributes_values":{"uint8_field":"1000"},"attributes_metadata":{"uint8_field":"v1;Int64"}}`, tableName),
151+
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"attributes_metadata":{"uint8_field":"v1;Int64"},"attributes_values":{"uint8_field":"1000"}}`, tableName),
152152

153153
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"float_array_field":[3.14,6.28,0.99]}`, tableName),
154154
fmt.Sprintf(`INSERT INTO "%s" FORMAT JSONEachRow {"float_array_field":[1,2,3]}`, tableName),
@@ -235,8 +235,66 @@ func TestIngestValidation(t *testing.T) {
235235
}},
236236
},
237237
})
238+
239+
splitInsertJSONEachRow := func(sql string) (prefix, jsonPart string, ok bool) {
240+
idx := strings.Index(sql, "{")
241+
if idx == -1 {
242+
return "", "", false // not a JSONEachRow insert or malformed
243+
}
244+
return sql[:idx], sql[idx:], true
245+
}
246+
238247
for i := range inputJson {
239-
conn, mock := util.InitSqlMockWithPrettyPrint(t, true)
248+
249+
queryMatcher := sqlmock.QueryMatcherFunc(func(expectedSQL, actualSQL string) error {
250+
251+
dumpState := func() {
252+
fmt.Println("Expected SQL:", expectedSQL)
253+
fmt.Println("Actual SQL: ", actualSQL)
254+
fmt.Println("---")
255+
}
256+
257+
expectedInsert, expectedJson, ok := splitInsertJSONEachRow(expectedSQL)
258+
if !ok {
259+
dumpState()
260+
return fmt.Errorf("expected SQL does not match JSONEachRow format: %s", expectedSQL)
261+
}
262+
263+
actualInsert, actualJson, ok := splitInsertJSONEachRow(actualSQL)
264+
if !ok {
265+
dumpState()
266+
return fmt.Errorf("actual SQL does not match JSONEachRow format: %s", actualSQL)
267+
}
268+
269+
if expectedInsert != actualInsert {
270+
dumpState()
271+
return fmt.Errorf("expected insert prefix '%s' does not match actual '%s'", expectedInsert, actualInsert)
272+
}
273+
274+
expectedMap, err := types.ParseJSON(expectedJson)
275+
if err != nil {
276+
dumpState()
277+
return fmt.Errorf("failed to parse expected JSON: %s, error: %v", expectedJson, err)
278+
}
279+
actualMap, err := types.ParseJSON(actualJson)
280+
if err != nil {
281+
dumpState()
282+
return fmt.Errorf("failed to parse actual JSON: %s, error: %v", actualJson, err)
283+
}
284+
285+
if !reflect.DeepEqual(expectedMap, actualMap) {
286+
dumpState()
287+
return fmt.Errorf("expected JSON %s does not match actual JSON %s", expectedJson, actualJson)
288+
}
289+
290+
return nil
291+
})
292+
293+
conn, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(queryMatcher))
294+
if err != nil {
295+
t.Fatalf("an error '%s' was not expected when opening a stub database connection", err)
296+
}
297+
240298
db := backend_connectors.NewClickHouseBackendConnectorWithConnection("", conn)
241299
ip := newIngestProcessorEmpty()
242300
ip.chDb = db
@@ -253,9 +311,10 @@ func TestIngestValidation(t *testing.T) {
253311

254312
defer db.Close()
255313

256-
mock.ExpectExec(EscapeBrackets(expectedInsertJsons[i])).WithoutArgs().WillReturnResult(sqlmock.NewResult(0, 0))
257-
err := ip.ProcessInsertQuery(context.Background(), tableName, []types.JSON{types.MustJSON((inputJson[i]))}, &IngestTransformerTest{}, &columNameFormatter{separator: "::"})
314+
mock.ExpectExec(expectedInsertJsons[i]).WithoutArgs().WillReturnResult(sqlmock.NewResult(0, 0))
315+
err = ip.ProcessInsertQuery(context.Background(), tableName, []types.JSON{types.MustJSON((inputJson[i]))}, &IngestTransformerTest{}, &columNameFormatter{separator: "::"})
258316
assert.NoError(t, err)
317+
259318
if err := mock.ExpectationsWereMet(); err != nil {
260319
t.Fatal("there were unfulfilled expections:", err)
261320
}

platform/ingest/processor.go

Lines changed: 28 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -415,32 +415,29 @@ type NonSchemaField struct {
415415
Type string // inferred from incoming json
416416
}
417417

418-
func convertNonSchemaFieldsToString(nonSchemaFields []NonSchemaField) string {
419-
if len(nonSchemaFields) <= 0 {
420-
return ""
421-
}
422-
attributesColumns := []string{chLib.AttributesValuesColumn, chLib.AttributesMetadataColumn}
423-
var nonSchemaStr string
424-
for columnIndex, column := range attributesColumns {
425-
var value string
426-
if columnIndex > 0 {
427-
nonSchemaStr += ","
418+
func convertNonSchemaFieldsToMap(nonSchemaFields []NonSchemaField) map[string]any {
419+
valuesMap := make(map[string]string)
420+
typesMap := make(map[string]string)
421+
422+
for _, f := range nonSchemaFields {
423+
if f.Value != "" {
424+
valuesMap[f.Key] = f.Value
428425
}
429-
nonSchemaStr += "\"" + column + "\":{"
430-
for i := 0; i < len(nonSchemaFields); i++ {
431-
if columnIndex > 0 {
432-
value = nonSchemaFields[i].Type
433-
} else {
434-
value = nonSchemaFields[i].Value
435-
}
436-
if i > 0 {
437-
nonSchemaStr += ","
438-
}
439-
nonSchemaStr += fmt.Sprintf("\"%s\":\"%s\"", nonSchemaFields[i].Key, value)
426+
if f.Type != "" {
427+
typesMap[f.Key] = f.Type
440428
}
441-
nonSchemaStr = nonSchemaStr + "}"
442429
}
443-
return nonSchemaStr
430+
431+
result := make(map[string]any)
432+
433+
if len(valuesMap) > 0 {
434+
result[chLib.AttributesValuesColumn] = valuesMap
435+
}
436+
if len(typesMap) > 0 {
437+
result[chLib.AttributesMetadataColumn] = typesMap
438+
}
439+
440+
return result
444441
}
445442

446443
func generateNonSchemaFields(attrsMap map[string][]interface{}) ([]NonSchemaField, error) {
@@ -587,16 +584,17 @@ func (ip *IngestProcessor) GenerateIngestContent(table *chLib.Table,
587584
}
588585

589586
func generateInsertJson(nonSchemaFields []NonSchemaField, onlySchemaFields types.JSON) (string, error) {
590-
nonSchemaStr := convertNonSchemaFieldsToString(nonSchemaFields)
591-
schemaFieldsJson, err := json.Marshal(onlySchemaFields)
587+
result := convertNonSchemaFieldsToMap(nonSchemaFields)
588+
589+
for k, v := range onlySchemaFields {
590+
result[k] = v
591+
}
592+
593+
jsonBytes, err := json.Marshal(result)
592594
if err != nil {
593595
return "", err
594596
}
595-
comma := ""
596-
if nonSchemaStr != "" && len(schemaFieldsJson) > 2 {
597-
comma = ","
598-
}
599-
return fmt.Sprintf("{%s%s%s", nonSchemaStr, comma, schemaFieldsJson[1:]), err
597+
return string(jsonBytes), nil
600598
}
601599

602600
func generateSqlStatements(createTableCmd string, alterCmd []string, insert string) []string {

0 commit comments

Comments
 (0)