Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 792408f

Browse files
authored
Remove the need for SQL-type backend connector in query/ingest processors and pipelines (#861)
Previously, the user always had to provide two backend connectors to the query/ingest processor: Elastic connector and SQL-type connector (aka ClickHouse/Hydrolix). Remove that requirement - now only the Elastic connector is mandatory (for Kibana internal queries) and the user doesn't need to have SQL-type backend connector in the pipeline (in such case they won't be able to use it in `indexes` configuration, thus only targeting queries/ingest to Elastic). However, this doesn't yet solve the more tricky scenario of completely removing SQL-type backend connector from the entire configuration (it's possible in transparent proxy mode, but not yet in dual pipeline mode). Put another way: ClickHouse/Hydrolix is still required in the `backendConnectors` section (except in no-op transparent proxy), but you can now omit it from `pipelines`/`processors` sections. The more tricky scenario is tested by (currently skipped) `TestQuesmaTransparentProxyWithoutNoopConfiguration`.
1 parent e3ca75a commit 792408f

File tree

4 files changed

+187
-20
lines changed

4 files changed

+187
-20
lines changed

quesma/quesma/config/config_v2.go

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,9 @@ func (c *QuesmaNewConfiguration) validatePipelines() error {
313313
}
314314

315315
func (c *QuesmaNewConfiguration) validateFrontendConnector(fc FrontendConnector) error {
316+
if len(fc.Name) == 0 {
317+
return fmt.Errorf("frontend connector must have a non-empty name")
318+
}
316319
if fc.Type != ElasticsearchFrontendIngestConnectorName && fc.Type != ElasticsearchFrontendQueryConnectorName {
317320
return fmt.Errorf(fmt.Sprintf("frontend connector's [%s] type not recognized, only `%s` and `%s` are supported at this moment", fc.Name, ElasticsearchFrontendIngestConnectorName, ElasticsearchFrontendQueryConnectorName))
318321
}
@@ -344,6 +347,9 @@ func (c *QuesmaNewConfiguration) definedProcessorNames() []string {
344347
}
345348

346349
func (c *QuesmaNewConfiguration) validateProcessor(p Processor) error {
350+
if len(p.Name) == 0 {
351+
return fmt.Errorf("processor must have a non-empty name")
352+
}
347353
if !slices.Contains(getAllowedProcessorTypes(), p.Type) {
348354
return fmt.Errorf("processor type not recognized, only `quesma-v1-processor-noop`, `quesma-v1-processor-query` and `quesma-v1-processor-ingest` are supported at this moment")
349355
}
@@ -374,6 +380,9 @@ func (c *QuesmaNewConfiguration) validateProcessor(p Processor) error {
374380

375381
func (c *QuesmaNewConfiguration) validatePipeline(pipeline Pipeline) error {
376382
var _, errAcc error
383+
if len(pipeline.Name) == 0 {
384+
errAcc = multierror.Append(errAcc, fmt.Errorf("pipeline must have a non-empty name"))
385+
}
377386
if len(pipeline.FrontendConnectors) != 1 {
378387
errAcc = multierror.Append(errAcc, fmt.Errorf("pipeline must have exactly one frontend connector"))
379388
} else if len(pipeline.FrontendConnectors) == 0 {
@@ -411,23 +420,19 @@ func (c *QuesmaNewConfiguration) validatePipeline(pipeline Pipeline) error {
411420
}
412421
}
413422
if onlyProcessorInPipeline.Type == QuesmaV1ProcessorQuery || onlyProcessorInPipeline.Type == QuesmaV1ProcessorIngest {
414-
if len(pipeline.BackendConnectors) != 2 {
415-
return multierror.Append(errAcc, fmt.Errorf(fmt.Sprintf("pipeline %s has a processor of type %s which requires two backend connectors", pipeline.Name, onlyProcessorInPipeline.Type)))
416-
}
417-
bConn1, bConn2 := c.getBackendConnectorByName(pipeline.BackendConnectors[0]), c.getBackendConnectorByName(pipeline.BackendConnectors[1])
418-
if bConn1 == nil {
419-
return multierror.Append(errAcc, fmt.Errorf(fmt.Sprintf("backend connector named %s referenced in %s not found in configuration", pipeline.BackendConnectors[0], pipeline.Name)))
420-
}
421-
if bConn2 == nil {
422-
return multierror.Append(errAcc, fmt.Errorf(fmt.Sprintf("backend connector named %s referenced in %s not found in configuration", pipeline.BackendConnectors[1], pipeline.Name)))
423+
foundElasticBackendConnector := false
424+
for _, backendConnectorName := range pipeline.BackendConnectors {
425+
backendConnector := c.getBackendConnectorByName(backendConnectorName)
426+
if backendConnector == nil {
427+
return multierror.Append(errAcc, fmt.Errorf(fmt.Sprintf("backend connector named %s referenced in %s not found in configuration", backendConnectorName, pipeline.Name)))
428+
}
429+
if backendConnector.Type == ElasticsearchBackendConnectorName {
430+
foundElasticBackendConnector = true
431+
}
423432
}
424-
backendConnTypes := []string{bConn1.Type, bConn2.Type}
425-
if !slices.Contains(backendConnTypes, ElasticsearchBackendConnectorName) {
433+
if !foundElasticBackendConnector {
426434
return multierror.Append(errAcc, fmt.Errorf(fmt.Sprintf("pipeline %s has a processor of type %s which requires having one elasticsearch backend connector", pipeline.Name, onlyProcessorInPipeline.Type)))
427435
}
428-
if !slices.Contains(backendConnTypes, ClickHouseBackendConnectorName) && !slices.Contains(backendConnTypes, ClickHouseOSBackendConnectorName) && !slices.Contains(backendConnTypes, HydrolixBackendConnectorName) {
429-
return multierror.Append(errAcc, fmt.Errorf(fmt.Sprintf("pipeline %s has a processor of type %s which requires having one Clickhouse-compatible backend connector", pipeline.Name, onlyProcessorInPipeline.Type)))
430-
}
431436
}
432437
}
433438

@@ -711,6 +716,9 @@ func (c *QuesmaNewConfiguration) getRelationalDBConf() (*RelationalDbConfigurati
711716
func (c *QuesmaNewConfiguration) validateBackendConnectors() error {
712717
elasticBackendConnectors, clickhouseBackendConnectors := 0, 0
713718
for _, backendConn := range c.BackendConnectors {
719+
if len(backendConn.Name) == 0 {
720+
return fmt.Errorf("backend connector must have a non-empty name")
721+
}
714722
if backendConn.Type == ElasticsearchBackendConnectorName {
715723
elasticBackendConnectors += 1
716724
} else if backendConn.Type == ClickHouseBackendConnectorName || backendConn.Type == ClickHouseOSBackendConnectorName || backendConn.Type == HydrolixBackendConnectorName {

quesma/quesma/config/config_v2_test.go

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,29 @@ func TestQuesmaTransparentProxyConfiguration(t *testing.T) {
7575
assert.Equal(t, false, legacyConf.CreateCommonTable)
7676
}
7777

78+
func TestQuesmaTransparentProxyWithoutNoopConfiguration(t *testing.T) {
79+
t.Skip("not working yet")
80+
81+
os.Setenv(configFileLocationEnvVar, "./test_configs/quesma_as_transparent_proxy_without_noop.yml")
82+
cfg := LoadV2Config()
83+
if err := cfg.Validate(); err != nil {
84+
t.Fatalf("error validating config: %v", err)
85+
}
86+
legacyConf := cfg.TranslateToLegacyConfig()
87+
assert.False(t, legacyConf.TransparentProxy) // even though transparent proxy would work similarly, the user explicitly requested two Quesma pipelines
88+
assert.Equal(t, 2, len(legacyConf.IndexConfig))
89+
siemIndexConf := legacyConf.IndexConfig["siem"]
90+
logsIndexConf := legacyConf.IndexConfig["logs"]
91+
92+
assert.Equal(t, []string{ElasticsearchTarget}, siemIndexConf.QueryTarget)
93+
assert.Equal(t, []string{ElasticsearchTarget}, siemIndexConf.IngestTarget)
94+
95+
assert.Equal(t, []string{ElasticsearchTarget}, logsIndexConf.QueryTarget)
96+
assert.Equal(t, []string{ElasticsearchTarget}, logsIndexConf.IngestTarget)
97+
assert.Equal(t, true, legacyConf.EnableIngest)
98+
assert.Equal(t, false, legacyConf.CreateCommonTable)
99+
}
100+
78101
func TestQuesmaAddingHydrolixTablesToExistingElasticsearch(t *testing.T) {
79102
os.Setenv(configFileLocationEnvVar, "./test_configs/quesma_adding_two_hydrolix_tables.yaml")
80103
cfg := LoadV2Config()
@@ -87,11 +110,32 @@ func TestQuesmaAddingHydrolixTablesToExistingElasticsearch(t *testing.T) {
87110
siemIndexConf := legacyConf.IndexConfig["siem"]
88111
logsIndexConf := legacyConf.IndexConfig["logs"]
89112

90-
assert.Equal(t, []string{"clickhouse"}, siemIndexConf.QueryTarget)
91-
assert.Equal(t, []string{"elasticsearch"}, siemIndexConf.IngestTarget)
113+
assert.Equal(t, []string{ClickhouseTarget}, siemIndexConf.QueryTarget)
114+
assert.Equal(t, []string{ElasticsearchTarget}, siemIndexConf.IngestTarget)
115+
116+
assert.Equal(t, []string{ClickhouseTarget}, logsIndexConf.QueryTarget)
117+
assert.Equal(t, []string{ElasticsearchTarget}, logsIndexConf.IngestTarget)
118+
assert.Equal(t, true, legacyConf.EnableIngest)
119+
assert.Equal(t, false, legacyConf.CreateCommonTable)
120+
}
121+
122+
func TestIngestWithSingleConnector(t *testing.T) {
123+
os.Setenv(configFileLocationEnvVar, "./test_configs/ingest_with_single_connector.yaml")
124+
cfg := LoadV2Config()
125+
if err := cfg.Validate(); err != nil {
126+
t.Fatalf("error validating config: %v", err)
127+
}
128+
legacyConf := cfg.TranslateToLegacyConfig()
129+
assert.False(t, legacyConf.TransparentProxy)
130+
assert.Equal(t, 2, len(legacyConf.IndexConfig))
131+
siemIndexConf := legacyConf.IndexConfig["siem"]
132+
logsIndexConf := legacyConf.IndexConfig["logs"]
133+
134+
assert.Equal(t, []string{ClickhouseTarget}, siemIndexConf.QueryTarget)
135+
assert.Equal(t, []string{ElasticsearchTarget}, siemIndexConf.IngestTarget)
92136

93-
assert.Equal(t, []string{"clickhouse"}, logsIndexConf.QueryTarget)
94-
assert.Equal(t, []string{"elasticsearch"}, logsIndexConf.IngestTarget)
137+
assert.Equal(t, []string{ClickhouseTarget}, logsIndexConf.QueryTarget)
138+
assert.Equal(t, []string{ElasticsearchTarget}, logsIndexConf.IngestTarget)
95139
assert.Equal(t, true, legacyConf.EnableIngest)
96140
assert.Equal(t, false, legacyConf.CreateCommonTable)
97141
}
@@ -111,9 +155,9 @@ func TestQuesmaHydrolixQueryOnly(t *testing.T) {
111155
logsIndexConf, ok := legacyConf.IndexConfig["logs"]
112156
assert.True(t, ok)
113157

114-
assert.Equal(t, []string{"clickhouse"}, siemIndexConf.QueryTarget)
158+
assert.Equal(t, []string{ClickhouseTarget}, siemIndexConf.QueryTarget)
115159

116-
assert.Equal(t, []string{"clickhouse"}, logsIndexConf.QueryTarget)
160+
assert.Equal(t, []string{ClickhouseTarget}, logsIndexConf.QueryTarget)
117161

118162
assert.Equal(t, false, legacyConf.EnableIngest)
119163
assert.Equal(t, false, legacyConf.IngestStatistics)
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
# Similar to quesma_adding_two_hydrolix_tables.yaml,
2+
# but the ingest processor has only a single backend connector.
3+
4+
logging:
5+
level: info
6+
frontendConnectors:
7+
- name: elastic-ingest
8+
type: elasticsearch-fe-ingest
9+
config:
10+
listenPort: 8080
11+
- name: elastic-query
12+
type: elasticsearch-fe-query
13+
config:
14+
listenPort: 8080
15+
backendConnectors:
16+
- name: my-minimal-elasticsearch
17+
type: elasticsearch
18+
config:
19+
url: "http://elasticsearch:9200"
20+
user: elastic
21+
password: quesmaquesma
22+
- name: my-hydrolix-instance
23+
type: hydrolix
24+
config:
25+
url: "clickhouse://localhost:9000"
26+
user: "u"
27+
password: "p"
28+
database: "d"
29+
ingestStatistics: true
30+
processors:
31+
- name: my-query-processor
32+
type: quesma-v1-processor-query
33+
config:
34+
indexes:
35+
siem:
36+
target: [my-hydrolix-instance]
37+
logs:
38+
target: [my-hydrolix-instance]
39+
"*":
40+
target: [ my-minimal-elasticsearch ]
41+
- name: my-ingest-processor
42+
type: quesma-v1-processor-ingest
43+
config:
44+
indexes:
45+
siem:
46+
target: [ my-minimal-elasticsearch ]
47+
logs:
48+
target: [ my-minimal-elasticsearch ]
49+
"*":
50+
target: [ my-minimal-elasticsearch ]
51+
pipelines:
52+
- name: my-elasticsearch-proxy-read
53+
frontendConnectors: [ elastic-query ]
54+
processors: [ my-query-processor ]
55+
backendConnectors: [ my-minimal-elasticsearch, my-hydrolix-instance ]
56+
- name: my-elasticsearch-proxy-write
57+
frontendConnectors: [ elastic-ingest ]
58+
processors: [ my-ingest-processor ]
59+
backendConnectors: [ my-minimal-elasticsearch ] # my-hydrolix-instance is not needed here, as we don't ingest to it
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# The recommended way to start Quesma in transparent proxy
2+
# is to use the noop processor. However, the user can achieve
3+
# the same thing by specifying query/ingest processors without
4+
# routing anything to ClickHouse/Hydrolix - that should be supported,
5+
# even if not recommended.
6+
7+
logging:
8+
level: info
9+
frontendConnectors:
10+
- name: elastic-ingest
11+
type: elasticsearch-fe-ingest
12+
config:
13+
listenPort: 8080
14+
- name: elastic-query
15+
type: elasticsearch-fe-query
16+
config:
17+
listenPort: 8080
18+
backendConnectors:
19+
- name: my-minimal-elasticsearch
20+
type: elasticsearch
21+
config:
22+
url: "http://elasticsearch:9200"
23+
user: elastic
24+
password: quesmaquesma
25+
# No ClickHouse, Hydrolix connector needed!
26+
ingestStatistics: true
27+
processors:
28+
- name: my-query-processor
29+
type: quesma-v1-processor-query
30+
config:
31+
indexes:
32+
siem:
33+
target: [ my-minimal-elasticsearch ]
34+
logs:
35+
target: [ my-minimal-elasticsearch ]
36+
"*":
37+
target: [ my-minimal-elasticsearch ]
38+
- name: my-ingest-processor
39+
type: quesma-v1-processor-ingest
40+
config:
41+
indexes:
42+
siem:
43+
target: [ my-minimal-elasticsearch ]
44+
logs:
45+
target: [ my-minimal-elasticsearch ]
46+
"*":
47+
target: [ my-minimal-elasticsearch ]
48+
pipelines:
49+
- name: my-elasticsearch-transparent-proxy-read
50+
frontendConnectors: [ elastic-query ]
51+
processors: [ my-query-processor ]
52+
backendConnectors: [ my-minimal-elasticsearch ]
53+
- name: my-elasticsearch-transparent-proxy-write
54+
frontendConnectors: [ elastic-ingest ]
55+
processors: [ my-ingest-processor ]
56+
backendConnectors: [ my-minimal-elasticsearch ]

0 commit comments

Comments
 (0)