Skip to content

Commit 994b6df

Browse files
nit
1 parent 94801b1 commit 994b6df

44 files changed

Lines changed: 642 additions & 454 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/dylints/blocking_in_actor_loop/src/lib.rs

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,17 @@ dylint_linting::declare_late_lint! {
1818
///
1919
/// Detects blocking waits in actor event loops, such as awaiting an actor
2020
/// mailbox request/reply method or an async handler callback inside
21-
/// `select_loop!`.
21+
/// `select_loop!`. It also detects async mailbox request/reply helpers that
22+
/// create a oneshot channel, enqueue a message with the sender, and then
23+
/// await the receiver before returning.
2224
///
2325
/// ### Why is this bad?
2426
///
2527
/// Actor loops must keep polling their mailbox and other event sources.
2628
/// Awaiting another actor or an application callback from a branch body can
27-
/// starve the loop. Spawn or pool the work and select on its completion
28-
/// instead.
29+
/// starve the loop. Hiding the wait behind an async mailbox helper makes it
30+
/// too easy for actor loops to block accidentally. Spawn or pool the work
31+
/// and select on its completion instead.
2932
///
3033
/// ### Example
3134
///
@@ -62,6 +65,20 @@ impl<'tcx> LateLintPass<'tcx> for BlockingInActorLoop {
6265
Some(kind) => BlockingAwaitVisitor { cx, kind }.visit_expr(body.value),
6366
None => {}
6467
}
68+
69+
if let Some(span) = request_reply_helper_wait(cx, body) {
70+
cx.emit_span_lint(
71+
BLOCKING_IN_ACTOR_LOOP,
72+
span,
73+
DiagDecorator(|diag| {
74+
diag.primary_message("async mailbox request/reply helper hides an actor wait");
75+
diag.span_help(
76+
span,
77+
"return the oneshot receiver and let callers select or await explicitly",
78+
);
79+
}),
80+
);
81+
}
6582
}
6683
}
6784

@@ -135,6 +152,50 @@ fn actor_loop_kind(cx: &LateContext<'_>, body: &Body<'_>) -> Option<ActorLoopKin
135152
None
136153
}
137154

155+
fn request_reply_helper_wait(cx: &LateContext<'_>, body: &Body<'_>) -> Option<Span> {
156+
let snippet = cx
157+
.sess()
158+
.source_map()
159+
.span_to_snippet(body.value.span)
160+
.ok()?;
161+
162+
if !snippet.contains("oneshot::channel")
163+
|| !snippet.contains(".enqueue(Message::")
164+
|| !snippet.contains(".await")
165+
{
166+
return None;
167+
}
168+
169+
AwaitVisitor::default().await_span(body.value)
170+
}
171+
172+
#[derive(Default)]
173+
struct AwaitVisitor {
174+
span: Option<Span>,
175+
}
176+
177+
impl AwaitVisitor {
178+
fn await_span<'tcx>(mut self, expr: &'tcx Expr<'tcx>) -> Option<Span> {
179+
self.visit_expr(expr);
180+
self.span
181+
}
182+
}
183+
184+
impl<'tcx> Visitor<'tcx> for AwaitVisitor {
185+
fn visit_expr(&mut self, expr: &'tcx Expr<'tcx>) {
186+
if self.span.is_some() {
187+
return;
188+
}
189+
190+
if matches!(expr.kind, ExprKind::Match(_, _, MatchSource::AwaitDesugar)) {
191+
self.span = Some(expr.span);
192+
return;
193+
}
194+
195+
walk_expr(self, expr);
196+
}
197+
}
198+
138199
fn awaited_call<'tcx>(expr: &'tcx Expr<'tcx>) -> Option<AwaitedCall<'tcx>> {
139200
let ExprKind::Match(awaited, _, MatchSource::AwaitDesugar) = expr.kind else {
140201
return None;

.github/dylints/blocking_in_actor_loop/ui/main.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,31 @@ macro_rules! select_loop {
1414
}};
1515
}
1616

17+
mod oneshot {
18+
use std::{
19+
future::Future,
20+
marker::PhantomData,
21+
pin::Pin,
22+
task::{Context, Poll},
23+
};
24+
25+
pub struct Sender<T>(PhantomData<T>);
26+
27+
pub struct Receiver<T>(PhantomData<T>);
28+
29+
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
30+
(Sender(PhantomData), Receiver(PhantomData))
31+
}
32+
33+
impl<T> Future for Receiver<T> {
34+
type Output = Result<T, ()>;
35+
36+
fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
37+
Poll::Pending
38+
}
39+
}
40+
}
41+
1742
trait Handler {
1843
async fn process(&mut self);
1944
}
@@ -50,6 +75,28 @@ impl Stream {
5075
async fn recv(&mut self) {}
5176
}
5277

