Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 749062d

Browse files
committed
Remove leftovers
1 parent 6df737d commit 749062d

File tree

3 files changed

+27
-40
lines changed

3 files changed

+27
-40
lines changed

quesma/ingest/alter_table_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func TestAlterTable(t *testing.T) {
5151

5252
ip := newIngestProcessorWithEmptyTableMap(fieldsMap, &config.QuesmaConfiguration{})
5353
for i := range rowsToInsert {
54-
alter, _, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, chConfig, encodings)
54+
alter, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, chConfig, encodings)
5555
assert.NoError(t, err)
5656
insert, err := generateInsertJson(nonSchemaFields, onlySchemaFields)
5757
assert.Equal(t, expectedInsert[i], insert)
@@ -130,7 +130,7 @@ func TestAlterTableHeuristic(t *testing.T) {
130130

131131
assert.Equal(t, int64(0), ip.ingestCounter)
132132
for i := range rowsToInsert {
133-
_, _, _, _, err := ip.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, chConfig, encodings)
133+
_, _, _, err := ip.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, chConfig, encodings)
134134
assert.NoError(t, err)
135135
}
136136
assert.Equal(t, tc.expected, len(table.Cols))

quesma/ingest/processor.go

Lines changed: 24 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -286,13 +286,6 @@ func getAttributesByArrayName(arrayName string,
286286
return attributes
287287
}
288288

