Skip to content

Commit 5b024de

Browse files
committed
Revert to using remote subscription and service account
1 parent eec7232 commit 5b024de

File tree

2 files changed

+18
-2
lines changed

2 files changed

+18
-2
lines changed

cmd/etl_worker/app-ndt.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ env_variables:
4242
BIGQUERY_DATASET: 'mlab_sandbox'
4343
ANNOTATE_IP: 'true'
4444
# Location of subscription, and service account key.
45-
SUBSCRIPTION_PROJECT: 'mlab-sandbox'
45+
SUBSCRIPTION_PROJECT: 'mlab-oti'
4646
SUBSCRIPTION_NAME: 'sandbox-pipeline'
47+
SUBSCRIPTION_KEY_BUCKET: 'gs://keys-mlab-sandbox/'
48+
SUBSCRIPTION_KEY_FILE: 'pubsub-reader.mlab-oti.json'
4749
# TODO add custom service-account, instead of using default credentials.

cmd/etl_worker/etl_worker.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,14 @@ import (
88
"log"
99
"net/http"
1010
"os"
11+
"os/exec"
1112
"runtime"
1213
"strconv"
1314
"sync/atomic"
1415
"time"
1516

17+
"google.golang.org/api/option"
18+
1619
"cloud.google.com/go/pubsub"
1720
"golang.org/x/net/context"
1821

@@ -255,12 +258,22 @@ func setMaxInFlight() {
255258
}
256259

257260
func runPubSubHandler() error {
261+
keybucket := os.Getenv("SUBSCRIPTION_KEY_BUCKET")
262+
keyfile := os.Getenv("SUBSCRIPTION_KEY_FILE")
263+
cmd := exec.Command("gsutil", "cp", keybucket+keyfile, "/tmp/"+keyfile)
264+
stdoutStderr, err := cmd.CombinedOutput()
265+
if err != nil {
266+
log.Println(stdoutStderr)
267+
return err
268+
}
269+
258270
ctx := context.Background()
271+
opt := option.WithServiceAccountFile("/tmp/" + keyfile)
259272
// Must use the project where the subscription resides, or else
260273
// we can't find it.
261274
proj := os.Getenv("SUBSCRIPTION_PROJECT")
262275
subscription := os.Getenv("SUBSCRIPTION_NAME")
263-
client, err := pubsub.NewClient(ctx, proj)
276+
client, err := pubsub.NewClient(ctx, proj, opt)
264277

265278
if err != nil {
266279
return err
@@ -305,6 +318,7 @@ func runPubSubHandler() error {
305318
log.Println(outcome)
306319
// TODO(gfr) Remove once we it looks ok.
307320
msg.Nack()
321+
log.Println("Warning - Nack and sleep 30 seconds")
308322
time.Sleep(30 * time.Second)
309323
} else {
310324
msg.Ack()

0 commit comments

Comments
 (0)