Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 3918405

Browse files
authored
Adopting PathRouter to new dispatching #1 (#1072)
This is first part of integration PathRouter to new API
1 parent bd2002d commit 3918405

File tree

9 files changed

+72
-21
lines changed

9 files changed

+72
-21
lines changed

quesma/frontend_connectors/basic_http_frontend_connector.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,14 @@ func (router *HTTPRouter) Multiplexer() *http.ServeMux {
8989
return router.mux
9090
}
9191

92+
func (router *HTTPRouter) Register(pattern string, predicate quesma_api.RequestMatcher, handler quesma_api.Handler) {
93+
panic("not implemented")
94+
}
95+
96+
func (router *HTTPRouter) Matches(req *quesma_api.Request) (*quesma_api.HttpHandlersPipe, *quesma_api.Decision) {
97+
panic("not implemented")
98+
}
99+
92100
type BasicHTTPFrontendConnector struct {
93101
listener *http.Server
94102
router quesma_api.Router
@@ -161,6 +169,7 @@ func (h *BasicHTTPFrontendConnector) Listen() error {
161169
h.listener.Handler = h
162170
go func() {
163171
err := h.listener.ListenAndServe()
172+
// TODO: Handle error
164173
_ = err
165174
}()
166175

quesma/frontend_connectors/router_v2.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -225,19 +225,19 @@ func (r *RouterV2) Reroute(ctx context.Context, w http.ResponseWriter, req *http
225225
quesmaRequest.ParsedBody = types.ParseRequestBody(quesmaRequest.Body)
226226
var handler quesma_api.Handler
227227
var decision *quesma_api.Decision
228-
searchHandler, searchDecision := searchRouter.Matches(quesmaRequest)
228+
searchHandlerPipe, searchDecision := searchRouter.Matches(quesmaRequest)
229229
if searchDecision != nil {
230230
decision = searchDecision
231231
}
232-
if searchHandler != nil {
233-
handler = searchHandler
232+
if searchHandlerPipe != nil {
233+
handler = searchHandlerPipe.Handler
234234
}
235-
ingestHandler, ingestDecision := ingestRouter.Matches(quesmaRequest)
235+
ingestHandlerPipe, ingestDecision := ingestRouter.Matches(quesmaRequest)
236236
if searchDecision == nil {
237237
decision = ingestDecision
238238
}
239-
if searchHandler == nil {
240-
handler = ingestHandler
239+
if searchHandlerPipe == nil && ingestHandlerPipe != nil {
240+
handler = ingestHandlerPipe.Handler
241241
}
242242
if decision != nil {
243243
w.Header().Set(QuesmaTableResolverHeader, decision.String())

quesma/quesma/dual_write_proxy.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -308,17 +308,17 @@ func (r *router) reroute(ctx context.Context, w http.ResponseWriter, req *http.R
308308

309309
quesmaRequest.ParsedBody = types.ParseRequestBody(quesmaRequest.Body)
310310

311-
handler, decision := router.Matches(quesmaRequest)
311+
handlersPipe, decision := router.Matches(quesmaRequest)
312312

313313
if decision != nil {
314314
w.Header().Set(frontend_connectors.QuesmaTableResolverHeader, decision.String())
315315
} else {
316316
w.Header().Set(frontend_connectors.QuesmaTableResolverHeader, "n/a")
317317
}
318318

319-
if handler != nil {
319+
if handlersPipe != nil {
320320
quesmaResponse, err := recordRequestToClickhouse(req.URL.Path, r.quesmaManagementConsole, func() (*quesma_api.Result, error) {
321-
return handler(ctx, quesmaRequest)
321+
return handlersPipe.Handler(ctx, quesmaRequest)
322322
})
323323

324324
zip := strings.Contains(req.Header.Get("Accept-Encoding"), "gzip")

quesma/quesma/dual_write_proxy_v2.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,7 @@ func newDualWriteProxyV2(schemaLoader clickhouse.TableDiscovery, logManager *cli
100100
})
101101

102102
elasticHttpFrontentConnector := NewElasticHttpFrontendConnector(":"+strconv.Itoa(int(config.PublicTcpPort)),
103-
&routerInstance, searchRouter, ingestRouter, logManager, agent)
104-
103+
&routerInstance, searchRouter.(*quesma_api.PathRouter), ingestRouter.(*quesma_api.PathRouter), logManager, agent)
105104
var limitedHandler http.Handler
106105
if config.DisableAuth {
107106
limitedHandler = newSimultaneousClientsLimiterV2(elasticHttpFrontentConnector, concurrentClientsLimitV2)

quesma/quesma/router_v2.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import (
3333
"time"
3434
)
3535

36-
func ConfigureIngestRouterV2(cfg *config.QuesmaConfiguration, ip *ingest.IngestProcessor, phoneHomeAgent telemetry.PhoneHomeAgent, tableResolver table_resolver.TableResolver) *quesma_api.PathRouter {
36+
func ConfigureIngestRouterV2(cfg *config.QuesmaConfiguration, ip *ingest.IngestProcessor, phoneHomeAgent telemetry.PhoneHomeAgent, tableResolver table_resolver.TableResolver) quesma_api.Router {
3737
// some syntactic sugar
3838
method := quesma_api.IsHTTPMethod
3939
and := quesma_api.And
@@ -127,7 +127,7 @@ func ConfigureIngestRouterV2(cfg *config.QuesmaConfiguration, ip *ingest.IngestP
127127
return router
128128
}
129129

130-
func ConfigureSearchRouterV2(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *clickhouse.LogManager, console *ui.QuesmaManagementConsole, queryRunner *QueryRunner, tableResolver table_resolver.TableResolver) *quesma_api.PathRouter {
130+
func ConfigureSearchRouterV2(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *clickhouse.LogManager, console *ui.QuesmaManagementConsole, queryRunner *QueryRunner, tableResolver table_resolver.TableResolver) quesma_api.Router {
131131

132132
// some syntactic sugar
133133
method := quesma_api.IsHTTPMethod

quesma/v2/core/dispatch.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,15 @@
33
package quesma_api
44

55
import (
6+
"context"
67
"net/http"
78
)
89

10+
// TODO currently there are two types of handlers, HTTPFrontendHandler and Handler
11+
// first one comes from v2 POC and the second one comes from v1 quesma
12+
// we need to unify them
913
type HTTPFrontendHandler func(request *http.Request) (map[string]interface{}, any, error)
14+
type Handler func(ctx context.Context, req *Request) (*Result, error)
1015

1116
type HandlersPipe struct {
1217
Handler HTTPFrontendHandler

quesma/v2/core/mux.go

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
package quesma_api
44

55
import (
6-
"context"
76
"github.com/ucarion/urlpath"
87
"net/http"
98
"net/url"
@@ -15,12 +14,15 @@ type (
1514
PathRouter struct {
1615
mappings []mapping
1716
}
17+
HttpHandlersPipe struct {
18+
Handler Handler
19+
Processors []Processor
20+
}
1821
mapping struct {
1922
pattern string
2023
compiledPath urlpath.Path
2124
predicate RequestMatcher
22-
handler Handler
23-
processors []Processor
25+
handler *HttpHandlersPipe
2426
}
2527
Result struct {
2628
Body string
@@ -40,8 +42,6 @@ type (
4042
ParsedBody RequestBody
4143
}
4244

43-
Handler func(ctx context.Context, req *Request) (*Result, error)
44-
4545
MatchResult struct {
4646
Matched bool
4747
Decision *Decision
@@ -78,14 +78,22 @@ func NewPathRouter() *PathRouter {
7878
return &PathRouter{mappings: make([]mapping, 0)}
7979
}
8080

81+
func (p *PathRouter) Clone() Cloner {
82+
newRouter := NewPathRouter()
83+
for _, mapping := range p.mappings {
84+
newRouter.Register(mapping.pattern, mapping.predicate, mapping.handler.Handler)
85+
}
86+
return newRouter
87+
}
88+
8189
func (p *PathRouter) Register(pattern string, predicate RequestMatcher, handler Handler) {
8290

83-
mapping := mapping{pattern, urlpath.New(pattern), predicate, handler, nil}
91+
mapping := mapping{pattern, urlpath.New(pattern), predicate, &HttpHandlersPipe{Handler: handler}}
8492
p.mappings = append(p.mappings, mapping)
8593

8694
}
8795

88-
func (p *PathRouter) Matches(req *Request) (Handler, *Decision) {
96+
func (p *PathRouter) Matches(req *Request) (*HttpHandlersPipe, *Decision) {
8997
handler, decision := p.findHandler(req)
9098
if handler != nil {
9199
routerStatistics.addMatched(req.Path)
@@ -96,7 +104,7 @@ func (p *PathRouter) Matches(req *Request) (Handler, *Decision) {
96104
}
97105
}
98106

99-
func (p *PathRouter) findHandler(req *Request) (Handler, *Decision) {
107+
func (p *PathRouter) findHandler(req *Request) (*HttpHandlersPipe, *Decision) {
100108
path := strings.TrimSuffix(req.Path, "/")
101109
for _, m := range p.mappings {
102110
meta, match := m.compiledPath.Match(path)
@@ -171,3 +179,24 @@ func (p *predicateAlways) Matches(req *Request) MatchResult {
171179
func Always() RequestMatcher {
172180
return &predicateAlways{}
173181
}
182+
183+
func (p *PathRouter) AddRoute(path string, handler HTTPFrontendHandler) {
184+
// TODO: it seems that we can adapt this to register call
185+
// p.Register(path, Always(), handler)
186+
panic("not implemented")
187+
}
188+
func (p *PathRouter) AddFallbackHandler(handler HTTPFrontendHandler) {
189+
panic("not implemented")
190+
}
191+
func (p *PathRouter) GetFallbackHandler() HTTPFrontendHandler {
192+
panic("not implemented")
193+
}
194+
func (p *PathRouter) GetHandlers() map[string]HandlersPipe {
195+
panic("not implemented")
196+
}
197+
func (p *PathRouter) SetHandlers(handlers map[string]HandlersPipe) {
198+
panic("not implemented")
199+
}
200+
func (p *PathRouter) Multiplexer() *http.ServeMux {
201+
panic("not implemented")
202+
}

quesma/v2/core/quesma_apis.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ type Router interface {
1616
GetHandlers() map[string]HandlersPipe
1717
SetHandlers(handlers map[string]HandlersPipe)
1818
Multiplexer() *http.ServeMux
19+
Register(pattern string, predicate RequestMatcher, handler Handler)
20+
Matches(req *Request) (*HttpHandlersPipe, *Decision)
1921
}
2022

2123
type FrontendConnector interface {

quesma/v2/core/quesma_pipeline.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,13 @@ func (p *Pipeline) Build() PipelineBuilder {
3535
}
3636

3737
func (p *Pipeline) Start() {
38+
// TODO connectors for the same endpoint should be sharing the same listener
39+
// This is a temporary solution to start all connectors
40+
// some of them will fail to start
41+
// because the port is already in use
42+
// This works well from application perspective
43+
// because we are copying routing table from all connectors
44+
// however, bind error remains
3845
for _, conn := range p.FrontendConnectors {
3946
go conn.Listen()
4047
}

0 commit comments

Comments
 (0)