Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ processors:
target: [ c ]
logs-3:
target: [ c, e ]
logs-dual-query:
## WARNING `logs-dual-query` (with two dashes) falls under default index pattern for logs in Elasticsearch and results in not index, but datastream creation
logs-dual_query:
target: [ c, e ]
logs-4:
target:
Expand All @@ -52,7 +53,7 @@ processors:
target: [ c ]
logs-3:
target: [ c, e ]
logs-dual-query:
logs-dual_query:
target: [ c, e ]
logs-4:
target:
Expand Down
64 changes: 60 additions & 4 deletions ci/it/testcases/test_dual_write_and_common_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (a *DualWriteAndCommonTableTestcase) RunTests(ctx context.Context, t *testi
t.Run("test dual query returns data from clickhouse", func(t *testing.T) { a.testDualQueryReturnsDataFromClickHouse(ctx, t) })
t.Run("test dual writes work", func(t *testing.T) { a.testDualWritesWork(ctx, t) })
t.Run("test wildcard goes to elastic", func(t *testing.T) { a.testWildcardGoesToElastic(ctx, t) })
t.Run("test _resolve/index/* works properly", func(t *testing.T) { a.testResolveEndpointInQuesma(ctx, t) })
return nil
}

Expand Down Expand Up @@ -100,10 +101,10 @@ func (a *DualWriteAndCommonTableTestcase) testIngestToCommonTableWorks(ctx conte
}

