Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 37b6fd8

Browse files
authored
Introducing map keys discovering (#1364)
This PR adds an automatic map key discovery feature needed for field caps and dashboard creation. ``` CREATE TABLE IF NOT EXISTS "foo" ( "@timestamp" DateTime64 DEFAULT now64(), `bar` Map(String, Nullable(String)) ) ENGINE = MergeTree ORDER BY ("@timestamp"); insert into foo values ('2020-01-01 00:00:00', {'a': 'b'}); insert into foo values ('2020-01-01 00:00:00', {'c': 'b'}); insert into foo values ('2020-01-01 00:00:00', {'e': 'b'}); ``` Feature is hidden behind the global flag: ``` mapFieldsDiscoveringEnabled: true ``` <img width="1688" alt="image" src="https://github.com/user-attachments/assets/0ca89753-907d-4bb0-b536-39a735b06742" /> <!-- A note on testing your PR --> <!-- Basic unit test run is executed against each commit in the PR. If you want to run a full integration test suite, you can trigger it by commenting with '/run-integration-tests' -->
1 parent e471904 commit 37b6fd8

File tree

5 files changed

+90
-10
lines changed

5 files changed

+90
-10
lines changed

platform/clickhouse/schema.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"github.com/QuesmaOrg/quesma/platform/config"
88
"github.com/QuesmaOrg/quesma/platform/logger"
9+
"github.com/QuesmaOrg/quesma/platform/schema"
910
"github.com/QuesmaOrg/quesma/platform/util"
1011
"math"
1112
"reflect"
@@ -60,6 +61,7 @@ type (
6061
Modifiers string
6162
Codec Codec // TODO currently not used, it's part of Modifiers
6263
Comment string
64+
Origin schema.FieldSource // TODO this field is just added to have way to forward information to the schema registry and should be considered as a technical debt
6365
}
6466
DateTimeType int
6567
)

platform/clickhouse/table_discovery.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/QuesmaOrg/quesma/platform/util"
1717
quesma_api "github.com/QuesmaOrg/quesma/platform/v2/core"
1818
"github.com/goccy/go-json"
19+
"regexp"
1920
"strings"
2021
"sync"
2122
"sync/atomic"
@@ -67,6 +68,7 @@ type columnMetadata struct {
6768
// we use it as persistent storage and load it
6869
// in the case when we don't control ingest
6970
comment string
71+
origin schema.FieldSource // TODO this field is just added to have way to forward information to the schema registry and should be considered as a technical debt
7072
}
7173

