Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit e2ee6a6

Browse files
committed
Adding quesma v2 core apis
1 parent 0186161 commit e2ee6a6

File tree

7 files changed

+386
-0
lines changed

7 files changed

+386
-0
lines changed

quesma/core/backend_connectors.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// Copyright Quesma, licensed under the Elastic License 2.0.
2+
// SPDX-License-Identifier: Elastic-2.0
3+
package quesma_api
4+
5+
import "context"
6+
7+
type BackendConnectorType int
8+
9+
const (
10+
NoopBackend = iota
11+
MySQLBackend
12+
PgSQLBackend
13+
)
14+
15+
func GetBackendConnectorNameFromType(connectorType BackendConnectorType) string {
16+
switch connectorType {
17+
case MySQLBackend:
18+
return "mysql"
19+
case PgSQLBackend:
20+
return "pgsql"
21+
default:
22+
return "noop"
23+
}
24+
}
25+
26+
type NoopBackendConnector struct {
27+
}
28+
29+
func (p *NoopBackendConnector) GetId() BackendConnectorType {
30+
return NoopBackend
31+
}
32+
33+
func (p *NoopBackendConnector) Open() error {
34+
return nil
35+
}
36+
37+
func (p *NoopBackendConnector) Close() error {
38+
return nil
39+
}
40+
41+
func (p *NoopBackendConnector) Query(ctx context.Context, query string, args ...interface{}) (Rows, error) {
42+
return nil, nil
43+
}
44+
45+
func (p *NoopBackendConnector) Exec(ctx context.Context, query string, args ...interface{}) error {
46+
return nil
47+
}

quesma/core/dispatch.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// Copyright Quesma, licensed under the Elastic License 2.0.
2+
// SPDX-License-Identifier: Elastic-2.0
3+
package quesma_api
4+
5+
import (
6+
"net/http"
7+
)
8+
9+
type HTTPFrontendHandler func(request *http.Request) (map[string]interface{}, any, error)
10+
11+
type HandlersPipe struct {
12+
Handler HTTPFrontendHandler
13+
Processors []Processor
14+
}
15+
16+
type Dispatcher struct {
17+
}
18+
19+
func (d *Dispatcher) Dispatch(processors []Processor, metadata map[string]interface{}, message any) (map[string]interface{}, any) {
20+
return d.dispatch(processors, metadata, message)
21+
}
22+
23+
func (d *Dispatcher) dispatch(processors []Processor, metadata map[string]interface{}, message any) (map[string]any, any) {
24+
// Process the response using the processor
25+
var inputMessages []any
26+
inputMessages = append(inputMessages, message)
27+
if processors == nil {
28+
return metadata, inputMessages[0]
29+
}
30+
var outMessage any
31+
for _, processor := range processors {
32+
metadata, outerMessage, _ := processor.Handle(metadata, inputMessages...)
33+
outMessage = outerMessage
34+
inputMessages = make([]any, 0)
35+
innerProcessors := processor.GetProcessors()
36+
for _, innerProc := range innerProcessors {
37+
// TODO inner processor can have its own processors
38+
metadata, outMessage, _ = innerProc.Handle(metadata, outerMessage)
39+
inputMessages = append(inputMessages, outMessage)
40+
}
41+
}
42+
return metadata, outMessage
43+
}

quesma/core/metadata_api.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
// Copyright Quesma, licensed under the Elastic License 2.0.
2+
// SPDX-License-Identifier: Elastic-2.0
3+
package quesma_api
4+
5+
import "strconv"
6+
7+
func MakeNewMetadata() map[string]any {
8+
return make(map[string]any)
9+
}
10+
11+
func SetCorrelationId(metadata map[string]any, correlationId int64) {
12+
metadata["correlationId"] = strconv.FormatInt(correlationId, 10)
13+
}
14+
15+
func GetCorrelationId(metadata map[string]any) string {
16+
if correlationId, ok := metadata["correlationId"]; !ok {
17+
panic("CorrelationId not found in metadata")
18+
} else {
19+
checkedCorrelationId, err := CheckedCast[string](correlationId)
20+
if err != nil {
21+
panic("CorrelationId is not string")
22+
}
23+
return checkedCorrelationId
24+
}
25+
}

