Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 3114b52

Browse files
authored
Map fields discovery fix, additional logging (#1368)
This PR fixes the addition of a map field to the schema in case of subsequent query failures and adds more logging. <!-- A note on testing your PR --> <!-- Basic unit test run is executed against each commit in the PR. If you want to run a full integration test suite, you can trigger it by commenting with '/run-integration-tests' -->
1 parent 20bc8c3 commit 3114b52

File tree

2 files changed

+36
-17
lines changed

2 files changed

+36
-17
lines changed

platform/clickhouse/table_discovery.go

Lines changed: 35 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -631,40 +631,58 @@ func (td *tableDiscovery) enrichTableWithMapFields(inputTable map[string]map[str
631631
for table, columns := range inputTable {
632632
for colName, columnMeta := range columns {
633633
if strings.HasPrefix(columnMeta.colType, "Map(String") {
634-
// Query ClickHouse for map keys in the given column
635-
rows, err := td.dbConnPool.Query(context.Background(), fmt.Sprintf("SELECT arrayJoin(mapKeys(%s)) FROM %s", colName, table))
636-
if err != nil {
637-
fmt.Println("Error querying map keys:", err)
638-
continue
639-
}
640-
634+
logger.Debug().Msgf("Discovered map column: %s.%s", table, colName)
641635
// Ensure the table exists in outputTable
642636
if _, ok := outputTable[table]; !ok {
643637
outputTable[table] = make(map[string]columnMetadata)
644638
}
639+
if _, ok := outputTable[table][colName]; !ok {
640+
// Update origin for incoming map column
641+
columnMeta.origin = schema.FieldSourceIngest
642+
outputTable[table][colName] = columnMeta
643+
logger.Debug().Msgf("Added column: %s.%s", table, colName)
644+
}
645645

646-
// Process returned keys and add them as virtual columns
646+
// Query ClickHouse for map keys in the given column
647+
rows, err := td.dbConnPool.Query(context.Background(), fmt.Sprintf("SELECT arrayJoin(mapKeys(%s)) FROM %s", colName, table))
648+
if err != nil {
649+
logger.Error().Msgf("Error querying map keys for table, column: %s, %s, %v", table, colName, err)
650+
continue
651+
}
652+
foundKeys := false
647653
for rows.Next() {
654+
foundKeys = true
648655
var key string
649656
if err := rows.Scan(&key); err != nil {
650-
fmt.Println("Error scanning key:", err)
657+
logger.Error().Msgf("Error scanning key for table, column: %s, %s, %v", table, colName, err)
651658
continue
652659
}
653-
// Update origin for incoming map column
654-
columnMeta.origin = schema.FieldSourceIngest
655-
outputTable[table][colName] = columnMeta
656660
// Add virtual column for each key in the map
657661
// with origin set to mapping
658-
virtualColName := colName + "." + key
659-
valueType, err := extractMapValueType(columnMeta.colType)
660-
if err == nil {
661-
outputTable[table][virtualColName] = columnMetadata{
662+
mapKeyCol := colName + "." + key
663+
var valueType string
664+
valueType, err = extractMapValueType(columnMeta.colType)
665+
if err != nil {
666+
logger.Error().Msgf("Error extracting value type for table, column: %s, %s, %v", table, colName, err)
667+
continue
668+
} else {
669+
outputTable[table][mapKeyCol] = columnMetadata{
662670
colType: valueType,
663671
origin: schema.FieldSourceMapping,
664672
}
673+
logger.Debug().Msgf("Added map key column: %s.%s", table, mapKeyCol)
665674
}
666675
}
667-
rows.Close() // Close after processing
676+
if !foundKeys {
677+
logger.Debug().Msgf("No map keys found for table, column: %s, %s", table, colName)
678+
}
679+
if err := rows.Err(); err != nil {
680+
logger.Error().Msgf("Error iterating map keys for %s.%s: %v", table, colName, err)
681+
}
682+
err = rows.Close() // Close after processing
683+
if err != nil {
684+
logger.Error().Msgf("Error closing rows for table, column: %s, %s, %v", table, colName, err)
685+
}
668686
} else {
669687
// Copy other columns as-is
670688
if _, ok := outputTable[table]; !ok {

platform/config/index_config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ func (c IndexConfiguration) String(indexName string) string {
7575
if c.UseCommonTable {
7676
builder.WriteString(", useSingleTable: true")
7777
}
78+
builder.WriteString(fmt.Sprintf(", enableFieldMapSyntax: %v", c.EnableFieldMapSyntax))
7879

7980
return builder.String()
8081
}

0 commit comments

Comments
 (0)