Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit fa8ed83

Browse files
authored
Generalizing quesma build implementation (#1155)
This PR generalizes some aspects of `quesma build()` implementation and adds few unit tests.
1 parent 93f6b7f commit fa8ed83

File tree

2 files changed

+176
-25
lines changed

2 files changed

+176
-25
lines changed

quesma/main_test.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,3 +281,101 @@ func Test_middleware(t *testing.T) {
281281
assert.Equal(t, int32(8), middlewareCallCount)
282282
}
283283
}
284+
285+
func Test_QuesmaBuild(t *testing.T) {
286+
cfg := &config.QuesmaConfiguration{
287+
DisableAuth: true,
288+
Elasticsearch: config.ElasticsearchConfiguration{
289+
Url: &config.Url{Host: "localhost:9200", Scheme: "http"},
290+
User: "",
291+
Password: "",
292+
},
293+
}
294+
{
295+
// Two pipelines with different endpoints
296+
var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma(quesma_api.EmptyDependencies())
297+
firstFrontendConnector := frontend_connectors.NewBasicHTTPFrontendConnector(":8888", cfg)
298+
firstHTTPRouter := quesma_api.NewPathRouter()
299+
firstHTTPRouter.AddRoute("/_bulk", bulk)
300+
firstFrontendConnector.AddRouter(firstHTTPRouter)
301+
var firstPipeline quesma_api.PipelineBuilder = quesma_api.NewPipeline()
302+
firstPipeline.AddFrontendConnector(firstFrontendConnector)
303+
304+
secondFrontendConnector := frontend_connectors.NewBasicHTTPFrontendConnector(":8889", cfg)
305+
secondHTTPRouter := quesma_api.NewPathRouter()
306+
secondHTTPRouter.AddRoute("/_search", search)
307+
secondFrontendConnector.AddRouter(secondHTTPRouter)
308+
var secondPipeline quesma_api.PipelineBuilder = quesma_api.NewPipeline()
309+
secondPipeline.AddFrontendConnector(secondFrontendConnector)
310+
311+
quesmaBuilder.AddPipeline(firstPipeline)
312+
quesmaBuilder.AddPipeline(secondPipeline)
313+
quesma, err := quesmaBuilder.Build()
314+
assert.NotNil(t, quesma)
315+
assert.Equal(t, 2, len(quesma.GetPipelines()))
316+
assert.Equal(t, 1, len(quesma.GetPipelines()[0].GetFrontendConnectors()))
317+
assert.Equal(t, 1, len(quesma.GetPipelines()[1].GetFrontendConnectors()))
318+
assert.Equal(t, 1, len(quesma.GetPipelines()[0].GetFrontendConnectors()[0].(quesma_api.HTTPFrontendConnector).GetRouter().GetHandlers()))
319+
assert.Equal(t, 1, len(quesma.GetPipelines()[1].GetFrontendConnectors()[0].(quesma_api.HTTPFrontendConnector).GetRouter().GetHandlers()))
320+
assert.NotEqual(t, quesma.GetPipelines()[1].GetFrontendConnectors()[0], quesma.GetPipelines()[0].GetFrontendConnectors()[0])
321+
322+
assert.NoError(t, err)
323+
324+
}
325+
{
326+
// Two pipelines with the same endpoint
327+
var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma(quesma_api.EmptyDependencies())
328+
firstFrontendConnector := frontend_connectors.NewBasicHTTPFrontendConnector(":8888", cfg)
329+
firstHTTPRouter := quesma_api.NewPathRouter()
330+
firstHTTPRouter.AddRoute("/_bulk", bulk)
331+
firstFrontendConnector.AddRouter(firstHTTPRouter)
332+
var firstPipeline quesma_api.PipelineBuilder = quesma_api.NewPipeline()
333+
firstPipeline.AddFrontendConnector(firstFrontendConnector)
334+
335+
secondFrontendConnector := frontend_connectors.NewBasicHTTPFrontendConnector(":8888", cfg)
336+
secondHTTPRouter := quesma_api.NewPathRouter()
337+
secondHTTPRouter.AddRoute("/_search", search)
338+
secondFrontendConnector.AddRouter(secondHTTPRouter)
339+
var secondPipeline quesma_api.PipelineBuilder = quesma_api.NewPipeline()
340+
secondPipeline.AddFrontendConnector(secondFrontendConnector)
341+
342+
quesmaBuilder.AddPipeline(firstPipeline)
343+
quesmaBuilder.AddPipeline(secondPipeline)
344+
quesma, err := quesmaBuilder.Build()
345+
assert.NotNil(t, quesma)
346+
assert.Equal(t, 2, len(quesma.GetPipelines()))
347+
assert.Equal(t, 1, len(quesma.GetPipelines()[0].GetFrontendConnectors()))
348+
assert.Equal(t, 1, len(quesma.GetPipelines()[1].GetFrontendConnectors()))
349+
assert.Equal(t, 2, len(quesma.GetPipelines()[0].GetFrontendConnectors()[0].(quesma_api.HTTPFrontendConnector).GetRouter().GetHandlers()))
350+
assert.Equal(t, 2, len(quesma.GetPipelines()[1].GetFrontendConnectors()[0].(quesma_api.HTTPFrontendConnector).GetRouter().GetHandlers()))
351+
assert.Equal(t, quesma.GetPipelines()[1].GetFrontendConnectors()[0], quesma.GetPipelines()[0].GetFrontendConnectors()[0])
352+
assert.NoError(t, err)
353+
}
354+
{
355+
// One pipeline with the same endpoint
356+
var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma(quesma_api.EmptyDependencies())
357+
firstFrontendConnector := frontend_connectors.NewBasicHTTPFrontendConnector(":8888", cfg)
358+
firstHTTPRouter := quesma_api.NewPathRouter()
359+
firstHTTPRouter.AddRoute("/_bulk", bulk)
360+
firstFrontendConnector.AddRouter(firstHTTPRouter)
361+
var firstPipeline quesma_api.PipelineBuilder = quesma_api.NewPipeline()
362+
firstPipeline.AddFrontendConnector(firstFrontendConnector)
363+
364+
secondFrontendConnector := frontend_connectors.NewBasicHTTPFrontendConnector(":8888", cfg)
365+
secondHTTPRouter := quesma_api.NewPathRouter()
366+
secondHTTPRouter.AddRoute("/_search", search)
367+
secondFrontendConnector.AddRouter(secondHTTPRouter)
368+
firstPipeline.AddFrontendConnector(secondFrontendConnector)
369+
370+
quesmaBuilder.AddPipeline(firstPipeline)
371+
quesma, err := quesmaBuilder.Build()
372+
assert.NotNil(t, quesma)
373+
assert.Equal(t, 1, len(quesma.GetPipelines()))
374+
assert.Equal(t, 2, len(quesma.GetPipelines()[0].GetFrontendConnectors()))
375+
assert.Equal(t, 2, len(quesma.GetPipelines()[0].GetFrontendConnectors()[0].(quesma_api.HTTPFrontendConnector).GetRouter().GetHandlers()))
376+
assert.Equal(t, 2, len(quesma.GetPipelines()[0].GetFrontendConnectors()[1].(quesma_api.HTTPFrontendConnector).GetRouter().GetHandlers()))
377+
assert.Equal(t, quesma.GetPipelines()[0].GetFrontendConnectors()[0], quesma.GetPipelines()[0].GetFrontendConnectors()[1])
378+
379+
assert.NoError(t, err)
380+
}
381+
}

