Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit aa77935

Browse files
authored
Moving Decision to frontend_connectors (#1062)
This is prerequisite for moving `mux.PathRouter` to `frontend_connectors` and integrate it with the new API --------- Signed-off-by: Przemyslaw Delewski <[email protected]>
1 parent f7a01e6 commit aa77935

21 files changed

+377
-353
lines changed
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
// Copyright Quesma, licensed under the Elastic License 2.0.
2+
// SPDX-License-Identifier: Elastic-2.0
3+
package frontend_connectors
4+
5+
import (
6+
"fmt"
7+
"strings"
8+
)
9+
10+
type Decision struct {
11+
// input
12+
IndexPattern string "json:\"index_pattern\""
13+
14+
// obvious fields
15+
IsClosed bool "json:\"is_closed\""
16+
Err error "json:\"error\""
17+
IsEmpty bool "json:\"is_empty\""
18+
19+
EnableABTesting bool "json:\"enable_ab_testing\""
20+
21+
// which connector to use, and how
22+
UseConnectors []ConnectorDecision "json:\"use_connectors\""
23+
24+
// who made the decision and why
25+
Reason string "json:\"reason\""
26+
ResolverName string "json:\"resolver_name\""
27+
}
28+
29+
func (d *Decision) String() string {
30+
31+
var lines []string
32+
33+
if d.IsClosed {
34+
lines = append(lines, "Returns a closed index message.")
35+
}
36+
37+
if d.IsEmpty {
38+
lines = append(lines, "Returns an empty result.")
39+
}
40+
41+
if d.Err != nil {
42+
lines = append(lines, fmt.Sprintf("Returns error: '%v'.", d.Err))
43+
}
44+
45+
for _, connector := range d.UseConnectors {
46+
lines = append(lines, connector.Message())
47+
}
48+
49+
if d.EnableABTesting {
50+
lines = append(lines, "Enable AB testing.")
51+
}
52+
53+
lines = append(lines, fmt.Sprintf("%s (%s).", d.Reason, d.ResolverName))
54+
55+
return strings.Join(lines, " ")
56+
}
57+
58+
type ConnectorDecision interface {
59+
Message() string
60+
}
61+
62+
type ConnectorDecisionElastic struct {
63+
// TODO instance of elastic connector
64+
ManagementCall bool "json:\"management_call\""
65+
}
66+
67+
func (d *ConnectorDecisionElastic) Message() string {
68+
var lines []string
69+
lines = append(lines, "Pass to Elasticsearch.")
70+
if d.ManagementCall {
71+
lines = append(lines, "Management call.")
72+
}
73+
return strings.Join(lines, " ")
74+
}
75+
76+
type ConnectorDecisionClickhouse struct {
77+
// TODO instance of clickhouse connector
78+
79+
ClickhouseTableName string "json:\"clickhouse_table_name\""
80+
ClickhouseTables []string "json:\"clickhouse_tables\""
81+
IsCommonTable bool "json:\"is_common_table\""
82+
}
83+
84+
func (d *ConnectorDecisionClickhouse) Message() string {
85+
var lines []string
86+
87+
lines = append(lines, "Pass to clickhouse.")
88+
if len(d.ClickhouseTableName) > 0 {
89+
lines = append(lines, fmt.Sprintf("Table: '%s' .", d.ClickhouseTableName))
90+
}
91+
if d.IsCommonTable {
92+
lines = append(lines, "Common table.")
93+
}
94+
if len(d.ClickhouseTables) > 0 {
95+
lines = append(lines, fmt.Sprintf("Indexes: %v.", d.ClickhouseTables))
96+
}
97+
98+
return strings.Join(lines, " ")
99+
}
100+
101+
// PatternDecisions is a struct that holds the pattern and the decisions made for that pattern
102+
type PatternDecisions struct {
103+
Pattern string
104+
Decisions map[string]*Decision
105+
}
106+
107+
// TODO hardcoded pipeline names
108+
const (
109+
QueryPipeline = "Query"
110+
IngestPipeline = "Ingest"
111+
)

quesma/ingest/common_table_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/stretchr/testify/assert"
1010
"quesma/clickhouse"
1111
"quesma/common_table"
12+
"quesma/frontend_connectors"
1213
"quesma/jsonprocessor"
1314
"quesma/persistence"
1415
"quesma/quesma/config"
@@ -191,9 +192,9 @@ func TestIngestToCommonTable(t *testing.T) {
191192

192193
resolver := table_resolver.NewEmptyTableResolver()
193194

194-
decision := &table_resolver.Decision{
195-
UseConnectors: []table_resolver.ConnectorDecision{
196-
&table_resolver.ConnectorDecisionClickhouse{
195+
decision := &frontend_connectors.Decision{
196+
UseConnectors: []frontend_connectors.ConnectorDecision{
197+
&frontend_connectors.ConnectorDecisionClickhouse{
197198
ClickhouseTableName: common_table.TableName,
198199
ClickhouseTables: []string{indexName},
199200
IsCommonTable: true,

quesma/ingest/ingest_validator_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/DATA-DOG/go-sqlmock"
1010
"github.com/stretchr/testify/assert"
1111
"quesma/clickhouse"
12+
"quesma/frontend_connectors"
1213
"quesma/quesma/config"
1314
"quesma/quesma/types"
1415
"quesma/table_resolver"
@@ -171,8 +172,8 @@ func TestIngestValidation(t *testing.T) {
171172
ip.tableDiscovery = clickhouse.NewTableDiscoveryWith(&config.QuesmaConfiguration{}, nil, *tableMap)
172173

173174
resolver := table_resolver.NewEmptyTableResolver()
174-
decision := &table_resolver.Decision{
175-
UseConnectors: []table_resolver.ConnectorDecision{&table_resolver.ConnectorDecisionClickhouse{
175+
decision := &frontend_connectors.Decision{
176+
UseConnectors: []frontend_connectors.ConnectorDecision{&frontend_connectors.ConnectorDecisionClickhouse{
176177
ClickhouseTableName: "test_table",
177178
}}}
178179
resolver.Decisions["test_table"] = decision

quesma/ingest/insert_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/DATA-DOG/go-sqlmock"
99
"github.com/stretchr/testify/assert"
1010
"quesma/clickhouse"
11+
"quesma/frontend_connectors"
1112
"quesma/jsonprocessor"
1213
"quesma/persistence"
1314
"quesma/quesma/config"
@@ -239,8 +240,8 @@ func TestProcessInsertQuery(t *testing.T) {
239240
db, mock := util.InitSqlMockWithPrettyPrint(t, true)
240241
ip.ip.chDb = db
241242
resolver := table_resolver.NewEmptyTableResolver()
242-
decision := &table_resolver.Decision{
243-
UseConnectors: []table_resolver.ConnectorDecision{&table_resolver.ConnectorDecisionClickhouse{
243+
decision := &frontend_connectors.Decision{
244+
UseConnectors: []frontend_connectors.ConnectorDecision{&frontend_connectors.ConnectorDecisionClickhouse{
244245
ClickhouseTableName: "test_table",
245246
}}}
246247
resolver.Decisions["test_table"] = decision
@@ -424,8 +425,8 @@ func TestCreateTableIfSomeFieldsExistsInSchemaAlready(t *testing.T) {
424425
schemaRegistry.Tables[schema.TableName(indexName)] = indexSchema
425426

426427
resolver := table_resolver.NewEmptyTableResolver()
427-
decision := &table_resolver.Decision{
428-
UseConnectors: []table_resolver.ConnectorDecision{&table_resolver.ConnectorDecisionClickhouse{
428+
decision := &frontend_connectors.Decision{
429+
UseConnectors: []frontend_connectors.ConnectorDecision{&frontend_connectors.ConnectorDecisionClickhouse{
429430
ClickhouseTableName: "test_index",
430431
}}}
431432
resolver.Decisions["test_index"] = decision

quesma/ingest/processor.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"quesma/comment_metadata"
1313
"quesma/common_table"
1414
"quesma/end_user_errors"
15+
"quesma/frontend_connectors"
1516
"quesma/jsonprocessor"
1617
"quesma/logger"
1718
"quesma/model"
@@ -695,7 +696,7 @@ func (lm *IngestProcessor) ProcessInsertQuery(ctx context.Context, tableName str
695696
jsonData []types.JSON, transformer jsonprocessor.IngestTransformer,
696697
tableFormatter TableColumNameFormatter) error {
697698

698-
decision := lm.tableResolver.Resolve(table_resolver.IngestPipeline, tableName)
699+
decision := lm.tableResolver.Resolve(frontend_connectors.IngestPipeline, tableName)
699700

700701
if decision.Err != nil {
701702
return decision.Err
@@ -711,10 +712,10 @@ func (lm *IngestProcessor) ProcessInsertQuery(ctx context.Context, tableName str
711712

712713
for _, connectorDecision := range decision.UseConnectors {
713714

714-
var clickhouseDecision *table_resolver.ConnectorDecisionClickhouse
715+
var clickhouseDecision *frontend_connectors.ConnectorDecisionClickhouse
715716

716717
var ok bool
717-
if clickhouseDecision, ok = connectorDecision.(*table_resolver.ConnectorDecisionClickhouse); !ok {
718+
if clickhouseDecision, ok = connectorDecision.(*frontend_connectors.ConnectorDecisionClickhouse); !ok {
718719
continue
719720
}
720721

quesma/quesma/dual_write_proxy.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"quesma/elasticsearch"
1616
"quesma/end_user_errors"
1717
"quesma/feature"
18+
"quesma/frontend_connectors"
1819
"quesma/ingest"
1920
"quesma/logger"
2021
"quesma/queryparser"
@@ -368,7 +369,7 @@ func (r *router) reroute(ctx context.Context, w http.ResponseWriter, req *http.R
368369
}
369370

370371
for _, connector := range decision.UseConnectors {
371-
if _, ok := connector.(*table_resolver.ConnectorDecisionElastic); ok {
372+
if _, ok := connector.(*frontend_connectors.ConnectorDecisionElastic); ok {
372373
// this is desired elastic call
373374
sendToElastic = true
374375
break

quesma/quesma/dual_write_proxy_v2.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"quesma/elasticsearch"
1616
"quesma/end_user_errors"
1717
"quesma/feature"
18+
"quesma/frontend_connectors"
1819
"quesma/ingest"
1920
"quesma/logger"
2021
"quesma/queryparser"
@@ -276,7 +277,7 @@ func (*routerV2) closedIndexResponse(ctx context.Context, w http.ResponseWriter,
276277

277278
}
278279

279-
func (r *routerV2) elasticFallback(decision *table_resolver.Decision,
280+
func (r *routerV2) elasticFallback(decision *frontend_connectors.Decision,
280281
ctx context.Context, w http.ResponseWriter,
281282
req *http.Request, reqBody []byte, logManager *clickhouse.LogManager) {
282283

@@ -307,7 +308,7 @@ func (r *routerV2) elasticFallback(decision *table_resolver.Decision,
307308
}
308309

309310
for _, connector := range decision.UseConnectors {
310-
if _, ok := connector.(*table_resolver.ConnectorDecisionElastic); ok {
311+
if _, ok := connector.(*frontend_connectors.ConnectorDecisionElastic); ok {
311312
// this is desired elastic call
312313
sendToElastic = true
313314
break
@@ -360,7 +361,7 @@ func (r *routerV2) reroute(ctx context.Context, w http.ResponseWriter, req *http
360361

361362
quesmaRequest.ParsedBody = types.ParseRequestBody(quesmaRequest.Body)
362363
var handler mux.Handler
363-
var decision *table_resolver.Decision
364+
var decision *frontend_connectors.Decision
364365
searchHandler, searchDecision := searchRouter.Matches(quesmaRequest)
365366
if searchDecision != nil {
366367
decision = searchDecision

quesma/quesma/functionality/bulk/bulk.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"quesma/clickhouse"
1212
"quesma/elasticsearch"
1313
"quesma/end_user_errors"
14+
"quesma/frontend_connectors"
1415
"quesma/ingest"
1516
"quesma/logger"
1617
"quesma/queryparser"
@@ -138,7 +139,7 @@ func splitBulk(ctx context.Context, defaultIndex *string, bulk types.NDJSON, bul
138139
}
139140
}
140141

141-
decision := tableResolver.Resolve(table_resolver.IngestPipeline, index)
142+
decision := tableResolver.Resolve(frontend_connectors.IngestPipeline, index)
142143

143144
if decision.Err != nil {
144145
return decision.Err
@@ -180,7 +181,7 @@ func splitBulk(ctx context.Context, defaultIndex *string, bulk types.NDJSON, bul
180181

181182
switch connector.(type) {
182183

183-
case *table_resolver.ConnectorDecisionElastic:
184+
case *frontend_connectors.ConnectorDecisionElastic:
184185
// Bulk entry for Elastic - forward the request as-is
185186
opBytes, err := rawOp.Bytes()
186187
if err != nil {
@@ -198,7 +199,7 @@ func splitBulk(ctx context.Context, defaultIndex *string, bulk types.NDJSON, bul
198199

199200
elasticBulkEntries = append(elasticBulkEntries, entryWithResponse)
200201

201-
case *table_resolver.ConnectorDecisionClickhouse:
202+
case *frontend_connectors.ConnectorDecisionClickhouse:
202203

203204
// Bulk entry for Clickhouse
204205
if operation != "create" && operation != "index" {

quesma/quesma/matchers.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
package quesma
44

55
import (
6+
"quesma/frontend_connectors"
67
"quesma/logger"
78
"quesma/quesma/config"
89
"quesma/quesma/mux"
@@ -33,15 +34,15 @@ func matchedAgainstBulkBody(configuration *config.QuesmaConfiguration, tableReso
3334
if idx%2 == 0 {
3435
name := extractIndexName(s)
3536

36-
decision := tableResolver.Resolve(table_resolver.IngestPipeline, name)
37+
decision := tableResolver.Resolve(frontend_connectors.IngestPipeline, name)
3738

3839
if decision.IsClosed {
3940
return mux.MatchResult{Matched: true, Decision: decision}
4041
}
4142

4243
// if have any enabled Clickhouse connector, then return true
4344
for _, connector := range decision.UseConnectors {
44-
if _, ok := connector.(*table_resolver.ConnectorDecisionClickhouse); ok {
45+
if _, ok := connector.(*frontend_connectors.ConnectorDecisionClickhouse); ok {
4546
return mux.MatchResult{Matched: true, Decision: decision}
4647
}
4748
}
@@ -56,7 +57,7 @@ func matchedAgainstBulkBody(configuration *config.QuesmaConfiguration, tableReso
5657

5758
// Query path only (looks at QueryTarget)
5859
func matchedAgainstPattern(indexRegistry table_resolver.TableResolver) mux.RequestMatcher {
59-
return matchAgainstTableResolver(indexRegistry, table_resolver.QueryPipeline)
60+
return matchAgainstTableResolver(indexRegistry, frontend_connectors.QueryPipeline)
6061
}
6162

6263
// check whether exact index name is enabled
@@ -70,7 +71,7 @@ func matchAgainstTableResolver(indexRegistry table_resolver.TableResolver, pipel
7071
return mux.MatchResult{Matched: false, Decision: decision}
7172
}
7273
for _, connector := range decision.UseConnectors {
73-
if _, ok := connector.(*table_resolver.ConnectorDecisionClickhouse); ok {
74+
if _, ok := connector.(*frontend_connectors.ConnectorDecisionClickhouse); ok {
7475
return mux.MatchResult{Matched: true, Decision: decision}
7576
}
7677
}
@@ -79,11 +80,11 @@ func matchAgainstTableResolver(indexRegistry table_resolver.TableResolver, pipel
7980
}
8081

8182
func matchedExactQueryPath(indexRegistry table_resolver.TableResolver) mux.RequestMatcher {
82-
return matchAgainstTableResolver(indexRegistry, table_resolver.QueryPipeline)
83+
return matchAgainstTableResolver(indexRegistry, frontend_connectors.QueryPipeline)
8384
}
8485

8586
func matchedExactIngestPath(indexRegistry table_resolver.TableResolver) mux.RequestMatcher {
86-
return matchAgainstTableResolver(indexRegistry, table_resolver.IngestPipeline)
87+
return matchAgainstTableResolver(indexRegistry, frontend_connectors.IngestPipeline)
8788
}
8889

8990
// Returns false if the body contains a Kibana internal search.

0 commit comments

Comments
 (0)