Skip to content

Commit 0c5efd8

Browse files
pzhan9, meta-codesync[bot]
authored and committed
Switch actor-loop supervision channel to a raw tokio mpsc (#3834)
Summary: Pull Request resolved: #3834 The supervision channel between an actor and its child does not need to be a monarch channel. A raw tokio mpsc channel is good enough. The main benefit of using the raw tokio mpsc is that we do not need to worry about the additional complexity introduced by the monarch channel, especially the sequence number generation. In addition, it makes the semantics clear: a raw tokio mpsc means the sender is within the same process. Reviewed By: mariusae Differential Revision: D104848635 fbshipit-source-id: b5f4777a6927ac97caa11783d7edd7a0bad8ae77
1 parent e4f60af commit 0c5efd8

5 files changed

Lines changed: 44 additions & 30 deletions

File tree

hyperactor/src/proc.rs

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -523,7 +523,7 @@ pub struct ActorInstance<A: Actor> {
523523
/// Handle to the actor (used for lifecycle control and port access).
524524
pub handle: ActorHandle<A>,
525525
/// Supervision events delivered to this actor.
526-
pub supervision: PortReceiver<ActorSupervisionEvent>,
526+
pub supervision: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
527527
/// Control signals for the actor.
528528
pub signal: PortReceiver<Signal>,
529529
/// Primary work queue for handler dispatch.
@@ -1814,7 +1814,10 @@ impl<A: Actor> Drop for InstanceState<A> {
18141814
pub struct InstanceReceivers<A: Actor> {
18151815
/// Signal and supervision receivers for the actor loop. `None`
18161816
/// for detached/client instances that don't run an actor loop.
1817-
actor_loop: Option<(PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>)>,
1817+
actor_loop: Option<(
1818+
PortReceiver<Signal>,
1819+
mpsc::UnboundedReceiver<ActorSupervisionEvent>,
1820+
)>,
18181821
/// Work queue for dispatching messages to actor handlers.
18191822
work: mpsc::UnboundedReceiver<WorkCell<A>>,
18201823
/// Introspect message receiver for the dedicated introspect task.
@@ -1854,10 +1857,10 @@ impl<A: Actor> Instance<A> {
18541857
None
18551858
} else {
18561859
let (signal_port, signal_receiver) = mailbox.open_port::<Signal>();
1857-
let (supervision_port, supervision_receiver) =
1858-
mailbox.open_port::<ActorSupervisionEvent>();
1860+
let (supervision_tx, supervision_receiver) =
1861+
mpsc::unbounded_channel::<ActorSupervisionEvent>();
18591862
Some((
1860-
(signal_port, supervision_port),
1863+
(signal_port, supervision_tx),
18611864
(signal_receiver, supervision_receiver),
18621865
))
18631866
};
@@ -2308,7 +2311,10 @@ impl<A: Actor> Instance<A> {
23082311
async fn serve(
23092312
mut self,
23102313
mut actor: A,
2311-
actor_loop_receivers: (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
2314+
actor_loop_receivers: (
2315+
PortReceiver<Signal>,
2316+
mpsc::UnboundedReceiver<ActorSupervisionEvent>,
2317+
),
23122318
mut work_rx: mpsc::UnboundedReceiver<WorkCell<A>>,
23132319
) {
23142320
let result = self
@@ -2366,7 +2372,7 @@ impl<A: Actor> Instance<A> {
23662372
if let Some(parent) = self.inner.cell.maybe_unlink_parent() {
23672373
if let Some(event) = event {
23682374
// Parent exists, failure should be propagated to the parent.
2369-
parent.send_supervision_event_or_crash(&self, event);
2375+
parent.send_supervision_event_or_crash(event);
23702376
}
23712377
// TODO: we should get rid of this signal, and use *only* supervision events for
23722378
// the purpose of conveying lifecycle changes
@@ -2400,7 +2406,10 @@ impl<A: Actor> Instance<A> {
24002406
async fn run_actor_tree(
24012407
&mut self,
24022408
actor: &mut A,
2403-
mut actor_loop_receivers: (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
2409+
mut actor_loop_receivers: (
2410+
PortReceiver<Signal>,
2411+
mpsc::UnboundedReceiver<ActorSupervisionEvent>,
2412+
),
24042413
work_rx: &mut mpsc::UnboundedReceiver<WorkCell<A>>,
24052414
) -> Result<String, ActorError> {
24062415
// It is okay to catch all panics here, because we are in a tokio task,
@@ -2519,7 +2528,10 @@ impl<A: Actor> Instance<A> {
25192528
async fn run(
25202529
&mut self,
25212530
actor: &mut A,
2522-
actor_loop_receivers: &mut (PortReceiver<Signal>, PortReceiver<ActorSupervisionEvent>),
2531+
actor_loop_receivers: &mut (
2532+
PortReceiver<Signal>,
2533+
mpsc::UnboundedReceiver<ActorSupervisionEvent>,
2534+
),
25232535
work_rx: &mut mpsc::UnboundedReceiver<WorkCell<A>>,
25242536
) -> Result<String, ActorError> {
25252537
let (signal_receiver, supervision_event_receiver) = actor_loop_receivers;
@@ -2572,7 +2584,7 @@ impl<A: Actor> Instance<A> {
25722584
let _ = ACTOR_MESSAGE_HANDLER_DURATION.start(metric_pairs);
25732585
let work = work.expect("inconsistent work queue state");
25742586
if let Err(err) = work.handle(actor, self).await {
2575-
for supervision_event in supervision_event_receiver.drain() {
2587+
while let Ok(supervision_event) = supervision_event_receiver.try_recv() {
25762588
self.handle_supervision_event(actor, supervision_event).await?;
25772589
}
25782590
let kind = ActorErrorKind::processing(err);
@@ -2582,7 +2594,7 @@ impl<A: Actor> Instance<A> {
25822594
});
25832595
}
25842596
}
2585-
Ok(supervision_event) = supervision_event_receiver.recv() => {
2597+
Some(supervision_event) = supervision_event_receiver.recv() => {
25862598
self.handle_supervision_event(actor, supervision_event).await?;
25872599
}
25882600
}
@@ -2971,7 +2983,10 @@ struct InstanceCellState {
29712983
proc: Proc,
29722984

29732985
/// Control port handles to the actor loop, if one is running.
2974-
actor_loop: Option<(PortHandle<Signal>, PortHandle<ActorSupervisionEvent>)>,
2986+
actor_loop: Option<(
2987+
PortHandle<Signal>,
2988+
mpsc::UnboundedSender<ActorSupervisionEvent>,
2989+
)>,
29752990

29762991
/// An observer that stores the current status of the actor.
29772992
status: watch::Receiver<ActorStatus>,
@@ -3120,7 +3135,10 @@ impl InstanceCell {
31203135
actor_id: ActorAddr,
31213136
actor_type: ActorType,
31223137
proc: Proc,
3123-
actor_loop: Option<(PortHandle<Signal>, PortHandle<ActorSupervisionEvent>)>,
3138+
actor_loop: Option<(
3139+
PortHandle<Signal>,
3140+
mpsc::UnboundedSender<ActorSupervisionEvent>,
3141+
)>,
31243142
status: watch::Receiver<ActorStatus>,
31253143
parent: Option<InstanceCell>,
31263144
ports: Arc<dyn Any + Send + Sync>,
@@ -3229,14 +3247,10 @@ impl InstanceCell {
32293247
/// Note that "let it crash" is the default behavior when a supervision event
32303248
/// cannot be delivered upstream. It is the upstream's responsibility to
32313249
/// detect and handle crashes.
3232-
pub fn send_supervision_event_or_crash(
3233-
&self,
3234-
child_cx: &impl context::Actor, // context of the child who sends the event.
3235-
event: ActorSupervisionEvent,
3236-
) {
3250+
pub fn send_supervision_event_or_crash(&self, event: ActorSupervisionEvent) {
32373251
match &self.inner.actor_loop {
3238-
Some((_, supervision_port)) => {
3239-
if let Err(err) = supervision_port.send(child_cx, event.clone()) {
3252+
Some((_, supervision_tx)) => {
3253+
if let Err(err) = supervision_tx.send(event.clone()) {
32403254
if !event.is_error() {
32413255
// Normal lifecycle events (e.g. clean stop) that fail to
32423256
// send are silently dropped. This happens when a child

hyperactor_mesh/src/global_context.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ pub struct GlobalClientActor {
177177
/// The root client is a monitor, so it should process these
178178
/// events without crashing on routine routing/delivery failures
179179
/// it observes.
180-
supervision_rx: PortReceiver<ActorSupervisionEvent>,
180+
supervision_rx: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
181181
/// Primary work queue for handler dispatch.
182182
///
183183
/// Any bound handler message (e.g. `MeshFailure`,
@@ -195,7 +195,7 @@ impl GlobalClientActor {
195195
work = self.work_rx.recv() => {
196196
let work = work.expect("inconsistent work queue state");
197197
if let Err(err) = work.handle(&mut self, instance).await {
198-
for supervision_event in self.supervision_rx.drain() {
198+
while let Ok(supervision_event) = self.supervision_rx.try_recv() {
199199
instance.handle_supervision_event(&mut self, supervision_event).await
200200
.expect("GlobalClientActor::handle_supervision_event is infallible");
201201
}
@@ -209,7 +209,7 @@ impl GlobalClientActor {
209209
_ = self.signal_rx.recv() => {
210210
// TODO: do we need any signal handling for the root client?
211211
}
212-
Ok(supervision_event) = self.supervision_rx.recv() => {
212+
Some(supervision_event) = self.supervision_rx.recv() => {
213213
instance.handle_supervision_event(&mut self, supervision_event).await
214214
.expect("GlobalClientActor::handle_supervision_event is infallible");
215215
}

hyperactor_mesh/src/testing.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ fn process_name(pid: u32) -> Option<String> {
182182
#[derive(Debug)]
183183
pub struct TestRootClient {
184184
signal_rx: PortReceiver<Signal>,
185-
supervision_rx: PortReceiver<ActorSupervisionEvent>,
185+
supervision_rx: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
186186
work_rx: mpsc::UnboundedReceiver<WorkCell<Self>>,
187187
}
188188

@@ -206,7 +206,7 @@ impl TestRootClient {
206206
work = self.work_rx.recv() => {
207207
let work = work.expect("inconsistent work queue state");
208208
if let Err(err) = work.handle(&mut self, instance).await {
209-
for supervision_event in self.supervision_rx.drain() {
209+
while let Ok(supervision_event) = self.supervision_rx.try_recv() {
210210
if let Err(err) = instance.handle_supervision_event(&mut self, supervision_event).await {
211211
break 'messages err;
212212
}
@@ -221,7 +221,7 @@ impl TestRootClient {
221221
_ = self.signal_rx.recv() => {
222222
// TODO: do we need any signal handling for the root client?
223223
}
224-
Ok(supervision_event) = self.supervision_rx.recv() => {
224+
Some(supervision_event) = self.supervision_rx.recv() => {
225225
if let Err(err) = instance.handle_supervision_event(&mut self, supervision_event).await {
226226
break err;
227227
}

monarch_hyperactor/src/actor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -762,7 +762,7 @@ impl PythonActor {
762762
// exiting the loop.
763763
// Else, continue handling messages.
764764
if let Err(err) = instance.handle_supervision_event(&mut actor, supervision_event).await {
765-
for supervision_event in supervision_rx.drain() {
765+
while let Ok(supervision_event) = supervision_rx.try_recv() {
766766
if let Err(err) = instance.handle_supervision_event(&mut actor, supervision_event).await {
767767
break 'messages Some(err);
768768
}
@@ -787,7 +787,7 @@ impl PythonActor {
787787
Err(err) => break Some(err),
788788
}
789789
}
790-
Ok(supervision_event) = supervision_rx.recv() => {
790+
Some(supervision_event) = supervision_rx.recv() => {
791791
if let Err(err) = instance.handle_supervision_event(&mut actor, supervision_event).await {
792792
break Some(err);
793793
}

monarch_hyperactor/src/actor_mesh.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -864,7 +864,7 @@ mod tests {
864864
#[derive(Debug)]
865865
struct TestClient {
866866
signal_rx: PortReceiver<Signal>,
867-
supervision_rx: PortReceiver<ActorSupervisionEvent>,
867+
supervision_rx: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
868868
work_rx: mpsc::UnboundedReceiver<WorkCell<Self>>,
869869
}
870870

@@ -895,7 +895,7 @@ mod tests {
895895
}
896896
}
897897
_ = self.signal_rx.recv() => {}
898-
Ok(event) = self.supervision_rx.recv() => {
898+
Some(event) = self.supervision_rx.recv() => {
899899
let _ = instance
900900
.handle_supervision_event(&mut self, event)
901901
.await;

0 commit comments

Comments
 (0)