Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit c2f696f

Browse files
authored
Using new http frontend connector API (#1059)
This PR: - introduces `ElasticHttpFrontendConnector` and integrate with the existing routing implementation. - extracts `elasticFallback` function
1 parent 61d0d96 commit c2f696f

File tree

2 files changed

+120
-72
lines changed

2 files changed

+120
-72
lines changed

quesma/quesma/dual_write_proxy_v2.go

Lines changed: 68 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -110,24 +110,14 @@ func newDualWriteProxyV2(schemaLoader clickhouse.TableDiscovery, logManager *cli
110110
return routerInstance.failedRequests.Load()
111111
})
112112

113-
handler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
114-
defer recovery.LogPanic()
115-
reqBody, err := peekBodyV2(req)
116-
if err != nil {
117-
http.Error(w, "Error reading request body", http.StatusInternalServerError)
118-
return
119-
}
120-
121-
ua := req.Header.Get("User-Agent")
122-
agent.UserAgentCounters().Add(ua, 1)
113+
elasticHttpFrontentConnector := NewElasticHttpFrontendConnector(":"+strconv.Itoa(int(config.PublicTcpPort)),
114+
&routerInstance, searchRouter, ingestRouter, logManager, agent)
123115

124-
routerInstance.reroute(req.Context(), w, req, reqBody, searchRouter, ingestRouter, logManager)
125-
})
126116
var limitedHandler http.Handler
127117
if config.DisableAuth {
128-
limitedHandler = newSimultaneousClientsLimiterV2(handler, concurrentClientsLimitV2)
118+
limitedHandler = newSimultaneousClientsLimiterV2(elasticHttpFrontentConnector, concurrentClientsLimitV2)
129119
} else {
130-
limitedHandler = newSimultaneousClientsLimiterV2(NewAuthMiddleware(handler, config.Elasticsearch), concurrentClientsLimitV2)
120+
limitedHandler = newSimultaneousClientsLimiterV2(NewAuthMiddleware(elasticHttpFrontentConnector, config.Elasticsearch), concurrentClientsLimitV2)
131121
}
132122

