Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 6340d5a

Browse files
committed
WIP
1 parent eab9572 commit 6340d5a

File tree

2 files changed

+28
-3
lines changed

2 files changed

+28
-3
lines changed

ci/it/testcases/test_reading_clickhouse_tables.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,12 +93,29 @@ func (a *ReadingClickHouseTablesIntegrationTestcase) testWildcardGoesToElastic(c
9393
func (a *ReadingClickHouseTablesIntegrationTestcase) testIngestIsDisabled(ctx context.Context, t *testing.T) {
9494
// There is no ingest pipeline, so Quesma should reject all ingest requests
9595
for _, tt := range []string{"test_table", "extra_index"} {
96-
t.Run(tt, func(t *testing.T) {
96+
t.Run(tt+"_doc", func(t *testing.T) {
9797
resp, bodyBytes := a.RequestToQuesma(ctx, t, "POST", fmt.Sprintf("/%s/_doc", tt), []byte(`{"name": "Piotr", "age": 11111}`))
9898
assert.Contains(t, string(bodyBytes), "index_closed_exception")
9999
assert.Equal(t, http.StatusOK, resp.StatusCode)
100100
assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source"))
101101
assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product"))
102102
})
103103
}
104+
105+
for _, tt := range []string{"test_table", "extra_index"} {
106+
t.Run(tt+"_bulk", func(t *testing.T) {
107+
108+
bulkPayload := []byte(fmt.Sprintf(`
109+
{ "index": { "_index": "%s", "_id": "1" } }
110+
{ "name": "Alice", "age": 30 }
111+
`, tt))
112+
113+
resp, bodyBytes := a.RequestToQuesma(ctx, t, "POST", "/_bulk", bulkPayload)
114+
assert.Contains(t, string(bodyBytes), "index_closed_exception")
115+
assert.Equal(t, http.StatusOK, resp.StatusCode)
116+
assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source"))
117+
assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product"))
118+
})
119+
}
120+
104121
}

platform/functionality/bulk/bulk.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,13 @@ func Write(ctx context.Context, defaultIndex *string, bulk types.NDJSON, ip *ing
7575

7676
// The returned results should be in the same order as the input request, however splitting the bulk might change the order.
7777
// Therefore, each BulkRequestEntry has a corresponding pointer to the result entry, allowing us to freely split and reshuffle the bulk.
78-
results, clickhouseBulkEntries, elasticRequestBody, elasticBulkEntries, err := SplitBulk(ctx, defaultIndex, bulk, maxBulkSize, tableResolver, ip.GetIndexNameRewriter())
78+
79+
var indexNameRewriter ingest.IndexNameRewriter
80+
if ip != nil {
81+
indexNameRewriter = ip.GetIndexNameRewriter()
82+
}
83+
84+
results, clickhouseBulkEntries, elasticRequestBody, elasticBulkEntries, err := SplitBulk(ctx, defaultIndex, bulk, maxBulkSize, tableResolver, indexNameRewriter)
7985
if err != nil {
8086
return []BulkItem{}, err
8187
}
@@ -129,7 +135,9 @@ func SplitBulk(ctx context.Context, defaultIndex *string, bulk types.NDJSON, max
129135
index := op.GetIndex()
130136
operation := op.GetOperation()
131137

132-
index = rewriter.RewriteIndex(index)
138+
if rewriter != nil {
139+
index = rewriter.RewriteIndex(index)
140+
}
133141

134142
entryWithResponse := BulkRequestEntry{
135143
operation: operation,

0 commit comments

Comments
 (0)