Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 142 additions & 0 deletions ci/it/configs/quesma-ingest.yml.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
frontendConnectors:
- name: elastic-ingest
type: elasticsearch-fe-ingest
config:
listenPort: 8080
- name: elastic-query
type: elasticsearch-fe-query
config:
listenPort: 8080
backendConnectors:
- name: my-minimal-elasticsearch
type: elasticsearch
config:
url: "http://{{ .elasticsearch_host }}:{{ .elasticsearch_port }}"
user: elastic
password: quesmaquesma
- name: my-clickhouse-instance
type: clickhouse-os
config:
url: clickhouse://{{ .clickhouse_host }}:{{ .clickhouse_port }}
ingestStatistics: true
processors:
- name: my-query-processor
type: quesma-v1-processor-query
config:
indexes:
kibana_sample_data_ecommerce:
target:
- my-clickhouse-instance
schemaOverrides:
fields:
"geoip.location":
type: geo_point
"products.manufacturer":
type: text
"products.product_name":
type: text
category:
type: text
manufacturer:
type: text
kibana_sample_data_ecommerce_with_mappings:
target:
- my-clickhouse-instance
schemaOverrides:
fields:
"geoip.location":
type: geo_point
"products.manufacturer":
type: text
"products.product_name":
type: text
category:
type: text
manufacturer:
type: text
kibana_sample_data_flights:
target:
- my-clickhouse-instance
schemaOverrides:
fields:
"DestLocation":
type: geo_point
"OriginLocation":
type: geo_point
kibana_sample_data_flights_with_mappings:
target:
- my-clickhouse-instance
schemaOverrides:
fields:
"DestLocation":
type: geo_point
"OriginLocation":
type: geo_point
"*":
target:
- my-minimal-elasticsearch
- name: my-ingest-processor
type: quesma-v1-processor-ingest
config:
indexes:
kibana_sample_data_ecommerce:
target:
- my-clickhouse-instance
schemaOverrides:
fields:
"geoip.location":
type: geo_point
"products.manufacturer":
type: text
"products.product_name":
type: text
category:
type: text
manufacturer:
type: text
kibana_sample_data_ecommerce_with_mappings:
target:
- my-clickhouse-instance
schemaOverrides:
fields:
"geoip.location":
type: geo_point
"products.manufacturer":
type: text
"products.product_name":
type: text
category:
type: text
manufacturer:
type: text
kibana_sample_data_flights:
target:
- my-clickhouse-instance
schemaOverrides:
fields:
"DestLocation":
type: geo_point
"OriginLocation":
type: geo_point
kibana_sample_data_flights_with_mappings:
target:
- my-clickhouse-instance
schemaOverrides:
fields:
"DestLocation":
type: geo_point
"OriginLocation":
type: geo_point
"*":
target:
- my-minimal-elasticsearch
pipelines:
- name: my-elasticsearch-proxy-read
frontendConnectors: [ elastic-query ]
processors: [ my-query-processor ]
backendConnectors: [ my-minimal-elasticsearch, my-clickhouse-instance ]
- name: my-elasticsearch-proxy-write
frontendConnectors: [ elastic-ingest ]
processors: [ my-ingest-processor ]
backendConnectors: [ my-minimal-elasticsearch, my-clickhouse-instance ]

5 changes: 5 additions & 0 deletions ci/it/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,8 @@ func TestWildcardClickhouseTestcase(t *testing.T) {
testCase := testcases.NewWildcardClickhouseTestcase()
runIntegrationTest(t, testCase)
}

func TestIngestTestcase(t *testing.T) {
testCase := testcases.NewIngestTestcase()
runIntegrationTest(t, testCase)
}
21 changes: 21 additions & 0 deletions ci/it/testcases/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,27 @@ func (tc *IntegrationTestcaseBase) ExecuteClickHouseStatement(ctx context.Contex
return res, nil
}

func (tc *IntegrationTestcaseBase) FetchClickHouseColumns(ctx context.Context, tableName string) (map[string]string, error) {
rows, err := tc.ExecuteClickHouseQuery(ctx, fmt.Sprintf("SELECT name, type FROM system.columns WHERE table = '%s'", tableName))
if err != nil {
return nil, err
}
defer rows.Close()

result := make(map[string]string)
for rows.Next() {
var name, colType string
if err := rows.Scan(&name, &colType); err != nil {
return nil, err
}
result[name] = colType
}
if err := rows.Err(); err != nil {
return nil, err
}
return result, nil
}

func (tc *IntegrationTestcaseBase) RequestToQuesma(ctx context.Context, t *testing.T, method, uri string, requestBody []byte) (*http.Response, []byte) {
endpoint := tc.getQuesmaEndpoint()
resp, err := tc.doRequest(ctx, method, endpoint+uri, requestBody, nil)
Expand Down
Loading
Loading