Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 9bf027a

Browse files
authored
Moving jsonprocessor to util (#1218)
1 parent d46efa2 commit 9bf027a

File tree

8 files changed

+23
-26
lines changed

8 files changed

+23
-26
lines changed

quesma/ingest/common_table_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"github.com/QuesmaOrg/quesma/quesma/backend_connectors"
99
"github.com/QuesmaOrg/quesma/quesma/clickhouse"
1010
"github.com/QuesmaOrg/quesma/quesma/common_table"
11-
"github.com/QuesmaOrg/quesma/quesma/jsonprocessor"
1211
"github.com/QuesmaOrg/quesma/quesma/persistence"
1312
"github.com/QuesmaOrg/quesma/quesma/quesma/config"
1413
"github.com/QuesmaOrg/quesma/quesma/quesma/types"
@@ -238,7 +237,7 @@ func TestIngestToCommonTable(t *testing.T) {
238237
ctx := context.Background()
239238
formatter := DefaultColumnNameFormatter()
240239

241-
transformer := jsonprocessor.IngestTransformerFor(indexName, quesmaConfig)
240+
transformer := IngestTransformerFor(indexName, quesmaConfig)
242241

243242
for _, stm := range tt.expectedStatements {
244243
mock.ExpectExec(stm).WillReturnResult(sqlmock.NewResult(1, 1))

quesma/jsonprocessor/ingest.go renamed to quesma/ingest/ingest.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
// Copyright Quesma, licensed under the Elastic License 2.0.
22
// SPDX-License-Identifier: Elastic-2.0
3-
package jsonprocessor
3+
package ingest
44

55
import (
66
"github.com/QuesmaOrg/quesma/quesma/quesma/config"
77
"github.com/QuesmaOrg/quesma/quesma/quesma/types"
8+
"github.com/QuesmaOrg/quesma/quesma/util"
89
"strings"
910
)
1011

@@ -17,7 +18,7 @@ type flattenMapTransformer struct {
1718
}
1819

1920
func (t *flattenMapTransformer) Transform(document types.JSON) (types.JSON, error) {
20-
return FlattenMap(document, t.separator), nil
21+
return util.FlattenMap(document, t.separator), nil
2122
}
2223

2324
type removeFieldsTransformer struct {

quesma/ingest/ingest_validator_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ func TestIngestValidation(t *testing.T) {
185185
defer db.Close()
186186

187187
mock.ExpectExec(EscapeBrackets(expectedInsertJsons[i])).WithoutArgs().WillReturnResult(sqlmock.NewResult(0, 0))
188-
err := ip.ProcessInsertQuery(context.Background(), tableName, []types.JSON{types.MustJSON((inputJson[i]))}, &IngestTransformer{}, &columNameFormatter{separator: "::"})
188+
err := ip.ProcessInsertQuery(context.Background(), tableName, []types.JSON{types.MustJSON((inputJson[i]))}, &IngestTransformerTest{}, &columNameFormatter{separator: "::"})
189189
assert.NoError(t, err)
190190
if err := mock.ExpectationsWereMet(); err != nil {
191191
t.Fatal("there were unfulfilled expections:", err)

quesma/ingest/insert_test.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"github.com/DATA-DOG/go-sqlmock"
99
"github.com/QuesmaOrg/quesma/quesma/backend_connectors"
1010
"github.com/QuesmaOrg/quesma/quesma/clickhouse"
11-
"github.com/QuesmaOrg/quesma/quesma/jsonprocessor"
1211
"github.com/QuesmaOrg/quesma/quesma/persistence"
1312
"github.com/QuesmaOrg/quesma/quesma/quesma/config"
1413
"github.com/QuesmaOrg/quesma/quesma/quesma/types"
@@ -133,10 +132,10 @@ type ingestProcessorHelper struct {
133132
tableAlreadyCreated bool
134133
}
135134

136-
type IngestTransformer struct {
135+
type IngestTransformerTest struct {
137136
}
138137

139-
func (*IngestTransformer) Transform(document types.JSON) (types.JSON, error) {
138+
func (*IngestTransformerTest) Transform(document types.JSON) (types.JSON, error) {
140139
return document, nil
141140
}
142141

@@ -270,7 +269,7 @@ func TestProcessInsertQuery(t *testing.T) {
270269
}
271270
}
272271

273-
err := ip.ip.ProcessInsertQuery(ctx, tableName, []types.JSON{types.MustJSON(tt.insertJson)}, &IngestTransformer{}, &columNameFormatter{separator: "::"})
272+
err := ip.ip.ProcessInsertQuery(ctx, tableName, []types.JSON{types.MustJSON(tt.insertJson)}, &IngestTransformerTest{}, &columNameFormatter{separator: "::"})
274273
assert.NoError(t, err)
275274
if err := mock.ExpectationsWereMet(); err != nil {
276275
t.Fatal("there were unfulfilled expections:", err)
@@ -302,7 +301,7 @@ func TestInsertVeryBigIntegers(t *testing.T) {
302301
mock.ExpectExec(`CREATE TABLE IF NOT EXISTS "` + tableName).WillReturnResult(sqlmock.NewResult(0, 0))
303302
mock.ExpectExec(expectedInsertJsons[i]).WillReturnResult(sqlmock.NewResult(0, 0))
304303

305-
err := lm.ProcessInsertQuery(context.Background(), tableName, []types.JSON{types.MustJSON(fmt.Sprintf(`{"severity":"sev","int": %s}`, bigInt))}, &IngestTransformer{}, &columNameFormatter{separator: "::"})
304+
err := lm.ProcessInsertQuery(context.Background(), tableName, []types.JSON{types.MustJSON(fmt.Sprintf(`{"severity":"sev","int": %s}`, bigInt))}, &IngestTransformerTest{}, &columNameFormatter{separator: "::"})
306305
assert.NoError(t, err)
307306
if err := mock.ExpectationsWereMet(); err != nil {
308307
t.Fatal("there were unfulfilled expections:", err)
@@ -448,7 +447,7 @@ func TestCreateTableIfSomeFieldsExistsInSchemaAlready(t *testing.T) {
448447
ctx := context.Background()
449448
formatter := DefaultColumnNameFormatter()
450449

451-
transformer := jsonprocessor.IngestTransformerFor(indexName, quesmaConfig)
450+
transformer := IngestTransformerFor(indexName, quesmaConfig)
452451

453452
for _, stm := range tt.expectedStatements {
454453
mock.ExpectExec(stm).WillReturnResult(sqlmock.NewResult(1, 1))

quesma/ingest/processor.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"github.com/QuesmaOrg/quesma/quesma/common_table"
1212
"github.com/QuesmaOrg/quesma/quesma/elasticsearch"
1313
"github.com/QuesmaOrg/quesma/quesma/end_user_errors"
14-
"github.com/QuesmaOrg/quesma/quesma/jsonprocessor"
1514
"github.com/QuesmaOrg/quesma/quesma/logger"
1615
"github.com/QuesmaOrg/quesma/quesma/model"
1716
"github.com/QuesmaOrg/quesma/quesma/persistence"
@@ -560,7 +559,7 @@ func generateSqlStatements(createTableCmd string, alterCmd []string, insert stri
560559
func populateFieldEncodings(jsonData []types.JSON, tableName string) map[schema.FieldEncodingKey]schema.EncodedFieldName {
561560
encodings := make(map[schema.FieldEncodingKey]schema.EncodedFieldName)
562561
for _, jsonValue := range jsonData {
563-
flattenJson := jsonprocessor.FlattenMap(jsonValue, ".")
562+
flattenJson := util.FlattenMap(jsonValue, ".")
564563
for field := range flattenJson {
565564
encodedField := util.FieldToColumnEncoder(field)
566565
encodings[schema.FieldEncodingKey{TableName: tableName, FieldName: field}] =
@@ -572,12 +571,12 @@ func populateFieldEncodings(jsonData []types.JSON, tableName string) map[schema.
572571

573572
func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
574573
tableName string,
575-
jsonData []types.JSON, transformer jsonprocessor.IngestTransformer,
574+
jsonData []types.JSON, transformer IngestTransformer,
576575
tableFormatter TableColumNameFormatter, tableDefinitionChangeOnly bool) ([]string, error) {
577576
// this is pre ingest transformer
578577
// here we transform the data before it's structure evaluation and insertion
579578
//
580-
preIngestTransformer := &jsonprocessor.RewriteArrayOfObject{}
579+
preIngestTransformer := &util.RewriteArrayOfObject{}
581580
var processed []types.JSON
582581
for _, jsonValue := range jsonData {
583582
result, err := preIngestTransformer.Transform(jsonValue)
@@ -693,12 +692,12 @@ func (lm *IngestProcessor) Ingest(ctx context.Context, indexName string, jsonDat
693692
}
694693

695694
nameFormatter := DefaultColumnNameFormatter()
696-
transformer := jsonprocessor.IngestTransformerFor(indexName, lm.cfg)
695+
transformer := IngestTransformerFor(indexName, lm.cfg)
697696
return lm.ProcessInsertQuery(ctx, indexName, jsonData, transformer, nameFormatter)
698697
}
699698

700699
func (lm *IngestProcessor) ProcessInsertQuery(ctx context.Context, tableName string,
701-
jsonData []types.JSON, transformer jsonprocessor.IngestTransformer,
700+
jsonData []types.JSON, transformer IngestTransformer,
702701
tableFormatter TableColumNameFormatter) error {
703702

704703
decision := lm.tableResolver.Resolve(quesma_api.IngestPipeline, tableName)
@@ -738,7 +737,7 @@ func (lm *IngestProcessor) ProcessInsertQuery(ctx context.Context, tableName str
738737
logger.ErrorWithCtx(ctx).Msgf("error processing insert query - virtual table schema update: %v", err)
739738
}
740739

741-
pipeline := jsonprocessor.IngestTransformerPipeline{}
740+
pipeline := IngestTransformerPipeline{}
742741
pipeline = append(pipeline, &common_table.IngestAddIndexNameTransformer{IndexName: tableName})
743742
pipeline = append(pipeline, transformer)
744743

@@ -796,7 +795,7 @@ func (ip *IngestProcessor) applyAsyncInsertOptimizer(tableName string, clickhous
796795
}
797796

798797
func (ip *IngestProcessor) processInsertQueryInternal(ctx context.Context, tableName string,
799-
jsonData []types.JSON, transformer jsonprocessor.IngestTransformer,
798+
jsonData []types.JSON, transformer IngestTransformer,
800799
tableFormatter TableColumNameFormatter, isVirtualTable bool) error {
801800
statements, err := ip.processInsertQuery(ctx, tableName, jsonData, transformer, tableFormatter, isVirtualTable)
802801
if err != nil {

quesma/stats/ingest_statistics.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ package stats
44

55
import (
66
"fmt"
7-
"github.com/QuesmaOrg/quesma/quesma/jsonprocessor"
87
"github.com/QuesmaOrg/quesma/quesma/quesma/types"
8+
"github.com/QuesmaOrg/quesma/quesma/util"
99
"sort"
1010
"strconv"
1111
"strings"
@@ -81,7 +81,7 @@ func (s *Statistics) getValueStatisticsPtr(keyStatistics *KeyStatistics, nonSche
8181
func (s *Statistics) process(index string,
8282
jsonData types.JSON, nonSchemaFields bool, nestedSeparator string) {
8383

84-
flatJson := jsonprocessor.FlattenMap(jsonData, nestedSeparator)
84+
flatJson := util.FlattenMap(jsonData, nestedSeparator)
8585

8686
mu.Lock()
8787
defer mu.Unlock()

quesma/jsonprocessor/json_processor.go renamed to quesma/util/json_processor.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
// Copyright Quesma, licensed under the Elastic License 2.0.
22
// SPDX-License-Identifier: Elastic-2.0
3-
package jsonprocessor
3+
package util
44

55
import (
66
"fmt"
7-
"github.com/QuesmaOrg/quesma/quesma/quesma/config"
87
"github.com/QuesmaOrg/quesma/quesma/quesma/types"
98
)
109

@@ -111,12 +110,12 @@ func (t *RewriteArrayOfObject) Transform(data types.JSON) (types.JSON, error) {
111110
}
112111

113112
type RemoveFieldsOfObject struct {
114-
RemovedFields []config.FieldName
113+
RemovedFields []string
115114
}
116115

117116
func (t *RemoveFieldsOfObject) Transform(data types.JSON) (types.JSON, error) {
118117
for _, field := range t.RemovedFields {
119-
delete(data, field.AsString())
118+
delete(data, field)
120119
}
121120
return data, nil
122121
}

quesma/jsonprocessor/json_processor_test.go renamed to quesma/util/json_processor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
// Copyright Quesma, licensed under the Elastic License 2.0.
22
// SPDX-License-Identifier: Elastic-2.0
3-
package jsonprocessor
3+
package util
44

55
import (
66
"github.com/QuesmaOrg/quesma/quesma/quesma/types"

0 commit comments

Comments
 (0)