Skip to content

Commit c8abfd2

Browse files
committed
Fix pinned drop violations; make types Unpin where practical
* FutureGroup and StreamGroup are now `Unpin` types. At heart, they kinda always were * ChunkedVec is now built around pinning, fulfilling its destiny as a data structure made specifically for pinnable data * Brief changes to tests, to catch the bug that spawned this commit Properly speaking, neither FutureGroup nor StreamGroup need to be pinned. The futures/streams are placed in allocations managed by ChunkedVec, which is responsible for keeping the futures/streams pinned. This has been true. In fact, this means the entire pin_project/`#[pin]` management done by these types was superfluous. FutureGroup and StreamGroup don't store any pinnable data inline. Going forward, these types can be safely exposed as Unpin. Even if their implementation changes again, FutureGroup and StreamGroup are never going to store their streams/futures on stack. Only inline storage makes pinning self-referential data meaningful; heap storage will always be stable. Thus, making them Unpin is totally safe from the perspective of future compatibility. The same goes for derived types. As long as the futures/streams are polled on the heap, there's no way they will need to be pinned in the old-fashioned (inline) way.
1 parent 877daa6 commit c8abfd2

7 files changed

Lines changed: 146 additions & 168 deletions

File tree

src/concurrent_stream/for_each.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ use core::sync::atomic::{AtomicUsize, Ordering};
1212
use core::task::{ready, Context, Poll};
1313

1414
// OK: validated! - all bounds should check out
15-
#[pin_project]
1615
pub(crate) struct ForEachConsumer<FutT, T, F, FutB>
1716
where
1817
FutT: Future<Output = T>,
@@ -21,13 +20,20 @@ where
2120
{
2221
// NOTE: we can remove the `Arc` here if we're willing to make this struct self-referential
2322
count: Arc<AtomicUsize>,
24-
#[pin]
2523
group: FutureGroup<ForEachFut<F, FutT, T, FutB>>,
2624
limit: usize,
2725
f: F,
2826
_phantom: PhantomData<(T, FutB)>,
2927
}
3028

29+
impl<FutT, T, F, FutB> Unpin for ForEachConsumer<FutT, T, F, FutB>
30+
where
31+
FutT: Future<Output = T>,
32+
F: Fn(T) -> FutB,
33+
FutB: Future<Output = ()>,
34+
{
35+
}
36+
3137
impl<A, T, F, B> ForEachConsumer<A, T, F, B>
3238
where
3339
A: Future<Output = T>,
@@ -60,32 +66,30 @@ where
6066
type Output = ();
6167

