Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/en/resources/sources/bigquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ project: "my-project-id"
# - "https://www.googleapis.com/auth/bigquery"
# - "https://www.googleapis.com/auth/drive.readonly"
# maxQueryResultRows: 50 # Optional: Limits the number of rows returned by queries. Defaults to 50.
# maximumBytesBilled: 10737418240 # Optional: Per-query bytes scanned cap (in bytes).
```

Initialize a BigQuery source that uses the client's access token:
Expand All @@ -160,6 +161,7 @@ useClientOAuth: true
# - "https://www.googleapis.com/auth/bigquery"
# - "https://www.googleapis.com/auth/drive.readonly"
# maxQueryResultRows: 50 # Optional: Limits the number of rows returned by queries. Defaults to 50.
# maximumBytesBilled: 10737418240 # Optional: Per-query bytes scanned cap (in bytes).
```

## Reference
Expand All @@ -175,3 +177,4 @@ useClientOAuth: true
| scopes | []string | false | A list of OAuth 2.0 scopes to use for the credentials. If not provided, default scopes are used. |
| impersonateServiceAccount | string | false | Service account email to impersonate when making BigQuery and Dataplex API calls. The authenticated principal must have the `roles/iam.serviceAccountTokenCreator` role on the target service account. [Learn More](https://cloud.google.com/iam/docs/service-account-impersonation) |
| maxQueryResultRows | int | false | The maximum number of rows to return from a query. Defaults to 50. |
| maximumBytesBilled | int64 | false | The maximum bytes billed per query. When set, queries that exceed this limit fail before executing. |
10 changes: 10 additions & 0 deletions internal/sources/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type Config struct {
ImpersonateServiceAccount string `yaml:"impersonateServiceAccount"`
Scopes StringOrStringSlice `yaml:"scopes"`
MaxQueryResultRows int `yaml:"maxQueryResultRows"`
MaximumBytesBilled int64 `yaml:"maximumBytesBilled" validate:"gte=0"`
}

// StringOrStringSlice is a custom type that can unmarshal both a single string
Expand Down Expand Up @@ -157,6 +158,7 @@ func (r Config) Initialize(ctx context.Context, tracer trace.Tracer) (sources.So
RestService: restService,
TokenSource: tokenSource,
MaxQueryResultRows: r.MaxQueryResultRows,
MaximumBytesBilled: r.MaximumBytesBilled,
ClientCreator: clientCreator,
AuthTokenHeaderName: "Authorization",
}
Expand Down Expand Up @@ -291,6 +293,7 @@ type Source struct {
TokenSource oauth2.TokenSource
AuthTokenHeaderName string
MaxQueryResultRows int
MaximumBytesBilled int64
ClientCreator BigqueryClientCreator
AllowedDatasets map[string]struct{}
sessionMutex sync.Mutex
Expand Down Expand Up @@ -472,6 +475,10 @@ func (s *Source) GetMaxQueryResultRows() int {
return s.MaxQueryResultRows
}

func (s *Source) GetMaximumBytesBilled() int64 {
return s.MaximumBytesBilled
}

func (s *Source) BigQueryClientCreator() BigqueryClientCreator {
return s.ClientCreator
}
Expand Down Expand Up @@ -572,6 +579,9 @@ func (s *Source) RunSQL(ctx context.Context, bqClient *bigqueryapi.Client, state
if connProps != nil {
query.ConnectionProperties = connProps
}
if s.MaximumBytesBilled > 0 {
query.MaxBytesBilled = s.MaximumBytesBilled
}

// This block handles SELECT statements, which return a row set.
// We iterate through the results, convert each row into a map of
Expand Down
84 changes: 84 additions & 0 deletions internal/sources/bigquery/bigquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,26 @@ func TestParseFromYamlBigQuery(t *testing.T) {
},
},
},
{
desc: "with maximum bytes billed example",
in: `
kind: sources
name: my-instance
type: bigquery
project: my-project
location: us
maximumBytesBilled: 10737418240
`,
want: map[string]sources.SourceConfig{
"my-instance": bigquery.Config{
Name: "my-instance",
Type: bigquery.SourceType,
Project: "my-project",
Location: "us",
MaximumBytesBilled: 10737418240,
},
},
},
}
for _, tc := range tcs {
t.Run(tc.desc, func(t *testing.T) {
Expand Down Expand Up @@ -279,6 +299,17 @@ func TestFailParseFromYaml(t *testing.T) {
`,
err: "error unmarshaling sources: unable to parse source \"my-instance\" as \"bigquery\": Key: 'Config.Project' Error:Field validation for 'Project' failed on the 'required' tag",
},
{
desc: "negative maximum bytes billed",
in: `
kind: sources
name: my-instance
type: bigquery
project: my-project
maximumBytesBilled: -1
`,
err: "error unmarshaling sources: unable to parse source \"my-instance\" as \"bigquery\": [1:21] Key: 'Config.MaximumBytesBilled' Error:Field validation for 'MaximumBytesBilled' failed on the 'gte' tag\n> 1 | maximumBytesBilled: -1\n ^\n 2 | name: my-instance\n 3 | project: my-project\n 4 | type: bigquery",
},
}
for _, tc := range tcs {
t.Run(tc.desc, func(t *testing.T) {
Expand Down Expand Up @@ -347,6 +378,59 @@ func TestInitialize_MaxQueryResultRows(t *testing.T) {
}
}

func TestInitialize_MaximumBytesBilled(t *testing.T) {
ctx, err := testutils.ContextWithNewLogger()
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
ctx = util.WithUserAgent(ctx, "test-agent")
tracer := noop.NewTracerProvider().Tracer("")

tcs := []struct {
desc string
cfg bigquery.Config
want int64
}{
{
desc: "default value",
cfg: bigquery.Config{
Name: "test-default",
Type: bigquery.SourceType,
Project: "test-project",
UseClientOAuth: "true",
},
want: 0,
},
{
desc: "configured value",
cfg: bigquery.Config{
Name: "test-configured",
Type: bigquery.SourceType,
Project: "test-project",
UseClientOAuth: "true",
MaximumBytesBilled: 10737418240,
},
want: 10737418240,
},
}

for _, tc := range tcs {
t.Run(tc.desc, func(t *testing.T) {
src, err := tc.cfg.Initialize(ctx, tracer)
if err != nil {
t.Fatalf("Initialize failed: %v", err)
}
bqSrc, ok := src.(*bigquery.Source)
if !ok {
t.Fatalf("Expected *bigquery.Source, got %T", src)
}
if bqSrc.MaximumBytesBilled != tc.want {
t.Errorf("MaximumBytesBilled = %d, want %d", bqSrc.MaximumBytesBilled, tc.want)
}
})
}
}

func TestNormalizeValue(t *testing.T) {
tests := []struct {
name string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type compatibleSource interface {
BigQueryClient() *bigqueryapi.Client
UseClientAuthorization() bool
GetAuthTokenHeaderName() string
GetMaximumBytesBilled() int64
IsDatasetAllowed(projectID, datasetID string) bool
BigQueryAllowedDatasets() []string
BigQuerySession() bigqueryds.BigQuerySessionProvider
Expand Down Expand Up @@ -217,7 +218,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
{Key: "session_id", Value: session.ID},
}
}
dryRunJob, err := bqutil.DryRunQuery(ctx, restService, source.BigQueryClient().Project(), source.BigQueryClient().Location, inputData, nil, connProps)
dryRunJob, err := bqutil.DryRunQuery(ctx, restService, source.BigQueryClient().Project(), source.BigQueryClient().Location, inputData, nil, connProps, source.GetMaximumBytesBilled())
if err != nil {
return nil, util.ProcessGcpError(err)
}
Expand Down
3 changes: 2 additions & 1 deletion internal/tools/bigquery/bigquerycommon/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

// DryRunQuery performs a dry run of the SQL query to validate it and get metadata.
func DryRunQuery(ctx context.Context, restService *bigqueryrestapi.Service, projectID string, location string, sql string, params []*bigqueryrestapi.QueryParameter, connProps []*bigqueryapi.ConnectionProperty) (*bigqueryrestapi.Job, error) {
func DryRunQuery(ctx context.Context, restService *bigqueryrestapi.Service, projectID string, location string, sql string, params []*bigqueryrestapi.QueryParameter, connProps []*bigqueryapi.ConnectionProperty, maximumBytesBilled int64) (*bigqueryrestapi.Job, error) {
useLegacySql := false

restConnProps := make([]*bigqueryrestapi.ConnectionProperty, len(connProps))
Expand All @@ -46,6 +46,7 @@ func DryRunQuery(ctx context.Context, restService *bigqueryrestapi.Service, proj
UseLegacySql: &useLegacySql,
ConnectionProperties: restConnProps,
QueryParameters: params,
MaximumBytesBilled: maximumBytesBilled,
},
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type compatibleSource interface {
BigQueryWriteMode() string
UseClientAuthorization() bool
GetAuthTokenHeaderName() string
GetMaximumBytesBilled() int64
IsDatasetAllowed(projectID, datasetID string) bool
BigQueryAllowedDatasets() []string
RetrieveClientAndService(tools.AccessToken) (*bigqueryapi.Client, *bigqueryrestapi.Service, error)
Expand Down Expand Up @@ -187,7 +188,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
}
}

dryRunJob, err := bqutil.DryRunQuery(ctx, restService, bqClient.Project(), bqClient.Location, sql, nil, connProps)
dryRunJob, err := bqutil.DryRunQuery(ctx, restService, bqClient.Project(), bqClient.Location, sql, nil, connProps, source.GetMaximumBytesBilled())
if err != nil {
return nil, util.NewClientServerError("query validation failed", http.StatusInternalServerError, err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type compatibleSource interface {
BigQueryClient() *bigqueryapi.Client
UseClientAuthorization() bool
GetAuthTokenHeaderName() string
GetMaximumBytesBilled() int64
IsDatasetAllowed(projectID, datasetID string) bool
BigQueryAllowedDatasets() []string
BigQuerySession() bigqueryds.BigQuerySessionProvider
Expand Down Expand Up @@ -194,7 +195,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
{Key: "session_id", Value: session.ID},
}
}
dryRunJob, err := bqutil.DryRunQuery(ctx, restService, source.BigQueryClient().Project(), source.BigQueryClient().Location, historyData, nil, connProps)
dryRunJob, err := bqutil.DryRunQuery(ctx, restService, source.BigQueryClient().Project(), source.BigQueryClient().Location, historyData, nil, connProps, source.GetMaximumBytesBilled())
if err != nil {
return nil, util.ProcessGcpError(err)
}
Expand Down
3 changes: 2 additions & 1 deletion internal/tools/bigquery/bigquerysql/bigquerysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type compatibleSource interface {
BigQuerySession() bigqueryds.BigQuerySessionProvider
UseClientAuthorization() bool
GetAuthTokenHeaderName() string
GetMaximumBytesBilled() int64
RetrieveClientAndService(tools.AccessToken) (*bigqueryapi.Client, *bigqueryrestapi.Service, error)
RunSQL(context.Context, *bigqueryapi.Client, string, string, []bigqueryapi.QueryParameter, []*bigqueryapi.ConnectionProperty) (any, error)
}
Expand Down Expand Up @@ -205,7 +206,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
return nil, util.NewClientServerError("failed to retrieve BigQuery client", http.StatusInternalServerError, err)
}

dryRunJob, err := bqutil.DryRunQuery(ctx, restService, bqClient.Project(), bqClient.Location, newStatement, lowLevelParams, connProps)
dryRunJob, err := bqutil.DryRunQuery(ctx, restService, bqClient.Project(), bqClient.Location, newStatement, lowLevelParams, connProps, source.GetMaximumBytesBilled())
if err != nil {
return nil, util.ProcessGcpError(err)
}
Expand Down