Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/en/integrations/alloydb/source.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,4 @@ The interface is identical, so there's no additional configuration required on t
| user | string | false | Name of the Postgres user to connect as (e.g. "my-pg-user"). Defaults to IAM auth using [ADC][adc] email if unspecified. |
| password | string | false | Password of the Postgres user (e.g. "my-password"). Defaults to attempting IAM authentication if unspecified. |
| ipType | string | false | IP Type of the AlloyDB instance; must be one of `public` or `private`. Default: `public`. |
| queryExecMode | string | false | pgx query execution mode. Valid values: `cache_statement` (default), `cache_describe`, `describe_exec`, `exec`, `simple_protocol`. Useful with connection poolers that don't support prepared statement caching. |
1 change: 1 addition & 0 deletions docs/en/integrations/cloud-sql-pg/source.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,4 @@ The interface is identical, so there's no additional configuration required on t
| user | string | false | Name of the Postgres user to connect as (e.g. "my-pg-user"). Defaults to IAM auth using [ADC][adc] email if unspecified. |
| password | string | false | Password of the Postgres user (e.g. "my-password"). Defaults to attempting IAM authentication if unspecified. |
| ipType | string | false | IP Type of the Cloud SQL instance; must be one of `public`, `private`, or `psc`. Default: `public`. |
| queryExecMode | string | false | pgx query execution mode. Valid values: `cache_statement` (default), `cache_describe`, `describe_exec`, `exec`, `simple_protocol`. Useful with connection poolers that don't support prepared statement caching. |
31 changes: 19 additions & 12 deletions internal/sources/alloydbpg/alloydb_pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,24 +50,25 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (sources
}

type Config struct {
Name string `yaml:"name" validate:"required"`
Type string `yaml:"type" validate:"required"`
Project string `yaml:"project" validate:"required"`
Region string `yaml:"region" validate:"required"`
Cluster string `yaml:"cluster" validate:"required"`
Instance string `yaml:"instance" validate:"required"`
IPType sources.IPType `yaml:"ipType" validate:"required"`
User string `yaml:"user"`
Password string `yaml:"password"`
Database string `yaml:"database" validate:"required"`
Name string `yaml:"name" validate:"required"`
Type string `yaml:"type" validate:"required"`
Project string `yaml:"project" validate:"required"`
Region string `yaml:"region" validate:"required"`
Cluster string `yaml:"cluster" validate:"required"`
Instance string `yaml:"instance" validate:"required"`
IPType sources.IPType `yaml:"ipType" validate:"required"`
User string `yaml:"user"`
Password string `yaml:"password"`
Database string `yaml:"database" validate:"required"`
QueryExecMode string `yaml:"queryExecMode" validate:"omitempty,oneof=cache_statement cache_describe describe_exec exec simple_protocol"`
}

func (r Config) SourceConfigType() string {
return SourceType
}

