tower/src/buffer/worker.rs (84 changes: 28 additions & 56 deletions)
@@ -24,7 +24,6 @@ pin_project_lite::pin_project! {
     where
         T: Service<Request>,
     {
-        current_message: Option<Message<Request, T::Future>>,
         rx: mpsc::Receiver<Message<Request, T::Future>>,
         service: T,
         finish: bool,
@@ -53,7 +52,6 @@ where
         };
 
         let worker = Worker {
-            current_message: None,
             finish: false,
             failed: None,
             rx,
@@ -68,33 +66,18 @@
     ///
     /// If a `Message` is returned, the `bool` is true if this is the first time we received this
     /// message, and false otherwise (i.e., we tried to forward it to the backing service before).
-    fn poll_next_msg(
-        &mut self,
-        cx: &mut Context<'_>,
-    ) -> Poll<Option<(Message<Request, T::Future>, bool)>> {
+    fn poll_next_msg(&mut self, cx: &mut Context<'_>) -> Poll<Option<Message<Request, T::Future>>> {
         if self.finish {
             // We've already received None and are shutting down
             return Poll::Ready(None);
         }
 
         tracing::trace!("worker polling for next message");
-        if let Some(msg) = self.current_message.take() {
-            // If the oneshot sender is closed, then the receiver is dropped,
-            // and nobody cares about the response. If this is the case, we
-            // should continue to the next request.
-            if !msg.tx.is_closed() {
-                tracing::trace!("resuming buffered request");
-                return Poll::Ready(Some((msg, false)));
-            }
-
-            tracing::trace!("dropping cancelled buffered request");
-        }
-
         // Get the next request
         while let Some(msg) = ready!(Pin::new(&mut self.rx).poll_recv(cx)) {
             if !msg.tx.is_closed() {
                 tracing::trace!("processing new request");
-                return Poll::Ready(Some((msg, true)));
+                return Poll::Ready(Some(msg));
             }
             // Otherwise, request is canceled, so pop the next one.
             tracing::trace!("dropping cancelled request");
@@ -150,51 +133,40 @@ where
         }
 
         loop {
+            if self.failed.is_none() {
+                match self.service.poll_ready(cx) {
+                    Poll::Pending => {
+                        tracing::trace!(service.ready = false);
+                        return Poll::Pending;
+                    }
+                    Poll::Ready(Err(e)) => {
+                        let error = e.into();
+                        tracing::debug!({ %error }, "service failed");
+                        self.failed(error);
+                    }
+                    Poll::Ready(Ok(())) => {
+                        tracing::debug!(service.ready = true);
+                    }
+                }
+            }
             match ready!(self.poll_next_msg(cx)) {
-                Some((msg, first)) => {
+                Some(msg) => {
                     let _guard = msg.span.enter();
                     if let Some(ref failed) = self.failed {
                         tracing::trace!("notifying caller about worker failure");
                         let _ = msg.tx.send(Err(failed.clone()));
                         continue;
                     }
 
-                    // Wait for the service to be ready
-                    tracing::trace!(
-                        resumed = !first,
-                        message = "worker received request; waiting for service readiness"
-                    );
-                    match self.service.poll_ready(cx) {
-                        Poll::Ready(Ok(())) => {
-                            tracing::debug!(service.ready = true, message = "processing request");
-                            let response = self.service.call(msg.request);
-
-                            // Send the response future back to the sender.
-                            //
-                            // An error means the request had been canceled in-between
-                            // our calls, the response future will just be dropped.
-                            tracing::trace!("returning response future");
-                            let _ = msg.tx.send(Ok(response));
-                        }
-                        Poll::Pending => {
-                            tracing::trace!(service.ready = false, message = "delay");
-                            // Put our current message back in its slot.
-                            drop(_guard);
-                            self.current_message = Some(msg);
-                            return Poll::Pending;
-                        }
-                        Poll::Ready(Err(e)) => {
-                            let error = e.into();
-                            tracing::debug!({ %error }, "service failed");
-                            drop(_guard);
-                            self.failed(error);
-                            let _ = msg.tx.send(Err(self
-                                .failed
-                                .as_ref()
-                                .expect("Worker::failed did not set self.failed?")
-                                .clone()));
-                        }
-                    }
+                    tracing::debug!(service.ready = true, message = "processing request");
+                    let response = self.service.call(msg.request);
+
+                    // Send the response future back to the sender.
+                    //
+                    // An error means the request had been canceled in-between
+                    // our calls, the response future will just be dropped.
+                    tracing::trace!("returning response future");
+                    let _ = msg.tx.send(Ok(response));
                 }
                 None => {
                     // No more requests _ever_.
tower/tests/buffer/main.rs (73 changes: 15 additions & 58 deletions)
@@ -172,14 +172,13 @@ async fn waits_for_channel_capacity() {
 
     assert_ready_ok!(service.poll_ready());
     let mut response2 = task::spawn(service.call("hello"));
     assert_pending!(worker.poll());
 
-    assert_ready_ok!(service.poll_ready());
-    let mut response3 = task::spawn(service.call("hello"));
-    assert_pending!(service.poll_ready());
     assert_pending!(worker.poll());
 
     // wake up worker's service (i.e. Mock), now it's ready to make progress
     handle.allow(1);
+    // process the request (i.e. send it to the handle), return the response
+    // future, and then poll the worker's service::poll_ready in the next loop.
     assert_pending!(worker.poll());
 
     handle
@@ -192,10 +191,10 @@
     assert_ready_ok!(response1.poll());
 
     assert_ready_ok!(service.poll_ready());
-    let mut response4 = task::spawn(service.call("hello"));
+    let mut response3 = task::spawn(service.call("hello"));
     assert_pending!(worker.poll());
 
-    handle.allow(3);
+    handle.allow(2);
     assert_pending!(worker.poll());
 
     handle
@@ -216,16 +215,6 @@
         .send_response("world");
     assert_pending!(worker.poll());
     assert_ready_ok!(response3.poll());
-
-    assert_pending!(worker.poll());
-    handle
-        .next_request()
-        .await
-        .unwrap()
-        .1
-        .send_response("world");
-    assert_pending!(worker.poll());
-    assert_ready_ok!(response4.poll());
Comment on lines -220 to -228

Contributor: why are these assertions no longer needed?

Author: Because no further request is issued at this point in the new test, there is no need for these assertions.

 }
 
 #[tokio::test(flavor = "current_thread")]
@@ -243,14 +232,13 @@ async fn wakes_pending_waiters_on_close() {
     assert_pending!(worker.poll());
     let mut response = task::spawn(service1.call("hello"));
 
-    assert!(worker.is_woken(), "worker task should be woken by request");
+    assert!(
+        !worker.is_woken(),
+        "worker task would NOT be woken by request until worker's service is ready"
+    );
     assert_pending!(worker.poll());
 
     // fill the channel so all subsequent requests will wait for capacity
-    let service1 = assert_ready_ok!(task::spawn(service.ready()).poll());
-    assert_pending!(worker.poll());
-    let mut response2 = task::spawn(service1.call("world"));
-
     let mut service1 = service.clone();
     let mut ready1 = task::spawn(service1.ready());
     assert_pending!(worker.poll());
@@ -271,13 +259,6 @@
         err
     );
 
-    let err = assert_ready_err!(response2.poll());
-    assert!(
-        err.is::<error::Closed>(),
-        "response should fail with a Closed, got: {:?}",
-        err
-    );
-
     assert!(
         ready1.is_woken(),
         "dropping worker should wake ready task 1"
@@ -316,14 +297,13 @@ async fn wakes_pending_waiters_on_failure() {
     assert_pending!(worker.poll());
     let mut response = task::spawn(service1.call("hello"));
 
-    assert!(worker.is_woken(), "worker task should be woken by request");
+    assert!(
+        !worker.is_woken(),
+        "worker task would NOT be woken by request until worker's service is ready"
+    );
     assert_pending!(worker.poll());
 
     // fill the channel so all subsequent requests will wait for capacity
-    let service1 = assert_ready_ok!(task::spawn(service.ready()).poll());
-    assert_pending!(worker.poll());
-    let mut response2 = task::spawn(service1.call("world"));
-
     let mut service1 = service.clone();
     let mut ready1 = task::spawn(service1.ready());
     assert_pending!(worker.poll());
@@ -336,6 +316,8 @@
 
     // fail the inner service
     handle.send_error("foobar");
+    // consume the in-flight request and send an Err response, then run the
+    // next loop iterations until the channel reads None.
     // worker task terminates
     assert_ready!(worker.poll());
 
@@ -345,12 +327,6 @@
         "response should fail with a ServiceError, got: {:?}",
         err
     );
-    let err = assert_ready_err!(response2.poll());
-    assert!(
-        err.is::<error::ServiceError>(),
-        "response should fail with a ServiceError, got: {:?}",
-        err
-    );
Comment on lines -348 to -353

Contributor: why is this no longer the case?

Author: As mentioned above, the old execution order was:

  1. poll_next_msg
  2. service.poll_ready
  3. service.call

The new execution order is:

  1. service.poll_ready
  2. poll_next_msg
  3. service.call

There is no "preloaded" message slot anymore, so I deleted these assertions.
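
For reference, here is a condensed sketch of the worker's new poll loop, distilled from the worker.rs diff above (tracing and the branch that notifies queued callers of an existing failure are elided, so this fragment is illustrative rather than compilable on its own):

    loop {
        // 1. Drive the inner service to readiness before touching the queue.
        if self.failed.is_none() {
            match self.service.poll_ready(cx) {
                Poll::Pending => return Poll::Pending, // nothing has been dequeued yet
                Poll::Ready(Err(e)) => self.failed(e.into()), // record the failure
                Poll::Ready(Ok(())) => {}
            }
        }
        // 2. Only then take the next message off the channel.
        match ready!(self.poll_next_msg(cx)) {
            Some(msg) => {
                // 3. Dispatch immediately: no `current_message` slot is needed,
                // because a message is dequeued only once the service is ready
                // (or the worker has already failed).
                let _ = msg.tx.send(Ok(self.service.call(msg.request)));
            }
            None => return Poll::Ready(()), // channel closed; shut down
        }
    }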


     assert!(
         ready1.is_woken(),
@@ -375,25 +351,6 @@
     );
 }
 
-#[tokio::test(flavor = "current_thread")]
-async fn propagates_trace_spans() {
-    use tower::util::ServiceExt;
-    use tracing::Instrument;
-
-    let _t = support::trace_init();
-
-    let span = tracing::info_span!("my_span");
-
-    let service = support::AssertSpanSvc::new(span.clone());
-    let (service, worker) = Buffer::pair(service, 5);
-    let worker = tokio::spawn(worker);
-
-    let result = tokio::spawn(service.oneshot(()).instrument(span));
-
-    result.await.expect("service panicked").expect("failed");
-    worker.await.expect("worker panicked");
-}
-
Comment on lines -378 to -395

Contributor: may I ask why this test was deleted?

Author: This commit changes the worker's execution order to:

  1. service.poll_ready
  2. poll_next_msg
  3. service.call

The span carried by a message is only available after step 2 (poll_next_msg), so service.call now runs inside the span but service.poll_ready does not. AssertSpanSvc no longer fits this case, so I deleted the test.
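
Concretely, the span handling in the new loop looks like this (a simplified fragment of the patched worker.rs above):

    // self.service.poll_ready(cx) has already been polled at this point,
    // outside of any request's span.
    match ready!(self.poll_next_msg(cx)) {
        Some(msg) => {
            let _guard = msg.span.enter(); // the request's span starts here
            let response = self.service.call(msg.request); // runs inside the span
            let _ = msg.tx.send(Ok(response));
        }
        None => return Poll::Ready(()),
    }

So readiness is always polled outside every request span, which is why AssertSpanSvc (which, per the explanation above, expects the span to be observable from poll_ready as well as call) can no longer pass.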


 #[tokio::test(flavor = "current_thread")]
 async fn doesnt_leak_permits() {
     let _t = support::trace_init();