Skip to content

Enable throttling object store reads #1049

@sgentry

Description

@sgentry

Proposed change

This feature request originates from a Slack channel discussion.

Add object store throttling options to control the speed of reads from NATS when calling store.GetAsync. This would avoid slow consumer events when reads are faster than the uploads described below. Possible options would be chunkSize, maxBytesPerSecond, and delay.

Use case

For my use-case, we want to stream large files from object store directly to an HTTP endpoint. We know our reads from NATS server will be much faster than uploading to HTTP endpoint.

Contribution

This is a very rough attempt at throttling reads for HTTP uploads in our use case; it was only meant to test what might be possible. I am including it for reference from the Slack discussion.

/// <summary>
/// <see cref="HttpContent"/> that streams an object from a NATS object store into the
/// outgoing HTTP request body through a <see cref="ThrottledWriteStream"/>, so that the
/// object store is not read faster than the configured rate.
/// </summary>
public class NatsObjectStoreStreamContent : HttpContent
{
    private readonly INatsObjStore _objectStore;
    private readonly string _objectName;
    private readonly long _contentLength;
    private readonly ILogger? _logger;
    private readonly int _chunkSize;
    private readonly int _maxBytesPerSecond;

    /// <summary>
    /// Creates the content and sets the <c>Content-Type</c> (application/octet-stream) and
    /// <c>Content-Length</c> headers.
    /// </summary>
    /// <param name="objectStore">Object store to read from.</param>
    /// <param name="objectName">Name of the object to stream.</param>
    /// <param name="contentLength">Length reported in the <c>Content-Length</c> header and by <see cref="TryComputeLength"/>.</param>
    /// <param name="logger">Optional logger for progress and error reporting.</param>
    /// <param name="chunkSize">Chunk size handed to the throttled stream (default 1 MB).</param>
    /// <param name="maxBytesPerSecond">Throttle rate handed to the throttled stream (default 50 MB/s).</param>
    /// <exception cref="ArgumentNullException"><paramref name="objectStore"/> or <paramref name="objectName"/> is null.</exception>
    public NatsObjectStoreStreamContent(
        INatsObjStore objectStore,
        string objectName,
        long contentLength,
        ILogger? logger = null,
        int chunkSize = 1024 * 1024, // 1MB chunks
        int maxBytesPerSecond = 50 * 1024 * 1024) // 50 MB/s default throttle
    {
        _objectStore = objectStore ?? throw new ArgumentNullException(nameof(objectStore));
        _objectName = objectName ?? throw new ArgumentNullException(nameof(objectName));
        _contentLength = contentLength;
        _logger = logger;
        _chunkSize = chunkSize;
        _maxBytesPerSecond = maxBytesPerSecond;

        Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream");
        Headers.ContentLength = contentLength;
    }

