Skip to content

Commit 468538c

Browse files
authored
Merge pull request #219 from matheus-consoli/look-ma-no-deps
use `FutureGroup` for concurrent streams
2 parents 29cd385 + 7c7b935 commit 468538c

4 files changed

Lines changed: 16 additions & 18 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,14 @@ harness = false
2929
[features]
3030
default = ["std"]
3131
std = ["alloc", "futures-lite/std"]
32-
alloc = ["dep:fixedbitset", "dep:smallvec", "dep:futures-buffered", "futures-lite/alloc"]
32+
alloc = ["dep:fixedbitset", "dep:smallvec", "futures-lite/alloc"]
3333

3434
[dependencies]
3535
fixedbitset = { version = "0.5.7", default-features = false, optional = true }
3636
futures-core = { version = "0.3", default-features = false }
3737
futures-lite = { version = "2.5.0", default-features = false }
3838
pin-project = "1.1"
3939
smallvec = { version = "1.13", optional = true }
40-
futures-buffered = { version = "0.2.9", optional = true }
4140

4241
[dev-dependencies]
4342
async-io = "2.4"

src/concurrent_stream/for_each.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use super::{Consumer, ConsumerState};
2-
use futures_buffered::FuturesUnordered;
2+
use crate::future::FutureGroup;
33
use futures_lite::StreamExt;
44
use pin_project::pin_project;
55

@@ -22,7 +22,7 @@ where
2222
// NOTE: we can remove the `Arc` here if we're willing to make this struct self-referential
2323
count: Arc<AtomicUsize>,
2424
#[pin]
25-
group: FuturesUnordered<ForEachFut<F, FutT, T, FutB>>,
25+
group: FutureGroup<ForEachFut<F, FutT, T, FutB>>,
2626
limit: usize,
2727
f: F,
2828
_phantom: PhantomData<(T, FutB)>,
@@ -44,7 +44,7 @@ where
4444
f,
4545
_phantom: PhantomData,
4646
count: Arc::new(AtomicUsize::new(0)),
47-
group: FuturesUnordered::new(),
47+
group: FutureGroup::new(),
4848
}
4949
}
5050
}
@@ -69,7 +69,7 @@ where
6969
// Space was available! - insert the item for posterity
7070
this.count.fetch_add(1, Ordering::Relaxed);
7171
let fut = ForEachFut::new(this.f.clone(), future, this.count.clone());
72-
this.group.as_mut().push(fut);
72+
this.group.as_mut().insert_pinned(fut);
7373

7474
ConsumerState::Continue
7575
}

src/concurrent_stream/from_concurrent_stream.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use super::{ConcurrentStream, Consumer, ConsumerState, IntoConcurrentStream};
2+
use crate::future::FutureGroup;
23
#[cfg(all(feature = "alloc", not(feature = "std")))]
34
use alloc::vec::Vec;
45
use core::future::Future;
56
use core::pin::Pin;
6-
use futures_buffered::FuturesUnordered;
77
use futures_lite::StreamExt;
88
use pin_project::pin_project;
99

@@ -44,14 +44,14 @@ impl<T, E> FromConcurrentStream<Result<T, E>> for Result<Vec<T>, E> {
4444
#[pin_project]
4545
pub(crate) struct VecConsumer<'a, Fut: Future> {
4646
#[pin]
47-
group: FuturesUnordered<Fut>,
47+
group: FutureGroup<Fut>,
4848
output: &'a mut Vec<Fut::Output>,
4949
}
5050

5151
impl<'a, Fut: Future> VecConsumer<'a, Fut> {
5252
pub(crate) fn new(output: &'a mut Vec<Fut::Output>) -> Self {
5353
Self {
54-
group: FuturesUnordered::new(),
54+
group: FutureGroup::new(),
5555
output,
5656
}
5757
}
@@ -66,7 +66,7 @@ where
6666
async fn send(self: Pin<&mut Self>, future: Fut) -> super::ConsumerState {
6767
let mut this = self.project();
6868
// unbounded concurrency, so we just goooo
69-
this.group.as_mut().push(future);
69+
this.group.as_mut().insert_pinned(future);
7070
ConsumerState::Continue
7171
}
7272

@@ -88,14 +88,14 @@ where
8888
#[pin_project]
8989
pub(crate) struct ResultVecConsumer<'a, Fut: Future, T, E> {
9090
#[pin]
91-
group: FuturesUnordered<Fut>,
91+
group: FutureGroup<Fut>,
9292
output: &'a mut Result<Vec<T>, E>,
9393
}
9494

9595
impl<'a, Fut: Future, T, E> ResultVecConsumer<'a, Fut, T, E> {
9696
pub(crate) fn new(output: &'a mut Result<Vec<T>, E>) -> Self {
9797
Self {
98-
group: FuturesUnordered::new(),
98+
group: FutureGroup::new(),
9999
output,
100100
}
101101
}
@@ -110,7 +110,7 @@ where
110110
async fn send(self: Pin<&mut Self>, future: Fut) -> super::ConsumerState {
111111
let mut this = self.project();
112112
// unbounded concurrency, so we just goooo
113-
this.group.as_mut().push(future);
113+
this.group.as_mut().insert_pinned(future);
114114
ConsumerState::Continue
115115
}
116116

src/concurrent_stream/try_for_each.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::concurrent_stream::ConsumerState;
2+
use crate::future::FutureGroup;
23
use crate::private::Try;
3-
use futures_buffered::FuturesUnordered;
44
use futures_lite::StreamExt;
55
use pin_project::pin_project;
66

@@ -24,9 +24,8 @@ where
2424
{
2525
// NOTE: we can remove the `Arc` here if we're willing to make this struct self-referential
2626
count: Arc<AtomicUsize>,
27-
// TODO: remove the `Pin<Box>` from this signature by requiring this struct is pinned
2827
#[pin]
29-
group: FuturesUnordered<TryForEachFut<F, FutT, T, FutB, B>>,
28+
group: FutureGroup<TryForEachFut<F, FutT, T, FutB, B>>,
3029
limit: usize,
3130
residual: Option<B::Residual>,
3231
f: F,
@@ -50,7 +49,7 @@ where
5049
f,
5150
residual: None,
5251
count: Arc::new(AtomicUsize::new(0)),
53-
group: FuturesUnordered::new(),
52+
group: FutureGroup::new(),
5453
_phantom: PhantomData,
5554
}
5655
}
@@ -93,7 +92,7 @@ where
9392
// Space was available! - insert the item for posterity
9493
this.count.fetch_add(1, Ordering::Relaxed);
9594
let fut = TryForEachFut::new(this.f.clone(), future, this.count.clone());
96-
this.group.as_mut().push(fut);
95+
this.group.as_mut().insert_pinned(fut);
9796
ConsumerState::Continue
9897
}
9998

0 commit comments

Comments
 (0)