Skip to content

Commit fe9ad5a

Browse files
Merge pull request #382 from pace/bun-poc
Add Bun backend
2 parents 623eae7 + 929a6e9 commit fe9ad5a

22 files changed

+558
-1249
lines changed

backend/postgres/README.md

-21
Original file line numberDiff line numberDiff line change
@@ -15,32 +15,12 @@ Configuration for the PostgreSQL connection pool of the microservice.
1515
* postgres user to access the database
1616
* `POSTGRES_DB` default: `postgres`
1717
* database to access
18-
* `POSTGRES_MAX_RETRIES` default: `5`
19-
* Maximum number of retries before giving up
20-
* `POSTGRES_RETRY_STATEMENT_TIMEOUT` default: `false`
21-
* Whether to retry queries cancelled because of statement_timeout
22-
* `POSTGRES_MIN_RETRY_BACKOFF` default: `250ms`
23-
* Minimum backoff between each retry
24-
* `POSTGRES_MAX_RETRY_BACKOFF` default: `4s`
25-
* Maximum backoff between each retry
2618
* `POSTGRES_DIAL_TIMEOUT` default: `5s`
2719
* Dial timeout for establishing new connections
2820
* `POSTGRES_READ_TIMEOUT` default: `30s`
2921
* Timeout for socket reads. If reached, commands will fail with a timeout instead of blocking
3022
* `POSTGRES_WRITE_TIMEOUT` default: `30s`
3123
* Timeout for socket writes. If reached, commands will fail with a timeout instead of blocking.
32-
* `POSTGRES_POOL_SIZE` default: `100`
33-
* Maximum number of socket connections
34-
* `POSTGRES_MIN_IDLE_CONNECTIONS` default: `10`
35-
* Minimum number of idle connections which is useful when establishing new connection is slow
36-
* `POSTGRES_MAX_CONN_AGE` default: `30m`
37-
* Connection age at which client retires (closes) the connection
38-
* `POSTGRES_POOL_TIMEOUT` default: `31s`
39-
* Time for which client waits for free connection if all connections are busy before returning an error
40-
* `POSTGRES_IDLE_TIMEOUT` default: `5m`
41-
* Amount of time after which client closes idle connections
42-
* `POSTGRES_IDLE_CHECK_FREQUENCY` default: `1m`
43-
* Frequency of idle checks made by idle connections reaper
4424
* `POSTGRES_HEALTH_CHECK_TABLE_NAME` default: `healthcheck`
4525
* Name of the Table that is created to try if database is writeable
4626
* `POSTGRES_HEALTH_CHECK_RESULT_TTL` default: `10s`
@@ -53,7 +33,6 @@ Prometheus metrics exposed.
5333
* `pace_postgres_query_total{database}` Collects stats about the number of postgres queries made
5434
* `pace_postgres_query_failed{database}` Collects stats about the number of postgres queries failed
5535
* `pace_postgres_query_duration_seconds{database}` Collects performance metrics for each postgres query
56-
* `pace_postgres_query_rows_total{database}` Collects stats about the number of rows returned by a postgres query
5736
* `pace_postgres_query_affected_total{database}` Collects stats about the number of rows affected by a postgres query
5837
* `pace_postgres_connection_pool_hits{database}` Collects number of times free connection was found in the pool
5938
* `pace_postgres_connection_pool_misses{database}` Collects number of times free connection was NOT found in the pool

backend/postgres/errors.go

+9-7
Original file line numberDiff line numberDiff line change
@@ -7,32 +7,34 @@ import (
77
"io"
88
"net"
99

10-
"github.com/go-pg/pg"
10+
"github.com/uptrace/bun/driver/pgdriver"
1111
)
1212

1313
var ErrNotUnique = errors.New("not unique")
1414

