Skip to content

Commit 839cb87

Browse files
authored
Migrate to go/cloud/gcs (#936)
* use gcsfake
* migrate to go/cloud/gcs
1 parent 1e2ae58 commit 839cb87

File tree

5 files changed

+40
-299
lines changed

5 files changed

+40
-299
lines changed

active/active.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,11 @@ import (
1717
"sync"
1818
"time"
1919

20-
"github.com/googleapis/google-cloud-go-testing/storage/stiface"
2120
"google.golang.org/api/iterator"
2221

2322
"cloud.google.com/go/storage"
24-
"github.com/m-lab/etl/cloud/gcs"
2523
"github.com/m-lab/etl/metrics"
24+
"github.com/m-lab/go/cloud/gcs"
2625
"github.com/m-lab/go/logx"
2726
)
2827

@@ -46,9 +45,9 @@ type FileLister func(ctx context.Context) ([]*storage.ObjectAttrs, int64, error)
4645
// FileListerFunc creates a function that returns a slice of *storage.ObjectAttrs.
4746
// On certain GCS errors, it may return a partial result together with an error.
4847
// TODO - consider moving this to GardenerAPI.
49-
func FileListerFunc(sc stiface.Client, prefix string, filter *regexp.Regexp) FileLister {
48+
func FileListerFunc(bh *gcs.BucketHandle, prefix string, filter *regexp.Regexp) FileLister {
5049
return func(ctx context.Context) ([]*storage.ObjectAttrs, int64, error) {
51-
return gcs.GetFilesSince(ctx, sc, prefix, filter, time.Time{})
50+
return bh.GetFilesSince(ctx, prefix, filter, time.Time{})
5251
}
5352
}
5453

active/active_test.go

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,17 @@ import (
1010
"testing"
1111
"time"
1212

13+
"cloud.google.com/go/storage"
1314
"github.com/googleapis/google-cloud-go-testing/storage/stiface"
14-
"github.com/m-lab/go/logx"
1515
"golang.org/x/sync/errgroup"
1616
"google.golang.org/api/iterator"
1717

18-
"cloud.google.com/go/storage"
1918
"github.com/m-lab/etl/active"
20-
"github.com/m-lab/go/cloudtest"
19+
"github.com/m-lab/go/cloud/gcs"
20+
"github.com/m-lab/go/logx"
21+
"github.com/m-lab/go/rtx"
22+
23+
"github.com/m-lab/go/cloudtest/gcsfake"
2124
)
2225

2326
func init() {
@@ -77,23 +80,25 @@ func newCounter(t *testing.T) *counter {
7780
}
7881

7982
func testClient() stiface.Client {
80-
client := cloudtest.GCSClient{}
83+
client := gcsfake.GCSClient{}
8184
client.AddTestBucket("foobar",
82-
cloudtest.BucketHandle{
85+
gcsfake.BucketHandle{
8386
ObjAttrs: []*storage.ObjectAttrs{
84-
&storage.ObjectAttrs{Bucket: "foobar", Name: "ndt/ndt5/2019/01/01/obj1", Updated: time.Now()},
85-
&storage.ObjectAttrs{Bucket: "foobar", Name: "ndt/ndt5/2019/01/01/obj2", Updated: time.Now()},
86-
&storage.ObjectAttrs{Bucket: "foobar", Name: "ndt/ndt5/2019/01/01/obj3", Updated: time.Date(2000, 01, 01, 02, 03, 04, 0, time.UTC)},
87-
&storage.ObjectAttrs{Bucket: "foobar", Name: "ndt/ndt5/2019/01/01/subdir/obj4", Updated: time.Now()},
88-
&storage.ObjectAttrs{Bucket: "foobar", Name: "ndt/ndt5/2019/01/01/subdir/obj5", Updated: time.Now()},
89-
&storage.ObjectAttrs{Bucket: "foobar", Name: "ndt/tcpinfo/2019/01/01/obj3", Updated: time.Date(2000, 01, 01, 02, 03, 04, 0, time.UTC)},
90-
&storage.ObjectAttrs{Bucket: "foobar", Name: "obj6", Updated: time.Now()},
87+
{Bucket: "foobar", Name: "ndt/ndt5/2019/01/01/obj1", Updated: time.Now()},
88+
{Bucket: "foobar", Name: "ndt/ndt5/2019/01/01/obj2", Updated: time.Now()},
89+
{Bucket: "foobar", Name: "ndt/ndt5/2019/01/01/obj3", Updated: time.Date(2000, 01, 01, 02, 03, 04, 0, time.UTC)},
90+
{Bucket: "foobar", Name: "ndt/ndt5/2019/01/01/subdir/obj4", Updated: time.Now()},
91+
{Bucket: "foobar", Name: "ndt/ndt5/2019/01/01/subdir/obj5", Updated: time.Now()},
92+
{Bucket: "foobar", Name: "ndt/tcpinfo/2019/01/01/obj3", Updated: time.Date(2000, 01, 01, 02, 03, 04, 0, time.UTC)},
93+
{Bucket: "foobar", Name: "obj6", Updated: time.Now()},
9194
}})
9295
return client
9396
}
9497

9598
func standardLister() active.FileLister {
96-
return active.FileListerFunc(testClient(), "gs://foobar/ndt/ndt5/2019/01/01/", nil)
99+
bh, err := gcs.GetBucket(context.Background(), testClient(), "foobar")
100+
rtx.Must(err, "GetBucket failed")
101+
return active.FileListerFunc(bh, "ndt/ndt5/2019/01/01/", nil)
97102
}
98103

99104
func runAll(ctx context.Context, rSrc active.RunnableSource) (*errgroup.Group, error) {

active/poller.go

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020

2121
job "github.com/m-lab/etl-gardener/client"
2222
"github.com/m-lab/etl-gardener/tracker"
23+
"github.com/m-lab/go/cloud/gcs"
2324
"github.com/m-lab/go/rtx"
2425

2526
"github.com/m-lab/etl/metrics"
@@ -128,21 +129,34 @@ func (g *GardenerAPI) RunAll(ctx context.Context, rSrc RunnableSource, job track
128129
}
129130
}
130131

132+
func failMetric(j tracker.Job, label string) {
133+
JobFailures.WithLabelValues(
134+
j.Experiment+"/"+j.Datatype, j.Date.Format("2006"), label).Inc()
135+
}
136+
131137
// JobFileSource creates a GCSSource for the job.
132138
func (g *GardenerAPI) JobFileSource(ctx context.Context, job tracker.Job,
133139
toRunnable func(*storage.ObjectAttrs) Runnable) (*GCSSource, error) {
134140

135141
filter, err := regexp.Compile(job.Filter)
136142
if err != nil {
137-
JobFailures.WithLabelValues(
138-
job.Experiment+"/"+job.Datatype, job.Date.Format("2006"), "filter compile").Inc()
143+
failMetric(job, "filter compile")
144+
return nil, err
145+
}
146+
bh, err := gcs.GetBucket(ctx, g.gcs, job.Bucket)
147+
if err != nil {
148+
failMetric(job, "filesource")
139149
return nil, err
140150
}
141-
lister := FileListerFunc(g.gcs, job.Path(), filter)
151+
prefix, err := job.Prefix()
152+
if err != nil {
153+
failMetric(job, "prefix")
154+
return nil, err
155+
}
156+
lister := FileListerFunc(bh, prefix, filter)
142157
gcsSource, err := NewGCSSource(ctx, job.Path(), lister, toRunnable)
143158
if err != nil {
144-
JobFailures.WithLabelValues(
145-
job.Experiment+"/"+job.Datatype, job.Date.Format("2006"), "filesource").Inc()
159+
failMetric(job, "GCSSource")
146160
return nil, err
147161
}
148162
return gcsSource, nil
@@ -217,5 +231,4 @@ func (g *GardenerAPI) Poll(ctx context.Context,
217231
// Status adds a small amount of status info to w.
218232
func (g *GardenerAPI) Status(w http.ResponseWriter) {
219233
fmt.Fprintf(w, "Gardener API: %s\n", g.trackerBase.String())
220-
221234
}

cloud/gcs/gcs.go

Lines changed: 0 additions & 162 deletions
This file was deleted.

0 commit comments

Comments
 (0)