Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 0b40e2c

Browse files
authored
Router fixes (#1013)
PR #982 changed how the router works. It fixes some issues, but introduces a bug: "PUT _bulk" is treated as "PUT :index". In this PR : 1. Revert router changes (#982) 2. Fix async handling 3. Fix mapping handling 4. Fix "GET :index" handling (it was always redirected to the elastic) 5. Handle Kibana internal indexes in an explicit way
1 parent 864c8ba commit 0b40e2c

File tree

3 files changed

+105
-65
lines changed

3 files changed

+105
-65
lines changed

quesma/elasticsearch/index.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,6 @@ func IsIndexPattern(index string) bool {
1717
func IsInternalIndex(index string) bool {
1818
return strings.HasPrefix(index, internalIndexPrefix)
1919
}
20+
21+
// InternalPaths is a list of paths that are considered internal and should not handled by Quesma
22+
var InternalPaths = []string{"/_nodes", "/_xpack"}

quesma/quesma/mux/mux.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -99,19 +99,21 @@ func (p *PathRouter) Matches(req *Request) (Handler, *table_resolver.Decision) {
9999
}
100100
}
101101

102-
func (p *PathRouter) findHandler(req *Request) (handler Handler, decision *table_resolver.Decision) {
102+
func (p *PathRouter) findHandler(req *Request) (Handler, *table_resolver.Decision) {
103103
path := strings.TrimSuffix(req.Path, "/")
104104
for _, m := range p.mappings {
105-
if pathData, pathMatches := m.compiledPath.Match(path); pathMatches {
106-
req.Params = pathData.Params
105+
meta, match := m.compiledPath.Match(path)
106+
if match {
107+
req.Params = meta.Params
107108
predicateResult := m.predicate.Matches(req)
108-
decision = predicateResult.Decision
109109
if predicateResult.Matched {
110-
handler = m.handler
110+
return m.handler, predicateResult.Decision
111+
} else {
112+
return nil, predicateResult.Decision
111113
}
112114
}
113115
}
114-
return handler, decision
116+
return nil, nil
115117
}
116118

117119
type httpMethodPredicate struct {

quesma/quesma/router.go

Lines changed: 94 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,23 @@ func ConfigureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl
4141
and := mux.And
4242

4343
router := mux.NewPathRouter()
44+
45+
// These are the endpoints that are not supported by Quesma
46+
// These will redirect to the elastic cluster.
47+
for _, path := range elasticsearch.InternalPaths {
48+
router.Register(path, mux.Never(), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { return nil, nil })
49+
}
50+
51+
// These are the endpoints that are supported by Quesma
52+
53+
// Warning:
54+
// The first handler that matches the path will be considered to use.
55+
// If the predicate returns false it will be redirected to the elastic cluster.
56+
// If the predicate returns true, the handler will be used.
57+
//
58+
// So, if you add multiple handlers with the same path, the first one will be used, the rest will be redirected to the elastic cluster.
59+
// This is current limitation of the router.
60+
4461
router.Register(routes.ClusterHealthPath, method("GET"), func(_ context.Context, req *mux.Request) (*mux.Result, error) {
4562
return elasticsearchQueryResult(`{"cluster_name": "quesma"}`, http.StatusOK), nil
4663
})
@@ -200,33 +217,38 @@ func ConfigureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl
200217
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
201218
})
202219

203-
router.Register(routes.IndexMappingPath, and(method("PUT"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
204-
index := req.Params["index"]
220+
router.Register(routes.IndexMappingPath, and(method("GET", "PUT"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
205221

206-
body, err := types.ExpectJSON(req.ParsedBody)
207-
if err != nil {
208-
return nil, err
209-
}
222+
switch req.Method {
210223

211-
columns := elasticsearch.ParseMappings("", body)
224+
case "GET":
225+
index := req.Params["index"]
212226

213-
sr.UpdateDynamicConfiguration(schema.TableName(index), schema.Table{Columns: columns})
227+
foundSchema, found := sr.FindSchema(schema.TableName(index))
228+
if !found {
229+
return &mux.Result{StatusCode: http.StatusNotFound}, nil
230+
}
214231

215-
return putIndexResult(index)
216-
})
232+
hierarchicalSchema := schema.SchemaToHierarchicalSchema(&foundSchema)
233+
mappings := elasticsearch.GenerateMappings(hierarchicalSchema)
217234

218-
router.Register(routes.IndexMappingPath, and(method("GET"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
219-
index := req.Params["index"]
235+
return getIndexMappingResult(index, mappings)
220236

221-
foundSchema, found := sr.FindSchema(schema.TableName(index))
222-
if !found {
223-
return &mux.Result{StatusCode: http.StatusNotFound}, nil
237+
case "PUT":
238+
index := req.Params["index"]
239+
240+
body, err := types.ExpectJSON(req.ParsedBody)
241+
if err != nil {
242+
return nil, err
243+
}
244+
245+
columns := elasticsearch.ParseMappings("", body)
246+
sr.UpdateDynamicConfiguration(schema.TableName(index), schema.Table{Columns: columns})
247+
return putIndexResult(index)
224248
}
225249

226-
hierarchicalSchema := schema.SchemaToHierarchicalSchema(&foundSchema)
227-
mappings := elasticsearch.GenerateMappings(hierarchicalSchema)
250+
return nil, errors.New("unsupported method")
228251

229-
return getIndexMappingResult(index, mappings)
230252
})
231253

232254
router.Register(routes.AsyncSearchStatusPath, and(method("GET"), matchedAgainstAsyncId()), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
@@ -237,21 +259,27 @@ func ConfigureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl
237259
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
238260
})
239261

240-
router.Register(routes.AsyncSearchIdPath, and(method("GET"), matchedAgainstAsyncId()), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
241-
ctx = context.WithValue(ctx, tracing.AsyncIdCtxKey, req.Params["id"])
242-
responseBody, err := queryRunner.handlePartialAsyncSearch(ctx, req.Params["id"])
243-
if err != nil {
244-
return nil, err
245-
}
246-
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
247-
})
262+
router.Register(routes.AsyncSearchIdPath, and(method("GET", "DELETE"), matchedAgainstAsyncId()), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
248263

249-
router.Register(routes.AsyncSearchIdPath, and(method("DELETE"), matchedAgainstAsyncId()), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
250-
responseBody, err := queryRunner.deleteAsyncSearch(req.Params["id"])
251-
if err != nil {
252-
return nil, err
264+
switch req.Method {
265+
266+
case "GET":
267+
ctx = context.WithValue(ctx, tracing.AsyncIdCtxKey, req.Params["id"])
268+
responseBody, err := queryRunner.handlePartialAsyncSearch(ctx, req.Params["id"])
269+
if err != nil {
270+
return nil, err
271+
}
272+
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
273+
274+
case "DELETE":
275+
responseBody, err := queryRunner.deleteAsyncSearch(req.Params["id"])
276+
if err != nil {
277+
return nil, err
278+
}
279+
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
253280
}
254-
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
281+
282+
return nil, errors.New("unsupported method")
255283
})
256284

257285
router.Register(routes.FieldCapsPath, and(method("GET", "POST"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
@@ -307,42 +335,49 @@ func ConfigureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl
307335
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
308336
})
309337

310-
router.Register(routes.IndexPath, and(method("PUT"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
311-
index := req.Params["index"]
312-
if req.Body == "" {
313-
logger.Warn().Msgf("empty body in PUT /%s request, Quesma is not doing anything", index)
314-
return putIndexResult(index)
315-
}
338+
router.Register(routes.IndexPath, and(method("GET", "PUT"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
316339

317-
body, err := types.ExpectJSON(req.ParsedBody)
318-
if err != nil {
319-
return nil, err
320-
}
340+
switch req.Method {
321341

322-
mappings, ok := body["mappings"]
323-
if !ok {
324-
logger.Warn().Msgf("no mappings found in PUT /%s request, ignoring that request. Full content: %s", index, req.Body)
325-
return putIndexResult(index)
326-
}
327-
columns := elasticsearch.ParseMappings("", mappings.(map[string]interface{}))
342+
case "GET":
343+
index := req.Params["index"]
344+
345+
foundSchema, found := sr.FindSchema(schema.TableName(index))
346+
if !found {
347+
return &mux.Result{StatusCode: http.StatusNotFound}, nil
348+
}
328349

329-
sr.UpdateDynamicConfiguration(schema.TableName(index), schema.Table{Columns: columns})
350+
hierarchicalSchema := schema.SchemaToHierarchicalSchema(&foundSchema)
351+
mappings := elasticsearch.GenerateMappings(hierarchicalSchema)
330352

331-
return putIndexResult(index)
332-
})
353+
return getIndexResult(index, mappings)
333354

334-
router.Register(routes.IndexPath, and(method("GET"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
335-
index := req.Params["index"]
355+
case "PUT":
336356

337-
foundSchema, found := sr.FindSchema(schema.TableName(index))
338-
if !found {
339-
return &mux.Result{StatusCode: http.StatusNotFound}, nil
340-
}
357+
index := req.Params["index"]
358+
if req.Body == "" {
359+
logger.Warn().Msgf("empty body in PUT /%s request, Quesma is not doing anything", index)
360+
return putIndexResult(index)
361+
}
341362

342-
hierarchicalSchema := schema.SchemaToHierarchicalSchema(&foundSchema)
343-
mappings := elasticsearch.GenerateMappings(hierarchicalSchema)
363+
body, err := types.ExpectJSON(req.ParsedBody)
364+
if err != nil {
365+
return nil, err
366+
}
367+
368+
mappings, ok := body["mappings"]
369+
if !ok {
370+
logger.Warn().Msgf("no mappings found in PUT /%s request, ignoring that request. Full content: %s", index, req.Body)
371+
return putIndexResult(index)
372+
}
373+
columns := elasticsearch.ParseMappings("", mappings.(map[string]interface{}))
374+
375+
sr.UpdateDynamicConfiguration(schema.TableName(index), schema.Table{Columns: columns})
376+
377+
return putIndexResult(index)
378+
}
344379

345-
return getIndexResult(index, mappings)
380+
return nil, errors.New("unsupported method")
346381
})
347382

348383
router.Register(routes.QuesmaTableResolverPath, method("GET"), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {

0 commit comments

Comments
 (0)