289-
type AlterDDL struct {
290-
tableName string
291-
columnName string
292-
columnType string
293-
comment string
294-
}
295-
296289
// This function generates ALTER TABLE commands for adding new columns
297290
// to the table based on the attributesMap and the table name
298291
// AttributesMap contains the attributes that are not part of the schema
@@ -302,12 +295,12 @@ func (ip *IngestProcessor) generateNewColumns(
302295
attrsMap map[string][]interface{},
303296
table *chLib.Table,
304297
alteredAttributesIndexes []int,
305-
encodings map[schema.FieldEncodingKey]schema.EncodedFieldName) ([]string, map[string]AlterDDL) {
298+
encodings map[schema.FieldEncodingKey]schema.EncodedFieldName) []string {
306299
var alterCmd []string
307-
alterDDLMap := make(map[string]AlterDDL)
308300
attrKeys := getAttributesByArrayName(chLib.DeprecatedAttributesKeyColumn, attrsMap)
309301
attrTypes := getAttributesByArrayName(chLib.DeprecatedAttributesValueType, attrsMap)
310302
var deleteIndexes []int
303+
311304
reverseMap := reverseFieldEncoding(encodings, table.Name)
312305

313306
// HACK Alert:
@@ -346,7 +339,6 @@ func (ip *IngestProcessor) generateNewColumns(
346339
alterCmd = append(alterCmd, alterTable)
347340

348341
alterColumn := fmt.Sprintf("ALTER TABLE \"%s\" COMMENT COLUMN \"%s\" '%s'", table.Name, attrKeys[i], comment)
349-
alterDDLMap[attrKeys[i]] = AlterDDL{tableName: table.Name, columnName: attrKeys[i], columnType: columnType, comment: comment}
350342
alterCmd = append(alterCmd, alterColumn)
351343

352344
deleteIndexes = append(deleteIndexes, i)
@@ -366,7 +358,7 @@ func (ip *IngestProcessor) generateNewColumns(
366358
attrsMap[chLib.DeprecatedAttributesValueType] = append(attrsMap[chLib.DeprecatedAttributesValueType][:deleteIndexes[i]], attrsMap[chLib.DeprecatedAttributesValueType][deleteIndexes[i]+1:]...)
367359
attrsMap[chLib.DeprecatedAttributesValueColumn] = append(attrsMap[chLib.DeprecatedAttributesValueColumn][:deleteIndexes[i]], attrsMap[chLib.DeprecatedAttributesValueColumn][deleteIndexes[i]+1:]...)
368360
}
369-
return alterCmd, alterDDLMap
361+
return alterCmd
370362
}
371363

372364
// This struct contains the information about the columns that aren't part of the schema
@@ -504,39 +496,39 @@ func (ip *IngestProcessor) GenerateIngestContent(table *chLib.Table,
504496
data types.JSON,
505497
inValidJson types.JSON,
506498
config *chLib.ChTableConfig,
507-
encodings map[schema.FieldEncodingKey]schema.EncodedFieldName) ([]string, map[string]AlterDDL, types.JSON, []NonSchemaField, error) {
499+
encodings map[schema.FieldEncodingKey]schema.EncodedFieldName) ([]string, types.JSON, []NonSchemaField, error) {
508500

509501
jsonAsBytesSlice, err := json.Marshal(data)
510502

511503
if err != nil {
512-
return nil, nil, nil, nil, err
504+
return nil, nil, nil, err
513505
}
514506

515507
// we find all non-schema fields
516508
jsonMap, err := types.ParseJSON(string(jsonAsBytesSlice))
517509
if err != nil {
518-
return nil, nil, nil, nil, err
510+
return nil, nil, nil, err
519511
}
520512

521513
if len(config.Attributes) == 0 {
522-
return nil, nil, jsonMap, nil, nil
514+
return nil, jsonMap, nil, nil
523515
}
524516

525517
schemaFieldsJson, err := json.Marshal(jsonMap)
526518

527519
if err != nil {
528-
return nil, nil, jsonMap, nil, err
520+
return nil, jsonMap, nil, err
529521
}
530522

531523
mDiff := DifferenceMap(jsonMap, table) // TODO change to DifferenceMap(m, t)
532524

533525
if len(mDiff) == 0 && string(schemaFieldsJson) == string(jsonAsBytesSlice) && len(inValidJson) == 0 { // no need to modify, just insert 'js'
534-
return nil, nil, jsonMap, nil, nil
526+
return nil, jsonMap, nil, nil
535527
}
536528

537529
// check attributes precondition
538530
if len(config.Attributes) <= 0 {
539-
return nil, nil, nil, nil, fmt.Errorf("no attributes config, but received non-schema fields: %s", mDiff)
531+
return nil, nil, nil, fmt.Errorf("no attributes config, but received non-schema fields: %s", mDiff)
540532
}
541533
attrsMap, _ := BuildAttrsMap(mDiff, config)
542534

@@ -546,10 +538,9 @@ func (ip *IngestProcessor) GenerateIngestContent(table *chLib.Table,
546538
// we only want to add fields that are not part of the schema e.g we don't
547539
// have columns for them
548540
var alterCmd []string
549-
alterDDLMap := make(map[string]AlterDDL)
550541
atomic.AddInt64(&ip.ingestCounter, 1)
551542
if ok, alteredAttributesIndexes := ip.shouldAlterColumns(table, attrsMap); ok {
552-
alterCmd, alterDDLMap = ip.generateNewColumns(attrsMap, table, alteredAttributesIndexes, encodings)
543+
alterCmd = ip.generateNewColumns(attrsMap, table, alteredAttributesIndexes, encodings)
553544
}
554545
// If there are some invalid fields, we need to add them to the attributes map
555546
// to not lose them and be able to store them later by
@@ -560,12 +551,12 @@ func (ip *IngestProcessor) GenerateIngestContent(table *chLib.Table,
560551
nonSchemaFields, err := generateNonSchemaFields(attrsMapWithInvalidFields)
561552

562553
if err != nil {
563-
return nil, nil, nil, nil, err
554+
return nil, nil, nil, err
564555
}
565556

566557
onlySchemaFields := RemoveNonSchemaFields(jsonMap, table)
567558

568-
return alterCmd, alterDDLMap, onlySchemaFields, nonSchemaFields, nil
559+
return alterCmd, onlySchemaFields, nonSchemaFields, nil
569560
}
570561

571562
func generateInsertJson(nonSchemaFields []NonSchemaField, onlySchemaFields types.JSON) (string, error) {
@@ -607,7 +598,7 @@ func populateFieldEncodings(jsonData []types.JSON, tableName string) map[schema.
607598
func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
608599
tableName string,
609600
jsonData []types.JSON, transformer jsonprocessor.IngestTransformer,
610-
tableFormatter TableColumNameFormatter, tableDefinitionChangeOnly bool) ([]string, map[string]AlterDDL, error) {
601+
tableFormatter TableColumNameFormatter, tableDefinitionChangeOnly bool) ([]string, error) {
611602
// this is pre ingest transformer
612603
// here we transform the data before it's structure evaluation and insertion
613604
//
@@ -616,7 +607,7 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
616607
for _, jsonValue := range jsonData {
617608
result, err := preIngestTransformer.Transform(jsonValue)
618609
if err != nil {
619-
return nil, nil, fmt.Errorf("error while rewriting json: %v", err)
610+
return nil, fmt.Errorf("error while rewriting json: %v", err)
620611
}
621612
processed = append(processed, result)
622613
}
@@ -670,7 +661,7 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
670661
createTableCmd, err = ip.createTableObjectAndAttributes(ctx, createTableCmd, tableConfig, tableName, tableDefinitionChangeOnly)
671662
if err != nil {
672663
logger.ErrorWithCtx(ctx).Msgf("error createTableObjectAndAttributes, can't create table: %v", err)
673-
return nil, nil, err
664+
return nil, err
674665
}
675666
// Set pointer to table after creating it
676667
table = ip.FindTable(tableName)
@@ -680,38 +671,34 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
680671
tableConfig = table.Config
681672
var jsonsReadyForInsertion []string
682673
var alterCmd []string
683-
alterDDLMapGlobal := make(map[string]AlterDDL)
684674
var preprocessedJsons []types.JSON
685675
var invalidJsons []types.JSON
686676
preprocessedJsons, invalidJsons, err := ip.preprocessJsons(ctx, table.Name, jsonData, transformer)
687677
if err != nil {
688-
return nil, nil, fmt.Errorf("error preprocessJsons: %v", err)
678+
return nil, fmt.Errorf("error preprocessJsons: %v", err)
689679
}
690680
for i, preprocessedJson := range preprocessedJsons {
691-
alter, alterDDLMap, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(table, preprocessedJson,
681+
alter, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(table, preprocessedJson,
692682
invalidJsons[i], tableConfig, encodings)
693683

694684
if err != nil {
695-
return nil, nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' : %v", table.Name, err)
685+
return nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' : %v", table.Name, err)
696686
}
697687
insertJson, err := generateInsertJson(nonSchemaFields, onlySchemaFields)
698688
if err != nil {
699-
return nil, nil, fmt.Errorf("error generatateInsertJson, tablename: '%s' json: '%s': %v", table.Name, PrettyJson(insertJson), err)
689+
return nil, fmt.Errorf("error generatateInsertJson, tablename: '%s' json: '%s': %v", table.Name, PrettyJson(insertJson), err)
700690
}
701691
alterCmd = append(alterCmd, alter...)
702-
for key, value := range alterDDLMap {
703-
alterDDLMapGlobal[key] = value
704-
}
705692
if err != nil {
706-
return nil, nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' json: '%s': %v", table.Name, PrettyJson(insertJson), err)
693+
return nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' json: '%s': %v", table.Name, PrettyJson(insertJson), err)
707694
}
708695
jsonsReadyForInsertion = append(jsonsReadyForInsertion, insertJson)
709696
}
710697

711698
insertValues := strings.Join(jsonsReadyForInsertion, ", ")
712699
insert := fmt.Sprintf("INSERT INTO \"%s\" FORMAT JSONEachRow %s", table.Name, insertValues)
713700

714-
return generateSqlStatements(createTableCmd, alterCmd, insert), alterDDLMapGlobal, nil
701+
return generateSqlStatements(createTableCmd, alterCmd, insert), nil
715702
}
716703

717704
func (lm *IngestProcessor) ProcessInsertQuery(ctx context.Context, tableName string,
@@ -735,6 +722,7 @@ func (lm *IngestProcessor) ProcessInsertQuery(ctx context.Context, tableName str
735722
// we ignore an error here, because we want to process the data and don't lose it
736723
logger.ErrorWithCtx(ctx).Msgf("error processing insert query - virtual table schema update: %v", err)
737724
}
725+
738726
pipeline := jsonprocessor.IngestTransformerPipeline{}
739727
pipeline = append(pipeline, &common_table.IngestAddIndexNameTransformer{IndexName: tableName})
740728
pipeline = append(pipeline, transformer)
@@ -755,8 +743,7 @@ func (lm *IngestProcessor) ProcessInsertQuery(ctx context.Context, tableName str
755743
func (ip *IngestProcessor) processInsertQueryInternal(ctx context.Context, tableName string,
756744
jsonData []types.JSON, transformer jsonprocessor.IngestTransformer,
757745
tableFormatter TableColumNameFormatter, isVirtualTable bool) error {
758-
759-
statements, _, err := ip.processInsertQuery(ctx, tableName, jsonData, transformer, tableFormatter, isVirtualTable)
746+
statements, err := ip.processInsertQuery(ctx, tableName, jsonData, transformer, tableFormatter, isVirtualTable)
760747
if err != nil {
761748
return err
762749
}

quesma/ingest/processor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func TestInsertNonSchemaFieldsToOthers_1(t *testing.T) {
7272
assert.True(t, exists)
7373
f := func(t1, t2 TableMap) {
7474
ip := newIngestProcessorWithEmptyTableMap(fieldsMap, &config.QuesmaConfiguration{})
75-
alter, _, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(tableName, types.MustJSON(rowToInsert), nil, hasOthersConfig, encodings)
75+
alter, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(tableName, types.MustJSON(rowToInsert), nil, hasOthersConfig, encodings)
7676
assert.NoError(t, err)
7777
j, err := generateInsertJson(nonSchemaFields, onlySchemaFields)
7878
assert.NoError(t, err)

0 commit comments

Comments
 (0)