Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion quesma/ingest/alter_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestAlterTable(t *testing.T) {
assert.NoError(t, err)
insert, err := generateInsertJson(nonSchemaFields, onlySchemaFields)
assert.Equal(t, expectedInsert[i], insert)
assert.Equal(t, alters[i], alter[0])
assert.Equal(t, alters[i], alter["Test1"])
// Table will grow with each iteration
assert.Equal(t, i+1, len(table.Cols))
for _, col := range columns[:i+1] {
Expand Down
99 changes: 67 additions & 32 deletions quesma/ingest/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,13 @@ func getAttributesByArrayName(arrayName string,
return attributes
}

// AlterDDL holds the information needed to add one new column to a table:
// it is produced by generateNewColumns and rendered into the actual
// `ALTER TABLE ... ADD COLUMN` and `ALTER TABLE ... COMMENT COLUMN`
// SQL statements by generateSqlStatements.
type AlterDDL struct {
	tableName  string // target table the column is added to
	columnName string // name of the new column
	columnType string // ClickHouse column type, e.g. "String" -- TODO confirm dialect
	comment    string // marshalled comment metadata (carries the Elastic field name)
}

// This function generates ALTER TABLE commands for adding new columns
// to the table based on the attributesMap and the table name
// AttributesMap contains the attributes that are not part of the schema
Expand All @@ -295,12 +302,11 @@ func (ip *IngestProcessor) generateNewColumns(
attrsMap map[string][]interface{},
table *chLib.Table,
alteredAttributesIndexes []int,
encodings map[schema.FieldEncodingKey]schema.EncodedFieldName) []string {
var alterCmd []string
encodings map[schema.FieldEncodingKey]schema.EncodedFieldName) map[string]AlterDDL {
alterDDLMap := make(map[string]AlterDDL)
attrKeys := getAttributesByArrayName(chLib.DeprecatedAttributesKeyColumn, attrsMap)
attrTypes := getAttributesByArrayName(chLib.DeprecatedAttributesValueType, attrsMap)
var deleteIndexes []int

reverseMap := reverseFieldEncoding(encodings, table.Name)

// HACK Alert:
Expand Down Expand Up @@ -334,12 +340,8 @@ func (ip *IngestProcessor) generateNewColumns(
metadata.Values[comment_metadata.ElasticFieldName] = propertyName
comment := metadata.Marshall()

alterTable := fmt.Sprintf("ALTER TABLE \"%s\" ADD COLUMN IF NOT EXISTS \"%s\" %s", table.Name, attrKeys[i], columnType)
newColumns[attrKeys[i]] = &chLib.Column{Name: attrKeys[i], Type: chLib.NewBaseType(attrTypes[i]), Modifiers: modifiers, Comment: comment}
alterCmd = append(alterCmd, alterTable)

alterColumn := fmt.Sprintf("ALTER TABLE \"%s\" COMMENT COLUMN \"%s\" '%s'", table.Name, attrKeys[i], comment)
alterCmd = append(alterCmd, alterColumn)
alterDDLMap[attrKeys[i]] = AlterDDL{tableName: table.Name, columnName: attrKeys[i], columnType: columnType, comment: comment}

deleteIndexes = append(deleteIndexes, i)
}
Expand All @@ -358,7 +360,7 @@ func (ip *IngestProcessor) generateNewColumns(
attrsMap[chLib.DeprecatedAttributesValueType] = append(attrsMap[chLib.DeprecatedAttributesValueType][:deleteIndexes[i]], attrsMap[chLib.DeprecatedAttributesValueType][deleteIndexes[i]+1:]...)
attrsMap[chLib.DeprecatedAttributesValueColumn] = append(attrsMap[chLib.DeprecatedAttributesValueColumn][:deleteIndexes[i]], attrsMap[chLib.DeprecatedAttributesValueColumn][deleteIndexes[i]+1:]...)
}
return alterCmd
return alterDDLMap
}

// This struct contains the information about the columns that aren't part of the schema
Expand Down Expand Up @@ -496,7 +498,7 @@ func (ip *IngestProcessor) GenerateIngestContent(table *chLib.Table,
data types.JSON,
inValidJson types.JSON,
config *chLib.ChTableConfig,
encodings map[schema.FieldEncodingKey]schema.EncodedFieldName) ([]string, types.JSON, []NonSchemaField, error) {
encodings map[schema.FieldEncodingKey]schema.EncodedFieldName) (map[string]AlterDDL, types.JSON, []NonSchemaField, error) {

jsonAsBytesSlice, err := json.Marshal(data)

Expand Down Expand Up @@ -537,10 +539,10 @@ func (ip *IngestProcessor) GenerateIngestContent(table *chLib.Table,
// otherwise it would contain invalid fields e.g. with wrong types
// we only want to add fields that are not part of the schema e.g we don't
// have columns for them
var alterCmd []string
alterDDLMap := make(map[string]AlterDDL)
atomic.AddInt64(&ip.ingestCounter, 1)
if ok, alteredAttributesIndexes := ip.shouldAlterColumns(table, attrsMap); ok {
alterCmd = ip.generateNewColumns(attrsMap, table, alteredAttributesIndexes, encodings)
alterDDLMap = ip.generateNewColumns(attrsMap, table, alteredAttributesIndexes, encodings)
}
// If there are some invalid fields, we need to add them to the attributes map
// to not lose them and be able to store them later by
Expand All @@ -556,7 +558,7 @@ func (ip *IngestProcessor) GenerateIngestContent(table *chLib.Table,

onlySchemaFields := RemoveNonSchemaFields(jsonMap, table)

return alterCmd, onlySchemaFields, nonSchemaFields, nil
return alterDDLMap, onlySchemaFields, nonSchemaFields, nil
}

func generateInsertJson(nonSchemaFields []NonSchemaField, onlySchemaFields types.JSON) (string, error) {
Expand All @@ -572,12 +574,17 @@ func generateInsertJson(nonSchemaFields []NonSchemaField, onlySchemaFields types
return fmt.Sprintf("{%s%s%s", nonSchemaStr, comma, schemaFieldsJson[1:]), err
}

func generateSqlStatements(createTableCmd string, alterCmd []string, insert string) []string {
// generateSqlStatements assembles the full, ordered statement list for one
// ingest batch: the optional CREATE TABLE first, then for every pending
// column addition an ADD COLUMN statement immediately followed by its
// COMMENT COLUMN statement, and finally the INSERT.
//
// NOTE(review): alterDDL is a map, so the relative order of ALTER pairs
// across different columns follows Go's randomized map iteration — callers
// appear not to depend on it, but confirm before relying on statement order.
func generateSqlStatements(createTableCmd string, alterDDL map[string]AlterDDL, insert string) []string {
	statements := make([]string, 0, 2+2*len(alterDDL))
	if createTableCmd != "" {
		statements = append(statements, createTableCmd)
	}
	for _, ddl := range alterDDL {
		addColumn := fmt.Sprintf("ALTER TABLE \"%s\" ADD COLUMN IF NOT EXISTS \"%s\" %s", ddl.tableName, ddl.columnName, ddl.columnType)
		commentColumn := fmt.Sprintf("ALTER TABLE \"%s\" COMMENT COLUMN \"%s\" '%s'", ddl.tableName, ddl.columnName, ddl.comment)
		statements = append(statements, addColumn, commentColumn)
	}
	return append(statements, insert)
}
Expand All @@ -598,7 +605,7 @@ func populateFieldEncodings(jsonData []types.JSON, tableName string) map[schema.
func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
tableName string,
jsonData []types.JSON, transformer jsonprocessor.IngestTransformer,
tableFormatter TableColumNameFormatter, tableDefinitionChangeOnly bool) ([]string, error) {
tableFormatter TableColumNameFormatter, tableDefinitionChangeOnly bool) (string, string, map[string]AlterDDL, error) {
// this is pre ingest transformer
// here we transform the data before it's structure evaluation and insertion
//
Expand All @@ -607,7 +614,7 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
for _, jsonValue := range jsonData {
result, err := preIngestTransformer.Transform(jsonValue)
if err != nil {
return nil, fmt.Errorf("error while rewriting json: %v", err)
return "", "", nil, fmt.Errorf("error while rewriting json: %v", err)
}
processed = append(processed, result)
}
Expand Down Expand Up @@ -652,7 +659,7 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
createTableCmd, err = ip.createTableObjectAndAttributes(ctx, createTableCmd, tableConfig, tableName, tableDefinitionChangeOnly)
if err != nil {
logger.ErrorWithCtx(ctx).Msgf("error createTableObjectAndAttributes, can't create table: %v", err)
return nil, err
return "", "", nil, err
}
// Set pointer to table after creating it
table = ip.FindTable(tableName)
Expand All @@ -661,35 +668,37 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
}
tableConfig = table.Config
var jsonsReadyForInsertion []string
var alterCmd []string
alterDDLMapGlobal := make(map[string]AlterDDL)
var preprocessedJsons []types.JSON
var invalidJsons []types.JSON
preprocessedJsons, invalidJsons, err := ip.preprocessJsons(ctx, table.Name, jsonData, transformer)
if err != nil {
return nil, fmt.Errorf("error preprocessJsons: %v", err)
return "", "", nil, fmt.Errorf("error preprocessJsons: %v", err)
}
for i, preprocessedJson := range preprocessedJsons {
alter, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(table, preprocessedJson,
alterDDLMap, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(table, preprocessedJson,
invalidJsons[i], tableConfig, encodings)

if err != nil {
return nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' : %v", table.Name, err)
return "", "", nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' : %v", table.Name, err)
}
insertJson, err := generateInsertJson(nonSchemaFields, onlySchemaFields)
if err != nil {
return nil, fmt.Errorf("error generatateInsertJson, tablename: '%s' json: '%s': %v", table.Name, PrettyJson(insertJson), err)
return "", "", nil, fmt.Errorf("error generatateInsertJson, tablename: '%s' json: '%s': %v", table.Name, PrettyJson(insertJson), err)
}
for key, value := range alterDDLMap {
alterDDLMapGlobal[key] = value
}
alterCmd = append(alterCmd, alter...)
if err != nil {
return nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' json: '%s': %v", table.Name, PrettyJson(insertJson), err)
return "", "", nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' json: '%s': %v", table.Name, PrettyJson(insertJson), err)
}
jsonsReadyForInsertion = append(jsonsReadyForInsertion, insertJson)
}

insertValues := strings.Join(jsonsReadyForInsertion, ", ")
insert := fmt.Sprintf("INSERT INTO \"%s\" FORMAT JSONEachRow %s", table.Name, insertValues)

return generateSqlStatements(createTableCmd, alterCmd, insert), nil
return createTableCmd, insert, alterDDLMapGlobal, nil
}

func (lm *IngestProcessor) ProcessInsertQuery(ctx context.Context, tableName string,
Expand All @@ -708,39 +717,65 @@ func (lm *IngestProcessor) ProcessInsertQuery(ctx context.Context, tableName str
clonedJsonData = append(clonedJsonData, jsonValue.Clone())
}

err := lm.processInsertQueryInternal(ctx, tableName, clonedJsonData, transformer, tableFormatter, true)
err := lm.processInsertQueryInternal(ctx, tableName, clonedJsonData, transformer, tableFormatter, true, nil, tableName)
if err != nil {
// we ignore an error here, because we want to process the data and don't lose it
logger.ErrorWithCtx(ctx).Msgf("error processing insert query - virtual table schema update: %v", err)
}

sourceIndexSchema := findSchemaPointer(lm.schemaRegistry, tableName)
sourceIndex := tableName
pipeline := jsonprocessor.IngestTransformerPipeline{}
pipeline = append(pipeline, &common_table.IngestAddIndexNameTransformer{IndexName: tableName})
pipeline = append(pipeline, transformer)
tableName = common_table.TableName

err = lm.processInsertQueryInternal(ctx, common_table.TableName, jsonData, pipeline, tableFormatter, false)
err = lm.processInsertQueryInternal(ctx, common_table.TableName, jsonData, pipeline, tableFormatter, false, sourceIndexSchema, sourceIndex)
if err != nil {
return fmt.Errorf("error processing insert query to a common table: %w", err)
}

return nil
}

return lm.processInsertQueryInternal(ctx, tableName, jsonData, transformer, tableFormatter, false)
return lm.processInsertQueryInternal(ctx, tableName, jsonData, transformer, tableFormatter, false, nil, tableName)

}

func (ip *IngestProcessor) processInsertQueryInternal(ctx context.Context, tableName string,
jsonData []types.JSON, transformer jsonprocessor.IngestTransformer,
tableFormatter TableColumNameFormatter, isVirtualTable bool) error {
statements, err := ip.processInsertQuery(ctx, tableName, jsonData, transformer, tableFormatter, isVirtualTable)
tableFormatter TableColumNameFormatter, isVirtualTable bool, sourceIndexSchema *schema.Schema, sourceIndex string) error {

createStatement, insertStatement, alterDDLMap, err := ip.processInsertQuery(ctx, tableName, jsonData, transformer, tableFormatter, isVirtualTable)
if err != nil {
return err
}

statements := generateSqlStatements(createStatement, alterDDLMap, insertStatement)

var logVirtualTableDDL bool // maybe this should be a part of the config or sth

// TODO that's a hack, we add columns to quesma-common-table that
// came from mappings instead of ingest
if sourceIndexSchema != nil {
if ip.cfg.IndexConfig[sourceIndex].UseCommonTable && len(alterDDLMap) > 0 {
var columnsFromDynamicMapping []string
for _, field := range sourceIndexSchema.Fields {
if _, ok := alterDDLMap[field.InternalPropertyName.AsString()]; !ok {

if field.Origin == schema.FieldSourceMapping {
columnsFromDynamicMapping = append(columnsFromDynamicMapping, fmt.Sprintf("ALTER TABLE \"%s\" ADD COLUMN IF NOT EXISTS \"%s\" %s", tableName, field.InternalPropertyName, field.InternalPropertyType))
metadata := comment_metadata.NewCommentMetadata()
metadata.Values[comment_metadata.ElasticFieldName] = field.PropertyName.AsString()
comment := metadata.Marshall()
columnsFromDynamicMapping = append(columnsFromDynamicMapping, fmt.Sprintf("ALTER TABLE \"%s\" COMMENT COLUMN \"%s\" '%s'", tableName, field.InternalPropertyName, comment))
}

}
}
statements = append(columnsFromDynamicMapping, statements...)
}
}

if isVirtualTable && logVirtualTableDDL {
for _, statement := range statements {
if strings.HasPrefix(statement, "ALTER") || strings.HasPrefix(statement, "CREATE") {
Expand Down
6 changes: 4 additions & 2 deletions quesma/quesma/schema_transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,9 @@ func (s *SchemaCheckPass) applyWildcardExpansion(indexSchema schema.Schema, quer

cols := make([]string, 0, len(indexSchema.Fields))
for _, col := range indexSchema.Fields {
cols = append(cols, col.InternalPropertyName.AsString())
if col.Origin == schema.FieldSourceIngest {
cols = append(cols, col.InternalPropertyName.AsString())
}
}
sort.Strings(cols)

Expand All @@ -490,7 +492,7 @@ func (s *SchemaCheckPass) applyFullTextField(indexSchema schema.Schema, query *m
var fullTextFields []string

for _, field := range indexSchema.Fields {
if field.Type.IsFullText() {
if field.Type.IsFullText() && field.Origin == schema.FieldSourceIngest {
fullTextFields = append(fullTextFields, field.InternalPropertyName.AsString())
}
}
Expand Down
4 changes: 2 additions & 2 deletions quesma/schema/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (s *schemaRegistry) populateSchemaFromDynamicConfiguration(indexName string
continue
}

fields[FieldName(column.Name)] = Field{PropertyName: FieldName(column.Name), InternalPropertyName: FieldName(column.Name), Type: columnType}
fields[FieldName(column.Name)] = Field{PropertyName: FieldName(column.Name), InternalPropertyName: FieldName(column.Name), Type: columnType, Origin: FieldSourceMapping}
}
}

Expand Down Expand Up @@ -246,7 +246,7 @@ func (s *schemaRegistry) populateSchemaFromTableDefinition(definitions map[strin
fields[propertyName] = Field{PropertyName: propertyName, InternalPropertyName: FieldName(column.Name), InternalPropertyType: column.Type, Type: QuesmaTypeKeyword}
}
} else {
fields[propertyName] = Field{PropertyName: propertyName, InternalPropertyName: FieldName(column.Name), InternalPropertyType: column.Type, Type: existing.Type}
fields[propertyName] = Field{PropertyName: propertyName, InternalPropertyName: FieldName(column.Name), InternalPropertyType: column.Type, Type: existing.Type, Origin: FieldSourceIngest}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion quesma/schema/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func Test_schemaRegistry_UpdateDynamicConfiguration(t *testing.T) {
"message": {PropertyName: "message", InternalPropertyName: "message", Type: schema.QuesmaTypeKeyword, InternalPropertyType: "String"},
"event_date": {PropertyName: "event_date", InternalPropertyName: "event_date", Type: schema.QuesmaTypeTimestamp, InternalPropertyType: "DateTime64"},
"count": {PropertyName: "count", InternalPropertyName: "count", Type: schema.QuesmaTypeLong, InternalPropertyType: "Int64"},
"new_column": {PropertyName: "new_column", InternalPropertyName: "new_column", Type: schema.QuesmaTypeText}},
"new_column": {PropertyName: "new_column", InternalPropertyName: "new_column", Type: schema.QuesmaTypeText, Origin: schema.FieldSourceMapping}},
true, "")
resultSchema, resultFound = s.FindSchema(schema.TableName(tableName))
assert.True(t, resultFound, "schema not found")
Expand Down
10 changes: 10 additions & 0 deletions quesma/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ import (
"strings"
)

// FieldSource is an enum that represents the source of a field in the schema
type FieldSource int

const (
	// FieldSourceIngest marks a field derived from an ingested table
	// definition (the zero value, so fields default to this origin).
	FieldSourceIngest FieldSource = iota
	// FieldSourceMapping marks a field added via dynamic configuration
	// (see populateSchemaFromDynamicConfiguration).
	FieldSourceMapping
	// FieldSourceStaticConfiguration marks a field declared in static
	// configuration — not assigned in the code visible here; confirm usage.
	FieldSourceStaticConfiguration
)

type (
Schema struct {
Fields map[FieldName]Field
Expand All @@ -23,6 +32,7 @@ type (
InternalPropertyName FieldName
InternalPropertyType string
Type QuesmaType
Origin FieldSource
}
TableName string
FieldName string
Expand Down
Loading