Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Bun backend #382

Merged
merged 4 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 0 additions & 21 deletions backend/postgres/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,12 @@ Configuration for the PostgreSQL connection pool of the microservice.
* postgres user to access the database
* `POSTGRES_DB` default: `postgres`
* database to access
* `POSTGRES_MAX_RETRIES` default: `5`
* Maximum number of retries before giving up
* `POSTGRES_RETRY_STATEMENT_TIMEOUT` default: `false`
* Whether to retry queries cancelled because of statement_timeout
* `POSTGRES_MIN_RETRY_BACKOFF` default: `250ms`
* Minimum backoff between each retry
* `POSTGRES_MAX_RETRY_BACKOFF` default: `4s`
* Maximum backoff between each retry
* `POSTGRES_DIAL_TIMEOUT` default: `5s`
* Dial timeout for establishing new connections
* `POSTGRES_READ_TIMEOUT` default: `30s`
* Timeout for socket reads. If reached, commands will fail with a timeout instead of blocking
* `POSTGRES_WRITE_TIMEOUT` default: `30s`
* Timeout for socket writes. If reached, commands will fail with a timeout instead of blocking.
* `POSTGRES_POOL_SIZE` default: `100`
* Maximum number of socket connections
* `POSTGRES_MIN_IDLE_CONNECTIONS` default: `10`
* Minimum number of idle connections which is useful when establishing new connection is slow
* `POSTGRES_MAX_CONN_AGE` default: `30m`
* Connection age at which client retires (closes) the connection
* `POSTGRES_POOL_TIMEOUT` default: `31s`
* Time for which client waits for free connection if all connections are busy before returning an error
* `POSTGRES_IDLE_TIMEOUT` default: `5m`
* Amount of time after which client closes idle connections
* `POSTGRES_IDLE_CHECK_FREQUENCY` default: `1m`
* Frequency of idle checks made by idle connections reaper
* `POSTGRES_HEALTH_CHECK_TABLE_NAME` default: `healthcheck`
* Name of the Table that is created to try if database is writeable
* `POSTGRES_HEALTH_CHECK_RESULT_TTL` default: `10s`
Expand All @@ -53,7 +33,6 @@ Prometheus metrics exposed.
* `pace_postgres_query_total{database}` Collects stats about the number of postgres queries made
* `pace_postgres_query_failed{database}` Collects stats about the number of postgres queries failed
* `pace_postgres_query_duration_seconds{database}` Collects performance metrics for each postgres query
* `pace_postgres_query_rows_total{database}` Collects stats about the number of rows returned by a postgres query
* `pace_postgres_query_affected_total{database}` Collects stats about the number of rows affected by a postgres query
* `pace_postgres_connection_pool_hits{database}` Collects number of times free connection was found in the pool
* `pace_postgres_connection_pool_misses{database}` Collects number of times free connection was NOT found in the pool
Expand Down
16 changes: 9 additions & 7 deletions backend/postgres/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,34 @@ import (
"io"
"net"

"github.com/go-pg/pg"
"github.com/uptrace/bun/driver/pgdriver"
)

var ErrNotUnique = errors.New("not unique")

func IsErrConnectionFailed(err error) bool {
// go-pg has this check internally for network errors
// bun has this check internally for network errors
if errors.Is(err, io.EOF) {
return true
}

// go-pg has this check internally for network errors
// bun has this check internally for network errors
_, ok := err.(net.Error)
if ok {
return true
}

// go-pg has similar check for integrity violation issues, here we check network issues
pgErr, ok := err.(pg.Error)
if ok {
// bun has similar check for integrity violation issues, here we check network issues
var pgErr pgdriver.Error

if errors.As(err, &pgErr) {
code := pgErr.Field('C')
// We check on error codes of Class 08 — Connection Exception.
// https://www.postgresql.org/docs/10/errcodes-appendix.html
if code[0:2] == "08" {
if len(code) > 2 && code[0:2] == "08" {
return true
}
}

return false
}
19 changes: 4 additions & 15 deletions backend/postgres/errors_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package postgres_test

import (
"context"
"errors"
"fmt"
"io"
"testing"

"github.com/go-pg/pg"
"github.com/stretchr/testify/require"

pbpostgres "github.com/pace/bricks/backend/postgres"
Expand All @@ -19,13 +19,10 @@ func TestIsErrConnectionFailed(t *testing.T) {
})

t.Run("connection failed (net.Error)", func(t *testing.T) {
db := pbpostgres.CustomConnectionPool(&pg.Options{}) // invalid connection
_, err := db.Exec("")
require.True(t, pbpostgres.IsErrConnectionFailed(err))
})
ctx := context.Background()

t.Run("connection failed (pg.Error)", func(t *testing.T) {
err := error(mockPGError{m: map[byte]string{'C': "08000"}})
db := pbpostgres.NewDB(ctx, pbpostgres.WithHost("foobar")) // invalid connection
_, err := db.Exec("")
require.True(t, pbpostgres.IsErrConnectionFailed(err))
})

Expand All @@ -34,11 +31,3 @@ func TestIsErrConnectionFailed(t *testing.T) {
require.False(t, pbpostgres.IsErrConnectionFailed(err))
})
}

type mockPGError struct {
m map[byte]string
}

