Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions ci/it/configs/quesma-wildcard-clickhouse.yml.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
frontendConnectors:
- name: elastic-ingest
type: elasticsearch-fe-ingest
config:
listenPort: 8080
- name: elastic-query
type: elasticsearch-fe-query
config:
listenPort: 8080
backendConnectors:
- name: my-minimal-elasticsearch
type: elasticsearch
config:
url: "http://{{ .elasticsearch_host }}:{{ .elasticsearch_port }}"
user: elastic
password: quesmaquesma
- name: my-clickhouse-instance
type: clickhouse-os
config:
url: clickhouse://{{ .clickhouse_host }}:{{ .clickhouse_port }}
ingestStatistics: true
processors:
- name: my-query-processor
type: quesma-v1-processor-query
config:
indexes:
"*":
target: [ my-clickhouse-instance ]
- name: my-ingest-processor
type: quesma-v1-processor-ingest
config:
indexes:
"*":
target: [ my-clickhouse-instance ]
pipelines:
- name: my-elasticsearch-proxy-read
frontendConnectors: [ elastic-query ]
processors: [ my-query-processor ]
backendConnectors: [ my-minimal-elasticsearch, my-clickhouse-instance ]
- name: my-elasticsearch-proxy-write
frontendConnectors: [ elastic-ingest ]
processors: [ my-ingest-processor ]
backendConnectors: [ my-minimal-elasticsearch, my-clickhouse-instance ]

56 changes: 56 additions & 0 deletions ci/it/configs/quesma-wildcard-disabled.yml.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
frontendConnectors:
- name: elastic-ingest
type: elasticsearch-fe-ingest
config:
listenPort: 8080
- name: elastic-query
type: elasticsearch-fe-query
config:
listenPort: 8080
backendConnectors:
- name: my-minimal-elasticsearch
type: elasticsearch
config:
url: "http://{{ .elasticsearch_host }}:{{ .elasticsearch_port }}"
user: elastic
password: quesmaquesma
- name: my-clickhouse-instance
type: clickhouse-os
config:
url: clickhouse://{{ .clickhouse_host }}:{{ .clickhouse_port }}
ingestStatistics: true
processors:
- name: my-query-processor
type: quesma-v1-processor-query
config:
indexes:
"explicitly_disabled1":
target: [ ]
"explicitly_disabled3":
target: [ ]
"query_enabled":
target: [ my-clickhouse-instance ]
"*":
target: [ ]
- name: my-ingest-processor
type: quesma-v1-processor-ingest
config:
indexes:
"explicitly_disabled2":
target: [ ]
"explicitly_disabled3":
target: [ ]
"ingest_enabled":
target: [ my-clickhouse-instance ]
"*":
target: [ ]
pipelines:
- name: my-elasticsearch-proxy-read
frontendConnectors: [ elastic-query ]
processors: [ my-query-processor ]
backendConnectors: [ my-minimal-elasticsearch, my-clickhouse-instance ]
- name: my-elasticsearch-proxy-write
frontendConnectors: [ elastic-ingest ]
processors: [ my-ingest-processor ]
backendConnectors: [ my-minimal-elasticsearch, my-clickhouse-instance ]

10 changes: 10 additions & 0 deletions ci/it/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,13 @@ func TestDualWriteAndCommonTableTestcase(t *testing.T) {
testCase := testcases.NewDualWriteAndCommonTableTestcase()
runIntegrationTest(t, testCase)
}

func TestWildcardDisabledTestcase(t *testing.T) {
testCase := testcases.NewWildcardDisabledTestcase()
runIntegrationTest(t, testCase)
}

