Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 7db7f5b

Browse files
authored
Support Kibana CSV export (#1363)
Adding a support for CSV export in Kibana. <img width="1561" alt="image" src="https://github.com/user-attachments/assets/5e36becc-0161-4535-9c52-8f0536d32226" /> ### Some technical details Kibana CSV report generation uses two steps: 1. Create Point-In-Time (`:indexPattern/_pit` API) for an index pattern 2. Fetch data via `/_search` API, referencing PIT ID created before In our case, we're **not** creating any persistent PIT for ClickHouse table - we're simply using that PIT ID as a reference to specific index pattern. So the logic looks like the following: 1. Quesma controls `:indexPattern/_pit` endpoint. Whenever pit creation is issued and `:indexPattern` is under our control, instead of relaying request to Elasticsearch we respond that PIT has been created and it's ID is `quesma_:indexPattern`. 2. When a request to `/_search` comes to Quesma and it has PIT == `quesma_:indexPattern`, we simply execute query to a table which is matching `:indexPattern` Having that, we have this feature implemented without the need of any persistent storage 🎉 <!-- A note on testing your PR --> <!-- Basic unit test run is executed against each commit in the PR. If you want to run a full integration test suite, you can trigger it by commenting with '/run-integration-tests' -->
1 parent 37b6fd8 commit 7db7f5b

File tree

4 files changed

+89
-9
lines changed

4 files changed

+89
-9
lines changed

platform/frontend_connectors/matchers.go

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,13 @@ func matchedAgainstAsyncId() quesma_api.RequestMatcher {
2121
})
2222
}
2323

24-
// Query path only (looks at QueryTarget)
2524
func matchedAgainstPattern(indexRegistry table_resolver.TableResolver) quesma_api.RequestMatcher {
2625
return matchAgainstTableResolver(indexRegistry, quesma_api.QueryPipeline)
2726
}
2827

