Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 891fab9

Browse files
authored
Unifying HTTPFrontendHandler (#1081)
This PR unifies `HTTPFrontendHandler`. Now `POC router` and v1 `PathRouter` uses the same handler signature: ``` type HTTPFrontendHandler func(ctx context.Context, req *Request) (*Result, error) ```
1 parent b4953ab commit 891fab9

File tree

10 files changed

+63
-65
lines changed

10 files changed

+63
-65
lines changed

quesma/frontend_connectors/basic_http_frontend_connector.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -85,15 +85,11 @@ func (router *HTTPRouter) Unlock() {
8585
router.mutex.Unlock()
8686
}
8787

88-
func (router *HTTPRouter) Multiplexer() *http.ServeMux {
89-
return router.mux
90-
}
91-
92-
func (router *HTTPRouter) Register(pattern string, predicate quesma_api.RequestMatcher, handler quesma_api.Handler) {
88+
func (router *HTTPRouter) Register(pattern string, predicate quesma_api.RequestMatcher, handler quesma_api.HTTPFrontendHandler) {
9389
panic("not implemented")
9490
}
9591

96-
func (router *HTTPRouter) Matches(req *quesma_api.Request) (*quesma_api.HttpHandlersPipe, *quesma_api.Decision) {
92+
func (router *HTTPRouter) Matches(req *quesma_api.Request) (*quesma_api.HandlersPipe, *quesma_api.Decision) {
9793
panic("not implemented")
9894
}
9995

@@ -132,8 +128,8 @@ func (h *BasicHTTPFrontendConnector) ServeHTTP(w http.ResponseWriter, req *http.
132128
if h.router.GetFallbackHandler() != nil {
133129
fmt.Printf("No handler found for path: %s\n", req.URL.Path)
134130
handler := h.router.GetFallbackHandler()
135-
_, message, _ := handler(req)
136-
_, err := w.Write(message.([]byte))
131+
result, _ := handler(context.Background(), &quesma_api.Request{OriginalRequest: req})
132+
_, err := w.Write(result.GenericResult.([]byte))
137133
if err != nil {
138134
fmt.Printf("Error writing response: %s\n", err)
139135
}
@@ -142,9 +138,9 @@ func (h *BasicHTTPFrontendConnector) ServeHTTP(w http.ResponseWriter, req *http.
142138
return
143139
}
144140
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
145-
metadata, message, _ := handlerWrapper.Handler(req)
141+
result, _ := handlerWrapper.Handler(context.Background(), &quesma_api.Request{OriginalRequest: req})
146142

147-
_, message = dispatcher.Dispatch(handlerWrapper.Processors, metadata, message)
143+
_, message := dispatcher.Dispatch(handlerWrapper.Processors, result.Meta, result.GenericResult)
148144
_, err := w.Write(message.([]byte))
149145
if err != nil {
150146
fmt.Printf("Error writing response: %s\n", err)

quesma/frontend_connectors/elasticsearch_ingest.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package frontend_connectors
55

66
import (
7+
"context"
78
"github.com/ucarion/urlpath"
89
"net/http"
910
quesma_api "quesma_v2/core"
@@ -44,18 +45,18 @@ func setContentType(w http.ResponseWriter) http.ResponseWriter {
4445
return w
4546
}
4647

47-
func bulk(request *http.Request) (map[string]interface{}, any, error) {
48+
func bulk(_ context.Context, request *quesma_api.Request) (*quesma_api.Result, error) {
4849
metadata := quesma_api.MakeNewMetadata()
4950
metadata[IngestAction] = BulkIndexAction
50-
metadata[IngestTargetKey] = getIndexFromRequest(request)
51-
return metadata, request, nil
51+
metadata[IngestTargetKey] = getIndexFromRequest(request.OriginalRequest)
52+
return &quesma_api.Result{Meta: metadata, GenericResult: request}, nil
5253
}
5354

54-
func doc(request *http.Request) (map[string]interface{}, any, error) {
55+
func doc(_ context.Context, request *quesma_api.Request) (*quesma_api.Result, error) {
5556
metadata := quesma_api.MakeNewMetadata()
5657
metadata[IngestAction] = DocIndexAction
57-
metadata[IngestTargetKey] = getIndexFromRequest(request)
58-
return metadata, request, nil
58+
metadata[IngestTargetKey] = getIndexFromRequest(request.OriginalRequest)
59+
return &quesma_api.Result{Meta: metadata, GenericResult: request}, nil
5960
}
6061

6162
func getIndexFromRequest(request *http.Request) string {

quesma/frontend_connectors/router_v2.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func responseFromQuesmaV2(ctx context.Context, unzipped []byte, w http.ResponseW
5151
logger.Debug().Str(logger.RID, id).Msg("responding from Quesma")
5252

5353
for key, value := range quesmaResponse.Meta {
54-
w.Header().Set(key, value)
54+
w.Header().Set(key, value.(string))
5555
}
5656
if zip {
5757
w.Header().Set("Content-Encoding", "gzip")
@@ -223,7 +223,7 @@ func (r *RouterV2) Reroute(ctx context.Context, w http.ResponseWriter, req *http
223223
}
224224

225225
quesmaRequest.ParsedBody = types.ParseRequestBody(quesmaRequest.Body)
226-
var handler quesma_api.Handler
226+
var handler quesma_api.HTTPFrontendHandler
227227
var decision *quesma_api.Decision
228228
searchHandlerPipe, searchDecision := searchRouter.Matches(quesmaRequest)
229229
if searchDecision != nil {

quesma/main_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ package main
66
import (
77
"context"
88
"github.com/stretchr/testify/assert"
9-
"net/http"
109
"os"
1110
"os/signal"
1211
"quesma/backend_connectors"
@@ -54,11 +53,11 @@ func Test_backendConnectorValidation(t *testing.T) {
5453

5554
var fallbackCalled int32 = 0
5655

57-
func fallback(request *http.Request) (map[string]interface{}, any, error) {
56+
func fallback(_ context.Context, _ *quesma_api.Request) (*quesma_api.Result, error) {
5857
metadata := quesma_api.MakeNewMetadata()
5958
atomic.AddInt32(&fallbackCalled, 1)
6059
resp := []byte("unknown\n")
61-
return metadata, resp, nil
60+
return &quesma_api.Result{Meta: metadata, GenericResult: resp}, nil
6261
}
6362

6463
func ab_testing_scenario() quesma_api.QuesmaBuilder {

quesma/quesma/dual_write_proxy.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ func responseFromQuesma(ctx context.Context, unzipped []byte, w http.ResponseWri
198198
logger.Debug().Str(logger.RID, id).Msg("responding from Quesma")
199199

200200
for key, value := range quesmaResponse.Meta {
201-
w.Header().Set(key, value)
201+
w.Header().Set(key, value.(string))
202202
}
203203
if zip {
204204
w.Header().Set("Content-Encoding", "gzip")

quesma/quesma/router.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -469,7 +469,7 @@ func elasticsearchCountResult(body int64, statusCode int) (*quesma_api.Result, e
469469
if err != nil {
470470
return nil, err
471471
}
472-
return &quesma_api.Result{Body: string(serialized), Meta: map[string]string{
472+
return &quesma_api.Result{Body: string(serialized), Meta: map[string]any{
473473
"Content-Type": "application/json",
474474
"X-Quesma-Headers-Source": "Quesma",
475475
}, StatusCode: statusCode}, nil
@@ -486,7 +486,7 @@ type countResult struct {
486486
}
487487

488488
func elasticsearchQueryResult(body string, statusCode int) *quesma_api.Result {
489-
return &quesma_api.Result{Body: body, Meta: map[string]string{
489+
return &quesma_api.Result{Body: body, Meta: map[string]any{
490490
// TODO copy paste from the original request
491491
"X-Quesma-Headers-Source": "Quesma",
492492
}, StatusCode: statusCode}
@@ -556,7 +556,7 @@ func bulkInsertResult(ctx context.Context, ops []bulk.BulkItem, err error) (*que
556556
}
557557

558558
func elasticsearchInsertResult(body string, statusCode int) *quesma_api.Result {
559-
return &quesma_api.Result{Body: body, Meta: map[string]string{
559+
return &quesma_api.Result{Body: body, Meta: map[string]any{
560560
// TODO copy paste from the original request
561561
frontend_connectors.ContentTypeHeaderKey: "application/json",
562562
"X-Quesma-Headers-Source": "Quesma",
@@ -575,7 +575,7 @@ func resolveIndexResult(sources elasticsearch.Sources) (*quesma_api.Result, erro
575575

576576
return &quesma_api.Result{
577577
Body: string(body),
578-
Meta: map[string]string{},
578+
Meta: map[string]any{},
579579
StatusCode: http.StatusOK}, nil
580580
}
581581

quesma/v2/core/dispatch.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,9 @@ package quesma_api
44

55
import (
66
"context"
7-
"net/http"
87
)
98

10-
// TODO currently there are two types of handlers, HTTPFrontendHandler and Handler
11-
// first one comes from v2 POC and the second one comes from v1 quesma
12-
// we need to unify them
13-
type HTTPFrontendHandler func(request *http.Request) (map[string]interface{}, any, error)
14-
type Handler func(ctx context.Context, req *Request) (*Result, error)
9+
type HTTPFrontendHandler func(ctx context.Context, req *Request) (*Result, error)
1510

1611
type HandlersPipe struct {
1712
Handler HTTPFrontendHandler

quesma/v2/core/mux.go

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,22 +14,25 @@ type (
1414
PathRouter struct {
1515
mappings []mapping
1616
}
17-
HttpHandlersPipe struct {
18-
Handler Handler
19-
Processors []Processor
20-
}
2117
mapping struct {
2218
pattern string
2319
compiledPath urlpath.Path
2420
predicate RequestMatcher
25-
handler *HttpHandlersPipe
21+
handler *HandlersPipe
2622
}
23+
// Result is a kind of adapter for response
24+
// to uniform v1 routing
25+
// GenericResult is generic result that can be used by processors
2726
Result struct {
28-
Body string
29-
Meta map[string]string
30-
StatusCode int
27+
Body string
28+
Meta map[string]any
29+
StatusCode int
30+
GenericResult any
3131
}
3232

33+
// Request is kind of adapter for http.Request
34+
// to uniform v1 routing
35+
// it stores original http request
3336
Request struct {
3437
Method string
3538
Path string
@@ -40,6 +43,8 @@ type (
4043

4144
Body string
4245
ParsedBody RequestBody
46+
// OriginalRequest is the original http.Request object that was received by the server.
47+
OriginalRequest *http.Request
4348
}
4449

4550
MatchResult struct {
@@ -56,14 +61,14 @@ type RequestMatcherFunc func(req *Request) MatchResult
5661
func ServerErrorResult() *Result {
5762
return &Result{
5863
StatusCode: http.StatusInternalServerError,
59-
Meta: map[string]string{"Content-Type": "text/plain"},
64+
Meta: map[string]any{"Content-Type": "text/plain"},
6065
}
6166
}
6267

6368
func BadReqeustResult() *Result {
6469
return &Result{
6570
StatusCode: http.StatusBadRequest,
66-
Meta: map[string]string{"Content-Type": "text/plain"},
71+
Meta: map[string]any{"Content-Type": "text/plain"},
6772
}
6873
}
6974

@@ -86,14 +91,14 @@ func (p *PathRouter) Clone() Cloner {
8691
return newRouter
8792
}
8893

89-
func (p *PathRouter) Register(pattern string, predicate RequestMatcher, handler Handler) {
94+
func (p *PathRouter) Register(pattern string, predicate RequestMatcher, handler HTTPFrontendHandler) {
9095

91-
mapping := mapping{pattern, urlpath.New(pattern), predicate, &HttpHandlersPipe{Handler: handler}}
96+
mapping := mapping{pattern, urlpath.New(pattern), predicate, &HandlersPipe{Handler: handler}}
9297
p.mappings = append(p.mappings, mapping)
9398

9499
}
95100

96-
func (p *PathRouter) Matches(req *Request) (*HttpHandlersPipe, *Decision) {
101+
func (p *PathRouter) Matches(req *Request) (*HandlersPipe, *Decision) {
97102
handler, decision := p.findHandler(req)
98103
if handler != nil {
99104
routerStatistics.addMatched(req.Path)
@@ -104,7 +109,7 @@ func (p *PathRouter) Matches(req *Request) (*HttpHandlersPipe, *Decision) {
104109
}
105110
}
106111

107-
func (p *PathRouter) findHandler(req *Request) (*HttpHandlersPipe, *Decision) {
112+
func (p *PathRouter) findHandler(req *Request) (*HandlersPipe, *Decision) {
108113
path := strings.TrimSuffix(req.Path, "/")
109114
for _, m := range p.mappings {
110115
meta, match := m.compiledPath.Match(path)
@@ -192,11 +197,14 @@ func (p *PathRouter) GetFallbackHandler() HTTPFrontendHandler {
192197
panic("not implemented")
193198
}
194199
func (p *PathRouter) GetHandlers() map[string]HandlersPipe {
195-
panic("not implemented")
200+
callInfos := make(map[string]HandlersPipe)
201+
for _, v := range p.mappings {
202+
callInfos[v.pattern] = *v.handler
203+
}
204+
return callInfos
196205
}
197206
func (p *PathRouter) SetHandlers(handlers map[string]HandlersPipe) {
198-
panic("not implemented")
199-
}
200-
func (p *PathRouter) Multiplexer() *http.ServeMux {
201-
panic("not implemented")
207+
for path, handler := range handlers {
208+
p.mappings = append(p.mappings, mapping{pattern: path, compiledPath: urlpath.New(path), handler: &handler})
209+
}
202210
}

quesma/v2/core/quesma_apis.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ package quesma_api
55
import (
66
"context"
77
"net"
8-
"net/http"
98
)
109

1110
type Router interface {
@@ -15,9 +14,8 @@ type Router interface {
1514
GetFallbackHandler() HTTPFrontendHandler
1615
GetHandlers() map[string]HandlersPipe
1716
SetHandlers(handlers map[string]HandlersPipe)
18-
Multiplexer() *http.ServeMux
19-
Register(pattern string, predicate RequestMatcher, handler Handler)
20-
Matches(req *Request) (*HttpHandlersPipe, *Decision)
17+
Register(pattern string, predicate RequestMatcher, handler HTTPFrontendHandler)
18+
Matches(req *Request) (*HandlersPipe, *Decision)
2119
}
2220

2321
type FrontendConnector interface {

quesma/v2_test_objects.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package main
55

66
import (
7+
"context"
78
"net/http"
89
"quesma/frontend_connectors"
910
"quesma/processors"
@@ -78,41 +79,41 @@ var responses = [][]byte{
7879
}`),
7980
}
8081

81-
func bulk(request *http.Request) (map[string]interface{}, any, error) {
82-
_, err := frontend_connectors.ReadRequestBody(request)
82+
func bulk(_ context.Context, request *quesma_api.Request) (*quesma_api.Result, error) {
83+
_, err := frontend_connectors.ReadRequestBody(request.OriginalRequest)
8384
if err != nil {
84-
return nil, nil, err
85+
return nil, err
8586
}
8687
metadata := quesma_api.MakeNewMetadata()
8788
metadata["level"] = 0
8889
resp := []byte("bulk\n")
8990
atomic.AddInt64(&correlationId, 1)
9091
quesma_api.SetCorrelationId(metadata, correlationId)
91-
return metadata, resp, nil
92+
return &quesma_api.Result{Meta: metadata, GenericResult: resp}, nil
9293
}
9394

94-
func doc(request *http.Request) (map[string]interface{}, any, error) {
95-
_, err := frontend_connectors.ReadRequestBody(request)
95+
func doc(_ context.Context, request *quesma_api.Request) (*quesma_api.Result, error) {
96+
_, err := frontend_connectors.ReadRequestBody(request.OriginalRequest)
9697
if err != nil {
97-
return nil, nil, err
98+
return nil, err
9899
}
99100
metadata := quesma_api.MakeNewMetadata()
100101
metadata["level"] = 0
101102
atomic.AddInt64(&correlationId, 1)
102103
quesma_api.SetCorrelationId(metadata, correlationId)
103104
resp := []byte("doc\n")
104105

105-
return metadata, resp, nil
106+
return &quesma_api.Result{Meta: metadata, GenericResult: resp}, nil
106107
}
107108

108109
var correlationId int64 = 0
109110

110-
func search(request *http.Request) (map[string]interface{}, any, error) {
111+
func search(_ context.Context, request *quesma_api.Request) (*quesma_api.Result, error) {
111112
metadata := quesma_api.MakeNewMetadata()
112113
metadata["level"] = 0
113114
atomic.AddInt64(&correlationId, 1)
114115
quesma_api.SetCorrelationId(metadata, correlationId)
115-
return metadata, request, nil
116+
return &quesma_api.Result{Meta: metadata, GenericResult: request.OriginalRequest}, nil
116117
}
117118

118119
type IngestProcessor struct {

0 commit comments

Comments
 (0)