Skip to content

Commit eb9ddab

Browse files
authored
feat(fastrace-futures): add StreamExt::enter_on_poll (#174)
1 parent 9e03104 commit eb9ddab

2 files changed

Lines changed: 160 additions & 0 deletions

File tree

fastrace-futures/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ futures-sink = { version = "0.3.31" }
2020
pin-project = { version = "1.1.8" }
2121

2222
[dev-dependencies]
23+
fastrace = { workspace = true, features = ["enable"] }
2324
async-stream = { version = "0.3" }
2425
futures = { version = "0.3" }
2526
tokio = { workspace = true }

fastrace-futures/src/lib.rs

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@
22

33
#![doc = include_str!("../README.md")]
44

5+
use std::borrow::Cow;
56
use std::pin::Pin;
67
use std::task::Context;
78
use std::task::Poll;
89

910
use fastrace::Span;
11+
use fastrace::local::LocalSpan;
1012
use futures_core::Stream;
1113
use futures_sink::Sink;
1214

@@ -51,6 +53,68 @@ pub trait StreamExt: Stream + Sized {
5153
span: Some(span),
5254
}
5355
}
56+
57+
/// Starts a [`LocalSpan`] at every [`Stream::poll_next()`].
58+
///
59+
/// This is useful for tracing each **poll** of a stream (not each yielded item),
60+
/// e.g. to observe how often an async stream is woken. If you need a single span
61+
/// that covers the whole stream lifecycle, use [`StreamExt::in_span`] instead.
62+
///
63+
/// The span name can be any `impl Into<Cow<'static, str>>`.
64+
///
65+
/// # Important: Local parent required
66+
///
67+
/// `enter_on_poll` creates [`LocalSpan`]s, which require an existing local parent
68+
/// context at the time of each poll. Without one, the spans will be no-ops.
69+
///
70+
/// The typical way to provide a local parent is to wrap the stream with
71+
/// [`StreamExt::in_span`] **after** `enter_on_poll`:
72+
///
73+
/// ```text
74+
/// stream.enter_on_poll("poll").in_span(span)
75+
/// ```
76+
///
77+
/// ⚠️ Do **not** reverse the order:
78+
///
79+
/// ```text
80+
/// // WRONG: in_span sets the local parent *after* enter_on_poll tries to create
81+
/// // the LocalSpan, so the poll spans will be no-ops.
82+
/// stream.in_span(span).enter_on_poll("poll")
83+
/// ```
84+
///
85+
/// # Examples:
86+
///
87+
/// ```
88+
/// # #[tokio::main]
89+
/// # async fn main() {
90+
/// use async_stream::stream;
91+
/// use fastrace::prelude::*;
92+
/// use fastrace_futures::StreamExt as _;
93+
/// use futures::StreamExt;
94+
///
95+
/// let root = Span::root("root", SpanContext::random());
96+
///
97+
/// let s = stream! {
98+
/// for i in 0..2 {
99+
/// yield i;
100+
/// }
101+
/// }
102+
/// .enter_on_poll("poll")
103+
/// .in_span(Span::enter_with_parent("stream", &root));
104+
///
105+
/// tokio::pin!(s);
106+
///
107+
/// assert_eq!(s.next().await.unwrap(), 0);
108+
/// assert_eq!(s.next().await.unwrap(), 1);
109+
/// assert_eq!(s.next().await, None);
110+
/// # }
111+
/// ```
112+
fn enter_on_poll(self, name: impl Into<Cow<'static, str>>) -> EnterOnPollStream<Self> {
113+
EnterOnPollStream {
114+
inner: self,
115+
name: name.into(),
116+
}
117+
}
54118
}
55119

56120
impl<T> StreamExt for T where T: Stream {}
@@ -164,3 +228,98 @@ where T: Sink<I>
164228
}
165229
}
166230
}
231+
232+
/// Adapter for [`StreamExt::enter_on_poll()`](StreamExt::enter_on_poll).
233+
#[pin_project::pin_project]
234+
pub struct EnterOnPollStream<T> {
235+
#[pin]
236+
inner: T,
237+
name: Cow<'static, str>,
238+
}
239+
240+
impl<T> Stream for EnterOnPollStream<T>
241+
where T: Stream
242+
{
243+
type Item = T::Item;
244+
245+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
246+
let this = self.project();
247+
let _guard = LocalSpan::enter_with_local_parent(this.name.clone());
248+
this.inner.poll_next(cx)
249+
}
250+
}
251+
252+
#[cfg(test)]
253+
mod tests {
254+
use fastrace::local::LocalCollector;
255+
use fastrace::prelude::*;
256+
use futures::StreamExt as _;
257+
use futures::stream;
258+
259+
use crate::StreamExt as _;
260+
261+
#[tokio::test]
262+
async fn test_enter_on_poll_creates_spans() {
263+
let collector = LocalCollector::start();
264+
265+
let s = stream::iter(vec![1, 2]).enter_on_poll("poll");
266+
tokio::pin!(s);
267+
assert_eq!(s.next().await, Some(1));
268+
assert_eq!(s.next().await, Some(2));
269+
assert_eq!(s.next().await, None);
270+
271+
let local_spans = collector.collect();
272+
let parent_ctx = SpanContext::random();
273+
let spans = local_spans.to_span_records(parent_ctx);
274+
275+
let poll_count = spans.iter().filter(|s| s.name == "poll").count();
276+
assert!(
277+
poll_count >= 2,
278+
"expected at least 2 poll spans, got {}",
279+
poll_count
280+
);
281+
}
282+
283+
#[tokio::test]
284+
async fn test_enter_on_poll_pending_then_ready() {
285+
use std::pin::Pin;
286+
use std::task::Context;
287+
use std::task::Poll;
288+
289+
use futures::stream::Stream;
290+
291+
struct PendOnce {
292+
polled: bool,
293+
}
294+
295+
impl Stream for PendOnce {
296+
type Item = i32;
297+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<i32>> {
298+
if self.polled {
299+
Poll::Ready(Some(42))
300+
} else {
301+
self.polled = true;
302+
cx.waker().wake_by_ref();
303+
Poll::Pending
304+
}
305+
}
306+
}
307+
308+
let collector = LocalCollector::start();
309+
310+
let s = PendOnce { polled: false }.enter_on_poll("poll");
311+
tokio::pin!(s);
312+
assert_eq!(s.next().await, Some(42));
313+
314+
let local_spans = collector.collect();
315+
let parent_ctx = SpanContext::random();
316+
let spans = local_spans.to_span_records(parent_ctx);
317+
318+
let poll_count = spans.iter().filter(|s| s.name == "poll").count();
319+
assert!(
320+
poll_count >= 2,
321+
"expected at least 2 poll spans (pending + ready), got {}",
322+
poll_count
323+
);
324+
}
325+
}

0 commit comments

Comments
 (0)