Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 74c04f2

Browse files
authored
Fix _bulk endpoint (#1243)
Given that we have the sophisticated bulk body introduced in #1202, there is no longer need to examine the bulk payload in order to see where it should be routed. Also, that `matchedAgainstBulkBody` implementation wasn't entirely correct anyways (relying on optional data line being always %2).
1 parent 288da8e commit 74c04f2

File tree

4 files changed

+2
-102
lines changed

4 files changed

+2
-102
lines changed

quesma/quesma/es_responses.go

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core"
1515
"github.com/goccy/go-json"
1616
"net/http"
17-
"regexp"
1817
"sync"
1918
)
2019

@@ -212,15 +211,3 @@ type (
212211
Index string `json:"index"`
213212
}
214213
)
215-
216-
var indexNamePattern = regexp.MustCompile(`"_index"\s*:\s*"([^"]+)"`)
217-
218-
func extractIndexName(input string) string {
219-
results := indexNamePattern.FindStringSubmatch(input)
220-
221-
if len(results) < 2 {
222-
return ""
223-
}
224-
225-
return results[1]
226-
}

quesma/quesma/matchers.go

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ package quesma
55
import (
66
"github.com/QuesmaOrg/quesma/quesma/logger"
77
"github.com/QuesmaOrg/quesma/quesma/painful"
8-
"github.com/QuesmaOrg/quesma/quesma/quesma/config"
98
"github.com/QuesmaOrg/quesma/quesma/quesma/types"
109
"github.com/QuesmaOrg/quesma/quesma/table_resolver"
1110
"github.com/QuesmaOrg/quesma/quesma/v2/core"
@@ -24,38 +23,6 @@ func matchedAgainstAsyncId() quesma_api.RequestMatcher {
2423
})
2524
}
2625

27-
func matchedAgainstBulkBody(configuration *config.QuesmaConfiguration, tableResolver table_resolver.TableResolver) quesma_api.RequestMatcher {
28-
return quesma_api.RequestMatcherFunc(func(req *quesma_api.Request) quesma_api.MatchResult {
29-
idx := 0
30-
for _, s := range strings.Split(req.Body, "\n") {
31-
if len(s) == 0 {
32-
// ElasticSearch Agent sends empty lines between some JSONs, ignore them.
33-
continue
34-
}
35-
if idx%2 == 0 {
36-
name := extractIndexName(s)
37-
38-
decision := tableResolver.Resolve(quesma_api.IngestPipeline, name)
39-
40-
if decision.IsClosed {
41-
return quesma_api.MatchResult{Matched: true, Decision: decision}
42-
}
43-
44-
// if have any enabled Clickhouse connector, then return true
45-
for _, connector := range decision.UseConnectors {
46-
if _, ok := connector.(*quesma_api.ConnectorDecisionClickhouse); ok {
47-
return quesma_api.MatchResult{Matched: true, Decision: decision}
48-
}
49-
}
50-
}
51-
idx += 1
52-
}
53-
54-
// All indexes are disabled, the whole bulk can go to Elastic
55-
return quesma_api.MatchResult{Matched: false}
56-
})
57-
}
58-
5926
// Query path only (looks at QueryTarget)
6027
func matchedAgainstPattern(indexRegistry table_resolver.TableResolver) quesma_api.RequestMatcher {
6128
return matchAgainstTableResolver(indexRegistry, quesma_api.QueryPipeline)

quesma/quesma/router_test.go

Lines changed: 1 addition & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ func configureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl
9898
return elasticsearchQueryResult(`{"cluster_name": "quesma"}`, http.StatusOK), nil
9999
})
100100

101-
router.Register(routes.BulkPath, and(method("POST", "PUT"), matchedAgainstBulkBody(cfg, tableResolver)), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {
101+
router.Register(routes.BulkPath, method("POST", "PUT"), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {
102102

103103
body, err := types.ExpectNDJSON(req.ParsedBody)
104104
if err != nil {
@@ -682,60 +682,6 @@ func withAutodiscovery(cfg config.QuesmaConfiguration) config.QuesmaConfiguratio
682682
return cfg
683683
}
684684

685-
func Test_matchedAgainstBulkBody(t *testing.T) {
686-
687-
t.Skip(skipMessage)
688-
689-
tests := []struct {
690-
name string
691-
body string
692-
config config.QuesmaConfiguration
693-
want bool
694-
}{
695-
{
696-
name: "single index, config present",
697-
body: `{"create":{"_index":"logs-generic-default"}}`,
698-
config: indexConfig("logs-generic-default", false),
699-
want: true,
700-
},
701-
{
702-
name: "single index, table not present",
703-
body: `{"create":{"_index":"logs-generic-default"}}`,
704-
config: indexConfig("foo", false),
705-
want: false,
706-
},
707-
{
708-
name: "multiple indexes, table present",
709-
body: `{"create":{"_index":"logs-generic-default"}}` + "\n{}\n" + `{"create":{"_index":"logs-generic-default"}}`,
710-
config: indexConfig("logs-generic-default", false),
711-
want: true,
712-
},
713-
{
714-
name: "multiple indexes, some tables not present",
715-
body: `{"create":{"_index":"logs-generic-default"}}` + "\n{}\n" + `{"create":{"_index":"non-existent"}}`,
716-
config: indexConfig("logs-generic-default", false),
717-
want: true,
718-
},
719-
{
720-
name: "multiple indexes, all tables not present",
721-
body: `{"create":{"_index":"not-there"}}` + "\n{}\n" + `{"create":{"_index":"non-existent"}}`,
722-
config: indexConfig("logs-generic-default", false),
723-
want: false,
724-
},
725-
}
726-
727-
resolver := table_resolver.NewEmptyTableResolver()
728-
729-
for _, tt := range tests {
730-
t.Run(tt.name, func(t *testing.T) {
731-
732-
req := &quesma_api.Request{Body: tt.body}
733-
734-
assert.Equalf(t, tt.want, matchedAgainstBulkBody(&tt.config, resolver).Matches(req), "matchedAgainstBulkBody(%+v)", tt.config)
735-
})
736-
}
737-
}
738-
739685
const testIndexName = "indexName"
740686

741687
func TestConfigureRouter(t *testing.T) {

quesma/quesma/router_v2.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func ConfigureIngestRouterV2(cfg *config.QuesmaConfiguration, dependencies quesm
6868
}, nil
6969
})
7070

71-
router.Register(routes.BulkPath, and(method("POST", "PUT"), matchedAgainstBulkBody(cfg, tableResolver)), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {
71+
router.Register(routes.BulkPath, method("POST", "PUT"), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {
7272
body, err := types.ExpectNDJSON(req.ParsedBody)
7373
if err != nil {
7474
return nil, err

0 commit comments

Comments
 (0)