29-
// check whether exact index name is enabled
3028
func matchAgainstTableResolver(indexRegistry table_resolver.TableResolver, pipelineName string) quesma_api.RequestMatcher {
3129
return quesma_api.RequestMatcherFunc(func(req *quesma_api.Request) quesma_api.MatchResult {
32-
3330
indexName := req.Params["index"]
34-
3531
decision := indexRegistry.Resolve(pipelineName, indexName)
3632
if decision.Err != nil {
3733
return quesma_api.MatchResult{Matched: false, Decision: decision}
@@ -45,6 +41,46 @@ func matchAgainstTableResolver(indexRegistry table_resolver.TableResolver, pipel
4541
})
4642
}
4743

44+
// getPitIdFromRequest gets the PIT ID from the request body,
45+
// depending on request kind it can be either at root or under `pit` key, e.g.:
46+
// {"id": "pit_id"} or {"pit": {"id": "pit_id"}}
47+
func getPitIdFromRequest(req *quesma_api.Request, pitAtRoot bool) string {
48+
var payload struct {
49+
ID string `json:"id,omitempty"`
50+
Pit struct {
51+
ID string `json:"id"`
52+
} `json:"pit,omitempty"`
53+
}
54+
if err := json.Unmarshal([]byte(req.Body), &payload); err != nil {
55+
return ""
56+
}
57+
if pitAtRoot {
58+
return payload.ID
59+
}
60+
return payload.Pit.ID
61+
}
62+
63+
func matchPitId(getPitFn func(*quesma_api.Request) string) quesma_api.RequestMatcher {
64+
return quesma_api.RequestMatcherFunc(func(req *quesma_api.Request) quesma_api.MatchResult {
65+
if strings.HasPrefix(getPitFn(req), quesmaPitPrefix) {
66+
return quesma_api.MatchResult{Matched: true}
67+
}
68+
return quesma_api.MatchResult{Matched: false}
69+
})
70+
}
71+
72+
func isSearchRequestWithQuesmaPit() quesma_api.RequestMatcher {
73+
return matchPitId(func(req *quesma_api.Request) string {
74+
return getPitIdFromRequest(req, false)
75+
})
76+
}
77+
78+
func hasQuesmaPitId() quesma_api.RequestMatcher {
79+
return matchPitId(func(req *quesma_api.Request) string {
80+
return getPitIdFromRequest(req, true)
81+
})
82+
}
83+
4884
func matchedExactQueryPath(indexRegistry table_resolver.TableResolver) quesma_api.RequestMatcher {
4985
return matchAgainstTableResolver(indexRegistry, quesma_api.QueryPipeline)
5086
}

platform/frontend_connectors/route_handlers.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package frontend_connectors
66
import (
77
"context"
88
"errors"
9+
"fmt"
910
"github.com/QuesmaOrg/quesma/platform/backend_connectors"
1011
"github.com/QuesmaOrg/quesma/platform/clickhouse"
1112
"github.com/QuesmaOrg/quesma/platform/config"
@@ -27,6 +28,8 @@ import (
2728
"net/http"
2829
)
2930

31+
const quesmaPitPrefix = "quesma_"
32+
3033
func HandleDeletingAsyncSearchById(queryRunner QueryRunnerIFace, asyncSearchId string) (*quesma_api.Result, error) {
3134
responseBody, err := queryRunner.DeleteAsyncSearch(asyncSearchId)
3235
if err != nil {
@@ -187,6 +190,36 @@ func HandleGetIndexMapping(sr schema.Registry, index string) (*quesma_api.Result
187190
return getIndexMappingResult(index, mappings)
188191
}
189192

193+
func HandlePitStore(indexPattern string) (*quesma_api.Result, error) {
194+
pitId := fmt.Sprintf("%s%s", quesmaPitPrefix, indexPattern)
195+
pitCreatedResponse := fmt.Sprintf(`{
196+
"_shards": {
197+
"failed": 0,
198+
"skipped": 0,
199+
"successful": 0,
200+
"total": 0
201+
},
202+
"id": "%s"
203+
}`, pitId)
204+
return &quesma_api.Result{
205+
Body: pitCreatedResponse,
206+
StatusCode: http.StatusOK,
207+
GenericResult: []byte(pitCreatedResponse),
208+
}, nil
209+
}
210+
211+
func PitDeletedResponse() (*quesma_api.Result, error) {
212+
pitDeletedResponse := `{
213+
"num_freed": 1,
214+
"succeeded": true
215+
}`
216+
return &quesma_api.Result{
217+
Body: pitDeletedResponse,
218+
StatusCode: http.StatusOK,
219+
GenericResult: []byte(pitDeletedResponse),
220+
}, nil
221+
}
222+
190223
func HandleBulkIndex(ctx context.Context, index string, body types.NDJSON, ip *ingest.IngestProcessor, ingestStatsEnabled bool, esConn *backend_connectors.ElasticsearchBackendConnector, dependencies quesma_api.Dependencies, tableResolver table_resolver.TableResolver) (*quesma_api.Result, error) {
191224
results, err := bulk.Write(ctx, &index, body, ip, ingestStatsEnabled, esConn, dependencies.PhoneHomeAgent(), tableResolver)
192225
return bulkInsertResult(ctx, results, err)

platform/frontend_connectors/router_v2.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -133,22 +133,31 @@ func ConfigureSearchRouterV2(cfg *config.QuesmaConfiguration, dependencies quesm
133133
return HandleResolveIndex(ctx, req.Params["index"], sr, cfg.Elasticsearch)
134134
})
135135

136+
router.Register(routes.IndexPatternPitPath, and(method("POST"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {
137+
indexPattern := req.Params["index"]
138+
logger.Debug().Msgf("Quesma-managed PIT request, targeting indexPattern=%s", indexPattern)
139+
return HandlePitStore(indexPattern)
140+
})
141+
142+
router.Register(routes.PitPath, and(method("DELETE"), hasQuesmaPitId()), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {
143+
return PitDeletedResponse()
144+
})
145+
136146
router.Register(routes.IndexCountPath, and(method("GET"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {
137147
return HandleIndexCount(ctx, req.Params["index"], queryRunner)
138148
})
139149

140-
// TODO: This endpoint is currently disabled (mux.Never()) as it's pretty much used only by internal Kibana requests,
141-
// it's error-prone to detect them in matchAgainstKibanaInternal() and Quesma can't handle well the cases of wildcard
142-
// matching many indices either way.
143-
router.Register(routes.GlobalSearchPath, and(quesma_api.Never(), method("GET", "POST"), matchAgainstKibanaInternal()), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {
150+
router.Register(routes.GlobalSearchPath, and(method("GET", "POST"), isSearchRequestWithQuesmaPit()), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {
151+
pitId := getPitIdFromRequest(req, false)
152+
indexPattern := strings.TrimPrefix(pitId, quesmaPitPrefix)
144153

145154
body, err := types.ExpectJSON(req.ParsedBody)
146155
if err != nil {
147156
return nil, err
148157
}
149158

150159
// TODO we should pass JSON here instead of []byte
151-
responseBody, err := queryRunner.HandleSearch(ctx, "*", body)
160+
responseBody, err := queryRunner.HandleSearch(ctx, indexPattern, body)
152161
if err != nil {
153162
if errors.Is(quesma_errors.ErrIndexNotExists(), err) {
154163
return &quesma_api.Result{StatusCode: http.StatusNotFound, GenericResult: make([]byte, 0)}, nil

platform/v2/core/routes/paths.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ const (
1717
IndexMappingPath = "/:index/_mapping"
1818
FieldCapsPath = "/:index/_field_caps"
1919
TermsEnumPath = "/:index/_terms_enum"
20+
IndexPatternPitPath = "/:index/_pit"
21+
PitPath = "/_pit"
2022
EQLSearch = "/:index/_eql/search"
2123
ResolveIndexPath = "/_resolve/index/:index"
2224
ClusterHealthPath = "/_cluster/health"

0 commit comments

Comments
 (0)