7274
func NewTableDiscovery(cfg *config.QuesmaConfiguration, dbConnPool quesma_api.BackendConnector, virtualTablesDB persistence.JSONDatabase) TableDiscovery {
@@ -115,6 +117,7 @@ func (t TableDiscoveryTableProviderAdapter) TableDefinitions() map[string]schema
115117
Name: column.Name,
116118
Type: column.Type.String(),
117119
Comment: column.Comment,
120+
Origin: column.Origin,
118121
}
119122
}
120123
table.DatabaseName = value.DatabaseName
@@ -205,6 +208,9 @@ func (td *tableDiscovery) ReloadTableDefinitions() {
205208
td.tableDefinitionsLastReloadUnixSec.Store(time.Now().Unix())
206209
return
207210
} else {
211+
if td.cfg.MapFieldsDiscoveringEnabled {
212+
tables = td.enrichTableWithMapFields(tables)
213+
}
208214
if td.AutodiscoveryEnabled() {
209215
configuredTables = td.autoConfigureTables(tables, databaseName)
210216
} else {
@@ -379,6 +385,7 @@ func (td *tableDiscovery) populateTableDefinitions(configuredTables map[string]d
379385
column := resolveColumn(col, columnMeta.colType)
380386
if column != nil {
381387
column.Comment = columnMeta.comment
388+
column.Origin = columnMeta.origin
382389
columnsMap[col] = column
383390
} else {
384391
logger.Warn().Msgf("column '%s.%s' type: '%s' not resolved. table will be skipped", tableName, col, columnMeta.colType)
@@ -606,6 +613,70 @@ func removePrecision(str string) string {
606613
}
607614
}
608615

616+
// extractMapValueType extracts the value type from a ClickHouse Map definition
617+
func extractMapValueType(mapType string) (string, error) {
618+
// Regular expression to match Map(String, valueType)
619+
re := regexp.MustCompile(`Map\(String,\s*([^)]+)\)`)
620+
matches := re.FindStringSubmatch(mapType)
621+
622+
if len(matches) < 2 {
623+
return "", fmt.Errorf("invalid map type format: %s", mapType)
624+
}
625+
626+
return strings.TrimSpace(matches[1]), nil
627+
}
628+
629+
func (td *tableDiscovery) enrichTableWithMapFields(inputTable map[string]map[string]columnMetadata) map[string]map[string]columnMetadata {
630+
outputTable := make(map[string]map[string]columnMetadata)
631+
for table, columns := range inputTable {
632+
for colName, columnMeta := range columns {
633+
if strings.HasPrefix(columnMeta.colType, "Map(String") {
634+
// Query ClickHouse for map keys in the given column
635+
rows, err := td.dbConnPool.Query(context.Background(), fmt.Sprintf("SELECT arrayJoin(mapKeys(%s)) FROM %s", colName, table))
636+
if err != nil {
637+
fmt.Println("Error querying map keys:", err)
638+
continue
639+
}
640+
641+
// Ensure the table exists in outputTable
642+
if _, ok := outputTable[table]; !ok {
643+
outputTable[table] = make(map[string]columnMetadata)
644+
}
645+
646+
// Process returned keys and add them as virtual columns
647+
for rows.Next() {
648+
var key string
649+
if err := rows.Scan(&key); err != nil {
650+
fmt.Println("Error scanning key:", err)
651+
continue
652+
}
653+
// Update origin for incoming map column
654+
columnMeta.origin = schema.FieldSourceIngest
655+
outputTable[table][colName] = columnMeta
656+
// Add virtual column for each key in the map
657+
// with origin set to mapping
658+
virtualColName := colName + "." + key
659+
valueType, err := extractMapValueType(columnMeta.colType)
660+
if err == nil {
661+
outputTable[table][virtualColName] = columnMetadata{
662+
colType: valueType,
663+
origin: schema.FieldSourceMapping,
664+
}
665+
}
666+
}
667+
rows.Close() // Close after processing
668+
} else {
669+
// Copy other columns as-is
670+
if _, ok := outputTable[table]; !ok {
671+
outputTable[table] = make(map[string]columnMetadata)
672+
}
673+
outputTable[table][colName] = columnMeta
674+
}
675+
}
676+
}
677+
return outputTable
678+
}
679+
609680
func (td *tableDiscovery) readTables(database string) (map[string]map[string]columnMetadata, error) {
610681
logger.Debug().Msgf("describing tables: %s", database)
611682

platform/config/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ type QuesmaConfiguration struct {
5656
DefaultQueryTarget []string
5757
DefaultIngestOptimizers map[string]OptimizerConfiguration
5858
DefaultQueryOptimizers map[string]OptimizerConfiguration
59+
MapFieldsDiscoveringEnabled bool
5960
}
6061

6162
func (c *QuesmaConfiguration) AliasFields(indexName string) map[string]string {
@@ -264,6 +265,7 @@ Quesma Configuration:
264265
UseCommonTableForWildcard: %t,
265266
DefaultIngestTarget: %v,
266267
DefaultQueryTarget: %v,
268+
MapFieldsDiscoveringEnabled: %t
267269
`,
268270
c.TransparentProxy,
269271
elasticUrl,
@@ -285,6 +287,7 @@ Quesma Configuration:
285287
c.UseCommonTableForWildcard,
286288
c.DefaultIngestTarget,
287289
c.DefaultQueryTarget,
290+
c.MapFieldsDiscoveringEnabled,
288291
)
289292
}
290293

platform/config/config_v2.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,16 @@ const (
3939
)
4040

4141
type QuesmaNewConfiguration struct {
42-
BackendConnectors []BackendConnector `koanf:"backendConnectors"`
43-
FrontendConnectors []FrontendConnector `koanf:"frontendConnectors"`
44-
InstallationId string `koanf:"installationId"`
45-
LicenseKey string `koanf:"licenseKey"`
46-
Logging LoggingConfiguration `koanf:"logging"`
47-
IngestStatistics bool `koanf:"ingestStatistics"`
48-
Processors []Processor `koanf:"processors"`
49-
Pipelines []Pipeline `koanf:"pipelines"`
50-
DisableTelemetry bool `koanf:"disableTelemetry"`
42+
BackendConnectors []BackendConnector `koanf:"backendConnectors"`
43+
FrontendConnectors []FrontendConnector `koanf:"frontendConnectors"`
44+
InstallationId string `koanf:"installationId"`
45+
LicenseKey string `koanf:"licenseKey"`
46+
Logging LoggingConfiguration `koanf:"logging"`
47+
IngestStatistics bool `koanf:"ingestStatistics"`
48+
Processors []Processor `koanf:"processors"`
49+
Pipelines []Pipeline `koanf:"pipelines"`
50+
DisableTelemetry bool `koanf:"disableTelemetry"`
51+
MapFieldsDiscoveringEnabled bool `koanf:"mapFieldsDiscoveringEnabled"`
5152
}
5253

5354
type LoggingConfiguration struct {
@@ -576,6 +577,8 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration {
576577
conf.InstallationId = c.InstallationId
577578
conf.LicenseKey = c.LicenseKey
578579

580+
conf.MapFieldsDiscoveringEnabled = c.MapFieldsDiscoveringEnabled
581+
579582
conf.AutodiscoveryEnabled = false
580583
conf.Connectors = make(map[string]RelationalDbConfiguration)
581584
relDBConn, connType, relationalDBErr := c.getRelationalDBConf()

platform/schema/registry.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ type (
6565
Name string
6666
Type string // FIXME: change to schema.Type
6767
Comment string
68+
Origin FieldSource // TODO this field is just added to have way to forward information to the schema registry and should be considered as a technical debt
6869
}
6970
)
7071

@@ -327,7 +328,7 @@ func (s *schemaRegistry) populateSchemaFromTableDefinition(definitions map[strin
327328
}
328329
if existing, exists := fields[propertyName]; !exists {
329330
if quesmaType, resolved := s.dataSourceTypeAdapter.Convert(column.Type); resolved {
330-
fields[propertyName] = Field{PropertyName: propertyName, InternalPropertyName: FieldName(column.Name), InternalPropertyType: column.Type, Type: quesmaType}
331+
fields[propertyName] = Field{PropertyName: propertyName, InternalPropertyName: FieldName(column.Name), InternalPropertyType: column.Type, Type: quesmaType, Origin: column.Origin}
331332
} else {
332333
logger.Debug().Msgf("type %s not supported, falling back to keyword", column.Type)
333334
fields[propertyName] = Field{PropertyName: propertyName, InternalPropertyName: FieldName(column.Name), InternalPropertyType: column.Type, Type: QuesmaTypeKeyword}

0 commit comments

Comments
 (0)