diff --git a/docs/en/integrations/bigquery/source.md b/docs/en/integrations/bigquery/source.md index ae93fbd06823..c0334131290f 100644 --- a/docs/en/integrations/bigquery/source.md +++ b/docs/en/integrations/bigquery/source.md @@ -113,6 +113,7 @@ project: "my-project-id" # - "https://www.googleapis.com/auth/bigquery" # - "https://www.googleapis.com/auth/drive.readonly" # maxQueryResultRows: 50 # Optional: Limits the number of rows returned by queries. Defaults to 50. +# maximumBytesBilled: 10737418240 # Optional: Per-query bytes scanned cap (in bytes). ``` Initialize a BigQuery source that uses the client's access token: @@ -133,6 +134,7 @@ useClientOAuth: true # - "https://www.googleapis.com/auth/bigquery" # - "https://www.googleapis.com/auth/drive.readonly" # maxQueryResultRows: 50 # Optional: Limits the number of rows returned by queries. Defaults to 50. +# maximumBytesBilled: 10737418240 # Optional: Per-query bytes scanned cap (in bytes). ``` ## Reference @@ -148,3 +150,4 @@ useClientOAuth: true | scopes | []string | false | A list of OAuth 2.0 scopes to use for the credentials. If not provided, default scopes are used. | | impersonateServiceAccount | string | false | Service account email to impersonate when making BigQuery and Dataplex API calls. The authenticated principal must have the `roles/iam.serviceAccountTokenCreator` role on the target service account. [Learn More](https://cloud.google.com/iam/docs/service-account-impersonation) | | maxQueryResultRows | int | false | The maximum number of rows to return from a query. Defaults to 50. | +| maximumBytesBilled | int64 | false | The maximum bytes billed per query. When set, queries that exceed this limit fail before executing. | diff --git a/internal/sources/bigquery/bigquery.go b/internal/sources/bigquery/bigquery.go index 73e11fdbd2da..297c1c85e5b2 100644 --- a/internal/sources/bigquery/bigquery.go +++ b/internal/sources/bigquery/bigquery.go @@ -90,6 +90,7 @@ type Config struct { ImpersonateServiceAccount string `yaml:"impersonateServiceAccount"` Scopes StringOrStringSlice `yaml:"scopes"` MaxQueryResultRows int `yaml:"maxQueryResultRows"` + MaximumBytesBilled int64 `yaml:"maximumBytesBilled" validate:"gte=0"` } // StringOrStringSlice is a custom type that can unmarshal both a single string @@ -157,6 +158,7 @@ func (r Config) Initialize(ctx context.Context, tracer trace.Tracer) (sources.So RestService: restService, TokenSource: tokenSource, MaxQueryResultRows: r.MaxQueryResultRows, + MaximumBytesBilled: r.MaximumBytesBilled, ClientCreator: clientCreator, AuthTokenHeaderName: "Authorization", } @@ -291,6 +293,7 @@ type Source struct { TokenSource oauth2.TokenSource AuthTokenHeaderName string MaxQueryResultRows int + MaximumBytesBilled int64 ClientCreator BigqueryClientCreator AllowedDatasets map[string]struct{} sessionMutex sync.Mutex @@ -472,6 +475,10 @@ func (s *Source) GetMaxQueryResultRows() int { return s.MaxQueryResultRows } +func (s *Source) GetMaximumBytesBilled() int64 { + return s.MaximumBytesBilled +} + func (s *Source) BigQueryClientCreator() BigqueryClientCreator { return s.ClientCreator } @@ -572,6 +579,9 @@ func (s *Source) RunSQL(ctx context.Context, bqClient *bigqueryapi.Client, state if connProps != nil { query.ConnectionProperties = connProps } + if s.MaximumBytesBilled > 0 { + query.MaxBytesBilled = s.MaximumBytesBilled + } // This block handles SELECT statements, which return a row set. // We iterate through the results, convert each row into a map of diff --git a/internal/sources/bigquery/bigquery_test.go b/internal/sources/bigquery/bigquery_test.go index ae182f39b102..399ec9078a09 100644 --- a/internal/sources/bigquery/bigquery_test.go +++ b/internal/sources/bigquery/bigquery_test.go @@ -237,6 +237,26 @@ func TestParseFromYamlBigQuery(t *testing.T) { }, }, }, + { + desc: "with maximum bytes billed example", + in: ` + kind: source + name: my-instance + type: bigquery + project: my-project + location: us + maximumBytesBilled: 10737418240 + `, + want: map[string]sources.SourceConfig{ + "my-instance": bigquery.Config{ + Name: "my-instance", + Type: bigquery.SourceType, + Project: "my-project", + Location: "us", + MaximumBytesBilled: 10737418240, + }, + }, + }, } for _, tc := range tcs { t.Run(tc.desc, func(t *testing.T) { @@ -279,6 +299,17 @@ func TestFailParseFromYaml(t *testing.T) { `, err: "error unmarshaling source: unable to parse source \"my-instance\" as \"bigquery\": Key: 'Config.Project' Error:Field validation for 'Project' failed on the 'required' tag", }, + { + desc: "negative maximum bytes billed", + in: ` + kind: source + name: my-instance + type: bigquery + project: my-project + maximumBytesBilled: -1 + `, + err: "error unmarshaling source: unable to parse source \"my-instance\" as \"bigquery\": [1:21] Key: 'Config.MaximumBytesBilled' Error:Field validation for 'MaximumBytesBilled' failed on the 'gte' tag\n> 1 | maximumBytesBilled: -1\n ^\n 2 | name: my-instance\n 3 | project: my-project\n 4 | type: bigquery", + }, } for _, tc := range tcs { t.Run(tc.desc, func(t *testing.T) { @@ -347,6 +378,59 @@ func TestInitialize_MaxQueryResultRows(t *testing.T) { } } +func TestInitialize_MaximumBytesBilled(t *testing.T) { + ctx, err := testutils.ContextWithNewLogger() + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + ctx = util.WithUserAgent(ctx, "test-agent") + tracer := noop.NewTracerProvider().Tracer("") + + tcs := []struct { + desc string + cfg bigquery.Config + want int64 + }{ + { + desc: "default value", + cfg: bigquery.Config{ + Name: "test-default", + Type: bigquery.SourceType, + Project: "test-project", + UseClientOAuth: "true", + }, + want: 0, + }, + { + desc: "configured value", + cfg: bigquery.Config{ + Name: "test-configured", + Type: bigquery.SourceType, + Project: "test-project", + UseClientOAuth: "true", + MaximumBytesBilled: 10737418240, + }, + want: 10737418240, + }, + } + + for _, tc := range tcs { + t.Run(tc.desc, func(t *testing.T) { + src, err := tc.cfg.Initialize(ctx, tracer) + if err != nil { + t.Fatalf("Initialize failed: %v", err) + } + bqSrc, ok := src.(*bigquery.Source) + if !ok { + t.Fatalf("Expected *bigquery.Source, got %T", src) + } + if bqSrc.MaximumBytesBilled != tc.want { + t.Errorf("MaximumBytesBilled = %d, want %d", bqSrc.MaximumBytesBilled, tc.want) + } + }) + } +} + func TestNormalizeValue(t *testing.T) { tests := []struct { name string diff --git a/internal/tools/bigquery/bigqueryanalyzecontribution/bigqueryanalyzecontribution.go b/internal/tools/bigquery/bigqueryanalyzecontribution/bigqueryanalyzecontribution.go index bde33415d947..4ba6d0650915 100644 --- a/internal/tools/bigquery/bigqueryanalyzecontribution/bigqueryanalyzecontribution.go +++ b/internal/tools/bigquery/bigqueryanalyzecontribution/bigqueryanalyzecontribution.go @@ -53,6 +53,7 @@ type compatibleSource interface { BigQueryClient() *bigqueryapi.Client UseClientAuthorization() bool GetAuthTokenHeaderName() string + GetMaximumBytesBilled() int64 IsDatasetAllowed(projectID, datasetID string) bool BigQueryAllowedDatasets() []string BigQuerySession() bigqueryds.BigQuerySessionProvider @@ -219,7 +220,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para {Key: "session_id", Value: session.ID}, } } - dryRunJob, err := bqutil.DryRunQuery(ctx, restService, source.BigQueryClient().Project(), source.BigQueryClient().Location, inputData, nil, connProps) + dryRunJob, err := bqutil.DryRunQuery(ctx, restService, source.BigQueryClient().Project(), source.BigQueryClient().Location, inputData, nil, connProps, source.GetMaximumBytesBilled()) if err != nil { return nil, util.ProcessGcpError(err) } diff --git a/internal/tools/bigquery/bigquerycommon/util.go b/internal/tools/bigquery/bigquerycommon/util.go index 4d19c85a712b..7e3a3baa772a 100644 --- a/internal/tools/bigquery/bigquerycommon/util.go +++ b/internal/tools/bigquery/bigquerycommon/util.go @@ -26,7 +26,7 @@ import ( ) // DryRunQuery performs a dry run of the SQL query to validate it and get metadata. -func DryRunQuery(ctx context.Context, restService *bigqueryrestapi.Service, projectID string, location string, sql string, params []*bigqueryrestapi.QueryParameter, connProps []*bigqueryapi.ConnectionProperty) (*bigqueryrestapi.Job, error) { +func DryRunQuery(ctx context.Context, restService *bigqueryrestapi.Service, projectID string, location string, sql string, params []*bigqueryrestapi.QueryParameter, connProps []*bigqueryapi.ConnectionProperty, maximumBytesBilled int64) (*bigqueryrestapi.Job, error) { useLegacySql := false restConnProps := make([]*bigqueryrestapi.ConnectionProperty, len(connProps)) @@ -46,6 +46,7 @@ func DryRunQuery(ctx context.Context, restService *bigqueryrestapi.Service, proj UseLegacySql: &useLegacySql, ConnectionProperties: restConnProps, QueryParameters: params, + MaximumBytesBilled: maximumBytesBilled, }, }, } diff --git a/internal/tools/bigquery/bigqueryexecutesql/bigqueryexecutesql.go b/internal/tools/bigquery/bigqueryexecutesql/bigqueryexecutesql.go index 4fdc89908835..ac7c27848e4d 100644 --- a/internal/tools/bigquery/bigqueryexecutesql/bigqueryexecutesql.go +++ b/internal/tools/bigquery/bigqueryexecutesql/bigqueryexecutesql.go @@ -55,6 +55,7 @@ type compatibleSource interface { BigQueryWriteMode() string UseClientAuthorization() bool GetAuthTokenHeaderName() string + GetMaximumBytesBilled() int64 IsDatasetAllowed(projectID, datasetID string) bool BigQueryAllowedDatasets() []string RetrieveClientAndService(tools.AccessToken) (*bigqueryapi.Client, *bigqueryrestapi.Service, error) @@ -189,7 +190,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para } } - dryRunJob, err := bqutil.DryRunQuery(ctx, restService, bqClient.Project(), bqClient.Location, sql, nil, connProps) + dryRunJob, err := bqutil.DryRunQuery(ctx, restService, bqClient.Project(), bqClient.Location, sql, nil, connProps, source.GetMaximumBytesBilled()) if err != nil { return nil, util.NewClientServerError("query validation failed", http.StatusInternalServerError, err) } diff --git a/internal/tools/bigquery/bigqueryforecast/bigqueryforecast.go b/internal/tools/bigquery/bigqueryforecast/bigqueryforecast.go index 38c89fe93bb5..422cfaeb601e 100644 --- a/internal/tools/bigquery/bigqueryforecast/bigqueryforecast.go +++ b/internal/tools/bigquery/bigqueryforecast/bigqueryforecast.go @@ -52,6 +52,7 @@ type compatibleSource interface { BigQueryClient() *bigqueryapi.Client UseClientAuthorization() bool GetAuthTokenHeaderName() string + GetMaximumBytesBilled() int64 IsDatasetAllowed(projectID, datasetID string) bool BigQueryAllowedDatasets() []string BigQuerySession() bigqueryds.BigQuerySessionProvider @@ -196,7 +197,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para {Key: "session_id", Value: session.ID}, } } - dryRunJob, err := bqutil.DryRunQuery(ctx, restService, source.BigQueryClient().Project(), source.BigQueryClient().Location, historyData, nil, connProps) + dryRunJob, err := bqutil.DryRunQuery(ctx, restService, source.BigQueryClient().Project(), source.BigQueryClient().Location, historyData, nil, connProps, source.GetMaximumBytesBilled()) if err != nil { return nil, util.ProcessGcpError(err) } diff --git a/internal/tools/bigquery/bigquerysql/bigquerysql.go b/internal/tools/bigquery/bigquerysql/bigquerysql.go index 58c8cd40c49a..add7a351da37 100644 --- a/internal/tools/bigquery/bigquerysql/bigquerysql.go +++ b/internal/tools/bigquery/bigquerysql/bigquerysql.go @@ -54,6 +54,7 @@ type compatibleSource interface { BigQuerySession() bigqueryds.BigQuerySessionProvider UseClientAuthorization() bool GetAuthTokenHeaderName() string + GetMaximumBytesBilled() int64 RetrieveClientAndService(tools.AccessToken) (*bigqueryapi.Client, *bigqueryrestapi.Service, error) RunSQL(context.Context, *bigqueryapi.Client, string, string, []bigqueryapi.QueryParameter, []*bigqueryapi.ConnectionProperty) (any, error) } @@ -225,7 +226,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para return nil, util.NewClientServerError("failed to retrieve BigQuery client", http.StatusInternalServerError, err) } - dryRunJob, err := bqutil.DryRunQuery(ctx, restService, bqClient.Project(), bqClient.Location, newStatement, lowLevelParams, connProps) + dryRunJob, err := bqutil.DryRunQuery(ctx, restService, bqClient.Project(), bqClient.Location, newStatement, lowLevelParams, connProps, source.GetMaximumBytesBilled()) if err != nil { return nil, util.ProcessGcpError(err) }