diff --git a/ci/it/configs/quesma-ingest.yml.template b/ci/it/configs/quesma-ingest.yml.template index cdc897380..84fe878e2 100644 --- a/ci/it/configs/quesma-ingest.yml.template +++ b/ci/it/configs/quesma-ingest.yml.template @@ -67,6 +67,15 @@ processors: type: geo_point "OriginLocation": type: geo_point + kibana_sample_data_flights_bulk: + target: + - my-clickhouse-instance + schemaOverrides: + fields: + "DestLocation": + type: geo_point + "OriginLocation": + type: geo_point kibana_sample_data_flights_with_mappings: target: - my-clickhouse-instance @@ -148,6 +157,15 @@ processors: type: geo_point "OriginLocation": type: geo_point + kibana_sample_data_flights_bulk: + target: + - my-clickhouse-instance + schemaOverrides: + fields: + "DestLocation": + type: geo_point + "OriginLocation": + type: geo_point kibana_sample_data_flights_with_mappings: target: - my-clickhouse-instance diff --git a/ci/it/testcases/test_ingest.go b/ci/it/testcases/test_ingest.go index 4decc2afd..35f84a36b 100644 --- a/ci/it/testcases/test_ingest.go +++ b/ci/it/testcases/test_ingest.go @@ -8,6 +8,7 @@ package testcases import ( "context" + "fmt" "github.com/stretchr/testify/assert" "maps" "net/http" @@ -35,6 +36,7 @@ func (a *IngestTestcase) SetupContainers(ctx context.Context) error { func (a *IngestTestcase) RunTests(ctx context.Context, t *testing.T) error { t.Run("test basic request", func(t *testing.T) { a.testBasicRequest(ctx, t) }) t.Run("test kibana_sample_data_flights ingest to ClickHouse", func(t *testing.T) { a.testKibanaSampleFlightsIngestToClickHouse(ctx, t) }) + t.Run("test kibana_sample_data_flights ingest to ClickHouse (bulk)", func(t *testing.T) { a.testKibanaSampleFlightsBulkIngestToClickHouse(ctx, t) }) t.Run("test kibana_sample_data_flights ingest to ClickHouse (with PUT mapping)", func(t *testing.T) { a.testKibanaSampleFlightsIngestWithMappingToClickHouse(ctx, t) }) t.Run("test kibana_sample_data_ecommerce ingest to ClickHouse", func(t *testing.T) { a.testKibanaSampleEcommerceIngestToClickHouse(ctx, t) }) t.Run("test kibana_sample_data_ecommerce ingest to ClickHouse (with PUT mapping)", func(t *testing.T) { a.testKibanaSampleEcommerceIngestWithMappingToClickHouse(ctx, t) }) @@ -86,6 +88,7 @@ var ( "timestamp": "DateTime64(3)", } sampleDocKibanaSampleFlights = []byte(`{"FlightNum":"9HY9SWR","DestCountry":"AU","OriginWeather":"Sunny","OriginCityName":"Frankfurt am Main","AvgTicketPrice":841.2656419677076,"DistanceMiles":10247.856675613455,"FlightDelay":false,"DestWeather":"Rain","Dest":"Sydney Kingsford Smith International Airport","FlightDelayType":"No Delay","OriginCountry":"DE","dayOfWeek":0,"DistanceKilometers":16492.32665375846,"timestamp":"2024-11-11T00:00:00","DestLocation":{"lat":"-33.94609833","lon":"151.177002"},"DestAirportID":"SYD","Carrier":"Kibana Airlines","Cancelled":false,"FlightTimeMin":1030.7704158599038,"Origin":"Frankfurt am Main Airport","OriginLocation":{"lat":"50.033333","lon":"8.570556"},"DestRegion":"SE-BD","OriginAirportID":"FRA","OriginRegion":"DE-HE","DestCityName":"Sydney","FlightTimeHour":17.179506930998397,"FlightDelayMin":0}`) + sampleDoc2KibanaSampleFlights = []byte(`{"FlightNum":"1HY2AWR","DestCountry":"PL","OriginWeather":"Sunny","OriginCityName":"Warsaw","AvgTicketPrice":141.2656419677076,"DistanceMiles":30247.856675613455,"FlightDelay":false,"DestWeather":"Rain","Dest":"Sydney Kingsford Smith International Airport","FlightDelayType":"No Delay","OriginCountry":"PL","dayOfWeek":0,"DistanceKilometers":16492.32665375846,"timestamp":"2024-11-11T00:00:00","DestLocation":{"lat":"-33.94609833","lon":"151.177002"},"DestAirportID":"SYD","Carrier":"Kibana Airlines","Cancelled":false,"FlightTimeMin":1030.7704158599038,"Origin":"Frankfurt am Main Airport","OriginLocation":{"lat":"50.033333","lon":"8.570556"},"DestRegion":"SE-BD","OriginAirportID":"FRA","OriginRegion":"DE-HE","DestCityName":"Sydney","FlightTimeHour":11.179506930998397,"FlightDelayMin":0}`) putMappingKibanaSampleFlights = []byte(` { "mappings": { @@ -451,6 +454,32 @@ func (a *IngestTestcase) testKibanaSampleFlightsIngestToClickHouse(ctx context.C assert.Equal(t, expectedColsKibanaSampleFlights, cols) } +func (a *IngestTestcase) testKibanaSampleFlightsBulkIngestToClickHouse(ctx context.Context, t *testing.T) { + var bulkBody []byte + bulkBody = append(bulkBody, []byte(fmt.Sprintf(`{ "index": { "_index": "kibana_sample_data_flights_bulk" } }%s`, "\n"))...) + bulkBody = append(bulkBody, sampleDocKibanaSampleFlights...) + bulkBody = append(bulkBody, []byte("\n")...) + bulkBody = append(bulkBody, []byte(fmt.Sprintf(`{ "index": { "_index": "kibana_sample_data_flights_bulk" } }%s`, "\n"))...) + bulkBody = append(bulkBody, sampleDoc2KibanaSampleFlights...) + bulkBody = append(bulkBody, []byte("\n")...) + + resp, _ := a.RequestToQuesma(ctx, t, "POST", "/_bulk", bulkBody) + assert.Equal(t, http.StatusOK, resp.StatusCode) + + cols, err := a.FetchClickHouseColumns(ctx, "kibana_sample_data_flights_bulk") + assert.NoError(t, err, "error fetching clickhouse columns") + assert.Equal(t, expectedColsKibanaSampleFlights, cols) + + // Check the ClickHouse table to ensure the data was ingested correctly + rows, err := a.ExecuteClickHouseQuery(ctx, "SELECT COUNT(*) FROM kibana_sample_data_flights_bulk") + assert.NoError(t, err, "error executing clickhouse query") + assert.True(t, rows.Next(), "expected count(*) row to be present") + var rowCount int + err = rows.Scan(&rowCount) + assert.NoError(t, err, "error scanning row count") + assert.Equal(t, 2, rowCount, "expected 2 rows in kibana_sample_data_flights_bulk table") +} + func (a *IngestTestcase) testKibanaSampleFlightsIngestWithMappingToClickHouse(ctx context.Context, t *testing.T) { resp, _ := a.RequestToQuesma(ctx, t, "PUT", "/kibana_sample_data_flights_with_mappings", putMappingKibanaSampleFlights) assert.Equal(t, http.StatusOK, resp.StatusCode)