diff --git a/go.mod b/go.mod index 4ac709c50..0e929bc4f 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/TykTechnologies/tyk-pump -go 1.25 +go 1.25.0 require ( github.com/DataDog/datadog-go v4.7.0+incompatible @@ -22,6 +22,7 @@ require ( github.com/gorilla/mux v1.8.0 github.com/influxdata/influxdb v1.11.5 github.com/influxdata/influxdb-client-go/v2 v2.6.0 + github.com/jackc/pgx/v5 v5.9.1 github.com/kelseyhightower/envconfig v1.4.0 github.com/logzio/logzio-go v0.0.0-20200316143903-ac8fc0e2910e github.com/mitchellh/mapstructure v1.3.1 @@ -37,7 +38,7 @@ require ( github.com/segmentio/analytics-go v0.0.0-20160711225931-bdb0aeca8a99 github.com/segmentio/kafka-go v0.3.6 github.com/sirupsen/logrus v1.9.3 - github.com/stretchr/testify v1.8.4 + github.com/stretchr/testify v1.11.1 golang.org/x/net v0.49.0 google.golang.org/protobuf v1.34.2 gopkg.in/alecthomas/kingpin.v2 v2.2.6 @@ -88,6 +89,7 @@ require ( github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/pgtype v1.14.0 // indirect github.com/jackc/pgx/v4 v4.18.3 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/jehiah/go-strftime v0.0.0-20151206194810-2efbe75097a5 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect @@ -111,7 +113,7 @@ require ( github.com/redis/go-redis/v9 v9.7.3 // indirect github.com/segmentio/backo-go v0.0.0-20160424052352-204274ad699c // indirect github.com/shirou/gopsutil v3.20.11+incompatible // indirect - github.com/stretchr/objx v0.5.0 // indirect + github.com/stretchr/objx v0.5.2 // indirect github.com/syndtr/goleveldb v0.0.0-20190318030020-c3a204f8e965 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect diff --git a/go.sum b/go.sum index 961355438..87ea698bb 100644 --- a/go.sum +++ b/go.sum @@ -225,9 +225,13 @@ github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c/go.mod h1:1QD0+tgS github.com/jackc/pgx/v4 v4.13.0/go.mod h1:9P4X524sErlaxj0XSGZk7s+LD0eOyu1ZDUrrpznYDF0= github.com/jackc/pgx/v4 v4.18.3 h1:dE2/TrEsGX3RBprb3qryqSV9Y60iZN1C6i8IrmW9/BA= github.com/jackc/pgx/v4 v4.18.3/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw= +github.com/jackc/pgx/v5 v5.9.1 h1:uwrxJXBnx76nyISkhr33kQLlUqjv7et7b9FjCen/tdc= +github.com/jackc/pgx/v5 v5.9.1/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4= github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jehiah/go-strftime v0.0.0-20151206194810-2efbe75097a5 h1:E1bpycfzgfdJWK32+GOJDYVrep2fbX6cN6tYiXd+CGY= github.com/jehiah/go-strftime v0.0.0-20151206194810-2efbe75097a5/go.mod h1:cJ6Cj7dQo+O6GJNiMx+Pa94qKj+TG8ONdKHgMNIyyag= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= @@ -375,19 +379,16 @@ github.com/smartystreets/gunit v1.4.2/go.mod h1:ZjM1ozSIMJlAz/ay4SG8PeKF00ckUp+z github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/syndtr/goleveldb v0.0.0-20190318030020-c3a204f8e965 h1:V/AztY/q2oW5ghho7YMgUJQkKvSACHRxpeDyT5DxpIo= github.com/syndtr/goleveldb v0.0.0-20190318030020-c3a204f8e965/go.mod h1:9OrXJhf154huy1nPWmuSrkgjPUtUNhA+Zmy+6AESzuA= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= diff --git a/pumps/sql.go b/pumps/sql.go index 55614a594..1cde43a61 100644 --- a/pumps/sql.go +++ b/pumps/sql.go @@ -5,11 +5,16 @@ import ( "encoding/hex" "errors" "fmt" + "regexp" "sync" + "time" "github.com/sirupsen/logrus" "github.com/TykTechnologies/tyk-pump/analytics" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgtype" + "github.com/jackc/pgx/v5/stdlib" "github.com/mitchellh/mapstructure" "gopkg.in/vmihailenco/msgpack.v2" "gorm.io/gorm/clause" @@ -85,14 +90,63 @@ type SQLConf struct { MigrateShardedTables bool `json:"migrate_sharded_tables" mapstructure:"migrate_sharded_tables"` } +var timeZoneMatcher = regexp.MustCompile("(time_zone|TimeZone)=(.*?)($|&| )") + +// monthEncodePlan converts time.Month to int for pgx encoding. +// pgx v5's TryWrapBuiltinTypeEncodePlan matches time.Month as fmt.Stringer +// (producing "May") before TryWrapFindUnderlyingTypeEncodePlan can convert it +// to its underlying int. This plan is prepended to the encode chain so the +// int conversion happens first. See TT-16980 and https://github.com/jackc/pgx/issues/2157 +type monthEncodePlan struct { + next pgtype.EncodePlan +} + +func (p *monthEncodePlan) SetNext(next pgtype.EncodePlan) { p.next = next } + +func (p *monthEncodePlan) Encode(value any, buf []byte) ([]byte, error) { + return p.next.Encode(int(value.(time.Month)), buf) +} + func Dialect(cfg *SQLConf) (gorm.Dialector, error) { switch cfg.Type { case "postgres": - // Example connection_string: `"host=localhost user=gorm password=gorm DB.name=gorm port=9920 sslmode=disable TimeZone=Asia/Shanghai"` - return postgres.New(postgres.Config{ - DSN: cfg.ConnectionString, - PreferSimpleProtocol: cfg.Postgres.PreferSimpleProtocol, - }), nil + // We build the *sql.DB ourselves instead of letting the gorm postgres + // driver do it. So we can inject an AfterConnect callback that registers + // time.Month as PostgreSQL int8. Without this, pgx v5's simple protocol + // encodes time.Month via String() ("May") instead of the underlying int, + // which PostgreSQL rejects for bigint columns. + // See TT-16980 and https://github.com/jackc/pgx/issues/2157 + pgxConfig, err := pgx.ParseConfig(cfg.ConnectionString) + if err != nil { + return nil, err + } + if cfg.Postgres.PreferSimpleProtocol { + pgxConfig.DefaultQueryExecMode = pgx.QueryExecModeSimpleProtocol + } + // Replicate timezone handling from gorm.io/driver/postgres. + if result := timeZoneMatcher.FindStringSubmatch(cfg.ConnectionString); len(result) > 2 { + pgxConfig.RuntimeParams["timezone"] = result[2] + } + sqlDB := stdlib.OpenDB(*pgxConfig, + stdlib.OptionAfterConnect(func(ctx context.Context, conn *pgx.Conn) error { + tm := conn.TypeMap() + // Prepend a custom encode plan that converts time.Month to int + // before pgx's fmt.Stringer wrapper can fire. + tm.TryWrapEncodePlanFuncs = append( + []pgtype.TryWrapEncodePlanFunc{ + func(value any) (pgtype.WrappedEncodePlanNextSetter, any, bool) { + if m, ok := value.(time.Month); ok { + return &monthEncodePlan{}, int(m), true + } + return nil, nil, false + }, + }, + tm.TryWrapEncodePlanFuncs..., + ) + return nil + }), + ) + return postgres.New(postgres.Config{Conn: sqlDB}), nil case "mysql": return mysql.New(mysql.Config{ DSN: cfg.ConnectionString, diff --git a/pumps/sql_test.go b/pumps/sql_test.go index 509beca60..f49e06311 100644 --- a/pumps/sql_test.go +++ b/pumps/sql_test.go @@ -8,6 +8,7 @@ import ( "github.com/TykTechnologies/tyk-pump/analytics" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "gopkg.in/vmihailenco/msgpack.v2" ) @@ -504,3 +505,42 @@ func TestBuildIndexName(t *testing.T) { }) } } + +func TestSQLWriteData_PreferSimpleProtocol_Month(t *testing.T) { + skipTestIfNoPostgres(t) + + pmp := SQLPump{} + cfg := newSQLConfig(false) + cfg["postgres"] = map[string]interface{}{"prefer_simple_protocol": true} + + err := pmp.Init(cfg) + if err != nil { + t.Fatal("SQL Pump couldn't be initialized with err:", err) + } + + defer func() { + require.NoError(t, pmp.db.Migrator().DropTable(analytics.SQLTable)) + }() + + rec := analytics.AnalyticsRecord{ + APIID: "api-simple-proto", + OrgID: "org-simple-proto", + TimeStamp: time.Now(), + Month: time.May, + } + + errWrite := pmp.WriteData(context.TODO(), []interface{}{rec}) + if errWrite != nil { + t.Fatal("SQL Pump couldn't write records with err:", errWrite) + } + + var dbRecords []analytics.AnalyticsRecord + err = pmp.db.Table(analytics.SQLTable).Find(&dbRecords).Error + if err != nil { + t.Fatal("couldn't read records back:", err) + } + + if assert.Equal(t, 1, len(dbRecords), "expected 1 record in DB -- insert likely failed due to pgx v5 time.Month encoding bug") { + assert.Equal(t, time.May, dbRecords[0].Month, "month should round-trip as integer 5, not a string") + } +}