Description
Background and motivation
Today bounded channels let the channel creator set a maximum capacity for the queue inside of the channel. This critical for making sure that backpressure can be applied to the consumer once the bounds have been hit (it can affects the reliability and performance of system using channels). When the consumer hits the capacity, the channel can be configured to behave in different ways by configuring the BoundedChannelFullMode. When this mode is set to wait, if the writer is using WaitToWriteAsync
or WriteAsync
, the method call will return an incomplete task until the reader consumes the one element that keeps it under the capacity. This normally isn't an issue but when building systems that want to react to this back pressure with a more heavy weight process, flipping from no wait to wait start on every single write operation could be costly.
The idea is to add a ResumeCapacity
that applies when BoundedChannelFullMode.Wait
is set that wouldn't resume the writer until that capacity is reached. By default it would be set to Capacity - 1, but somebody could set it to any value >= 0 and < Capacity. This would make it so that the writer could only start writing again after the queue size reached ResumeCapacity
after it reach the original Capacity.
Today this is somewhat possible by polling the Count on the channel reader but it has a bunch of problems:
- The writer might not have access to the reader
- The writer doesn't want to think about bounds in wait mode, just what to do with the item when TryWrite returns false (or just have WriteAsync do the right thing).
API Proposal
namespace System.Threading.Channels;
public class BoundedChannelOptions
{
public int? DrainCapacity { get; set; }
}
API Usage
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10) {
DrainCapacity = 5
});
async Task WriteNumbersAsync()
{
for (var i = 0; ; i++)
{
// This will write 10 numbers, then wait until 5 of them has been consumed, then will write 5 more
await channel.Writer.WriteAsync(i);
}
}
_ = WriteNumbersAsync();
await foreach (var value in channel.Reader.ReadAllAsync())
{
Console.WriteLine(value);
// The reader is a little slower than the writer
await Task.Delay(1000);
}
Alternative Designs
Instead of a setting, it could be done at the callsite with an overload of WaitToWriteAsync.
namespace System.Threading.Channels;
public abstract class ChannelWriter
{
public virtual ValueTask<bool> WaitToWriteAsync(int numberOfItems);
}
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10));
async Task WriteNumbersAsync()
{
for (var i = 0; ; i++)
{
// This will write 10 numbers, then wait until 5 of them has been consumed, then will write 5 more
await WriteNumberAsync(i);
}
async Task WriteNumberAsync(int n)
{
if (!channel.TryWrite(i))
{
// We hit back pressure, now poll until the reader consumes enough...
await channel.Writer.WaitToWriteAsync(5);
channel.TryWrite(i); // Now we should be able to write
}
}
}
_ = WriteNumbersAsync();
await foreach (var value in channel.Reader.ReadAllAsync())
{
Console.WriteLine(value);
// The reader is a little slower than the writer
await Task.Delay(1000);
}
Risks
No response