Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions ci/it/configs/quesma-ingest.yml.template
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ processors:
type: geo_point
"OriginLocation":
type: geo_point
kibana_sample_data_flights_bulk:
target:
- my-clickhouse-instance
schemaOverrides:
fields:
"DestLocation":
type: geo_point
"OriginLocation":
type: geo_point
kibana_sample_data_flights_with_mappings:
target:
- my-clickhouse-instance
Expand Down Expand Up @@ -148,6 +157,15 @@ processors:
type: geo_point
"OriginLocation":
type: geo_point
kibana_sample_data_flights_bulk:
target:
- my-clickhouse-instance
schemaOverrides:
fields:
"DestLocation":
type: geo_point
"OriginLocation":
type: geo_point
kibana_sample_data_flights_with_mappings:
target:
- my-clickhouse-instance
Expand Down
29 changes: 29 additions & 0 deletions ci/it/testcases/test_ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package testcases

import (
"context"
"fmt"
"github.com/stretchr/testify/assert"
"maps"
"net/http"
Expand Down Expand Up @@ -35,6 +36,7 @@ func (a *IngestTestcase) SetupContainers(ctx context.Context) error {
func (a *IngestTestcase) RunTests(ctx context.Context, t *testing.T) error {
t.Run("test basic request", func(t *testing.T) { a.testBasicRequest(ctx, t) })
t.Run("test kibana_sample_data_flights ingest to ClickHouse", func(t *testing.T) { a.testKibanaSampleFlightsIngestToClickHouse(ctx, t) })
t.Run("test kibana_sample_data_flights ingest to ClickHouse (bulk)", func(t *testing.T) { a.testKibanaSampleFlightsBulkIngestToClickHouse(ctx, t) })
t.Run("test kibana_sample_data_flights ingest to ClickHouse (with PUT mapping)", func(t *testing.T) { a.testKibanaSampleFlightsIngestWithMappingToClickHouse(ctx, t) })
t.Run("test kibana_sample_data_ecommerce ingest to ClickHouse", func(t *testing.T) { a.testKibanaSampleEcommerceIngestToClickHouse(ctx, t) })
t.Run("test kibana_sample_data_ecommerce ingest to ClickHouse (with PUT mapping)", func(t *testing.T) { a.testKibanaSampleEcommerceIngestWithMappingToClickHouse(ctx, t) })
Expand Down Expand Up @@ -86,6 +88,7 @@ var (
"timestamp": "DateTime64(3)",
}
sampleDocKibanaSampleFlights = []byte(`{"FlightNum":"9HY9SWR","DestCountry":"AU","OriginWeather":"Sunny","OriginCityName":"Frankfurt am Main","AvgTicketPrice":841.2656419677076,"DistanceMiles":10247.856675613455,"FlightDelay":false,"DestWeather":"Rain","Dest":"Sydney Kingsford Smith International Airport","FlightDelayType":"No Delay","OriginCountry":"DE","dayOfWeek":0,"DistanceKilometers":16492.32665375846,"timestamp":"2024-11-11T00:00:00","DestLocation":{"lat":"-33.94609833","lon":"151.177002"},"DestAirportID":"SYD","Carrier":"Kibana Airlines","Cancelled":false,"FlightTimeMin":1030.7704158599038,"Origin":"Frankfurt am Main Airport","OriginLocation":{"lat":"50.033333","lon":"8.570556"},"DestRegion":"SE-BD","OriginAirportID":"FRA","OriginRegion":"DE-HE","DestCityName":"Sydney","FlightTimeHour":17.179506930998397,"FlightDelayMin":0}`)
sampleDoc2KibanaSampleFlights = []byte(`{"FlightNum":"1HY2AWR","DestCountry":"PL","OriginWeather":"Sunny","OriginCityName":"Warsaw","AvgTicketPrice":141.2656419677076,"DistanceMiles":30247.856675613455,"FlightDelay":false,"DestWeather":"Rain","Dest":"Sydney Kingsford Smith International Airport","FlightDelayType":"No Delay","OriginCountry":"PL","dayOfWeek":0,"DistanceKilometers":16492.32665375846,"timestamp":"2024-11-11T00:00:00","DestLocation":{"lat":"-33.94609833","lon":"151.177002"},"DestAirportID":"SYD","Carrier":"Kibana Airlines","Cancelled":false,"FlightTimeMin":1030.7704158599038,"Origin":"Frankfurt am Main Airport","OriginLocation":{"lat":"50.033333","lon":"8.570556"},"DestRegion":"SE-BD","OriginAirportID":"FRA","OriginRegion":"DE-HE","DestCityName":"Sydney","FlightTimeHour":11.179506930998397,"FlightDelayMin":0}`)
putMappingKibanaSampleFlights = []byte(`
{
"mappings": {
Expand Down Expand Up @@ -451,6 +454,32 @@ func (a *IngestTestcase) testKibanaSampleFlightsIngestToClickHouse(ctx context.C
assert.Equal(t, expectedColsKibanaSampleFlights, cols)
}

func (a *IngestTestcase) testKibanaSampleFlightsBulkIngestToClickHouse(ctx context.Context, t *testing.T) {
var bulkBody []byte
bulkBody = append(bulkBody, []byte(fmt.Sprintf(`{ "index": { "_index": "kibana_sample_data_flights_bulk" } }%s`, "\n"))...)
bulkBody = append(bulkBody, sampleDocKibanaSampleFlights...)
bulkBody = append(bulkBody, []byte("\n")...)
bulkBody = append(bulkBody, []byte(fmt.Sprintf(`{ "index": { "_index": "kibana_sample_data_flights_bulk" } }%s`, "\n"))...)
bulkBody = append(bulkBody, sampleDoc2KibanaSampleFlights...)
bulkBody = append(bulkBody, []byte("\n")...)

resp, _ := a.RequestToQuesma(ctx, t, "POST", "/_bulk", bulkBody)
assert.Equal(t, http.StatusOK, resp.StatusCode)

cols, err := a.FetchClickHouseColumns(ctx, "kibana_sample_data_flights_bulk")
assert.NoError(t, err, "error fetching clickhouse columns")
assert.Equal(t, expectedColsKibanaSampleFlights, cols)

// Check the ClickHouse table to ensure the data was ingested correctly
rows, err := a.ExecuteClickHouseQuery(ctx, "SELECT COUNT(*) FROM kibana_sample_data_flights_bulk")
assert.NoError(t, err, "error executing clickhouse query")
assert.True(t, rows.Next(), "expected count(*) row to be present")
var rowCount int
err = rows.Scan(&rowCount)
assert.NoError(t, err, "error scanning row count")
assert.Equal(t, 2, rowCount, "expected 2 rows in kibana_sample_data_flights_bulk table")
}

func (a *IngestTestcase) testKibanaSampleFlightsIngestWithMappingToClickHouse(ctx context.Context, t *testing.T) {
resp, _ := a.RequestToQuesma(ctx, t, "PUT", "/kibana_sample_data_flights_with_mappings", putMappingKibanaSampleFlights)
assert.Equal(t, http.StatusOK, resp.StatusCode)
Expand Down
Loading