Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit b725f88

Browse files
authored
Add integration test exercising _bulk endpoint (#1458)
Recently our tests didn't catch a panic (see #1457), because the `_bulk` endpoint wasn't exercised in the tests - only the `_doc` endpoint. Even though #1457 added a reproducer integration test, `_bulk` endpoint isn't exercised that much in integration tests - this PR adds an additional integration test that uses this endpoint.
1 parent 27bbce0 commit b725f88

File tree

2 files changed

+47
-0
lines changed

2 files changed

+47
-0
lines changed

ci/it/configs/quesma-ingest.yml.template

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,15 @@ processors:
6767
type: geo_point
6868
"OriginLocation":
6969
type: geo_point
70+
kibana_sample_data_flights_bulk:
71+
target:
72+
- my-clickhouse-instance
73+
schemaOverrides:
74+
fields:
75+
"DestLocation":
76+
type: geo_point
77+
"OriginLocation":
78+
type: geo_point
7079
kibana_sample_data_flights_with_mappings:
7180
target:
7281
- my-clickhouse-instance
@@ -148,6 +157,15 @@ processors:
148157
type: geo_point
149158
"OriginLocation":
150159
type: geo_point
160+
kibana_sample_data_flights_bulk:
161+
target:
162+
- my-clickhouse-instance
163+
schemaOverrides:
164+
fields:
165+
"DestLocation":
166+
type: geo_point
167+
"OriginLocation":
168+
type: geo_point
151169
kibana_sample_data_flights_with_mappings:
152170
target:
153171
- my-clickhouse-instance

ci/it/testcases/test_ingest.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package testcases
88

99
import (
1010
"context"
11+
"fmt"
1112
"github.com/stretchr/testify/assert"
1213
"maps"
1314
"net/http"
@@ -35,6 +36,7 @@ func (a *IngestTestcase) SetupContainers(ctx context.Context) error {
3536
func (a *IngestTestcase) RunTests(ctx context.Context, t *testing.T) error {
3637
t.Run("test basic request", func(t *testing.T) { a.testBasicRequest(ctx, t) })
3738
t.Run("test kibana_sample_data_flights ingest to ClickHouse", func(t *testing.T) { a.testKibanaSampleFlightsIngestToClickHouse(ctx, t) })
39+
t.Run("test kibana_sample_data_flights ingest to ClickHouse (bulk)", func(t *testing.T) { a.testKibanaSampleFlightsBulkIngestToClickHouse(ctx, t) })
3840
t.Run("test kibana_sample_data_flights ingest to ClickHouse (with PUT mapping)", func(t *testing.T) { a.testKibanaSampleFlightsIngestWithMappingToClickHouse(ctx, t) })
3941
t.Run("test kibana_sample_data_ecommerce ingest to ClickHouse", func(t *testing.T) { a.testKibanaSampleEcommerceIngestToClickHouse(ctx, t) })
4042
t.Run("test kibana_sample_data_ecommerce ingest to ClickHouse (with PUT mapping)", func(t *testing.T) { a.testKibanaSampleEcommerceIngestWithMappingToClickHouse(ctx, t) })
@@ -86,6 +88,7 @@ var (
8688
"timestamp": "DateTime64(3)",
8789
}
8890
sampleDocKibanaSampleFlights = []byte(`{"FlightNum":"9HY9SWR","DestCountry":"AU","OriginWeather":"Sunny","OriginCityName":"Frankfurt am Main","AvgTicketPrice":841.2656419677076,"DistanceMiles":10247.856675613455,"FlightDelay":false,"DestWeather":"Rain","Dest":"Sydney Kingsford Smith International Airport","FlightDelayType":"No Delay","OriginCountry":"DE","dayOfWeek":0,"DistanceKilometers":16492.32665375846,"timestamp":"2024-11-11T00:00:00","DestLocation":{"lat":"-33.94609833","lon":"151.177002"},"DestAirportID":"SYD","Carrier":"Kibana Airlines","Cancelled":false,"FlightTimeMin":1030.7704158599038,"Origin":"Frankfurt am Main Airport","OriginLocation":{"lat":"50.033333","lon":"8.570556"},"DestRegion":"SE-BD","OriginAirportID":"FRA","OriginRegion":"DE-HE","DestCityName":"Sydney","FlightTimeHour":17.179506930998397,"FlightDelayMin":0}`)
91+
sampleDoc2KibanaSampleFlights = []byte(`{"FlightNum":"1HY2AWR","DestCountry":"PL","OriginWeather":"Sunny","OriginCityName":"Warsaw","AvgTicketPrice":141.2656419677076,"DistanceMiles":30247.856675613455,"FlightDelay":false,"DestWeather":"Rain","Dest":"Sydney Kingsford Smith International Airport","FlightDelayType":"No Delay","OriginCountry":"PL","dayOfWeek":0,"DistanceKilometers":16492.32665375846,"timestamp":"2024-11-11T00:00:00","DestLocation":{"lat":"-33.94609833","lon":"151.177002"},"DestAirportID":"SYD","Carrier":"Kibana Airlines","Cancelled":false,"FlightTimeMin":1030.7704158599038,"Origin":"Frankfurt am Main Airport","OriginLocation":{"lat":"50.033333","lon":"8.570556"},"DestRegion":"SE-BD","OriginAirportID":"FRA","OriginRegion":"DE-HE","DestCityName":"Sydney","FlightTimeHour":11.179506930998397,"FlightDelayMin":0}`)
8992
putMappingKibanaSampleFlights = []byte(`
9093
{
9194
"mappings": {
@@ -451,6 +454,32 @@ func (a *IngestTestcase) testKibanaSampleFlightsIngestToClickHouse(ctx context.C
451454
assert.Equal(t, expectedColsKibanaSampleFlights, cols)
452455
}
453456

457+
func (a *IngestTestcase) testKibanaSampleFlightsBulkIngestToClickHouse(ctx context.Context, t *testing.T) {
458+
var bulkBody []byte
459+
bulkBody = append(bulkBody, []byte(fmt.Sprintf(`{ "index": { "_index": "kibana_sample_data_flights_bulk" } }%s`, "\n"))...)
460+
bulkBody = append(bulkBody, sampleDocKibanaSampleFlights...)
461+
bulkBody = append(bulkBody, []byte("\n")...)
462+
bulkBody = append(bulkBody, []byte(fmt.Sprintf(`{ "index": { "_index": "kibana_sample_data_flights_bulk" } }%s`, "\n"))...)
463+
bulkBody = append(bulkBody, sampleDoc2KibanaSampleFlights...)
464+
bulkBody = append(bulkBody, []byte("\n")...)
465+
466+
resp, _ := a.RequestToQuesma(ctx, t, "POST", "/_bulk", bulkBody)
467+
assert.Equal(t, http.StatusOK, resp.StatusCode)
468+
469+
cols, err := a.FetchClickHouseColumns(ctx, "kibana_sample_data_flights_bulk")
470+
assert.NoError(t, err, "error fetching clickhouse columns")
471+
assert.Equal(t, expectedColsKibanaSampleFlights, cols)
472+
473+
// Check the ClickHouse table to ensure the data was ingested correctly
474+
rows, err := a.ExecuteClickHouseQuery(ctx, "SELECT COUNT(*) FROM kibana_sample_data_flights_bulk")
475+
assert.NoError(t, err, "error executing clickhouse query")
476+
assert.True(t, rows.Next(), "expected count(*) row to be present")
477+
var rowCount int
478+
err = rows.Scan(&rowCount)
479+
assert.NoError(t, err, "error scanning row count")
480+
assert.Equal(t, 2, rowCount, "expected 2 rows in kibana_sample_data_flights_bulk table")
481+
}
482+
454483
func (a *IngestTestcase) testKibanaSampleFlightsIngestWithMappingToClickHouse(ctx context.Context, t *testing.T) {
455484
resp, _ := a.RequestToQuesma(ctx, t, "PUT", "/kibana_sample_data_flights_with_mappings", putMappingKibanaSampleFlights)
456485
assert.Equal(t, http.StatusOK, resp.StatusCode)

0 commit comments

Comments
 (0)