Skip to content

Commit fa1bca4

Browse files
authored
inject task.Factory into ProcessGKETask (#870)
* inject task.Factory into ProcessGKETask * PR feedback tweaks * unsaved files
1 parent 6214ded commit fa1bca4

File tree

7 files changed

+214
-124
lines changed

7 files changed

+214
-124
lines changed

cmd/etl_worker/etl_worker.go

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,19 @@ import (
1414
"sync/atomic"
1515
"time"
1616

17-
"cloud.google.com/go/storage"
17+
gcs "cloud.google.com/go/storage"
1818
"github.com/prometheus/client_golang/prometheus/promhttp"
1919

20-
"github.com/m-lab/go/bqx"
2120
"github.com/m-lab/go/prometheusx"
2221
"github.com/m-lab/go/rtx"
2322

2423
"github.com/m-lab/etl/active"
2524
"github.com/m-lab/etl/bq"
2625
"github.com/m-lab/etl/etl"
26+
"github.com/m-lab/etl/factory"
2727
"github.com/m-lab/etl/metrics"
28+
"github.com/m-lab/etl/storage"
29+
"github.com/m-lab/etl/task"
2830
"github.com/m-lab/etl/worker"
2931

3032
// Enable profiling. For more background and usage information, see:
@@ -121,7 +123,7 @@ func handleRequest(rwr http.ResponseWriter, rq *http.Request) {
121123

122124
// Throttle by grabbing a semaphore from channel.
123125
if shouldThrottle() {
124-
metrics.TaskCount.WithLabelValues("unknown", "worker", "TooManyRequests").Inc()
126+
metrics.TaskCount.WithLabelValues("unknown", "TooManyRequests").Inc()
125127
rwr.WriteHeader(http.StatusTooManyRequests)
126128
fmt.Fprintf(rwr, `{"message": "Too many tasks."}`)
127129
return
@@ -177,7 +179,7 @@ func subworker(rawFileName string, executionCount, retryCount int, age time.Dura
177179
// This handles base64 encoding, and requires a gs:// prefix.
178180
fn, err := etl.GetFilename(rawFileName)
179181
if err != nil {
180-
metrics.TaskCount.WithLabelValues("unknown", "worker", "BadRequest").Inc()
182+
metrics.TaskCount.WithLabelValues("unknown", "BadRequest").Inc()
181183
log.Printf("Invalid filename: %s\n", fn)
182184
return http.StatusBadRequest, `{"message": "Invalid filename."}`
183185
}
@@ -217,36 +219,28 @@ func setMaxInFlight() {
217219
}
218220

219221
type runnable struct {
220-
storage.ObjectAttrs
222+
tf task.Factory
223+
gcs.ObjectAttrs
221224
}
222225

223226
func (r *runnable) Run() error {
224227
path := fmt.Sprintf("gs://%s/%s", r.Bucket, r.Name)
225-
data, err := etl.ValidateTestPath(path)
228+
dp, err := etl.ValidateTestPath(path)
226229
if err != nil {
227230
log.Printf("Invalid filename: %v\n", err)
228231
return err
229232
}
230233

231-
// TODO This is short term hack to fix the injection bug.
232-
// It will be removed in the third PR that introduces Factories
233-
dataType := data.GetDataType()
234-
pdt := bqx.PDT{Project: dataType.BigqueryProject(), Dataset: dataType.Dataset(), Table: dataType.Table()}
235-
client, err := bq.GetClient(pdt.Project)
236-
if err != nil {
237-
return err
238-
}
239-
up := client.Dataset(pdt.Dataset).Table(pdt.Table).Uploader()
240-
// This avoids problems when a single row of the insert has invalid
241-
// data. We then have to carefully parse the returned error object.
242-
up.SkipInvalidRows = true
243-
244234
start := time.Now()
245235
log.Println("Processing", path)
246-
// TODO pass in storage client, or pass in TestSource.
247-
statusCode, err := worker.ProcessGKETask(path, up, nil) // Use default uploader and annotator
236+
237+
statusCode := http.StatusOK
238+
pErr := worker.ProcessGKETask(dp, r.tf)
239+
if pErr != nil {
240+
statusCode = pErr.Code()
241+
}
248242
metrics.DurationHistogram.WithLabelValues(
249-
data.DataType, http.StatusText(statusCode)).Observe(
243+
dp.DataType, http.StatusText(statusCode)).Observe(
250244
time.Since(start).Seconds())
251245
return err
252246
}
@@ -256,8 +250,17 @@ func (r *runnable) Info() string {
256250
return r.Name
257251
}
258252

259-
func toRunnable(obj *storage.ObjectAttrs) active.Runnable {
260-
return &runnable{*obj}
253+
func toRunnable(obj *gcs.ObjectAttrs) active.Runnable {
254+
c, err := storage.GetStorageClient(false)
255+
if err != nil {
256+
return nil // TODO add an error?
257+
}
258+
taskFactory := worker.StandardTaskFactory{
259+
Annotator: factory.DefaultAnnotatorFactory(),
260+
Sink: bq.NewSinkFactory(),
261+
Source: storage.GCSSourceFactory(c),
262+
}
263+
return &runnable{&taskFactory, *obj}
261264
}
262265

263266
func mustGardenerAPI(ctx context.Context, jobServer string) *active.GardenerAPI {

etl/globals.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ func ValidateTestPath(path string) (DataPath, error) {
129129

130130
dataType := dp.GetDataType()
131131
if dataType == INVALID {
132-
metrics.TaskCount.WithLabelValues(dp.TableBase(), "worker", "BadRequest").Inc()
132+
metrics.TaskCount.WithLabelValues(string(dataType), "BadRequest").Inc()
133133
log.Printf("Invalid filename: %s\n", path)
134134
return DataPath{}, ErrBadDataType
135135
}

metrics/metrics.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ var (
165165
Name: "etl_files_processed",
166166
Help: "Number of files processed.",
167167
},
168+
// TODO maybe change to host and exp/type? Maybe drop day_of_week?
168169
[]string{"rsync_host_module", "day_of_week"},
169170
)
170171

@@ -179,8 +180,8 @@ var (
179180
Name: "etl_task_count",
180181
Help: "Number of tasks/archive files processed.",
181182
},
182-
// Go package or filename, and Status
183-
[]string{"table", "package", "status"},
183+
// table/datatype, and Status
184+
[]string{"table", "status"},
184185
)
185186

186187
// TestCount counts the number of tests successfully processed by the parsers.

metrics/metrics_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func TestMetrics(t *testing.T) {
9999
metrics.PTPollutedCount.WithLabelValues("x")
100100
metrics.PTTestCount.WithLabelValues("x")
101101
metrics.RowSizeHistogram.WithLabelValues("x")
102-
metrics.TaskCount.WithLabelValues("x", "x", "x")
102+
metrics.TaskCount.WithLabelValues("x", "x")
103103
metrics.TestCount.WithLabelValues("x", "x", "x")
104104
metrics.WarningCount.WithLabelValues("x", "x", "x")
105105
metrics.WorkerCount.WithLabelValues("x")

task/task.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,13 @@ func (tt *Task) SetMaxFileSize(max int64) {
6161
// ProcessAllTests loops through all the tests in a tar file, calls the
6262
// injected parser to parse them, and inserts them into bigquery. Returns the
6363
// number of files processed.
64+
// TODO pass in the datatype label.
6465
func (tt *Task) ProcessAllTests() (int, error) {
6566
if tt.Parser == nil {
6667
panic("Parser is nil")
6768
}
68-
metrics.WorkerState.WithLabelValues(tt.TableName(), "task").Inc()
69-
defer metrics.WorkerState.WithLabelValues(tt.TableName(), "task").Dec()
69+
metrics.WorkerState.WithLabelValues(tt.Type(), "task").Inc()
70+
defer metrics.WorkerState.WithLabelValues(tt.Type(), "task").Dec()
7071
files := 0
7172
nilData := 0
7273
var testname string
@@ -86,7 +87,7 @@ OUTER:
8687
tt.meta["filename"], testname, files,
8788
time.Since(tt.meta["parse_time"].(time.Time)), err)
8889
metrics.TestCount.WithLabelValues(
89-
tt.Parser.TableName(), "unknown", "oversize file").Inc()
90+
tt.Type(), "unknown", "oversize file").Inc()
9091
continue OUTER
9192
default:
9293
// We are seeing several of these per hour, a little more than
@@ -104,7 +105,7 @@ OUTER:
104105
time.Since(tt.meta["parse_time"].(time.Time)), err)
105106

106107
metrics.TestCount.WithLabelValues(
107-
tt.Parser.TableName(), "unknown", "unrecovered").Inc()
108+
tt.Type(), "unknown", "unrecovered").Inc()
108109
// Since we don't understand these errors, safest thing to do is
109110
// stop processing the tar file (and task).
110111
break OUTER
@@ -120,18 +121,18 @@ OUTER:
120121
kind, parsable := tt.Parser.IsParsable(testname, data)
121122
if !parsable {
122123
metrics.FileSizeHistogram.WithLabelValues(
123-
tt.Parser.TableName(), kind, "ignored").Observe(float64(len(data)))
124+
tt.Type(), kind, "ignored").Observe(float64(len(data)))
124125
// Don't bother calling ParseAndInsert since this is unparsable.
125126
continue
126127
} else {
127128
metrics.FileSizeHistogram.WithLabelValues(
128-
tt.Parser.TableName(), kind, "parsed").Observe(float64(len(data)))
129+
tt.Type(), kind, "parsed").Observe(float64(len(data)))
129130
}
130131
err = tt.Parser.ParseAndInsert(tt.meta, testname, data)
131132
// Shouldn't have any of these, as they should be handled in ParseAndInsert.
132133
if err != nil {
133134
metrics.TaskCount.WithLabelValues(
134-
tt.TableName(), "Task", "ParseAndInsertError").Inc()
135+
tt.Type(), "ParseAndInsertError").Inc()
135136
log.Printf("%v", err)
136137
// TODO(dev) Handle this error properly!
137138
continue

0 commit comments

Comments
 (0)