diff --git a/platform/ingest/alter_table_test.go b/platform/ingest/alter_table_test.go index 21cc7e502..584f491c1 100644 --- a/platform/ingest/alter_table_test.go +++ b/platform/ingest/alter_table_test.go @@ -41,8 +41,9 @@ func TestAlterTable(t *testing.T) { } columns := []string{"Test1", "Test2"} table := &clickhouse.Table{ - Name: "tableName", - Cols: map[string]*clickhouse.Column{}, + Name: "tableName", + Cols: map[string]*clickhouse.Column{}, + Config: chConfig, } fieldsMap := util.NewSyncMapWith("tableName", table) @@ -50,7 +51,7 @@ func TestAlterTable(t *testing.T) { ip := newIngestProcessorWithEmptyTableMap(fieldsMap, &config.QuesmaConfiguration{}) for i := range rowsToInsert { - alter, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, chConfig, encodings) + alter, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, encodings) assert.NoError(t, err) insert, err := generateInsertJson(nonSchemaFields, onlySchemaFields) assert.Equal(t, expectedInsert[i], insert) @@ -103,8 +104,9 @@ func TestAlterTableHeuristic(t *testing.T) { for _, tc := range testcases { const tableName = "tableName" table := &clickhouse.Table{ - Name: tableName, - Cols: map[string]*clickhouse.Column{}, + Name: tableName, + Cols: map[string]*clickhouse.Column{}, + Config: chConfig, } fieldsMap := util.NewSyncMapWith(tableName, table) ip := newIngestProcessorWithEmptyTableMap(fieldsMap, &config.QuesmaConfiguration{}) @@ -128,7 +130,7 @@ func TestAlterTableHeuristic(t *testing.T) { assert.Equal(t, int64(0), ip.ingestCounter) for i := range rowsToInsert { - _, _, _, err := ip.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, chConfig, encodings) + _, _, _, err := ip.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, encodings) assert.NoError(t, err) } assert.Equal(t, tc.expected, len(table.Cols)) diff --git a/platform/ingest/processor.go b/platform/ingest/processor.go index b4d230829..6cdb8bbdf 100644 --- a/platform/ingest/processor.go +++ b/platform/ingest/processor.go @@ -541,10 +541,9 @@ func (ip *IngestProcessor) shouldAlterColumns(table *chLib.Table, attrsMap map[s func (ip *IngestProcessor) GenerateIngestContent(table *chLib.Table, data types.JSON, inValidJson types.JSON, - config *chLib.ChTableConfig, encodings map[schema.FieldEncodingKey]schema.EncodedFieldName) ([]string, types.JSON, []NonSchemaField, error) { - if len(config.Attributes) == 0 { + if len(table.Config.Attributes) == 0 { return nil, data, nil, nil } @@ -555,10 +554,10 @@ func (ip *IngestProcessor) GenerateIngestContent(table *chLib.Table, } // check attributes precondition - if len(config.Attributes) <= 0 { + if len(table.Config.Attributes) <= 0 { return nil, nil, nil, fmt.Errorf("no attributes config, but received non-schema fields: %s", mDiff) } - attrsMap, _ := BuildAttrsMap(mDiff, config) + attrsMap, _ := BuildAttrsMap(mDiff, table.Config) // generateNewColumns is called on original attributes map // before adding invalid fields to it @@ -708,7 +707,6 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context, if table == nil { return nil, fmt.Errorf("table %s not found", tableName) } - tableConfig = table.Config var jsonsReadyForInsertion []string var alterCmd []string var validatedJsons []types.JSON @@ -719,7 +717,7 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context, } for i, preprocessedJson := range validatedJsons { alter, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(table, preprocessedJson, - invalidJsons[i], tableConfig, encodings) + invalidJsons[i], encodings) if err != nil { return nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' : %v", table.Name, err) diff --git a/platform/ingest/processor_test.go b/platform/ingest/processor_test.go index 8bb6d2826..4c9a7f20f 100644 --- a/platform/ingest/processor_test.go +++ b/platform/ingest/processor_test.go @@ -68,10 +68,11 @@ func TestInsertNonSchemaFieldsToOthers_1(t *testing.T) { encodings := make(map[schema.FieldEncodingKey]schema.EncodedFieldName) tableName, exists := fieldsMap.Load("tableName") + tableName.Config = hasOthersConfig assert.True(t, exists) f := func(t1, t2 TableMap) { ip := newIngestProcessorWithEmptyTableMap(fieldsMap, &config.QuesmaConfiguration{}) - alter, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(tableName, types.MustJSON(rowToInsert), nil, hasOthersConfig, encodings) + alter, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(tableName, types.MustJSON(rowToInsert), nil, encodings) assert.NoError(t, err) j, err := generateInsertJson(nonSchemaFields, onlySchemaFields) assert.NoError(t, err)