Skip to content

Commit 04d8357

Browse files
committed
feat(bigquery): add maximumBytesBilled source config
1 parent 9598a6a commit 04d8357

File tree

8 files changed

+107
-5
lines changed

8 files changed

+107
-5
lines changed

docs/en/resources/sources/bigquery.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ project: "my-project-id"
140140
# - "https://www.googleapis.com/auth/bigquery"
141141
# - "https://www.googleapis.com/auth/drive.readonly"
142142
# maxQueryResultRows: 50 # Optional: Limits the number of rows returned by queries. Defaults to 50.
143+
# maximumBytesBilled: 10737418240 # Optional: Per-query bytes scanned cap (in bytes).
143144
```
144145

145146
Initialize a BigQuery source that uses the client's access token:
@@ -160,6 +161,7 @@ useClientOAuth: true
160161
# - "https://www.googleapis.com/auth/bigquery"
161162
# - "https://www.googleapis.com/auth/drive.readonly"
162163
# maxQueryResultRows: 50 # Optional: Limits the number of rows returned by queries. Defaults to 50.
164+
# maximumBytesBilled: 10737418240 # Optional: Per-query bytes scanned cap (in bytes).
163165
```
164166

165167
## Reference
@@ -175,3 +177,4 @@ useClientOAuth: true
175177
| scopes | []string | false | A list of OAuth 2.0 scopes to use for the credentials. If not provided, default scopes are used. |
176178
| impersonateServiceAccount | string | false | Service account email to impersonate when making BigQuery and Dataplex API calls. The authenticated principal must have the `roles/iam.serviceAccountTokenCreator` role on the target service account. [Learn More](https://cloud.google.com/iam/docs/service-account-impersonation) |
177179
| maxQueryResultRows | int | false | The maximum number of rows to return from a query. Defaults to 50. |
180+
| maximumBytesBilled | int64 | false | The maximum bytes billed per query. When set, queries that exceed this limit fail before executing. |

internal/sources/bigquery/bigquery.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ type Config struct {
9090
ImpersonateServiceAccount string `yaml:"impersonateServiceAccount"`
9191
Scopes StringOrStringSlice `yaml:"scopes"`
9292
MaxQueryResultRows int `yaml:"maxQueryResultRows"`
93+
MaximumBytesBilled int64 `yaml:"maximumBytesBilled" validate:"gte=0"`
9394
}
9495

9596
// StringOrStringSlice is a custom type that can unmarshal both a single string
@@ -157,6 +158,7 @@ func (r Config) Initialize(ctx context.Context, tracer trace.Tracer) (sources.So
157158
RestService: restService,
158159
TokenSource: tokenSource,
159160
MaxQueryResultRows: r.MaxQueryResultRows,
161+
MaximumBytesBilled: r.MaximumBytesBilled,
160162
ClientCreator: clientCreator,
161163
AuthTokenHeaderName: "Authorization",
162164
}
@@ -291,6 +293,7 @@ type Source struct {
291293
TokenSource oauth2.TokenSource
292294
AuthTokenHeaderName string
293295
MaxQueryResultRows int
296+
MaximumBytesBilled int64
294297
ClientCreator BigqueryClientCreator
295298
AllowedDatasets map[string]struct{}
296299
sessionMutex sync.Mutex
@@ -472,6 +475,10 @@ func (s *Source) GetMaxQueryResultRows() int {
472475
return s.MaxQueryResultRows
473476
}
474477

478+
func (s *Source) GetMaximumBytesBilled() int64 {
479+
return s.MaximumBytesBilled
480+
}
481+
475482
func (s *Source) BigQueryClientCreator() BigqueryClientCreator {
476483
return s.ClientCreator
477484
}
@@ -572,6 +579,9 @@ func (s *Source) RunSQL(ctx context.Context, bqClient *bigqueryapi.Client, state
572579
if connProps != nil {
573580
query.ConnectionProperties = connProps
574581
}
582+
if s.MaximumBytesBilled > 0 {
583+
query.MaxBytesBilled = s.MaximumBytesBilled
584+
}
575585

576586
// This block handles SELECT statements, which return a row set.
577587
// We iterate through the results, convert each row into a map of

internal/sources/bigquery/bigquery_test.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,26 @@ func TestParseFromYamlBigQuery(t *testing.T) {
237237
},
238238
},
239239
},
240+
{
241+
desc: "with maximum bytes billed example",
242+
in: `
243+
kind: sources
244+
name: my-instance
245+
type: bigquery
246+
project: my-project
247+
location: us
248+
maximumBytesBilled: 10737418240
249+
`,
250+
want: map[string]sources.SourceConfig{
251+
"my-instance": bigquery.Config{
252+
Name: "my-instance",
253+
Type: bigquery.SourceType,
254+
Project: "my-project",
255+
Location: "us",
256+
MaximumBytesBilled: 10737418240,
257+
},
258+
},
259+
},
240260
}
241261
for _, tc := range tcs {
242262
t.Run(tc.desc, func(t *testing.T) {
@@ -279,6 +299,17 @@ func TestFailParseFromYaml(t *testing.T) {
279299
`,
280300
err: "error unmarshaling sources: unable to parse source \"my-instance\" as \"bigquery\": Key: 'Config.Project' Error:Field validation for 'Project' failed on the 'required' tag",
281301
},
302+
{
303+
desc: "negative maximum bytes billed",
304+
in: `
305+
kind: sources
306+
name: my-instance
307+
type: bigquery
308+
project: my-project
309+
maximumBytesBilled: -1
310+
`,
311+
err: "error unmarshaling sources: unable to parse source \"my-instance\" as \"bigquery\": [1:21] Key: 'Config.MaximumBytesBilled' Error:Field validation for 'MaximumBytesBilled' failed on the 'gte' tag\n> 1 | maximumBytesBilled: -1\n ^\n 2 | name: my-instance\n 3 | project: my-project\n 4 | type: bigquery",
312+
},
282313
}
283314
for _, tc := range tcs {
284315
t.Run(tc.desc, func(t *testing.T) {
@@ -347,6 +378,59 @@ func TestInitialize_MaxQueryResultRows(t *testing.T) {
347378
}
348379
}
349380

381+
func TestInitialize_MaximumBytesBilled(t *testing.T) {
382+
ctx, err := testutils.ContextWithNewLogger()
383+
if err != nil {
384+
t.Fatalf("unexpected error: %s", err)
385+
}
386+
ctx = util.WithUserAgent(ctx, "test-agent")
387+
tracer := noop.NewTracerProvider().Tracer("")
388+
389+
tcs := []struct {
390+
desc string
391+
cfg bigquery.Config
392+
want int64
393+
}{
394+
{
395+
desc: "default value",
396+
cfg: bigquery.Config{
397+
Name: "test-default",
398+
Type: bigquery.SourceType,
399+
Project: "test-project",
400+
UseClientOAuth: "true",
401+
},
402+
want: 0,
403+
},
404+
{
405+
desc: "configured value",
406+
cfg: bigquery.Config{
407+
Name: "test-configured",
408+
Type: bigquery.SourceType,
409+
Project: "test-project",
410+
UseClientOAuth: "true",
411+
MaximumBytesBilled: 10737418240,
412+
},
413+
want: 10737418240,
414+
},
415+
}
416+
417+
for _, tc := range tcs {
418+
t.Run(tc.desc, func(t *testing.T) {
419+
src, err := tc.cfg.Initialize(ctx, tracer)
420+
if err != nil {
421+
t.Fatalf("Initialize failed: %v", err)
422+
}
423+
bqSrc, ok := src.(*bigquery.Source)
424+
if !ok {
425+
t.Fatalf("Expected *bigquery.Source, got %T", src)
426+
}
427+
if bqSrc.MaximumBytesBilled != tc.want {
428+
t.Errorf("MaximumBytesBilled = %d, want %d", bqSrc.MaximumBytesBilled, tc.want)
429+
}
430+
})
431+
}
432+
}
433+
350434
func TestNormalizeValue(t *testing.T) {
351435
tests := []struct {
352436
name string

internal/tools/bigquery/bigqueryanalyzecontribution/bigqueryanalyzecontribution.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ type compatibleSource interface {
5353
BigQueryClient() *bigqueryapi.Client
5454
UseClientAuthorization() bool
5555
GetAuthTokenHeaderName() string
56+
GetMaximumBytesBilled() int64
5657
IsDatasetAllowed(projectID, datasetID string) bool
5758
BigQueryAllowedDatasets() []string
5859
BigQuerySession() bigqueryds.BigQuerySessionProvider
@@ -217,7 +218,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
217218
{Key: "session_id", Value: session.ID},
218219
}
219220
}
220-
dryRunJob, err := bqutil.DryRunQuery(ctx, restService, source.BigQueryClient().Project(), source.BigQueryClient().Location, inputData, nil, connProps)
221+
dryRunJob, err := bqutil.DryRunQuery(ctx, restService, source.BigQueryClient().Project(), source.BigQueryClient().Location, inputData, nil, connProps, source.GetMaximumBytesBilled())
221222
if err != nil {
222223
return nil, util.ProcessGcpError(err)
223224
}

internal/tools/bigquery/bigquerycommon/util.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
)
2727

2828
// DryRunQuery performs a dry run of the SQL query to validate it and get metadata.
29-
func DryRunQuery(ctx context.Context, restService *bigqueryrestapi.Service, projectID string, location string, sql string, params []*bigqueryrestapi.QueryParameter, connProps []*bigqueryapi.ConnectionProperty) (*bigqueryrestapi.Job, error) {
29+
func DryRunQuery(ctx context.Context, restService *bigqueryrestapi.Service, projectID string, location string, sql string, params []*bigqueryrestapi.QueryParameter, connProps []*bigqueryapi.ConnectionProperty, maximumBytesBilled int64) (*bigqueryrestapi.Job, error) {
3030
useLegacySql := false
3131

3232
restConnProps := make([]*bigqueryrestapi.ConnectionProperty, len(connProps))
@@ -46,6 +46,7 @@ func DryRunQuery(ctx context.Context, restService *bigqueryrestapi.Service, proj
4646
UseLegacySql: &useLegacySql,
4747
ConnectionProperties: restConnProps,
4848
QueryParameters: params,
49+
MaximumBytesBilled: maximumBytesBilled,
4950
},
5051
},
5152
}

internal/tools/bigquery/bigqueryexecutesql/bigqueryexecutesql.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ type compatibleSource interface {
5555
BigQueryWriteMode() string
5656
UseClientAuthorization() bool
5757
GetAuthTokenHeaderName() string
58+
GetMaximumBytesBilled() int64
5859
IsDatasetAllowed(projectID, datasetID string) bool
5960
BigQueryAllowedDatasets() []string
6061
RetrieveClientAndService(tools.AccessToken) (*bigqueryapi.Client, *bigqueryrestapi.Service, error)
@@ -187,7 +188,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
187188
}
188189
}
189190

190-
dryRunJob, err := bqutil.DryRunQuery(ctx, restService, bqClient.Project(), bqClient.Location, sql, nil, connProps)
191+
dryRunJob, err := bqutil.DryRunQuery(ctx, restService, bqClient.Project(), bqClient.Location, sql, nil, connProps, source.GetMaximumBytesBilled())
191192
if err != nil {
192193
return nil, util.NewClientServerError("query validation failed", http.StatusInternalServerError, err)
193194
}

internal/tools/bigquery/bigqueryforecast/bigqueryforecast.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ type compatibleSource interface {
5252
BigQueryClient() *bigqueryapi.Client
5353
UseClientAuthorization() bool
5454
GetAuthTokenHeaderName() string
55+
GetMaximumBytesBilled() int64
5556
IsDatasetAllowed(projectID, datasetID string) bool
5657
BigQueryAllowedDatasets() []string
5758
BigQuerySession() bigqueryds.BigQuerySessionProvider
@@ -194,7 +195,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
194195
{Key: "session_id", Value: session.ID},
195196
}
196197
}
197-
dryRunJob, err := bqutil.DryRunQuery(ctx, restService, source.BigQueryClient().Project(), source.BigQueryClient().Location, historyData, nil, connProps)
198+
dryRunJob, err := bqutil.DryRunQuery(ctx, restService, source.BigQueryClient().Project(), source.BigQueryClient().Location, historyData, nil, connProps, source.GetMaximumBytesBilled())
198199
if err != nil {
199200
return nil, util.ProcessGcpError(err)
200201
}

internal/tools/bigquery/bigquerysql/bigquerysql.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ type compatibleSource interface {
5353
BigQuerySession() bigqueryds.BigQuerySessionProvider
5454
UseClientAuthorization() bool
5555
GetAuthTokenHeaderName() string
56+
GetMaximumBytesBilled() int64
5657
RetrieveClientAndService(tools.AccessToken) (*bigqueryapi.Client, *bigqueryrestapi.Service, error)
5758
RunSQL(context.Context, *bigqueryapi.Client, string, string, []bigqueryapi.QueryParameter, []*bigqueryapi.ConnectionProperty) (any, error)
5859
}
@@ -205,7 +206,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
205206
return nil, util.NewClientServerError("failed to retrieve BigQuery client", http.StatusInternalServerError, err)
206207
}
207208

208-
dryRunJob, err := bqutil.DryRunQuery(ctx, restService, bqClient.Project(), bqClient.Location, newStatement, lowLevelParams, connProps)
209+
dryRunJob, err := bqutil.DryRunQuery(ctx, restService, bqClient.Project(), bqClient.Location, newStatement, lowLevelParams, connProps, source.GetMaximumBytesBilled())
209210
if err != nil {
210211
return nil, util.ProcessGcpError(err)
211212
}

0 commit comments

Comments
 (0)