func (err mockPGError) Field(k byte) string { return err.m[k] }
func (err mockPGError) IntegrityViolation() bool { return false }
func (err mockPGError) Error() string { return fmt.Sprintf("%+v", err.m) }
91 changes: 91 additions & 0 deletions backend/postgres/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright © 2024 by PACE Telematics GmbH. All rights reserved.

package postgres

import (
"context"
"database/sql"
"time"

"github.com/pace/bricks/maintenance/health/servicehealthcheck"
"github.com/uptrace/bun"
)

type queryExecutor interface {
Exec(ctx context.Context, dest ...interface{}) (sql.Result, error)
}

// HealthCheck checks the state of a postgres connection. It must not be changed
// after it was registered as a health check.
type HealthCheck struct {
state servicehealthcheck.ConnectionState

createTableQueryExecutor queryExecutor
deleteQueryExecutor queryExecutor
dropTableQueryExecutor queryExecutor
insertQueryExecutor queryExecutor
selectQueryExecutor queryExecutor
}

type healthcheck struct {
bun.BaseModel

OK bool `bun:"column:ok"`
}

// NewHealthCheck creates a new HealthCheck instance.
func NewHealthCheck(db *bun.DB) *HealthCheck {
return &HealthCheck{
createTableQueryExecutor: db.NewCreateTable().Model((*healthcheck)(nil)).ModelTableExpr(cfg.HealthCheckTableName).IfNotExists(),
deleteQueryExecutor: db.NewDelete().ModelTableExpr(cfg.HealthCheckTableName).Where("TRUE"),
dropTableQueryExecutor: db.NewDropTable().ModelTableExpr(cfg.HealthCheckTableName).IfExists(),
insertQueryExecutor: db.NewInsert().ModelTableExpr(cfg.HealthCheckTableName).Model(&healthcheck{OK: true}),
selectQueryExecutor: db.NewRaw("SELECT 1;"),
}
}

// Init initializes the test table
func (h *HealthCheck) Init(ctx context.Context) error {
_, err := h.createTableQueryExecutor.Exec(ctx)
return err
}

// HealthCheck performs the read test on the database. If enabled, it performs a
// write test as well.
func (h *HealthCheck) HealthCheck(ctx context.Context) servicehealthcheck.HealthCheckResult {
if time.Since(h.state.LastChecked()) <= cfg.HealthCheckResultTTL {
// the last result of the Health Check is still not outdated
return h.state.GetState()
}

// Readcheck
if _, err := h.selectQueryExecutor.Exec(ctx); err != nil {
h.state.SetErrorState(err)
return h.state.GetState()
}

// writecheck - add Data to configured Table
if _, err := h.insertQueryExecutor.Exec(ctx); err != nil {
h.state.SetErrorState(err)
return h.state.GetState()
}

// and while we're at it, check delete as well (so as not to clutter the database
// because UPSERT is impractical here
if _, err := h.deleteQueryExecutor.Exec(ctx); err != nil {
h.state.SetErrorState(err)
return h.state.GetState()
}

// If no error occurred set the State of this Health Check to healthy
h.state.SetHealthy()

return h.state.GetState()
}

// CleanUp drops the test table.
func (h *HealthCheck) CleanUp(ctx context.Context) error {
_, err := h.dropTableQueryExecutor.Exec(ctx)

return err
}
65 changes: 0 additions & 65 deletions backend/postgres/health_postgres.go

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@ package postgres

import (
"context"
"database/sql"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

"github.com/go-pg/pg/orm"
"github.com/stretchr/testify/require"

http2 "github.com/pace/bricks/http"
"github.com/pace/bricks/maintenance/errors"
"github.com/pace/bricks/maintenance/health/servicehealthcheck"
"github.com/pace/bricks/maintenance/log"
"github.com/stretchr/testify/require"
)

func setup() *http.Response {
Expand Down Expand Up @@ -52,7 +53,7 @@ type testPool struct {
err error
}

func (t *testPool) Exec(ctx context.Context, query interface{}, params ...interface{}) (res orm.Result, err error) {
func (t *testPool) Exec(ctx context.Context, dest ...any) (sql.Result, error) {
return nil, t.err
}

Expand All @@ -63,16 +64,25 @@ func TestHealthCheckCaching(t *testing.T) {
cfg.HealthCheckResultTTL = time.Minute
requiredErr := errors.New("TestHealthCheckCaching")
pool := &testPool{err: requiredErr}
h := &HealthCheck{Pool: pool}
h := &HealthCheck{
createTableQueryExecutor: pool,
deleteQueryExecutor: pool,
dropTableQueryExecutor: pool,
insertQueryExecutor: pool,
selectQueryExecutor: pool,
}

res := h.HealthCheck(ctx)
// get the error for the first time
require.Equal(t, servicehealthcheck.Err, res.State)
require.Equal(t, "TestHealthCheckCaching", res.Msg)

res = h.HealthCheck(ctx)
pool.err = nil
// getting the cached error
require.Equal(t, servicehealthcheck.Err, res.State)
require.Equal(t, "TestHealthCheckCaching", res.Msg)

// Resetting the TTL to get a uncached result
cfg.HealthCheckResultTTL = 0
res = h.HealthCheck(ctx)
Expand Down
Loading