1515
func IsErrConnectionFailed(err error) bool {
16-
// go-pg has this check internally for network errors
16+
// bun has this check internally for network errors
1717
if errors.Is(err, io.EOF) {
1818
return true
1919
}
2020

21-
// go-pg has this check internally for network errors
21+
// bun has this check internally for network errors
2222
_, ok := err.(net.Error)
2323
if ok {
2424
return true
2525
}
2626

27-
// go-pg has similar check for integrity violation issues, here we check network issues
28-
pgErr, ok := err.(pg.Error)
29-
if ok {
27+
// bun has similar check for integrity violation issues, here we check network issues
28+
var pgErr pgdriver.Error
29+
30+
if errors.As(err, &pgErr) {
3031
code := pgErr.Field('C')
3132
// We check on error codes of Class 08 — Connection Exception.
3233
// https://www.postgresql.org/docs/10/errcodes-appendix.html
33-
if code[0:2] == "08" {
34+
if len(code) > 2 && code[0:2] == "08" {
3435
return true
3536
}
3637
}
38+
3739
return false
3840
}

backend/postgres/errors_test.go

+4-15
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
package postgres_test
22

33
import (
4+
"context"
45
"errors"
56
"fmt"
67
"io"
78
"testing"
89

9-
"github.com/go-pg/pg"
1010
"github.com/stretchr/testify/require"
1111

1212
pbpostgres "github.com/pace/bricks/backend/postgres"
@@ -19,13 +19,10 @@ func TestIsErrConnectionFailed(t *testing.T) {
1919
})
2020

2121
t.Run("connection failed (net.Error)", func(t *testing.T) {
22-
db := pbpostgres.CustomConnectionPool(&pg.Options{}) // invalid connection
23-
_, err := db.Exec("")
24-
require.True(t, pbpostgres.IsErrConnectionFailed(err))
25-
})
22+
ctx := context.Background()
2623

27-
t.Run("connection failed (pg.Error)", func(t *testing.T) {
28-
err := error(mockPGError{m: map[byte]string{'C': "08000"}})
24+
db := pbpostgres.NewDB(ctx, pbpostgres.WithHost("foobar")) // invalid connection
25+
_, err := db.Exec("")
2926
require.True(t, pbpostgres.IsErrConnectionFailed(err))
3027
})
3128

@@ -34,11 +31,3 @@ func TestIsErrConnectionFailed(t *testing.T) {
3431
require.False(t, pbpostgres.IsErrConnectionFailed(err))
3532
})
3633
}
37-
38-
type mockPGError struct {
39-
m map[byte]string
40-
}
41-
42-
func (err mockPGError) Field(k byte) string { return err.m[k] }
43-
func (err mockPGError) IntegrityViolation() bool { return false }
44-
func (err mockPGError) Error() string { return fmt.Sprintf("%+v", err.m) }

backend/postgres/health.go

+91
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
// Copyright © 2024 by PACE Telematics GmbH. All rights reserved.
2+
3+
package postgres
4+
5+
import (
6+
"context"
7+
"database/sql"
8+
"time"
9+
10+
"github.com/pace/bricks/maintenance/health/servicehealthcheck"
11+
"github.com/uptrace/bun"
12+
)
13+
14+
type queryExecutor interface {
15+
Exec(ctx context.Context, dest ...interface{}) (sql.Result, error)
16+
}
17+
18+
// HealthCheck checks the state of a postgres connection. It must not be changed
19+
// after it was registered as a health check.
20+
type HealthCheck struct {
21+
state servicehealthcheck.ConnectionState
22+
23+
createTableQueryExecutor queryExecutor
24+
deleteQueryExecutor queryExecutor
25+
dropTableQueryExecutor queryExecutor
26+
insertQueryExecutor queryExecutor
27+
selectQueryExecutor queryExecutor
28+
}
29+
30+
type healthcheck struct {
31+
bun.BaseModel
32+
33+
OK bool `bun:"column:ok"`
34+
}
35+
36+
// NewHealthCheck creates a new HealthCheck instance.
37+
func NewHealthCheck(db *bun.DB) *HealthCheck {
38+
return &HealthCheck{
39+
createTableQueryExecutor: db.NewCreateTable().Model((*healthcheck)(nil)).ModelTableExpr(cfg.HealthCheckTableName).IfNotExists(),
40+
deleteQueryExecutor: db.NewDelete().ModelTableExpr(cfg.HealthCheckTableName).Where("TRUE"),
41+
dropTableQueryExecutor: db.NewDropTable().ModelTableExpr(cfg.HealthCheckTableName).IfExists(),
42+
insertQueryExecutor: db.NewInsert().ModelTableExpr(cfg.HealthCheckTableName).Model(&healthcheck{OK: true}),
43+
selectQueryExecutor: db.NewRaw("SELECT 1;"),
44+
}
45+
}
46+
47+
// Init initializes the test table
48+
func (h *HealthCheck) Init(ctx context.Context) error {
49+
_, err := h.createTableQueryExecutor.Exec(ctx)
50+
return err
51+
}
52+
53+
// HealthCheck performs the read test on the database. If enabled, it performs a
54+
// write test as well.
55+
func (h *HealthCheck) HealthCheck(ctx context.Context) servicehealthcheck.HealthCheckResult {
56+
if time.Since(h.state.LastChecked()) <= cfg.HealthCheckResultTTL {
57+
// the last result of the Health Check is still not outdated
58+
return h.state.GetState()
59+
}
60+
61+
// Readcheck
62+
if _, err := h.selectQueryExecutor.Exec(ctx); err != nil {
63+
h.state.SetErrorState(err)
64+
return h.state.GetState()
65+
}
66+
67+
// writecheck - add Data to configured Table
68+
if _, err := h.insertQueryExecutor.Exec(ctx); err != nil {
69+
h.state.SetErrorState(err)
70+
return h.state.GetState()
71+
}
72+
73+
// and while we're at it, check delete as well (so as not to clutter the database
74+
// because UPSERT is impractical here
75+
if _, err := h.deleteQueryExecutor.Exec(ctx); err != nil {
76+
h.state.SetErrorState(err)
77+
return h.state.GetState()
78+
}
79+
80+
// If no error occurred set the State of this Health Check to healthy
81+
h.state.SetHealthy()
82+
83+
return h.state.GetState()
84+
}
85+
86+
// CleanUp drops the test table.
87+
func (h *HealthCheck) CleanUp(ctx context.Context) error {
88+
_, err := h.dropTableQueryExecutor.Exec(ctx)
89+
90+
return err
91+
}

