Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit cc0bcbd

Browse files
authored
Support _msearch queries targetting mixed sources (#1278)
This is a very basic support for `_msearch` targeting Elasticsearch, not ClickHouse. The main goal is to provide better Grafana experience which almost exclusively relies on `_msearch` endpoint. In the future we might expand on this, do a bigger refactor or support parrallel execution of multi search queries.
1 parent cdc3470 commit cc0bcbd

File tree

4 files changed

+56
-41
lines changed

4 files changed

+56
-41
lines changed

quesma/processors/es_to_ch_ingest/elasticsearch_to_clickhouse_ingest_processor.go

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@
44
package es_to_ch_ingest
55

66
import (
7-
"bytes"
87
"context"
98
"fmt"
109
"github.com/QuesmaOrg/quesma/quesma/backend_connectors"
1110
"github.com/QuesmaOrg/quesma/quesma/frontend_connectors"
1211
"github.com/QuesmaOrg/quesma/quesma/ingest"
1312
"github.com/QuesmaOrg/quesma/quesma/logger"
13+
"github.com/QuesmaOrg/quesma/quesma/util"
1414
quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core"
1515

1616
"github.com/QuesmaOrg/quesma/quesma/processors"
@@ -20,7 +20,6 @@ import (
2020
"github.com/QuesmaOrg/quesma/quesma/quesma/types"
2121
"github.com/QuesmaOrg/quesma/quesma/schema"
2222
"github.com/rs/zerolog/log"
23-
"io"
2423
"net/http"
2524
)
2625

@@ -165,15 +164,6 @@ func (p *ElasticsearchToClickHouseIngestProcessor) GetSupportedBackendConnectors
165164
return []quesma_api.BackendConnectorType{quesma_api.ClickHouseSQLBackend, quesma_api.ElasticsearchBackend}
166165
}
167166

168-
func ReadResponseBody(resp *http.Response) ([]byte, error) {
169-
respBody, err := io.ReadAll(resp.Body)
170-
if err != nil {
171-
return nil, err
172-
}
173-
resp.Body = io.NopCloser(bytes.NewBuffer(respBody))
174-
return respBody, nil
175-
}
176-
177167
func (p *ElasticsearchToClickHouseIngestProcessor) routeToElasticsearch(metadata map[string]interface{}, req *http.Request) (map[string]interface{}, *quesma_api.Result, error) {
178168
metadata[es_to_ch_common.RealSourceHeader] = es_to_ch_common.RealSourceElasticsearch
179169
esConn, err := p.getElasticsearchBackendConnector()
@@ -184,7 +174,7 @@ func (p *ElasticsearchToClickHouseIngestProcessor) routeToElasticsearch(metadata
184174
if err != nil {
185175
return metadata, nil, err
186176
}
187-
respBody, err := ReadResponseBody(resp)
177+
respBody, err := util.ReadResponseBody(resp)
188178
if err != nil {
189179
return metadata, nil, fmt.Errorf("failed to read response body from Elastic")
190180
}

quesma/processors/es_to_ch_query/elasticsearch_to_clickhouse_query_processor.go

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
package es_to_ch_query
55

66
import (
7-
"bytes"
8-
"compress/gzip"
97
"context"
108
"fmt"
119
"github.com/QuesmaOrg/quesma/quesma/backend_connectors"
@@ -17,6 +15,7 @@ import (
1715
quesm "github.com/QuesmaOrg/quesma/quesma/quesma"
1816
"github.com/QuesmaOrg/quesma/quesma/quesma/config"
1917
"github.com/QuesmaOrg/quesma/quesma/quesma/types"
18+
"github.com/QuesmaOrg/quesma/quesma/util"
2019
quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core"
2120
"github.com/QuesmaOrg/quesma/quesma/v2/core/tracing"
2221
"io"
@@ -251,7 +250,7 @@ func (p *ElasticsearchToClickHouseQueryProcessor) routeToElasticsearch(metadata
251250
if err != nil {
252251
return metadata, nil, fmt.Errorf("failed sending request to Elastic")
253252
}
254-
respBody, err := ReadResponseBody(resp)
253+
respBody, err := util.ReadResponseBody(resp)
255254
if err != nil {
256255
return metadata, nil, fmt.Errorf("failed to read response body from Elastic")
257256
}
@@ -276,26 +275,6 @@ func findQueryTarget(index string, processorConfig config.QuesmaProcessorConfig)
276275
}
277276
}
278277

279-
func ReadResponseBody(resp *http.Response) ([]byte, error) {
280-
var reader io.Reader
281-
if resp.Header.Get("Content-Encoding") == "gzip" {
282-
gzipReader, err := gzip.NewReader(resp.Body)
283-
if err != nil {
284-
return nil, err
285-
}
286-
defer gzipReader.Close()
287-
reader = gzipReader
288-
} else {
289-
reader = resp.Body
290-
}
291-
respBody, err := io.ReadAll(reader)
292-
if err != nil {
293-
return nil, err
294-
}
295-
resp.Body = io.NopCloser(bytes.NewBuffer(respBody))
296-
return respBody, nil
297-
}
298-
299278
func GetQueryFromRequest(req *http.Request) (types.JSON, error) {
300279
reqBody, err := io.ReadAll(req.Body)
301280
if err != nil {

quesma/quesma/search.go

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -235,10 +235,13 @@ func (q *QueryRunner) HandleMultiSearch(ctx context.Context, defaultIndexName st
235235
}
236236
logger.DebugWithCtx(ctx).Msgf("handling multisearch: queries=%d, indices=[%s], defaultIndex=[%s]", len(queries), queriedIndices, defaultIndexName)
237237
for _, query := range queries {
238-
239-
// TODO ask table resolver here and go to the right connector or connectors
240-
241-
responseBody, err := q.HandleSearch(ctx, query.indexName, query.query)
238+
var responseBody []byte
239+
if q.shouldRouteQueryToElasticsearch(query) { // this branch is here to get response from multi-search query targeted an index not stored in Clickhouse
240+
// this is also a shortcut that we took to delay a bigger refactor, eventually HandleMultiSearch should dispatch all individual queries to proper connector, similarly to `_bulk` endpoint
241+
responseBody, err = q.forwardToElasticsearch(ctx, query.indexName, query.query)
242+
} else {
243+
responseBody, err = q.HandleSearch(ctx, query.indexName, query.query)
244+
}
242245

243246
if err != nil {
244247

@@ -290,6 +293,27 @@ func (q *QueryRunner) HandleMultiSearch(ctx context.Context, defaultIndexName st
290293
return responseBody, nil
291294
}
292295

296+
func (q *QueryRunner) forwardToElasticsearch(ctx context.Context, indexName string, query types.JSON) ([]byte, error) {
297+
logger.DebugWithCtx(ctx).Msgf("_msearch query to index=%s forwarded to Elasticsearch", indexName)
298+
esClient := elasticsearch.NewSimpleClient(&q.cfg.Elasticsearch)
299+
queryBody, _ := query.Bytes()
300+
if resp, err := esClient.Request(ctx, "POST", "/_search", queryBody); err != nil {
301+
return []byte{}, err
302+
} else {
303+
respBody, errRead := util.ReadResponseBody(resp)
304+
return respBody, errRead
305+
}
306+
}
307+
308+
func (q *QueryRunner) shouldRouteQueryToElasticsearch(query msearchQuery) bool {
309+
decision := q.tableResolver.Resolve(quesma_api.QueryPipeline, query.indexName)
310+
if len(decision.UseConnectors) == 1 {
311+
_, useElastic := decision.UseConnectors[0].(*quesma_api.ConnectorDecisionElastic)
312+
return useElastic
313+
}
314+
return false
315+
}
316+
293317
func (q *QueryRunner) HandleSearch(ctx context.Context, indexPattern string, body types.JSON) ([]byte, error) {
294318
return q.handleSearchCommon(ctx, indexPattern, body, nil)
295319
}
@@ -457,15 +481,16 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin
457481
clickhouseConnector = c
458482

459483
case *quesma_api.ConnectorDecisionElastic:
460-
// NOP
484+
// After https://github.com/QuesmaOrg/quesma/pull/1278 we should never land in this situation,
485+
// previously this was an escape hatch for `_msearch` payload containing Elasticsearch-targetted query
486+
// This code lives only to postpone bigger refactor of `handleSearchCommon` which also supports async and A/B testing
461487

462488
default:
463489
return nil, fmt.Errorf("unknown connector type: %T", c)
464490
}
465491
}
466492

467493
if clickhouseConnector == nil {
468-
// TODO: at this moment it's possible to land in this situation if `_msearch` payload contains Elasticsearch-targetted query
469494
logger.Warn().Msgf("multi-search payload contains Elasticsearch-targetted query")
470495
return nil, fmt.Errorf("quesma-processed _msearch payload contains Elasticsearch-targetted query")
471496
}

quesma/util/utils.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ package util
44

55
import (
66
"bytes"
7+
"compress/gzip"
78
"database/sql"
89
"encoding/base64"
910
"fmt"
@@ -996,3 +997,23 @@ func TableNamePatternRegexp(indexPattern string) *regexp.Regexp {
996997
patternCache[indexPattern] = result
997998
return result
998999
}
1000+
1001+
func ReadResponseBody(resp *http.Response) ([]byte, error) {
1002+
var reader io.Reader
1003+
if resp.Header.Get("Content-Encoding") == "gzip" {
1004+
gzipReader, err := gzip.NewReader(resp.Body)
1005+
if err != nil {
1006+
return nil, err
1007+
}
1008+
defer gzipReader.Close()
1009+
reader = gzipReader
1010+
} else {
1011+
reader = resp.Body
1012+
}
1013+
respBody, err := io.ReadAll(reader)
1014+
if err != nil {
1015+
return nil, err
1016+
}
1017+
resp.Body = io.NopCloser(bytes.NewBuffer(respBody))
1018+
return respBody, nil
1019+
}

0 commit comments

Comments
 (0)