Skip to content

Commit cab5beb

Browse files
authored
Merge pull request #513 from m-lab/sandbox-dataset
choose dataset based on datatype and batch
2 parents a11dbee + 88668da commit cab5beb

File tree

4 files changed

+46
-11
lines changed

4 files changed

+46
-11
lines changed

bq/insert.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ func NewInserter(dataset string, dt etl.DataType, partition time.Time) (etl.Inse
6060
suffix = "$" + partition.Format("20060102")
6161
}
6262

63+
// TODO - remove dataset parameter.
64+
dataset = etl.DataTypeToDataset(dt)
65+
6366
return NewBQInserter(
6467
etl.InserterParams{Dataset: dataset, Table: table, Suffix: suffix,
6568
Timeout: 15 * time.Minute, BufferSize: dt.BQBufferSize(), RetryDelay: 30 * time.Second},

cmd/etl_worker/etl_worker.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -209,11 +209,7 @@ func subworker(rawFileName string, executionCount, retryCount int) (status int,
209209
dateFormat := "20060102"
210210
date, err := time.Parse(dateFormat, data.PackedDate)
211211

212-
dataset, ok := os.LookupEnv("BIGQUERY_DATASET")
213-
if !ok {
214-
// TODO - make this fatal.
215-
dataset = "mlab_sandbox"
216-
}
212+
dataset := etl.DataTypeToDataset(dataType)
217213
ins, err := bq.NewInserter(dataset, dataType, date)
218214
if err != nil {
219215
metrics.TaskCount.WithLabelValues(data.TableBase(), string(dataType), "NewInserterError").Inc()

etl/globals.go

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,17 @@ import (
1313
"github.com/m-lab/etl/metrics"
1414
)
1515

16+
// IsBatch indicates this process is a batch processing service.
17+
var IsBatch bool
18+
19+
// OmitDeltas indicates we should NOT process all snapshots.
20+
var OmitDeltas bool
21+
22+
func init() {
23+
IsBatch, _ = strconv.ParseBool(os.Getenv("NDT_BATCH"))
24+
OmitDeltas, _ = strconv.ParseBool(os.Getenv("NDT_OMIT_DELTAS"))
25+
}
26+
1627
// YYYYMMDD is a regexp string for identifying dense dates.
1728
const YYYYMMDD = `\d{4}[01]\d[0123]\d`
1829

@@ -109,8 +120,7 @@ func (fn *DataPath) TableBase() string {
109120
// IsBatchService return true if this is a NDT batch service.
110121
// TODO - update this to BATCH_SERVICE, so it makes sense for other pipelines.
111122
func IsBatchService() bool {
112-
isBatch, _ := strconv.ParseBool(os.Getenv("NDT_BATCH"))
113-
return isBatch
123+
return IsBatch
114124
}
115125

116126
// GetMetroName extracts metro name like "acc" from file name like
@@ -167,8 +177,7 @@ type DataType string
167177
func (dt DataType) BQBufferSize() int {
168178
// Special case for NDT when omitting deltas.
169179
if dt == NDT {
170-
omitDeltas, _ := strconv.ParseBool(os.Getenv("NDT_OMIT_DELTAS"))
171-
if omitDeltas {
180+
if OmitDeltas {
172181
return dataTypeToBQBufferSize[NDT_OMIT_DELTAS]
173182
}
174183
}
@@ -218,8 +227,21 @@ var (
218227
// queue_pusher.go
219228
)
220229

221-
// AddPanicMetric captures panics, increments the
222-
// panic metric, and then repanics.
230+
// DataTypeToDataset returns the appropriate dataset to use.
231+
// This is a bit of a hack, but works for our current needs.
232+
func DataTypeToDataset(dt DataType) string {
233+
if dt == SS {
234+
return "private"
235+
}
236+
237+
if IsBatchService() {
238+
return "batch"
239+
}
240+
241+
return "base_tables"
242+
}
243+
244+
// CountPanics updates the PanicCount metric, then repanics.
223245
// It must be wrapped in a defer.
224246
// Examples:
225247
// For function that returns an error:

etl/globals_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,3 +198,17 @@ func TestCountPanics(t *testing.T) {
198198

199199
rePanic()
200200
}
201+
202+
func TestSidestreamDataset(t *testing.T) {
203+
if etl.DataTypeToDataset(etl.SS) != "private" {
204+
t.Error("For SS, should be private:", etl.DataTypeToDataset(etl.SS))
205+
}
206+
etl.IsBatch = true
207+
if etl.DataTypeToDataset(etl.NDT) != "batch" {
208+
t.Error("For IsBatchService, should be batch:", etl.DataTypeToDataset(etl.NDT))
209+
}
210+
etl.IsBatch = false
211+
if etl.DataTypeToDataset(etl.NDT) != "base_tables" {
212+
t.Error("For IsBatchService, should be batch:", etl.DataTypeToDataset(etl.NDT))
213+
}
214+
}

0 commit comments

Comments
 (0)