quesma/core/quesma_apis.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
// Copyright Quesma, licensed under the Elastic License 2.0.
2+
// SPDX-License-Identifier: Elastic-2.0
3+
package quesma_api
4+
5+
import (
6+
"context"
7+
"net"
8+
"net/http"
9+
)
10+
11+
type Router interface {
12+
Cloner
13+
AddRoute(path string, handler HTTPFrontendHandler)
14+
GetHandlers() map[string]HandlersPipe
15+
SetHandlers(handlers map[string]HandlersPipe)
16+
Multiplexer() *http.ServeMux
17+
}
18+
19+
type FrontendConnector interface {
20+
Listen() error // Start listening on the endpoint
21+
GetEndpoint() string
22+
Stop(ctx context.Context) error // Stop listening
23+
}
24+
25+
type HTTPFrontendConnector interface {
26+
FrontendConnector
27+
AddRouter(router Router)
28+
GetRouter() Router
29+
}
30+
31+
type TCPFrontendConnector interface {
32+
FrontendConnector
33+
AddConnectionHandler(handler TCPConnectionHandler)
34+
GetConnectionHandler() TCPConnectionHandler
35+
}
36+
37+
type TCPConnectionHandler interface {
38+
HandleConnection(conn net.Conn) error
39+
SetHandlers(processor []Processor)
40+
}
41+
42+
type CompoundProcessor interface {
43+
AddProcessor(proc Processor)
44+
GetProcessors() []Processor
45+
}
46+
47+
type PipelineBuilder interface {
48+
AddFrontendConnector(conn FrontendConnector)
49+
GetFrontendConnectors() []FrontendConnector
50+
AddBackendConnector(conn BackendConnector)
51+
GetBackendConnectors() map[BackendConnectorType]BackendConnector
52+
CompoundProcessor
53+
Build() PipelineBuilder
54+
Start()
55+
}
56+
57+
type QuesmaBuilder interface {
58+
AddPipeline(pipeline PipelineBuilder)
59+
GetPipelines() []PipelineBuilder
60+
Build() (QuesmaBuilder, error)
61+
Start()
62+
Stop(ctx context.Context)
63+
}
64+
65+
type Processor interface {
66+
CompoundProcessor
67+
GetId() string
68+
Handle(metadata map[string]interface{}, message ...any) (map[string]interface{}, any, error)
69+
SetBackendConnectors(conns map[BackendConnectorType]BackendConnector)
70+
GetBackendConnector(connectorType BackendConnectorType) BackendConnector
71+
GetSupportedBackendConnectors() []BackendConnectorType
72+
}
73+
74+
type Rows interface {
75+
Next() bool
76+
Scan(dest ...interface{}) error
77+
Close()
78+
Err() error
79+
}
80+
81+
type BackendConnector interface {
82+
GetId() BackendConnectorType
83+
Open() error
84+
// Query executes a query that returns rows, typically a SELECT.
85+
Query(ctx context.Context, query string, args ...interface{}) (Rows, error)
86+
87+
// Exec executes a command that doesn't return rows, typically an INSERT, UPDATE, or DELETE.
88+
Exec(ctx context.Context, query string, args ...interface{}) error
89+
Close() error
90+
}

quesma/core/quesma_builder.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// Copyright Quesma, licensed under the Elastic License 2.0.
2+
// SPDX-License-Identifier: Elastic-2.0
3+
package quesma_api
4+
5+
import (
6+
"context"
7+
"fmt"
8+
)
9+
10+
type Quesma struct {
11+
pipelines []PipelineBuilder
12+
}
13+
14+
func NewQuesma() *Quesma {
15+
return &Quesma{
16+
pipelines: make([]PipelineBuilder, 0),
17+
}
18+
}
19+
20+
func (quesma *Quesma) AddPipeline(pipeline PipelineBuilder) {
21+
quesma.pipelines = append(quesma.pipelines, pipeline)
22+
}
23+
24+
func (quesma *Quesma) GetPipelines() []PipelineBuilder {
25+
return quesma.pipelines
26+
}
27+
28+
func (quesma *Quesma) Start() {
29+
for _, pipeline := range quesma.pipelines {
30+
pipeline.Start()
31+
}
32+
}
33+
34+
func (quesma *Quesma) Stop(ctx context.Context) {
35+
for _, pipeline := range quesma.pipelines {
36+
for _, conn := range pipeline.GetFrontendConnectors() {
37+
conn.Stop(ctx)
38+
}
39+
}
40+
for _, pipeline := range quesma.pipelines {
41+
for _, conn := range pipeline.GetBackendConnectors() {
42+
conn.Close()
43+
}
44+
}
45+
}
46+
47+
func (quesma *Quesma) Build() (QuesmaBuilder, error) {
48+
endpoints := make(map[string]struct{})
49+
handlers := make(map[string]HandlersPipe)
50+
51+
for _, pipeline := range quesma.pipelines {
52+
for _, conn := range pipeline.GetFrontendConnectors() {
53+
if httpConn, ok := conn.(HTTPFrontendConnector); ok {
54+
endpoints[conn.GetEndpoint()] = struct{}{}
55+
router := httpConn.GetRouter()
56+
for path, handlerWrapper := range router.GetHandlers() {
57+
handlerWrapper.Processors = append(handlerWrapper.Processors, pipeline.GetProcessors()...)
58+
handlers[path] = handlerWrapper
59+
}
60+
}
61+
}
62+
}
63+
if len(endpoints) == 1 {
64+
for _, pipeline := range quesma.pipelines {
65+
for _, conn := range pipeline.GetFrontendConnectors() {
66+
if httpConn, ok := conn.(HTTPFrontendConnector); ok {
67+
router := httpConn.GetRouter().Clone().(Router)
68+
if len(endpoints) == 1 {
69+
router.SetHandlers(handlers)
70+
}
71+
httpConn.AddRouter(router)
72+
73+
}
74+
}
75+
}
76+
}
77+
78+
for _, pipeline := range quesma.pipelines {
79+
backendConnectorTypesPerPipeline := make(map[BackendConnectorType]struct{})
80+
for _, conn := range pipeline.GetFrontendConnectors() {
81+
if tcpConn, ok := conn.(TCPFrontendConnector); ok {
82+
if len(pipeline.GetProcessors()) > 0 {
83+
tcpConn.GetConnectionHandler().SetHandlers(pipeline.GetProcessors())
84+
}
85+
}
86+
}
87+
backendConnectors := pipeline.GetBackendConnectors()
88+
for _, backendConnector := range backendConnectors {
89+
backendConnectorTypesPerPipeline[backendConnector.GetId()] = struct{}{}
90+
}
91+
for _, proc := range pipeline.GetProcessors() {
92+
supportedBackendConnectorsByProc := proc.GetSupportedBackendConnectors()
93+
for _, backendConnectorType := range supportedBackendConnectorsByProc {
94+
if _, ok := backendConnectorTypesPerPipeline[backendConnectorType]; !ok {
95+
return nil, fmt.Errorf("processor %v requires backend connector %v which is not provided", proc.GetId(), GetBackendConnectorNameFromType(backendConnectorType))
96+
}
97+
}
98+
proc.SetBackendConnectors(backendConnectors)
99+
}
100+
101+
}
102+
103+
return quesma, nil
104+
}