    /// <summary>
    /// Reads the object from the store and writes it to <paramref name="stream"/> through a
    /// throttling wrapper, logging the achieved throughput on completion.
    /// </summary>
    /// <param name="stream">Destination stream supplied by the HTTP stack.</param>
    /// <param name="context">Transport context (unused).</param>
    /// <exception cref="FileNotFoundException">The object store returned no result for the object.</exception>
    protected override async Task SerializeToStreamAsync(Stream stream, TransportContext? context)
    {
        // Note: no space after ':' in the format specifiers; "{Size: N0}" would be treated as a
        // custom numeric format and render a literal " N" instead of grouped digits.
        _logger?.LogDebug(
            "Starting throttled stream of '{ObjectName}' ({Size:N0} bytes, max {Speed} MB/s)",
            _objectName, _contentLength, _maxBytesPerSecond / 1024.0 / 1024.0);

        var stopwatch = Stopwatch.StartNew();

        // Created outside the try block so the catch handler can report how many bytes
        // actually reached the destination before the failure.
        await using var throttledStream = new ThrottledWriteStream(
            stream,
            _chunkSize,
            _maxBytesPerSecond,
            _logger);

        try
        {
            // NATS writes into the throttled stream; leaveOpen keeps the wrapper usable for
            // the final flush and byte-count read below.
            var result = await _objectStore.GetAsync(_objectName, throttledStream, leaveOpen: true);

            if (result == null)
            {
                throw new FileNotFoundException($"Object '{_objectName}' not found");
            }

            await throttledStream.FlushAsync();

            var totalBytesWritten = throttledStream.TotalBytesWritten;
            var elapsed = stopwatch.Elapsed;
            var actualMbps = (totalBytesWritten / 1024.0 / 1024.0) / elapsed.TotalSeconds;

            _logger?.LogInformation(
                "Completed streaming {Bytes:N0} bytes in {Elapsed:F2}s ({Speed:F2} MB/s)",
                totalBytesWritten, elapsed.TotalSeconds, actualMbps);
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex,
                "Error while streaming object '{ObjectName}' after {Bytes:N0} bytes",
                _objectName, throttledStream.TotalBytesWritten);
            throw;
        }
    }

    /// <summary>
    /// Reports the content length supplied at construction time.
    /// </summary>
    /// <param name="length">Receives the configured content length.</param>
    /// <returns>Always <c>true</c>.</returns>
    protected override bool TryComputeLength(out long length)
    {
        length = _contentLength;
        return true;
    }
}

/// <summary>
/// Write-only stream wrapper that throttles write speed to prevent overwhelming the
/// downstream consumer. Each write is split into chunks of at most the configured chunk
/// size; before every chunk the wrapper waits until the average rate since construction
/// is at or below the configured bytes-per-second limit, then writes and flushes the chunk
/// to the inner stream.
/// </summary>
/// <remarks>
/// This class does not override <c>Dispose</c>, so disposing the wrapper does not close
/// the inner stream.
/// </remarks>
public class ThrottledWriteStream : Stream
{
    private static readonly TimeSpan ProgressLogInterval = TimeSpan.FromSeconds(5);

    private readonly Stream _innerStream;
    private readonly int _chunkSize;
    private readonly int _maxBytesPerSecond;
    private readonly ILogger? _logger;
    private readonly Stopwatch _stopwatch;

    private long _totalBytesWritten;
    private TimeSpan _lastProgressLog;

    /// <summary>Total number of bytes written to the inner stream so far.</summary>
    public long TotalBytesWritten => _totalBytesWritten;

    /// <summary>
    /// Wraps <paramref name="innerStream"/> with rate limiting.
    /// </summary>
    /// <param name="innerStream">Destination stream.</param>
    /// <param name="chunkSize">Maximum bytes written between throttle checks; values of zero or less disable chunking.</param>
    /// <param name="maxBytesPerSecond">Target average write rate; values of zero or less disable throttling.</param>
    /// <param name="logger">Optional logger for throttle and progress messages.</param>
    /// <exception cref="ArgumentNullException"><paramref name="innerStream"/> is null.</exception>
    public ThrottledWriteStream(
        Stream innerStream,
        int chunkSize,
        int maxBytesPerSecond,
        ILogger? logger = null)
    {
        _innerStream = innerStream ?? throw new ArgumentNullException(nameof(innerStream));
        _chunkSize = chunkSize;
        _maxBytesPerSecond = maxBytesPerSecond;
        _logger = logger;
        _stopwatch = Stopwatch.StartNew();
        _lastProgressLog = TimeSpan.Zero;
    }

    public override bool CanRead => false;
    public override bool CanSeek => false;
    public override bool CanWrite => true;

    // Length and Position are forwarded as-is; they only work if the inner stream supports them.
    public override long Length => _innerStream.Length;
    public override long Position
    {
        get => _innerStream.Position;
        set => _innerStream.Position = value;
    }

