Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit c554318

Browse files
pdelewskimieciu
andauthored
Introducing 2 separate frontend connectors for ingest and query (#1092)
This PR for the first time introduces two separate ingest and query frontend connectors with that use separated routers ![image](https://github.com/user-attachments/assets/581d21a5-a501-47e7-96ae-a89f00e6d67b) --------- Signed-off-by: Przemyslaw Delewski <[email protected]> Co-authored-by: Przemysław Hejman <[email protected]>
1 parent 4da1078 commit c554318

File tree

5 files changed

+95
-50
lines changed

5 files changed

+95
-50
lines changed

quesma/frontend_connectors/router_v2.go

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ func (r *RouterV2) elasticFallback(decision *quesma_api.Decision,
203203
}
204204
}
205205

206-
func (r *RouterV2) Reroute(ctx context.Context, w http.ResponseWriter, req *http.Request, reqBody []byte, searchRouter *quesma_api.PathRouter, ingestRouter *quesma_api.PathRouter, logManager *clickhouse.LogManager) {
206+
func (r *RouterV2) Reroute(ctx context.Context, w http.ResponseWriter, req *http.Request, reqBody []byte, router quesma_api.Router, logManager *clickhouse.LogManager) {
207207
defer recovery.LogAndHandlePanic(ctx, func(err error) {
208208
w.WriteHeader(500)
209209
w.Write(queryparser.InternalQuesmaError("Unknown Quesma error"))
@@ -223,31 +223,18 @@ func (r *RouterV2) Reroute(ctx context.Context, w http.ResponseWriter, req *http
223223
}
224224

225225
quesmaRequest.ParsedBody = types.ParseRequestBody(quesmaRequest.Body)
226-
var handler quesma_api.HTTPFrontendHandler
227-
var decision *quesma_api.Decision
228-
searchHandlerPipe, searchDecision := searchRouter.Matches(quesmaRequest)
229-
if searchDecision != nil {
230-
decision = searchDecision
231-
}
232-
if searchHandlerPipe != nil {
233-
handler = searchHandlerPipe.Handler
234-
}
235-
ingestHandlerPipe, ingestDecision := ingestRouter.Matches(quesmaRequest)
236-
if searchDecision == nil {
237-
decision = ingestDecision
238-
}
239-
if searchHandlerPipe == nil && ingestHandlerPipe != nil {
240-
handler = ingestHandlerPipe.Handler
241-
}
226+
227+
handlersPipe, decision := router.Matches(quesmaRequest)
228+
242229
if decision != nil {
243230
w.Header().Set(QuesmaTableResolverHeader, decision.String())
244231
} else {
245232
w.Header().Set(QuesmaTableResolverHeader, "n/a")
246233
}
247234

248-
if handler != nil {
235+
if handlersPipe != nil {
249236
quesmaResponse, err := recordRequestToClickhouseV2(req.URL.Path, r.QuesmaManagementConsole, func() (*quesma_api.Result, error) {
250-
return handler(ctx, quesmaRequest)
237+
return handlersPipe.Handler(ctx, quesmaRequest)
251238
})
252239

253240
zip := strings.Contains(req.Header.Get("Accept-Encoding"), "gzip")

quesma/quesma/dual_write_proxy_v2.go

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -72,18 +72,15 @@ func (q *dualWriteHttpProxyV2) Stop(ctx context.Context) {
7272
q.Close(ctx)
7373
}
7474

75-
func newDualWriteProxyV2(schemaLoader clickhouse.TableDiscovery, logManager *clickhouse.LogManager, indexManager elasticsearch.IndexManagement, registry schema.Registry, config *config.QuesmaConfiguration, quesmaManagementConsole *ui.QuesmaManagementConsole, agent telemetry.PhoneHomeAgent, processor *ingest.IngestProcessor, resolver table_resolver.TableResolver, abResultsRepository ab_testing.Sender) *dualWriteHttpProxyV2 {
76-
queryRunner := NewQueryRunner(logManager, config, indexManager, quesmaManagementConsole, registry, abResultsRepository, resolver)
75+
func newDualWriteProxyV2(schemaLoader clickhouse.TableDiscovery, logManager *clickhouse.LogManager, indexManager elasticsearch.IndexManagement, registry schema.Registry, config *config.QuesmaConfiguration, quesmaManagementConsole *ui.QuesmaManagementConsole, agent telemetry.PhoneHomeAgent, ingestProcessor *ingest.IngestProcessor, resolver table_resolver.TableResolver, abResultsRepository ab_testing.Sender) *dualWriteHttpProxyV2 {
76+
queryProcessor := NewQueryRunner(logManager, config, indexManager, quesmaManagementConsole, registry, abResultsRepository, resolver)
7777
// not sure how we should configure our query translator ???
7878
// is this a config option??
7979

80-
queryRunner.DateMathRenderer = queryparser.DateMathExpressionFormatLiteral
80+
queryProcessor.DateMathRenderer = queryparser.DateMathExpressionFormatLiteral
8181

8282
// tests should not be run with optimization enabled by default
83-
queryRunner.EnableQueryOptimization(config)
84-
85-
ingestRouter := ConfigureIngestRouterV2(config, processor, agent, resolver)
86-
searchRouter := ConfigureSearchRouterV2(config, registry, logManager, quesmaManagementConsole, queryRunner, resolver)
83+
queryProcessor.EnableQueryOptimization(config)
8784

8885
tr := &http.Transport{
8986
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
@@ -92,20 +89,45 @@ func newDualWriteProxyV2(schemaLoader clickhouse.TableDiscovery, logManager *cli
9289
Transport: tr,
9390
Timeout: time.Minute, // should be more configurable, 30s is Kibana default timeout
9491
}
95-
routerInstance := frontend_connectors.RouterV2{PhoneHomeAgent: agent, Config: config, QuesmaManagementConsole: quesmaManagementConsole, HttpClient: client, RequestPreprocessors: quesma_api.ProcessorChain{}}
92+
93+
routerInstance := frontend_connectors.RouterV2{PhoneHomeAgent: agent,
94+
Config: config, QuesmaManagementConsole: quesmaManagementConsole,
95+
HttpClient: client, RequestPreprocessors: quesma_api.ProcessorChain{}}
9696
routerInstance.
9797
RegisterPreprocessor(quesma_api.NewTraceIdPreprocessor())
9898
agent.FailedRequestsCollector(func() int64 {
9999
return routerInstance.FailedRequests.Load()
100100
})
101101

102-
elasticHttpFrontentConnector := NewElasticHttpFrontendConnector(":"+strconv.Itoa(int(config.PublicTcpPort)),
103-
&routerInstance, searchRouter.(*quesma_api.PathRouter), ingestRouter.(*quesma_api.PathRouter), logManager, agent)
102+
ingestRouter := ConfigureIngestRouterV2(config, ingestProcessor, agent, resolver)
103+
searchRouter := ConfigureSearchRouterV2(config, registry, logManager, quesmaManagementConsole, queryProcessor, resolver)
104+
105+
elasticHttpIngestFrontendConnector := NewElasticHttpIngestFrontendConnector(":"+strconv.Itoa(int(config.PublicTcpPort)),
106+
&routerInstance, logManager, agent)
107+
elasticHttpIngestFrontendConnector.AddRouter(ingestRouter)
108+
109+
elasticHttpQueryFrontendConnector := NewElasticHttpQueryFrontendConnector(":"+strconv.Itoa(int(config.PublicTcpPort)),
110+
&routerInstance, logManager, agent)
111+
elasticHttpQueryFrontendConnector.AddRouter(searchRouter)
112+
113+
quesmaBuilder := quesma_api.NewQuesma()
114+
ingestPipeline := quesma_api.NewPipeline()
115+
ingestPipeline.AddFrontendConnector(elasticHttpIngestFrontendConnector)
116+
117+
queryPipeline := quesma_api.NewPipeline()
118+
queryPipeline.AddFrontendConnector(elasticHttpQueryFrontendConnector)
119+
quesmaBuilder.AddPipeline(ingestPipeline)
120+
quesmaBuilder.AddPipeline(queryPipeline)
121+
_, err := quesmaBuilder.Build()
122+
if err != nil {
123+
logger.Fatal().Msgf("Error building Quesma: %v", err)
124+
}
125+
104126
var limitedHandler http.Handler
105127
if config.DisableAuth {
106-
limitedHandler = newSimultaneousClientsLimiterV2(elasticHttpFrontentConnector, concurrentClientsLimitV2)
128+
limitedHandler = newSimultaneousClientsLimiterV2(elasticHttpIngestFrontendConnector, concurrentClientsLimitV2)
107129
} else {
108-
limitedHandler = newSimultaneousClientsLimiterV2(NewAuthMiddleware(elasticHttpFrontentConnector, config.Elasticsearch), concurrentClientsLimitV2)
130+
limitedHandler = newSimultaneousClientsLimiterV2(NewAuthMiddleware(elasticHttpIngestFrontendConnector, config.Elasticsearch), concurrentClientsLimitV2)
109131
}
110132

111133
return &dualWriteHttpProxyV2{
@@ -119,10 +141,10 @@ func newDualWriteProxyV2(schemaLoader clickhouse.TableDiscovery, logManager *cli
119141
logManager: logManager,
120142
publicPort: config.PublicTcpPort,
121143
asyncQueriesEvictor: async_search_storage.NewAsyncQueriesEvictor(
122-
queryRunner.AsyncRequestStorage.(async_search_storage.AsyncSearchStorageInMemory),
123-
queryRunner.AsyncQueriesContexts.(async_search_storage.AsyncQueryContextStorageInMemory),
144+
queryProcessor.AsyncRequestStorage.(async_search_storage.AsyncSearchStorageInMemory),
145+
queryProcessor.AsyncQueriesContexts.(async_search_storage.AsyncQueryContextStorageInMemory),
124146
),
125-
queryRunner: queryRunner,
147+
queryRunner: queryProcessor,
126148
}
127149
}
128150

quesma/quesma/elastic_http_frontend_connector.go

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,35 +9,34 @@ import (
99
"quesma/frontend_connectors"
1010
"quesma/quesma/recovery"
1111
"quesma/telemetry"
12-
"quesma_v2/core"
12+
quesma_api "quesma_v2/core"
1313
)
1414

15-
type ElasticHttpFrontendConnector struct {
15+
type ElasticHttpIngestFrontendConnector struct {
1616
*frontend_connectors.BasicHTTPFrontendConnector
1717
routerInstance *frontend_connectors.RouterV2
18-
searchRouter *quesma_api.PathRouter
19-
ingestRouter *quesma_api.PathRouter
2018
logManager *clickhouse.LogManager
2119
agent telemetry.PhoneHomeAgent
2220
}
2321

24-
func NewElasticHttpFrontendConnector(endpoint string,
22+
func NewElasticHttpIngestFrontendConnector(endpoint string,
2523
routerInstance *frontend_connectors.RouterV2,
26-
searchRouter *quesma_api.PathRouter,
27-
ingestRouter *quesma_api.PathRouter,
2824
logManager *clickhouse.LogManager,
29-
agent telemetry.PhoneHomeAgent) *ElasticHttpFrontendConnector {
30-
return &ElasticHttpFrontendConnector{
25+
agent telemetry.PhoneHomeAgent) *ElasticHttpIngestFrontendConnector {
26+
27+
return &ElasticHttpIngestFrontendConnector{
3128
BasicHTTPFrontendConnector: frontend_connectors.NewBasicHTTPFrontendConnector(endpoint),
3229
routerInstance: routerInstance,
33-
searchRouter: searchRouter,
34-
ingestRouter: ingestRouter,
3530
logManager: logManager,
3631
agent: agent,
3732
}
3833
}
3934

40-
func (h *ElasticHttpFrontendConnector) ServeHTTP(w http.ResponseWriter, req *http.Request) {
35+
func serveHTTPHelper(w http.ResponseWriter, req *http.Request,
36+
routerInstance *frontend_connectors.RouterV2,
37+
pathRouter quesma_api.Router,
38+
agent telemetry.PhoneHomeAgent,
39+
logManager *clickhouse.LogManager) {
4140
defer recovery.LogPanic()
4241
reqBody, err := frontend_connectors.PeekBodyV2(req)
4342
if err != nil {
@@ -46,7 +45,35 @@ func (h *ElasticHttpFrontendConnector) ServeHTTP(w http.ResponseWriter, req *htt
4645
}
4746

4847
ua := req.Header.Get("User-Agent")
49-
h.agent.UserAgentCounters().Add(ua, 1)
48+
agent.UserAgentCounters().Add(ua, 1)
49+
50+
routerInstance.Reroute(req.Context(), w, req, reqBody, pathRouter, logManager)
51+
}
52+
53+
func (h *ElasticHttpIngestFrontendConnector) ServeHTTP(w http.ResponseWriter, req *http.Request) {
54+
serveHTTPHelper(w, req, h.routerInstance, h.GetRouter(), h.agent, h.logManager)
55+
}
56+
57+
type ElasticHttpQueryFrontendConnector struct {
58+
*frontend_connectors.BasicHTTPFrontendConnector
59+
routerInstance *frontend_connectors.RouterV2
60+
logManager *clickhouse.LogManager
61+
agent telemetry.PhoneHomeAgent
62+
}
63+
64+
func NewElasticHttpQueryFrontendConnector(endpoint string,
65+
routerInstance *frontend_connectors.RouterV2,
66+
logManager *clickhouse.LogManager,
67+
agent telemetry.PhoneHomeAgent) *ElasticHttpIngestFrontendConnector {
68+
69+
return &ElasticHttpIngestFrontendConnector{
70+
BasicHTTPFrontendConnector: frontend_connectors.NewBasicHTTPFrontendConnector(endpoint),
71+
routerInstance: routerInstance,
72+
logManager: logManager,
73+
agent: agent,
74+
}
75+
}
5076

51-
h.routerInstance.Reroute(req.Context(), w, req, reqBody, h.searchRouter, h.ingestRouter, h.logManager)
77+
func (h *ElasticHttpQueryFrontendConnector) ServeHTTP(w http.ResponseWriter, req *http.Request) {
78+
serveHTTPHelper(w, req, h.routerInstance, h.GetRouter(), h.agent, h.logManager)
5279
}

quesma/v2/core/dispatch.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
type HTTPFrontendHandler func(ctx context.Context, req *Request) (*Result, error)
1010

1111
type HandlersPipe struct {
12+
Predicate RequestMatcher
1213
Handler HTTPFrontendHandler
1314
Processors []Processor
1415
}

quesma/v2/core/mux.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func (p *PathRouter) Clone() Cloner {
9393
}
9494

9595
func (p *PathRouter) Register(pattern string, predicate RequestMatcher, handler HTTPFrontendHandler) {
96-
mapping := mapping{pattern, urlpath.New(pattern), predicate, &HandlersPipe{Handler: handler}}
96+
mapping := mapping{pattern, urlpath.New(pattern), predicate, &HandlersPipe{Handler: handler, Predicate: predicate}}
9797
p.mappings = append(p.mappings, mapping)
9898

9999
}
@@ -203,6 +203,14 @@ func (p *PathRouter) GetHandlers() map[string]HandlersPipe {
203203
}
204204
func (p *PathRouter) SetHandlers(handlers map[string]HandlersPipe) {
205205
for path, handler := range handlers {
206-
p.mappings = append(p.mappings, mapping{pattern: path, compiledPath: urlpath.New(path), handler: &handler})
206+
if _, ok := handler.Predicate.(*predicateAlways); ok { // in order to pass processors we have to make this alignment (predicates aren't present in the old API
207+
p.mappings = append(p.mappings, mapping{pattern: path,
208+
compiledPath: urlpath.New(path),
209+
handler: &HandlersPipe{Handler: handler.Handler,
210+
Predicate: handler.Predicate,
211+
Processors: handler.Processors}})
212+
} else {
213+
p.Register(path, handler.Predicate, handler.Handler)
214+
}
207215
}
208216
}

0 commit comments

Comments
 (0)