Skip to content

Commit c9cebcc

Browse files
authored
feat(bigquery): support per-job reservation assignment (#12078)
* feat(bigquery): support per-job reservation assignment This PR adds support for explicitly selecting a reservation to use when executing a job. It also includes support for optimized query execution (running via jobs.query vs jobs.insert).
1 parent d544809 commit c9cebcc

File tree

8 files changed

+63
-4
lines changed

8 files changed

+63
-4
lines changed

bigquery/copy.go

+9
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,13 @@ type CopyConfig struct {
7373
// Experimental: this option is experimental and may be modified or removed in future versions,
7474
// regardless of any other documented package stability guarantees.
7575
JobTimeout time.Duration
76+
77+
// The reservation that job would use. User can specify a reservation to
78+
// execute the job. If reservation is not set, reservation is determined
79+
// based on the rules defined by the reservation assignments. The expected
80+
// format is
81+
// `projects/{project}/locations/{location}/reservations/{reservation}`.
82+
Reservation string
7683
}
7784

7885
func (c *CopyConfig) toBQ() *bq.JobConfiguration {
@@ -91,6 +98,7 @@ func (c *CopyConfig) toBQ() *bq.JobConfiguration {
9198
OperationType: string(c.OperationType),
9299
},
93100
JobTimeoutMs: c.JobTimeout.Milliseconds(),
101+
Reservation: c.Reservation,
94102
}
95103
}
96104

@@ -103,6 +111,7 @@ func bqToCopyConfig(q *bq.JobConfiguration, c *Client) *CopyConfig {
103111
DestinationEncryptionConfig: bqToEncryptionConfig(q.Copy.DestinationEncryptionConfiguration),
104112
OperationType: TableCopyOperationType(q.Copy.OperationType),
105113
JobTimeout: time.Duration(q.JobTimeoutMs) * time.Millisecond,
114+
Reservation: q.Reservation,
106115
}
107116
for _, t := range q.Copy.SourceTables {
108117
cc.Srcs = append(cc.Srcs, bqToTable(t, c))

bigquery/copy_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -88,13 +88,15 @@ func TestCopy(t *testing.T) {
8888
WriteDisposition: WriteTruncate,
8989
DestinationEncryptionConfig: &EncryptionConfig{KMSKeyName: "keyName"},
9090
Labels: map[string]string{"a": "b"},
91+
Reservation: "reservation/1",
9192
},
9293
want: func() *bq.Job {
9394
j := defaultCopyJob()
9495
j.Configuration.Labels = map[string]string{"a": "b"}
9596
j.Configuration.Copy.CreateDisposition = "CREATE_NEVER"
9697
j.Configuration.Copy.WriteDisposition = "WRITE_TRUNCATE"
9798
j.Configuration.Copy.DestinationEncryptionConfiguration = &bq.EncryptionConfiguration{KmsKeyName: "keyName"}
99+
j.Configuration.Reservation = "reservation/1"
98100
return j
99101
}(),
100102
},

bigquery/extract.go

+9
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,13 @@ type ExtractConfig struct {
5757
// Experimental: this option is experimental and may be modified or removed in future versions,
5858
// regardless of any other documented package stability guarantees.
5959
JobTimeout time.Duration
60+
61+
// The reservation that job would use. User can specify a reservation to
62+
// execute the job. If reservation is not set, reservation is determined
63+
// based on the rules defined by the reservation assignments. The expected
64+
// format is
65+
// `projects/{project}/locations/{location}/reservations/{reservation}`.
66+
Reservation string
6067
}
6168

6269
func (e *ExtractConfig) toBQ() *bq.JobConfiguration {
@@ -77,6 +84,7 @@ func (e *ExtractConfig) toBQ() *bq.JobConfiguration {
7784
UseAvroLogicalTypes: e.UseAvroLogicalTypes,
7885
},
7986
JobTimeoutMs: e.JobTimeout.Milliseconds(),
87+
Reservation: e.Reservation,
8088
}
8189
if e.Src != nil {
8290
cfg.Extract.SourceTable = e.Src.toBQ()
@@ -106,6 +114,7 @@ func bqToExtractConfig(q *bq.JobConfiguration, c *Client) *ExtractConfig {
106114
SrcModel: bqToModel(qe.SourceModel, c),
107115
UseAvroLogicalTypes: qe.UseAvroLogicalTypes,
108116
JobTimeout: time.Duration(q.JobTimeoutMs) * time.Millisecond,
117+
Reservation: q.Reservation,
109118
}
110119
}
111120

bigquery/extract_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,15 @@ func TestExtract(t *testing.T) {
8585
DisableHeader: true,
8686
Labels: map[string]string{"a": "b"},
8787
JobTimeout: 8 * time.Second,
88+
Reservation: "reservation/1",
8889
},
8990
want: func() *bq.Job {
9091
j := defaultExtractJob()
9192
j.Configuration.Labels = map[string]string{"a": "b"}
9293
j.Configuration.JobTimeoutMs = 8000
9394
f := false
9495
j.Configuration.Extract.PrintHeader = &f
96+
j.Configuration.Reservation = "reservation/1"
9597
return j
9698
}(),
9799
},

bigquery/load.go

+9
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,13 @@ type LoadConfig struct {
110110
// For more information, see:
111111
// https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#columnnamecharactermap
112112
ColumnNameCharacterMap ColumnNameCharacterMap
113+
114+
// The reservation that job would use. User can specify a reservation to
115+
// execute the job. If reservation is not set, reservation is determined
116+
// based on the rules defined by the reservation assignments. The expected
117+
// format is
118+
// `projects/{project}/locations/{location}/reservations/{reservation}`.
119+
Reservation string
113120
}
114121