133123
return &dualWriteHttpProxyV2{
@@ -286,6 +276,69 @@ func (*routerV2) closedIndexResponse(ctx context.Context, w http.ResponseWriter,
286276

287277
}
288278

279+
func (r *routerV2) elasticFallback(decision *table_resolver.Decision,
280+
ctx context.Context, w http.ResponseWriter,
281+
req *http.Request, reqBody []byte, logManager *clickhouse.LogManager) {
282+
283+
var sendToElastic bool
284+
285+
if decision != nil {
286+
287+
if decision.Err != nil {
288+
w.Header().Set(quesmaSourceHeader, quesmaSourceClickhouse)
289+
addProductAndContentHeaders(req.Header, w.Header())
290+
r.errorResponseV2(ctx, decision.Err, w)
291+
return
292+
}
293+
294+
if decision.IsClosed {
295+
w.Header().Set(quesmaSourceHeader, quesmaSourceClickhouse)
296+
addProductAndContentHeaders(req.Header, w.Header())
297+
r.closedIndexResponse(ctx, w, decision.IndexPattern)
298+
return
299+
}
300+
301+
if decision.IsEmpty {
302+
w.Header().Set(quesmaSourceHeader, quesmaSourceClickhouse)
303+
addProductAndContentHeaders(req.Header, w.Header())
304+
w.WriteHeader(http.StatusNoContent)
305+
w.Write(queryparser.EmptySearchResponse(ctx))
306+
return
307+
}
308+
309+
for _, connector := range decision.UseConnectors {
310+
if _, ok := connector.(*table_resolver.ConnectorDecisionElastic); ok {
311+
// this is desired elastic call
312+
sendToElastic = true
313+
break
314+
}
315+
}
316+
317+
} else {
318+
// this is fallback case
319+
// in case we don't support sth, we should send it to Elastic
320+
sendToElastic = true
321+
}
322+
323+
if sendToElastic {
324+
feature.AnalyzeUnsupportedCalls(ctx, req.Method, req.URL.Path, req.Header.Get(opaqueIdHeaderKey), logManager.ResolveIndexPattern)
325+
326+
rawResponse := <-r.sendHttpRequestToElastic(ctx, req, reqBody, true)
327+
response := rawResponse.response
328+
if response != nil {
329+
responseFromElasticV2(ctx, response, w)
330+
} else {
331+
w.Header().Set(quesmaSourceHeader, quesmaSourceElastic)
332+
w.WriteHeader(500)
333+
if rawResponse.error != nil {
334+
_, _ = w.Write([]byte(rawResponse.error.Error()))
335+
}
336+
}
337+
} else {
338+
r.errorResponseV2(ctx, end_user_errors.ErrNoConnector.New(fmt.Errorf("no connector found")), w)
339+
}
340+
}
341+
289342
func (r *routerV2) reroute(ctx context.Context, w http.ResponseWriter, req *http.Request, reqBody []byte, searchRouter *mux.PathRouter, ingestRouter *mux.PathRouter, logManager *clickhouse.LogManager) {
290343
defer recovery.LogAndHandlePanic(ctx, func(err error) {
291344
w.WriteHeader(500)
@@ -352,64 +405,7 @@ func (r *routerV2) reroute(ctx context.Context, w http.ResponseWriter, req *http
352405
r.errorResponseV2(ctx, err, w)
353406
}
354407
} else {
355-
356-
var sendToElastic bool
357-
358-
if decision != nil {
359-
360-
if decision.Err != nil {
361-
w.Header().Set(quesmaSourceHeader, quesmaSourceClickhouse)
362-
addProductAndContentHeaders(req.Header, w.Header())
363-
r.errorResponseV2(ctx, decision.Err, w)
364-
return
365-
}
366-
367-
if decision.IsClosed {
368-
w.Header().Set(quesmaSourceHeader, quesmaSourceClickhouse)
369-
addProductAndContentHeaders(req.Header, w.Header())
370-
r.closedIndexResponse(ctx, w, decision.IndexPattern)
371-
return
372-
}
373-
374-
if decision.IsEmpty {
375-
w.Header().Set(quesmaSourceHeader, quesmaSourceClickhouse)
376-
addProductAndContentHeaders(req.Header, w.Header())
377-
w.WriteHeader(http.StatusNoContent)
378-
w.Write(queryparser.EmptySearchResponse(ctx))
379-
return
380-
}
381-
382-
for _, connector := range decision.UseConnectors {
383-
if _, ok := connector.(*table_resolver.ConnectorDecisionElastic); ok {
384-
// this is desired elastic call
385-
sendToElastic = true
386-
break
387-
}
388-
}
389-
390-
} else {
391-
// this is fallback case
392-
// in case we don't support sth, we should send it to Elastic
393-
sendToElastic = true
394-
}
395-
396-
if sendToElastic {
397-
feature.AnalyzeUnsupportedCalls(ctx, req.Method, req.URL.Path, req.Header.Get(opaqueIdHeaderKey), logManager.ResolveIndexPattern)
398-
399-
rawResponse := <-r.sendHttpRequestToElastic(ctx, req, reqBody, true)
400-
response := rawResponse.response
401-
if response != nil {
402-
responseFromElasticV2(ctx, response, w)
403-
} else {
404-
w.Header().Set(quesmaSourceHeader, quesmaSourceElastic)
405-
w.WriteHeader(500)
406-
if rawResponse.error != nil {
407-
_, _ = w.Write([]byte(rawResponse.error.Error()))
408-
}
409-
}
410-
} else {
411-
r.errorResponseV2(ctx, end_user_errors.ErrNoConnector.New(fmt.Errorf("no connector found")), w)
412-
}
408+
r.elasticFallback(decision, ctx, w, req, reqBody, logManager)
413409
}
414410
}
415411

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Copyright Quesma, licensed under the Elastic License 2.0.
2+
// SPDX-License-Identifier: Elastic-2.0
3+
4+
package quesma
5+
6+
import (
7+
"net/http"
8+
"quesma/clickhouse"
9+
"quesma/frontend_connectors"
10+
"quesma/quesma/mux"
11+
"quesma/quesma/recovery"
12+
"quesma/telemetry"
13+
)
14+
15+
type ElasticHttpFrontendConnector struct {
16+
*frontend_connectors.BasicHTTPFrontendConnector
17+
routerInstance *routerV2
18+
searchRouter *mux.PathRouter
19+
ingestRouter *mux.PathRouter
20+
logManager *clickhouse.LogManager
21+
agent telemetry.PhoneHomeAgent
22+
}
23+
24+
func NewElasticHttpFrontendConnector(endpoint string,
25+
routerInstance *routerV2,
26+
searchRouter *mux.PathRouter,
27+
ingestRouter *mux.PathRouter,
28+
logManager *clickhouse.LogManager,
29+
agent telemetry.PhoneHomeAgent) *ElasticHttpFrontendConnector {
30+
return &ElasticHttpFrontendConnector{
31+
BasicHTTPFrontendConnector: frontend_connectors.NewBasicHTTPFrontendConnector(endpoint),
32+
routerInstance: routerInstance,
33+
searchRouter: searchRouter,
34+
ingestRouter: ingestRouter,
35+
logManager: logManager,
36+
agent: agent,
37+
}
38+
}
39+
40+
func (h *ElasticHttpFrontendConnector) ServeHTTP(w http.ResponseWriter, req *http.Request) {
41+
defer recovery.LogPanic()
42+
reqBody, err := peekBodyV2(req)
43+
if err != nil {
44+
http.Error(w, "Error reading request body", http.StatusInternalServerError)
45+
return
46+
}
47+
48+
ua := req.Header.Get("User-Agent")
49+
h.agent.UserAgentCounters().Add(ua, 1)
50+
51+
h.routerInstance.reroute(req.Context(), w, req, reqBody, h.searchRouter, h.ingestRouter, h.logManager)
52+
}

0 commit comments

Comments
 (0)