Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions v2/backend_connectors/mysql_backend_connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package backend_connectors

import (
"context"
"database/sql"
_ "github.com/go-sql-driver/mysql"
quesma_api "quesma_v2/core"
)

type MySqlRows struct {
rows *sql.Rows
}

func (p *MySqlRows) Next() bool {
return p.rows.Next()
}

func (p *MySqlRows) Scan(dest ...interface{}) error {
return p.rows.Scan(dest...)
}

func (p *MySqlRows) Close() {
err := p.rows.Close()
if err != nil {
panic(err)
}
}

func (p *MySqlRows) Err() error {
return p.rows.Err()
}

type MySqlBackendConnector struct {
Endpoint string
connection *sql.DB
}

func (p *MySqlBackendConnector) GetId() quesma_api.BackendConnectorType {
return quesma_api.MySQLBackend
}

func (p *MySqlBackendConnector) Open() error {
conn, err := sql.Open("mysql", p.Endpoint)
if err != nil {
return err
}
err = conn.Ping()
if err != nil {
return err
}
p.connection = conn
return nil
}

func (p *MySqlBackendConnector) Close() error {
if p.connection == nil {
return nil
}
return p.connection.Close()
}

func (p *MySqlBackendConnector) Query(ctx context.Context, query string, args ...interface{}) (quesma_api.Rows, error) {
rows, err := p.connection.QueryContext(context.Background(), query, args...)
if err != nil {
return nil, err
}
return &MySqlRows{rows: rows}, nil
}

func (p *MySqlBackendConnector) Exec(ctx context.Context, query string, args ...interface{}) error {
if len(args) == 0 {
_, err := p.connection.ExecContext(context.Background(), query)
return err
}
_, err := p.connection.ExecContext(context.Background(), query, args...)
return err
}
45 changes: 45 additions & 0 deletions v2/backend_connectors/postgres_backend_connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package backend_connectors

import (
"context"
"github.com/jackc/pgx/v4"
quesma_api "quesma_v2/core"
)

type PostgresBackendConnector struct {
Endpoint string
connection *pgx.Conn
}

func (p *PostgresBackendConnector) GetId() quesma_api.BackendConnectorType {
return quesma_api.PgSQLBackend
}

func (p *PostgresBackendConnector) Open() error {
conn, err := pgx.Connect(context.Background(), p.Endpoint)
if err != nil {
return err
}
p.connection = conn
return nil
}

func (p *PostgresBackendConnector) Close() error {
if p.connection == nil {
return nil
}
return p.connection.Close(context.Background())
}

func (p *PostgresBackendConnector) Query(ctx context.Context, query string, args ...interface{}) (quesma_api.Rows, error) {
return p.connection.Query(context.Background(), query, args...)
}

func (p *PostgresBackendConnector) Exec(ctx context.Context, query string, args ...interface{}) error {
if len(args) == 0 {
_, err := p.connection.Exec(context.Background(), query)
return err
}
_, err := p.connection.Exec(context.Background(), query, args...)
return err
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
147 changes: 147 additions & 0 deletions v2/frontend_connectors/basic_http_frontend_connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package frontend_connectors

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
quesma_api "quesma_v2/core"
"sync"
)

type HTTPRouter struct {
mux *http.ServeMux // Default HTTP multiplexer
handlers map[string]quesma_api.HandlersPipe // Map to store custom route handlers
mutex sync.RWMutex // Mutex for concurrent access to handlers
}

func NewHTTPRouter() *HTTPRouter {
return &HTTPRouter{
mux: http.NewServeMux(),
handlers: make(map[string]quesma_api.HandlersPipe),
}
}

// AddRoute adds a new route to the router
func (router *HTTPRouter) AddRoute(path string, handler quesma_api.HTTPFrontendHandler) {
router.mutex.Lock()
defer router.mutex.Unlock()
router.handlers[path] = quesma_api.HandlersPipe{Handler: handler}
fmt.Printf("Added route: %s\n", path)
}

func (router *HTTPRouter) Clone() quesma_api.Cloner {
newRouter := NewHTTPRouter()
router.mutex.Lock()
defer router.mutex.Unlock()
for path, handler := range router.handlers {
newRouter.handlers[path] = handler
}
return newRouter
}

func (router *HTTPRouter) GetHandlers() map[string]quesma_api.HandlersPipe {
router.mutex.RLock()
defer router.mutex.RUnlock()
callInfos := make(map[string]quesma_api.HandlersPipe)
for k, v := range router.handlers {
callInfos[k] = v
}
return callInfos
}

func (router *HTTPRouter) SetHandlers(handlers map[string]quesma_api.HandlersPipe) {
router.mutex.Lock()
defer router.mutex.Unlock()
for path, handler := range handlers {
router.handlers[path] = handler
}
}

func (router *HTTPRouter) Lock() {
router.mutex.Lock()
}

func (router *HTTPRouter) Unlock() {
router.mutex.Unlock()
}