func TestWildcardClickhouseTestcase(t *testing.T) {
testCase := testcases.NewWildcardClickhouseTestcase()
runIntegrationTest(t, testCase)
}
16 changes: 14 additions & 2 deletions ci/it/testcases/test_reading_clickhouse_tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package testcases
import (
"context"
"encoding/json"
"fmt"
"github.com/stretchr/testify/assert"
"net/http"
"testing"
Expand Down Expand Up @@ -36,6 +37,7 @@ func (a *ReadingClickHouseTablesIntegrationTestcase) RunTests(ctx context.Contex
t.Run("test basic request", func(t *testing.T) { a.testBasicRequest(ctx, t) })
t.Run("test random thing", func(t *testing.T) { a.testRandomThing(ctx, t) })
t.Run("test wildcard goes to elastic", func(t *testing.T) { a.testWildcardGoesToElastic(ctx, t) })
t.Run("test ingest is disabled", func(t *testing.T) { a.testIngestIsDisabled(ctx, t) })
return nil
}

Expand Down Expand Up @@ -91,5 +93,15 @@ func (a *ReadingClickHouseTablesIntegrationTestcase) testWildcardGoesToElastic(c
assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product"))
}

// At this moment this configuration does not disable ingest (ingest req's will get routed to ES and handled normally)
// Future test idea -> ensure ingest req gets rejected.
func (a *ReadingClickHouseTablesIntegrationTestcase) testIngestIsDisabled(ctx context.Context, t *testing.T) {
// There is no ingest pipeline, so Quesma should reject all ingest requests
for _, tt := range []string{"test_table", "extra_index"} {
t.Run(tt, func(t *testing.T) {
resp, bodyBytes := a.RequestToQuesma(ctx, t, "POST", fmt.Sprintf("/%s/_doc", tt), []byte(`{"name": "Piotr", "age": 11111}`))
assert.Contains(t, string(bodyBytes), "index_closed_exception")
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source"))
assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product"))
})
}
}
7 changes: 3 additions & 4 deletions ci/it/testcases/test_two_pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,13 @@ func (a *QueryAndIngestPipelineTestcase) testBasicRequest(ctx context.Context, t

func (a *QueryAndIngestPipelineTestcase) testWildcardGoesToElastic(ctx context.Context, t *testing.T) {
// Given an index in Elasticsearch which falls under `*` in the configuration
var err error
if _, err = a.RequestToElasticsearch(ctx, "PUT", "/unmentioned_index", nil); err != nil {
if _, err := a.RequestToElasticsearch(ctx, "PUT", "/unmentioned_index", nil); err != nil {
t.Fatalf("Failed to create index: %s", err)
}
if _, err = a.RequestToElasticsearch(ctx, "POST", "/unmentioned_index/_doc/1", []byte(`{"name": "Alice"}`)); err != nil {
if _, err := a.RequestToElasticsearch(ctx, "POST", "/unmentioned_index/_doc/1", []byte(`{"name": "Alice"}`)); err != nil {
t.Fatalf("Failed to insert document: %s", err)
}
if _, err = a.RequestToElasticsearch(ctx, "POST", "/unmentioned_index/_refresh", nil); err != nil {
if _, err := a.RequestToElasticsearch(ctx, "POST", "/unmentioned_index/_refresh", nil); err != nil {
t.Fatalf("Failed to refresh index: %s", err)
}
// When Quesma searches for that document
Expand Down
99 changes: 99 additions & 0 deletions ci/it/testcases/test_wildcard_clickhouse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0

package testcases

import (
"context"
"github.com/stretchr/testify/assert"
"io"
"net/http"
"testing"
)

type WildcardClickhouseTestcase struct {
IntegrationTestcaseBase
}

func NewWildcardClickhouseTestcase() *WildcardClickhouseTestcase {
return &WildcardClickhouseTestcase{
IntegrationTestcaseBase: IntegrationTestcaseBase{
ConfigTemplate: "quesma-wildcard-clickhouse.yml.template",
},
}
}

func (a *WildcardClickhouseTestcase) SetupContainers(ctx context.Context) error {
containers, err := setupAllContainersWithCh(ctx, a.ConfigTemplate)
if err != nil {
return err
}
a.Containers = containers
return nil
}

func (a *WildcardClickhouseTestcase) RunTests(ctx context.Context, t *testing.T) error {
t.Run("test basic request", func(t *testing.T) { a.testBasicRequest(ctx, t) })
t.Run("test ingest+query works", func(t *testing.T) { a.testIngestQueryWorks(ctx, t) })
t.Run("test clickhouse table autodiscovery", func(t *testing.T) { a.testClickHouseTableAutodiscovery(ctx, t) })
return nil
}

func (a *WildcardClickhouseTestcase) testBasicRequest(ctx context.Context, t *testing.T) {
resp, _ := a.RequestToQuesma(ctx, t, "GET", "/", nil)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}

func (a *WildcardClickhouseTestcase) testIngestQueryWorks(ctx context.Context, t *testing.T) {
// First ingest...
resp, bodyBytes := a.RequestToQuesma(ctx, t, "POST", "/test_index/_doc", []byte(`{"name": "Piotr", "age": 22222}`))
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source"))
assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product"))

// ...then query inserted data
resp, bodyBytes = a.RequestToQuesma(ctx, t, "POST", "/test_index/_search", []byte(`{"query": {"match_all": {}}}`))
assert.Contains(t, string(bodyBytes), "Piotr")
assert.Contains(t, string(bodyBytes), "22222")
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source"))
assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product"))

// Also make sure no such index got created in Elasticsearch
resp, err := a.RequestToElasticsearch(ctx, "GET", "/test_index/_refresh", nil)
if err != nil {
t.Fatalf("Failed to make GET request: %s", err)
}
defer resp.Body.Close()
bodyBytes, err = io.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Failed to read response body: %s", err)
}

assert.Equal(t, http.StatusNotFound, resp.StatusCode)
assert.Contains(t, string(bodyBytes), "no such index [test_index]")
}

func (a *WildcardClickhouseTestcase) testClickHouseTableAutodiscovery(ctx context.Context, t *testing.T) {
// Create test table in ClickHouse
createTableQuery := "CREATE TABLE IF NOT EXISTS existing_clickhouse_table (id UInt32, name String) ENGINE = Memory"
if _, err := a.ExecuteClickHouseStatement(ctx, createTableQuery); err != nil {
t.Fatalf("Failed to create table: %s", err)
}
insertRowsQuery := "INSERT INTO existing_clickhouse_table (id, name) VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Charlie')"
if _, err := a.ExecuteClickHouseStatement(ctx, insertRowsQuery); err != nil {
t.Fatalf("Failed to insert rows: %s", err)
}

resp, _ := a.RequestToQuesma(ctx, t, "POST", "/existing_clickhouse_table/_search", []byte(`{"query": {"match_all": {}}}`))
assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source"))

// This returns 500 Internal Server Error, but will be tackled in separate PR.
// (The table has not yet been discovered by Quesma )
//
// assert.Equal(t, http.StatusOK, resp.StatusCode)
// assert.Contains(t, string(bodyBytes), "Alice")
// assert.Contains(t, string(bodyBytes), "Bob")
// assert.Contains(t, string(bodyBytes), "Charlie")
// assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product"))
}
104 changes: 104 additions & 0 deletions ci/it/testcases/test_wildcard_disabled.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0

package testcases

import (
"context"
"fmt"
"github.com/stretchr/testify/assert"
"net/http"
"testing"
)

type WildcardDisabledTestcase struct {
IntegrationTestcaseBase
}

func NewWildcardDisabledTestcase() *WildcardDisabledTestcase {
return &WildcardDisabledTestcase{
IntegrationTestcaseBase: IntegrationTestcaseBase{
ConfigTemplate: "quesma-wildcard-disabled.yml.template",
},
}
}

func (a *WildcardDisabledTestcase) SetupContainers(ctx context.Context) error {
containers, err := setupAllContainersWithCh(ctx, a.ConfigTemplate)
if err != nil {
return err
}
a.Containers = containers
return nil
}

func (a *WildcardDisabledTestcase) RunTests(ctx context.Context, t *testing.T) error {
t.Run("test basic request", func(t *testing.T) { a.testBasicRequest(ctx, t) })
t.Run("test query is disabled", func(t *testing.T) { a.testQueryIsDisabled(ctx, t) })
t.Run("test ingest is disabled", func(t *testing.T) { a.testIngestIsDisabled(ctx, t) })
t.Run("test explicit index query enabled", func(t *testing.T) { a.testExplicitIndexQueryIsEnabled(ctx, t) })
t.Run("test explicit index ingest enabled", func(t *testing.T) { a.testExplicitIndexIngestIsEnabled(ctx, t) })
return nil
}

func (a *WildcardDisabledTestcase) testBasicRequest(ctx context.Context, t *testing.T) {
resp, _ := a.RequestToQuesma(ctx, t, "GET", "/", nil)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}

func (a *WildcardDisabledTestcase) testQueryIsDisabled(ctx context.Context, t *testing.T) {
if _, err := a.RequestToElasticsearch(ctx, "PUT", "/elastic_index1", nil); err != nil {
t.Fatalf("Failed to create index: %s", err)
}
if _, err := a.RequestToElasticsearch(ctx, "POST", "/elastic_index1/_refresh", nil); err != nil {
t.Fatalf("Failed to refresh index: %s", err)
}

// Quesma should reject all queries
for _, tt := range []string{"test_table", "extra_index", "explicitly_disabled1", "explicitly_disabled2", "explicitly_disabled3", "ingest_enabled", "elastic_index1"} {
t.Run(tt, func(t *testing.T) {
resp, bodyBytes := a.RequestToQuesma(ctx, t, "POST", fmt.Sprintf("/%s/_search", tt), []byte(`{"query": {"match_all": {}}}`))
assert.Contains(t, string(bodyBytes), "index_closed_exception")
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source"))
assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product"))
})
}
}

func (a *WildcardDisabledTestcase) testIngestIsDisabled(ctx context.Context, t *testing.T) {
if _, err := a.RequestToElasticsearch(ctx, "PUT", "/elastic_index2", nil); err != nil {
t.Fatalf("Failed to create index: %s", err)
}
if _, err := a.RequestToElasticsearch(ctx, "POST", "/elastic_index2/_refresh", nil); err != nil {
t.Fatalf("Failed to refresh index: %s", err)
}

// Quesma should reject all ingest requests
for _, tt := range []string{"test_table", "extra_index", "explicitly_disabled1", "explicitly_disabled2", "explicitly_disabled3", "query_enabled", "elastic_index2"} {
t.Run(tt, func(t *testing.T) {
resp, bodyBytes := a.RequestToQuesma(ctx, t, "POST", fmt.Sprintf("/%s/_doc", tt), []byte(`{"name": "Piotr", "age": 22222}`))
assert.Contains(t, string(bodyBytes), "index_closed_exception")
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source"))
assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product"))
})
}
}

func (a *WildcardDisabledTestcase) testExplicitIndexQueryIsEnabled(ctx context.Context, t *testing.T) {
// query_enabled is the only index with query enabled
resp, bodyBytes := a.RequestToQuesma(ctx, t, "POST", "/query_enabled/_search", []byte(`{"query": {"match_all": {}}}`))
assert.NotContains(t, string(bodyBytes), "index_closed_exception")
assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source"))
// TODO: the actual request currently fails since there's no such table in ClickHouse
}

func (a *WildcardDisabledTestcase) testExplicitIndexIngestIsEnabled(ctx context.Context, t *testing.T) {
// ingest_enabled is the only index with ingest enabled
resp, bodyBytes := a.RequestToQuesma(ctx, t, "POST", "/ingest_enabled/_doc", []byte(`{"name": "Piotr", "age": 22222}`))
assert.NotContains(t, string(bodyBytes), "index_closed_exception")
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source"))
assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product"))
}
4 changes: 2 additions & 2 deletions quesma/quesma/config/config_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,8 @@ func (c *QuesmaNewConfiguration) validateProcessor(p Processor) error {
return fmt.Errorf("index name '%s' in processor configuration is an index pattern, not allowed", indexName)
}
if p.Type == QuesmaV1ProcessorQuery {
if len(indexConfig.Target) != 1 && len(indexConfig.Target) != 2 {
return fmt.Errorf("configuration of index %s must have one or two targets (query processor)", indexName)
if len(indexConfig.Target) > 2 {
return fmt.Errorf("configuration of index %s must have at most two targets (query processor)", indexName)
}
} else {
if len(indexConfig.Target) > 2 {
Expand Down
1 change: 1 addition & 0 deletions quesma/table_resolver/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func makeDefaultWildcard(quesmaConf config.QuesmaConfiguration, pipeline string)

return &Decision{
UseConnectors: useConnectors,
IsClosed: len(useConnectors) == 0,
Reason: fmt.Sprintf("Using default wildcard ('%s') configuration for %s processor", config.DefaultWildcardIndexName, pipeline),
}
}
Expand Down
Loading