diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index dce91702b..afb640499 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -57,7 +57,7 @@ jobs: with: channel-id: ${{ secrets.SLACK_CHANNEL_ID }} slack-message: | - :exclamation: *Integration tests failed.* :exclamation: @channel + :exclamation: *Integration tests failed.* :exclamation: Last commit by: ${{ steps.get_author.outputs.author }} env: SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }} diff --git a/ci/it/configs/quesma-with-dual-writes-and-common-table.yml.template b/ci/it/configs/quesma-with-dual-writes-and-common-table.yml.template new file mode 100644 index 000000000..6971eab85 --- /dev/null +++ b/ci/it/configs/quesma-with-dual-writes-and-common-table.yml.template @@ -0,0 +1,72 @@ +frontendConnectors: + - name: elastic-ingest + type: elasticsearch-fe-ingest + config: + listenPort: 8080 + - name: elastic-query + type: elasticsearch-fe-query + config: + listenPort: 8080 +backendConnectors: + - name: e + type: elasticsearch + config: + url: "http://{{ .elasticsearch_host }}:{{ .elasticsearch_port }}" + user: elastic + password: quesmaquesma + - name: c + type: clickhouse-os + config: + url: clickhouse://{{ .clickhouse_host }}:{{ .clickhouse_port }} +ingestStatistics: true +processors: + - name: QP + type: quesma-v1-processor-query + config: + indexes: + logs-1: + target: [ e ] + logs-2: + target: [ c ] + logs-3: + target: [ c, e ] + logs-dual-query: + target: [ c, e ] + logs-4: + useCommonTable: true + target: [ c ] + logs-5: + useCommonTable: true + target: [ c ] + "*": + target: [ e ] + - name: IP + type: quesma-v1-processor-ingest + config: + indexes: + logs-1: + target: [ e ] + logs-2: + target: [ c ] + logs-3: + target: [ c, e ] + logs-dual-query: + target: [ c, e ] + logs-4: + useCommonTable: true + target: [ c ] + "*": + target: [ e ] + logs-5: + useCommonTable: true + target: [ ] + +pipelines: + - name: my-elasticsearch-proxy-read + frontendConnectors: [ elastic-query ] + processors: [ QP ] + backendConnectors: [ e, c ] + - name: my-elasticsearch-proxy-write + frontendConnectors: [ elastic-ingest ] + processors: [ IP ] + backendConnectors: [ e, c ] diff --git a/ci/it/configs/quesma-with-two-pipelines.yml.template b/ci/it/configs/quesma-with-two-pipelines.yml.template new file mode 100644 index 000000000..9a0ae7077 --- /dev/null +++ b/ci/it/configs/quesma-with-two-pipelines.yml.template @@ -0,0 +1,54 @@ +frontendConnectors: + - name: elastic-ingest + type: elasticsearch-fe-ingest + config: + listenPort: 8080 + - name: elastic-query + type: elasticsearch-fe-query + config: + listenPort: 8080 +backendConnectors: + - name: my-minimal-elasticsearch + type: elasticsearch + config: + url: "http://{{ .elasticsearch_host }}:{{ .elasticsearch_port }}" + user: elastic + password: quesmaquesma + - name: my-hydrolix-instance + type: clickhouse-os + config: + url: clickhouse://{{ .clickhouse_host }}:{{ .clickhouse_port }} +ingestStatistics: true +processors: + - name: my-query-processor + type: quesma-v1-processor-query + config: + indexes: + siem: + target: [my-hydrolix-instance] + logs: + target: [my-hydrolix-instance] + test_index: + target: [my-minimal-elasticsearch] + "*": + target: [ my-minimal-elasticsearch ] + - name: my-ingest-processor + type: quesma-v1-processor-ingest + config: + indexes: + test_index: + target: [ my-minimal-elasticsearch ] + logs_disabled: + target: [ ] + "*": + target: [ my-minimal-elasticsearch ] +pipelines: + - name: my-elasticsearch-proxy-read + frontendConnectors: [ elastic-query ] + processors: [ my-query-processor ] + backendConnectors: [ my-minimal-elasticsearch, my-hydrolix-instance ] + - name: my-elasticsearch-proxy-write + frontendConnectors: [ elastic-ingest ] + processors: [ my-ingest-processor ] + backendConnectors: [ my-minimal-elasticsearch, my-hydrolix-instance ] + diff --git a/ci/it/integration_test.go b/ci/it/integration_test.go index 0579e58bc..21f60f02d 100644 --- a/ci/it/integration_test.go +++ b/ci/it/integration_test.go @@ -26,3 +26,13 @@ func TestReadingClickHouseTablesIntegrationTestcase(t *testing.T) { testCase := testcases.NewReadingClickHouseTablesIntegrationTestcase() runIntegrationTest(t, testCase) } + +func TestQueryAndIngestPipelineTestcase(t *testing.T) { + testCase := testcases.NewQueryAndIngestPipelineTestcase() + runIntegrationTest(t, testCase) +} + +func TestDualWriteAndCommonTableTestcase(t *testing.T) { + testCase := testcases.NewDualWriteAndCommonTableTestcase() + runIntegrationTest(t, testCase) +} diff --git a/ci/it/testcases/test_dual_write_and_common_table.go b/ci/it/testcases/test_dual_write_and_common_table.go new file mode 100644 index 000000000..af698b4a9 --- /dev/null +++ b/ci/it/testcases/test_dual_write_and_common_table.go @@ -0,0 +1,335 @@ +package testcases + +import ( + "context" + "encoding/json" + "github.com/stretchr/testify/assert" + "io" + "net/http" + "testing" +) + +type DualWriteAndCommonTableTestcase struct { + IntegrationTestcaseBase +} + +func NewDualWriteAndCommonTableTestcase() *DualWriteAndCommonTableTestcase { + return &DualWriteAndCommonTableTestcase{ + IntegrationTestcaseBase: IntegrationTestcaseBase{ + ConfigTemplate: "quesma-with-dual-writes-and-common-table.yml.template", + }, + } +} + +func (a *DualWriteAndCommonTableTestcase) SetupContainers(ctx context.Context) error { + containers, err := setupAllContainersWithCh(ctx, a.ConfigTemplate) + if err != nil { + return err + } + a.Containers = containers + return nil +} + +func (a *DualWriteAndCommonTableTestcase) RunTests(ctx context.Context, t *testing.T) error { + a.testBasicRequest(ctx, t) + a.testIngestToClickHouseWorks(ctx, t) + a.testIngestToCommonTableWorks(ctx, t) + a.testDualQueryReturnsDataFromClickHouse(ctx, t) + a.testDualWritesWork(ctx, t) + a.testWildcardGoesToElastic(ctx, t) + return nil +} + +func (a *DualWriteAndCommonTableTestcase) testBasicRequest(ctx context.Context, t *testing.T) { + resp, err := a.RequestToQuesma(ctx, "GET", "/", nil) + if err != nil { + t.Fatalf("Failed to make GET request: %s", err) + } + defer resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode) +} +func (a *DualWriteAndCommonTableTestcase) testIngestToCommonTableWorks(ctx context.Context, t *testing.T) { + resp, err := a.RequestToQuesma(ctx, "POST", "/logs-4/_doc", []byte(`{"name": "Przemyslaw", "age": 31337}`)) + if err != nil { + t.Fatalf("Failed to insert document: %s", err) + } + defer resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode) + + chQuery := "SELECT * FROM 'quesma_common_table'" + rows, err := a.ExecuteClickHouseQuery(ctx, chQuery) + if err != nil { + t.Fatalf("Failed to execute query: %s", err) + } + columnTypes, err := rows.ColumnTypes() + values := make([]interface{}, len(columnTypes)) + valuePtrs := make([]interface{}, len(columnTypes)) + for i := range values { + valuePtrs[i] = &values[i] + } + var name string + var age int + var quesmaIndexName string + for rows.Next() { + if err := rows.Scan(valuePtrs...); err != nil { + t.Fatalf("Failed to scan row: %s", err) + } + for i, col := range values { + switch columnTypes[i].Name() { + case "__quesma_index_name": + if v, ok := col.(string); ok { + quesmaIndexName = v + } + case "name": + if v, ok := col.(*string); ok { + name = *v + } + case "age": + if v, ok := col.(*int64); ok { + age = int(*v) + } + } + } + if name == "Przemyslaw" && age == 31337 && quesmaIndexName == "logs-4" { + break + } + } + assert.Equal(t, "Przemyslaw", name) + assert.Equal(t, 31337, age) + assert.Equal(t, "logs-4", quesmaIndexName) + + resp, err = a.RequestToQuesma(ctx, "GET", "/logs-4/_search", []byte(`{"query": {"match_all": {}}}`)) + if err != nil { + t.Fatalf("Failed to make GET request: %s", err) + } + defer resp.Body.Close() + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Failed to read response body: %s", err) + } + + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Contains(t, string(bodyBytes), "Przemyslaw") + assert.Contains(t, "Clickhouse", resp.Header.Get("X-Quesma-Source")) +} + +func (a *DualWriteAndCommonTableTestcase) testDualQueryReturnsDataFromClickHouse(ctx context.Context, t *testing.T) { + resp, err := a.RequestToQuesma(ctx, "POST", "/logs-dual-query/_doc", []byte(`{"name": "Przemyslaw", "age": 31337}`)) + if err != nil { + t.Fatalf("Failed to insert document: %s", err) + } + defer resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode) + + chQuery := "SELECT * FROM 'logs-dual-query'" + rows, err := a.ExecuteClickHouseQuery(ctx, chQuery) + if err != nil { + t.Fatalf("Failed to execute query: %s", err) + } + columnTypes, err := rows.ColumnTypes() + values := make([]interface{}, len(columnTypes)) + valuePtrs := make([]interface{}, len(columnTypes)) + for i := range values { + valuePtrs[i] = &values[i] + } + var name string + var age int + for rows.Next() { + if err := rows.Scan(valuePtrs...); err != nil { + t.Fatalf("Failed to scan row: %s", err) + } + for i, col := range values { + switch columnTypes[i].Name() { + case "name": + if v, ok := col.(*string); ok { + name = *v + } + case "age": + if v, ok := col.(*int64); ok { + age = int(*v) + } + } + } + if name == "Przemyslaw" && age == 31337 { + break + } + } + assert.Equal(t, "Przemyslaw", name) + assert.Equal(t, 31337, age) + + // In the meantime let's delete the index from Elasticsearch + _, _ = a.RequestToElasticsearch(ctx, "DELETE", "/logs-dual-query", nil) + if err != nil { + t.Fatalf("Failed to make DELETE request: %s", err) + } + // FINAL TEST - WHETHER QUESMA RETURNS DATA FROM CLICKHOUSE + resp, err = a.RequestToQuesma(ctx, "GET", "/logs-dual-query/_search", []byte(`{"query": {"match_all": {}}}`)) + if err != nil { + t.Fatalf("Failed to make GET request: %s", err) + } + defer resp.Body.Close() + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Failed to read response body: %s", err) + } + + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Contains(t, string(bodyBytes), "Przemyslaw") + assert.Contains(t, "Clickhouse", resp.Header.Get("X-Quesma-Source")) +} + +func (a *DualWriteAndCommonTableTestcase) testIngestToClickHouseWorks(ctx context.Context, t *testing.T) { + resp, err := a.RequestToQuesma(ctx, "POST", "/logs-2/_doc", []byte(`{"name": "Przemyslaw", "age": 31337}`)) + if err != nil { + t.Fatalf("Failed to insert document: %s", err) + } + defer resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode) + + chQuery := "SELECT * FROM 'logs-2'" + rows, err := a.ExecuteClickHouseQuery(ctx, chQuery) + if err != nil { + t.Fatalf("Failed to execute query: %s", err) + } + columnTypes, err := rows.ColumnTypes() + values := make([]interface{}, len(columnTypes)) + valuePtrs := make([]interface{}, len(columnTypes)) + for i := range values { + valuePtrs[i] = &values[i] + } + var name string + var age int + for rows.Next() { + if err := rows.Scan(valuePtrs...); err != nil { + t.Fatalf("Failed to scan row: %s", err) + } + for i, col := range values { + switch columnTypes[i].Name() { + case "name": + if v, ok := col.(*string); ok { + name = *v + } + case "age": + if v, ok := col.(*int64); ok { + age = int(*v) + } + } + } + if name == "Przemyslaw" && age == 31337 { + break + } + } + assert.Equal(t, "Przemyslaw", name) + assert.Equal(t, 31337, age) + + // Also make sure no such index got created in Elasticsearch + resp, err = a.RequestToElasticsearch(ctx, "GET", "/logs-2/_refresh", nil) + if err != nil { + t.Fatalf("Failed to make GET request: %s", err) + } + defer resp.Body.Close() + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Failed to read response body: %s", err) + } + + assert.Equal(t, http.StatusNotFound, resp.StatusCode) + assert.Contains(t, string(bodyBytes), "no such index [logs-2]") +} + +func (a *DualWriteAndCommonTableTestcase) testDualWritesWork(ctx context.Context, t *testing.T) { + resp, err := a.RequestToQuesma(ctx, "POST", "/logs-3/_doc", []byte(`{"name": "Przemyslaw", "age": 31337}`)) + if err != nil { + t.Fatalf("Failed to insert document: %s", err) + } + defer resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode) + + chQuery := "SELECT * FROM 'logs-3'" + rows, err := a.ExecuteClickHouseQuery(ctx, chQuery) + if err != nil { + t.Fatalf("Failed to execute query: %s", err) + } + columnTypes, err := rows.ColumnTypes() + values := make([]interface{}, len(columnTypes)) + valuePtrs := make([]interface{}, len(columnTypes)) + for i := range values { + valuePtrs[i] = &values[i] + } + var name string + var age int + for rows.Next() { + if err := rows.Scan(valuePtrs...); err != nil { + t.Fatalf("Failed to scan row: %s", err) + } + for i, col := range values { + switch columnTypes[i].Name() { + case "name": + if v, ok := col.(*string); ok { + name = *v + } + case "age": + if v, ok := col.(*int64); ok { + age = int(*v) + } + } + } + if name == "Przemyslaw" && age == 31337 { + break + } + } + assert.Equal(t, "Przemyslaw", name) + assert.Equal(t, 31337, age) + + // Also make sure no such index got created in Elasticsearch + _, _ = a.RequestToElasticsearch(ctx, "GET", "/logs-3/_refresh", nil) + resp, err = a.RequestToElasticsearch(ctx, "GET", "/logs-3/_search", []byte(`{"query": {"match_all": {}}}`)) + if err != nil { + t.Fatalf("Failed to make GET request: %s", err) + } + defer resp.Body.Close() + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Failed to read response body: %s", err) + } + + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Contains(t, string(bodyBytes), "Przemyslaw") +} + +func (a *DualWriteAndCommonTableTestcase) testWildcardGoesToElastic(ctx context.Context, t *testing.T) { + // Given an index in Elasticsearch which falls under `*` in the configuration + var err error + if _, err = a.RequestToElasticsearch(ctx, "PUT", "/unmentioned_index", nil); err != nil { + t.Fatalf("Failed to create index: %s", err) + } + if _, err = a.RequestToElasticsearch(ctx, "POST", "/unmentioned_index/_doc/1", []byte(`{"name": "Alice"}`)); err != nil { + t.Fatalf("Failed to insert document: %s", err) + } + if _, err = a.RequestToElasticsearch(ctx, "POST", "/unmentioned_index/_refresh", nil); err != nil { + t.Fatalf("Failed to refresh index: %s", err) + } + // When Quesma searches for that document + resp, err := a.RequestToQuesma(ctx, "POST", "/unmentioned_index/_search", []byte(`{"query": {"match_all": {}}}`)) + if err != nil { + t.Fatalf("Failed to make GET request: %s", err) + } + defer resp.Body.Close() + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Failed to read response body: %s", err) + } + var jsonResponse map[string]interface{} + if err := json.Unmarshal(bodyBytes, &jsonResponse); err != nil { + t.Fatalf("Failed to unmarshal response body: %s", err) + } + hits, _ := jsonResponse["hits"].(map[string]interface{}) + // We should get proper search result from Elasticsearch + hit := hits["total"] + hitValue := hit.(map[string]interface{})["value"] + assert.Equal(t, float64(1), hitValue) + assert.Contains(t, string(bodyBytes), "Alice") + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Quesma-Source")) + assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product")) +} diff --git a/ci/it/testcases/test_reading_clickhouse_tables.go b/ci/it/testcases/test_reading_clickhouse_tables.go index 6b2834656..ca0f7b3e8 100644 --- a/ci/it/testcases/test_reading_clickhouse_tables.go +++ b/ci/it/testcases/test_reading_clickhouse_tables.go @@ -2,7 +2,10 @@ package testcases import ( "context" + "encoding/json" "github.com/stretchr/testify/assert" + "io" + "net/http" "testing" ) @@ -30,6 +33,7 @@ func (a *ReadingClickHouseTablesIntegrationTestcase) SetupContainers(ctx context func (a *ReadingClickHouseTablesIntegrationTestcase) RunTests(ctx context.Context, t *testing.T) error { a.testBasicRequest(ctx, t) a.testRandomThing(ctx, t) + a.testWildcardGoesToElastic(ctx, t) return nil } @@ -39,7 +43,7 @@ func (a *ReadingClickHouseTablesIntegrationTestcase) testBasicRequest(ctx contex t.Fatalf("Failed to make GET request: %s", err) } defer resp.Body.Close() - assert.Equal(t, 200, resp.StatusCode) + assert.Equal(t, http.StatusOK, resp.StatusCode) } func (a *ReadingClickHouseTablesIntegrationTestcase) testRandomThing(ctx context.Context, t *testing.T) { @@ -63,3 +67,43 @@ func (a *ReadingClickHouseTablesIntegrationTestcase) testRandomThing(ctx context defer resp.Body.Close() assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source")) } + +func (a *ReadingClickHouseTablesIntegrationTestcase) testWildcardGoesToElastic(ctx context.Context, t *testing.T) { + // Given an index in Elasticsearch which falls under `*` in the configuration + var err error + if _, err = a.RequestToElasticsearch(ctx, "PUT", "/extra_index", nil); err != nil { + t.Fatalf("Failed to create index: %s", err) + } + if _, err = a.RequestToElasticsearch(ctx, "POST", "/extra_index/_doc/1", []byte(`{"name": "Alice"}`)); err != nil { + t.Fatalf("Failed to insert document: %s", err) + } + if _, err = a.RequestToElasticsearch(ctx, "POST", "/extra_index/_refresh", nil); err != nil { + t.Fatalf("Failed to refresh index: %s", err) + } + // When Quesma searches for that document + resp, err := a.RequestToQuesma(ctx, "POST", "/extra_index/_search", []byte(`{"query": {"match_all": {}}}`)) + if err != nil { + t.Fatalf("Failed to make GET request: %s", err) + } + defer resp.Body.Close() + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Failed to read response body: %s", err) + } + var jsonResponse map[string]interface{} + if err := json.Unmarshal(bodyBytes, &jsonResponse); err != nil { + t.Fatalf("Failed to unmarshal response body: %s", err) + } + hits, _ := jsonResponse["hits"].(map[string]interface{}) + // We should get proper search result from Elasticsearch + hit := hits["total"] + hitValue := hit.(map[string]interface{})["value"] + assert.Equal(t, float64(1), hitValue) + assert.Contains(t, string(bodyBytes), "Alice") + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Quesma-Source")) + assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product")) +} + +// At this moment this configuration does not disable ingest (ingest req's will get routed to ES and handled normally) +// Future test idea -> ensure ingest req gets rejected. diff --git a/ci/it/testcases/test_transparent_proxy.go b/ci/it/testcases/test_transparent_proxy.go index 6501e2bbf..1becffa2d 100644 --- a/ci/it/testcases/test_transparent_proxy.go +++ b/ci/it/testcases/test_transparent_proxy.go @@ -4,6 +4,7 @@ import ( "context" "github.com/stretchr/testify/assert" "io" + "net/http" "testing" ) @@ -41,7 +42,7 @@ func (a *TransparentProxyIntegrationTestcase) testBasicRequest(ctx context.Conte t.Fatalf("Failed to make GET request: %s", err) } defer resp.Body.Close() - assert.Equal(t, 200, resp.StatusCode) + assert.Equal(t, http.StatusOK, resp.StatusCode) } func (a *TransparentProxyIntegrationTestcase) testIfCatHealthRequestReachesElasticsearch(ctx context.Context, t *testing.T) { @@ -54,7 +55,7 @@ func (a *TransparentProxyIntegrationTestcase) testIfCatHealthRequestReachesElast if err != nil { t.Fatalf("Failed to read response body: %s", err) } - assert.Equal(t, 200, resp.StatusCode) + assert.Equal(t, http.StatusOK, resp.StatusCode) assert.Equal(t, "Elasticsearch", resp.Header.Get("X-elastic-product")) assert.Contains(t, string(bodyBytes), "green") } diff --git a/ci/it/testcases/test_two_pipelines.go b/ci/it/testcases/test_two_pipelines.go new file mode 100644 index 000000000..27952da65 --- /dev/null +++ b/ci/it/testcases/test_two_pipelines.go @@ -0,0 +1,128 @@ +package testcases + +import ( + "context" + "encoding/json" + "github.com/stretchr/testify/assert" + "io" + "net/http" + "testing" +) + +type QueryAndIngestPipelineTestcase struct { + IntegrationTestcaseBase +} + +func NewQueryAndIngestPipelineTestcase() *QueryAndIngestPipelineTestcase { + return &QueryAndIngestPipelineTestcase{ + IntegrationTestcaseBase: IntegrationTestcaseBase{ + ConfigTemplate: "quesma-with-two-pipelines.yml.template", + }, + } +} + +func (a *QueryAndIngestPipelineTestcase) SetupContainers(ctx context.Context) error { + containers, err := setupAllContainersWithCh(ctx, a.ConfigTemplate) + if err != nil { + return err + } + a.Containers = containers + return nil +} + +func (a *QueryAndIngestPipelineTestcase) RunTests(ctx context.Context, t *testing.T) error { + a.testBasicRequest(ctx, t) + a.testWildcardGoesToElastic(ctx, t) + a.testEmptyTargetDoc(ctx, t) + a.testEmptyTargetBulk(ctx, t) + return nil +} + +func (a *QueryAndIngestPipelineTestcase) testBasicRequest(ctx context.Context, t *testing.T) { + resp, err := a.RequestToQuesma(ctx, "GET", "/", nil) + if err != nil { + t.Fatalf("Failed to make GET request: %s", err) + } + defer resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode) +} + +func (a *QueryAndIngestPipelineTestcase) testWildcardGoesToElastic(ctx context.Context, t *testing.T) { + // Given an index in Elasticsearch which falls under `*` in the configuration + var err error + if _, err = a.RequestToElasticsearch(ctx, "PUT", "/unmentioned_index", nil); err != nil { + t.Fatalf("Failed to create index: %s", err) + } + if _, err = a.RequestToElasticsearch(ctx, "POST", "/unmentioned_index/_doc/1", []byte(`{"name": "Alice"}`)); err != nil { + t.Fatalf("Failed to insert document: %s", err) + } + if _, err = a.RequestToElasticsearch(ctx, "POST", "/unmentioned_index/_refresh", nil); err != nil { + t.Fatalf("Failed to refresh index: %s", err) + } + // When Quesma searches for that document + resp, err := a.RequestToQuesma(ctx, "POST", "/unmentioned_index/_search", []byte(`{"query": {"match_all": {}}}`)) + if err != nil { + t.Fatalf("Failed to make GET request: %s", err) + } + defer resp.Body.Close() + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Failed to read response body: %s", err) + } + var jsonResponse map[string]interface{} + if err := json.Unmarshal(bodyBytes, &jsonResponse); err != nil { + t.Fatalf("Failed to unmarshal response body: %s", err) + } + hits, _ := jsonResponse["hits"].(map[string]interface{}) + // We should get proper search result from Elasticsearch + hit := hits["total"] + hitValue := hit.(map[string]interface{})["value"] + assert.Equal(t, float64(1), hitValue) + assert.Contains(t, string(bodyBytes), "Alice") + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Quesma-Source")) + assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product")) +} + +func (a *QueryAndIngestPipelineTestcase) testEmptyTargetDoc(ctx context.Context, t *testing.T) { + resp, err := a.RequestToQuesma(ctx, "POST", "/logs_disabled/_doc", []byte(`{"name": "Alice"}`)) + if err != nil { + t.Fatalf("Error sending POST request: %s", err) + } + defer resp.Body.Close() + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Failed to read response body: %s", err) + } + + assert.Contains(t, string(bodyBytes), "index_closed_exception") + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source")) + assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product")) +} + +func (a *QueryAndIngestPipelineTestcase) testEmptyTargetBulk(ctx context.Context, t *testing.T) { + bulkPayload := []byte(` + { "index": { "_index": "logs_disabled", "_id": "1" } } + { "name": "Alice", "age": 30 } + { "index": { "_index": "logs_disabled", "_id": "2" } } + { "name": "Bob", "age": 25 } + +`) + resp, err := a.RequestToQuesma(ctx, "POST", "/_bulk", bulkPayload) + if err != nil { + t.Fatalf("Error sending POST request: %s", err) + } + defer resp.Body.Close() + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Failed to read response body: %s", err) + } + + assert.Contains(t, string(bodyBytes), "index_closed_exception") + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source")) + assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product")) +} + +// TODO: A POST to /logs_disabled/_doc/:id is going to be routed to Elasticsearch and will return result in writing to the index.