func (router *HTTPRouter) Multiplexer() *http.ServeMux {
return router.mux
}

type BasicHTTPFrontendConnector struct {
listener *http.Server
router quesma_api.Router

endpoint string
}

func NewBasicHTTPFrontendConnector(endpoint string) *BasicHTTPFrontendConnector {
return &BasicHTTPFrontendConnector{
endpoint: endpoint,
}
}

func (h *BasicHTTPFrontendConnector) AddRouter(router quesma_api.Router) {
h.router = router
}

func (h *BasicHTTPFrontendConnector) GetRouter() quesma_api.Router {
return h.router
}

func (h *BasicHTTPFrontendConnector) ServeHTTP(w http.ResponseWriter, req *http.Request) {
handlerWrapper, exists := h.router.GetHandlers()[req.URL.Path]
if !exists {
h.router.Multiplexer().ServeHTTP(w, req)
return
}
dispatcher := &quesma_api.Dispatcher{}
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
metadata, message, _ := handlerWrapper.Handler(req)

metadata, message = dispatcher.Dispatch(handlerWrapper.Processors, metadata, message)
_, err := w.Write(message.([]byte))
if err != nil {
fmt.Printf("Error writing response: %s\n", err)
}
}).ServeHTTP(w, req)
}

func (h *BasicHTTPFrontendConnector) Listen() error {
h.listener = &http.Server{}
h.listener.Addr = h.endpoint
h.listener.Handler = h
go func() {
err := h.listener.ListenAndServe()
_ = err
}()

return nil
}

func (h *BasicHTTPFrontendConnector) Stop(ctx context.Context) error {
if h.listener == nil {
return nil
}
err := h.listener.Shutdown(ctx)
if err != nil {
return err
}
return h.listener.Close()
}

func (h *BasicHTTPFrontendConnector) GetEndpoint() string {
return h.endpoint
}

func ReadRequestBody(request *http.Request) ([]byte, error) {
reqBody, err := io.ReadAll(request.Body)
if err != nil {
return nil, err
}
request.Body = io.NopCloser(bytes.NewBuffer(reqBody))
return reqBody, nil
}
44 changes: 44 additions & 0 deletions v2/frontend_connectors/basic_tcp_connection_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package frontend_connectors

import (
"fmt"
"io"
"net"
quesma_api "quesma_v2/core"
)

type BasicTcpConnectionHandler struct {
processors []quesma_api.Processor
}

func (h *BasicTcpConnectionHandler) SetHandlers(processors []quesma_api.Processor) {
h.processors = processors
}

func (h *BasicTcpConnectionHandler) HandleConnection(conn net.Conn) error {
fmt.Println("Handling connection")
defer conn.Close()

// Example: Read data from the connection
buffer := make([]byte, 1024)
for {
n, err := conn.Read(buffer)
if err != nil {
if err == io.EOF {
fmt.Println("Connection closed")
} else {
fmt.Println("Error reading from connection:", err)
}
return err
}
for _, processor := range h.processors {
if processor != nil {
processor.Handle(nil, buffer[:n])
}
}
fmt.Printf("Received data: %s\n", string(buffer[:n]))

// Echo the data back (for demonstration purposes)
conn.Write(buffer[:n])
}
}
72 changes: 72 additions & 0 deletions v2/frontend_connectors/basic_tcp_connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package frontend_connectors

import (
"context"
"fmt"
"net"
quesma_api "quesma_v2/core"
"sync/atomic"
)

type TCPListener struct {
listener net.Listener
Endpoint string
handler quesma_api.TCPConnectionHandler
isShutdown atomic.Bool
}

func NewTCPConnector(endpoint string) *TCPListener {
return &TCPListener{
Endpoint: endpoint,
}
}

func (t *TCPListener) Listen() error {
ln, err := net.Listen("tcp", t.Endpoint)
if err != nil {
return fmt.Errorf("failed to start TCP listener: %v", err)
}
t.listener = ln

// Start listening for incoming connections in a goroutine
go func() {
for {
conn, err := ln.Accept()
if err != nil {
if t.isShutdown.Load() {
return
}
fmt.Println("Failed to accept connection:", err)
continue
}
// Handle each connection in a separate goroutine to allow concurrent handling
go func() {
err := t.GetConnectionHandler().HandleConnection(conn)
if err != nil {
fmt.Println("Error handling connection:", err)
}
}()
}
}()
return nil
}

func (t *TCPListener) GetEndpoint() string {
return t.Endpoint
}

func (t *TCPListener) AddConnectionHandler(handler quesma_api.TCPConnectionHandler) {
t.handler = handler
}

func (t *TCPListener) GetConnectionHandler() quesma_api.TCPConnectionHandler {
return t.handler
}

func (t *TCPListener) Stop(ctx context.Context) error {
t.isShutdown.Store(true)
if t.listener != nil {
return t.listener.Close()
}
return nil
}
Loading
Loading