quesma/v2/core/quesma_builder.go

Lines changed: 78 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -59,52 +59,107 @@ func (quesma *Quesma) Stop(ctx context.Context) {
5959
}
6060
}
6161

62+
func dumpEndpoints(endpoints map[string][]struct {
63+
pipelineIndex int
64+
connIndex int
65+
}) string {
66+
var buff bytes.Buffer
67+
_, _ = fmt.Fprintln(&buff, "Endpoints:")
68+
for endpoint, pipelines := range endpoints {
69+
_, _ = fmt.Fprintf(&buff, " %v:\n", endpoint)
70+
for _, pipeline := range pipelines {
71+
_, _ = fmt.Fprintf(&buff, " pipeline %v, connector %v\n", pipeline.pipelineIndex, pipeline.connIndex)
72+
}
73+
}
74+
return buff.String()
75+
}
76+
6277
func (quesma *Quesma) buildInternal() (QuesmaBuilder, error) {
78+
endpoints := make(map[string][]struct {
79+
pipelineIndex int
80+
connIndex int
81+
})
82+
// This pass collects information about endpoints
83+
// e.g. multiple frontend connectors can use the same endpoint
84+
for pipelineIndex, pipeline := range quesma.pipelines {
85+
for connIndex, conn := range pipeline.GetFrontendConnectors() {
86+
endpoints[conn.GetEndpoint()] = append(endpoints[conn.GetEndpoint()], struct {
87+
pipelineIndex int
88+
connIndex int
89+
}{
90+
pipelineIndex: pipelineIndex,
91+
connIndex: connIndex,
92+
})
93+
quesma.dependencies.Logger().Info().Msgf("%s:%s, index: %d, pipeline:%d", conn.InstanceName(),
94+
conn.GetEndpoint(),
95+
connIndex,
96+
pipelineIndex)
97+
}
98+
}
6399

64-
endpoints := make(map[string]struct{})
65-
handlers := make(map[string]HandlersPipe)
100+
// Second pass is about connecting routers with processors
101+
// and merge them if they are the same properties
102+
handlersPerEndpoint := make(map[string]map[string]HandlersPipe)
66103
for _, pipeline := range quesma.pipelines {
67104
for _, conn := range pipeline.GetFrontendConnectors() {
68105
if httpConn, ok := conn.(HTTPFrontendConnector); ok {
69-
endpoints[conn.GetEndpoint()] = struct{}{}
70106
router := httpConn.GetRouter()
71107
for path, handlerWrapper := range router.GetHandlers() {
72108
handlerWrapper.Processors = append(handlerWrapper.Processors, pipeline.GetProcessors()...)
73-
handlers[path] = handlerWrapper
109+
sharedWrappers := make(map[string]HandlersPipe)
110+
sharedWrappers[path] = handlerWrapper
111+
if _, ok := handlersPerEndpoint[conn.GetEndpoint()]; ok {
112+
for path, handlerWrapper := range handlersPerEndpoint[conn.GetEndpoint()] {
113+
sharedWrappers[path] = handlerWrapper
114+
}
115+
}
116+
handlersPerEndpoint[conn.GetEndpoint()] = sharedWrappers
74117
}
75118
}
76119
}
77120
}
78-
if len(endpoints) == 1 {
79-
for _, pipeline := range quesma.pipelines {
80-
for _, conn := range pipeline.GetFrontendConnectors() {
81-
if httpConn, ok := conn.(HTTPFrontendConnector); ok {
82-
router := httpConn.GetRouter().Clone().(Router)
83-
router.SetHandlers(handlers)
84-
httpConn.AddRouter(router)
85-
}
121+
122+
quesma.dependencies.Logger().Debug().Msg(dumpEndpoints(endpoints))
123+
124+
// This pass sets the routers with the handlers
125+
for _, pipeline := range quesma.pipelines {
126+
for _, conn := range pipeline.GetFrontendConnectors() {
127+
if httpConn, ok := conn.(HTTPFrontendConnector); ok {
128+
router := httpConn.GetRouter().Clone().(Router)
129+
router.SetHandlers(handlersPerEndpoint[conn.GetEndpoint()])
130+
httpConn.AddRouter(router)
86131
}
87132
}
88-
// TODO this fixes the problem of sharing the same frontend connector
89-
// in the case of having only one endpoint
90-
// however it's not fully generic yet as only subset of connectors might be shared
91-
if len(quesma.pipelines) > 1 {
92-
if len(quesma.pipelines[0].GetFrontendConnectors()) == 0 {
93-
return nil, fmt.Errorf("no frontend connectors provided")
94-
}
95-
sharedFc := quesma.pipelines[0].GetFrontendConnectors()[0]
96-
for index := 1; index < len(quesma.pipelines); index++ {
97-
for indexFc := range quesma.pipelines[index].GetFrontendConnectors() {
98-
quesma.pipelines[index].GetFrontendConnectors()[indexFc] = sharedFc
133+
}
134+
135+
// This pass is about sharing frontend connectors
136+
for endpoint, endpointInfo := range endpoints {
137+
if len(endpointInfo) < 1 {
138+
continue
139+
}
140+
sharedFc := quesma.pipelines[endpointInfo[0].pipelineIndex].GetFrontendConnectors()[endpointInfo[0].connIndex]
141+
for _, info := range endpointInfo {
142+
for pipelineIndex, pipeline := range quesma.pipelines {
143+
for connIndex, conn := range pipeline.GetFrontendConnectors() {
144+
if conn.GetEndpoint() == endpoint {
145+
if info.pipelineIndex == pipelineIndex && info.connIndex == connIndex {
146+
continue
147+
}
148+
quesma.dependencies.Logger().Info().Msgf("Sharing frontend connector %v with %v", sharedFc.InstanceName(), conn.InstanceName())
149+
pipeline.GetFrontendConnectors()[connIndex] = sharedFc
150+
}
99151
}
100152
}
101153
}
102154
}
103155

156+
// This pass is about connecting processors with tcp connectors
157+
// and doing some validation
104158
for _, pipeline := range quesma.pipelines {
105159
backendConnectorTypesPerPipeline := make(map[BackendConnectorType]struct{})
106160
for _, conn := range pipeline.GetFrontendConnectors() {
107161
if tcpConn, ok := conn.(TCPFrontendConnector); ok {
162+
// Inject processors set on pipeline level into connection handler
108163
if len(pipeline.GetProcessors()) > 0 {
109164
tcpConn.GetConnectionHandler().SetHandlers(pipeline.GetProcessors())
110165
}
@@ -126,9 +181,7 @@ func (quesma *Quesma) buildInternal() (QuesmaBuilder, error) {
126181
return nil, fmt.Errorf("processor %v failed to initialize: %v", proc.GetId(), err)
127182
}
128183
}
129-
130184
}
131-
132185
return quesma, nil
133186
}
134187

0 commit comments

Comments
 (0)