78+
enum Message {
79+
Ready { responder: oneshot::Sender<()> },
80+
}
81+
82+
struct RequestReplyMailbox;
83+
84+
impl RequestReplyMailbox {
85+
fn enqueue(&self, _: Message) {}
86+
87+
async fn ready(&self) -> Option<()> {
88+
let (responder, receiver) = oneshot::channel();
89+
self.enqueue(Message::Ready { responder });
90+
receiver.await.ok()
91+
}
92+
93+
fn ready_receiver(&self) -> oneshot::Receiver<()> {
94+
let (responder, receiver) = oneshot::channel();
95+
self.enqueue(Message::Ready { responder });
96+
receiver
97+
}
98+
}
99+
53100
async fn bad_handler(mut handler: impl Handler) {
54101
let context = Context;
55102
select_loop!(context, {
Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,67 +1,79 @@
1+
warning: async mailbox request/reply helper hides an actor wait
2+
--> $DIR/main.rs:90:9
3+
|
4+
LL | receiver.await.ok()
5+
| ^^^^^^^^^^^^^^
6+
|
7+
help: return the oneshot receiver and let callers select or await explicitly
8+
--> $DIR/main.rs:90:9
9+
|
10+
LL | receiver.await.ok()
11+
| ^^^^^^^^^^^^^^
12+
note: the lint level is defined here
13+
--> $DIR/main.rs:3:9
14+
|
15+
LL | #![warn(blocking_in_actor_loop)]
16+
| ^^^^^^^^^^^^^^^^^^^^^^
17+
118
warning: blocking await in actor loop
2-
--> $DIR/main.rs:56:9
19+
--> $DIR/main.rs:103:9
320
|
421
LL | handler.process().await;
522
| ^^^^^^^^^^^^^^^^^^^^^^^
623
|
724
help: queue this work and select on its completion instead of awaiting it in the actor loop
8-
--> $DIR/main.rs:56:9
25+
--> $DIR/main.rs:103:9
926
|
1027
LL | handler.process().await;
1128
| ^^^^^^^^^^^^^^^^^
12-
note: the lint level is defined here
13-
--> $DIR/main.rs:3:9
14-
|
15-
LL | #![warn(blocking_in_actor_loop)]
16-
| ^^^^^^^^^^^^^^^^^^^^^^
1729

1830
warning: blocking await in actor loop
19-
--> $DIR/main.rs:63:9
31+
--> $DIR/main.rs:110:9
2032
|
2133
LL | monitor.collected().await;
2234
| ^^^^^^^^^^^^^^^^^^^^^^^^^
2335
|
2436
help: queue this work and select on its completion instead of awaiting it in the actor loop
25-
--> $DIR/main.rs:63:9
37+
--> $DIR/main.rs:110:9
2638
|
2739
LL | monitor.collected().await;
2840
| ^^^^^^^^^^^^^^^^^^^
2941

3042
warning: blocking await in actor loop
31-
--> $DIR/main.rs:70:9
43+
--> $DIR/main.rs:117:9
3244
|
3345
LL | mailbox.dial().await;
3446
| ^^^^^^^^^^^^^^^^^^^^
3547
|
3648
help: queue this work and select on its completion instead of awaiting it in the actor loop
37-
--> $DIR/main.rs:70:9
49+
--> $DIR/main.rs:117:9
3850
|
3951
LL | mailbox.dial().await;
4052
| ^^^^^^^^^^^^^^
4153

4254
warning: blocking await in actor loop
43-
--> $DIR/main.rs:83:9
55+
--> $DIR/main.rs:130:9
4456
|
4557
LL | stream.send().await;
4658
| ^^^^^^^^^^^^^^^^^^^
4759
|
4860
help: queue this work and select on its completion instead of awaiting it in the actor loop
49-
--> $DIR/main.rs:83:9
61+
--> $DIR/main.rs:130:9
5062
|
5163
LL | stream.send().await;
5264
| ^^^^^^^^^^^^^
5365

5466
warning: blocking await in actor loop
55-
--> $DIR/main.rs:84:9
67+
--> $DIR/main.rs:131:9
5668
|
5769
LL | stream.recv().await;
5870
| ^^^^^^^^^^^^^^^^^^^
5971
|
6072
help: queue this work and select on its completion instead of awaiting it in the actor loop
61-
--> $DIR/main.rs:84:9
73+
--> $DIR/main.rs:131:9
6274
|
6375
LL | stream.recv().await;
6476
| ^^^^^^^^^^^^^
6577

66-
warning: 5 warnings emitted
78+
warning: 6 warnings emitted
6779

broadcast/src/buffered/ingress.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,11 +127,11 @@ impl<P: PublicKey, M: Digestible + Codec> Mailbox<P, M> {
127127

128128
/// Get a message by digest.
129129
///
130-
/// If the engine has shut down, returns `None`.
131-
pub async fn get(&self, digest: M::Digest) -> Option<M> {
130+
/// The returned receiver is closed if the engine has shut down.
131+
pub fn get(&self, digest: M::Digest) -> oneshot::Receiver<Option<M>> {
132132
let (responder, receiver) = oneshot::channel();
133133
let _ = self.sender.enqueue(Message::Get { digest, responder });
134-
receiver.await.unwrap_or_default()
134+
receiver
135135
}
136136
}
137137

0 commit comments

Comments
 (0)