@@ -193,13 +193,39 @@ impl Coordinator {
193193 /// If the request fails due to an expired access token, we will retry once
194194 /// with a fresh one.
195195 pub async fn poll_commands ( & mut self ) -> Result < Option < NodeCommand > > {
196- let response = self . send ( RequestType :: PollCommands ) . await ?;
197- let data = response. bytes ( ) . await ?;
198- let pending: PendingNodeCommand = serde_json:: from_slice ( & data) ?;
196+ let request = PollCommandsRequest {
197+ machine_id : self . registration . machine_id ,
198+ } ;
199+
200+ let url = self . registration . dynamic_config . commands_url . clone ( ) ;
201+ let request = self
202+ . client
203+ . get ( url)
204+ . bearer_auth ( self . token . secret ( ) . expose_ref ( ) )
205+ . json ( & request) ;
206+
207+ let pending: PendingNodeCommand = self
208+ . send_request ( request)
209+ . await
210+ . context ( "PollCommands" ) ?
211+ . json ( )
212+ . await
213+ . context ( "parsing PollCommands response" ) ?;
199214
200215 if let Some ( envelope) = pending. envelope {
201- let request = RequestType :: ClaimCommand ( envelope. message_id ) ;
202- self . send ( request) . await ?;
216+ let request = ClaimNodeCommandRequest {
217+ machine_id : self . registration . machine_id ,
218+ message_id : envelope. message_id ,
219+ } ;
220+
221+ let url = self . registration . dynamic_config . commands_url . clone ( ) ;
222+ let request = self
223+ . client
224+ . delete ( url)
225+ . bearer_auth ( self . token . secret ( ) . expose_ref ( ) )
226+ . json ( & request) ;
227+
228+ self . send_request ( request) . await . context ( "ClaimCommand" ) ?;
203229
204230 Ok ( Some ( envelope. command ) )
205231 } else {
@@ -212,27 +238,55 @@ impl Coordinator {
212238 event,
213239 machine_id : self . registration . machine_id ,
214240 } ;
215- let request = RequestType :: EmitEvent ( & envelope) ;
216- self . send ( request) . await ?;
241+
242+ let url = self . registration . dynamic_config . events_url . clone ( ) ;
243+ let request = self
244+ . client
245+ . post ( url)
246+ . bearer_auth ( self . token . secret ( ) . expose_ref ( ) )
247+ . json ( & envelope) ;
248+
249+ self . send_request ( request) . await . context ( "EmitEvent" ) ?;
217250
218251 Ok ( ( ) )
219252 }
220253
221254 async fn can_schedule ( & mut self , work_set : & WorkSet ) -> Result < CanSchedule > {
222- let request = RequestType :: CanSchedule ( work_set) ;
223- let response = self . send ( request) . await ?;
255+ // Temporary: assume one work unit per work set.
256+ //
257+ // In the future, we will probably want the same behavior, but we will
258+ // need to make sure that other the work units in the set have their states
259+ // updated if necessary.
260+ let task_id = work_set. work_units [ 0 ] . task_id ;
261+ let envelope = CanScheduleRequest {
262+ machine_id : self . registration . machine_id ,
263+ task_id,
264+ } ;
224265
225- let can_schedule : CanSchedule = response . json ( ) . await ? ;
266+ debug ! ( "checking if able to schedule task ID = {}" , task_id ) ;
226267
268+ let mut url = self . registration . config . onefuzz_url . clone ( ) ;
269+ url. set_path ( "/api/agents/can_schedule" ) ;
270+ let request = self
271+ . client
272+ . post ( url)
273+ . bearer_auth ( self . token . secret ( ) . expose_ref ( ) )
274+ . json ( & envelope) ;
275+
276+ let can_schedule: CanSchedule = self
277+ . send_request ( request)
278+ . await
279+ . context ( "CanSchedule" ) ?
280+ . json ( )
281+ . await
282+ . context ( "parsing CanSchedule response" ) ?;
227283 Ok ( can_schedule)
228284 }
229285
230- // The lifetime is needed by an argument type. We can't make it anonymous,
231- // as clippy suggests, because `'_` is not allowed in this binding site.
232- #[ allow( clippy:: needless_lifetimes) ]
233- async fn send < ' a > ( & mut self , request_type : RequestType < ' a > ) -> Result < Response > {
234- let request = self . get_request_builder ( request_type. clone ( ) ) ;
286+ async fn send_request ( & mut self , request : RequestBuilder ) -> Result < Response > {
235287 let mut response = request
288+ . try_clone ( )
289+ . ok_or_else ( || anyhow ! ( "unable to clone request" ) ) ?
236290 . send_retry (
237291 |code| match code {
238292 StatusCode :: UNAUTHORIZED => RetryCheck :: Fail ,
@@ -253,7 +307,6 @@ impl Coordinator {
253307 debug ! ( "retrying request after refreshing access token" ) ;
254308
255309 // And try one more time.
256- let request = self . get_request_builder ( request_type) ;
257310 response = request
258311 . send_retry_default ( )
259312 . await
@@ -269,95 +322,6 @@ impl Coordinator {
269322
270323 Ok ( response)
271324 }
272-
273- fn get_request_builder ( & self , request_type : RequestType < ' _ > ) -> RequestBuilder {
274- match request_type {
275- RequestType :: PollCommands => self . poll_commands_request ( ) ,
276- RequestType :: ClaimCommand ( message_id) => self . claim_command_request ( message_id) ,
277- RequestType :: EmitEvent ( event) => self . emit_event_request ( event) ,
278- RequestType :: CanSchedule ( work_set) => self . can_schedule_request ( work_set) ,
279- }
280- }
281-
282- fn poll_commands_request ( & self ) -> RequestBuilder {
283- let request = PollCommandsRequest {
284- machine_id : self . registration . machine_id ,
285- } ;
286-
287- let url = self . registration . dynamic_config . commands_url . clone ( ) ;
288- let request_builder = self
289- . client
290- . get ( url)
291- . bearer_auth ( self . token . secret ( ) . expose_ref ( ) )
292- . json ( & request) ;
293-
294- request_builder
295- }
296-
297- fn claim_command_request ( & self , message_id : String ) -> RequestBuilder {
298- let request = ClaimNodeCommandRequest {
299- machine_id : self . registration . machine_id ,
300- message_id,
301- } ;
302-
303- let url = self . registration . dynamic_config . commands_url . clone ( ) ;
304- let request_builder = self
305- . client
306- . delete ( url)
307- . bearer_auth ( self . token . secret ( ) . expose_ref ( ) )
308- . json ( & request) ;
309-
310- request_builder
311- }
312-
313- fn emit_event_request ( & self , event : & NodeEventEnvelope ) -> RequestBuilder {
314- let url = self . registration . dynamic_config . events_url . clone ( ) ;
315- let request_builder = self
316- . client
317- . post ( url)
318- . bearer_auth ( self . token . secret ( ) . expose_ref ( ) )
319- . json ( event) ;
320-
321- request_builder
322- }
323-
324- fn can_schedule_request ( & self , work_set : & WorkSet ) -> RequestBuilder {
325- // Temporary: assume one work unit per work set.
326- //
327- // In the future, we will probably want the same behavior, but we will
328- // need to make sure that other the work units in the set have their states
329- // updated if necessary.
330- let task_id = work_set. work_units [ 0 ] . task_id ;
331- let can_schedule = CanScheduleRequest {
332- machine_id : self . registration . machine_id ,
333- task_id,
334- } ;
335-
336- debug ! ( "checking if able to schedule task ID = {}" , task_id) ;
337-
338- let mut url = self . registration . config . onefuzz_url . clone ( ) ;
339- url. set_path ( "/api/agents/can_schedule" ) ;
340- let request_builder = self
341- . client
342- . post ( url)
343- . bearer_auth ( self . token . secret ( ) . expose_ref ( ) )
344- . json ( & can_schedule) ;
345-
346- request_builder
347- }
348- }
349-
350- // Enum to thunk creation of requests.
351- //
352- // The upstream `Request` type is not `Clone`, so we can't retry a request
353- // without rebuilding it. We use this enum to dispatch to a private method,
354- // avoiding borrowck conflicts that occur when capturing `self`.
355- #[ derive( Clone , Debug , Eq , PartialEq ) ]
356- enum RequestType < ' a > {
357- PollCommands ,
358- ClaimCommand ( String ) ,
359- EmitEvent ( & ' a NodeEventEnvelope ) ,
360- CanSchedule ( & ' a WorkSet ) ,
361325}
362326
363327#[ cfg( test) ]
0 commit comments