    /// <summary>
    /// Writes <paramref name="buffer"/> to the inner stream chunk by chunk, delaying before
    /// each chunk when the average rate is above the limit, and flushing after each chunk.
    /// </summary>
    public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
    {
        var offset = 0;

        // do/while so that a zero-length write is still forwarded (and flushed) once.
        do
        {
            var length = NextChunkLength(buffer.Length - offset);

            var delayMs = GetThrottleDelayMs();
            if (delayMs > 0)
            {
                _logger?.LogTrace("Throttling:  delaying {Delay}ms", delayMs);
                await Task.Delay(delayMs, cancellationToken).ConfigureAwait(false);
            }

            await _innerStream.WriteAsync(buffer.Slice(offset, length), cancellationToken).ConfigureAwait(false);
            await _innerStream.FlushAsync(cancellationToken).ConfigureAwait(false);

            offset += length;
            RecordWrite(length);
        }
        while (offset < buffer.Length);
    }

    /// <summary>Array-based overload; forwards to the <see cref="ReadOnlyMemory{T}"/> overload.</summary>
    public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        await WriteAsync(buffer.AsMemory(offset, count), cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Synchronous write with the same chunking and throttling as the async path. Uses the
    /// inner stream's synchronous API and a blocking sleep instead of blocking on a task.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/>/<paramref name="count"/> do not describe a range inside <paramref name="buffer"/>.</exception>
    public override void Write(byte[] buffer, int offset, int count)
    {
        if (buffer is null)
        {
            throw new ArgumentNullException(nameof(buffer));
        }

        if (offset < 0 || offset > buffer.Length)
        {
            throw new ArgumentOutOfRangeException(nameof(offset));
        }

        if (count < 0 || count > buffer.Length - offset)
        {
            throw new ArgumentOutOfRangeException(nameof(count));
        }

        var written = 0;
        do
        {
            var length = NextChunkLength(count - written);

            var delayMs = GetThrottleDelayMs();
            if (delayMs > 0)
            {
                _logger?.LogTrace("Throttling:  delaying {Delay}ms", delayMs);
                Thread.Sleep(delayMs);
            }

            _innerStream.Write(buffer, offset + written, length);
            _innerStream.Flush();

            written += length;
            RecordWrite(length);
        }
        while (written < count);
    }

    public override void Flush() => _innerStream.Flush();
    public override Task FlushAsync(CancellationToken cancellationToken) => _innerStream.FlushAsync(cancellationToken);
    public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();

    // Size of the next chunk: the remaining bytes, capped at the chunk size when chunking is enabled.
    private int NextChunkLength(int remaining) =>
        _chunkSize > 0 ? Math.Min(remaining, _chunkSize) : remaining;

    // Whole milliseconds to wait so that bytes written so far average out to the configured
    // rate; 0 when throttling is disabled or the writer is not ahead of schedule.
    private int GetThrottleDelayMs()
    {
        if (_maxBytesPerSecond <= 0)
        {
            return 0;
        }

        var expectedElapsedMs = (_totalBytesWritten * 1000.0) / _maxBytesPerSecond;
        var aheadByMs = expectedElapsedMs - _stopwatch.Elapsed.TotalMilliseconds;
        if (aheadByMs < 1.0)
        {
            return 0;
        }

        // Clamp before casting so an extreme value cannot overflow the int conversion.
        return (int)Math.Min(aheadByMs, int.MaxValue);
    }

    // Updates the byte counter and emits a progress log entry at most once per interval.
    private void RecordWrite(int bytes)
    {
        _totalBytesWritten += bytes;

        var elapsed = _stopwatch.Elapsed;
        if (elapsed - _lastProgressLog > ProgressLogInterval)
        {
            var totalMb = _totalBytesWritten / 1024.0 / 1024.0;
            _logger?.LogDebug(
                "Progress: {Bytes:N0} bytes ({MB:F2} MB) - {Speed:F2} MB/s",
                _totalBytesWritten, totalMb, totalMb / elapsed.TotalSeconds);
            _lastProgressLog = elapsed;
        }
    }
}
``

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions