Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 54ab46d

Browse files
committed
Moving some stuff around
1 parent c55d8e4 commit 54ab46d

File tree

3 files changed

+114
-49
lines changed

3 files changed

+114
-49
lines changed

platform/ingest/hydrolixlowerer.go

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,15 @@ import (
99
"github.com/QuesmaOrg/quesma/platform/schema"
1010
"github.com/QuesmaOrg/quesma/platform/types"
1111
"strings"
12+
"sync"
13+
"sync/atomic"
1214
)
1315

1416
type HydrolixLowerer struct {
15-
virtualTableStorage persistence.JSONDatabase
17+
virtualTableStorage persistence.JSONDatabase
18+
ingestCounter int64
19+
ingestFieldStatistics IngestFieldStatistics
20+
ingestFieldStatisticsLock sync.Mutex
1621
}
1722

1823
func NewHydrolixLowerer(virtualTableStorage persistence.JSONDatabase) *HydrolixLowerer {
@@ -21,6 +26,54 @@ func NewHydrolixLowerer(virtualTableStorage persistence.JSONDatabase) *HydrolixL
2126
}
2227
}
2328

29+
func (ip *HydrolixLowerer) GenerateIngestContent(table *chLib.Table,
30+
data types.JSON,
31+
inValidJson types.JSON,
32+
encodings map[schema.FieldEncodingKey]schema.EncodedFieldName) ([]AlterStatement, types.JSON, []NonSchemaField, error) {
33+
34+
if len(table.Config.Attributes) == 0 {
35+
return nil, data, nil, nil
36+
}
37+
38+
mDiff := DifferenceMap(data, table) // TODO change to DifferenceMap(m, t)
39+
40+
if len(mDiff) == 0 && len(inValidJson) == 0 { // no need to modify, just insert 'js'
41+
return nil, data, nil, nil
42+
}
43+
44+
// check attributes precondition
45+
if len(table.Config.Attributes) <= 0 {
46+
return nil, nil, nil, fmt.Errorf("no attributes config, but received non-schema fields: %s", mDiff)
47+
}
48+
attrsMap, _ := BuildAttrsMap(mDiff, table.Config)
49+
50+
// generateNewColumns is called on original attributes map
51+
// before adding invalid fields to it
52+
// otherwise it would contain invalid fields e.g. with wrong types
53+
// we only want to add fields that are not part of the schema e.g we don't
54+
// have columns for them
55+
var alterStatements []AlterStatement
56+
atomic.AddInt64(&ip.ingestCounter, 1)
57+
//if ok, alteredAttributesIndexes := ip.shouldAlterColumns(table, attrsMap); ok {
58+
// alterStatements = ip.generateNewColumns(attrsMap, table, alteredAttributesIndexes, encodings)
59+
//}
60+
// If there are some invalid fields, we need to add them to the attributes map
61+
// to not lose them and be able to store them later by
62+
// generating correct update query
63+
// addInvalidJsonFieldsToAttributes returns a new map with invalid fields added
64+
// this map is then used to generate non-schema fields string
65+
attrsMapWithInvalidFields := addInvalidJsonFieldsToAttributes(attrsMap, inValidJson)
66+
nonSchemaFields, err := generateNonSchemaFields(attrsMapWithInvalidFields)
67+
68+
if err != nil {
69+
return nil, nil, nil, err
70+
}
71+
72+
onlySchemaFields := RemoveNonSchemaFields(data, table)
73+
74+
return alterStatements, onlySchemaFields, nonSchemaFields, nil
75+
}
76+
2477
func (l *HydrolixLowerer) LowerToDDL(
2578
validatedJsons []types.JSON,
2679
table *chLib.Table,
@@ -68,6 +121,17 @@ func (l *HydrolixLowerer) LowerToDDL(
68121
partitioningField, defaultField,
69122
partitioningGranularity, defaultGranularity)
70123

124+
for i, preprocessedJson := range validatedJsons {
125+
alter, onlySchemaFields, nonSchemaFields, err := l.GenerateIngestContent(table, preprocessedJson,
126+
invalidJsons[i], encodings)
127+
_ = alter
128+
_ = onlySchemaFields
129+
_ = nonSchemaFields
130+
if err != nil {
131+
return nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' : %v", table.Name, err)
132+
}
133+
}
134+
71135
result := fmt.Sprintf(`{
72136
"schema": {
73137
"project": "%s",

platform/ingest/processor.go

Lines changed: 0 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -627,54 +627,6 @@ func (ip *SqlLowerer) shouldAlterColumns(table *chLib.Table, attrsMap map[string
627627
return false, nil
628628
}
629629

630-
func (ip *SqlLowerer) GenerateIngestContent(table *chLib.Table,
631-
data types.JSON,
632-
inValidJson types.JSON,
633-
encodings map[schema.FieldEncodingKey]schema.EncodedFieldName) ([]AlterStatement, types.JSON, []NonSchemaField, error) {
634-
635-
if len(table.Config.Attributes) == 0 {
636-
return nil, data, nil, nil
637-
}
638-
639-
mDiff := DifferenceMap(data, table) // TODO change to DifferenceMap(m, t)
640-
641-
if len(mDiff) == 0 && len(inValidJson) == 0 { // no need to modify, just insert 'js'
642-
return nil, data, nil, nil
643-
}
644-
645-
// check attributes precondition
646-
if len(table.Config.Attributes) <= 0 {
647-
return nil, nil, nil, fmt.Errorf("no attributes config, but received non-schema fields: %s", mDiff)
648-
}
649-
attrsMap, _ := BuildAttrsMap(mDiff, table.Config)
650-
651-
// generateNewColumns is called on original attributes map
652-
// before adding invalid fields to it
653-
// otherwise it would contain invalid fields e.g. with wrong types
654-
// we only want to add fields that are not part of the schema e.g we don't
655-
// have columns for them
656-
var alterStatements []AlterStatement
657-
atomic.AddInt64(&ip.ingestCounter, 1)
658-
if ok, alteredAttributesIndexes := ip.shouldAlterColumns(table, attrsMap); ok {
659-
alterStatements = ip.generateNewColumns(attrsMap, table, alteredAttributesIndexes, encodings)
660-
}
661-
// If there are some invalid fields, we need to add them to the attributes map
662-
// to not lose them and be able to store them later by
663-
// generating correct update query
664-
// addInvalidJsonFieldsToAttributes returns a new map with invalid fields added
665-
// this map is then used to generate non-schema fields string
666-
attrsMapWithInvalidFields := addInvalidJsonFieldsToAttributes(attrsMap, inValidJson)
667-
nonSchemaFields, err := generateNonSchemaFields(attrsMapWithInvalidFields)
668-
669-
if err != nil {
670-
return nil, nil, nil, err
671-
}
672-
673-
onlySchemaFields := RemoveNonSchemaFields(data, table)
674-
675-
return alterStatements, onlySchemaFields, nonSchemaFields, nil
676-
}
677-
678630
func generateInsertJson(nonSchemaFields []NonSchemaField, onlySchemaFields types.JSON) (string, error) {
679631
result := convertNonSchemaFieldsToMap(nonSchemaFields)
680632

platform/ingest/sqllowerer.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/QuesmaOrg/quesma/platform/types"
1111
"strings"
1212
"sync"
13+
"sync/atomic"
1314
)
1415

1516
type SqlLowerer struct {
@@ -26,6 +27,54 @@ func NewSqlLowerer(virtualTableStorage persistence.JSONDatabase) *SqlLowerer {
2627
}
2728
}
2829

30+
func (ip *SqlLowerer) GenerateIngestContent(table *chLib.Table,
31+
data types.JSON,
32+
inValidJson types.JSON,
33+
encodings map[schema.FieldEncodingKey]schema.EncodedFieldName) ([]AlterStatement, types.JSON, []NonSchemaField, error) {
34+
35+
if len(table.Config.Attributes) == 0 {
36+
return nil, data, nil, nil
37+
}
38+
39+
mDiff := DifferenceMap(data, table) // TODO change to DifferenceMap(m, t)
40+
41+
if len(mDiff) == 0 && len(inValidJson) == 0 { // no need to modify, just insert 'js'
42+
return nil, data, nil, nil
43+
}
44+
45+
// check attributes precondition
46+
if len(table.Config.Attributes) <= 0 {
47+
return nil, nil, nil, fmt.Errorf("no attributes config, but received non-schema fields: %s", mDiff)
48+
}
49+
attrsMap, _ := BuildAttrsMap(mDiff, table.Config)
50+
51+
// generateNewColumns is called on original attributes map
52+
// before adding invalid fields to it
53+
// otherwise it would contain invalid fields e.g. with wrong types
54+
// we only want to add fields that are not part of the schema e.g we don't
55+
// have columns for them
56+
var alterStatements []AlterStatement
57+
atomic.AddInt64(&ip.ingestCounter, 1)
58+
if ok, alteredAttributesIndexes := ip.shouldAlterColumns(table, attrsMap); ok {
59+
alterStatements = ip.generateNewColumns(attrsMap, table, alteredAttributesIndexes, encodings)
60+
}
61+
// If there are some invalid fields, we need to add them to the attributes map
62+
// to not lose them and be able to store them later by
63+
// generating correct update query
64+
// addInvalidJsonFieldsToAttributes returns a new map with invalid fields added
65+
// this map is then used to generate non-schema fields string
66+
attrsMapWithInvalidFields := addInvalidJsonFieldsToAttributes(attrsMap, inValidJson)
67+
nonSchemaFields, err := generateNonSchemaFields(attrsMapWithInvalidFields)
68+
69+
if err != nil {
70+
return nil, nil, nil, err
71+
}
72+
73+
onlySchemaFields := RemoveNonSchemaFields(data, table)
74+
75+
return alterStatements, onlySchemaFields, nonSchemaFields, nil
76+
}
77+
2978
func (l *SqlLowerer) LowerToDDL(validatedJsons []types.JSON,
3079
table *chLib.Table,
3180
invalidJsons []types.JSON,

0 commit comments

Comments
 (0)