Skip to content

Commit ef79b75

Browse files
authored
Merge pull request #283 from m-lab/pubsub1
refactor worker for reuse with pubsub
2 parents: a104026 + 697ea2e; commit ef79b75

File tree

1 file changed

+35
-42
lines changed

1 file changed

+35
-42
lines changed

cmd/etl_worker/etl_worker.go

Lines changed: 35 additions & 42 deletions
Original file line number | Diff line number | Diff line change
@@ -87,36 +87,29 @@ func decrementInFlight() {
8787
atomic.AddInt32(&inFlight, -1)
8888
}
8989

90-
func worker(w http.ResponseWriter, r *http.Request) {
91-
// TODO(dev) Check how many times a request has already been attempted.
92-
93-
// These keep track of the (nested) state of the worker.
94-
metrics.WorkerState.WithLabelValues("worker").Inc()
95-
defer metrics.WorkerState.WithLabelValues("worker").Dec()
96-
90+
// TODO(gfr) unify counting for http and pubsub paths?
91+
func worker(rwr http.ResponseWriter, rq *http.Request) {
9792
// Throttle by grabbing a semaphore from channel.
9893
if shouldThrottle() {
9994
metrics.TaskCount.WithLabelValues("unknown", "TooManyRequests").Inc()
100-
w.WriteHeader(http.StatusTooManyRequests)
101-
fmt.Fprintf(w, `{"message": "Too many tasks."}`)
95+
rwr.WriteHeader(http.StatusTooManyRequests)
96+
fmt.Fprintf(rwr, `{"message": "Too many tasks."}`)
10297
return
10398
}
99+
104100
// Decrement counter when worker finishes.
105101
defer decrementInFlight()
106102

107-
metrics.WorkerCount.Inc()
108-
defer metrics.WorkerCount.Dec()
109-
110103
var err error
111-
retryCountStr := r.Header.Get("X-AppEngine-TaskRetryCount")
104+
retryCountStr := rq.Header.Get("X-AppEngine-TaskRetryCount")
112105
retryCount := 0
113106
if retryCountStr != "" {
114107
retryCount, err = strconv.Atoi(retryCountStr)
115108
if err != nil {
116109
log.Printf("Invalid retries string: %s\n", retryCountStr)
117110
}
118111
}
119-
executionCountStr := r.Header.Get("X-AppEngine-TaskExecutionCount")
112+
executionCountStr := rq.Header.Get("X-AppEngine-TaskExecutionCount")
120113
executionCount := 0
121114
if executionCountStr != "" {
122115
executionCount, err = strconv.Atoi(executionCountStr)
@@ -125,20 +118,35 @@ func worker(w http.ResponseWriter, r *http.Request) {
125118
}
126119
}
127120

128-
r.ParseForm()
121+
rq.ParseForm()
129122
// Log request data.
130-
for key, value := range r.Form {
123+
for key, value := range rq.Form {
131124
log.Printf("Form: %q == %q\n", key, value)
132125
}
133126

127+
rawFileName := rq.FormValue("filename")
128+
status, msg := subworker(rawFileName, executionCount, retryCount)
129+
rwr.WriteHeader(status)
130+
fmt.Fprintf(rwr, msg)
131+
}
132+
133+
func subworker(rawFileName string, executionCount, retryCount int) (status int, msg string) {
134+
// TODO(dev) Check how many times a request has already been attempted.
135+
136+
// These keep track of the (nested) state of the worker.
137+
metrics.WorkerState.WithLabelValues("worker").Inc()
138+
defer metrics.WorkerState.WithLabelValues("worker").Dec()
139+
140+
metrics.WorkerCount.Inc()
141+
defer metrics.WorkerCount.Dec()
142+
143+
var err error
134144
// This handles base64 encoding, and requires a gs:// prefix.
135-
fn, err := storage.GetFilename(r.FormValue("filename"))
145+
fn, err := storage.GetFilename(rawFileName)
136146
if err != nil {
137147
metrics.TaskCount.WithLabelValues("unknown", "BadRequest").Inc()
138148
log.Printf("Invalid filename: %s\n", fn)
139-
w.WriteHeader(http.StatusBadRequest)
140-
fmt.Fprintf(w, `{"message": "Invalid filename."}`)
141-
return
149+
return http.StatusBadRequest, `{"message": "Invalid filename."}`
142150
}
143151

144152
// TODO(dev): log the originating task queue name from headers.
@@ -148,38 +156,30 @@ func worker(w http.ResponseWriter, r *http.Request) {
148156
data, err := etl.ValidateTestPath(fn)
149157
if err != nil {
150158
log.Printf("Invalid filename: %v\n", err)
151-
w.WriteHeader(http.StatusBadRequest)
152-
fmt.Fprintf(w, `{"message": "Invalid filename."}`)
153-
return
159+
return http.StatusBadRequest, `{"message": "Invalid filename."}`
154160
}
155161
dataType := data.GetDataType()
156162

157163
// Move this into Validate function
158164
if dataType == etl.INVALID {
159165
metrics.TaskCount.WithLabelValues("unknown", "BadRequest").Inc()
160166
log.Printf("Invalid filename: %s\n", fn)
161-
w.WriteHeader(http.StatusBadRequest)
162-
fmt.Fprintf(w, `{"message": "Invalid filename."}`)
163-
return
167+
return http.StatusBadRequest, `{"message": "Invalid filename."}`
164168
}
165169

166170
client, err := storage.GetStorageClient(false)
167171
if err != nil {
168172
metrics.TaskCount.WithLabelValues("unknown", "ServiceUnavailable").Inc()
169173
log.Printf("Error getting storage client: %v\n", err)
170-
w.WriteHeader(http.StatusServiceUnavailable)
171-
fmt.Fprintf(w, `{"message": "Could not create client."}`)
172-
return
174+
return http.StatusServiceUnavailable, `{"message": "Could not create client."}`
173175
}
174176

175177
// TODO - add a timer for reading the file.
176178
tr, err := storage.NewETLSource(client, fn)
177179
if err != nil {
178180
metrics.TaskCount.WithLabelValues(string(dataType), "ETLSourceError").Inc()
179181
log.Printf("Error opening gcs file: %v", err)
180-
w.WriteHeader(http.StatusInternalServerError)
181-
fmt.Fprintf(w, `{"message": "Problem opening gcs file."}`)
182-
return
182+
return http.StatusInternalServerError, `{"message": "Problem opening gcs file."}`
183183
// TODO - anything better we could do here?
184184
}
185185
defer tr.Close()
@@ -196,9 +196,7 @@ func worker(w http.ResponseWriter, r *http.Request) {
196196
if err != nil {
197197
metrics.TaskCount.WithLabelValues(string(dataType), "NewInserterError").Inc()
198198
log.Printf("Error creating BQ Inserter: %v", err)
199-
w.WriteHeader(http.StatusInternalServerError)
200-
fmt.Fprintf(w, `{"message": "Problem creating BQ inserter."}`)
201-
return
199+
return http.StatusInternalServerError, `{"message": "Problem creating BQ inserter."}`
202200
// TODO - anything better we could do here?
203201
}
204202

@@ -222,17 +220,12 @@ func worker(w http.ResponseWriter, r *http.Request) {
222220
if err != nil {
223221
metrics.TaskCount.WithLabelValues(string(dataType), "TaskError").Inc()
224222
log.Printf("Error Processing Tests: %v", err)
225-
w.WriteHeader(http.StatusInternalServerError)
226-
fmt.Fprintf(w, `{"message": "Error in ProcessAllTests"}`)
227-
return
223+
return http.StatusInternalServerError, `{"message": "Error in ProcessAllTests"}`
228224
// TODO - anything better we could do here?
229225
}
230226

231-
// TODO - if there are any errors, consider sending back a meaningful response
232-
// for web browser and queue-pusher debugging.
233-
fmt.Fprintf(w, `{"message": "Success"}`)
234-
235227
metrics.TaskCount.WithLabelValues(string(dataType), "OK").Inc()
228+
return http.StatusOK, `{"message": "Success"}`
236229
}
237230

238231
func healthCheckHandler(w http.ResponseWriter, r *http.Request) {

0 commit comments

Comments
 (0)