API proposal: Add IProducerConsumerCollection to System.Threading.Channels.CreateUnbounded(...) #32700

Open
@zcsizmadia

Description

Reason

Unbounded channels currently use a ConcurrentQueue to hold the queued items. This queue is created inside the internal UnboundedChannel class and cannot be replaced. Because the Channel reader and writer architecture is a producer/consumer pattern, it would be useful to let callers supply the collection that holds the queued items at channel creation, through the IProducerConsumerCollection interface. If no collection is supplied, the channel creates and uses the default ConcurrentQueue.

This addition would reflect the nature of the Channel architecture.

Use case

Use a priority queue instead of a FIFO queue.
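
To make the use case concrete, here is a minimal sketch of a priority-ordered collection that could be handed to the proposed overload. `PriorityCollection<T>` is a hypothetical name and is not part of this proposal; the sketch assumes .NET 6 or later (for `PriorityQueue<TElement, TPriority>`) and uses a plain lock rather than a lock-free design.

```csharp
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;

// Hypothetical type for illustration only: a lock-based collection whose
// TryTake always returns the smallest remaining item according to the comparer.
public sealed class PriorityCollection<T> : IProducerConsumerCollection<T>
{
    private readonly object _gate = new object();
    private readonly IComparer<T> _comparer;
    private readonly PriorityQueue<T, T> _queue; // each item is its own priority

    public PriorityCollection() : this(Comparer<T>.Default) { }

    public PriorityCollection(IComparer<T> comparer)
    {
        _comparer = comparer ?? throw new ArgumentNullException(nameof(comparer));
        _queue = new PriorityQueue<T, T>(_comparer);
    }

    public int Count { get { lock (_gate) { return _queue.Count; } } }

    // The collection is unbounded, so adding never fails.
    public bool TryAdd(T item)
    {
        lock (_gate) { _queue.Enqueue(item, item); }
        return true;
    }

    // Returns false when the collection is empty.
    public bool TryTake([MaybeNullWhen(false)] out T item)
    {
        lock (_gate) { return _queue.TryDequeue(out item, out _); }
    }

    // Snapshot of the current contents, sorted by priority.
    public T[] ToArray()
    {
        lock (_gate)
        {
            return _queue.UnorderedItems
                .Select(entry => entry.Element)
                .OrderBy(element => element, _comparer)
                .ToArray();
        }
    }

    public void CopyTo(T[] array, int index) => ToArray().CopyTo(array, index);

    public IEnumerator<T> GetEnumerator() => ((IEnumerable<T>)ToArray()).GetEnumerator();

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();

    void ICollection.CopyTo(Array array, int index) => ToArray().CopyTo(array, index);

    bool ICollection.IsSynchronized => false;

    object ICollection.SyncRoot => throw new NotSupportedException();
}
```

With this sketch, adding 5, 1 and 3 and then calling TryTake three times yields 1, 3 and 5, in that order. Under the proposal, an instance would be passed as the `collection` argument of CreateUnbounded.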

API Proposal

namespace System.Threading.Channels
{
    public static class Channel
    {
        // existing
        public static Channel<T> CreateUnbounded<T>();
        public static Channel<T> CreateUnbounded<T>(UnboundedChannelOptions options);

        // new
        public static Channel<T> CreateUnbounded<T>(IProducerConsumerCollection<T> collection);
        public static Channel<T> CreateUnbounded<T>(IProducerConsumerCollection<T> collection, UnboundedChannelOptions options);
    }
}

Implementation

The internal UnboundedChannel class already uses a ConcurrentQueue to hold the queued items, and the only members it uses are TryDequeue, Enqueue and Count.
The _items field must be changed to IProducerConsumerCollection<T>, and the TryDequeue and Enqueue calls must be replaced with TryTake and TryAdd.
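
The following sketch illustrates why the swap is mechanical: code written only against TryAdd and TryTake works unchanged with any IProducerConsumerCollection<T>, and ConcurrentQueue<T> already implements that interface. `CollectionSwapDemo` and `AddThenDrain` are hypothetical names used for illustration; this is not the channel's actual code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical helper for illustration only.
public static class CollectionSwapDemo
{
    // Adds 1, 2, 3 through the interface, then drains the collection
    // and returns the items in the order they were taken.
    public static List<int> AddThenDrain(IProducerConsumerCollection<int> items)
    {
        foreach (int value in new[] { 1, 2, 3 })
        {
            // Unlike ConcurrentQueue<T>.Enqueue, TryAdd can report failure,
            // so the caller has to handle a false result.
            if (!items.TryAdd(value))
            {
                throw new InvalidOperationException("The collection rejected the item.");
            }
        }

        var taken = new List<int>();
        while (items.TryTake(out int item))
        {
            taken.Add(item);
        }
        return taken;
    }
}
```

Passing a `ConcurrentQueue<int>` returns 1, 2, 3 (FIFO, the current channel behavior), while passing a `ConcurrentStack<int>` returns 3, 2, 1. The ordering is decided entirely by the supplied collection.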

Diff/PR

internal sealed class UnboundedChannel<T> : Channel<T>, IDebugEnumerable<T>
{
    private readonly IProducerConsumerCollection<T> _items;
        
    internal UnboundedChannel(bool runContinuationsAsynchronously)
        : this(runContinuationsAsynchronously, new ConcurrentQueue<T>())
    {
    }

    internal UnboundedChannel(bool runContinuationsAsynchronously, IProducerConsumerCollection<T> queue)
    {
        _runContinuationsAsynchronously = runContinuationsAsynchronously;
        _completion = new TaskCompletionSource<VoidResult>(runContinuationsAsynchronously ? TaskCreationOptions.RunContinuationsAsynchronously : TaskCreationOptions.None);
        _items = queue ?? throw new ArgumentNullException(nameof(queue));
        Reader = new UnboundedChannelReader(this);
        Writer = new UnboundedChannelWriter(this);
    }

    // Example changes:

    //if (parent._items.TryDequeue(out item)) ->
    //if (parent._items.TryTake(out item))

    //parent._items.Enqueue(item); ->
    //if (!parent._items.TryAdd(item))
    //{
    //    return false;
    //}
}
