Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 99fe066

Browse files
committed
data streams too
simplify the `_resolve` algorithm by showing all the sources
1 parent 0b40e2c commit 99fe066

File tree

1 file changed

+26
-51
lines changed

1 file changed

+26
-51
lines changed

quesma/quesma/functionality/resolve/resolve.go

Lines changed: 26 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -4,73 +4,48 @@ package resolve
44

55
import (
66
"quesma/elasticsearch"
7+
"quesma/logger"
78
"quesma/quesma/config"
89
"quesma/schema"
9-
"slices"
1010
)
1111

12+
// HandleResolve combines results from both schema.Registry (ClickHouse) and Elasticsearch,
13+
// This endpoint is used in Kibana/OSD when creating Data Views/Index Patterns.
1214
func HandleResolve(pattern string, sr schema.Registry, cfg *config.QuesmaConfiguration) (elasticsearch.Sources, error) {
13-
// In the _resolve endpoint we want to combine the results from both schema.Registry and Elasticsearch
15+
sourcesToShow := &elasticsearch.Sources{}
1416

15-
normalizedPattern := elasticsearch.NormalizePattern(pattern)
17+
normalizedPattern := elasticsearch.NormalizePattern(pattern) // changes `_all` to `*`
1618

17-
// Optimization: if it's not a pattern, let's try avoiding querying Elasticsearch - let's first try
18-
// finding that index in schema.Registry:
19-
if !elasticsearch.IsIndexPattern(normalizedPattern) {
20-
if foundSchema, found := sr.FindSchema(schema.TableName(normalizedPattern)); found {
21-
if !foundSchema.ExistsInDataSource {
22-
// index configured by the user, but not present in the data source
23-
return elasticsearch.Sources{}, nil
24-
}
25-
26-
return elasticsearch.Sources{
27-
Indices: []elasticsearch.Index{},
28-
Aliases: []elasticsearch.Alias{},
29-
DataStreams: []elasticsearch.DataStream{
30-
{
31-
Name: normalizedPattern,
32-
BackingIndices: []string{normalizedPattern},
33-
TimestampField: `@timestamp`,
34-
},
35-
},
36-
}, nil
37-
}
38-
39-
// ...index not found in schema.Registry (meaning the user did not configure it), but it could exist in Elastic
40-
}
41-
42-
// Combine results from both schema.Registry and Elasticsearch:
43-
44-
// todo avoid creating new instances all the time
45-
sourcesFromElastic, _, err := elasticsearch.NewIndexResolver(cfg.Elasticsearch).Resolve(normalizedPattern)
19+
sourcesFromElasticsearch, _, err := elasticsearch.NewIndexResolver(cfg.Elasticsearch).Resolve(normalizedPattern)
4620
if err != nil {
47-
return elasticsearch.Sources{}, err
21+
logger.Warn().Msgf("Failed fetching resolving sources matching `%s`: %v", pattern, err)
22+
} else {
23+
sourcesToShow = &sourcesFromElasticsearch
4824
}
4925

50-
combineSourcesFromElasticWithRegistry(&sourcesFromElastic, sr.AllSchemas(), normalizedPattern)
51-
return sourcesFromElastic, nil
52-
}
26+
tablesFromClickHouse := getMatchingClickHouseTables(sr.AllSchemas(), normalizedPattern)
5327

54-
func combineSourcesFromElasticWithRegistry(sourcesFromElastic *elasticsearch.Sources, schemas map[schema.TableName]schema.Schema, normalizedPattern string) {
55-
sourcesFromElastic.Indices =
56-
slices.DeleteFunc(sourcesFromElastic.Indices, func(i elasticsearch.Index) bool {
57-
_, exists := schemas[schema.TableName(i.Name)]
58-
return exists
59-
})
60-
sourcesFromElastic.DataStreams = slices.DeleteFunc(sourcesFromElastic.DataStreams, func(i elasticsearch.DataStream) bool {
61-
_, exists := schemas[schema.TableName(i.Name)]
62-
return exists
63-
})
28+
addClickHouseTablesToSourcesFromElastic(sourcesToShow, tablesFromClickHouse)
29+
return *sourcesToShow, nil
30+
}
6431

32+
func getMatchingClickHouseTables(schemas map[schema.TableName]schema.Schema, normalizedPattern string) (tables []string) {
6533
for name, currentSchema := range schemas {
6634
indexName := name.AsString()
6735

6836
if config.MatchName(normalizedPattern, indexName) && currentSchema.ExistsInDataSource {
69-
sourcesFromElastic.DataStreams = append(sourcesFromElastic.DataStreams, elasticsearch.DataStream{
70-
Name: indexName,
71-
BackingIndices: []string{indexName},
72-
TimestampField: `@timestamp`,
73-
})
37+
tables = append(tables, indexName)
7438
}
7539
}
40+
return tables
41+
}
42+
43+
func addClickHouseTablesToSourcesFromElastic(sourcesFromElastic *elasticsearch.Sources, chTableNames []string) {
44+
for _, name := range chTableNames { // Quesma presents CH tables as Elasticsearch Data Streams.
45+
sourcesFromElastic.DataStreams = append(sourcesFromElastic.DataStreams, elasticsearch.DataStream{
46+
Name: name,
47+
BackingIndices: []string{name},
48+
TimestampField: `@timestamp`,
49+
})
50+
}
7651
}

0 commit comments

Comments
 (0)