@@ -286,13 +286,6 @@ func getAttributesByArrayName(arrayName string,
286286 return attributes
287287}
288288
289- type AlterDDL struct {
290- tableName string
291- columnName string
292- columnType string
293- comment string
294- }
295-
296289// This function generates ALTER TABLE commands for adding new columns
297290// to the table based on the attributesMap and the table name
298291// AttributesMap contains the attributes that are not part of the schema
@@ -302,12 +295,12 @@ func (ip *IngestProcessor) generateNewColumns(
302295 attrsMap map [string ][]interface {},
303296 table * chLib.Table ,
304297 alteredAttributesIndexes []int ,
305- encodings map [schema.FieldEncodingKey ]schema.EncodedFieldName ) ( []string , map [ string ] AlterDDL ) {
298+ encodings map [schema.FieldEncodingKey ]schema.EncodedFieldName ) []string {
306299 var alterCmd []string
307- alterDDLMap := make (map [string ]AlterDDL )
308300 attrKeys := getAttributesByArrayName (chLib .DeprecatedAttributesKeyColumn , attrsMap )
309301 attrTypes := getAttributesByArrayName (chLib .DeprecatedAttributesValueType , attrsMap )
310302 var deleteIndexes []int
303+
311304 reverseMap := reverseFieldEncoding (encodings , table .Name )
312305
313306 // HACK Alert:
@@ -346,7 +339,6 @@ func (ip *IngestProcessor) generateNewColumns(
346339 alterCmd = append (alterCmd , alterTable )
347340
348341 alterColumn := fmt .Sprintf ("ALTER TABLE \" %s\" COMMENT COLUMN \" %s\" '%s'" , table .Name , attrKeys [i ], comment )
349- alterDDLMap [attrKeys [i ]] = AlterDDL {tableName : table .Name , columnName : attrKeys [i ], columnType : columnType , comment : comment }
350342 alterCmd = append (alterCmd , alterColumn )
351343
352344 deleteIndexes = append (deleteIndexes , i )
@@ -366,7 +358,7 @@ func (ip *IngestProcessor) generateNewColumns(
366358 attrsMap [chLib .DeprecatedAttributesValueType ] = append (attrsMap [chLib .DeprecatedAttributesValueType ][:deleteIndexes [i ]], attrsMap [chLib .DeprecatedAttributesValueType ][deleteIndexes [i ]+ 1 :]... )
367359 attrsMap [chLib .DeprecatedAttributesValueColumn ] = append (attrsMap [chLib .DeprecatedAttributesValueColumn ][:deleteIndexes [i ]], attrsMap [chLib .DeprecatedAttributesValueColumn ][deleteIndexes [i ]+ 1 :]... )
368360 }
369- return alterCmd , alterDDLMap
361+ return alterCmd
370362}
371363
372364// This struct contains the information about the columns that aren't part of the schema
@@ -504,39 +496,39 @@ func (ip *IngestProcessor) GenerateIngestContent(table *chLib.Table,
504496 data types.JSON ,
505497 inValidJson types.JSON ,
506498 config * chLib.ChTableConfig ,
507- encodings map [schema.FieldEncodingKey ]schema.EncodedFieldName ) ([]string , map [ string ] AlterDDL , types.JSON , []NonSchemaField , error ) {
499+ encodings map [schema.FieldEncodingKey ]schema.EncodedFieldName ) ([]string , types.JSON , []NonSchemaField , error ) {
508500
509501 jsonAsBytesSlice , err := json .Marshal (data )
510502
511503 if err != nil {
512- return nil , nil , nil , nil , err
504+ return nil , nil , nil , err
513505 }
514506
515507 // we find all non-schema fields
516508 jsonMap , err := types .ParseJSON (string (jsonAsBytesSlice ))
517509 if err != nil {
518- return nil , nil , nil , nil , err
510+ return nil , nil , nil , err
519511 }
520512
521513 if len (config .Attributes ) == 0 {
522- return nil , nil , jsonMap , nil , nil
514+ return nil , jsonMap , nil , nil
523515 }
524516
525517 schemaFieldsJson , err := json .Marshal (jsonMap )
526518
527519 if err != nil {
528- return nil , nil , jsonMap , nil , err
520+ return nil , jsonMap , nil , err
529521 }
530522
531523 mDiff := DifferenceMap (jsonMap , table ) // TODO change to DifferenceMap(m, t)
532524
533525 if len (mDiff ) == 0 && string (schemaFieldsJson ) == string (jsonAsBytesSlice ) && len (inValidJson ) == 0 { // no need to modify, just insert 'js'
534- return nil , nil , jsonMap , nil , nil
526+ return nil , jsonMap , nil , nil
535527 }
536528
537529 // check attributes precondition
538530 if len (config .Attributes ) <= 0 {
539- return nil , nil , nil , nil , fmt .Errorf ("no attributes config, but received non-schema fields: %s" , mDiff )
531+ return nil , nil , nil , fmt .Errorf ("no attributes config, but received non-schema fields: %s" , mDiff )
540532 }
541533 attrsMap , _ := BuildAttrsMap (mDiff , config )
542534
@@ -546,10 +538,9 @@ func (ip *IngestProcessor) GenerateIngestContent(table *chLib.Table,
546538 // we only want to add fields that are not part of the schema e.g we don't
547539 // have columns for them
548540 var alterCmd []string
549- alterDDLMap := make (map [string ]AlterDDL )
550541 atomic .AddInt64 (& ip .ingestCounter , 1 )
551542 if ok , alteredAttributesIndexes := ip .shouldAlterColumns (table , attrsMap ); ok {
552- alterCmd , alterDDLMap = ip .generateNewColumns (attrsMap , table , alteredAttributesIndexes , encodings )
543+ alterCmd = ip .generateNewColumns (attrsMap , table , alteredAttributesIndexes , encodings )
553544 }
554545 // If there are some invalid fields, we need to add them to the attributes map
555546 // to not lose them and be able to store them later by
@@ -560,12 +551,12 @@ func (ip *IngestProcessor) GenerateIngestContent(table *chLib.Table,
560551 nonSchemaFields , err := generateNonSchemaFields (attrsMapWithInvalidFields )
561552
562553 if err != nil {
563- return nil , nil , nil , nil , err
554+ return nil , nil , nil , err
564555 }
565556
566557 onlySchemaFields := RemoveNonSchemaFields (jsonMap , table )
567558
568- return alterCmd , alterDDLMap , onlySchemaFields , nonSchemaFields , nil
559+ return alterCmd , onlySchemaFields , nonSchemaFields , nil
569560}
570561
571562func generateInsertJson (nonSchemaFields []NonSchemaField , onlySchemaFields types.JSON ) (string , error ) {
@@ -607,7 +598,7 @@ func populateFieldEncodings(jsonData []types.JSON, tableName string) map[schema.
607598func (ip * IngestProcessor ) processInsertQuery (ctx context.Context ,
608599 tableName string ,
609600 jsonData []types.JSON , transformer jsonprocessor.IngestTransformer ,
610- tableFormatter TableColumNameFormatter , tableDefinitionChangeOnly bool ) ([]string , map [ string ] AlterDDL , error ) {
601+ tableFormatter TableColumNameFormatter , tableDefinitionChangeOnly bool ) ([]string , error ) {
611602 // this is pre ingest transformer
612603 // here we transform the data before it's structure evaluation and insertion
613604 //
@@ -616,7 +607,7 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
616607 for _ , jsonValue := range jsonData {
617608 result , err := preIngestTransformer .Transform (jsonValue )
618609 if err != nil {
619- return nil , nil , fmt .Errorf ("error while rewriting json: %v" , err )
610+ return nil , fmt .Errorf ("error while rewriting json: %v" , err )
620611 }
621612 processed = append (processed , result )
622613 }
@@ -670,7 +661,7 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
670661 createTableCmd , err = ip .createTableObjectAndAttributes (ctx , createTableCmd , tableConfig , tableName , tableDefinitionChangeOnly )
671662 if err != nil {
672663 logger .ErrorWithCtx (ctx ).Msgf ("error createTableObjectAndAttributes, can't create table: %v" , err )
673- return nil , nil , err
664+ return nil , err
674665 }
675666 // Set pointer to table after creating it
676667 table = ip .FindTable (tableName )
@@ -680,38 +671,34 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
680671 tableConfig = table .Config
681672 var jsonsReadyForInsertion []string
682673 var alterCmd []string
683- alterDDLMapGlobal := make (map [string ]AlterDDL )
684674 var preprocessedJsons []types.JSON
685675 var invalidJsons []types.JSON
686676 preprocessedJsons , invalidJsons , err := ip .preprocessJsons (ctx , table .Name , jsonData , transformer )
687677 if err != nil {
688- return nil , nil , fmt .Errorf ("error preprocessJsons: %v" , err )
678+ return nil , fmt .Errorf ("error preprocessJsons: %v" , err )
689679 }
690680 for i , preprocessedJson := range preprocessedJsons {
691- alter , alterDDLMap , onlySchemaFields , nonSchemaFields , err := ip .GenerateIngestContent (table , preprocessedJson ,
681+ alter , onlySchemaFields , nonSchemaFields , err := ip .GenerateIngestContent (table , preprocessedJson ,
692682 invalidJsons [i ], tableConfig , encodings )
693683
694684 if err != nil {
695- return nil , nil , fmt .Errorf ("error BuildInsertJson, tablename: '%s' : %v" , table .Name , err )
685+ return nil , fmt .Errorf ("error BuildInsertJson, tablename: '%s' : %v" , table .Name , err )
696686 }
697687 insertJson , err := generateInsertJson (nonSchemaFields , onlySchemaFields )
698688 if err != nil {
699- return nil , nil , fmt .Errorf ("error generatateInsertJson, tablename: '%s' json: '%s': %v" , table .Name , PrettyJson (insertJson ), err )
689+ return nil , fmt .Errorf ("error generatateInsertJson, tablename: '%s' json: '%s': %v" , table .Name , PrettyJson (insertJson ), err )
700690 }
701691 alterCmd = append (alterCmd , alter ... )
702- for key , value := range alterDDLMap {
703- alterDDLMapGlobal [key ] = value
704- }
705692 if err != nil {
706- return nil , nil , fmt .Errorf ("error BuildInsertJson, tablename: '%s' json: '%s': %v" , table .Name , PrettyJson (insertJson ), err )
693+ return nil , fmt .Errorf ("error BuildInsertJson, tablename: '%s' json: '%s': %v" , table .Name , PrettyJson (insertJson ), err )
707694 }
708695 jsonsReadyForInsertion = append (jsonsReadyForInsertion , insertJson )
709696 }
710697
711698 insertValues := strings .Join (jsonsReadyForInsertion , ", " )
712699 insert := fmt .Sprintf ("INSERT INTO \" %s\" FORMAT JSONEachRow %s" , table .Name , insertValues )
713700
714- return generateSqlStatements (createTableCmd , alterCmd , insert ), alterDDLMapGlobal , nil
701+ return generateSqlStatements (createTableCmd , alterCmd , insert ), nil
715702}
716703
717704func (lm * IngestProcessor ) ProcessInsertQuery (ctx context.Context , tableName string ,
@@ -755,8 +742,7 @@ func (lm *IngestProcessor) ProcessInsertQuery(ctx context.Context, tableName str
755742func (ip * IngestProcessor ) processInsertQueryInternal (ctx context.Context , tableName string ,
756743 jsonData []types.JSON , transformer jsonprocessor.IngestTransformer ,
757744 tableFormatter TableColumNameFormatter , isVirtualTable bool ) error {
758-
759- statements , _ , err := ip .processInsertQuery (ctx , tableName , jsonData , transformer , tableFormatter , isVirtualTable )
745+ statements , err := ip .processInsertQuery (ctx , tableName , jsonData , transformer , tableFormatter , isVirtualTable )
760746 if err != nil {
761747 return err
762748 }
0 commit comments