Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 8c93f77

Browse files
authored
Fixing quesma common table dynamic mapping case, second approach (#885)
This is a second approach (1 - #878) of fixing a problem where mappings contains more fields than ingest and sql queries are based on this knowledge. In this approach we are not altering table to have all fields that were added by dynamic mapping but we based on field origin information and building queries according to that. <img width="1706" alt="image" src="https://github.com/user-attachments/assets/40e52e56-7da5-4346-b138-8f5b738513c4">
1 parent 939452c commit 8c93f77

File tree

6 files changed

+59
-5
lines changed

6 files changed

+59
-5
lines changed

quesma/ingest/processor.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -641,6 +641,15 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
641641
ignoredFields := ip.getIgnoredFields(tableName)
642642
columnsFromJson := JsonToColumns("", jsonData[0], 1,
643643
tableConfig, tableFormatter, ignoredFields)
644+
645+
fieldOrigins := make(map[schema.FieldName]schema.FieldSource)
646+
647+
for _, column := range columnsFromJson {
648+
fieldOrigins[schema.FieldName(column.ClickHouseColumnName)] = schema.FieldSourceIngest
649+
}
650+
651+
ip.schemaRegistry.UpdateFieldsOrigins(schema.TableName(tableName), fieldOrigins)
652+
644653
// This comes externally from (configuration)
645654
// So we need to convert that separately
646655
columnsFromSchema := SchemaToColumns(findSchemaPointer(ip.schemaRegistry, tableName), tableFormatter, tableName, ip.schemaRegistry.GetFieldEncodings())

quesma/quesma/schema_transformer.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,10 @@ func (s *SchemaCheckPass) applyWildcardExpansion(indexSchema schema.Schema, quer
463463

464464
cols := make([]string, 0, len(indexSchema.Fields))
465465
for _, col := range indexSchema.Fields {
466-
cols = append(cols, col.InternalPropertyName.AsString())
466+
// Take only fields that are ingested
467+
if col.Origin == schema.FieldSourceIngest {
468+
cols = append(cols, col.InternalPropertyName.AsString())
469+
}
467470
}
468471
sort.Strings(cols)
469472

@@ -491,7 +494,10 @@ func (s *SchemaCheckPass) applyFullTextField(indexSchema schema.Schema, query *m
491494

492495
for _, field := range indexSchema.Fields {
493496
if field.Type.IsFullText() {
494-
fullTextFields = append(fullTextFields, field.InternalPropertyName.AsString())
497+
// Take only fields that are ingested
498+
if field.Origin == schema.FieldSourceIngest {
499+
fullTextFields = append(fullTextFields, field.InternalPropertyName.AsString())
500+
}
495501
}
496502
}
497503

quesma/schema/registry.go

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ type (
1616
Registry interface {
1717
AllSchemas() map[TableName]Schema
1818
FindSchema(name TableName) (Schema, bool)
19+
UpdateFieldsOrigins(name TableName, fields map[FieldName]FieldSource)
1920
UpdateDynamicConfiguration(name TableName, table Table)
2021
UpdateFieldEncodings(encodings map[FieldEncodingKey]EncodedFieldName)
2122
GetFieldEncodings() map[FieldEncodingKey]EncodedFieldName
@@ -34,6 +35,8 @@ type (
3435
dynamicConfiguration map[string]Table
3536
fieldEncodingsLock sync.RWMutex
3637
fieldEncodings map[FieldEncodingKey]EncodedFieldName
38+
fieldOriginsLock sync.RWMutex
39+
fieldOrigins map[TableName]map[FieldName]FieldSource
3740
}
3841
typeAdapter interface {
3942
Convert(string) (QuesmaType, bool)
@@ -95,6 +98,7 @@ func (s *schemaRegistry) loadSchemas() (map[TableName]Schema, error) {
9598
s.populateAliases(indexConfiguration, fields, aliases)
9699
s.removeIgnoredFields(indexConfiguration, fields, aliases)
97100
s.removeGeoPhysicalFields(fields)
101+
s.populateFieldsOrigins(indexName, fields)
98102
if tableDefinition, ok := definitions[indexName]; ok {
99103
schemas[TableName(indexName)] = NewSchemaWithAliases(fields, aliases, existsInDataSource, tableDefinition.DatabaseName)
100104
} else {
@@ -117,7 +121,7 @@ func (s *schemaRegistry) populateSchemaFromDynamicConfiguration(indexName string
117121
continue
118122
}
119123

120-
fields[FieldName(column.Name)] = Field{PropertyName: FieldName(column.Name), InternalPropertyName: FieldName(column.Name), Type: columnType}
124+
fields[FieldName(column.Name)] = Field{PropertyName: FieldName(column.Name), InternalPropertyName: FieldName(column.Name), Type: columnType, Origin: FieldSourceMapping}
121125
}
122126
}
123127

@@ -246,7 +250,7 @@ func (s *schemaRegistry) populateSchemaFromTableDefinition(definitions map[strin
246250
fields[propertyName] = Field{PropertyName: propertyName, InternalPropertyName: FieldName(column.Name), InternalPropertyType: column.Type, Type: QuesmaTypeKeyword}
247251
}
248252
} else {
249-
fields[propertyName] = Field{PropertyName: propertyName, InternalPropertyName: FieldName(column.Name), InternalPropertyType: column.Type, Type: existing.Type}
253+
fields[propertyName] = Field{PropertyName: propertyName, InternalPropertyName: FieldName(column.Name), InternalPropertyType: column.Type, Type: existing.Type, Origin: existing.Origin}
250254
}
251255
}
252256
}
@@ -275,3 +279,25 @@ func (s *schemaRegistry) removeGeoPhysicalFields(fields map[FieldName]Field) {
275279
}
276280
}
277281
}
282+
283+
func (s *schemaRegistry) populateFieldsOrigins(indexName string, fields map[FieldName]Field) {
284+
s.fieldOriginsLock.RLock()
285+
if fieldOrigins, ok := s.fieldOrigins[TableName(indexName)]; ok {
286+
for fieldName, field := range fields {
287+
if origin, ok := fieldOrigins[field.InternalPropertyName]; ok {
288+
field.Origin = origin
289+
fields[fieldName] = field
290+
}
291+
}
292+
}
293+
s.fieldOriginsLock.RUnlock()
294+
}
295+
296+
func (s *schemaRegistry) UpdateFieldsOrigins(name TableName, fields map[FieldName]FieldSource) {
297+
s.fieldOriginsLock.Lock()
298+
defer s.fieldOriginsLock.Unlock()
299+
if s.fieldOrigins == nil {
300+
s.fieldOrigins = make(map[TableName]map[FieldName]FieldSource)
301+
}
302+
s.fieldOrigins[name] = fields
303+
}

quesma/schema/registry_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ func Test_schemaRegistry_UpdateDynamicConfiguration(t *testing.T) {
319319
"message": {PropertyName: "message", InternalPropertyName: "message", Type: schema.QuesmaTypeKeyword, InternalPropertyType: "String"},
320320
"event_date": {PropertyName: "event_date", InternalPropertyName: "event_date", Type: schema.QuesmaTypeTimestamp, InternalPropertyType: "DateTime64"},
321321
"count": {PropertyName: "count", InternalPropertyName: "count", Type: schema.QuesmaTypeLong, InternalPropertyType: "Int64"},
322-
"new_column": {PropertyName: "new_column", InternalPropertyName: "new_column", Type: schema.QuesmaTypeText}},
322+
"new_column": {PropertyName: "new_column", InternalPropertyName: "new_column", Type: schema.QuesmaTypeText, Origin: schema.FieldSourceMapping}},
323323
true, "")
324324
resultSchema, resultFound = s.FindSchema(schema.TableName(tableName))
325325
assert.True(t, resultFound, "schema not found")

quesma/schema/schema.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,14 @@ import (
66
"strings"
77
)
88

9+
// FieldSource is an enum that represents the source of a field in the schema
10+
type FieldSource int
11+
12+
const (
13+
FieldSourceIngest FieldSource = iota
14+
FieldSourceMapping
15+
)
16+
917
type (
1018
Schema struct {
1119
Fields map[FieldName]Field
@@ -23,6 +31,7 @@ type (
2331
InternalPropertyName FieldName
2432
InternalPropertyType string
2533
Type QuesmaType
34+
Origin FieldSource
2635
}
2736
TableName string
2837
FieldName string

quesma/schema/static_registry.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,7 @@ func (e *StaticRegistry) GetFieldEncodings() map[FieldEncodingKey]EncodedFieldNa
4646
}
4747
return e.FieldEncodings
4848
}
49+
50+
func (e *StaticRegistry) UpdateFieldsOrigins(name TableName, fields map[FieldName]FieldSource) {
51+
52+
}

0 commit comments

Comments
 (0)