Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/TykTechnologies/tyk-pump

go 1.25
go 1.25.0

require (
github.com/DataDog/datadog-go v4.7.0+incompatible
Expand All @@ -22,6 +22,7 @@ require (
github.com/gorilla/mux v1.8.0
github.com/influxdata/influxdb v1.11.5
github.com/influxdata/influxdb-client-go/v2 v2.6.0
github.com/jackc/pgx/v5 v5.9.1
github.com/kelseyhightower/envconfig v1.4.0
github.com/logzio/logzio-go v0.0.0-20200316143903-ac8fc0e2910e
github.com/mitchellh/mapstructure v1.3.1
Expand All @@ -37,7 +38,7 @@ require (
github.com/segmentio/analytics-go v0.0.0-20160711225931-bdb0aeca8a99
github.com/segmentio/kafka-go v0.3.6
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.8.4
github.com/stretchr/testify v1.11.1
golang.org/x/net v0.49.0
google.golang.org/protobuf v1.34.2
gopkg.in/alecthomas/kingpin.v2 v2.2.6
Expand Down Expand Up @@ -88,6 +89,7 @@ require (
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/pgtype v1.14.0 // indirect
github.com/jackc/pgx/v4 v4.18.3 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/jehiah/go-strftime v0.0.0-20151206194810-2efbe75097a5 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
Expand All @@ -111,7 +113,7 @@ require (
github.com/redis/go-redis/v9 v9.7.3 // indirect
github.com/segmentio/backo-go v0.0.0-20160424052352-204274ad699c // indirect
github.com/shirou/gopsutil v3.20.11+incompatible // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/syndtr/goleveldb v0.0.0-20190318030020-c3a204f8e965 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
Expand Down
15 changes: 8 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,13 @@ github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c/go.mod h1:1QD0+tgS
github.com/jackc/pgx/v4 v4.13.0/go.mod h1:9P4X524sErlaxj0XSGZk7s+LD0eOyu1ZDUrrpznYDF0=
github.com/jackc/pgx/v4 v4.18.3 h1:dE2/TrEsGX3RBprb3qryqSV9Y60iZN1C6i8IrmW9/BA=
github.com/jackc/pgx/v4 v4.18.3/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw=
github.com/jackc/pgx/v5 v5.9.1 h1:uwrxJXBnx76nyISkhr33kQLlUqjv7et7b9FjCen/tdc=
github.com/jackc/pgx/v5 v5.9.1/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4=
github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jehiah/go-strftime v0.0.0-20151206194810-2efbe75097a5 h1:E1bpycfzgfdJWK32+GOJDYVrep2fbX6cN6tYiXd+CGY=
github.com/jehiah/go-strftime v0.0.0-20151206194810-2efbe75097a5/go.mod h1:cJ6Cj7dQo+O6GJNiMx+Pa94qKj+TG8ONdKHgMNIyyag=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
Expand Down Expand Up @@ -375,19 +379,16 @@ github.com/smartystreets/gunit v1.4.2/go.mod h1:ZjM1ozSIMJlAz/ay4SG8PeKF00ckUp+z
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/syndtr/goleveldb v0.0.0-20190318030020-c3a204f8e965 h1:V/AztY/q2oW5ghho7YMgUJQkKvSACHRxpeDyT5DxpIo=
github.com/syndtr/goleveldb v0.0.0-20190318030020-c3a204f8e965/go.mod h1:9OrXJhf154huy1nPWmuSrkgjPUtUNhA+Zmy+6AESzuA=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
Expand Down
64 changes: 59 additions & 5 deletions pumps/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@
"encoding/hex"
"errors"
"fmt"
"regexp"
"sync"
"time"

"github.com/sirupsen/logrus"

"github.com/TykTechnologies/tyk-pump/analytics"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/stdlib"
"github.com/mitchellh/mapstructure"
"gopkg.in/vmihailenco/msgpack.v2"
"gorm.io/gorm/clause"
Expand Down Expand Up @@ -85,14 +90,63 @@
MigrateShardedTables bool `json:"migrate_sharded_tables" mapstructure:"migrate_sharded_tables"`
}

var timeZoneMatcher = regexp.MustCompile("(time_zone|TimeZone)=(.*?)($|&| )")

// monthEncodePlan converts time.Month to int for pgx encoding.
// pgx v5's TryWrapBuiltinTypeEncodePlan matches time.Month as fmt.Stringer
// (producing "May") before TryWrapFindUnderlyingTypeEncodePlan can convert it
// to its underlying int. This plan is prepended to the encode chain so the
// int conversion happens first. See TT-16980 and https://github.com/jackc/pgx/issues/2157
type monthEncodePlan struct {
next pgtype.EncodePlan
}

func (p *monthEncodePlan) SetNext(next pgtype.EncodePlan) { p.next = next }

func (p *monthEncodePlan) Encode(value any, buf []byte) ([]byte, error) {
return p.next.Encode(int(value.(time.Month)), buf)
}

func Dialect(cfg *SQLConf) (gorm.Dialector, error) {
switch cfg.Type {
case "postgres":
// Example connection_string: `"host=localhost user=gorm password=gorm DB.name=gorm port=9920 sslmode=disable TimeZone=Asia/Shanghai"`
return postgres.New(postgres.Config{
DSN: cfg.ConnectionString,
PreferSimpleProtocol: cfg.Postgres.PreferSimpleProtocol,
}), nil
// We build the *sql.DB ourselves instead of letting the gorm postgres
// driver do it. So we can inject an AfterConnect callback that registers
// time.Month as PostgreSQL int8. Without this, pgx v5's simple protocol
// encodes time.Month via String() ("May") instead of the underlying int,
// which PostgreSQL rejects for bigint columns.
// See TT-16980 and https://github.com/jackc/pgx/issues/2157
pgxConfig, err := pgx.ParseConfig(cfg.ConnectionString)
if err != nil {
return nil, err
}

Check warning on line 122 in pumps/sql.go

View check run for this annotation

probelabs / Visor: performance

performance Issue

The custom `TryWrapEncodePlanFunc` is prepended to the `pgx` encoding plan chain, causing it to execute for every value being sent to the database. This introduces a type-assertion check on a critical hot path (database serialization). While the overhead per value is minimal, the cumulative impact could become noticeable in very high-throughput environments.
Raw output
This is an acceptable trade-off to fix the underlying bug without introducing breaking changes to data structures. For future optimization, consider investigating if `pgx` allows registering a specific codec for the `time.Month` type directly, which would avoid the overhead of checking every value that passes through the driver.
if cfg.Postgres.PreferSimpleProtocol {
pgxConfig.DefaultQueryExecMode = pgx.QueryExecModeSimpleProtocol
}
// Replicate timezone handling from gorm.io/driver/postgres.
if result := timeZoneMatcher.FindStringSubmatch(cfg.ConnectionString); len(result) > 2 {
pgxConfig.RuntimeParams["timezone"] = result[2]
}
sqlDB := stdlib.OpenDB(*pgxConfig,
stdlib.OptionAfterConnect(func(ctx context.Context, conn *pgx.Conn) error {
tm := conn.TypeMap()

Check warning on line 132 in pumps/sql.go

View check run for this annotation

probelabs / Visor: architecture

architecture Issue

The custom PostgreSQL connection logic manually replicates DSN parsing (specifically for timezone extraction) from the `gorm.io/driver/postgres` library. While necessary for this fix, it creates a maintenance dependency. If the GORM driver's DSN parsing logic changes in a future version, this implementation will become outdated and could lead to behavioral inconsistencies.
Raw output
To mitigate this, add a more specific comment linking to the source file and version of the GORM driver from which this logic was copied. This will serve as a clear reminder for future developers to verify this implementation when upgrading the GORM dependency.
// Prepend a custom encode plan that converts time.Month to int
// before pgx's fmt.Stringer wrapper can fire.
tm.TryWrapEncodePlanFuncs = append(
[]pgtype.TryWrapEncodePlanFunc{
func(value any) (pgtype.WrappedEncodePlanNextSetter, any, bool) {
if m, ok := value.(time.Month); ok {
return &monthEncodePlan{}, int(m), true
}
return nil, nil, false
},
},
tm.TryWrapEncodePlanFuncs...,
)
return nil
}),
)
return postgres.New(postgres.Config{Conn: sqlDB}), nil
case "mysql":
return mysql.New(mysql.Config{
DSN: cfg.ConnectionString,
Expand Down
40 changes: 40 additions & 0 deletions pumps/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/TykTechnologies/tyk-pump/analytics"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/vmihailenco/msgpack.v2"
)

Expand Down Expand Up @@ -504,3 +505,42 @@ func TestBuildIndexName(t *testing.T) {
})
}
}

func TestSQLWriteData_PreferSimpleProtocol_Month(t *testing.T) {
skipTestIfNoPostgres(t)

pmp := SQLPump{}
cfg := newSQLConfig(false)
cfg["postgres"] = map[string]interface{}{"prefer_simple_protocol": true}

err := pmp.Init(cfg)
if err != nil {
t.Fatal("SQL Pump couldn't be initialized with err:", err)
}

defer func() {
require.NoError(t, pmp.db.Migrator().DropTable(analytics.SQLTable))
}()

rec := analytics.AnalyticsRecord{
APIID: "api-simple-proto",
OrgID: "org-simple-proto",
TimeStamp: time.Now(),
Month: time.May,
}

errWrite := pmp.WriteData(context.TODO(), []interface{}{rec})
if errWrite != nil {
t.Fatal("SQL Pump couldn't write records with err:", errWrite)
}

var dbRecords []analytics.AnalyticsRecord
err = pmp.db.Table(analytics.SQLTable).Find(&dbRecords).Error
if err != nil {
t.Fatal("couldn't read records back:", err)
}

if assert.Equal(t, 1, len(dbRecords), "expected 1 record in DB -- insert likely failed due to pgx v5 time.Month encoding bug") {
assert.Equal(t, time.May, dbRecords[0].Month, "month should round-trip as integer 5, not a string")
}
}
Loading