Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions quesma/ingest/alter_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestAlterTable(t *testing.T) {

ip := newIngestProcessorWithEmptyTableMap(fieldsMap, &config.QuesmaConfiguration{})
for i := range rowsToInsert {
alter, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, chConfig, encodings)
alter, _, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, chConfig, encodings)
assert.NoError(t, err)
insert, err := generateInsertJson(nonSchemaFields, onlySchemaFields)
assert.Equal(t, expectedInsert[i], insert)
Expand Down Expand Up @@ -130,7 +130,7 @@ func TestAlterTableHeuristic(t *testing.T) {

assert.Equal(t, int64(0), ip.ingestCounter)
for i := range rowsToInsert {
_, _, _, err := ip.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, chConfig, encodings)
_, _, _, _, err := ip.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, chConfig, encodings)
assert.NoError(t, err)
}
assert.Equal(t, tc.expected, len(table.Cols))
Expand Down
91 changes: 63 additions & 28 deletions quesma/ingest/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,13 @@ func getAttributesByArrayName(arrayName string,
return attributes
}

type AlterDDL struct {
tableName string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

columnName string
columnType string
comment string
}

// This function generates ALTER TABLE commands for adding new columns
// to the table based on the attributesMap and the table name
// AttributesMap contains the attributes that are not part of the schema
Expand All @@ -295,12 +302,12 @@ func (ip *IngestProcessor) generateNewColumns(
attrsMap map[string][]interface{},
table *chLib.Table,
alteredAttributesIndexes []int,
encodings map[schema.FieldEncodingKey]schema.EncodedFieldName) []string {
encodings map[schema.FieldEncodingKey]schema.EncodedFieldName) ([]string, map[string]AlterDDL) {
var alterCmd []string
alterDDLMap := make(map[string]AlterDDL)
attrKeys := getAttributesByArrayName(chLib.DeprecatedAttributesKeyColumn, attrsMap)
attrTypes := getAttributesByArrayName(chLib.DeprecatedAttributesValueType, attrsMap)
var deleteIndexes []int

reverseMap := reverseFieldEncoding(encodings, table.Name)

// HACK Alert:
Expand Down Expand Up @@ -339,6 +346,7 @@ func (ip *IngestProcessor) generateNewColumns(
alterCmd = append(alterCmd, alterTable)

alterColumn := fmt.Sprintf("ALTER TABLE \"%s\" COMMENT COLUMN \"%s\" '%s'", table.Name, attrKeys[i], comment)
alterDDLMap[attrKeys[i]] = AlterDDL{tableName: table.Name, columnName: attrKeys[i], columnType: columnType, comment: comment}
alterCmd = append(alterCmd, alterColumn)

deleteIndexes = append(deleteIndexes, i)
Expand All @@ -358,7 +366,7 @@ func (ip *IngestProcessor) generateNewColumns(
attrsMap[chLib.DeprecatedAttributesValueType] = append(attrsMap[chLib.DeprecatedAttributesValueType][:deleteIndexes[i]], attrsMap[chLib.DeprecatedAttributesValueType][deleteIndexes[i]+1:]...)
attrsMap[chLib.DeprecatedAttributesValueColumn] = append(attrsMap[chLib.DeprecatedAttributesValueColumn][:deleteIndexes[i]], attrsMap[chLib.DeprecatedAttributesValueColumn][deleteIndexes[i]+1:]...)
}
return alterCmd
return alterCmd, alterDDLMap
}

// This struct contains the information about the columns that aren't part of the schema
Expand Down Expand Up @@ -496,39 +504,39 @@ func (ip *IngestProcessor) GenerateIngestContent(table *chLib.Table,
data types.JSON,
inValidJson types.JSON,
config *chLib.ChTableConfig,
encodings map[schema.FieldEncodingKey]schema.EncodedFieldName) ([]string, types.JSON, []NonSchemaField, error) {
encodings map[schema.FieldEncodingKey]schema.EncodedFieldName) ([]string, map[string]AlterDDL, types.JSON, []NonSchemaField, error) {

jsonAsBytesSlice, err := json.Marshal(data)

if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

// we find all non-schema fields
jsonMap, err := types.ParseJSON(string(jsonAsBytesSlice))
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

if len(config.Attributes) == 0 {
return nil, jsonMap, nil, nil
return nil, nil, jsonMap, nil, nil
}

schemaFieldsJson, err := json.Marshal(jsonMap)

if err != nil {
return nil, jsonMap, nil, err
return nil, nil, jsonMap, nil, err
}

mDiff := DifferenceMap(jsonMap, table) // TODO change to DifferenceMap(m, t)

if len(mDiff) == 0 && string(schemaFieldsJson) == string(jsonAsBytesSlice) && len(inValidJson) == 0 { // no need to modify, just insert 'js'
return nil, jsonMap, nil, nil
return nil, nil, jsonMap, nil, nil
}

// check attributes precondition
if len(config.Attributes) <= 0 {
return nil, nil, nil, fmt.Errorf("no attributes config, but received non-schema fields: %s", mDiff)
return nil, nil, nil, nil, fmt.Errorf("no attributes config, but received non-schema fields: %s", mDiff)
}
attrsMap, _ := BuildAttrsMap(mDiff, config)

Expand All @@ -538,9 +546,10 @@ func (ip *IngestProcessor) GenerateIngestContent(table *chLib.Table,
// we only want to add fields that are not part of the schema e.g we don't
// have columns for them
var alterCmd []string
alterDDLMap := make(map[string]AlterDDL)
atomic.AddInt64(&ip.ingestCounter, 1)
if ok, alteredAttributesIndexes := ip.shouldAlterColumns(table, attrsMap); ok {
alterCmd = ip.generateNewColumns(attrsMap, table, alteredAttributesIndexes, encodings)
alterCmd, alterDDLMap = ip.generateNewColumns(attrsMap, table, alteredAttributesIndexes, encodings)
}
// If there are some invalid fields, we need to add them to the attributes map
// to not lose them and be able to store them later by
Expand All @@ -551,12 +560,12 @@ func (ip *IngestProcessor) GenerateIngestContent(table *chLib.Table,
nonSchemaFields, err := generateNonSchemaFields(attrsMapWithInvalidFields)

if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

onlySchemaFields := RemoveNonSchemaFields(jsonMap, table)

return alterCmd, onlySchemaFields, nonSchemaFields, nil
return alterCmd, alterDDLMap, onlySchemaFields, nonSchemaFields, nil
}

func generateInsertJson(nonSchemaFields []NonSchemaField, onlySchemaFields types.JSON) (string, error) {
Expand Down Expand Up @@ -598,7 +607,7 @@ func populateFieldEncodings(jsonData []types.JSON, tableName string) map[schema.
func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
tableName string,
jsonData []types.JSON, transformer jsonprocessor.IngestTransformer,
tableFormatter TableColumNameFormatter, tableDefinitionChangeOnly bool) ([]string, error) {
tableFormatter TableColumNameFormatter, tableDefinitionChangeOnly bool) ([]string, map[string]AlterDDL, error) {
// this is pre ingest transformer
// here we transform the data before it's structure evaluation and insertion
//
Expand All @@ -607,7 +616,7 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
for _, jsonValue := range jsonData {
result, err := preIngestTransformer.Transform(jsonValue)
if err != nil {
return nil, fmt.Errorf("error while rewriting json: %v", err)
return nil, nil, fmt.Errorf("error while rewriting json: %v", err)
}
processed = append(processed, result)
}
Expand Down Expand Up @@ -652,7 +661,7 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
createTableCmd, err = ip.createTableObjectAndAttributes(ctx, createTableCmd, tableConfig, tableName, tableDefinitionChangeOnly)
if err != nil {
logger.ErrorWithCtx(ctx).Msgf("error createTableObjectAndAttributes, can't create table: %v", err)
return nil, err
return nil, nil, err
}
// Set pointer to table after creating it
table = ip.FindTable(tableName)
Expand All @@ -662,34 +671,38 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
tableConfig = table.Config
var jsonsReadyForInsertion []string
var alterCmd []string
alterDDLMapGlobal := make(map[string]AlterDDL)
var preprocessedJsons []types.JSON
var invalidJsons []types.JSON
preprocessedJsons, invalidJsons, err := ip.preprocessJsons(ctx, table.Name, jsonData, transformer)
if err != nil {
return nil, fmt.Errorf("error preprocessJsons: %v", err)
return nil, nil, fmt.Errorf("error preprocessJsons: %v", err)
}
for i, preprocessedJson := range preprocessedJsons {
alter, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(table, preprocessedJson,
alter, alterDDLMap, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(table, preprocessedJson,
invalidJsons[i], tableConfig, encodings)

if err != nil {
return nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' : %v", table.Name, err)
return nil, nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' : %v", table.Name, err)
}
insertJson, err := generateInsertJson(nonSchemaFields, onlySchemaFields)
if err != nil {
return nil, fmt.Errorf("error generatateInsertJson, tablename: '%s' json: '%s': %v", table.Name, PrettyJson(insertJson), err)
return nil, nil, fmt.Errorf("error generatateInsertJson, tablename: '%s' json: '%s': %v", table.Name, PrettyJson(insertJson), err)
}
alterCmd = append(alterCmd, alter...)
for key, value := range alterDDLMap {
alterDDLMapGlobal[key] = value
}
if err != nil {
return nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' json: '%s': %v", table.Name, PrettyJson(insertJson), err)
return nil, nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' json: '%s': %v", table.Name, PrettyJson(insertJson), err)
}
jsonsReadyForInsertion = append(jsonsReadyForInsertion, insertJson)
}

insertValues := strings.Join(jsonsReadyForInsertion, ", ")
insert := fmt.Sprintf("INSERT INTO \"%s\" FORMAT JSONEachRow %s", table.Name, insertValues)

return generateSqlStatements(createTableCmd, alterCmd, insert), nil
return generateSqlStatements(createTableCmd, alterCmd, insert), alterDDLMapGlobal, nil
}

func (lm *IngestProcessor) ProcessInsertQuery(ctx context.Context, tableName string,
Expand All @@ -708,39 +721,61 @@ func (lm *IngestProcessor) ProcessInsertQuery(ctx context.Context, tableName str
clonedJsonData = append(clonedJsonData, jsonValue.Clone())
}

err := lm.processInsertQueryInternal(ctx, tableName, clonedJsonData, transformer, tableFormatter, true)
err := lm.processInsertQueryInternal(ctx, tableName, clonedJsonData, transformer, tableFormatter, true, nil, tableName)
if err != nil {
// we ignore an error here, because we want to process the data and don't lose it
logger.ErrorWithCtx(ctx).Msgf("error processing insert query - virtual table schema update: %v", err)
}

sourceIndexSchema := findSchemaPointer(lm.schemaRegistry, tableName)
sourceIndex := tableName
pipeline := jsonprocessor.IngestTransformerPipeline{}
pipeline = append(pipeline, &common_table.IngestAddIndexNameTransformer{IndexName: tableName})
pipeline = append(pipeline, transformer)
tableName = common_table.TableName

err = lm.processInsertQueryInternal(ctx, common_table.TableName, jsonData, pipeline, tableFormatter, false)
err = lm.processInsertQueryInternal(ctx, common_table.TableName, jsonData, pipeline, tableFormatter, false, sourceIndexSchema, sourceIndex)
if err != nil {
return fmt.Errorf("error processing insert query to a common table: %w", err)
}

return nil
}

return lm.processInsertQueryInternal(ctx, tableName, jsonData, transformer, tableFormatter, false)
return lm.processInsertQueryInternal(ctx, tableName, jsonData, transformer, tableFormatter, false, nil, tableName)

}

func (ip *IngestProcessor) processInsertQueryInternal(ctx context.Context, tableName string,
jsonData []types.JSON, transformer jsonprocessor.IngestTransformer,
tableFormatter TableColumNameFormatter, isVirtualTable bool) error {
statements, err := ip.processInsertQuery(ctx, tableName, jsonData, transformer, tableFormatter, isVirtualTable)
tableFormatter TableColumNameFormatter, isVirtualTable bool, sourceIndexSchema *schema.Schema, sourceIndex string) error {

statements, alterDDLMap, err := ip.processInsertQuery(ctx, tableName, jsonData, transformer, tableFormatter, isVirtualTable)
if err != nil {
return err
}

var logVirtualTableDDL bool // maybe this should be a part of the config or sth

// TODO that's a hack, we add columns to quesma-common-table that
// came from mappings instead of ingest
if sourceIndexSchema != nil {
if ip.cfg.IndexConfig[sourceIndex].UseCommonTable && len(alterDDLMap) > 0 {
var columnsFromDynamicMapping []string
for _, field := range sourceIndexSchema.Fields {
if _, ok := alterDDLMap[field.InternalPropertyName.AsString()]; !ok {
if field.Origin == schema.FieldSourceMapping {
columnsFromDynamicMapping = append(columnsFromDynamicMapping, fmt.Sprintf("ALTER TABLE \"%s\" ADD COLUMN IF NOT EXISTS \"%s\" %s", tableName, field.InternalPropertyName, field.InternalPropertyType))
metadata := comment_metadata.NewCommentMetadata()
metadata.Values[comment_metadata.ElasticFieldName] = field.PropertyName.AsString()
comment := metadata.Marshall()
columnsFromDynamicMapping = append(columnsFromDynamicMapping, fmt.Sprintf("ALTER TABLE \"%s\" COMMENT COLUMN \"%s\" '%s'", tableName, field.InternalPropertyName, comment))
}
}
}
statements = append(columnsFromDynamicMapping, statements...)
}
}

if isVirtualTable && logVirtualTableDDL {
for _, statement := range statements {
if strings.HasPrefix(statement, "ALTER") || strings.HasPrefix(statement, "CREATE") {
Expand Down
2 changes: 1 addition & 1 deletion quesma/ingest/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestInsertNonSchemaFieldsToOthers_1(t *testing.T) {
assert.True(t, exists)
f := func(t1, t2 TableMap) {
ip := newIngestProcessorWithEmptyTableMap(fieldsMap, &config.QuesmaConfiguration{})
alter, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(tableName, types.MustJSON(rowToInsert), nil, hasOthersConfig, encodings)
alter, _, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(tableName, types.MustJSON(rowToInsert), nil, hasOthersConfig, encodings)
assert.NoError(t, err)
j, err := generateInsertJson(nonSchemaFields, onlySchemaFields)
assert.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions quesma/schema/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (s *schemaRegistry) populateSchemaFromDynamicConfiguration(indexName string
continue
}

fields[FieldName(column.Name)] = Field{PropertyName: FieldName(column.Name), InternalPropertyName: FieldName(column.Name), Type: columnType}
fields[FieldName(column.Name)] = Field{PropertyName: FieldName(column.Name), InternalPropertyName: FieldName(column.Name), Type: columnType, Origin: FieldSourceMapping}
}
}

Expand Down Expand Up @@ -246,7 +246,7 @@ func (s *schemaRegistry) populateSchemaFromTableDefinition(definitions map[strin
fields[propertyName] = Field{PropertyName: propertyName, InternalPropertyName: FieldName(column.Name), InternalPropertyType: column.Type, Type: QuesmaTypeKeyword}
}
} else {
fields[propertyName] = Field{PropertyName: propertyName, InternalPropertyName: FieldName(column.Name), InternalPropertyType: column.Type, Type: existing.Type}
fields[propertyName] = Field{PropertyName: propertyName, InternalPropertyName: FieldName(column.Name), InternalPropertyType: column.Type, Type: existing.Type, Origin: existing.Origin}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion quesma/schema/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func Test_schemaRegistry_UpdateDynamicConfiguration(t *testing.T) {
"message": {PropertyName: "message", InternalPropertyName: "message", Type: schema.QuesmaTypeKeyword, InternalPropertyType: "String"},
"event_date": {PropertyName: "event_date", InternalPropertyName: "event_date", Type: schema.QuesmaTypeTimestamp, InternalPropertyType: "DateTime64"},
"count": {PropertyName: "count", InternalPropertyName: "count", Type: schema.QuesmaTypeLong, InternalPropertyType: "Int64"},
"new_column": {PropertyName: "new_column", InternalPropertyName: "new_column", Type: schema.QuesmaTypeText}},
"new_column": {PropertyName: "new_column", InternalPropertyName: "new_column", Type: schema.QuesmaTypeText, Origin: schema.FieldSourceMapping}},
true, "")
resultSchema, resultFound = s.FindSchema(schema.TableName(tableName))
assert.True(t, resultFound, "schema not found")
Expand Down
11 changes: 11 additions & 0 deletions quesma/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,16 @@ import (
"strings"
)

// FieldSource is an enum that represents the source of a field in the schema
type FieldSource int

const (
FieldSourceIngest FieldSource = iota
FieldSourceMapping
FieldSourceAutoDiscovery
FieldSourceStaticConfiguration
)

type (
Schema struct {
Fields map[FieldName]Field
Expand All @@ -23,6 +33,7 @@ type (
InternalPropertyName FieldName
InternalPropertyType string
Type QuesmaType
Origin FieldSource
}
TableName string
FieldName string
Expand Down