backend/postgres/health_postgres.go

-65
This file was deleted.

backend/postgres/health_postgres_test.go backend/postgres/health_test.go

+14-4
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,20 @@ package postgres
44

55
import (
66
"context"
7+
"database/sql"
78
"io"
89
"net/http"
910
"net/http/httptest"
1011
"strings"
1112
"testing"
1213
"time"
1314

14-
"github.com/go-pg/pg/orm"
15+
"github.com/stretchr/testify/require"
16+
1517
http2 "github.com/pace/bricks/http"
1618
"github.com/pace/bricks/maintenance/errors"
1719
"github.com/pace/bricks/maintenance/health/servicehealthcheck"
1820
"github.com/pace/bricks/maintenance/log"
19-
"github.com/stretchr/testify/require"
2021
)
2122

2223
func setup() *http.Response {
@@ -52,7 +53,7 @@ type testPool struct {
5253
err error
5354
}
5455

55-
func (t *testPool) Exec(ctx context.Context, query interface{}, params ...interface{}) (res orm.Result, err error) {
56+
func (t *testPool) Exec(ctx context.Context, dest ...any) (sql.Result, error) {
5657
return nil, t.err
5758
}
5859

@@ -63,16 +64,25 @@ func TestHealthCheckCaching(t *testing.T) {
6364
cfg.HealthCheckResultTTL = time.Minute
6465
requiredErr := errors.New("TestHealthCheckCaching")
6566
pool := &testPool{err: requiredErr}
66-
h := &HealthCheck{Pool: pool}
67+
h := &HealthCheck{
68+
createTableQueryExecutor: pool,
69+
deleteQueryExecutor: pool,
70+
dropTableQueryExecutor: pool,
71+
insertQueryExecutor: pool,
72+
selectQueryExecutor: pool,
73+
}
74+
6775
res := h.HealthCheck(ctx)
6876
// get the error for the first time
6977
require.Equal(t, servicehealthcheck.Err, res.State)
7078
require.Equal(t, "TestHealthCheckCaching", res.Msg)
79+
7180
res = h.HealthCheck(ctx)
7281
pool.err = nil
7382
// getting the cached error
7483
require.Equal(t, servicehealthcheck.Err, res.State)
7584
require.Equal(t, "TestHealthCheckCaching", res.Msg)
85+
7686
// Resetting the TTL to get a uncached result
7787
cfg.HealthCheckResultTTL = 0
7888
res = h.HealthCheck(ctx)

0 commit comments

Comments
 (0)