6268
async fn send(self: Pin<&mut Self>, future: FutT) -> super::ConsumerState {
63-
let mut this = self.project();
69+
let mut this = self.get_mut();
6470
// If we have no space, we're going to provide backpressure until we have space
65-
while this.count.load(Ordering::Relaxed) >= *this.limit {
71+
while this.count.load(Ordering::Relaxed) >= this.limit {
6672
this.group.next().await;
6773
}
6874

6975
// Space was available! - insert the item for posterity
7076
this.count.fetch_add(1, Ordering::Relaxed);
7177
let fut = ForEachFut::new(this.f.clone(), future, this.count.clone());
72-
this.group.as_mut().insert_pinned(fut);
78+
this.group.insert(fut);
7379

7480
ConsumerState::Continue
7581
}
7682

77-
async fn progress(self: Pin<&mut Self>) -> super::ConsumerState {
78-
let mut this = self.project();
79-
while (this.group.next().await).is_some() {}
83+
async fn progress(mut self: Pin<&mut Self>) -> super::ConsumerState {
84+
while self.group.next().await.is_some() {}
8085
ConsumerState::Empty
8186
}
8287

83-
async fn flush(self: Pin<&mut Self>) -> Self::Output {
84-
let mut this = self.project();
88+
async fn flush(mut self: Pin<&mut Self>) -> Self::Output {
8589
// 4. We will no longer receive any additional futures from the
8690
// underlying stream; wait until all the futures in the group have
8791
// resolved.
88-
while (this.group.next().await).is_some() {}
92+
while self.group.next().await.is_some() {}
8993
}
9094
}
9195

src/concurrent_stream/from_concurrent_stream.rs

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,7 @@ impl<T, E> FromConcurrentStream<Result<T, E>> for Result<Vec<T>, E> {
4141
}
4242

4343
// TODO: replace this with a generalized `fold` operation
44-
#[pin_project]
4544
pub(crate) struct VecConsumer<'a, Fut: Future> {
46-
#[pin]
4745
group: FutureGroup<Fut>,
4846
output: &'a mut Vec<Fut::Output>,
4947
}
@@ -63,31 +61,28 @@ where
6361
{
6462
type Output = ();
6563

66-
async fn send(self: Pin<&mut Self>, future: Fut) -> super::ConsumerState {
67-
let mut this = self.project();
64+
async fn send(mut self: Pin<&mut Self>, future: Fut) -> super::ConsumerState {
6865
// unbounded concurrency, so we just goooo
69-
this.group.as_mut().insert_pinned(future);
66+
self.group.insert(future);
7067
ConsumerState::Continue
7168
}
7269

7370
async fn progress(self: Pin<&mut Self>) -> super::ConsumerState {
74-
let mut this = self.project();
71+
let this = self.get_mut();
7572
while let Some(item) = this.group.next().await {
7673
this.output.push(item);
7774
}
7875
ConsumerState::Empty
7976
}
8077
async fn flush(self: Pin<&mut Self>) -> Self::Output {
81-
let mut this = self.project();
78+
let this = self.get_mut();
8279
while let Some(item) = this.group.next().await {
8380
this.output.push(item);
8481
}
8582
}
8683
}
8784

88-
#[pin_project]
8985
pub(crate) struct ResultVecConsumer<'a, Fut: Future, T, E> {
90-
#[pin]
9186
group: FutureGroup<Fut>,
9287
output: &'a mut Result<Vec<T>, E>,
9388
}
@@ -107,15 +102,14 @@ where
107102
{
108103
type Output = ();
109104

110-
async fn send(self: Pin<&mut Self>, future: Fut) -> super::ConsumerState {
111-
let mut this = self.project();
105+
async fn send(mut self: Pin<&mut Self>, future: Fut) -> super::ConsumerState {
112106
// unbounded concurrency, so we just goooo
113-
this.group.as_mut().insert_pinned(future);
107+
self.group.insert(future);
114108
ConsumerState::Continue
115109
}
116110

117111
async fn progress(self: Pin<&mut Self>) -> super::ConsumerState {
118-
let mut this = self.project();
112+
let this = self.get_mut();
119113
let Ok(items) = this.output else {
120114
return ConsumerState::Break;
121115
};
@@ -126,7 +120,7 @@ where
126120
items.push(item);
127121
}
128122
Err(e) => {
129-
**this.output = Err(e);
123+
*this.output = Err(e);
130124
return ConsumerState::Break;
131125
}
132126
}

src/concurrent_stream/try_for_each.rs

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use crate::concurrent_stream::ConsumerState;
22
use crate::future::FutureGroup;
33
use crate::private::Try;
44
use futures_lite::StreamExt;
5-
use pin_project::pin_project;
65

76
use super::Consumer;
87
use alloc::sync::Arc;
@@ -14,7 +13,6 @@ use core::pin::Pin;
1413
use core::sync::atomic::{AtomicUsize, Ordering};
1514
use core::task::{ready, Context, Poll};
1615

17-
#[pin_project]
1816
pub(crate) struct TryForEachConsumer<FutT, T, F, FutB, B>
1917
where
2018
FutT: Future<Output = T>,
@@ -24,14 +22,22 @@ where
2422
{
2523
// NOTE: we can remove the `Arc` here if we're willing to make this struct self-referential
2624
count: Arc<AtomicUsize>,
27-
#[pin]
2825
group: FutureGroup<TryForEachFut<F, FutT, T, FutB, B>>,
2926
limit: usize,
3027
residual: Option<B::Residual>,
3128
f: F,
3229
_phantom: PhantomData<(T, FutB)>,
3330
}
3431

32+
impl<FutT, T, F, FutB, B> Unpin for TryForEachConsumer<FutT, T, F, FutB, B>
33+
where
34+
FutT: Future<Output = T>,
35+
F: Clone + Fn(T) -> FutB,
36+
FutB: Future<Output = B>,
37+
B: Try<Output = ()>,
38+
{
39+
}
40+
3541
impl<FutT, T, F, FutB, B> TryForEachConsumer<FutT, T, F, FutB, B>
3642
where
3743
FutT: Future<Output = T>,
@@ -66,9 +72,9 @@ where
6672
type Output = B;
6773

6874
async fn send(self: Pin<&mut Self>, future: FutT) -> super::ConsumerState {
69-
let mut this = self.project();
75+
let mut this = self.get_mut();
7076
// If we have no space, we're going to provide backpressure until we have space
71-
while this.count.load(Ordering::Relaxed) >= *this.limit {
77+
while this.count.load(Ordering::Relaxed) >= this.limit {
7278
match this.group.next().await {
7379
// Case 1: there are no more items available in the group. We
7480
// can no longer iterate over them, and necessarily should be
@@ -82,7 +88,7 @@ where
8288
// entirely so we can short-circuit with an error from the
8389
// `flush` method.
8490
ControlFlow::Break(residual) => {
85-
*this.residual = Some(residual);
91+
this.residual = Some(residual);
8692
return ConsumerState::Break;
8793
}
8894
},
@@ -92,23 +98,23 @@ where
9298
// Space was available! - insert the item for posterity
9399
this.count.fetch_add(1, Ordering::Relaxed);
94100
let fut = TryForEachFut::new(this.f.clone(), future, this.count.clone());
95-
this.group.as_mut().insert_pinned(fut);
101+
this.group.insert(fut);
96102
ConsumerState::Continue
97103
}
98104

99105
async fn progress(self: Pin<&mut Self>) -> super::ConsumerState {
100-
let mut this = self.project();
106+
let mut this = self.get_mut();
101107
while let Some(res) = this.group.next().await {
102108
if let ControlFlow::Break(residual) = res.branch() {
103-
*this.residual = Some(residual);
109+
this.residual = Some(residual);
104110
return ConsumerState::Break;
105111
}
106112
}
107113
ConsumerState::Empty
108114
}
109115

110116
async fn flush(self: Pin<&mut Self>) -> Self::Output {
111-
let mut this = self.project();
117+
let mut this = self.get_mut();
112118
// Return the error if we stopped iteration because of a previous error.
113119
if this.residual.is_some() {
114120
return B::from_residual(this.residual.take().unwrap());

src/future/future_group.rs

Lines changed: 15 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,7 @@ use crate::utils::{ChunkedVec, PollState, PollVec, WakerVec};
5757
/// # });}
5858
/// ```
5959
#[must_use = "`FutureGroup` does nothing if not iterated over"]
60-
#[pin_project::pin_project]
6160
pub struct FutureGroup<F> {
62-
#[pin]
6361
futures: ChunkedVec<F>,
6462
wakers: WakerVec,
6563
states: PollVec,
@@ -189,7 +187,7 @@ impl<F> FutureGroup<F> {
189187
let is_present = self.keys.remove(&key.0);
190188
if is_present {
191189
self.states[key.0].set_none();
192-
self.futures.remove(key.0);
190+
self.futures.remove_in_place(key.0);
193191
}
194192
is_present
195193
}
@@ -272,43 +270,6 @@ impl<F: Future> FutureGroup<F> {
272270
Key(index)
273271
}
274272

275-
/// Insert a value into a pinned `FutureGroup`
276-
///
277-
/// This method is private because it serves as an implementation detail for
278-
/// `ConcurrentStream`. We should never expose this publicly, as the entire
279-
/// point of this crate is that we abstract the futures poll machinery away
280-
/// from end-users.
281-
///
282-
/// # Safety
283-
///
284-
/// This is safe because `ChunkedVec` uses a triangular allocation
285-
/// strategy that never moves existing elements when growing. Each bucket
286-
/// is a heap-allocated box that remains at a stable address.
287-
#[allow(unused)]
288-
pub(crate) fn insert_pinned(self: Pin<&mut Self>, future: F) -> Key
289-
where
290-
F: Future,
291-
{
292-
let mut this = self.project();
293-
// SAFETY: ChunkedVec guarantees that inserting a value never moves
294-
// existing values. Growth allocates new buckets without touching existing ones.
295-
let index = unsafe { this.futures.as_mut().get_unchecked_mut() }.insert(future);
296-
this.keys.insert(index);
297-
let key = Key(index);
298-
299-
// Update tracking structures to match the new capacity
300-
let new_cap = this.futures.as_ref().capacity();
301-
this.wakers.resize(new_cap);
302-
this.states.resize(new_cap);
303-
304-
// Set the corresponding state
305-
this.states[index].set_pending();
306-
let mut readiness = this.wakers.readiness();
307-
readiness.set_ready(index);
308-
309-
key
310-
}
311-
312273
/// Create a stream which also yields the key of each item.
313274
///
314275
/// # Example
@@ -337,19 +298,14 @@ impl<F: Future> FutureGroup<F> {
337298
}
338299

339300
impl<F: Future> FutureGroup<F> {
340-
fn poll_next_inner(
341-
self: Pin<&mut Self>,
342-
cx: &Context<'_>,
343-
) -> Poll<Option<(Key, <F as Future>::Output)>> {
344-
let mut this = self.project();
345-
301+
fn poll_next_inner(&mut self, cx: &Context<'_>) -> Poll<Option<(Key, <F as Future>::Output)>> {
346302
// Short-circuit if we have no futures to iterate over
347-
if this.futures.is_empty() {
303+
if self.futures.is_empty() {
348304
return Poll::Ready(None);
349305
}
350306

351307
// Set the top-level waker and check readiness
352-
let mut readiness = this.wakers.readiness();
308+
let mut readiness = self.wakers.readiness();
353309
readiness.set_waker(cx.waker());
354310
if !readiness.any_ready() {
355311
// Nothing is ready yet
@@ -358,24 +314,21 @@ impl<F: Future> FutureGroup<F> {
358314

359315
// Setup our futures state
360316
let mut ret = Poll::Pending;
361-
let states = this.states;
362-
363-
// SAFETY: We unpin the future group so we can later individually access
364-
// single futures. Either to read from them or to drop them.
365-
let futures = unsafe { this.futures.as_mut().get_unchecked_mut() };
317+
let states = &mut self.states;
318+
let futures = &mut self.futures;
366319

367-
for index in this.keys.iter().cloned() {
320+
for index in self.keys.iter().cloned() {
368321
if states[index].is_pending() && readiness.clear_ready(index) {
369322
// unlock readiness so we don't deadlock when polling
370323
#[allow(clippy::drop_non_drop)]
371324
drop(readiness);
372325

373326
// Obtain the intermediate waker.
374-
let mut cx = Context::from_waker(this.wakers.get(index).unwrap());
327+
let mut cx = Context::from_waker(self.wakers.get(index).unwrap());
375328

376-
// SAFETY: this future here is a projection from the futures
329+
// SAFETY: self future here is a projection from the futures
377330
// vec, which we're reading from.
378-
let future = unsafe { Pin::new_unchecked(&mut futures[index]) };
331+
let future = futures.get_mut(index).expect("index ready but not init?");
379332
match future.poll(&mut cx) {
380333
Poll::Ready(item) => {
381334
// Set the return type for the function
@@ -384,7 +337,7 @@ impl<F: Future> FutureGroup<F> {
384337
// Remove all associated data with the future
385338
// The only data we can't remove directly is the key entry.
386339
states[index] = PollState::None;
387-
futures.remove(index);
340+
futures.remove_in_place(index);
388341

389342
break;
390343
}
@@ -393,14 +346,14 @@ impl<F: Future> FutureGroup<F> {
393346
};
394347

395348
// Lock readiness so we can use it again
396-
readiness = this.wakers.readiness();
349+
readiness = self.wakers.readiness();
397350
}
398351
}
399352

400-
// Now that we're no longer borrowing `this.keys` we can remove
353+
// Now that we're no longer borrowing `self.keys` we can remove
401354
// the current key from the set
402355
if let Poll::Ready(Some((key, _))) = ret {
403-
this.keys.remove(&key.0);
356+
self.keys.remove(&key.0);
404357
}
405358

406359
ret
@@ -410,7 +363,7 @@ impl<F: Future> FutureGroup<F> {
410363
impl<F: Future> Stream for FutureGroup<F> {
411364
type Item = <F as Future>::Output;
412365

413-
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
366+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
414367
match self.poll_next_inner(cx) {
415368
Poll::Ready(Some((_key, item))) => Poll::Ready(Some(item)),
416369
Poll::Ready(None) => Poll::Ready(None),

0 commit comments

Comments
 (0)