Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit f5f241b

Browse files
committed
Refining GenerateIngestContent signature
1 parent 53b661a commit f5f241b

File tree

3 files changed

+14
-13
lines changed

3 files changed

+14
-13
lines changed

platform/ingest/alter_table_test.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,17 @@ func TestAlterTable(t *testing.T) {
4141
}
4242
columns := []string{"Test1", "Test2"}
4343
table := &clickhouse.Table{
44-
Name: "tableName",
45-
Cols: map[string]*clickhouse.Column{},
44+
Name: "tableName",
45+
Cols: map[string]*clickhouse.Column{},
46+
Config: chConfig,
4647
}
4748
fieldsMap := util.NewSyncMapWith("tableName", table)
4849

4950
encodings := make(map[schema.FieldEncodingKey]schema.EncodedFieldName)
5051

5152
ip := newIngestProcessorWithEmptyTableMap(fieldsMap, &config.QuesmaConfiguration{})
5253
for i := range rowsToInsert {
53-
alter, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, chConfig, encodings)
54+
alter, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, encodings)
5455
assert.NoError(t, err)
5556
insert, err := generateInsertJson(nonSchemaFields, onlySchemaFields)
5657
assert.Equal(t, expectedInsert[i], insert)
@@ -103,8 +104,9 @@ func TestAlterTableHeuristic(t *testing.T) {
103104
for _, tc := range testcases {
104105
const tableName = "tableName"
105106
table := &clickhouse.Table{
106-
Name: tableName,
107-
Cols: map[string]*clickhouse.Column{},
107+
Name: tableName,
108+
Cols: map[string]*clickhouse.Column{},
109+
Config: chConfig,
108110
}
109111
fieldsMap := util.NewSyncMapWith(tableName, table)
110112
ip := newIngestProcessorWithEmptyTableMap(fieldsMap, &config.QuesmaConfiguration{})
@@ -128,7 +130,7 @@ func TestAlterTableHeuristic(t *testing.T) {
128130

129131
assert.Equal(t, int64(0), ip.ingestCounter)
130132
for i := range rowsToInsert {
131-
_, _, _, err := ip.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, chConfig, encodings)
133+
_, _, _, err := ip.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, encodings)
132134
assert.NoError(t, err)
133135
}
134136
assert.Equal(t, tc.expected, len(table.Cols))

platform/ingest/processor.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -541,10 +541,9 @@ func (ip *IngestProcessor) shouldAlterColumns(table *chLib.Table, attrsMap map[s
541541
func (ip *IngestProcessor) GenerateIngestContent(table *chLib.Table,
542542
data types.JSON,
543543
inValidJson types.JSON,
544-
config *chLib.ChTableConfig,
545544
encodings map[schema.FieldEncodingKey]schema.EncodedFieldName) ([]string, types.JSON, []NonSchemaField, error) {
546545

547-
if len(config.Attributes) == 0 {
546+
if len(table.Config.Attributes) == 0 {
548547
return nil, data, nil, nil
549548
}
550549

@@ -555,10 +554,10 @@ func (ip *IngestProcessor) GenerateIngestContent(table *chLib.Table,
555554
}
556555

557556
// check attributes precondition
558-
if len(config.Attributes) <= 0 {
557+
if len(table.Config.Attributes) <= 0 {
559558
return nil, nil, nil, fmt.Errorf("no attributes config, but received non-schema fields: %s", mDiff)
560559
}
561-
attrsMap, _ := BuildAttrsMap(mDiff, config)
560+
attrsMap, _ := BuildAttrsMap(mDiff, table.Config)
562561

563562
// generateNewColumns is called on original attributes map
564563
// before adding invalid fields to it
@@ -708,7 +707,6 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
708707
if table == nil {
709708
return nil, fmt.Errorf("table %s not found", tableName)
710709
}
711-
tableConfig = table.Config
712710
var jsonsReadyForInsertion []string
713711
var alterCmd []string
714712
var validatedJsons []types.JSON
@@ -719,7 +717,7 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
719717
}
720718
for i, preprocessedJson := range validatedJsons {
721719
alter, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(table, preprocessedJson,
722-
invalidJsons[i], tableConfig, encodings)
720+
invalidJsons[i], encodings)
723721

724722
if err != nil {
725723
return nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' : %v", table.Name, err)

platform/ingest/processor_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,11 @@ func TestInsertNonSchemaFieldsToOthers_1(t *testing.T) {
6868
encodings := make(map[schema.FieldEncodingKey]schema.EncodedFieldName)
6969

7070
tableName, exists := fieldsMap.Load("tableName")
71+
tableName.Config = hasOthersConfig
7172
assert.True(t, exists)
7273
f := func(t1, t2 TableMap) {
7374
ip := newIngestProcessorWithEmptyTableMap(fieldsMap, &config.QuesmaConfiguration{})
74-
alter, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(tableName, types.MustJSON(rowToInsert), nil, hasOthersConfig, encodings)
75+
alter, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(tableName, types.MustJSON(rowToInsert), nil, encodings)
7576
assert.NoError(t, err)
7677
j, err := generateInsertJson(nonSchemaFields, onlySchemaFields)
7778
assert.NoError(t, err)

0 commit comments

Comments
 (0)