func (r Config) Initialize(ctx context.Context, tracer trace.Tracer) (sources.Source, error) {
pool, err := initAlloyDBPgConnectionPool(ctx, tracer, r.Name, r.Project, r.Region, r.Cluster, r.Instance, r.IPType.String(), r.User, r.Password, r.Database)
pool, err := initAlloyDBPgConnectionPool(ctx, tracer, r.Name, r.Project, r.Region, r.Cluster, r.Instance, r.IPType.String(), r.User, r.Password, r.Database, r.QueryExecMode)
if err != nil {
return nil, fmt.Errorf("unable to create pool: %w", err)
}
Expand Down Expand Up @@ -183,7 +184,7 @@ func getConnectionConfig(ctx context.Context, user, pass, dbname string) (string
return dsn, useIAM, nil
}

func initAlloyDBPgConnectionPool(ctx context.Context, tracer trace.Tracer, name, project, region, cluster, instance, ipType, user, pass, dbname string) (*pgxpool.Pool, error) {
func initAlloyDBPgConnectionPool(ctx context.Context, tracer trace.Tracer, name, project, region, cluster, instance, ipType, user, pass, dbname, queryExecMode string) (*pgxpool.Pool, error) {
//nolint:all // Reassigned ctx
ctx, span := sources.InitConnectionSpan(ctx, tracer, SourceType, name)
defer span.End()
Expand All @@ -197,6 +198,12 @@ func initAlloyDBPgConnectionPool(ctx context.Context, tracer trace.Tracer, name,
if err != nil {
return nil, fmt.Errorf("unable to parse connection uri: %w", err)
}
execMode, err := sources.ParsePGXQueryExecMode(queryExecMode)
if err != nil {
return nil, err
}
config.ConnConfig.DefaultQueryExecMode = execMode

// Create a new dialer with options
userAgent, err := util.UserAgentFromContext(ctx)
if err != nil {
Expand Down
48 changes: 48 additions & 0 deletions internal/sources/alloydbpg/alloydb_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,37 @@ func TestParseFromYamlAlloyDBPg(t *testing.T) {
},
},
},
{
desc: "example with query exec mode",
in: `
kind: source
name: my-pg-instance
type: alloydb-postgres
project: my-project
region: my-region
cluster: my-cluster
instance: my-instance
database: my_db
user: my_user
password: my_pass
queryExecMode: simple_protocol
`,
want: map[string]sources.SourceConfig{
"my-pg-instance": alloydbpg.Config{
Name: "my-pg-instance",
Type: alloydbpg.SourceType,
Project: "my-project",
Region: "my-region",
Cluster: "my-cluster",
Instance: "my-instance",
IPType: "public",
Database: "my_db",
User: "my_user",
Password: "my_pass",
QueryExecMode: "simple_protocol",
},
},
},
}
for _, tc := range tcs {
t.Run(tc.desc, func(t *testing.T) {
Expand Down Expand Up @@ -190,6 +221,23 @@ func TestFailParseFromYaml(t *testing.T) {
`,
err: "error unmarshaling source: unable to parse source \"my-pg-instance\" as \"alloydb-postgres\": Key: 'Config.Project' Error:Field validation for 'Project' failed on the 'required' tag",
},
{
desc: "invalid query exec mode",
in: `
kind: source
name: my-pg-instance
type: alloydb-postgres
project: my-project
region: my-region
cluster: my-cluster
instance: my-instance
database: my_db
user: my_user
password: my_pass
queryExecMode: invalid_mode
`,
err: "error unmarshaling source: unable to parse source \"my-pg-instance\" as \"alloydb-postgres\": [7:16] Key: 'Config.QueryExecMode' Error:Field validation for 'QueryExecMode' failed on the 'oneof' tag\n 4 | name: my-pg-instance\n 5 | password: my_pass\n 6 | project: my-project\n> 7 | queryExecMode: invalid_mode\n ^\n 8 | region: my-region\n 9 | type: alloydb-postgres\n 10 | user: my_user",
},
}
for _, tc := range tcs {
t.Run(tc.desc, func(t *testing.T) {
Expand Down
28 changes: 17 additions & 11 deletions internal/sources/cloudsqlpg/cloud_sql_pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,24 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (sources
}

type Config struct {
Name string `yaml:"name" validate:"required"`
Type string `yaml:"type" validate:"required"`
Project string `yaml:"project" validate:"required"`
Region string `yaml:"region" validate:"required"`
Instance string `yaml:"instance" validate:"required"`
IPType sources.IPType `yaml:"ipType" validate:"required"`
Database string `yaml:"database" validate:"required"`
User string `yaml:"user"`
Password string `yaml:"password"`
Name string `yaml:"name" validate:"required"`
Type string `yaml:"type" validate:"required"`
Project string `yaml:"project" validate:"required"`
Region string `yaml:"region" validate:"required"`
Instance string `yaml:"instance" validate:"required"`
IPType sources.IPType `yaml:"ipType" validate:"required"`
Database string `yaml:"database" validate:"required"`
User string `yaml:"user"`
Password string `yaml:"password"`
QueryExecMode string `yaml:"queryExecMode" validate:"omitempty,oneof=cache_statement cache_describe describe_exec exec simple_protocol"`
}

func (r Config) SourceConfigType() string {
return SourceType
}

func (r Config) Initialize(ctx context.Context, tracer trace.Tracer) (sources.Source, error) {
pool, err := initCloudSQLPgConnectionPool(ctx, tracer, r.Name, r.Project, r.Region, r.Instance, r.IPType.String(), r.User, r.Password, r.Database)
pool, err := initCloudSQLPgConnectionPool(ctx, tracer, r.Name, r.Project, r.Region, r.Instance, r.IPType.String(), r.User, r.Password, r.Database, r.QueryExecMode)
if err != nil {
return nil, fmt.Errorf("unable to create pool: %w", err)
}
Expand Down Expand Up @@ -170,7 +171,7 @@ func getConnectionConfig(ctx context.Context, user, pass, dbname string) (string
return dsn, useIAM, nil
}

func initCloudSQLPgConnectionPool(ctx context.Context, tracer trace.Tracer, name, project, region, instance, ipType, user, pass, dbname string) (*pgxpool.Pool, error) {
func initCloudSQLPgConnectionPool(ctx context.Context, tracer trace.Tracer, name, project, region, instance, ipType, user, pass, dbname, queryExecMode string) (*pgxpool.Pool, error) {
//nolint:all // Reassigned ctx
ctx, span := sources.InitConnectionSpan(ctx, tracer, SourceType, name)
defer span.End()
Expand All @@ -185,6 +186,11 @@ func initCloudSQLPgConnectionPool(ctx context.Context, tracer trace.Tracer, name
if err != nil {
return nil, fmt.Errorf("unable to parse connection uri: %w", err)
}
execMode, err := sources.ParsePGXQueryExecMode(queryExecMode)
if err != nil {
return nil, err
}
config.ConnConfig.DefaultQueryExecMode = execMode

// Create a new dialer with options
userAgent, err := util.UserAgentFromContext(ctx)
Expand Down
45 changes: 45 additions & 0 deletions internal/sources/cloudsqlpg/cloud_sql_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,35 @@ func TestParseFromYamlCloudSQLPg(t *testing.T) {
},
},
},
{
desc: "example with query exec mode",
in: `
kind: source
name: my-pg-instance
type: cloud-sql-postgres
project: my-project
region: my-region
instance: my-instance
database: my_db
user: my_user
password: my_pass
queryExecMode: simple_protocol
`,
want: map[string]sources.SourceConfig{
"my-pg-instance": cloudsqlpg.Config{
Name: "my-pg-instance",
Type: cloudsqlpg.SourceType,
Project: "my-project",
Region: "my-region",
Instance: "my-instance",
IPType: "public",
Database: "my_db",
User: "my_user",
Password: "my_pass",
QueryExecMode: "simple_protocol",
},
},
},
}
for _, tc := range tcs {
t.Run(tc.desc, func(t *testing.T) {
Expand Down Expand Up @@ -209,6 +238,22 @@ func TestFailParseFromYaml(t *testing.T) {
`,
err: "error unmarshaling source: unable to parse source \"my-pg-instance\" as \"cloud-sql-postgres\": Key: 'Config.Project' Error:Field validation for 'Project' failed on the 'required' tag",
},
{
desc: "invalid query exec mode",
in: `
kind: source
name: my-pg-instance
type: cloud-sql-postgres
project: my-project
region: my-region
instance: my-instance
database: my_db
user: my_user
password: my_pass
queryExecMode: invalid_mode
`,
err: "error unmarshaling source: unable to parse source \"my-pg-instance\" as \"cloud-sql-postgres\": [6:16] Key: 'Config.QueryExecMode' Error:Field validation for 'QueryExecMode' failed on the 'oneof' tag\n 3 | name: my-pg-instance\n 4 | password: my_pass\n 5 | project: my-project\n> 6 | queryExecMode: invalid_mode\n ^\n 7 | region: my-region\n 8 | type: cloud-sql-postgres\n 9 | user: my_user",
},
}
for _, tc := range tcs {
t.Run(tc.desc, func(t *testing.T) {
Expand Down
38 changes: 38 additions & 0 deletions internal/sources/pgx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sources

import (
"fmt"

"github.com/jackc/pgx/v5"
)

func ParsePGXQueryExecMode(queryExecMode string) (pgx.QueryExecMode, error) {
switch queryExecMode {
case "", "cache_statement":
return pgx.QueryExecModeCacheStatement, nil
case "cache_describe":
return pgx.QueryExecModeCacheDescribe, nil
case "describe_exec":
return pgx.QueryExecModeDescribeExec, nil
case "exec":
return pgx.QueryExecModeExec, nil
case "simple_protocol":
return pgx.QueryExecModeSimpleProtocol, nil
default:
return 0, fmt.Errorf("invalid queryExecMode %q: must be one of %q, %q, %q, %q, or %q", queryExecMode, "cache_statement", "cache_describe", "describe_exec", "exec", "simple_protocol")
}
}
15 changes: 1 addition & 14 deletions internal/sources/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,18 +188,5 @@ func BuildPostgresURL(host, port, user, pass, dbname string, queryParams map[str
}

func ParseQueryExecMode(queryExecMode string) (pgx.QueryExecMode, error) {
switch queryExecMode {
case "", "cache_statement":
return pgx.QueryExecModeCacheStatement, nil
case "cache_describe":
return pgx.QueryExecModeCacheDescribe, nil
case "describe_exec":
return pgx.QueryExecModeDescribeExec, nil
case "exec":
return pgx.QueryExecModeExec, nil
case "simple_protocol":
return pgx.QueryExecModeSimpleProtocol, nil
default:
return 0, fmt.Errorf("invalid queryExecMode %q: must be one of %q, %q, %q, %q, or %q", queryExecMode, "cache_statement", "cache_describe", "describe_exec", "exec", "simple_protocol")
}
return sources.ParsePGXQueryExecMode(queryExecMode)
}