diff --git a/v2/backend_connectors/mysql_backend_connector.go b/v2/backend_connectors/mysql_backend_connector.go new file mode 100644 index 000000000..f4f88d6f3 --- /dev/null +++ b/v2/backend_connectors/mysql_backend_connector.go @@ -0,0 +1,77 @@ +package backend_connectors + +import ( + "context" + "database/sql" + _ "github.com/go-sql-driver/mysql" + quesma_api "quesma_v2/core" +) + +type MySqlRows struct { + rows *sql.Rows +} + +func (p *MySqlRows) Next() bool { + return p.rows.Next() +} + +func (p *MySqlRows) Scan(dest ...interface{}) error { + return p.rows.Scan(dest...) +} + +func (p *MySqlRows) Close() { + err := p.rows.Close() + if err != nil { + panic(err) + } +} + +func (p *MySqlRows) Err() error { + return p.rows.Err() +} + +type MySqlBackendConnector struct { + Endpoint string + connection *sql.DB +} + +func (p *MySqlBackendConnector) GetId() quesma_api.BackendConnectorType { + return quesma_api.MySQLBackend +} + +func (p *MySqlBackendConnector) Open() error { + conn, err := sql.Open("mysql", p.Endpoint) + if err != nil { + return err + } + err = conn.Ping() + if err != nil { + return err + } + p.connection = conn + return nil +} + +func (p *MySqlBackendConnector) Close() error { + if p.connection == nil { + return nil + } + return p.connection.Close() +} + +func (p *MySqlBackendConnector) Query(ctx context.Context, query string, args ...interface{}) (quesma_api.Rows, error) { + rows, err := p.connection.QueryContext(context.Background(), query, args...) + if err != nil { + return nil, err + } + return &MySqlRows{rows: rows}, nil +} + +func (p *MySqlBackendConnector) Exec(ctx context.Context, query string, args ...interface{}) error { + if len(args) == 0 { + _, err := p.connection.ExecContext(context.Background(), query) + return err + } + _, err := p.connection.ExecContext(context.Background(), query, args...) + return err +} diff --git a/v2/backend_connectors/postgres_backend_connector.go b/v2/backend_connectors/postgres_backend_connector.go new file mode 100644 index 000000000..885f77acd --- /dev/null +++ b/v2/backend_connectors/postgres_backend_connector.go @@ -0,0 +1,45 @@ +package backend_connectors + +import ( + "context" + "github.com/jackc/pgx/v4" + quesma_api "quesma_v2/core" +) + +type PostgresBackendConnector struct { + Endpoint string + connection *pgx.Conn +} + +func (p *PostgresBackendConnector) GetId() quesma_api.BackendConnectorType { + return quesma_api.PgSQLBackend +} + +func (p *PostgresBackendConnector) Open() error { + conn, err := pgx.Connect(context.Background(), p.Endpoint) + if err != nil { + return err + } + p.connection = conn + return nil +} + +func (p *PostgresBackendConnector) Close() error { + if p.connection == nil { + return nil + } + return p.connection.Close(context.Background()) +} + +func (p *PostgresBackendConnector) Query(ctx context.Context, query string, args ...interface{}) (quesma_api.Rows, error) { + return p.connection.Query(context.Background(), query, args...) +} + +func (p *PostgresBackendConnector) Exec(ctx context.Context, query string, args ...interface{}) error { + if len(args) == 0 { + _, err := p.connection.Exec(context.Background(), query) + return err + } + _, err := p.connection.Exec(context.Background(), query, args...) + return err +} diff --git a/v2/backend_connectors.go b/v2/core/backend_connectors.go similarity index 100% rename from v2/backend_connectors.go rename to v2/core/backend_connectors.go diff --git a/v2/dispatch.go b/v2/core/dispatch.go similarity index 100% rename from v2/dispatch.go rename to v2/core/dispatch.go diff --git a/v2/metadata_api.go b/v2/core/metadata_api.go similarity index 100% rename from v2/metadata_api.go rename to v2/core/metadata_api.go diff --git a/v2/quesma_apis.go b/v2/core/quesma_apis.go similarity index 100% rename from v2/quesma_apis.go rename to v2/core/quesma_apis.go diff --git a/v2/quesma_builder.go b/v2/core/quesma_builder.go similarity index 100% rename from v2/quesma_builder.go rename to v2/core/quesma_builder.go diff --git a/v2/quesma_pipeline.go b/v2/core/quesma_pipeline.go similarity index 100% rename from v2/quesma_pipeline.go rename to v2/core/quesma_pipeline.go diff --git a/v2/quesma_utils.go b/v2/core/quesma_utils.go similarity index 100% rename from v2/quesma_utils.go rename to v2/core/quesma_utils.go diff --git a/v2/frontend_connectors/basic_http_frontend_connector.go b/v2/frontend_connectors/basic_http_frontend_connector.go new file mode 100644 index 000000000..3a3599852 --- /dev/null +++ b/v2/frontend_connectors/basic_http_frontend_connector.go @@ -0,0 +1,147 @@ +package frontend_connectors + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + quesma_api "quesma_v2/core" + "sync" +) + +type HTTPRouter struct { + mux *http.ServeMux // Default HTTP multiplexer + handlers map[string]quesma_api.HandlersPipe // Map to store custom route handlers + mutex sync.RWMutex // Mutex for concurrent access to handlers +} + +func NewHTTPRouter() *HTTPRouter { + return &HTTPRouter{ + mux: http.NewServeMux(), + handlers: make(map[string]quesma_api.HandlersPipe), + } +} + +// AddRoute adds a new route to the router +func (router *HTTPRouter) AddRoute(path string, handler quesma_api.HTTPFrontendHandler) { + router.mutex.Lock() + defer router.mutex.Unlock() + router.handlers[path] = quesma_api.HandlersPipe{Handler: handler} + fmt.Printf("Added route: %s\n", path) +} + +func (router *HTTPRouter) Clone() quesma_api.Cloner { + newRouter := NewHTTPRouter() + router.mutex.Lock() + defer router.mutex.Unlock() + for path, handler := range router.handlers { + newRouter.handlers[path] = handler + } + return newRouter +} + +func (router *HTTPRouter) GetHandlers() map[string]quesma_api.HandlersPipe { + router.mutex.RLock() + defer router.mutex.RUnlock() + callInfos := make(map[string]quesma_api.HandlersPipe) + for k, v := range router.handlers { + callInfos[k] = v + } + return callInfos +} + +func (router *HTTPRouter) SetHandlers(handlers map[string]quesma_api.HandlersPipe) { + router.mutex.Lock() + defer router.mutex.Unlock() + for path, handler := range handlers { + router.handlers[path] = handler + } +} + +func (router *HTTPRouter) Lock() { + router.mutex.Lock() +} + +func (router *HTTPRouter) Unlock() { + router.mutex.Unlock() +} + +func (router *HTTPRouter) Multiplexer() *http.ServeMux { + return router.mux +} + +type BasicHTTPFrontendConnector struct { + listener *http.Server + router quesma_api.Router + + endpoint string +} + +func NewBasicHTTPFrontendConnector(endpoint string) *BasicHTTPFrontendConnector { + return &BasicHTTPFrontendConnector{ + endpoint: endpoint, + } +} + +func (h *BasicHTTPFrontendConnector) AddRouter(router quesma_api.Router) { + h.router = router +} + +func (h *BasicHTTPFrontendConnector) GetRouter() quesma_api.Router { + return h.router +} + +func (h *BasicHTTPFrontendConnector) ServeHTTP(w http.ResponseWriter, req *http.Request) { + handlerWrapper, exists := h.router.GetHandlers()[req.URL.Path] + if !exists { + h.router.Multiplexer().ServeHTTP(w, req) + return + } + dispatcher := &quesma_api.Dispatcher{} + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + metadata, message, _ := handlerWrapper.Handler(req) + + metadata, message = dispatcher.Dispatch(handlerWrapper.Processors, metadata, message) + _, err := w.Write(message.([]byte)) + if err != nil { + fmt.Printf("Error writing response: %s\n", err) + } + }).ServeHTTP(w, req) +} + +func (h *BasicHTTPFrontendConnector) Listen() error { + h.listener = &http.Server{} + h.listener.Addr = h.endpoint + h.listener.Handler = h + go func() { + err := h.listener.ListenAndServe() + _ = err + }() + + return nil +} + +func (h *BasicHTTPFrontendConnector) Stop(ctx context.Context) error { + if h.listener == nil { + return nil + } + err := h.listener.Shutdown(ctx) + if err != nil { + return err + } + return h.listener.Close() +} + +func (h *BasicHTTPFrontendConnector) GetEndpoint() string { + return h.endpoint +} + +func ReadRequestBody(request *http.Request) ([]byte, error) { + reqBody, err := io.ReadAll(request.Body) + if err != nil { + return nil, err + } + request.Body = io.NopCloser(bytes.NewBuffer(reqBody)) + return reqBody, nil +} diff --git a/v2/frontend_connectors/basic_tcp_connection_handler.go b/v2/frontend_connectors/basic_tcp_connection_handler.go new file mode 100644 index 000000000..451000cc0 --- /dev/null +++ b/v2/frontend_connectors/basic_tcp_connection_handler.go @@ -0,0 +1,44 @@ +package frontend_connectors + +import ( + "fmt" + "io" + "net" + quesma_api "quesma_v2/core" +) + +type BasicTcpConnectionHandler struct { + processors []quesma_api.Processor +} + +func (h *BasicTcpConnectionHandler) SetHandlers(processors []quesma_api.Processor) { + h.processors = processors +} + +func (h *BasicTcpConnectionHandler) HandleConnection(conn net.Conn) error { + fmt.Println("Handling connection") + defer conn.Close() + + // Example: Read data from the connection + buffer := make([]byte, 1024) + for { + n, err := conn.Read(buffer) + if err != nil { + if err == io.EOF { + fmt.Println("Connection closed") + } else { + fmt.Println("Error reading from connection:", err) + } + return err + } + for _, processor := range h.processors { + if processor != nil { + processor.Handle(nil, buffer[:n]) + } + } + fmt.Printf("Received data: %s\n", string(buffer[:n])) + + // Echo the data back (for demonstration purposes) + conn.Write(buffer[:n]) + } +} diff --git a/v2/frontend_connectors/basic_tcp_connector.go b/v2/frontend_connectors/basic_tcp_connector.go new file mode 100644 index 000000000..758e11863 --- /dev/null +++ b/v2/frontend_connectors/basic_tcp_connector.go @@ -0,0 +1,72 @@ +package frontend_connectors + +import ( + "context" + "fmt" + "net" + quesma_api "quesma_v2/core" + "sync/atomic" +) + +type TCPListener struct { + listener net.Listener + Endpoint string + handler quesma_api.TCPConnectionHandler + isShutdown atomic.Bool +} + +func NewTCPConnector(endpoint string) *TCPListener { + return &TCPListener{ + Endpoint: endpoint, + } +} + +func (t *TCPListener) Listen() error { + ln, err := net.Listen("tcp", t.Endpoint) + if err != nil { + return fmt.Errorf("failed to start TCP listener: %v", err) + } + t.listener = ln + + // Start listening for incoming connections in a goroutine + go func() { + for { + conn, err := ln.Accept() + if err != nil { + if t.isShutdown.Load() { + return + } + fmt.Println("Failed to accept connection:", err) + continue + } + // Handle each connection in a separate goroutine to allow concurrent handling + go func() { + err := t.GetConnectionHandler().HandleConnection(conn) + if err != nil { + fmt.Println("Error handling connection:", err) + } + }() + } + }() + return nil +} + +func (t *TCPListener) GetEndpoint() string { + return t.Endpoint +} + +func (t *TCPListener) AddConnectionHandler(handler quesma_api.TCPConnectionHandler) { + t.handler = handler +} + +func (t *TCPListener) GetConnectionHandler() quesma_api.TCPConnectionHandler { + return t.handler +} + +func (t *TCPListener) Stop(ctx context.Context) error { + t.isShutdown.Store(true) + if t.listener != nil { + return t.listener.Close() + } + return nil +} diff --git a/v2/frontend_connectors/passthrough_tcp_connection_handler.go b/v2/frontend_connectors/passthrough_tcp_connection_handler.go new file mode 100644 index 000000000..c783e1082 --- /dev/null +++ b/v2/frontend_connectors/passthrough_tcp_connection_handler.go @@ -0,0 +1,51 @@ +package frontend_connectors + +import ( + "fmt" + "io" + "net" + quesma_api "quesma_v2/core" +) + +func (p *PassThroughConnectionHandler) copyData(src io.Reader, dest io.Writer) { + if _, err := io.Copy(dest, src); err != nil { + fmt.Printf("Error copying data: %v", err) + } +} + +type PassThroughConnectionHandler struct { + endpoint string +} + +func NewPassThroughConnectionHandler(endpoint string) *PassThroughConnectionHandler { + return &PassThroughConnectionHandler{ + endpoint: endpoint, + } +} + +func (p *PassThroughConnectionHandler) SetHandlers(processors []quesma_api.Processor) { +} + +func closeConnection(connection net.Conn) { + if err := connection.Close(); err != nil { + fmt.Printf("Error closing connection: %v", err) + } +} +func (p *PassThroughConnectionHandler) handle(fromConn, destConn net.Conn) { + fmt.Println("handle:", fromConn.RemoteAddr(), "->", destConn.RemoteAddr()) + defer closeConnection(fromConn) + defer closeConnection(destConn) + go p.copyData(fromConn, destConn) + p.copyData(destConn, fromConn) +} + +func (p *PassThroughConnectionHandler) HandleConnection(fromConn net.Conn) error { + fmt.Println("Tcp connection handler") + destConn, err := net.Dial("tcp", p.endpoint) + if err != nil { + closeConnection(fromConn) + return err + } + p.handle(fromConn, destConn) + return nil +} diff --git a/v2/frontend_connectors/tcp_postgres_connection_handler.go b/v2/frontend_connectors/tcp_postgres_connection_handler.go new file mode 100644 index 000000000..285db0101 --- /dev/null +++ b/v2/frontend_connectors/tcp_postgres_connection_handler.go @@ -0,0 +1,79 @@ +package frontend_connectors + +import ( + "fmt" + "github.com/jackc/pgx/v5/pgproto3" + "net" + quesma_api "quesma_v2/core" +) + +type TcpPostgresConnectionHandler struct { + processors []quesma_api.Processor +} + +func (p *TcpPostgresConnectionHandler) HandleConnection(conn net.Conn) error { + backend := pgproto3.NewBackend(conn, conn) + defer p.close(conn) + + err := p.handleStartup(conn, backend) + if err != nil { + return err + } + + dispatcher := quesma_api.Dispatcher{} + + for { + msg, err := backend.Receive() + if err != nil { + return fmt.Errorf("error receiving message: %w", err) + } + var resp any = msg + metadata := make(map[string]interface{}) + metadata, resp = dispatcher.Dispatch(p.processors, metadata, resp) + if resp != nil { + _, err = conn.Write(resp.([]byte)) + } + } +} + +func (p *TcpPostgresConnectionHandler) handleStartup(conn net.Conn, backend *pgproto3.Backend) error { + startupMessage, err := backend.ReceiveStartupMessage() + if err != nil { + return fmt.Errorf("error receiving startup message: %w", err) + } + + switch startupMessage.(type) { + case *pgproto3.StartupMessage: + buf := mustEncode((&pgproto3.AuthenticationOk{}).Encode(nil)) + buf = mustEncode((&pgproto3.ReadyForQuery{TxStatus: 'I'}).Encode(buf)) + _, err = conn.Write(buf) + if err != nil { + return fmt.Errorf("error sending ready for query: %w", err) + } + case *pgproto3.SSLRequest: + _, err = conn.Write([]byte("N")) + if err != nil { + return fmt.Errorf("error sending deny SSL request: %w", err) + } + return p.handleStartup(conn, backend) + default: + return fmt.Errorf("unknown startup message: %#v", startupMessage) + } + + return nil +} + +func (p *TcpPostgresConnectionHandler) close(conn net.Conn) error { + return conn.Close() +} + +func mustEncode(buf []byte, err error) []byte { + if err != nil { + panic(err) + } + return buf +} + +func (h *TcpPostgresConnectionHandler) SetHandlers(processors []quesma_api.Processor) { + h.processors = processors +} diff --git a/v2/go.mod b/v2/go.mod new file mode 100644 index 000000000..6a1e6e3cb --- /dev/null +++ b/v2/go.mod @@ -0,0 +1,29 @@ +module quesma_v2 + +go 1.23.2 + +require ( + github.com/go-sql-driver/mysql v1.8.1 + github.com/google/go-cmp v0.6.0 + github.com/google/uuid v1.6.0 + github.com/jackc/pgx/v4 v4.18.3 + github.com/jackc/pgx/v5 v5.7.1 + github.com/stretchr/testify v1.10.0 +) + +require ( + filippo.io/edwards25519 v1.1.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/jackc/chunkreader/v2 v2.0.1 // indirect + github.com/jackc/pgconn v1.14.3 // indirect + github.com/jackc/pgio v1.0.0 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgproto3/v2 v2.3.3 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/pgtype v1.14.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/rogpeppe/go-internal v1.13.1 // indirect + golang.org/x/crypto v0.27.0 // indirect + golang.org/x/text v0.18.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/v2/go.sum b/v2/go.sum new file mode 100644 index 000000000..59f0a59dc --- /dev/null +++ b/v2/go.sum @@ -0,0 +1,196 @@ +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs= +github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I= +github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= +github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY= +github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= +github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= +github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= +github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw= +github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo= +github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= +github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= +github.com/jackc/chunkreader/v2 v2.0.1/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= +github.com/jackc/pgconn v0.0.0-20190420214824-7e0022ef6ba3/go.mod h1:jkELnwuX+w9qN5YIfX0fl88Ehu4XC3keFuOJJk9pcnA= +github.com/jackc/pgconn v0.0.0-20190824142844-760dd75542eb/go.mod h1:lLjNuW/+OfW9/pnVKPazfWOgNfH2aPem8YQ7ilXGvJE= +github.com/jackc/pgconn v0.0.0-20190831204454-2fabfa3c18b7/go.mod h1:ZJKsE/KZfsUgOEh9hBm+xYTstcNHg7UPMVJqRfQxq4s= +github.com/jackc/pgconn v1.8.0/go.mod h1:1C2Pb36bGIP9QHGBYCjnyhqu7Rv3sGshaQUvmfGIB/o= +github.com/jackc/pgconn v1.9.0/go.mod h1:YctiPyvzfU11JFxoXokUOOKQXQmDMoJL9vJzHH8/2JY= +github.com/jackc/pgconn v1.9.1-0.20210724152538-d89c8390a530/go.mod h1:4z2w8XhRbP1hYxkpTuBjTS3ne3J48K83+u0zoyvg2pI= +github.com/jackc/pgconn v1.14.3 h1:bVoTr12EGANZz66nZPkMInAV/KHD2TxH9npjXXgiB3w= +github.com/jackc/pgconn v1.14.3/go.mod h1:RZbme4uasqzybK2RK5c65VsHxoyaml09lx3tXOcO/VM= +github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= +github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8= +github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2/go.mod h1:fGZlG77KXmcq05nJLRkk0+p82V8B8Dw8KN2/V9c/OAE= +github.com/jackc/pgmock v0.0.0-20201204152224-4fe30f7445fd/go.mod h1:hrBW0Enj2AZTNpt/7Y5rr2xe/9Mn757Wtb2xeBzPv2c= +github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65 h1:DadwsjnMwFjfWc9y5Wi/+Zz7xoE5ALHsRQlOctkOiHc= +github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65/go.mod h1:5R2h2EEX+qri8jOWMbJCtaPWkrrNc7OHwsp2TCqp7ak= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgproto3 v1.1.0/go.mod h1:eR5FA3leWg7p9aeAqi37XOTgTIbkABlvcPB3E5rlc78= +github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190420180111-c116219b62db/go.mod h1:bhq50y+xrl9n5mRYyCBFKkpRVTLYJVWeCc+mEAI3yXA= +github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190609003834-432c2951c711/go.mod h1:uH0AWtUmuShn0bcesswc4aBTWGvw0cAxIJp+6OB//Wg= +github.com/jackc/pgproto3/v2 v2.0.0-rc3/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM= +github.com/jackc/pgproto3/v2 v2.0.0-rc3.0.20190831210041-4c03ce451f29/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM= +github.com/jackc/pgproto3/v2 v2.0.6/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= +github.com/jackc/pgproto3/v2 v2.1.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= +github.com/jackc/pgproto3/v2 v2.3.3 h1:1HLSx5H+tXR9pW3in3zaztoEwQYRC9SQaYUHjTSUOag= +github.com/jackc/pgproto3/v2 v2.3.3/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= +github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgtype v0.0.0-20190421001408-4ed0de4755e0/go.mod h1:hdSHsc1V01CGwFsrv11mJRHWJ6aifDLfdV3aVjFF0zg= +github.com/jackc/pgtype v0.0.0-20190824184912-ab885b375b90/go.mod h1:KcahbBH1nCMSo2DXpzsoWOAfFkdEtEJpPbVLq8eE+mc= +github.com/jackc/pgtype v0.0.0-20190828014616-a8802b16cc59/go.mod h1:MWlu30kVJrUS8lot6TQqcg7mtthZ9T0EoIBFiJcmcyw= +github.com/jackc/pgtype v1.8.1-0.20210724151600-32e20a603178/go.mod h1:C516IlIV9NKqfsMCXTdChteoXmwgUceqaLfjg2e3NlM= +github.com/jackc/pgtype v1.14.0 h1:y+xUdabmyMkJLyApYuPj38mW+aAIqCe5uuBB51rH3Vw= +github.com/jackc/pgtype v1.14.0/go.mod h1:LUMuVrfsFfdKGLw+AFFVv6KtHOFMwRgDDzBt76IqCA4= +github.com/jackc/pgx/v4 v4.0.0-20190420224344-cc3461e65d96/go.mod h1:mdxmSJJuR08CZQyj1PVQBHy9XOp5p8/SHH6a0psbY9Y= +github.com/jackc/pgx/v4 v4.0.0-20190421002000-1b8f0016e912/go.mod h1:no/Y67Jkk/9WuGR0JG/JseM9irFbnEPbuWV2EELPNuM= +github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQnOEnf1dqHGpw7JmHqHc1NxDoalibchSk9/RWuDc= +github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c/go.mod h1:1QD0+tgSXP7iUjYm9C1NxKhny7lq6ee99u/z+IHFcgs= +github.com/jackc/pgx/v4 v4.18.3 h1:dE2/TrEsGX3RBprb3qryqSV9Y60iZN1C6i8IrmW9/BA= +github.com/jackc/pgx/v4 v4.18.3/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw= +github.com/jackc/pgx/v5 v5.7.1 h1:x7SYsPBYDkHDksogeSmZZ5xzThcTgRz++I5E+ePFUcs= +github.com/jackc/pgx/v5 v5.7.1/go.mod h1:e7O26IywZZ+naJtWWos6i6fvWK+29etgITqrqHLfoZA= +github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.10.2 h1:AqzbZs4ZoCBp+GtejcpCpcxM3zlSMx29dXbUSeVtJb8= +github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= +github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= +github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= +github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU= +github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc= +github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= +github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= +github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ= +github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= +github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= +github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= +go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= +go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= +go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= +go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190411191339-88737f569e3a/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= +golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20201203163018-be400aefbc4c/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= +golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= +golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= +golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190425163242-31fd60d6bfdc/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= +golang.org/x/tools v0.0.0-20190823170909-c4a336ef6a2f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod h1:aPpfJ7XW+gOuirDoZ8gHhLh3kZ1B08FtV2bbmy7Jv3s= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= diff --git a/v2/processors/ab_test_processor.go b/v2/processors/ab_test_processor.go new file mode 100644 index 000000000..05884310e --- /dev/null +++ b/v2/processors/ab_test_processor.go @@ -0,0 +1,105 @@ +package processors + +import ( + "encoding/json" + "fmt" + "github.com/google/go-cmp/cmp" + quesma_api "quesma_v2/core" + "strconv" +) + +type ABTestProcessor struct { + Id string + BaseProcessor + messageStorage map[string][][]byte + doResultComparison bool +} + +func NewABTestProcessor(id string, doResultComparison bool) *ABTestProcessor { + return &ABTestProcessor{ + Id: id, + BaseProcessor: NewBaseProcessor(), + messageStorage: make(map[string][][]byte), + doResultComparison: doResultComparison, + } +} + +func (p *ABTestProcessor) GetId() string { + return p.Id +} + +func (p *ABTestProcessor) compare(json1 string, json2 string) (bool, string) { + var obj1, obj2 map[string]interface{} + err := json.Unmarshal([]byte(json1), &obj1) + if err != nil { + fmt.Println("Error unmarshalling JSON1:", err) + return false, "" + } + json.Unmarshal([]byte(json2), &obj2) + if err != nil { + fmt.Println("Error unmarshalling JSON2:", err) + return false, "" + } + + diff := cmp.Diff(obj1, obj2) + if diff == "" { + fmt.Println("JSON objects are equal") + return true, "" + } + fmt.Println("JSON objects are not equal:", diff) + return false, diff +} + +func (p *ABTestProcessor) Handle(metadata map[string]interface{}, message ...any) (map[string]interface{}, any, error) { + var data []byte + + for _, m := range message { + mCasted, err := quesma_api.CheckedCast[[]byte](m) + if err != nil { + panic("ABTestProcessor: invalid message type") + } + data = append(data, mCasted...) + level := metadata["level"].(int) + correlationId := quesma_api.GetCorrelationId(metadata) + currentSlice, exists := p.messageStorage[correlationId] + if !exists { + currentSlice = [][]byte{} + } + currentSlice = append(currentSlice, mCasted) + p.messageStorage[correlationId] = currentSlice + + data = append(data, strconv.Itoa(level)...) + data = append(data, []byte(p.GetId())...) + data = append(data, []byte(",correlationId:")...) + data = append(data, []byte(correlationId)...) + data = append(data, []byte("\n")...) + } + + if !p.doResultComparison { + return metadata, data, nil + } + resp := make([]byte, 0) + for _, messages := range p.messageStorage { + if len(messages) == 2 { + equal, diff := p.compare(string(messages[0]), string(messages[1])) + if equal { + resp = append(resp, []byte("ABTestProcessor processor: Responses are equal\n\n")...) + resp = append(resp, []byte("\n")...) + resp = append(resp, []byte(diff)...) + + } else { + resp = append(resp, []byte("ABTestProcessor processor: Responses are not equal\n\n")...) + resp = append(resp, []byte("\n")...) + resp = append(resp, []byte(diff)...) + } + // clean storage + p.messageStorage = make(map[string][][]byte) + } + } + + return metadata, resp, nil +} + +func (p *ABTestProcessor) GetSupportedBackendConnectors() []quesma_api.BackendConnectorType { + return []quesma_api.BackendConnectorType{quesma_api.NoopBackend} +} diff --git a/v2/processors/base_processor.go b/v2/processors/base_processor.go new file mode 100644 index 000000000..e46a06e3e --- /dev/null +++ b/v2/processors/base_processor.go @@ -0,0 +1,38 @@ +package processors + +import quesma_api "quesma_v2/core" + +type BaseProcessor struct { + InnerProcessors []quesma_api.Processor + BackendConnectors map[quesma_api.BackendConnectorType]quesma_api.BackendConnector +} + +func NewBaseProcessor() BaseProcessor { + return BaseProcessor{ + InnerProcessors: make([]quesma_api.Processor, 0), + BackendConnectors: make(map[quesma_api.BackendConnectorType]quesma_api.BackendConnector), + } +} + +func (p *BaseProcessor) AddProcessor(proc quesma_api.Processor) { + p.InnerProcessors = append(p.InnerProcessors, proc) +} + +func (p *BaseProcessor) GetProcessors() []quesma_api.Processor { + return p.InnerProcessors +} + +func (p *BaseProcessor) SetBackendConnectors(conns map[quesma_api.BackendConnectorType]quesma_api.BackendConnector) { + p.BackendConnectors = conns +} + +func (p *BaseProcessor) GetBackendConnector(connectorType quesma_api.BackendConnectorType) quesma_api.BackendConnector { + if conn, ok := p.BackendConnectors[connectorType]; ok { + return conn + } + return nil +} + +func (p *BaseProcessor) GetSupportedBackendConnectors() []quesma_api.BackendConnectorType { + return []quesma_api.BackendConnectorType{quesma_api.NoopBackend} +} diff --git a/v2/processors/basic_tcp_processor.go b/v2/processors/basic_tcp_processor.go new file mode 100644 index 000000000..8eabac1d3 --- /dev/null +++ b/v2/processors/basic_tcp_processor.go @@ -0,0 +1,25 @@ +package processors + +import ( + "fmt" +) + +type TcpProcessor struct { + BaseProcessor +} + +func NewTcpProcessor() *TcpProcessor { + return &TcpProcessor{ + BaseProcessor: NewBaseProcessor(), + } +} + +func (p *TcpProcessor) GetId() string { + return "tcp" +} + +func (p *TcpProcessor) Handle(metadata map[string]interface{}, message any) (map[string]interface{}, any, error) { + fmt.Println("TCP processor") + data := message.([]byte) + return metadata, data, nil +} diff --git a/v2/processors/mysql_ingest_processor.go b/v2/processors/mysql_ingest_processor.go new file mode 100644 index 000000000..79e2310fe --- /dev/null +++ b/v2/processors/mysql_ingest_processor.go @@ -0,0 +1,73 @@ +package processors + +import ( + "context" + "fmt" + "github.com/google/uuid" + quesma_api "quesma_v2/core" +) + +type MySqlIngestProcessor struct { + BaseProcessor +} + +func NewMySqlIngestProcessor() *MySqlIngestProcessor { + return &MySqlIngestProcessor{ + BaseProcessor: NewBaseProcessor(), + } +} + +func (p *MySqlIngestProcessor) GetId() string { + return "postgresingest" +} + +func (p *MySqlIngestProcessor) Handle(metadata map[string]interface{}, message ...any) (map[string]interface{}, any, error) { + var data []byte + for _, m := range message { + mCasted, err := quesma_api.CheckedCast[[]byte](m) + if err != nil { + panic("MySqlIngestProcessor: invalid message type") + } + data = mCasted + fmt.Println("MySqlIngestProcessor processor ") + data = append(data, []byte("\nProcessed by MySqlIngestProcessor processor\n")...) + data = append(data, []byte("\t|\n")...) + backendConn := p.GetBackendConnector(quesma_api.MySQLBackend) + if backendConn != nil { + fmt.Println("Backend connector found") + err := backendConn.Open() + if err != nil { + fmt.Printf("Error opening connection: %v", err) + return nil, nil, err + } + // Create table SQL statement + createTableQuery := ` + CREATE TABLE IF NOT EXISTS users ( + id INT AUTO_INCREMENT PRIMARY KEY, + username VARCHAR(50) NOT NULL, + email VARCHAR(100) NOT NULL UNIQUE, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + );` + // Execute the query + err = backendConn.Exec(context.Background(), createTableQuery) + if err != nil { + fmt.Printf("Failed to create table: %v\n", err) + return nil, nil, err + } + id := uuid.New() + username := "user" + id.String() + email := username + "@quesma.com" + // Execute the insert statement directly using Exec + err = backendConn.Exec(context.Background(), "INSERT INTO users (username, email) VALUES (?, ?)", username, email) + data = append(data, []byte(fmt.Sprintf("\tUser: ID=%s, Username=%s, Email=%s, CreatedAt=\n", id, username, email))...) + if err != nil { + fmt.Println(err) + } + } + } + return metadata, data, nil +} + +func (p *MySqlIngestProcessor) GetSupportedBackendConnectors() []quesma_api.BackendConnectorType { + return []quesma_api.BackendConnectorType{quesma_api.MySQLBackend} +} diff --git a/v2/processors/mysql_query_processor.go b/v2/processors/mysql_query_processor.go new file mode 100644 index 000000000..78a397669 --- /dev/null +++ b/v2/processors/mysql_query_processor.go @@ -0,0 +1,87 @@ +package processors + +import ( + "context" + "fmt" + "log" + quesma_api "quesma_v2/core" +) + +type MySqlQueryProcessor struct { + BaseProcessor +} + +func NewMySqlQueryProcessor() *MySqlQueryProcessor { + return &MySqlQueryProcessor{ + BaseProcessor: NewBaseProcessor(), + } +} + +func (p *MySqlQueryProcessor) GetId() string { + return "mysqlquery" +} + +func (p *MySqlQueryProcessor) Handle(metadata map[string]interface{}, message ...any) (map[string]interface{}, any, error) { + var data []byte + for _, m := range message { + mCasted, err := quesma_api.CheckedCast[[]byte](m) + if err != nil { + panic("MySqlIngestProcessor: invalid message type") + } + data = mCasted + fmt.Println("MySqlQuery processor ") + data = append(data, []byte("Processed by MySql Query processor\n")...) + data = append(data, []byte("\t|\n")...) + + backendConn := p.GetBackendConnector(quesma_api.MySQLBackend) + if backendConn != nil { + fmt.Println("Backend connector found") + err := backendConn.Open() + if err != nil { + fmt.Printf("Error opening connection: %v", err) + return nil, nil, err + } + // SQL query to select all users + query := `SELECT id, username, email FROM users` + + // Execute the query + rows, err := backendConn.Query(context.Background(), query) + if err != nil { + fmt.Printf("Failed to execute query: %v\n", err) + return nil, nil, err + } + defer rows.Close() + // Iterate over the rows + + for rows.Next() { + var id int + var username, email string + err = rows.Scan(&id, &username, &email) + if err != nil { + log.Fatalf("Failed to scan row: %v\n", err) + } + + res := fmt.Sprintf("\tUser: ID=%d, Username=%s, Email=%s, CreatedAt=\n", id, username, email) + fmt.Println(res) + data = append(data, []byte(res)...) + } + + // Check for any error that occurred during row iteration + if err = rows.Err(); err != nil { + log.Fatalf("Row iteration error: %v\n", err) + } + + err = backendConn.Close() + if err != nil { + fmt.Printf("Error closing connection: %v", err) + return nil, nil, err + } + + } + } + return metadata, data, nil +} + +func (p *MySqlQueryProcessor) GetSupportedBackendConnectors() []quesma_api.BackendConnectorType { + return []quesma_api.BackendConnectorType{quesma_api.MySQLBackend} +} diff --git a/v2/processors/postgres_ingest_processor.go b/v2/processors/postgres_ingest_processor.go new file mode 100644 index 000000000..2e6f711a0 --- /dev/null +++ b/v2/processors/postgres_ingest_processor.go @@ -0,0 +1,85 @@ +package processors + +import ( + "context" + "fmt" + "github.com/google/uuid" + "log" + quesma_api "quesma_v2/core" +) + +type PostgresIngestProcessor struct { + BaseProcessor +} + +func NewPostgresIngestProcessor() *PostgresIngestProcessor { + return &PostgresIngestProcessor{ + BaseProcessor: NewBaseProcessor(), + } +} + +func (p *PostgresIngestProcessor) GetId() string { + return "postgresingest" +} + +func (p *PostgresIngestProcessor) Handle(metadata map[string]interface{}, message ...any) (map[string]interface{}, any, error) { + var data []byte + for _, m := range message { + mCasted, err := quesma_api.CheckedCast[[]byte](m) + if err != nil { + panic("PostgresQueryProcessor: invalid message type") + } + data = mCasted + fmt.Println("PostgresIngest processor ") + data = append(data, []byte("\nProcessed by PostgresIngest processor\n")...) + data = append(data, []byte("\t|\n")...) + backendConn := p.GetBackendConnector(quesma_api.PgSQLBackend) + if backendConn == nil { + fmt.Println("Backend connector not found") + return metadata, data, nil + } + + err = backendConn.Open() + if err != nil { + fmt.Printf("Error opening connection: %v", err) + return nil, nil, err + } + createTableSQL := ` + CREATE TABLE IF NOT EXISTS users ( + id SERIAL PRIMARY KEY, + username VARCHAR(50) NOT NULL, + email VARCHAR(100) NOT NULL UNIQUE, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ); + ` + + err = backendConn.Exec(context.Background(), createTableSQL) + if err != nil { + log.Fatalf("Failed to create table: %v\n", err) + } + + id := uuid.New() + username := "user" + id.String() + email := username + "@quesma.com" + + // Insert data into the users table + insertSQL := `INSERT INTO users (username, email) VALUES ($1, $2)` + + err = backendConn.Exec(context.Background(), insertSQL, username, email) + if err != nil { + fmt.Printf("Error inserting data: %v", err) + return nil, nil, err + } + data = append(data, []byte(fmt.Sprintf("\tUser: ID=%s, Username=%s, Email=%s, CreatedAt=\n", id, username, email))...) + err = backendConn.Close() + if err != nil { + fmt.Printf("Error closing connection: %v", err) + return nil, nil, err + } + } + return metadata, data, nil +} + +func (p *PostgresIngestProcessor) GetSupportedBackendConnectors() []quesma_api.BackendConnectorType { + return []quesma_api.BackendConnectorType{quesma_api.PgSQLBackend} +} diff --git a/v2/processors/postgres_query_processor.go b/v2/processors/postgres_query_processor.go new file mode 100644 index 000000000..4e0fdb145 --- /dev/null +++ b/v2/processors/postgres_query_processor.go @@ -0,0 +1,86 @@ +package processors + +import ( + "context" + "fmt" + "log" + quesma_api "quesma_v2/core" +) + +type PostgresQueryProcessor struct { + BaseProcessor +} + +func NewPostgresQueryProcessor() *PostgresQueryProcessor { + return &PostgresQueryProcessor{ + BaseProcessor: NewBaseProcessor(), + } +} + +func (p *PostgresQueryProcessor) GetId() string { + return "postgresquery" +} + +func (p *PostgresQueryProcessor) Handle(metadata map[string]interface{}, message ...any) (map[string]interface{}, any, error) { + var data []byte + for _, m := range message { + mCasted, err := quesma_api.CheckedCast[[]byte](m) + if err != nil { + panic("PostgresQueryProcessor: invalid message type") + } + data = mCasted + fmt.Println("PostgresQuery processor ") + data = append(data, []byte("Processed by Postgres Query processor\n")...) + data = append(data, []byte("\t|\n")...) + backendConn := p.GetBackendConnector(quesma_api.PgSQLBackend) + if backendConn == nil { + fmt.Println("Backend connector not found") + return metadata, data, nil + } + fmt.Println("Backend connector found") + err = backendConn.Open() + if err != nil { + fmt.Printf("Error opening connection: %v", err) + return nil, nil, err + } + // SQL query to select all users + query := `SELECT id, username, email FROM users` + + // Execute the query + rows, err := backendConn.Query(context.Background(), query) + if err != nil { + fmt.Printf("Failed to execute query: %v\n", err) + return nil, nil, err + } + defer rows.Close() + // Iterate over the rows + + for rows.Next() { + var id int + var username, email string + err = rows.Scan(&id, &username, &email) + if err != nil { + log.Fatalf("Failed to scan row: %v\n", err) + } + res := fmt.Sprintf("\tUser: ID=%d, Username=%s, Email=%s, CreatedAt=\n", id, username, email) + fmt.Println(res) + data = append(data, []byte(res)...) + } + + // Check for any error that occurred during row iteration + if err = rows.Err(); err != nil { + log.Fatalf("Row iteration error: %v\n", err) + } + + err = backendConn.Close() + if err != nil { + fmt.Printf("Error closing connection: %v", err) + return nil, nil, err + } + } + return metadata, data, nil +} + +func (p *PostgresQueryProcessor) GetSupportedBackendConnectors() []quesma_api.BackendConnectorType { + return []quesma_api.BackendConnectorType{quesma_api.PgSQLBackend} +} diff --git a/v2/processors/postgres_to_mysql_processor.go b/v2/processors/postgres_to_mysql_processor.go new file mode 100644 index 000000000..3a52b7be4 --- /dev/null +++ b/v2/processors/postgres_to_mysql_processor.go @@ -0,0 +1,119 @@ +package processors + +import ( + "context" + "fmt" + "github.com/jackc/pgx/v5/pgproto3" + "log" + quesma_api "quesma_v2/core" +) + +type PostgresToMySqlProcessor struct { + BaseProcessor +} + +func NewPostgresToMySqlProcessor() *PostgresToMySqlProcessor { + return &PostgresToMySqlProcessor{ + BaseProcessor: NewBaseProcessor(), + } +} + +func (p *PostgresToMySqlProcessor) GetId() string { + return "postgrestomysql_processor" +} + +func (p *PostgresToMySqlProcessor) respond() ([]byte, error) { + result := []byte("QUESMA\n") + backendConn := p.GetBackendConnector(quesma_api.MySQLBackend) + if backendConn == nil { + return result, nil + } + fmt.Println("Backend connector found") + err := backendConn.Open() + if err != nil { + fmt.Printf("Error opening connection: %v", err) + } + // SQL query to select all users + query := `SELECT id, username, email FROM users` + + // Execute the query + rows, err := backendConn.Query(context.Background(), query) + if err != nil { + fmt.Printf("Failed to execute query: %v\n", err) + } + defer rows.Close() + // Iterate over the rows + + for rows.Next() { + var id int + var username, email string + err = rows.Scan(&id, &username, &email) + if err != nil { + log.Fatalf("Failed to scan row: %v\n", err) + } + + res := fmt.Sprintf("User: ID=%d, Username=%s, Email=%s, CreatedAt=\n", id, username, email) + fmt.Println(res) + result = append(result, []byte(res)...) + } + + // Check for any error that occurred during row iteration + if err = rows.Err(); err != nil { + log.Fatalf("Row iteration error: %v\n", err) + } + + err = backendConn.Close() + if err != nil { + fmt.Printf("Error closing connection: %v", err) + } + + return result, nil +} + +func (p *PostgresToMySqlProcessor) Handle(metadata map[string]interface{}, message ...any) (map[string]interface{}, any, error) { + fmt.Println("PostgresToMySql processor ") + for _, m := range message { + msg := m.(pgproto3.FrontendMessage) + switch msg.(type) { + case *pgproto3.Query: + response, err := p.respond() + if err != nil { + return metadata, nil, fmt.Errorf("error generating query response: %w", err) + } + + buf := mustEncode((&pgproto3.RowDescription{Fields: []pgproto3.FieldDescription{ + { + Name: []byte("quesma"), + TableOID: 0, + TableAttributeNumber: 0, + DataTypeOID: 25, + DataTypeSize: -1, + TypeModifier: -1, + Format: 0, + }, + }}).Encode(nil)) + buf = mustEncode((&pgproto3.DataRow{Values: [][]byte{response}}).Encode(buf)) + buf = mustEncode((&pgproto3.CommandComplete{CommandTag: []byte("")}).Encode(buf)) + buf = mustEncode((&pgproto3.ReadyForQuery{TxStatus: 'I'}).Encode(buf)) + return metadata, buf, nil + case *pgproto3.Terminate: + return metadata, nil, nil + + default: + fmt.Println("Received other than query") + return metadata, nil, fmt.Errorf("received message other than Query from client: %#v", msg) + } + } + return metadata, nil, nil +} + +func mustEncode(buf []byte, err error) []byte { + if err != nil { + panic(err) + } + return buf +} + +func (p *PostgresToMySqlProcessor) GetSupportedBackendConnectors() []quesma_api.BackendConnectorType { + return []quesma_api.BackendConnectorType{quesma_api.MySQLBackend} +} diff --git a/v2/test_utils.go b/v2/test_utils.go new file mode 100644 index 000000000..5dd6e03b1 --- /dev/null +++ b/v2/test_utils.go @@ -0,0 +1,29 @@ +package v2 + +import ( + "bytes" + "fmt" + "io" + "net/http" +) + +func sendRequest(url string, requestBody []byte) { + // Send POST request + resp, err := http.Post(url, "application/json", bytes.NewBuffer(requestBody)) + if err != nil { + fmt.Println("Error sending request:", err) + return + } + defer resp.Body.Close() + if err != nil { + fmt.Println(err) + } else { + respBody, err := io.ReadAll(resp.Body) + resp.Body = io.NopCloser(bytes.NewBuffer(respBody)) + if err != nil { + fmt.Println(err) + } else { + fmt.Println(string(respBody)) + } + } +} diff --git a/v2/v2_test.go b/v2/v2_test.go new file mode 100644 index 000000000..6248b0ae9 --- /dev/null +++ b/v2/v2_test.go @@ -0,0 +1,95 @@ +package v2 + +import ( + "context" + "github.com/stretchr/testify/assert" + "os" + "os/signal" + "quesma_v2/backend_connectors" + quesma_api "quesma_v2/core" + "quesma_v2/frontend_connectors" + "quesma_v2/processors" + "syscall" + "testing" + "time" +) + +func emitRequests(stop chan os.Signal) { + go func() { + time.Sleep(1 * time.Second) + requestBody := []byte(`{"query": {"match_all": {}}}`) + sendRequest("http://localhost:8888/_bulk", requestBody) + sendRequest("http://localhost:8888/_doc", requestBody) + sendRequest("http://localhost:8888/_search", requestBody) + sendRequest("http://localhost:8888/_search", requestBody) + signal.Notify(stop, os.Interrupt, syscall.SIGTERM) + close(stop) + }() +} + +func Test_backendConnectorValidation(t *testing.T) { + var tcpProcessor quesma_api.Processor = processors.NewPostgresToMySqlProcessor() + var postgressPipeline quesma_api.PipelineBuilder = quesma_api.NewPipeline() + postgressPipeline.AddProcessor(tcpProcessor) + var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma() + const endpoint = "root:password@tcp(127.0.0.1:3306)/test" + var mySqlBackendConnector quesma_api.BackendConnector = &backend_connectors.MySqlBackendConnector{ + Endpoint: endpoint, + } + postgressPipeline.AddBackendConnector(mySqlBackendConnector) + quesmaBuilder.AddPipeline(postgressPipeline) + _, err := quesmaBuilder.Build() + assert.NoError(t, err) +} + +func ab_testing_scenario() quesma_api.QuesmaBuilder { + var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma() + + ingestFrontendConnector := frontend_connectors.NewBasicHTTPFrontendConnector(":8888") + ingestHTTPRouter := frontend_connectors.NewHTTPRouter() + ingestHTTPRouter.AddRoute("/_bulk", bulk) + ingestHTTPRouter.AddRoute("/_doc", doc) + ingestFrontendConnector.AddRouter(ingestHTTPRouter) + var ingestPipeline quesma_api.PipelineBuilder = quesma_api.NewPipeline() + ingestPipeline.AddFrontendConnector(ingestFrontendConnector) + var abIngestTestProcessor quesma_api.Processor = processors.NewABTestProcessor("ABIngestTestProcessor", false) + + var ingestProcessor quesma_api.Processor = NewIngestProcessor() + var innerIngestProcessor1 quesma_api.Processor = NewInnerIngestProcessor1() + ingestProcessor.AddProcessor(innerIngestProcessor1) + var innerIngestProcessor2 quesma_api.Processor = NewInnerIngestProcessor2() + ingestProcessor.AddProcessor(innerIngestProcessor2) + + ingestPipeline.AddProcessor(ingestProcessor) + ingestPipeline.AddProcessor(abIngestTestProcessor) + + queryFrontendConnector := frontend_connectors.NewBasicHTTPFrontendConnector(":8888") + queryHTTPRouter := frontend_connectors.NewHTTPRouter() + queryHTTPRouter.AddRoute("/_search", search) + queryFrontendConnector.AddRouter(queryHTTPRouter) + var queryPipeline quesma_api.PipelineBuilder = quesma_api.NewPipeline() + queryPipeline.AddFrontendConnector(queryFrontendConnector) + var queryProcessor quesma_api.Processor = NewQueryProcessor() + var innerQueryProcessor1 quesma_api.Processor = NewInnerQueryProcessor1() + queryProcessor.AddProcessor(innerQueryProcessor1) + var innerQueryProcessor2 quesma_api.Processor = NewInnerQueryProcessor2() + queryProcessor.AddProcessor(innerQueryProcessor2) + var abQueryTestProcessor quesma_api.Processor = processors.NewABTestProcessor("ABQueryTestProcessor", true) + + queryPipeline.AddProcessor(queryProcessor) + queryPipeline.AddProcessor(abQueryTestProcessor) + quesmaBuilder.AddPipeline(ingestPipeline) + quesmaBuilder.AddPipeline(queryPipeline) + + quesma, _ := quesmaBuilder.Build() + return quesma +} + +func Test_scenario1(t *testing.T) { + q1 := ab_testing_scenario() + q1.Start() + stop := make(chan os.Signal, 1) + emitRequests(stop) + <-stop + q1.Stop(context.Background()) +} diff --git a/v2/v2_test_objects.go b/v2/v2_test_objects.go new file mode 100644 index 000000000..45b75bf02 --- /dev/null +++ b/v2/v2_test_objects.go @@ -0,0 +1,295 @@ +package v2 + +import ( + "net/http" + quesma_api "quesma_v2/core" + "quesma_v2/frontend_connectors" + "quesma_v2/processors" + "strconv" + "sync/atomic" +) + +var responses = [][]byte{ + []byte(`{ + "took": 5, + "timed_out": false, + "_shards": { + "total": 1, + "successful": 1, + "skipped": 0, + "failed": 0 + }, + "hits": { + "total": { + "value": 1, + "relation": "eq" + }, + "max_score": 1.0, + "hits": [ + { + "_index": "blog", + "_type": "_doc", + "_id": "1", + "_score": 1.0, + "_source": { + "title": "Second Post", + "author": "John Doe", + "content": "This is the second blog post.", + "published_at": "2024-11-20" + } + } + ] + } +}`), + []byte(` +{ + "took": 5, + "timed_out": false, + "_shards": { + "total": 1, + "successful": 1, + "skipped": 0, + "failed": 0 + }, + "hits": { + "total": { + "value": 1, + "relation": "eq" + }, + "max_score": 1.0, + "hits": [ + { + "_index": "blog", + "_type": "_doc", + "_id": "1", + "_score": 1.0, + "_source": { + "title": "First Post", + "author": "John Doe", + "content": "This is the first blog post.", + "published_at": "2024-11-01" + } + } + ] + } +}`), +} + +func bulk(request *http.Request) (map[string]interface{}, any, error) { + _, err := frontend_connectors.ReadRequestBody(request) + if err != nil { + return nil, nil, err + } + metadata := quesma_api.MakeNewMetadata() + metadata["level"] = 0 + resp := []byte("bulk\n") + atomic.AddInt64(&correlationId, 1) + quesma_api.SetCorrelationId(metadata, correlationId) + return metadata, resp, nil +} + +func doc(request *http.Request) (map[string]interface{}, any, error) { + _, err := frontend_connectors.ReadRequestBody(request) + if err != nil { + return nil, nil, err + } + metadata := quesma_api.MakeNewMetadata() + metadata["level"] = 0 + atomic.AddInt64(&correlationId, 1) + quesma_api.SetCorrelationId(metadata, correlationId) + resp := []byte("doc\n") + + return metadata, resp, nil +} + +var correlationId int64 = 0 + +func search(request *http.Request) (map[string]interface{}, any, error) { + metadata := quesma_api.MakeNewMetadata() + metadata["level"] = 0 + atomic.AddInt64(&correlationId, 1) + quesma_api.SetCorrelationId(metadata, correlationId) + return metadata, request, nil +} + +type IngestProcessor struct { + processors.BaseProcessor +} + +func NewIngestProcessor() *IngestProcessor { + return &IngestProcessor{BaseProcessor: processors.NewBaseProcessor()} +} + +func (p *IngestProcessor) GetId() string { + return "IngestProcessor" +} + +func (p *IngestProcessor) Handle(metadata map[string]interface{}, message ...any) (map[string]interface{}, any, error) { + var data []byte + for _, m := range message { + data, err := quesma_api.CheckedCast[[]byte](m) + if err != nil { + panic("IngestProcessor: invalid message type") + } + + level := metadata["level"].(int) + data = append(data, strconv.Itoa(level)...) + data = append(data, []byte(p.GetId())...) + data = append(data, []byte("\n")...) + } + return metadata, data, nil +} + +type InnerQueryProcessor2 struct { + processors.BaseProcessor + reqNum int +} + +func NewInnerQueryProcessor2() *InnerQueryProcessor2 { + return &InnerQueryProcessor2{ + BaseProcessor: processors.NewBaseProcessor(), + reqNum: 0, + } +} + +func (p *InnerQueryProcessor2) GetId() string { + return "InnerQueryProcessor2" +} + +func (p *InnerQueryProcessor2) Handle(metadata map[string]interface{}, message ...any) (map[string]interface{}, any, error) { + if len(message) != 1 { + panic("InnerQueryProcessor2: expect only one message") + } + request, err := quesma_api.CheckedCast[*http.Request](message[0]) + if err != nil { + panic("QueryProcessor: invalid message type") + } + + _, err = frontend_connectors.ReadRequestBody(request) + if err != nil { + return nil, nil, err + } + // Simulate a search response + resp := make([]byte, 0) + resp = append(resp, responses[0]...) + return metadata, resp, nil +} + +type InnerQueryProcessor1 struct { + processors.BaseProcessor + reqNum int +} + +func NewInnerQueryProcessor1() *InnerQueryProcessor1 { + return &InnerQueryProcessor1{ + BaseProcessor: processors.NewBaseProcessor(), + reqNum: 0, + } +} + +func (p *InnerQueryProcessor1) GetId() string { + return "InnerQueryProcessor1" +} + +func (p *InnerQueryProcessor1) Handle(metadata map[string]interface{}, message ...any) (map[string]interface{}, any, error) { + if len(message) != 1 { + panic("InnerQueryProcessor1: expect only one message") + } + request, err := quesma_api.CheckedCast[*http.Request](message[0]) + if err != nil { + panic("QueryProcessor: invalid message type") + } + _, err = frontend_connectors.ReadRequestBody(request) + if err != nil { + return nil, nil, err + } + // Simulate a search response + responseIndex := p.reqNum % 2 + p.reqNum++ + resp := make([]byte, 0) + resp = append(resp, responses[responseIndex]...) + return metadata, resp, nil +} + +type InnerIngestProcessor2 struct { + processors.BaseProcessor +} + +func NewInnerIngestProcessor2() *InnerIngestProcessor2 { + return &InnerIngestProcessor2{ + BaseProcessor: processors.NewBaseProcessor(), + } +} + +func (p *InnerIngestProcessor2) GetId() string { + return "InnerIngestProcessor2" +} + +func (p *InnerIngestProcessor2) Handle(metadata map[string]interface{}, message ...any) (map[string]interface{}, any, error) { + var data []byte + for _, m := range message { + data, err := quesma_api.CheckedCast[[]byte](m) + if err != nil { + panic("InnerIngestProcessor2: invalid message type") + } + level := metadata["level"].(int) + data = append(data, strconv.Itoa(level)...) + data = append(data, []byte(p.GetId())...) + data = append(data, []byte("\n")...) + } + return metadata, data, nil +} + +type InnerIngestProcessor1 struct { + processors.BaseProcessor +} + +func NewInnerIngestProcessor1() *InnerIngestProcessor1 { + return &InnerIngestProcessor1{ + BaseProcessor: processors.NewBaseProcessor(), + } +} + +func (p *InnerIngestProcessor1) GetId() string { + return "InnerIngestProcessor1" +} + +func (p *InnerIngestProcessor1) Handle(metadata map[string]interface{}, message ...any) (map[string]interface{}, any, error) { + var data []byte + for _, m := range message { + data, err := quesma_api.CheckedCast[[]byte](m) + if err != nil { + panic("InnerIngestProcessor1: invalid message type") + } + level := metadata["level"].(int) + data = append(data, strconv.Itoa(level)...) + data = append(data, []byte(p.GetId())...) + data = append(data, []byte("\n")...) + } + return metadata, data, nil +} + +type QueryProcessor struct { + processors.BaseProcessor +} + +func NewQueryProcessor() *QueryProcessor { + return &QueryProcessor{ + BaseProcessor: processors.NewBaseProcessor(), + } +} + +func (p *QueryProcessor) GetId() string { + return "QueryProcessor" +} + +func (p *QueryProcessor) Handle(metadata map[string]interface{}, message ...any) (map[string]interface{}, any, error) { + if len(message) != 1 { + panic("QueryProcessor: expect only one message") + } + request, err := quesma_api.CheckedCast[*http.Request](message[0]) + if err != nil { + panic("QueryProcessor: invalid message type") + } + return metadata, request, nil +}