A retrospection on interoperability with the futures crate #15

Open
@jerry73204

On Retrospection

I've been looking for a way to combine parallelism with async/await, so thank you for your work on this crate.
I'd like to share a retrospective on pulling this crate into my existing project.

My project makes heavy use of stream combinators from futures' StreamExt and async_std's StreamExt. Both are extension traits of futures' Stream, so any type that implements Stream automatically gains the extended combinators. That is a great convenience when you write your own stream type.
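The extension-trait mechanism described here can be sketched without any external crates. In the sketch below, std's Iterator stands in for futures' Stream purely to keep the example dependency-free; EveryOtherExt, Countdown and demo_extension are hypothetical names invented for illustration and are not part of futures, async_std or parallel-stream.

```rust
// Stand-in sketch: `Iterator` plays the role of futures' `Stream`.
// `EveryOtherExt` is a hypothetical extension trait, not a real API.
trait EveryOtherExt: Iterator + Sized {
    /// Keep the elements at even positions (0, 2, 4, ...).
    fn every_other(self) -> std::iter::StepBy<Self> {
        self.step_by(2)
    }
}

// Blanket impl: every type that implements `Iterator` gets the combinator.
impl<I: Iterator> EveryOtherExt for I {}

/// A user-defined type that implements only the base trait.
struct Countdown(u32);

impl Iterator for Countdown {
    type Item = u32;

    fn next(&mut self) -> Option<u32> {
        if self.0 == 0 {
            None
        } else {
            self.0 -= 1;
            Some(self.0 + 1)
        }
    }
}

/// `every_other` is available although `Countdown` never mentions it.
fn demo_extension() -> Vec<u32> {
    Countdown(5).every_other().collect()
}
```

A standalone trait with no blanket impl over the base trait does not compose this way, which is the interoperability gap discussed next.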

In contrast, parallel-stream's ParallelStream is foreign to these extension traits. It is a standalone trait with its own family of implementing types. Once a stream is turned into a ParallelStream, it loses all the combinators from those extension traits, and writing your own stream type becomes more complex. I think the limit() method of ParallelStream is the root of the problem.

I also noticed aggressive trait bounds that are hard to satisfy. For example, the map method requires f to be Send, Sync and Copy. Few types are both Sync and Copy, and a closure is Copy only if everything it captures is Copy. This makes map far less useful, because the closure cannot capture any variable that lacks one of these traits.

fn map<F, T, Fut>(self, f: F) -> Map<T> where
    F: FnMut(Self::Item) -> Fut + Send + Sync + Copy + 'static,
    T: Send + 'static,
    Fut: Future<Output = T> + Send, 
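To make the effect of the Copy bound concrete, here is a minimal synchronous sketch. needs_copy and needs_send_only are hypothetical stand-ins for a combinator with and without the Copy and Sync bounds; they are not functions from parallel-stream or par-stream, and the async parts are left out so the sketch compiles with std alone.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Hypothetical stand-in for a combinator with the strict bounds.
fn needs_copy<F>(f: F) -> usize
where
    F: Fn(usize) -> usize + Send + Sync + Copy + 'static,
{
    f(1)
}

// Hypothetical stand-in for a combinator with relaxed bounds.
fn needs_send_only<F>(mut f: F) -> usize
where
    F: FnMut(usize) -> usize + Send + 'static,
{
    f(1)
}

fn demo_bounds() -> (usize, usize, usize) {
    // A closure that captures only `Copy` data satisfies the strict bounds.
    let offset = 10usize;
    let a = needs_copy(move |x| x + offset);

    // `Arc` is `Send + Sync` but not `Copy`, so a closure that owns one is
    // not `Copy` either. Uncommenting the next line fails to compile:
    // needs_copy({ let c = Arc::new(AtomicUsize::new(0)); move |x| c.fetch_add(x, Ordering::SeqCst) });
    let counter = Arc::new(AtomicUsize::new(0));
    let captured = Arc::clone(&counter);
    let b = needs_send_only(move |x| {
        captured.fetch_add(x, Ordering::SeqCst);
        x
    });

    (a, b, counter.load(Ordering::SeqCst))
}
```

The relaxed version accepts the Arc-capturing closure because it asks only for FnMut and Send, which is the kind of shared state a parallel map stage typically needs.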

On Alternative Design

I gathered these thoughts and attempted an alternative design based on your work, which became par-stream. It provides an extension trait, ParStreamExt, for futures' Stream and solves the trait bound issue. The limit is given on demand and sets the number of workers for that stage only. It is not equivalent to your design, because the limit applies to one stage rather than to a group of stages. I moved your group-wide design to a separate API.

let shared = Arc::new(AtomicUsize::new(0));
stream.par_then(None, |item| {  // None sets the limit to the number of cores.
    let shared = shared.clone();   // Clone a variable without `Copy` trait
    async move {
        let new_item = compute(item, shared);
        new_item
    }
})
.collect::<Vec<_>>()  // from futures' StreamExt
.await;

To limit the workers of a group of combinators, I suggest the builder pattern. It is not implemented in my crate yet, but here is how it could look.

stream
    .enumerate()
    /* start of group */
    .into_par_group(ParGroupConfig {  // turn to a parallel group builder
        limit: Some(4),
        ..Default::default()  // using default runtime and other default options, etc
    })
    .then(|item| { async move { /* omit */ } })  // first stage
    .filter_map(|item| { async move { /* omit */ } }) // second stage
    .build_stream()  // build a stream from the group builder
    /* end of group */
    .collect::<Vec<_>>();  // combinator from futures' StreamExt

This lets users customize the whole parallel group in one config, and it would work seamlessly with the existing futures combinators.
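As a rough sketch of what such a config could look like, the struct below mirrors the ParGroupConfig name and limit field from the snippet above. Everything else is an assumption made for illustration: the buffer_size field, the worker_count helper, and the rule that a missing limit falls back to the number of available cores (borrowed from the None convention in the par_then example). None of it is an existing par-stream API.

```rust
use std::num::NonZeroUsize;
use std::thread;

/// Hypothetical config for a parallel group, shaped after the snippet above.
#[derive(Debug, Clone, Default, PartialEq)]
struct ParGroupConfig {
    /// Maximum number of workers shared by all stages in the group.
    /// `None` is assumed to mean "use the number of available cores".
    limit: Option<usize>,
    /// Hypothetical extra option, present so that `..Default::default()`
    /// has something to fill in.
    buffer_size: Option<usize>,
}

impl ParGroupConfig {
    /// Resolve the effective worker count for the whole group.
    fn worker_count(&self) -> usize {
        self.limit.unwrap_or_else(|| {
            thread::available_parallelism()
                .map(NonZeroUsize::get)
                .unwrap_or(1) // fall back to one worker if the query fails
        })
    }
}
```

Deriving Default keeps call sites short: a caller names only the options it cares about and leaves the rest to `..Default::default()`, as in the group example above.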

From here we could move to a more thorough discussion to help the design evolve. I'm happy to look more carefully at my own work and find a way to combine the two.