quesma/core/quesma_pipeline.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
// Copyright Quesma, licensed under the Elastic License 2.0.
2+
// SPDX-License-Identifier: Elastic-2.0
3+
package quesma_api
4+
5+
type Pipeline struct {
6+
FrontendConnectors []FrontendConnector
7+
Processors []Processor
8+
BackendConnectors map[BackendConnectorType]BackendConnector
9+
}
10+
11+
func NewPipeline() *Pipeline {
12+
backendConnectors := make(map[BackendConnectorType]BackendConnector)
13+
backendConnectors[NoopBackend] = &NoopBackendConnector{}
14+
return &Pipeline{
15+
FrontendConnectors: make([]FrontendConnector, 0),
16+
Processors: make([]Processor, 0),
17+
BackendConnectors: backendConnectors,
18+
}
19+
}
20+
21+
func (p *Pipeline) AddFrontendConnector(conn FrontendConnector) {
22+
p.FrontendConnectors = append(p.FrontendConnectors, conn)
23+
}
24+
25+
func (p *Pipeline) AddProcessor(proc Processor) {
26+
p.Processors = append(p.Processors, proc)
27+
}
28+
29+
func (p *Pipeline) AddBackendConnector(conn BackendConnector) {
30+
p.BackendConnectors[conn.GetId()] = conn
31+
}
32+
33+
func (p *Pipeline) Build() PipelineBuilder {
34+
return p
35+
}
36+
37+
func (p *Pipeline) Start() {
38+
for _, conn := range p.FrontendConnectors {
39+
go conn.Listen()
40+
}
41+
}
42+
43+
func (p *Pipeline) GetFrontendConnectors() []FrontendConnector {
44+
return p.FrontendConnectors
45+
}
46+
47+
func (p *Pipeline) GetProcessors() []Processor {
48+
return p.Processors
49+
}
50+
51+
func (p *Pipeline) GetBackendConnectors() map[BackendConnectorType]BackendConnector {
52+
return p.BackendConnectors
53+
}

quesma/core/quesma_utils.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Copyright Quesma, licensed under the Elastic License 2.0.
2+
// SPDX-License-Identifier: Elastic-2.0
3+
package quesma_api
4+
5+
import (
6+
"fmt"
7+
)
8+
9+
type Cloner interface {
10+
Clone() Cloner
11+
}
12+
13+
func CheckedCast[T any](value interface{}) (T, error) {
14+
v, ok := value.(T)
15+
if !ok {
16+
var zero T
17+
return zero, fmt.Errorf("cannot cast %v to %T", value, zero)
18+
}
19+
return v, nil
20+
}
21+
22+
func SetInputType[T any]() any {
23+
return nil
24+
}

0 commit comments

Comments
 (0)