@@ -22,12 +22,12 @@ use crate::{
2222 } ,
2323} ;
2424use activity_heartbeat_manager:: ActivityHeartbeatManager ;
25- use dashmap:: DashMap ;
2625use futures_util:: {
2726 Stream , StreamExt , stream,
2827 stream:: { BoxStream , PollNext } ,
2928} ;
3029use std:: {
30+ collections:: HashMap ,
3131 convert:: TryInto ,
3232 future,
3333 sync:: {
@@ -59,7 +59,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
5959use tokio_util:: sync:: CancellationToken ;
6060use tracing:: Span ;
6161
62- type OutstandingActMap = Arc < DashMap < TaskToken , RemoteInFlightActInfo > > ;
62+ type OutstandingActMap = Arc < parking_lot :: Mutex < HashMap < TaskToken , RemoteInFlightActInfo > > > ;
6363
6464#[ derive( Debug ) ]
6565struct PendingActivityCancel {
@@ -191,7 +191,7 @@ impl WorkerActivityTasks {
191191 local_timeout_buffer : Duration ,
192192 ) -> Self {
193193 let shutdown_initiated_token = CancellationToken :: new ( ) ;
194- let outstanding_activity_tasks = Arc :: new ( DashMap :: new ( ) ) ;
194+ let outstanding_activity_tasks = Arc :: new ( parking_lot :: Mutex :: new ( HashMap :: new ( ) ) ) ;
195195 let server_poller_stream =
196196 new_activity_task_poller ( poller, metrics. clone ( ) , shutdown_initiated_token. clone ( ) ) ;
197197 let ( eager_activities_tx, eager_activities_rx) = unbounded_channel ( ) ;
@@ -320,7 +320,11 @@ impl WorkerActivityTasks {
320320 status : aer:: Status ,
321321 client : & dyn WorkerClient ,
322322 ) {
323- if let Some ( ( _, act_info) ) = self . outstanding_activity_tasks . remove ( & task_token) {
323+ let act_info = {
324+ let mut outstanding_activity_tasks = self . outstanding_activity_tasks . lock ( ) ;
325+ outstanding_activity_tasks. remove ( & task_token)
326+ } ;
327+ if let Some ( act_info) = act_info {
324328 let act_metrics = self . metrics . with_new_attrs ( [
325329 activity_type ( act_info. base . activity_type ) ,
326330 workflow_type ( act_info. base . workflow_type ) ,
@@ -433,12 +437,14 @@ impl WorkerActivityTasks {
433437 details : ActivityHeartbeat ,
434438 ) -> Result < ( ) , ActivityHeartbeatError > {
435439 // TODO: Propagate these back as cancels. Silent fails is too nonobvious
436- let at_info = self
437- . outstanding_activity_tasks
438- . get ( & TaskToken ( details. task_token . clone ( ) ) )
439- . ok_or ( ActivityHeartbeatError :: UnknownActivity ) ?;
440- let heartbeat_timeout: Duration = at_info
441- . heartbeat_timeout
440+ let ( heartbeat_timeout, timeout_resetter) = {
441+ let outstanding_activity_tasks = self . outstanding_activity_tasks . lock ( ) ;
442+ let at_info = outstanding_activity_tasks
443+ . get ( & TaskToken ( details. task_token . clone ( ) ) )
444+ . ok_or ( ActivityHeartbeatError :: UnknownActivity ) ?;
445+ ( at_info. heartbeat_timeout , at_info. timeout_resetter . clone ( ) )
446+ } ;
447+ let heartbeat_timeout: Duration = heartbeat_timeout
442448 // We treat None as 0 (even though heartbeat_timeout is never set to None by the server)
443449 . unwrap_or_default ( )
444450 . try_into ( )
@@ -457,7 +463,7 @@ impl WorkerActivityTasks {
457463 let throttle_interval =
458464 std:: cmp:: min ( throttle_interval, self . max_heartbeat_throttle_interval ) ;
459465 self . heartbeat_manager
460- . record ( details, throttle_interval, at_info . timeout_resetter . clone ( ) )
466+ . record ( details, throttle_interval, timeout_resetter)
461467 }
462468
463469 /// Returns a handle that the workflows management side can use to interact with this manager
@@ -509,31 +515,32 @@ where
509515 // an outstanding activity task. This is fine because it means that we
510516 // no longer need to cancel this activity, so we'll just ignore such
511517 // orphaned cancellations.
512- if let Some ( mut details) =
513- self . outstanding_tasks . get_mut ( & next_pc. task_token )
514518 {
515- if details. issued_cancel_to_lang . is_some ( ) {
516- // Don't double-issue cancellations
517- None
518- } else {
519- details. issued_cancel_to_lang = Some ( next_pc. reason ) ;
520- if next_pc. reason == ActivityCancelReason :: NotFound
521- || next_pc. details . is_not_found
522- {
523- details. known_not_found = true ;
519+ let mut outstanding_tasks = self . outstanding_tasks . lock ( ) ;
520+ if let Some ( details) = outstanding_tasks. get_mut ( & next_pc. task_token ) {
521+ if details. issued_cancel_to_lang . is_some ( ) {
522+ // Don't double-issue cancellations
523+ None
524+ } else {
525+ details. issued_cancel_to_lang = Some ( next_pc. reason ) ;
526+ if next_pc. reason == ActivityCancelReason :: NotFound
527+ || next_pc. details . is_not_found
528+ {
529+ details. known_not_found = true ;
530+ }
531+ Some ( Ok ( ActivityTask :: cancel_from_ids (
532+ next_pc. task_token . 0 ,
533+ next_pc. reason ,
534+ next_pc. details ,
535+ ) ) )
524536 }
525- Some ( Ok ( ActivityTask :: cancel_from_ids (
526- next_pc. task_token . 0 ,
527- next_pc. reason ,
528- next_pc. details ,
529- ) ) )
537+ } else {
538+ debug ! ( task_token = %next_pc. task_token,
539+ "Unknown activity task when issuing cancel" ) ;
540+ // If we can't find the activity here, it's already been completed,
541+ // in which case issuing a cancel again is pointless.
542+ None
530543 }
531- } else {
532- debug ! ( task_token = %next_pc. task_token,
533- "Unknown activity task when issuing cancel" ) ;
534- // If we can't find the activity here, it's already been completed,
535- // in which case issuing a cancel again is pointless.
536- None
537544 }
538545 }
539546 ActivityTaskSource :: PendingStart ( res) => {
@@ -560,22 +567,24 @@ where
560567 } ;
561568
562569 let tt: TaskToken = task. resp . task_token . clone ( ) . into ( ) ;
563- let outstanding_entry = self . outstanding_tasks . entry ( tt . clone ( ) ) ;
564- let mut outstanding_info =
565- outstanding_entry . insert ( RemoteInFlightActInfo :: new (
570+ self . outstanding_tasks . lock ( ) . insert (
571+ tt . clone ( ) ,
572+ RemoteInFlightActInfo :: new (
566573 & task. resp ,
567574 task. permit . into_used ( ActivitySlotInfo {
568575 activity_type : activity_type_name. to_string ( ) ,
569576 } ) ,
570- ) ) ;
577+ ) ,
578+ ) ;
579+
571580 // If we have already waited the grace period and issued cancels,
572581 // this will have been set true, indicating anything that happened
573582 // to be buffered/in-flight/etc should get an immediate cancel. This
574583 // is to allow the user to potentially decide to ignore cancels and
575584 // do work on polls that got received during shutdown.
576585 if should_issue_immediate_cancel. load ( Ordering :: Acquire ) {
577586 let _ = cancels_tx. send ( PendingActivityCancel :: new (
578- tt,
587+ tt. clone ( ) ,
579588 ActivityCancelReason :: WorkerShutdown ,
580589 ActivityTask :: primary_reason_to_cancellation_details (
581590 ActivityCancelReason :: WorkerShutdown ,
@@ -602,13 +611,14 @@ where
602611 if let Some ( ( timeout_type, timeout_at) ) = timeout_at {
603612 let sleep_time = timeout_at + local_timeout_buffer;
604613 let cancel_tx = cancels_tx. clone ( ) ;
614+ let task_token = tt. clone ( ) ;
605615 let resetter = if timeout_type == HEARTBEAT_TYPE {
606616 Some ( Arc :: new ( Notify :: new ( ) ) )
607617 } else {
608618 None
609619 } ;
610620 let resetter_clone = resetter. clone ( ) ;
611- outstanding_info . local_timeouts_task =
621+ let local_timeouts_task =
612622 Some ( tokio:: task:: spawn ( async move {
613623 if let Some ( rs) = resetter_clone {
614624 loop {
@@ -621,12 +631,12 @@ where
621631 tokio:: time:: sleep ( sleep_time) . await ;
622632 }
623633 debug ! (
624- task_token=%tt ,
634+ task_token=%task_token ,
625635 "Timing out activity due to elapsed local \
626636 {timeout_type} timer"
627637 ) ;
628638 let _ = cancel_tx. send ( PendingActivityCancel :: new (
629- tt ,
639+ task_token ,
630640 ActivityCancelReason :: TimedOut ,
631641 ActivityCancellationDetails {
632642 is_not_found : true ,
@@ -635,7 +645,12 @@ where
635645 } ,
636646 ) ) ;
637647 } ) ) ;
638- outstanding_info. timeout_resetter = resetter;
648+ if let Some ( outstanding_info) =
649+ self . outstanding_tasks . lock ( ) . get_mut ( & tt)
650+ {
651+ outstanding_info. local_timeouts_task = local_timeouts_task;
652+ outstanding_info. timeout_resetter = resetter;
653+ }
639654 }
640655 }
641656
@@ -653,9 +668,14 @@ where
653668 self . shutdown_initiated_token . cancelled ( ) . await ;
654669 tokio:: time:: sleep ( gp) . await ;
655670 should_issue_immediate_cancel_clone. store ( true , Ordering :: Release ) ;
656- for mapref in outstanding_tasks_clone. iter ( ) {
671+ for task_token in outstanding_tasks_clone
672+ . lock ( )
673+ . keys ( )
674+ . cloned ( )
675+ . collect :: < Vec < _ > > ( )
676+ {
657677 let _ = self . cancels_tx . send ( PendingActivityCancel :: new (
658- mapref . key ( ) . clone ( ) ,
678+ task_token ,
659679 ActivityCancelReason :: WorkerShutdown ,
660680 ActivityTask :: primary_reason_to_cancellation_details (
661681 ActivityCancelReason :: WorkerShutdown ,
@@ -667,7 +687,10 @@ where
667687 join ! (
668688 async {
669689 self . start_tasks_stream_complete. cancelled( ) . await ;
670- while !outstanding_tasks_clone. is_empty( ) {
690+ while {
691+ let outstanding_tasks = outstanding_tasks_clone. lock( ) ;
692+ !outstanding_tasks. is_empty( )
693+ } {
671694 self . complete_notify. notified( ) . await
672695 }
673696 // If we were waiting for the grace period but everything already finished,
0 commit comments