Skip to content

Commit f76bf0a

Browse files
committed
@andresilva review
1 parent b3d5828 commit f76bf0a

3 files changed

Lines changed: 48 additions & 31 deletions

File tree

actor/src/lib.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,11 @@ stability_scope!(ALPHA {
179179
futures::future::pending()
180180
}
181181

182-
/// Runs at the end of each iteration.
182+
/// Runs at the end of each iteration that dispatched a message.
183+
///
184+
/// Not guaranteed to be called after every single message. When
185+
/// multiple read-only messages are batched concurrently,
186+
/// `postprocess` may only run after the full batch completes.
183187
fn postprocess(
184188
&mut self,
185189
_context: &mut E,

actor/src/service/builder.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ impl<A> ServiceBuilder<A> {
9898
/// `capacity` is per-lane queue depth and must be non-zero.
9999
pub fn with_lane<L>(self, lane: L, capacity: NonZeroUsize) -> MultiLaneServiceBuilder<A, L>
100100
where
101-
L: Copy + Ord + Send + 'static,
101+
L: Ord + Send + 'static,
102102
{
103103
MultiLaneServiceBuilder {
104104
actor: self.actor,
@@ -110,7 +110,7 @@ impl<A> ServiceBuilder<A> {
110110
/// Add an unbounded lane, transitioning to a [`MultiLaneUnboundedServiceBuilder`].
111111
pub fn with_unbounded_lane<L>(self, lane: L) -> MultiLaneUnboundedServiceBuilder<A, L>
112112
where
113-
L: Copy + Ord + Send + 'static,
113+
L: Ord + Send + 'static,
114114
{
115115
MultiLaneUnboundedServiceBuilder {
116116
actor: self.actor,
@@ -186,7 +186,7 @@ impl<A> ServiceBuilder<A> {
186186
/// Configures a multi-lane actor service loop with bounded lanes.
187187
pub struct MultiLaneServiceBuilder<A, L>
188188
where
189-
L: Copy + Ord + Send + 'static,
189+
L: Ord + Send + 'static,
190190
{
191191
actor: A,
192192
lanes: Vec<(L, NonZeroUsize)>,
@@ -195,7 +195,7 @@ where
195195

196196
impl<A, L> MultiLaneServiceBuilder<A, L>
197197
where
198-
L: Copy + Ord + Send + 'static,
198+
L: Ord + Send + 'static,
199199
{
200200
/// Add another bounded lane.
201201
///
@@ -245,7 +245,7 @@ where
245245
/// Configures a multi-lane actor service loop with unbounded lanes.
246246
pub struct MultiLaneUnboundedServiceBuilder<A, L>
247247
where
248-
L: Copy + Ord + Send + 'static,
248+
L: Ord + Send + 'static,
249249
{
250250
actor: A,
251251
lanes: Vec<L>,
@@ -254,7 +254,7 @@ where
254254

255255
impl<A, L> MultiLaneUnboundedServiceBuilder<A, L>
256256
where
257-
L: Copy + Ord + Send + 'static,
257+
L: Ord + Send + 'static,
258258
{
259259
/// Add another unbounded lane.
260260
pub fn with_unbounded_lane(mut self, lane: L) -> Self {

actor/src/service/driver.rs

Lines changed: 37 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -203,39 +203,52 @@ where
203203
let external = self
204204
.actor
205205
.on_external(self.context.as_present_mut(), &mut args);
206-
Self::recv_event(&mut self.shutdown, &mut self.lanes, pin!(external)).await
207-
};
208-
match event {
209-
LoopEvent::Shutdown => {
210-
self.shutdown_gracefully(&mut args, &mut reads, "shutdown signal received")
211-
.await;
212-
return;
206+
let external = pin!(external);
207+
let recv = Self::recv_event(&mut self.shutdown, &mut self.lanes, external);
208+
if reads.is_empty() {
209+
Some(recv.await)
210+
} else {
211+
select! {
212+
event = recv => Some(event),
213+
result = reads.next() => {
214+
result.and_then(|is_fatal| is_fatal.then_some(LoopEvent::Shutdown))
215+
},
216+
}
213217
}
214-
LoopEvent::Mailbox(Some(message)) => match message.into_ingress_envelope() {
215-
IngressEnvelope::ReadOnly(message) => {
216-
self.handle_read_only(&args, &mut reads, message);
218+
};
219+
if let Some(event) = event {
220+
match event {
221+
LoopEvent::Shutdown => {
222+
self.shutdown_gracefully(&mut args, &mut reads, "shutdown signal received")
223+
.await;
224+
return;
217225
}
218-
IngressEnvelope::ReadWrite(message) => {
226+
LoopEvent::Mailbox(Some(message)) => match message.into_ingress_envelope() {
227+
IngressEnvelope::ReadOnly(message) => {
228+
self.handle_read_only(&args, &mut reads, message);
229+
}
230+
IngressEnvelope::ReadWrite(message) => {
231+
if self.handle_read_write(&mut args, &mut reads, message).await {
232+
return;
233+
}
234+
}
235+
},
236+
LoopEvent::External(Some(message)) => {
219237
if self.handle_read_write(&mut args, &mut reads, message).await {
220238
return;
221239
}
222240
}
223-
},
224-
LoopEvent::External(Some(message)) => {
225-
if self.handle_read_write(&mut args, &mut reads, message).await {
241+
LoopEvent::Mailbox(None) | LoopEvent::External(None) => {
242+
self.shutdown_gracefully(
243+
&mut args,
244+
&mut reads,
245+
"ingress source closed, shutting down actor",
246+
)
247+
.await;
226248
return;
227249
}
228250
}
229-
LoopEvent::Mailbox(None) | LoopEvent::External(None) => {
230-
self.shutdown_gracefully(
231-
&mut args,
232-
&mut reads,
233-
"ingress source closed, shutting down actor",
234-
)
235-
.await;
236-
return;
237-
}
238-
};
251+
}
239252

240253
self.actor
241254
.postprocess(self.context.as_present_mut(), &mut args)

0 commit comments

Comments
 (0)