2 changes: 1 addition & 1 deletion .github/workflows/integration-tests.yml
@@ -57,7 +57,7 @@ jobs:
with:
channel-id: ${{ secrets.SLACK_CHANNEL_ID }}
slack-message: |
-:exclamation: *Integration tests failed.* :exclamation: @channel
+:exclamation: *Integration tests failed.* :exclamation: <!channel>
Last commit by: ${{ steps.get_author.outputs.author }}
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
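For context: when a message is posted through the Slack API, a literal `@channel` is rendered as plain text and notifies nobody; Slack's special-mention syntax `<!channel>` is what actually pings the channel, which appears to be the motivation for this one-line change.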
54 changes: 54 additions & 0 deletions ci/it/configs/quesma-with-two-pipelines.yml.template
@@ -0,0 +1,54 @@
frontendConnectors:
- name: elastic-ingest
type: elasticsearch-fe-ingest
config:
listenPort: 8080
- name: elastic-query
type: elasticsearch-fe-query
config:
listenPort: 8080
backendConnectors:
- name: my-minimal-elasticsearch
type: elasticsearch
config:
url: "http://{{ .elasticsearch_host }}:{{ .elasticsearch_port }}"
user: elastic
password: quesmaquesma
- name: my-hydrolix-instance
type: clickhouse-os
config:
url: clickhouse://{{ .clickhouse_host }}:{{ .clickhouse_port }}
ingestStatistics: true
processors:
- name: my-query-processor
type: quesma-v1-processor-query
config:
indexes:
siem:
target: [my-hydrolix-instance]
logs:
target: [my-hydrolix-instance]
test_index:
target: [my-minimal-elasticsearch]
"*":
target: [ my-minimal-elasticsearch ]
- name: my-ingest-processor
type: quesma-v1-processor-ingest
config:
indexes:
test_index:
target: [ my-minimal-elasticsearch ]
logs_disabled:
target: [ ]
"*":
target: [ my-minimal-elasticsearch ]
pipelines:
- name: my-elasticsearch-proxy-read
frontendConnectors: [ elastic-query ]
processors: [ my-query-processor ]
backendConnectors: [ my-minimal-elasticsearch, my-hydrolix-instance ]
- name: my-elasticsearch-proxy-write
frontendConnectors: [ elastic-ingest ]
processors: [ my-ingest-processor ]
backendConnectors: [ my-minimal-elasticsearch, my-hydrolix-instance ]
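In short, this template sends `siem` and `logs` queries to the ClickHouse backend, routes `test_index` and the `*` catch-all to Elasticsearch, and gives ingest into `logs_disabled` no target at all. Below is a minimal, hypothetical sketch (not part of this change, and assuming the surrounding `testcases` package) of how that query routing could be asserted, reusing the `RequestToQuesma` helper and the `X-Quesma-Source` header already used by the integration testcases:

```go
// Hypothetical routing check, assuming the helpers from ci/it/testcases.
func (a *QueryAndIngestPipelineTestcase) testRoutingSketch(ctx context.Context, t *testing.T) {
	// `logs` targets my-hydrolix-instance, so ClickHouse should serve the query.
	resp, err := a.RequestToQuesma(ctx, "POST", "/logs/_search", []byte(`{"query": {"match_all": {}}}`))
	if err != nil {
		t.Fatalf("Failed to make POST request: %s", err)
	}
	defer resp.Body.Close()
	assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source"))

	// `test_index` targets my-minimal-elasticsearch.
	resp2, err := a.RequestToQuesma(ctx, "POST", "/test_index/_search", []byte(`{"query": {"match_all": {}}}`))
	if err != nil {
		t.Fatalf("Failed to make POST request: %s", err)
	}
	defer resp2.Body.Close()
	assert.Equal(t, "Elasticsearch", resp2.Header.Get("X-Quesma-Source"))
}
```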

5 changes: 5 additions & 0 deletions ci/it/integration_test.go
@@ -26,3 +26,8 @@ func TestReadingClickHouseTablesIntegrationTestcase(t *testing.T) {
testCase := testcases.NewReadingClickHouseTablesIntegrationTestcase()
runIntegrationTest(t, testCase)
}

func TestQueryAndIngestPipelineTestcase(t *testing.T) {
testCase := testcases.NewQueryAndIngestPipelineTestcase()
runIntegrationTest(t, testCase)
}
43 changes: 43 additions & 0 deletions ci/it/testcases/test_reading_clickhouse_tables.go
@@ -2,7 +2,9 @@ package testcases

import (
"context"
"encoding/json"
"github.com/stretchr/testify/assert"
"io"
"testing"
)

@@ -30,6 +32,7 @@ func (a *ReadingClickHouseTablesIntegrationTestcase) SetupContainers(ctx context
func (a *ReadingClickHouseTablesIntegrationTestcase) RunTests(ctx context.Context, t *testing.T) error {
a.testBasicRequest(ctx, t)
a.testRandomThing(ctx, t)
a.testWildcardGoesToElastic(ctx, t)
return nil
}

@@ -63,3 +66,43 @@ func (a *ReadingClickHouseTablesIntegrationTestcase) testRandomThing(ctx context
defer resp.Body.Close()
assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source"))
}

func (a *ReadingClickHouseTablesIntegrationTestcase) testWildcardGoesToElastic(ctx context.Context, t *testing.T) {
// Given an index in Elasticsearch which falls under `*` in the configuration
var err error
if _, err = a.RequestToElasticsearch(ctx, "PUT", "/extra_index", nil); err != nil {
t.Fatalf("Failed to create index: %s", err)
}
if _, err = a.RequestToElasticsearch(ctx, "POST", "/extra_index/_doc/1", []byte(`{"name": "Alice"}`)); err != nil {
t.Fatalf("Failed to insert document: %s", err)
}
if _, err = a.RequestToElasticsearch(ctx, "POST", "/extra_index/_refresh", nil); err != nil {
t.Fatalf("Failed to refresh index: %s", err)
}
// When Quesma searches for that document
resp, err := a.RequestToQuesma(ctx, "POST", "/extra_index/_search", []byte(`{"query": {"match_all": {}}}`))
if err != nil {
t.Fatalf("Failed to make GET request: %s", err)
}
defer resp.Body.Close()
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Failed to read response body: %s", err)
}
var jsonResponse map[string]interface{}
if err := json.Unmarshal(bodyBytes, &jsonResponse); err != nil {
t.Fatalf("Failed to unmarshal response body: %s", err)
}
hits, _ := jsonResponse["hits"].(map[string]interface{})
// We should get proper search result from Elasticsearch
hit := hits["total"]
hitValue := hit.(map[string]interface{})["value"]
assert.Equal(t, float64(1), hitValue)
assert.Contains(t, string(bodyBytes), "Alice")
assert.Equal(t, 200, resp.StatusCode)
assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Quesma-Source"))
assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product"))
}

// At the moment this configuration does not disable ingest (ingest requests get routed to Elasticsearch and handled normally).
// Future test idea: ensure such ingest requests get rejected.
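A rough, hypothetical sketch of that future test (everything below is an assumption about how it might look, reusing the existing `RequestToQuesma` helper; it is not part of this change):

```go
// Hypothetical future test: once ingest is disabled for this index in the
// configuration, an ingest request should be rejected instead of being
// forwarded to Elasticsearch. The exact status code is an assumption.
func (a *ReadingClickHouseTablesIntegrationTestcase) testIngestGetsRejectedSketch(ctx context.Context, t *testing.T) {
	resp, err := a.RequestToQuesma(ctx, "POST", "/extra_index/_doc", []byte(`{"name": "Bob"}`))
	if err != nil {
		t.Fatalf("Error sending POST request: %s", err)
	}
	defer resp.Body.Close()
	// With ingest disabled, the document should not be accepted.
	assert.NotEqual(t, 200, resp.StatusCode)
}
```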
128 changes: 128 additions & 0 deletions ci/it/testcases/test_two_pipelines.go
@@ -0,0 +1,128 @@
package testcases

import (
"context"
"encoding/json"
"github.com/stretchr/testify/assert"
"io"
"net/http"
"testing"
)

type QueryAndIngestPipelineTestcase struct {
IntegrationTestcaseBase
}

func NewQueryAndIngestPipelineTestcase() *QueryAndIngestPipelineTestcase {
return &QueryAndIngestPipelineTestcase{
IntegrationTestcaseBase: IntegrationTestcaseBase{
ConfigTemplate: "quesma-with-two-pipelines.yml.template",
},
}
}

func (a *QueryAndIngestPipelineTestcase) SetupContainers(ctx context.Context) error {
containers, err := setupAllContainersWithCh(ctx, a.ConfigTemplate)
if err != nil {
return err
}
a.Containers = containers
return nil
}

func (a *QueryAndIngestPipelineTestcase) RunTests(ctx context.Context, t *testing.T) error {
a.testBasicRequest(ctx, t)
a.testWildcardGoesToElastic(ctx, t)
a.testEmptyTargetDoc(ctx, t)
a.testEmptyTargetBulk(ctx, t)
return nil
}

func (a *QueryAndIngestPipelineTestcase) testBasicRequest(ctx context.Context, t *testing.T) {
resp, err := a.RequestToQuesma(ctx, "GET", "/", nil)
if err != nil {
t.Fatalf("Failed to make GET request: %s", err)
}
defer resp.Body.Close()
assert.Equal(t, 200, resp.StatusCode)
}

func (a *QueryAndIngestPipelineTestcase) testWildcardGoesToElastic(ctx context.Context, t *testing.T) {
// Given an index in Elasticsearch which falls under `*` in the configuration
var err error
if _, err = a.RequestToElasticsearch(ctx, "PUT", "/unmentioned_index", nil); err != nil {
t.Fatalf("Failed to create index: %s", err)
}
if _, err = a.RequestToElasticsearch(ctx, "POST", "/unmentioned_index/_doc/1", []byte(`{"name": "Alice"}`)); err != nil {
t.Fatalf("Failed to insert document: %s", err)
}
if _, err = a.RequestToElasticsearch(ctx, "POST", "/unmentioned_index/_refresh", nil); err != nil {
t.Fatalf("Failed to refresh index: %s", err)
}
// When Quesma searches for that document
resp, err := a.RequestToQuesma(ctx, "POST", "/unmentioned_index/_search", []byte(`{"query": {"match_all": {}}}`))
if err != nil {
t.Fatalf("Failed to make GET request: %s", err)
}
defer resp.Body.Close()
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Failed to read response body: %s", err)
}
var jsonResponse map[string]interface{}
if err := json.Unmarshal(bodyBytes, &jsonResponse); err != nil {
t.Fatalf("Failed to unmarshal response body: %s", err)
}
hits, _ := jsonResponse["hits"].(map[string]interface{})
// We should get proper search result from Elasticsearch
hit := hits["total"]
hitValue := hit.(map[string]interface{})["value"]
assert.Equal(t, float64(1), hitValue)
assert.Contains(t, string(bodyBytes), "Alice")
assert.Equal(t, 200, resp.StatusCode)
assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Quesma-Source"))
assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product"))
}

func (a *QueryAndIngestPipelineTestcase) testEmptyTargetDoc(ctx context.Context, t *testing.T) {
resp, err := a.RequestToQuesma(ctx, "POST", "/logs_disabled/_doc", []byte(`{"name": "Alice"}`))
if err != nil {
t.Fatalf("Error sending POST request: %s", err)
}
defer resp.Body.Close()
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Failed to read response body: %s", err)
}

assert.Contains(t, string(bodyBytes), "index_closed_exception")
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source"))
assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product"))
}

func (a *QueryAndIngestPipelineTestcase) testEmptyTargetBulk(ctx context.Context, t *testing.T) {
bulkPayload := []byte(`
{ "index": { "_index": "logs_disabled", "_id": "1" } }
{ "name": "Alice", "age": 30 }
{ "index": { "_index": "logs_disabled", "_id": "2" } }
{ "name": "Bob", "age": 25 }

`)
resp, err := a.RequestToQuesma(ctx, "POST", "/_bulk", bulkPayload)
if err != nil {
t.Fatalf("Error sending POST request: %s", err)
}
defer resp.Body.Close()
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Failed to read response body: %s", err)
}

assert.Contains(t, string(bodyBytes), "index_closed_exception")
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source"))
assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product"))
}

// TODO: A POST to /logs_disabled/_doc/:id currently gets routed to Elasticsearch and results in a write to the index.
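A hypothetical sketch of a test pinning down that TODO (the expected `X-Quesma-Source` value is an assumption based on the comment above; this is not part of this change):

```go
// Hypothetical: documents the current behaviour described in the TODO,
// where a POST with an explicit id is routed to Elasticsearch.
func (a *QueryAndIngestPipelineTestcase) testEmptyTargetDocWithIdSketch(ctx context.Context, t *testing.T) {
	resp, err := a.RequestToQuesma(ctx, "POST", "/logs_disabled/_doc/1", []byte(`{"name": "Alice"}`))
	if err != nil {
		t.Fatalf("Error sending POST request: %s", err)
	}
	defer resp.Body.Close()
	// Assumption: this request currently reaches Elasticsearch directly.
	assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Quesma-Source"))
}
```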