@@ -2,9 +2,7 @@ package active
22
33import (
44 "context"
5- "errors"
65 "fmt"
7- "io/ioutil"
86 "log"
97 "net/http"
108 "net/url"
@@ -19,7 +17,7 @@ import (
1917 "google.golang.org/api/iterator"
2018 "google.golang.org/api/option"
2119
22- job "github.com/m-lab/etl-gardener/client"
20+ gardener "github.com/m-lab/etl-gardener/client/v2 "
2321 "github.com/m-lab/etl-gardener/tracker"
2422 "github.com/m-lab/go/cloud/gcs"
2523 "github.com/m-lab/go/rtx"
@@ -46,11 +44,13 @@ var JobFailures = promauto.NewCounterVec(
4644type GardenerAPI struct {
4745 trackerBase url.URL
4846 gcs stiface.Client
47+ jobs * gardener.JobClient
4948}
5049
5150// NewGardenerAPI creates a GardenerAPI.
5251func NewGardenerAPI (trackerBase url.URL , gcs stiface.Client ) * GardenerAPI {
53- return & GardenerAPI {trackerBase : trackerBase , gcs : gcs }
52+ c := gardener .NewJobClient (trackerBase )
53+ return & GardenerAPI {trackerBase : trackerBase , gcs : gcs , jobs : c }
5454}
5555
5656// MustStorageClient creates a default GCS client.
@@ -60,43 +60,12 @@ func MustStorageClient(ctx context.Context) stiface.Client {
6060 return stiface .AdaptClient (c )
6161}
6262
63- // TODO migrate this to m-lab/go
64- func post (ctx context.Context , url url.URL ) ([]byte , int , error ) {
65- ctx , cancel := context .WithTimeout (ctx , time .Minute )
66- defer cancel ()
67- req , reqErr := http .NewRequestWithContext (ctx , "POST" , url .String (), nil )
68- if reqErr != nil {
69- return nil , 0 , reqErr
70- }
71- resp , postErr := http .DefaultClient .Do (req )
72- if postErr != nil {
73- return nil , 0 , postErr // Documentation says we can ignore body.
74- }
75-
76- // Gauranteed to have a non-nil response and body.
77- defer resp .Body .Close ()
78- b , err := ioutil .ReadAll (resp .Body ) // Documentation recommends reading body.
79- return b , resp .StatusCode , err
80- }
81-
82- // TODO add retry in case gardener is offline (during redeployment)
83- // TODO add metrics to track latency, retries, and errors.
84- func postAndIgnoreResponse (ctx context.Context , url url.URL ) error {
85- _ , status , err := post (ctx , url )
86- if err != nil {
87- return err
88- }
89- if status != http .StatusOK {
90- return errors .New (http .StatusText (status ))
91- }
92- return nil
93- }
94-
9563// RunAll will execute functions provided by Next() until there are no more,
9664// or the context is canceled.
97- func (g * GardenerAPI ) RunAll (ctx context.Context , rSrc RunnableSource , job tracker.Job ) (* errgroup.Group , error ) {
65+ func (g * GardenerAPI ) RunAll (ctx context.Context , rSrc RunnableSource , jt * tracker.JobWithTarget ) (* errgroup.Group , error ) {
9866 eg := & errgroup.Group {}
9967 count := 0
68+ job := jt .Job
10069 for {
10170 run , err := rSrc .Next (ctx )
10271 if err != nil {
@@ -111,9 +80,8 @@ func (g *GardenerAPI) RunAll(ctx context.Context, rSrc RunnableSource, job track
11180 }
11281 }
11382
114- heartbeat := tracker .HeartbeatURL (g .trackerBase , job )
115- if postErr := postAndIgnoreResponse (ctx , * heartbeat ); postErr != nil {
116- log .Println (postErr , "on heartbeat for" , job .Path ())
83+ if err := g .jobs .Heartbeat (ctx , jt .ID ); err != nil {
84+ log .Println (err , "on heartbeat for" , job .Path ())
11785 }
11886
11987 debug .Println ("Starting func" )
@@ -131,9 +99,8 @@ func (g *GardenerAPI) RunAll(ctx context.Context, rSrc RunnableSource, job track
13199
132100 err = run .Run (ctx )
133101 if err == nil {
134- update := tracker .UpdateURL (g .trackerBase , job , tracker .Parsing , run .Info ())
135- if postErr := postAndIgnoreResponse (ctx , * update ); postErr != nil {
136- log .Println (postErr , "on update for" , job .Path ())
102+ if err := g .jobs .Update (ctx , jt .ID , tracker .Parsing , run .Info ()); err != nil {
103+ log .Println (err , "on update for" , job .Path ())
137104 }
138105 }
139106 return
@@ -178,52 +145,48 @@ func (g *GardenerAPI) JobFileSource(ctx context.Context, job tracker.Job,
178145}
179146
180147// NextJob requests a new job from Gardener service.
181- func (g * GardenerAPI ) NextJob (ctx context.Context ) (tracker.JobWithTarget , error ) {
182- return job . NextJob (ctx , g . trackerBase )
148+ func (g * GardenerAPI ) NextJob (ctx context.Context ) (* tracker.JobWithTarget , error ) {
149+ return g . jobs . Next (ctx )
183150}
184151
185152func (g * GardenerAPI ) pollAndRun (ctx context.Context ,
186153 toRunnable func (o * storage.ObjectAttrs ) Runnable , tokens TokenSource ) error {
187- job , err := g .NextJob (ctx )
154+ jt , err := g .jobs . Next (ctx )
188155 if err != nil {
189156 log .Println (err , "on Gardener client.NextJob()" )
190157 return err
191158 }
192159
193- log .Println (job , "filter:" , job .Filter )
194- gcsSource , err := g .JobFileSource (ctx , job .Job , toRunnable )
160+ log .Println (jt , "filter:" , jt . Job .Filter )
161+ gcsSource , err := g .JobFileSource (ctx , jt .Job , toRunnable )
195162 if err != nil {
196163 log .Println (err , "on JobFileSource" )
197164 return err
198165 }
199166 src := Throttle (gcsSource , tokens )
200167
201- log .Println ("Running" , job .Path ())
202-
203- update := tracker .UpdateURL (g .trackerBase , job .Job , tracker .Parsing , "starting tasks" )
204- if postErr := postAndIgnoreResponse (ctx , * update ); postErr != nil {
205- log .Println (postErr )
168+ log .Println ("Running" , jt .Job .Path ())
169+ if err := g .jobs .Update (ctx , jt .ID , tracker .Parsing , "starting tasks" ); err != nil {
170+ log .Println (err )
206171 }
207172
208- eg , err := g .RunAll (ctx , src , job . Job )
173+ eg , err := g .RunAll (ctx , src , jt )
209174 if err != nil {
210175 log .Println (err )
211176 }
212177
213178 // Once all are dispatched, we want to wait until all have completed
214179 // before posting the state change.
215180 go func () {
216- log .Println ("all tasks dispatched for" , job .Path ())
181+ log .Println ("all tasks dispatched for" , jt . Job .Path ())
217182 err := eg .Wait ()
218183 if err != nil {
219- log .Println (err , "on wait for" , job .Path ())
184+ log .Println (err , "on wait for" , jt . Job .Path ())
220185 } else {
221- log .Println ("finished" , job .Path ())
186+ log .Println ("finished" , jt . Job .Path ())
222187 }
223- update := tracker .UpdateURL (g .trackerBase , job .Job , tracker .ParseComplete , "" )
224- // TODO - should this have a retry?
225- if postErr := postAndIgnoreResponse (ctx , * update ); postErr != nil {
226- log .Println (postErr )
188+ if err := g .jobs .Update (ctx , jt .ID , tracker .ParseComplete , "" ); err != nil {
189+ log .Println (err )
227190 }
228191 }()
229192
0 commit comments