func (a *DualWriteAndCommonTableTestcase) testDualQueryReturnsDataFromClickHouse(ctx context.Context, t *testing.T) {
resp, _ := a.RequestToQuesma(ctx, t, "POST", "/logs-dual-query/_doc", []byte(`{"name": "Przemyslaw", "age": 31337}`))
resp, _ := a.RequestToQuesma(ctx, t, "POST", "/logs-dual_query/_doc", []byte(`{"name": "Przemyslaw", "age": 31337}`))
assert.Equal(t, http.StatusOK, resp.StatusCode)

chQuery := "SELECT * FROM 'logs-dual-query'"
chQuery := "SELECT * FROM 'logs-dual_query'"
rows, err := a.ExecuteClickHouseQuery(ctx, chQuery)
if err != nil {
t.Fatalf("Failed to execute query: %s", err)
Expand Down Expand Up @@ -140,12 +141,12 @@ func (a *DualWriteAndCommonTableTestcase) testDualQueryReturnsDataFromClickHouse
assert.Equal(t, 31337, age)

// In the meantime let's delete the index from Elasticsearch
_, _ = a.RequestToElasticsearch(ctx, "DELETE", "/logs-dual-query", nil)
resp, _ = a.RequestToElasticsearch(ctx, "DELETE", "/logs-dual_query", nil)
if err != nil {
t.Fatalf("Failed to make DELETE request: %s", err)
}
// FINAL TEST - WHETHER QUESMA RETURNS DATA FROM CLICKHOUSE
resp, bodyBytes := a.RequestToQuesma(ctx, t, "GET", "/logs-dual-query/_search", []byte(`{"query": {"match_all": {}}}`))
resp, bodyBytes := a.RequestToQuesma(ctx, t, "GET", "/logs-dual_query/_search", []byte(`{"query": {"match_all": {}}}`))
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Contains(t, string(bodyBytes), "Przemyslaw")
assert.Contains(t, "Clickhouse", resp.Header.Get("X-Quesma-Source"))
Expand Down Expand Up @@ -291,3 +292,58 @@ func (a *DualWriteAndCommonTableTestcase) testWildcardGoesToElastic(ctx context.
assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Quesma-Source"))
assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product"))
}

func (a *DualWriteAndCommonTableTestcase) testResolveEndpointInQuesma(ctx context.Context, t *testing.T) {
// When Quesma searches for that document
resp, bodyBytes := a.RequestToQuesma(ctx, t, "GET", "/_resolve/index/*", nil)

var jsonResponse map[string]interface{}
if err := json.Unmarshal(bodyBytes, &jsonResponse); err != nil {
t.Fatalf("Failed to unmarshal response body: %s", err)
}

assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source"))
expectedResponse := map[string]interface{}{
"indices": []interface{}{
map[string]interface{}{
"name": "logs-3",
"attributes": []interface{}{"open"},
},
map[string]interface{}{
"name": "quesma_virtual_tables",
"attributes": []interface{}{"open"},
},
map[string]interface{}{
"name": "unmentioned_index",
"attributes": []interface{}{"open"},
},
},
"aliases": []interface{}{},
"data_streams": []interface{}{
map[string]interface{}{
"name": "logs-2",
"backing_indices": []interface{}{"logs-2"},
"timestamp_field": "@timestamp",
},
map[string]interface{}{
"name": "logs-3",
"backing_indices": []interface{}{"logs-3"},
"timestamp_field": "@timestamp",
},
map[string]interface{}{
"name": "logs-4",
"backing_indices": []interface{}{"logs-4"},
"timestamp_field": "@timestamp",
},
map[string]interface{}{
"name": "logs-dual_query",
"backing_indices": []interface{}{"logs-dual_query"},
"timestamp_field": "@timestamp",
},
},
}
assert.ElementsMatchf(t, expectedResponse["indices"], jsonResponse["indices"], "indices do not match")
assert.ElementsMatchf(t, expectedResponse["aliases"], jsonResponse["aliases"], "aliases do not match")
assert.ElementsMatchf(t, expectedResponse["data_streams"], jsonResponse["data_streams"], "data_streams do not match")
}
77 changes: 26 additions & 51 deletions quesma/quesma/functionality/resolve/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,73 +4,48 @@ package resolve

import (
"quesma/elasticsearch"
"quesma/logger"
"quesma/quesma/config"
"quesma/schema"
"slices"
)

// HandleResolve combines results from both schema.Registry (ClickHouse) and Elasticsearch,
// This endpoint is used in Kibana/OSD when creating Data Views/Index Patterns.
func HandleResolve(pattern string, sr schema.Registry, cfg *config.QuesmaConfiguration) (elasticsearch.Sources, error) {
// In the _resolve endpoint we want to combine the results from both schema.Registry and Elasticsearch
sourcesToShow := &elasticsearch.Sources{}

normalizedPattern := elasticsearch.NormalizePattern(pattern)
normalizedPattern := elasticsearch.NormalizePattern(pattern) // changes `_all` to `*`

// Optimization: if it's not a pattern, let's try avoiding querying Elasticsearch - let's first try
// finding that index in schema.Registry:
if !elasticsearch.IsIndexPattern(normalizedPattern) {
if foundSchema, found := sr.FindSchema(schema.TableName(normalizedPattern)); found {
if !foundSchema.ExistsInDataSource {
// index configured by the user, but not present in the data source
return elasticsearch.Sources{}, nil
}

return elasticsearch.Sources{
Indices: []elasticsearch.Index{},
Aliases: []elasticsearch.Alias{},
DataStreams: []elasticsearch.DataStream{
{
Name: normalizedPattern,
BackingIndices: []string{normalizedPattern},
TimestampField: `@timestamp`,
},
},
}, nil
}

// ...index not found in schema.Registry (meaning the user did not configure it), but it could exist in Elastic
}

// Combine results from both schema.Registry and Elasticsearch:

// todo avoid creating new instances all the time
sourcesFromElastic, _, err := elasticsearch.NewIndexResolver(cfg.Elasticsearch).Resolve(normalizedPattern)
sourcesFromElasticsearch, _, err := elasticsearch.NewIndexResolver(cfg.Elasticsearch).Resolve(normalizedPattern)
if err != nil {
return elasticsearch.Sources{}, err
logger.Warn().Msgf("Failed fetching resolving sources matching `%s`: %v", pattern, err)
} else {
sourcesToShow = &sourcesFromElasticsearch
}

combineSourcesFromElasticWithRegistry(&sourcesFromElastic, sr.AllSchemas(), normalizedPattern)
return sourcesFromElastic, nil
}
tablesFromClickHouse := getMatchingClickHouseTables(sr.AllSchemas(), normalizedPattern)

func combineSourcesFromElasticWithRegistry(sourcesFromElastic *elasticsearch.Sources, schemas map[schema.TableName]schema.Schema, normalizedPattern string) {
sourcesFromElastic.Indices =
slices.DeleteFunc(sourcesFromElastic.Indices, func(i elasticsearch.Index) bool {
_, exists := schemas[schema.TableName(i.Name)]
return exists
})
sourcesFromElastic.DataStreams = slices.DeleteFunc(sourcesFromElastic.DataStreams, func(i elasticsearch.DataStream) bool {
_, exists := schemas[schema.TableName(i.Name)]
return exists
})
addClickHouseTablesToSourcesFromElastic(sourcesToShow, tablesFromClickHouse)
return *sourcesToShow, nil
}

func getMatchingClickHouseTables(schemas map[schema.TableName]schema.Schema, normalizedPattern string) (tables []string) {
for name, currentSchema := range schemas {
indexName := name.AsString()

if config.MatchName(normalizedPattern, indexName) && currentSchema.ExistsInDataSource {
sourcesFromElastic.DataStreams = append(sourcesFromElastic.DataStreams, elasticsearch.DataStream{
Name: indexName,
BackingIndices: []string{indexName},
TimestampField: `@timestamp`,
})
tables = append(tables, indexName)
}
}
return tables
}

func addClickHouseTablesToSourcesFromElastic(sourcesFromElastic *elasticsearch.Sources, chTableNames []string) {
for _, name := range chTableNames { // Quesma presents CH tables as Elasticsearch Data Streams.
sourcesFromElastic.DataStreams = append(sourcesFromElastic.DataStreams, elasticsearch.DataStream{
Name: name,
BackingIndices: []string{name},
TimestampField: `@timestamp`,
})
}
}
7 changes: 4 additions & 3 deletions quesma/quesma/functionality/resolve/resolve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func Test_combineSourcesFromElasticWithRegistry(t *testing.T) {
},
normalizedPattern: "index*",
expectedResult: elasticsearch.Sources{
Indices: []elasticsearch.Index{{Name: "index3"}},
Indices: []elasticsearch.Index{{Name: "index1"}, {Name: "index3"}},
Aliases: []elasticsearch.Alias{},
DataStreams: []elasticsearch.DataStream{},
},
Expand All @@ -81,9 +81,10 @@ func Test_combineSourcesFromElasticWithRegistry(t *testing.T) {
},
normalizedPattern: "index*",
expectedResult: elasticsearch.Sources{
Indices: []elasticsearch.Index{{Name: "index4"}},
Indices: []elasticsearch.Index{{Name: "index1"}, {Name: "index4"}},
Aliases: []elasticsearch.Alias{},
DataStreams: []elasticsearch.DataStream{
{Name: "index3"},
{Name: "index5"},
{Name: "index1", BackingIndices: []string{"index1"}, TimestampField: `@timestamp`},
{Name: "index2", BackingIndices: []string{"index2"}, TimestampField: `@timestamp`},
Expand All @@ -95,7 +96,7 @@ func Test_combineSourcesFromElasticWithRegistry(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
combineSourcesFromElasticWithRegistry(&tt.sourcesFromElastic, tt.schemas, tt.normalizedPattern)
addClickHouseTablesToSourcesFromElastic(&tt.sourcesFromElastic, getMatchingClickHouseTables(tt.schemas, tt.normalizedPattern))
assert.ElementsMatchf(t, tt.sourcesFromElastic.Aliases, tt.expectedResult.Aliases, "Aliases don't match")
assert.ElementsMatchf(t, tt.sourcesFromElastic.Indices, tt.expectedResult.Indices, "Indices don't match")
assert.ElementsMatchf(t, tt.sourcesFromElastic.DataStreams, tt.expectedResult.DataStreams, "DataStreams don't match")
Expand Down
Loading