Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
github.com/gorilla/mux v1.8.0
github.com/influxdata/influxdb v1.11.5
github.com/influxdata/influxdb-client-go/v2 v2.6.0
github.com/jackc/pgx/v5 v5.9.1
github.com/kelseyhightower/envconfig v1.4.0
github.com/logzio/logzio-go v0.0.0-20200316143903-ac8fc0e2910e
github.com/mitchellh/mapstructure v1.3.1
Expand Down Expand Up @@ -82,7 +83,6 @@ require (
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/pgx/v5 v5.9.1 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/jehiah/go-strftime v0.0.0-20151206194810-2efbe75097a5 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
Expand Down
64 changes: 59 additions & 5 deletions pumps/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ import (
"encoding/hex"
"errors"
"fmt"
"regexp"
"sync"
"time"

"github.com/sirupsen/logrus"

"github.com/TykTechnologies/tyk-pump/analytics"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/stdlib"
"github.com/mitchellh/mapstructure"
"gopkg.in/vmihailenco/msgpack.v2"
"gorm.io/gorm/clause"
Expand Down Expand Up @@ -85,14 +90,63 @@ type SQLConf struct {
MigrateShardedTables bool `json:"migrate_sharded_tables" mapstructure:"migrate_sharded_tables"`
}

var timeZoneMatcher = regexp.MustCompile("(time_zone|TimeZone)=(.*?)($|&| )")

// monthEncodePlan converts time.Month to int for pgx encoding.
// pgx v5's TryWrapBuiltinTypeEncodePlan matches time.Month as fmt.Stringer
// (producing "May") before TryWrapFindUnderlyingTypeEncodePlan can convert it
// to its underlying int. This plan is prepended to the encode chain so the
// int conversion happens first. See TT-16980 and https://github.com/jackc/pgx/issues/2157
type monthEncodePlan struct {
next pgtype.EncodePlan
}

func (p *monthEncodePlan) SetNext(next pgtype.EncodePlan) { p.next = next }

func (p *monthEncodePlan) Encode(value any, buf []byte) ([]byte, error) {
return p.next.Encode(int(value.(time.Month)), buf)
}

func Dialect(cfg *SQLConf) (gorm.Dialector, error) {
switch cfg.Type {
case "postgres":
// Example connection_string: `"host=localhost user=gorm password=gorm DB.name=gorm port=9920 sslmode=disable TimeZone=Asia/Shanghai"`
return postgres.New(postgres.Config{
DSN: cfg.ConnectionString,
PreferSimpleProtocol: cfg.Postgres.PreferSimpleProtocol,
}), nil
// We build the *sql.DB ourselves instead of letting the gorm postgres
// driver do it. So we can inject an AfterConnect callback that registers
// time.Month as PostgreSQL int8. Without this, pgx v5's simple protocol
// encodes time.Month via String() ("May") instead of the underlying int,
// which PostgreSQL rejects for bigint columns.
// See TT-16980 and https://github.com/jackc/pgx/issues/2157
pgxConfig, err := pgx.ParseConfig(cfg.ConnectionString)
if err != nil {
return nil, err
}
if cfg.Postgres.PreferSimpleProtocol {
pgxConfig.DefaultQueryExecMode = pgx.QueryExecModeSimpleProtocol
}
// Replicate timezone handling from gorm.io/driver/postgres.
if result := timeZoneMatcher.FindStringSubmatch(cfg.ConnectionString); len(result) > 2 {
pgxConfig.RuntimeParams["timezone"] = result[2]
}
sqlDB := stdlib.OpenDB(*pgxConfig,
stdlib.OptionAfterConnect(func(ctx context.Context, conn *pgx.Conn) error {
tm := conn.TypeMap()
// Prepend a custom encode plan that converts time.Month to int
// before pgx's fmt.Stringer wrapper can fire.
tm.TryWrapEncodePlanFuncs = append(
[]pgtype.TryWrapEncodePlanFunc{
func(value any) (pgtype.WrappedEncodePlanNextSetter, any, bool) {
if m, ok := value.(time.Month); ok {
return &monthEncodePlan{}, int(m), true
}
return nil, nil, false
},
},
tm.TryWrapEncodePlanFuncs...,
)
return nil
}),
)
return postgres.New(postgres.Config{Conn: sqlDB}), nil
case "mysql":
return mysql.New(mysql.Config{
DSN: cfg.ConnectionString,
Expand Down
45 changes: 40 additions & 5 deletions pumps/sql_pgxv5_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

"github.com/TykTechnologies/tyk-pump/analytics"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gorm.io/gorm"
gorm_logger "gorm.io/gorm/logger"
)
Expand Down Expand Up @@ -448,11 +449,6 @@
// the driver that must not regress.
func TestPreferSimpleProtocol_Postgres(t *testing.T) {
skipTestIfNoPostgres(t)
t.Skip("prefer_simple_protocol=true is unsupported with pgx v5: " +
"pgx v5's TryWrapBuiltinTypeEncodePlan adds an fmt.Stringer branch that " +
"text-encodes time.Month as a month name (e.g. 'May') instead of an integer, " +
"which PostgreSQL rejects against bigint columns. The test body below is " +
"preserved so it can be re-enabled if a driver/model fix is shipped. See TT-16932.")

cfg := newSQLConfig(false)
cfg["postgres"] = map[string]interface{}{"prefer_simple_protocol": true}
Expand Down Expand Up @@ -645,3 +641,42 @@
assert.LessOrEqual(t, stats.Idle, 2,
"idle connections should not exceed stdlib default MaxIdleConns=2; got %d", stats.Idle)
}

func TestSQLWriteData_PreferSimpleProtocol_Month(t *testing.T) {
skipTestIfNoPostgres(t)

pmp := SQLPump{}
cfg := newSQLConfig(false)
cfg["postgres"] = map[string]interface{}{"prefer_simple_protocol": true}

Check warning on line 650 in pumps/sql_pgxv5_test.go

View check run for this annotation

probelabs / Visor: quality

architecture Issue

The deferred function for cleaning up the database table is registered after `pmp.Init()` is called. If `pmp.Init()` fails after the table has been created but before the function returns successfully, the `defer` will not be registered and the test table will be left behind, potentially causing subsequent tests to fail.
Raw output
To ensure cleanup always runs, register the `defer` statement before calling `pmp.Init()` and ensure the deferred function can safely handle a `nil` or partially initialized `pmp.db` object.

err := pmp.Init(cfg)
if err != nil {
t.Fatal("SQL Pump couldn't be initialized with err:", err)
}

Check warning on line 655 in pumps/sql_pgxv5_test.go

View check run for this annotation

probelabs / Visor: quality

logic Issue

Using `require.NoError` inside a deferred function is unsafe. If this assertion fails, it will call `t.FailNow()`, preventing any other deferred functions from executing. This can lead to incomplete test cleanup.
Raw output
Replace `require.NoError` with `assert.NoError` in deferred functions to ensure that all cleanup operations are attempted, even if one of them fails.

defer func() {
require.NoError(t, pmp.db.Migrator().DropTable(analytics.SQLTable))
}()

rec := analytics.AnalyticsRecord{
APIID: "api-simple-proto",
OrgID: "org-simple-proto",
TimeStamp: time.Now(),
Month: time.May,
}

errWrite := pmp.WriteData(context.TODO(), []interface{}{rec})
if errWrite != nil {
t.Fatal("SQL Pump couldn't write records with err:", errWrite)
}

var dbRecords []analytics.AnalyticsRecord
err = pmp.db.Table(analytics.SQLTable).Find(&dbRecords).Error
if err != nil {
t.Fatal("couldn't read records back:", err)
}

if assert.Equal(t, 1, len(dbRecords), "expected 1 record in DB -- insert likely failed due to pgx v5 time.Month encoding bug") {
assert.Equal(t, time.May, dbRecords[0].Month, "month should round-trip as integer 5, not a string")
}
}
Loading