From 85d5a2f30469e42a296c00022ffa40c44aa41684 Mon Sep 17 00:00:00 2001 From: przemyslaw Date: Fri, 11 Oct 2024 10:20:18 +0200 Subject: [PATCH 1/9] Use special syntax for mentions --- .github/workflows/integration-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index dce91702b..afb640499 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -57,7 +57,7 @@ jobs: with: channel-id: ${{ secrets.SLACK_CHANNEL_ID }} slack-message: | - :exclamation: *Integration tests failed.* :exclamation: @channel + :exclamation: *Integration tests failed.* :exclamation: Last commit by: ${{ steps.get_author.outputs.author }} env: SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }} From 895ada61383834f15afc4845ab83745403e9d7d6 Mon Sep 17 00:00:00 2001 From: przemyslaw Date: Fri, 11 Oct 2024 11:12:27 +0200 Subject: [PATCH 2/9] test --- .../test_reading_clickhouse_tables.go | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/ci/it/testcases/test_reading_clickhouse_tables.go b/ci/it/testcases/test_reading_clickhouse_tables.go index 6b2834656..986e87c97 100644 --- a/ci/it/testcases/test_reading_clickhouse_tables.go +++ b/ci/it/testcases/test_reading_clickhouse_tables.go @@ -2,7 +2,9 @@ package testcases import ( "context" + "encoding/json" "github.com/stretchr/testify/assert" + "io" "testing" ) @@ -30,6 +32,7 @@ func (a *ReadingClickHouseTablesIntegrationTestcase) SetupContainers(ctx context func (a *ReadingClickHouseTablesIntegrationTestcase) RunTests(ctx context.Context, t *testing.T) error { a.testBasicRequest(ctx, t) a.testRandomThing(ctx, t) + a.testWildcardGoesToElastic(ctx, t) return nil } @@ -63,3 +66,43 @@ func (a *ReadingClickHouseTablesIntegrationTestcase) testRandomThing(ctx context defer resp.Body.Close() assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source")) } + +func (a *ReadingClickHouseTablesIntegrationTestcase) testWildcardGoesToElastic(ctx context.Context, t *testing.T) { + // Given an index in Elasticsearch which falls under `*` in the configuration + var err error + if _, err = a.RequestToElasticsearch(ctx, "PUT", "/extra_index", nil); err != nil { + t.Fatalf("Failed to create index: %s", err) + } + if _, err = a.RequestToElasticsearch(ctx, "POST", "/extra_index/_doc/1", []byte(`{"name": "Alice"}`)); err != nil { + t.Fatalf("Failed to insert document: %s", err) + } + if _, err = a.RequestToElasticsearch(ctx, "POST", "/extra_index/_refresh", nil); err != nil { + t.Fatalf("Failed to refresh index: %s", err) + } + // When Quesma searches for that document + resp, err := a.RequestToQuesma(ctx, "POST", "/extra_index/_search", []byte(`{"query": {"match_all": {}}}`)) + if err != nil { + t.Fatalf("Failed to make GET request: %s", err) + } + defer resp.Body.Close() + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Failed to read response body: %s", err) + } + var jsonResponse map[string]interface{} + if err := json.Unmarshal(bodyBytes, &jsonResponse); err != nil { + t.Fatalf("Failed to unmarshal response body: %s", err) + } + hits, _ := jsonResponse["hits"].(map[string]interface{}) + // We should get proper search result from Elasticsearch + hit := hits["total"] + hitValue := hit.(map[string]interface{})["value"] + assert.Equal(t, float64(1), hitValue) + assert.Contains(t, string(bodyBytes), "Alice") + assert.Equal(t, 200, resp.StatusCode) + assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Quesma-Source")) + assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product")) +} + +// At this moment this configuration does not disable ingest (ingest req's will get routed to ES and handled normally) +// Future test idea -> ensure ingest req gets rejected. From ba414cea86ecd1b5552046f6db6109e5c5adbdc0 Mon Sep 17 00:00:00 2001 From: przemyslaw Date: Fri, 11 Oct 2024 12:48:31 +0200 Subject: [PATCH 3/9] test two pipelines and empty target --- .../quesma-with-two-pipelines.yml.template | 54 ++++++++ ci/it/integration_test.go | 5 + ci/it/testcases/test_two_pipelines.go | 128 ++++++++++++++++++ 3 files changed, 187 insertions(+) create mode 100644 ci/it/configs/quesma-with-two-pipelines.yml.template create mode 100644 ci/it/testcases/test_two_pipelines.go diff --git a/ci/it/configs/quesma-with-two-pipelines.yml.template b/ci/it/configs/quesma-with-two-pipelines.yml.template new file mode 100644 index 000000000..9a0ae7077 --- /dev/null +++ b/ci/it/configs/quesma-with-two-pipelines.yml.template @@ -0,0 +1,54 @@ +frontendConnectors: + - name: elastic-ingest + type: elasticsearch-fe-ingest + config: + listenPort: 8080 + - name: elastic-query + type: elasticsearch-fe-query + config: + listenPort: 8080 +backendConnectors: + - name: my-minimal-elasticsearch + type: elasticsearch + config: + url: "http://{{ .elasticsearch_host }}:{{ .elasticsearch_port }}" + user: elastic + password: quesmaquesma + - name: my-hydrolix-instance + type: clickhouse-os + config: + url: clickhouse://{{ .clickhouse_host }}:{{ .clickhouse_port }} +ingestStatistics: true +processors: + - name: my-query-processor + type: quesma-v1-processor-query + config: + indexes: + siem: + target: [my-hydrolix-instance] + logs: + target: [my-hydrolix-instance] + test_index: + target: [my-minimal-elasticsearch] + "*": + target: [ my-minimal-elasticsearch ] + - name: my-ingest-processor + type: quesma-v1-processor-ingest + config: + indexes: + test_index: + target: [ my-minimal-elasticsearch ] + logs_disabled: + target: [ ] + "*": + target: [ my-minimal-elasticsearch ] +pipelines: + - name: my-elasticsearch-proxy-read + frontendConnectors: [ elastic-query ] + processors: [ my-query-processor ] + backendConnectors: [ my-minimal-elasticsearch, my-hydrolix-instance ] + - name: my-elasticsearch-proxy-write + frontendConnectors: [ elastic-ingest ] + processors: [ my-ingest-processor ] + backendConnectors: [ my-minimal-elasticsearch, my-hydrolix-instance ] + diff --git a/ci/it/integration_test.go b/ci/it/integration_test.go index 0579e58bc..3356abe93 100644 --- a/ci/it/integration_test.go +++ b/ci/it/integration_test.go @@ -26,3 +26,8 @@ func TestReadingClickHouseTablesIntegrationTestcase(t *testing.T) { testCase := testcases.NewReadingClickHouseTablesIntegrationTestcase() runIntegrationTest(t, testCase) } + +func TestQueryAndIngestPipelineTestcase(t *testing.T) { + testCase := testcases.NewQueryAndIngestPipelineTestcase() + runIntegrationTest(t, testCase) +} diff --git a/ci/it/testcases/test_two_pipelines.go b/ci/it/testcases/test_two_pipelines.go new file mode 100644 index 000000000..b73fded72 --- /dev/null +++ b/ci/it/testcases/test_two_pipelines.go @@ -0,0 +1,128 @@ +package testcases + +import ( + "context" + "encoding/json" + "github.com/stretchr/testify/assert" + "io" + "net/http" + "testing" +) + +type QueryAndIngestPipelineTestcase struct { + IntegrationTestcaseBase +} + +func NewQueryAndIngestPipelineTestcase() *QueryAndIngestPipelineTestcase { + return &QueryAndIngestPipelineTestcase{ + IntegrationTestcaseBase: IntegrationTestcaseBase{ + ConfigTemplate: "quesma-with-two-pipelines.yml.template", + }, + } +} + +func (a *QueryAndIngestPipelineTestcase) SetupContainers(ctx context.Context) error { + containers, err := setupAllContainersWithCh(ctx, a.ConfigTemplate) + if err != nil { + return err + } + a.Containers = containers + return nil +} + +func (a *QueryAndIngestPipelineTestcase) RunTests(ctx context.Context, t *testing.T) error { + a.testBasicRequest(ctx, t) + a.testWildcardGoesToElastic(ctx, t) + a.testEmptyTargetDoc(ctx, t) + a.testEmptyTargetBulk(ctx, t) + return nil +} + +func (a *QueryAndIngestPipelineTestcase) testBasicRequest(ctx context.Context, t *testing.T) { + resp, err := a.RequestToQuesma(ctx, "GET", "/", nil) + if err != nil { + t.Fatalf("Failed to make GET request: %s", err) + } + defer resp.Body.Close() + assert.Equal(t, 200, resp.StatusCode) +} + +func (a *QueryAndIngestPipelineTestcase) testWildcardGoesToElastic(ctx context.Context, t *testing.T) { + // Given an index in Elasticsearch which falls under `*` in the configuration + var err error + if _, err = a.RequestToElasticsearch(ctx, "PUT", "/unmentioned_index", nil); err != nil { + t.Fatalf("Failed to create index: %s", err) + } + if _, err = a.RequestToElasticsearch(ctx, "POST", "/unmentioned_index/_doc/1", []byte(`{"name": "Alice"}`)); err != nil { + t.Fatalf("Failed to insert document: %s", err) + } + if _, err = a.RequestToElasticsearch(ctx, "POST", "/unmentioned_index/_refresh", nil); err != nil { + t.Fatalf("Failed to refresh index: %s", err) + } + // When Quesma searches for that document + resp, err := a.RequestToQuesma(ctx, "POST", "/unmentioned_index/_search", []byte(`{"query": {"match_all": {}}}`)) + if err != nil { + t.Fatalf("Failed to make GET request: %s", err) + } + defer resp.Body.Close() + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Failed to read response body: %s", err) + } + var jsonResponse map[string]interface{} + if err := json.Unmarshal(bodyBytes, &jsonResponse); err != nil { + t.Fatalf("Failed to unmarshal response body: %s", err) + } + hits, _ := jsonResponse["hits"].(map[string]interface{}) + // We should get proper search result from Elasticsearch + hit := hits["total"] + hitValue := hit.(map[string]interface{})["value"] + assert.Equal(t, float64(1), hitValue) + assert.Contains(t, string(bodyBytes), "Alice") + assert.Equal(t, 200, resp.StatusCode) + assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Quesma-Source")) + assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product")) +} + +func (a *QueryAndIngestPipelineTestcase) testEmptyTargetDoc(ctx context.Context, t *testing.T) { + resp, err := a.RequestToQuesma(ctx, "POST", "/logs_disabled/_doc", []byte(`{"name": "Alice"}`)) + if err != nil { + t.Fatalf("Error sending POST request: %s", err) + } + defer resp.Body.Close() + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Failed to read response body: %s", err) + } + + assert.Contains(t, string(bodyBytes), "index_closed_exception") + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source")) + assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product")) +} + +func (a *QueryAndIngestPipelineTestcase) testEmptyTargetBulk(ctx context.Context, t *testing.T) { + bulkPayload := []byte(` + { "index": { "_index": "logs_disabled", "_id": "1" } } + { "name": "Alice", "age": 30 } + { "index": { "_index": "logs_disabled", "_id": "2" } } + { "name": "Bob", "age": 25 } + +`) + resp, err := a.RequestToQuesma(ctx, "POST", "/_bulk", bulkPayload) + if err != nil { + t.Fatalf("Error sending POST request: %s", err) + } + defer resp.Body.Close() + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Failed to read response body: %s", err) + } + + assert.Contains(t, string(bodyBytes), "index_closed_exception") + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source")) + assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product")) +} + +// TODO: A POST to /logs_disabled/_doc/:id is going to be routed to Elasticsearch and will return result in writing to the index. From a72890f66d29392409c2509fdf46aa8e61fd18ba Mon Sep 17 00:00:00 2001 From: przemyslaw Date: Fri, 11 Oct 2024 12:59:02 +0200 Subject: [PATCH 4/9] one more example --- ...-dual-writes-and-common-table.yml.template | 68 ++++++++++ ci/it/integration_test.go | 5 + .../test_dual_write_and_common_table.go | 128 ++++++++++++++++++ 3 files changed, 201 insertions(+) create mode 100644 ci/it/configs/quesma-with-dual-writes-and-common-table.yml.template create mode 100644 ci/it/testcases/test_dual_write_and_common_table.go diff --git a/ci/it/configs/quesma-with-dual-writes-and-common-table.yml.template b/ci/it/configs/quesma-with-dual-writes-and-common-table.yml.template new file mode 100644 index 000000000..4106dd58e --- /dev/null +++ b/ci/it/configs/quesma-with-dual-writes-and-common-table.yml.template @@ -0,0 +1,68 @@ +frontendConnectors: + - name: elastic-ingest + type: elasticsearch-fe-ingest + config: + listenPort: 8080 + - name: elastic-query + type: elasticsearch-fe-query + config: + listenPort: 8080 +backendConnectors: + - name: e + type: elasticsearch + config: + url: "http://{{ .elasticsearch_host }}:{{ .elasticsearch_port }}" + user: elastic + password: quesmaquesma + - name: c + type: clickhouse-os + config: + url: clickhouse://{{ .clickhouse_host }}:{{ .clickhouse_port }} +ingestStatistics: true +processors: + - name: QP + type: quesma-v1-processor-query + config: + indexes: + logs-1: + target: [ e ] + logs-2: + target: [ e ] + logs-3: + target: [ c, e ] + logs-4: + useCommonTable: true + target: [ c ] + logs-5: + useCommonTable: true + target: [ c ] + "*": + target: [ e ] + - name: IP + type: quesma-v1-processor-ingest + config: + indexes: + logs-1: + target: [ e ] + logs-2: + target: [ e ] + logs-3: + target: [ c, e ] + logs-4: + useCommonTable: true + target: [ c ] + "*": + target: [ e ] + logs-5: + useCommonTable: true + target: [ ] + +pipelines: + - name: my-elasticsearch-proxy-read + frontendConnectors: [ elastic-query ] + processors: [ QP ] + backendConnectors: [ e, c ] + - name: my-elasticsearch-proxy-write + frontendConnectors: [ elastic-ingest ] + processors: [ IP ] + backendConnectors: [ e, c ] diff --git a/ci/it/integration_test.go b/ci/it/integration_test.go index 3356abe93..21f60f02d 100644 --- a/ci/it/integration_test.go +++ b/ci/it/integration_test.go @@ -31,3 +31,8 @@ func TestQueryAndIngestPipelineTestcase(t *testing.T) { testCase := testcases.NewQueryAndIngestPipelineTestcase() runIntegrationTest(t, testCase) } + +func TestDualWriteAndCommonTableTestcase(t *testing.T) { + testCase := testcases.NewDualWriteAndCommonTableTestcase() + runIntegrationTest(t, testCase) +} diff --git a/ci/it/testcases/test_dual_write_and_common_table.go b/ci/it/testcases/test_dual_write_and_common_table.go new file mode 100644 index 000000000..7fadfbfd2 --- /dev/null +++ b/ci/it/testcases/test_dual_write_and_common_table.go @@ -0,0 +1,128 @@ +package testcases + +import ( + "context" + "encoding/json" + "github.com/stretchr/testify/assert" + "io" + "net/http" + "testing" +) + +type DualWriteAndCommonTableTestcase struct { + IntegrationTestcaseBase +} + +func NewDualWriteAndCommonTableTestcase() *DualWriteAndCommonTableTestcase { + return &DualWriteAndCommonTableTestcase{ + IntegrationTestcaseBase: IntegrationTestcaseBase{ + ConfigTemplate: "quesma-with-dual-writes-and-common-table.yml.template", + }, + } +} + +func (a *DualWriteAndCommonTableTestcase) SetupContainers(ctx context.Context) error { + containers, err := setupAllContainersWithCh(ctx, a.ConfigTemplate) + if err != nil { + return err + } + a.Containers = containers + return nil +} + +func (a *DualWriteAndCommonTableTestcase) RunTests(ctx context.Context, t *testing.T) error { + //a.testBasicRequest(ctx, t) + //a.testWildcardGoesToElastic(ctx, t) + //a.testEmptyTargetDoc(ctx, t) + //a.testEmptyTargetBulk(ctx, t) + return nil +} + +func (a *DualWriteAndCommonTableTestcase) testBasicRequest(ctx context.Context, t *testing.T) { + resp, err := a.RequestToQuesma(ctx, "GET", "/", nil) + if err != nil { + t.Fatalf("Failed to make GET request: %s", err) + } + defer resp.Body.Close() + assert.Equal(t, 200, resp.StatusCode) +} + +func (a *DualWriteAndCommonTableTestcase) testWildcardGoesToElastic(ctx context.Context, t *testing.T) { + // Given an index in Elasticsearch which falls under `*` in the configuration + var err error + if _, err = a.RequestToElasticsearch(ctx, "PUT", "/unmentioned_index", nil); err != nil { + t.Fatalf("Failed to create index: %s", err) + } + if _, err = a.RequestToElasticsearch(ctx, "POST", "/unmentioned_index/_doc/1", []byte(`{"name": "Alice"}`)); err != nil { + t.Fatalf("Failed to insert document: %s", err) + } + if _, err = a.RequestToElasticsearch(ctx, "POST", "/unmentioned_index/_refresh", nil); err != nil { + t.Fatalf("Failed to refresh index: %s", err) + } + // When Quesma searches for that document + resp, err := a.RequestToQuesma(ctx, "POST", "/unmentioned_index/_search", []byte(`{"query": {"match_all": {}}}`)) + if err != nil { + t.Fatalf("Failed to make GET request: %s", err) + } + defer resp.Body.Close() + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Failed to read response body: %s", err) + } + var jsonResponse map[string]interface{} + if err := json.Unmarshal(bodyBytes, &jsonResponse); err != nil { + t.Fatalf("Failed to unmarshal response body: %s", err) + } + hits, _ := jsonResponse["hits"].(map[string]interface{}) + // We should get proper search result from Elasticsearch + hit := hits["total"] + hitValue := hit.(map[string]interface{})["value"] + assert.Equal(t, float64(1), hitValue) + assert.Contains(t, string(bodyBytes), "Alice") + assert.Equal(t, 200, resp.StatusCode) + assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Quesma-Source")) + assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product")) +} + +func (a *DualWriteAndCommonTableTestcase) testEmptyTargetDoc(ctx context.Context, t *testing.T) { + resp, err := a.RequestToQuesma(ctx, "POST", "/logs_disabled/_doc", []byte(`{"name": "Alice"}`)) + if err != nil { + t.Fatalf("Error sending POST request: %s", err) + } + defer resp.Body.Close() + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Failed to read response body: %s", err) + } + + assert.Contains(t, string(bodyBytes), "index_closed_exception") + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source")) + assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product")) +} + +func (a *DualWriteAndCommonTableTestcase) testEmptyTargetBulk(ctx context.Context, t *testing.T) { + bulkPayload := []byte(` + { "index": { "_index": "logs_disabled", "_id": "1" } } + { "name": "Alice", "age": 30 } + { "index": { "_index": "logs_disabled", "_id": "2" } } + { "name": "Bob", "age": 25 } + +`) + resp, err := a.RequestToQuesma(ctx, "POST", "/_bulk", bulkPayload) + if err != nil { + t.Fatalf("Error sending POST request: %s", err) + } + defer resp.Body.Close() + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Failed to read response body: %s", err) + } + + assert.Contains(t, string(bodyBytes), "index_closed_exception") + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source")) + assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product")) +} + +// TODO: A POST to /logs_disabled/_doc/:id is going to be routed to Elasticsearch and will return result in writing to the index. From 342024754d71de5c1dae9f48fe88db887af17873 Mon Sep 17 00:00:00 2001 From: przemyslaw Date: Fri, 11 Oct 2024 13:05:26 +0200 Subject: [PATCH 5/9] one more example --- ci/it/testcases/test_dual_write_and_common_table.go | 4 ++-- ci/it/testcases/test_reading_clickhouse_tables.go | 4 ++-- ci/it/testcases/test_transparent_proxy.go | 4 ++-- ci/it/testcases/test_two_pipelines.go | 4 ++-- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/ci/it/testcases/test_dual_write_and_common_table.go b/ci/it/testcases/test_dual_write_and_common_table.go index 7fadfbfd2..22cfe1e53 100644 --- a/ci/it/testcases/test_dual_write_and_common_table.go +++ b/ci/it/testcases/test_dual_write_and_common_table.go @@ -44,7 +44,7 @@ func (a *DualWriteAndCommonTableTestcase) testBasicRequest(ctx context.Context, t.Fatalf("Failed to make GET request: %s", err) } defer resp.Body.Close() - assert.Equal(t, 200, resp.StatusCode) + assert.Equal(t, http.StatusOK, resp.StatusCode) } func (a *DualWriteAndCommonTableTestcase) testWildcardGoesToElastic(ctx context.Context, t *testing.T) { @@ -79,7 +79,7 @@ func (a *DualWriteAndCommonTableTestcase) testWildcardGoesToElastic(ctx context. hitValue := hit.(map[string]interface{})["value"] assert.Equal(t, float64(1), hitValue) assert.Contains(t, string(bodyBytes), "Alice") - assert.Equal(t, 200, resp.StatusCode) + assert.Equal(t, http.StatusOK, resp.StatusCode) assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Quesma-Source")) assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product")) } diff --git a/ci/it/testcases/test_reading_clickhouse_tables.go b/ci/it/testcases/test_reading_clickhouse_tables.go index 986e87c97..eb93ffed6 100644 --- a/ci/it/testcases/test_reading_clickhouse_tables.go +++ b/ci/it/testcases/test_reading_clickhouse_tables.go @@ -42,7 +42,7 @@ func (a *ReadingClickHouseTablesIntegrationTestcase) testBasicRequest(ctx contex t.Fatalf("Failed to make GET request: %s", err) } defer resp.Body.Close() - assert.Equal(t, 200, resp.StatusCode) + assert.Equal(t, http.StatusOK, resp.StatusCode) } func (a *ReadingClickHouseTablesIntegrationTestcase) testRandomThing(ctx context.Context, t *testing.T) { @@ -99,7 +99,7 @@ func (a *ReadingClickHouseTablesIntegrationTestcase) testWildcardGoesToElastic(c hitValue := hit.(map[string]interface{})["value"] assert.Equal(t, float64(1), hitValue) assert.Contains(t, string(bodyBytes), "Alice") - assert.Equal(t, 200, resp.StatusCode) + assert.Equal(t, http.StatusOK, resp.StatusCode) assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Quesma-Source")) assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product")) } diff --git a/ci/it/testcases/test_transparent_proxy.go b/ci/it/testcases/test_transparent_proxy.go index 6501e2bbf..060f73776 100644 --- a/ci/it/testcases/test_transparent_proxy.go +++ b/ci/it/testcases/test_transparent_proxy.go @@ -41,7 +41,7 @@ func (a *TransparentProxyIntegrationTestcase) testBasicRequest(ctx context.Conte t.Fatalf("Failed to make GET request: %s", err) } defer resp.Body.Close() - assert.Equal(t, 200, resp.StatusCode) + assert.Equal(t, http.StatusOK, resp.StatusCode) } func (a *TransparentProxyIntegrationTestcase) testIfCatHealthRequestReachesElasticsearch(ctx context.Context, t *testing.T) { @@ -54,7 +54,7 @@ func (a *TransparentProxyIntegrationTestcase) testIfCatHealthRequestReachesElast if err != nil { t.Fatalf("Failed to read response body: %s", err) } - assert.Equal(t, 200, resp.StatusCode) + assert.Equal(t, http.StatusOK, resp.StatusCode) assert.Equal(t, "Elasticsearch", resp.Header.Get("X-elastic-product")) assert.Contains(t, string(bodyBytes), "green") } diff --git a/ci/it/testcases/test_two_pipelines.go b/ci/it/testcases/test_two_pipelines.go index b73fded72..27952da65 100644 --- a/ci/it/testcases/test_two_pipelines.go +++ b/ci/it/testcases/test_two_pipelines.go @@ -44,7 +44,7 @@ func (a *QueryAndIngestPipelineTestcase) testBasicRequest(ctx context.Context, t t.Fatalf("Failed to make GET request: %s", err) } defer resp.Body.Close() - assert.Equal(t, 200, resp.StatusCode) + assert.Equal(t, http.StatusOK, resp.StatusCode) } func (a *QueryAndIngestPipelineTestcase) testWildcardGoesToElastic(ctx context.Context, t *testing.T) { @@ -79,7 +79,7 @@ func (a *QueryAndIngestPipelineTestcase) testWildcardGoesToElastic(ctx context.C hitValue := hit.(map[string]interface{})["value"] assert.Equal(t, float64(1), hitValue) assert.Contains(t, string(bodyBytes), "Alice") - assert.Equal(t, 200, resp.StatusCode) + assert.Equal(t, http.StatusOK, resp.StatusCode) assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Quesma-Source")) assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product")) } From 1ec716fc2b452edb0b3922772d5eaa49d3db5280 Mon Sep 17 00:00:00 2001 From: przemyslaw Date: Fri, 11 Oct 2024 14:48:50 +0200 Subject: [PATCH 6/9] test --- ...-dual-writes-and-common-table.yml.template | 4 +- .../test_dual_write_and_common_table.go | 113 ++++++++++-------- .../test_reading_clickhouse_tables.go | 1 + ci/it/testcases/test_transparent_proxy.go | 1 + 4 files changed, 70 insertions(+), 49 deletions(-) diff --git a/ci/it/configs/quesma-with-dual-writes-and-common-table.yml.template b/ci/it/configs/quesma-with-dual-writes-and-common-table.yml.template index 4106dd58e..4e9cb9622 100644 --- a/ci/it/configs/quesma-with-dual-writes-and-common-table.yml.template +++ b/ci/it/configs/quesma-with-dual-writes-and-common-table.yml.template @@ -27,7 +27,7 @@ processors: logs-1: target: [ e ] logs-2: - target: [ e ] + target: [ c ] logs-3: target: [ c, e ] logs-4: @@ -45,7 +45,7 @@ processors: logs-1: target: [ e ] logs-2: - target: [ e ] + target: [ c ] logs-3: target: [ c, e ] logs-4: diff --git a/ci/it/testcases/test_dual_write_and_common_table.go b/ci/it/testcases/test_dual_write_and_common_table.go index 22cfe1e53..acf8286fc 100644 --- a/ci/it/testcases/test_dual_write_and_common_table.go +++ b/ci/it/testcases/test_dual_write_and_common_table.go @@ -31,10 +31,12 @@ func (a *DualWriteAndCommonTableTestcase) SetupContainers(ctx context.Context) e } func (a *DualWriteAndCommonTableTestcase) RunTests(ctx context.Context, t *testing.T) error { - //a.testBasicRequest(ctx, t) - //a.testWildcardGoesToElastic(ctx, t) - //a.testEmptyTargetDoc(ctx, t) - //a.testEmptyTargetBulk(ctx, t) + a.testBasicRequest(ctx, t) + a.testIngestToClickHouseWorks(ctx, t) + //a.testIngestToCommonTableWorks(ctx, t) + //a.testDualQueryReturnsDataFromClickHouse(ctx, t) + //a.testDualWritesWork(ctx, t) + a.testWildcardGoesToElastic(ctx, t) return nil } @@ -47,6 +49,66 @@ func (a *DualWriteAndCommonTableTestcase) testBasicRequest(ctx context.Context, assert.Equal(t, http.StatusOK, resp.StatusCode) } +func (a *DualWriteAndCommonTableTestcase) testIngestToClickHouseWorks(ctx context.Context, t *testing.T) { + resp, err := a.RequestToQuesma(ctx, "POST", "/logs-2/_doc", []byte(`{"name": "Przemyslaw", "age": 31337}`)) + if err != nil { + t.Fatalf("Failed to insert document: %s", err) + } + defer resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode) + + chQuery := "SELECT * FROM 'logs-2'" + rows, err := a.ExecuteClickHouseQuery(ctx, chQuery) + if err != nil { + t.Fatalf("Failed to execute query: %s", err) + } + columnTypes, err := rows.ColumnTypes() + values := make([]interface{}, len(columnTypes)) + valuePtrs := make([]interface{}, len(columnTypes)) + for i := range values { + valuePtrs[i] = &values[i] + } + var name string + var age int + for rows.Next() { + if err := rows.Scan(valuePtrs...); err != nil { + t.Fatalf("Failed to scan row: %s", err) + } + for i, col := range values { + switch columnTypes[i].Name() { + case "name": + if v, ok := col.(*string); ok { + name = *v + } + case "age": + if v, ok := col.(*int64); ok { + age = int(*v) + } + } + } + if name == "Przemyslaw" && age == 31337 { + break + } + } + assert.Equal(t, "Przemyslaw", name) + assert.Equal(t, 31337, age) + + // Also make sure no such index got created in Elasticsearch + resp, err = a.RequestToElasticsearch(ctx, "GET", "/_cat/indices", nil) + if err != nil { + t.Fatalf("Failed to make GET request: %s", err) + } + defer resp.Body.Close() + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Failed to read response body: %s", err) + } + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Greater(t, len(bodyBytes), 0) + assert.Contains(t, string(bodyBytes), "green") // at least one index should be there + assert.NotContains(t, string(bodyBytes), "logs-2") +} + func (a *DualWriteAndCommonTableTestcase) testWildcardGoesToElastic(ctx context.Context, t *testing.T) { // Given an index in Elasticsearch which falls under `*` in the configuration var err error @@ -83,46 +145,3 @@ func (a *DualWriteAndCommonTableTestcase) testWildcardGoesToElastic(ctx context. assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Quesma-Source")) assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product")) } - -func (a *DualWriteAndCommonTableTestcase) testEmptyTargetDoc(ctx context.Context, t *testing.T) { - resp, err := a.RequestToQuesma(ctx, "POST", "/logs_disabled/_doc", []byte(`{"name": "Alice"}`)) - if err != nil { - t.Fatalf("Error sending POST request: %s", err) - } - defer resp.Body.Close() - bodyBytes, err := io.ReadAll(resp.Body) - if err != nil { - t.Fatalf("Failed to read response body: %s", err) - } - - assert.Contains(t, string(bodyBytes), "index_closed_exception") - assert.Equal(t, http.StatusOK, resp.StatusCode) - assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source")) - assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product")) -} - -func (a *DualWriteAndCommonTableTestcase) testEmptyTargetBulk(ctx context.Context, t *testing.T) { - bulkPayload := []byte(` - { "index": { "_index": "logs_disabled", "_id": "1" } } - { "name": "Alice", "age": 30 } - { "index": { "_index": "logs_disabled", "_id": "2" } } - { "name": "Bob", "age": 25 } - -`) - resp, err := a.RequestToQuesma(ctx, "POST", "/_bulk", bulkPayload) - if err != nil { - t.Fatalf("Error sending POST request: %s", err) - } - defer resp.Body.Close() - bodyBytes, err := io.ReadAll(resp.Body) - if err != nil { - t.Fatalf("Failed to read response body: %s", err) - } - - assert.Contains(t, string(bodyBytes), "index_closed_exception") - assert.Equal(t, http.StatusOK, resp.StatusCode) - assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source")) - assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product")) -} - -// TODO: A POST to /logs_disabled/_doc/:id is going to be routed to Elasticsearch and will return result in writing to the index. diff --git a/ci/it/testcases/test_reading_clickhouse_tables.go b/ci/it/testcases/test_reading_clickhouse_tables.go index eb93ffed6..ca0f7b3e8 100644 --- a/ci/it/testcases/test_reading_clickhouse_tables.go +++ b/ci/it/testcases/test_reading_clickhouse_tables.go @@ -5,6 +5,7 @@ import ( "encoding/json" "github.com/stretchr/testify/assert" "io" + "net/http" "testing" ) diff --git a/ci/it/testcases/test_transparent_proxy.go b/ci/it/testcases/test_transparent_proxy.go index 060f73776..1becffa2d 100644 --- a/ci/it/testcases/test_transparent_proxy.go +++ b/ci/it/testcases/test_transparent_proxy.go @@ -4,6 +4,7 @@ import ( "context" "github.com/stretchr/testify/assert" "io" + "net/http" "testing" ) From fac97c4761a2c6b7a2dfd39fc63bb2addced904a Mon Sep 17 00:00:00 2001 From: przemyslaw Date: Fri, 11 Oct 2024 15:04:04 +0200 Subject: [PATCH 7/9] dual write test --- .../test_dual_write_and_common_table.go | 69 +++++++++++++++++-- 1 file changed, 64 insertions(+), 5 deletions(-) diff --git a/ci/it/testcases/test_dual_write_and_common_table.go b/ci/it/testcases/test_dual_write_and_common_table.go index acf8286fc..8fda1aa4f 100644 --- a/ci/it/testcases/test_dual_write_and_common_table.go +++ b/ci/it/testcases/test_dual_write_and_common_table.go @@ -35,7 +35,7 @@ func (a *DualWriteAndCommonTableTestcase) RunTests(ctx context.Context, t *testi a.testIngestToClickHouseWorks(ctx, t) //a.testIngestToCommonTableWorks(ctx, t) //a.testDualQueryReturnsDataFromClickHouse(ctx, t) - //a.testDualWritesWork(ctx, t) + a.testDualWritesWork(ctx, t) a.testWildcardGoesToElastic(ctx, t) return nil } @@ -94,7 +94,7 @@ func (a *DualWriteAndCommonTableTestcase) testIngestToClickHouseWorks(ctx contex assert.Equal(t, 31337, age) // Also make sure no such index got created in Elasticsearch - resp, err = a.RequestToElasticsearch(ctx, "GET", "/_cat/indices", nil) + resp, err = a.RequestToElasticsearch(ctx, "GET", "/logs-2/_refresh", nil) if err != nil { t.Fatalf("Failed to make GET request: %s", err) } @@ -103,10 +103,69 @@ func (a *DualWriteAndCommonTableTestcase) testIngestToClickHouseWorks(ctx contex if err != nil { t.Fatalf("Failed to read response body: %s", err) } + + assert.Equal(t, http.StatusNotFound, resp.StatusCode) + assert.Contains(t, string(bodyBytes), "no such index [logs-2]") +} + +func (a *DualWriteAndCommonTableTestcase) testDualWritesWork(ctx context.Context, t *testing.T) { + resp, err := a.RequestToQuesma(ctx, "POST", "/logs-3/_doc", []byte(`{"name": "Przemyslaw", "age": 31337}`)) + if err != nil { + t.Fatalf("Failed to insert document: %s", err) + } + defer resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode) + + chQuery := "SELECT * FROM 'logs-3'" + rows, err := a.ExecuteClickHouseQuery(ctx, chQuery) + if err != nil { + t.Fatalf("Failed to execute query: %s", err) + } + columnTypes, err := rows.ColumnTypes() + values := make([]interface{}, len(columnTypes)) + valuePtrs := make([]interface{}, len(columnTypes)) + for i := range values { + valuePtrs[i] = &values[i] + } + var name string + var age int + for rows.Next() { + if err := rows.Scan(valuePtrs...); err != nil { + t.Fatalf("Failed to scan row: %s", err) + } + for i, col := range values { + switch columnTypes[i].Name() { + case "name": + if v, ok := col.(*string); ok { + name = *v + } + case "age": + if v, ok := col.(*int64); ok { + age = int(*v) + } + } + } + if name == "Przemyslaw" && age == 31337 { + break + } + } + assert.Equal(t, "Przemyslaw", name) + assert.Equal(t, 31337, age) + + // Also make sure no such index got created in Elasticsearch + _, _ = a.RequestToElasticsearch(ctx, "GET", "/logs-3/_refresh", nil) + resp, err = a.RequestToElasticsearch(ctx, "GET", "/logs-3/_search", []byte(`{"query": {"match_all": {}}}`)) + if err != nil { + t.Fatalf("Failed to make GET request: %s", err) + } + defer resp.Body.Close() + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Failed to read response body: %s", err) + } + assert.Equal(t, http.StatusOK, resp.StatusCode) - assert.Greater(t, len(bodyBytes), 0) - assert.Contains(t, string(bodyBytes), "green") // at least one index should be there - assert.NotContains(t, string(bodyBytes), "logs-2") + assert.Contains(t, string(bodyBytes), "Przemyslaw") } func (a *DualWriteAndCommonTableTestcase) testWildcardGoesToElastic(ctx context.Context, t *testing.T) { From b4132edb9b78993911a267ae3ed9062d712120cb Mon Sep 17 00:00:00 2001 From: przemyslaw Date: Fri, 11 Oct 2024 15:11:59 +0200 Subject: [PATCH 8/9] dual query returns from CH --- ...-dual-writes-and-common-table.yml.template | 4 ++ .../test_dual_write_and_common_table.go | 67 ++++++++++++++++++- 2 files changed, 70 insertions(+), 1 deletion(-) diff --git a/ci/it/configs/quesma-with-dual-writes-and-common-table.yml.template b/ci/it/configs/quesma-with-dual-writes-and-common-table.yml.template index 4e9cb9622..6971eab85 100644 --- a/ci/it/configs/quesma-with-dual-writes-and-common-table.yml.template +++ b/ci/it/configs/quesma-with-dual-writes-and-common-table.yml.template @@ -30,6 +30,8 @@ processors: target: [ c ] logs-3: target: [ c, e ] + logs-dual-query: + target: [ c, e ] logs-4: useCommonTable: true target: [ c ] @@ -48,6 +50,8 @@ processors: target: [ c ] logs-3: target: [ c, e ] + logs-dual-query: + target: [ c, e ] logs-4: useCommonTable: true target: [ c ] diff --git a/ci/it/testcases/test_dual_write_and_common_table.go b/ci/it/testcases/test_dual_write_and_common_table.go index 8fda1aa4f..d95edf708 100644 --- a/ci/it/testcases/test_dual_write_and_common_table.go +++ b/ci/it/testcases/test_dual_write_and_common_table.go @@ -34,7 +34,7 @@ func (a *DualWriteAndCommonTableTestcase) RunTests(ctx context.Context, t *testi a.testBasicRequest(ctx, t) a.testIngestToClickHouseWorks(ctx, t) //a.testIngestToCommonTableWorks(ctx, t) - //a.testDualQueryReturnsDataFromClickHouse(ctx, t) + a.testDualQueryReturnsDataFromClickHouse(ctx, t) a.testDualWritesWork(ctx, t) a.testWildcardGoesToElastic(ctx, t) return nil @@ -49,6 +49,71 @@ func (a *DualWriteAndCommonTableTestcase) testBasicRequest(ctx context.Context, assert.Equal(t, http.StatusOK, resp.StatusCode) } +func (a *DualWriteAndCommonTableTestcase) testDualQueryReturnsDataFromClickHouse(ctx context.Context, t *testing.T) { + resp, err := a.RequestToQuesma(ctx, "POST", "/logs-dual-query/_doc", []byte(`{"name": "Przemyslaw", "age": 31337}`)) + if err != nil { + t.Fatalf("Failed to insert document: %s", err) + } + defer resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode) + + chQuery := "SELECT * FROM 'logs-dual-query'" + rows, err := a.ExecuteClickHouseQuery(ctx, chQuery) + if err != nil { + t.Fatalf("Failed to execute query: %s", err) + } + columnTypes, err := rows.ColumnTypes() + values := make([]interface{}, len(columnTypes)) + valuePtrs := make([]interface{}, len(columnTypes)) + for i := range values { + valuePtrs[i] = &values[i] + } + var name string + var age int + for rows.Next() { + if err := rows.Scan(valuePtrs...); err != nil { + t.Fatalf("Failed to scan row: %s", err) + } + for i, col := range values { + switch columnTypes[i].Name() { + case "name": + if v, ok := col.(*string); ok { + name = *v + } + case "age": + if v, ok := col.(*int64); ok { + age = int(*v) + } + } + } + if name == "Przemyslaw" && age == 31337 { + break + } + } + assert.Equal(t, "Przemyslaw", name) + assert.Equal(t, 31337, age) + + // In the meantime let's delete the index from Elasticsearch + _, _ = a.RequestToElasticsearch(ctx, "DELETE", "/logs-dual-query", nil) + if err != nil { + t.Fatalf("Failed to make DELETE request: %s", err) + } + // FINAL TEST - WHETHER QUESMA RETURNS DATA FROM CLICKHOUSE + resp, err = a.RequestToQuesma(ctx, "GET", "/logs-dual-query/_search", []byte(`{"query": {"match_all": {}}}`)) + if err != nil { + t.Fatalf("Failed to make GET request: %s", err) + } + defer resp.Body.Close() + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Failed to read response body: %s", err) + } + + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Contains(t, string(bodyBytes), "Przemyslaw") + assert.Contains(t, "Clickhouse", resp.Header.Get("X-Quesma-Source")) +} + func (a *DualWriteAndCommonTableTestcase) testIngestToClickHouseWorks(ctx context.Context, t *testing.T) { resp, err := a.RequestToQuesma(ctx, "POST", "/logs-2/_doc", []byte(`{"name": "Przemyslaw", "age": 31337}`)) if err != nil { From e72b4aa450fe4255ffa5bee1c49a108a3fb2a4f1 Mon Sep 17 00:00:00 2001 From: przemyslaw Date: Fri, 11 Oct 2024 15:29:44 +0200 Subject: [PATCH 9/9] COMPLETE SET --- .../test_dual_write_and_common_table.go | 66 ++++++++++++++++++- 1 file changed, 65 insertions(+), 1 deletion(-) diff --git a/ci/it/testcases/test_dual_write_and_common_table.go b/ci/it/testcases/test_dual_write_and_common_table.go index d95edf708..af698b4a9 100644 --- a/ci/it/testcases/test_dual_write_and_common_table.go +++ b/ci/it/testcases/test_dual_write_and_common_table.go @@ -33,7 +33,7 @@ func (a *DualWriteAndCommonTableTestcase) SetupContainers(ctx context.Context) e func (a *DualWriteAndCommonTableTestcase) RunTests(ctx context.Context, t *testing.T) error { a.testBasicRequest(ctx, t) a.testIngestToClickHouseWorks(ctx, t) - //a.testIngestToCommonTableWorks(ctx, t) + a.testIngestToCommonTableWorks(ctx, t) a.testDualQueryReturnsDataFromClickHouse(ctx, t) a.testDualWritesWork(ctx, t) a.testWildcardGoesToElastic(ctx, t) @@ -48,6 +48,70 @@ func (a *DualWriteAndCommonTableTestcase) testBasicRequest(ctx context.Context, defer resp.Body.Close() assert.Equal(t, http.StatusOK, resp.StatusCode) } +func (a *DualWriteAndCommonTableTestcase) testIngestToCommonTableWorks(ctx context.Context, t *testing.T) { + resp, err := a.RequestToQuesma(ctx, "POST", "/logs-4/_doc", []byte(`{"name": "Przemyslaw", "age": 31337}`)) + if err != nil { + t.Fatalf("Failed to insert document: %s", err) + } + defer resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode) + + chQuery := "SELECT * FROM 'quesma_common_table'" + rows, err := a.ExecuteClickHouseQuery(ctx, chQuery) + if err != nil { + t.Fatalf("Failed to execute query: %s", err) + } + columnTypes, err := rows.ColumnTypes() + values := make([]interface{}, len(columnTypes)) + valuePtrs := make([]interface{}, len(columnTypes)) + for i := range values { + valuePtrs[i] = &values[i] + } + var name string + var age int + var quesmaIndexName string + for rows.Next() { + if err := rows.Scan(valuePtrs...); err != nil { + t.Fatalf("Failed to scan row: %s", err) + } + for i, col := range values { + switch columnTypes[i].Name() { + case "__quesma_index_name": + if v, ok := col.(string); ok { + quesmaIndexName = v + } + case "name": + if v, ok := col.(*string); ok { + name = *v + } + case "age": + if v, ok := col.(*int64); ok { + age = int(*v) + } + } + } + if name == "Przemyslaw" && age == 31337 && quesmaIndexName == "logs-4" { + break + } + } + assert.Equal(t, "Przemyslaw", name) + assert.Equal(t, 31337, age) + assert.Equal(t, "logs-4", quesmaIndexName) + + resp, err = a.RequestToQuesma(ctx, "GET", "/logs-4/_search", []byte(`{"query": {"match_all": {}}}`)) + if err != nil { + t.Fatalf("Failed to make GET request: %s", err) + } + defer resp.Body.Close() + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Failed to read response body: %s", err) + } + + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Contains(t, string(bodyBytes), "Przemyslaw") + assert.Contains(t, "Clickhouse", resp.Header.Get("X-Quesma-Source")) +} func (a *DualWriteAndCommonTableTestcase) testDualQueryReturnsDataFromClickHouse(ctx context.Context, t *testing.T) { resp, err := a.RequestToQuesma(ctx, "POST", "/logs-dual-query/_doc", []byte(`{"name": "Przemyslaw", "age": 31337}`))