Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
9f63f68
Introduces OutgoingFrameMemory
PauloHMattos Feb 27, 2026
c8f89fd
Refactor OutgoingFrameMemory for zero-copy AMQP payload streaming
PauloHMattos Feb 28, 2026
27a98ca
Add BasicPublishAsync overloads that accept IMemoryOwner<byte> as the…
PauloHMattos Feb 28, 2026
d676907
Trailing white space
danielmarbach Mar 5, 2026
0747252
Remove redundant readonly
danielmarbach Mar 5, 2026
1440581
Rename `OutgoingFrameMemory` to `OutgoingFrame` across the codebase f…
danielmarbach Mar 5, 2026
d83d2b5
Refactor `BasicPublishAsync` methods to reduce duplication by introdu…
danielmarbach Mar 5, 2026
823f20e
Fix nullable annotations in BasicPublishCoreAsync
PauloHMattos Mar 7, 2026
e03c242
Expand XML docs for IChannelExtensions
lukebakken Mar 8, 2026
bec88e9
Update projects/RabbitMQ.Client/IChannelExtensions.cs
lukebakken Mar 9, 2026
2c176d2
Update projects/RabbitMQ.Client/IChannel.cs
lukebakken Mar 9, 2026
eb18928
Dispose `IMemoryOwner<byte>` body before throwing when channel is closed
lukebakken Mar 10, 2026
c0c533c
Fix style issues in `OutgoingFrame.Dispose` and XML docs in `IChannel…
lukebakken Mar 10, 2026
b36069a
Restore single-allocation path for `ReadOnlyMemory<byte>` in `Seriali…
lukebakken Mar 10, 2026
b70b1e7
Refactor PR to the ReadOnlyMemory + IDisposable shape
PauloHMattos Mar 15, 2026
03b5957
Fix `bodyOwner` parameter to be nullable (`IDisposable?`) in public API
lukebakken Mar 20, 2026
bc7d329
Dispose `_bodyOwner` independently in `OutgoingFrame.Dispose`
lukebakken Mar 20, 2026
1587d15
Fix minor issues found in final review
lukebakken Mar 20, 2026
f793d0e
More test scenarios
danielmarbach Mar 31, 2026
51b769a
Address proper frame disposal
danielmarbach Mar 31, 2026
0f0e5eb
Track transfer of owner
danielmarbach Mar 31, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions projects/Benchmarks/WireFormatting/MethodFraming.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class MethodFramingBasicAck
public ushort Channel { get; set; }

[Benchmark]
internal RentedMemory BasicAckWrite() => Framing.SerializeToFrames(ref _basicAck, Channel);
internal OutgoingFrame BasicAckWrite() => Framing.SerializeToFrames(ref _basicAck, Channel);
}

[Config(typeof(Config))]
Expand All @@ -71,13 +71,13 @@ public class MethodFramingBasicPublish
public int FrameMax { get; set; }

[Benchmark]
internal RentedMemory BasicPublishWriteNonEmpty() => Framing.SerializeToFrames(ref _basicPublish, ref _properties, _body, Channel, FrameMax);
internal OutgoingFrame BasicPublishWriteNonEmpty() => Framing.SerializeToFrames(ref _basicPublish, ref _properties, _body, bodyOwner: null, Channel, FrameMax);

[Benchmark]
internal RentedMemory BasicPublishWrite() => Framing.SerializeToFrames(ref _basicPublish, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax);
internal OutgoingFrame BasicPublishWrite() => Framing.SerializeToFrames(ref _basicPublish, ref _propertiesEmpty, _bodyEmpty, bodyOwner: null, Channel, FrameMax);

[Benchmark]
internal RentedMemory BasicPublishMemoryWrite() => Framing.SerializeToFrames(ref _basicPublishMemory, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax);
internal OutgoingFrame BasicPublishMemoryWrite() => Framing.SerializeToFrames(ref _basicPublishMemory, ref _propertiesEmpty, _bodyEmpty, bodyOwner: null, Channel, FrameMax);
}

[Config(typeof(Config))]
Expand All @@ -90,6 +90,6 @@ public class MethodFramingChannelClose
public ushort Channel { get; set; }

[Benchmark]
internal RentedMemory ChannelCloseWrite() => Framing.SerializeToFrames(ref _channelClose, Channel);
internal OutgoingFrame ChannelCloseWrite() => Framing.SerializeToFrames(ref _channelClose, Channel);
}
}
39 changes: 39 additions & 0 deletions projects/RabbitMQ.Client/IChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
//---------------------------------------------------------------------------

using System;
using System.Buffers;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -211,6 +212,25 @@ ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
CancellationToken cancellationToken = default)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;

/// <summary>
/// Asynchronously publishes a message.
/// </summary>
/// <param name="exchange">The exchange.</param>
/// <param name="routingKey">The routing key.</param>
/// <param name="mandatory">If set to <c>true</c>, the message must route to a queue.</param>
/// <param name="basicProperties">The message properties.</param>
/// <param name="body">The message body.</param>
/// <param name="bodyOwner">An optional <see cref="IDisposable"/> instance responsible for releasing or returning the memory used by the message body once publication is complete. Pass <c>null</c> if the body memory does not require disposal.</param>
/// <param name="cancellationToken">CancellationToken for this operation.</param>
/// <remarks>
/// Routing key must be shorter than 255 bytes.
/// Throws <see cref="Exceptions.PublishException"/> if a nack or basic.return is returned for the message.
/// </remarks>
ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
bool mandatory, TProperties basicProperties, ReadOnlyMemory<byte> body, IDisposable? bodyOwner,
CancellationToken cancellationToken = default)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;

/// <summary>
/// Asynchronously publishes a message.
/// </summary>
Expand All @@ -229,6 +249,25 @@ ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString rou
CancellationToken cancellationToken = default)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;

/// <summary>
/// Asynchronously publishes a message.
/// </summary>
/// <param name="exchange">The exchange.</param>
/// <param name="routingKey">The routing key.</param>
/// <param name="mandatory">If set to <c>true</c>, the message must route to a queue.</param>
/// <param name="basicProperties">The message properties.</param>
/// <param name="body">The message body.</param>
/// <param name="bodyOwner">An optional <see cref="IDisposable"/> instance responsible for releasing or returning the memory used by the message body once publication is complete. Pass <c>null</c> if the body memory does not require disposal.</param>
/// <param name="cancellationToken">CancellationToken for this operation.</param>
/// <remarks>
/// Routing key must be shorter than 255 bytes.
/// Throws <see cref="Exceptions.PublishException"/> if a nack or basic.return is returned for the message.
/// </remarks>
ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey,
bool mandatory, TProperties basicProperties, ReadOnlyMemory<byte> body, IDisposable? bodyOwner,
CancellationToken cancellationToken = default)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;

/// <summary>
/// Configures QoS parameters of the Basic content-class.
/// </summary>
Expand Down
Loading
Loading