115122
func (l *LoadConfig) toBQ() (*bq.JobConfiguration, io.Reader) {
@@ -140,6 +147,7 @@ func (l *LoadConfig) toBQ() (*bq.JobConfiguration, io.Reader) {
140147
config.Load.ConnectionProperties = append(config.Load.ConnectionProperties, v.toBQ())
141148
}
142149
media := l.Src.populateLoadConfig(config.Load)
150+
config.Reservation = l.Reservation
143151
return config, media
144152
}
145153

@@ -160,6 +168,7 @@ func bqToLoadConfig(q *bq.JobConfiguration, c *Client) *LoadConfig {
160168
ReferenceFileSchemaURI: q.Load.ReferenceFileSchemaUri,
161169
CreateSession: q.Load.CreateSession,
162170
ColumnNameCharacterMap: ColumnNameCharacterMap(q.Load.ColumnNameCharacterMap),
171+
Reservation: q.Reservation,
163172
}
164173
if q.JobTimeoutMs > 0 {
165174
lc.JobTimeout = time.Duration(q.JobTimeoutMs) * time.Millisecond

bigquery/load_test.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,8 @@ func TestLoad(t *testing.T) {
140140
return g
141141
}(),
142142
config: LoadConfig{
143-
JobTimeout: 4 * time.Second,
143+
JobTimeout: 4 * time.Second,
144+
Reservation: "reservation/1",
144145
},
145146
want: func() *bq.Job {
146147
j := defaultLoadJob()
@@ -149,6 +150,7 @@ func TestLoad(t *testing.T) {
149150
j.Configuration.Load.AllowQuotedNewlines = true
150151
j.Configuration.Load.IgnoreUnknownValues = true
151152
j.Configuration.JobTimeoutMs = 4000
153+
j.Configuration.Reservation = "reservation/1"
152154
return j
153155
}(),
154156
},

bigquery/query.go

+13-3
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,13 @@ type QueryConfig struct {
152152

153153
// Force usage of Storage API if client is available. For test scenarios
154154
forceStorageAPI bool
155+
156+
// The reservation that job would use. User can specify a reservation to
157+
// execute the job. If reservation is not set, reservation is determined
158+
// based on the rules defined by the reservation assignments. The expected
159+
// format is
160+
// `projects/{project}/locations/{location}/reservations/{reservation}`.
161+
Reservation string
155162
}
156163

157164
func (qc *QueryConfig) toBQ() (*bq.JobConfiguration, error) {
@@ -224,9 +231,10 @@ func (qc *QueryConfig) toBQ() (*bq.JobConfiguration, error) {
224231
qconf.ConnectionProperties = bqcp
225232
}
226233
jc := &bq.JobConfiguration{
227-
Labels: qc.Labels,
228-
DryRun: qc.DryRun,
229-
Query: qconf,
234+
Labels: qc.Labels,
235+
DryRun: qc.DryRun,
236+
Reservation: qc.Reservation,
237+
Query: qconf,
230238
}
231239
if qc.JobTimeout > 0 {
232240
jc.JobTimeoutMs = qc.JobTimeout.Milliseconds()
@@ -255,6 +263,7 @@ func bqToQueryConfig(q *bq.JobConfiguration, c *Client) (*QueryConfig, error) {
255263
CreateSession: qq.CreateSession,
256264
}
257265
qc.UseStandardSQL = !qc.UseLegacySQL
266+
qc.Reservation = q.Reservation
258267

259268
if len(qq.TableDefinitions) > 0 {
260269
qc.TableDefinitions = make(map[string]ExternalData)
@@ -469,6 +478,7 @@ func (q *Query) probeFastPath() (*bq.QueryRequest, error) {
469478
UseLegacySql: &pfalse,
470479
MaximumBytesBilled: q.QueryConfig.MaxBytesBilled,
471480
RequestId: uid.NewSpace("request", nil).New(),
481+
Reservation: q.Reservation,
472482
Labels: q.Labels,
473483
FormatOptions: &bq.DataFormatOptions{
474484
UseInt64Timestamp: true,

bigquery/query_test.go

+16
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,20 @@ func TestQuery(t *testing.T) {
357357
return j
358358
}(),
359359
},
360+
{
361+
dst: c.Dataset("dataset-id").Table("table-id"),
362+
src: &QueryConfig{
363+
Q: "query string",
364+
Reservation: "reservation/1",
365+
DefaultProjectID: "def-project-id",
366+
DefaultDatasetID: "def-dataset-id",
367+
},
368+
want: func() *bq.Job {
369+
j := defaultQueryJob()
370+
j.Configuration.Reservation = "reservation/1"
371+
return j
372+
}(),
373+
},
360374
}
361375
for i, tc := range testCases {
362376
query := c.Query("")
@@ -459,6 +473,7 @@ func TestProbeFastPath(t *testing.T) {
459473
Labels: map[string]string{
460474
"key": "val",
461475
},
476+
Reservation: "reservation/1",
462477
},
463478
wantReq: &bq.QueryRequest{
464479
Query: "foo",
@@ -479,6 +494,7 @@ func TestProbeFastPath(t *testing.T) {
479494
FormatOptions: &bq.DataFormatOptions{
480495
UseInt64Timestamp: true,
481496
},
497+
Reservation: "reservation/1",
482498
},
483499
},
484500
{

0 commit comments

Comments
 (0)