Skip to content

Commit 6752bb3

Browse files
committed
Change to use a local subscription
1 parent 7a4b798 commit 6752bb3

File tree

2 files changed

+3
-18
lines changed

2 files changed

+3
-18
lines changed

cmd/etl_worker/app-ndt.yaml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@ env_variables:
4242
BIGQUERY_DATASET: 'mlab_sandbox'
4343
ANNOTATE_IP: 'true'
4444
# Location of subscription, and service account key.
45-
SUBSCRIPTION_PROJECT: 'mlab-oti'
45+
SUBSCRIPTION_PROJECT: 'mlab-sandbox'
4646
SUBSCRIPTION_NAME: 'sandbox-pipeline'
47-
SUBSCRIPTION_KEY_BUCKET: 'gs://keys-mlab-sandbox/'
48-
SUBSCRIPTION_KEY_FILE: 'pubsub-reader.mlab-oti.json'
4947
# TODO add custom service-account, instead of using default credentials.

cmd/etl_worker/etl_worker.go

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,11 @@ import (
88
"log"
99
"net/http"
1010
"os"
11-
"os/exec"
1211
"runtime"
1312
"strconv"
1413
"sync/atomic"
1514
"time"
1615

17-
"google.golang.org/api/option"
18-
1916
"cloud.google.com/go/pubsub"
2017
"golang.org/x/net/context"
2118

@@ -258,22 +255,12 @@ func setMaxInFlight() {
258255
}
259256

260257
func runPubSubHandler() error {
261-
keybucket := os.Getenv("SUBSCRIPTION_KEY_BUCKET")
262-
keyfile := os.Getenv("SUBSCRIPTION_KEY_FILE")
263-
cmd := exec.Command("gsutil", "cp", keybucket+keyfile, "/tmp/"+keyfile)
264-
stdoutStderr, err := cmd.CombinedOutput()
265-
if err != nil {
266-
log.Println(stdoutStderr)
267-
return err
268-
}
269-
270258
ctx := context.Background()
271-
opt := option.WithServiceAccountFile("/tmp/" + keyfile)
272259
// Must use the project where the subscription resides, or else
273260
// we can't find it.
274261
proj := os.Getenv("SUBSCRIPTION_PROJECT")
275262
subscription := os.Getenv("SUBSCRIPTION_NAME")
276-
client, err := pubsub.NewClient(ctx, proj, opt)
263+
client, err := pubsub.NewClient(ctx, proj)
277264

278265
if err != nil {
279266
return err
@@ -318,7 +305,7 @@ func runPubSubHandler() error {
318305
log.Println(outcome)
319306
// TODO(gfr) Remove once we it looks ok.
320307
msg.Nack()
321-
time.Sleep(30*time.Second)
308+
time.Sleep(30 * time.Second)
322309
} else {
323310
msg.Ack()
324311
}

0 commit comments

Comments
 (0)