@@ -71,7 +71,7 @@ impl NexusManager {
7171 let task_stream_input = stream:: select_with_strategy (
7272 UnboundedReceiverStream :: new ( cancels_rx) . map ( TaskStreamInput :: from) ,
7373 source_stream
74- . map ( TaskStreamInput :: from)
74+ . map ( |p| TaskStreamInput :: from ( Box :: new ( p ) ) )
7575 . chain ( stream:: once ( async move { TaskStreamInput :: SourceComplete } ) ) ,
7676 |_: & mut ( ) | PollNext :: Left ,
7777 ) ;
@@ -244,90 +244,92 @@ where
244244 self . source_stream
245245 . filter_map ( move |t| {
246246 let res = match t {
247- TaskStreamInput :: Poll ( Ok ( t) ) => {
248- if let Some ( dur) = t. resp . sched_to_start ( ) {
249- self . metrics . nexus_task_sched_to_start_latency ( dur) ;
250- } ;
247+ TaskStreamInput :: Poll ( t) => match * t {
248+ Ok ( t) => {
249+ if let Some ( dur) = t. resp . sched_to_start ( ) {
250+ self . metrics . nexus_task_sched_to_start_latency ( dur) ;
251+ } ;
251252
252- let tt = TaskToken ( t. resp . task_token . clone ( ) ) ;
253- let mut timeout_task = None ;
254- if let Some ( timeout_str) = t
255- . resp
256- . request
257- . as_ref ( )
258- . and_then ( |r| r. header . get ( REQUEST_TIMEOUT_HEADER ) )
259- {
260- if let Ok ( timeout_dur) = parse_request_timeout ( timeout_str) {
261- let tt_clone = tt. clone ( ) ;
262- let cancels_tx = self . cancels_tx . clone ( ) ;
263- timeout_task = Some ( tokio:: task:: spawn ( async move {
264- tokio:: time:: sleep ( timeout_dur) . await ;
265- debug ! (
253+ let tt = TaskToken ( t. resp . task_token . clone ( ) ) ;
254+ let mut timeout_task = None ;
255+ if let Some ( timeout_str) = t
256+ . resp
257+ . request
258+ . as_ref ( )
259+ . and_then ( |r| r. header . get ( REQUEST_TIMEOUT_HEADER ) )
260+ {
261+ if let Ok ( timeout_dur) = parse_request_timeout ( timeout_str) {
262+ let tt_clone = tt. clone ( ) ;
263+ let cancels_tx = self . cancels_tx . clone ( ) ;
264+ timeout_task = Some ( tokio:: task:: spawn ( async move {
265+ tokio:: time:: sleep ( timeout_dur) . await ;
266+ debug ! (
266267 task_token=%tt_clone,
267268 "Timing out nexus task due to elapsed local timeout timer"
268269 ) ;
269- let _ = cancels_tx. send ( CancelNexusTask {
270- task_token : tt_clone. 0 ,
271- reason : NexusTaskCancelReason :: TimedOut . into ( ) ,
272- } ) ;
273- } ) ) ;
274- } else {
275- // This could auto-respond and fail the nexus task, but given that
276- // the server is going to try to parse this as well, and all we're
277- // doing with this parsing is notifying the handler of a local
278- // timeout, it seems reasonable to rely on server to handle this.
279- warn ! (
270+ let _ = cancels_tx. send ( CancelNexusTask {
271+ task_token : tt_clone. 0 ,
272+ reason : NexusTaskCancelReason :: TimedOut . into ( ) ,
273+ } ) ;
274+ } ) ) ;
275+ } else {
276+ // This could auto-respond and fail the nexus task, but given that
277+ // the server is going to try to parse this as well, and all we're
278+ // doing with this parsing is notifying the handler of a local
279+ // timeout, it seems reasonable to rely on server to handle this.
280+ warn ! (
280281 "Failed to parse nexus timeout header value '{}'" ,
281282 timeout_str
282283 ) ;
284+ }
283285 }
284- }
285286
286- let ( service, operation, request_kind) = t
287- . resp
288- . request
289- . as_ref ( )
290- . and_then ( |r| r. variant . as_ref ( ) )
291- . map ( |v| match v {
292- Variant :: StartOperation ( s) => (
293- s. service . to_owned ( ) ,
294- s. operation . to_owned ( ) ,
295- RequestKind :: Start ,
296- ) ,
297- Variant :: CancelOperation ( c) => (
298- c. service . to_owned ( ) ,
299- c. operation . to_owned ( ) ,
300- RequestKind :: Cancel ,
301- ) ,
302- } )
303- . unwrap_or_default ( ) ;
304- self . outstanding_task_map . lock ( ) . insert (
305- tt,
306- NexusInFlightTask {
307- request_kind,
308- timeout_task,
309- scheduled_time : t
310- . resp
311- . request
312- . as_ref ( )
313- . and_then ( |r| r. scheduled_time )
314- . and_then ( |t| t. try_into ( ) . ok ( ) ) ,
315- start_time : Instant :: now ( ) ,
316- _permit : t. permit . into_used ( NexusSlotInfo { service, operation } ) ,
317- } ,
318- ) ;
319- Some ( Ok ( NexusTask {
320- variant : Some ( nexus_task:: Variant :: Task ( t. resp ) ) ,
321- } ) )
322- }
287+ let ( service, operation, request_kind) = t
288+ . resp
289+ . request
290+ . as_ref ( )
291+ . and_then ( |r| r. variant . as_ref ( ) )
292+ . map ( |v| match v {
293+ Variant :: StartOperation ( s) => (
294+ s. service . to_owned ( ) ,
295+ s. operation . to_owned ( ) ,
296+ RequestKind :: Start ,
297+ ) ,
298+ Variant :: CancelOperation ( c) => (
299+ c. service . to_owned ( ) ,
300+ c. operation . to_owned ( ) ,
301+ RequestKind :: Cancel ,
302+ ) ,
303+ } )
304+ . unwrap_or_default ( ) ;
305+ self . outstanding_task_map . lock ( ) . insert (
306+ tt,
307+ NexusInFlightTask {
308+ request_kind,
309+ timeout_task,
310+ scheduled_time : t
311+ . resp
312+ . request
313+ . as_ref ( )
314+ . and_then ( |r| r. scheduled_time )
315+ . and_then ( |t| t. try_into ( ) . ok ( ) ) ,
316+ start_time : Instant :: now ( ) ,
317+ _permit : t. permit . into_used ( NexusSlotInfo { service, operation } ) ,
318+ } ,
319+ ) ;
320+ Some ( Ok ( NexusTask {
321+ variant : Some ( nexus_task:: Variant :: Task ( t. resp ) ) ,
322+ } ) )
323+ } ,
324+ Err ( e) => Some ( Err ( PollError :: TonicError ( e) ) )
325+ } ,
323326 TaskStreamInput :: Cancel ( c) => Some ( Ok ( NexusTask {
324327 variant : Some ( nexus_task:: Variant :: CancelTask ( c) ) ,
325328 } ) ) ,
326329 TaskStreamInput :: SourceComplete => {
327330 source_done. cancel ( ) ;
328331 None
329332 }
330- TaskStreamInput :: Poll ( Err ( e) ) => Some ( Err ( PollError :: TonicError ( e) ) ) ,
331333 } ;
332334 async move { res }
333335 } )
@@ -379,7 +381,7 @@ enum RequestKind {
379381
380382#[ derive( derive_more:: From ) ]
381383enum TaskStreamInput {
382- Poll ( NexusPollItem ) ,
384+ Poll ( Box < NexusPollItem > ) ,
383385 Cancel ( CancelNexusTask ) ,
384386 SourceComplete ,
385387}
0 commit comments