Add BasicPublishAsync overloads that accept IMemoryOwner<byte> as the body #1913
PauloHMattos wants to merge 21 commits into rabbitmq:main
I think this is very clean and superior to the other PR. These are some of my thoughts / opinions (I'm aware some of them are highly subjective ;) )
The current branch gives callers who want zero-copy a clean opt-in path with explicit ownership semantics, without touching the default behavior or adding runtime configuration. I have pushed a few small tweaks here main...danielmarbach:rabbitmq-dotnet-client:publish-imemoryowner |
|
I have also quickly done a PoC showing ROS support danielmarbach@c5c9690. The branch is on top of the adjustment branch and is here main...danielmarbach:rabbitmq-dotnet-client:publish-sequence. I think we could do this as a follow up |
I was thinking about that. Also, for subscription. There's no need to materialize everything into sequential memory if it still has to be parsed and might not even be needed. A |
The IMemoryOwner overload gives you that. You can have a wrapper that is aware of the memory you pooled and then you get notified |
|
I will wait for @lukebakken to also weigh in, but it seems that IMemoryOwner is the preferred approach. I will close the other PR when we lock in on this approach.
Thanks. Do you want to close this and open a PR with your tweaks, or should I pull them into my branch?
What I don't like about this implementation is that if the ROS is an actual sequence, we make a copy for the full size of the ROS. That seems counterproductive because the developer using the library did the work of assembling a sequence to avoid large buffers and LOH allocations, only to have the client allocate a large contiguous buffer anyway. I think if we go down this route, we should at least also use a sequence of IMemoryOwner buffers to avoid this large contiguous buffer. |
Feel free to cherry pick then we can keep this PR alive and the discussion associated with it |
Yeah, I did not properly finish this. We would have to write the individual segments internally and not allocate a large buffer. I just wanted to explore what the API shape looks like and not spend more time on it yet. |
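As a rough illustration of "write the individual segments internally", here is a minimal sketch that walks a multi-segment `ReadOnlySequence<byte>` and copies each segment straight into an `IBufferWriter<byte>`, so no buffer the size of the whole body is rented. The `Segment` and `SequenceWriter` types are hypothetical helpers made up for this example; they are not part of the client or of the PoC branch.

```csharp
using System;
using System.Buffers;

// Hypothetical helper: a minimal segment type used only to build a multi-segment sequence.
sealed class Segment : ReadOnlySequenceSegment<byte>
{
    public Segment(ReadOnlyMemory<byte> memory) => Memory = memory;

    public Segment Append(ReadOnlyMemory<byte> memory)
    {
        var next = new Segment(memory) { RunningIndex = RunningIndex + Memory.Length };
        Next = next;
        return next;
    }
}

static class SequenceWriter
{
    // Writes segment by segment; the largest temporary span requested from the
    // writer is one segment long, never body.Length.
    public static long WriteSegments(ReadOnlySequence<byte> body, IBufferWriter<byte> output)
    {
        long written = 0;
        foreach (ReadOnlyMemory<byte> segment in body)
        {
            output.Write(segment.Span); // BuffersExtensions.Write
            written += segment.Length;
        }
        return written;
    }
}

static class Program
{
    static void Main()
    {
        var first = new Segment(new byte[] { 1, 2, 3 });
        Segment last = first.Append(new byte[] { 4, 5 });
        var body = new ReadOnlySequence<byte>(first, 0, last, last.Memory.Length);

        var output = new ArrayBufferWriter<byte>();
        long written = SequenceWriter.WriteSegments(body, output);
        Console.WriteLine($"{written} bytes written, single segment: {body.IsSingleSegment}");
    }
}
```

In the real client the segments would presumably also have to be split at frame boundaries, which this sketch leaves out.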
Because there's no out-of-the-box slicing of Since there's no out-of-the-box implementation of |
|
@PauloHMattos Something like this 863508c ( probably needs some more polishing and thinking). I'm quite time-constrained, but it shows your current OutgoingFrame design would be extendable. |
|
@paulomorgado You meant something like this, right? |
|
@danielmarbach, something like that. But I think it's being overlooked here. If it's a single segment, it won't be disposed. Having to pass a By the way, do you know any library with a general-purpose implementation of |
Yes, that's what I had in mind with the OutgoingFrame. During the next weekend I think I will have a lot of time to work on this, but I think we should leave the ROS for a follow-up PR, so I will focus on getting this PR ready for review. Once it is merged, I can finish what you started on the ROS implementation. |
I considered 3 ways when working on this:
None felt particularly elegant, so I went with the first one for the simple reason that it was what I implemented first 😄. Either way, I don't have a strong opinion about it and I'm happy with whichever option we choose. |
My understanding is that the memory owner is the canonical way of transferring ownership, see rules 7 and 8 in https://github.com/dotnet/docs/blob/main/docs/standard/memory-and-spans/memory-t-usage-guidelines.md#rule-7-if-you-have-an-imemoryownert-reference-you-must-at-some-point-dispose-of-it-or-transfer-its-ownership-but-not-both but it could also be that I'm misreading the guidance. |
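To make the "dispose of it or transfer its ownership, but not both" rule concrete, here is a minimal sketch. `PublishAsync` is a hypothetical stand-in for a publish call that takes ownership of the body; it is not the signature proposed in this PR, and the explicit `length` parameter is an assumption added because `MemoryPool<byte>.Shared.Rent` may hand back a larger buffer than requested.

```csharp
using System;
using System.Buffers;
using System.Threading.Tasks;

static class OwnershipDemo
{
    // Hypothetical callee: once called, it owns `body` and is responsible for disposing it.
    public static async ValueTask<int> PublishAsync(IMemoryOwner<byte> body, int length)
    {
        try
        {
            await Task.Yield(); // stands in for the asynchronous socket write
            return body.Memory.Slice(0, length).Length;
        }
        finally
        {
            body.Dispose(); // the new owner returns the buffer to the pool
        }
    }

    static async Task Main()
    {
        IMemoryOwner<byte> owner = MemoryPool<byte>.Shared.Rent(1024);
        owner.Memory.Span.Slice(0, 4).Fill(0x2A);

        // Ownership is transferred here: no `using` and no Dispose on the caller side,
        // and the caller must not touch `owner` after this line.
        int sent = await PublishAsync(owner, 4);
        Console.WriteLine($"sent {sent} bytes");
    }
}
```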
Yes, it is not bulletproof yet. My idea was only to explore the approach a bit, not to finish it completely. |
Not a criticism. Please, carry on! |
My personal preference, from experience, is:
|
|
I think the custom wrapper type does more clearly express the intent. Going to do a quick spike then we can have a look at it |
|
Here we go danielmarbach@a73b7e6 |
@PauloHMattos to be honest for low-level .NET stuff I defer to @danielmarbach and @paulomorgado, among other regular contributors. I'll, of course, review this PR, but if those two like this approach better that's a big 👍 for me as well 😸 |
|
I've cherry-picked all of @danielmarbach changes except for danielmarbach@a73b7e6. @paulomorgado, do you have any feedback/opinion regarding the IReadOnlyMemoryOwner approach? I personally don't like that the user would have to allocate a new object just to slice the memory. |
I went down this path mentally and concluded it would require carrying it through into the underlying transport. I think it is quite involved at first sight, but I did not spend a lot of time thinking this through. Curious to see your assessment! |
|
Don't get me wrong, I do understand the overhead of a single class allocation. But I wonder whether it is really such a big deal in this specific case, because at the end of the day you need some sort of scoping mechanism anyway to pool things and then release them again. So the question is: doesn't an abstraction like the read-only memory owner give a more concise overload structure on the methods, one that would also be a good example to follow when we introduce the read-only sequence support? |
|
I will look at @paulomorgado feedback and make the fixes next weekend. Thanks |
|
@PauloHMattos I tried to address those. We probably also need to resolve the conflicts on the API changes. I might be able to do this later |
This commit eliminates large contiguous buffer allocations and redundant payload copying during AMQP frame serialization by fully leveraging System.IO.Pipelines.
…or improved clarity.
…cing `BasicPublishCoreAsync`.
Co-authored-by: Paulo Morgado <470455+paulomorgado@users.noreply.github.com>
Co-authored-by: Paulo Morgado <470455+paulomorgado@users.noreply.github.com>
When `TransmitAsync` is called with an `IMemoryOwner<byte>` body on a closed channel, ownership has already been transferred to the callee. Throwing `AlreadyClosedException` without first disposing `body` leaks the memory owner.
…Extensions` In `OutgoingFrame.Dispose`, remove redundant `_methodAndHeader = default` assignment, use `is not null` instead of `!= null`, and use `default` instead of `null` for `_body` assignment. In `IChannelExtensions`, fix remarks ordering on the `PublicationAddress` overload to be consistent with all other overloads, and add missing periods after "BasicProperties" in eight remarks blocks.
…zeToFrames` The PR regressed the `ReadOnlyMemory<byte>` overload of `SerializeToFrames` from one allocation to two by copying the body into a `MemoryPool` buffer and delegating to the `IMemoryOwner<byte>` overload. Restore the original single-allocation approach: pack method, header, and body frames into one buffer. The `IMemoryOwner<byte>` overload retains its split method/header + body approach for zero-copy publishing. Also improve variable naming in the `ReadOnlyMemory<byte>` overload: split `remainingBodyBytes` into `bodyLength` (body size) and `remainingBodyBytes` (loop counter), and rename `frameSize` to `payloadSize` to match the naming used in `OutgoingFrame.WriteTo`. Add explanatory comments to both overloads to make the copy-vs-zero-copy distinction explicit.
The `ReadOnlyMemory<byte> + IDisposable?` shape was chosen specifically because `IDisposable?` is nullable, allowing callers without pooled memory to pass `null`. The refactor accidentally used non-nullable `IDisposable`, contradicting the intent of the API shape decision and the examples used to justify it.
`_bodyOwner` was only disposed inside the `_methodAndHeader is not null` block, making its disposal contingent on the method/header buffer state. These are logically independent resources and should be disposed separately.
- Use `default` instead of `null` for `_body` in `OutgoingFrame` no-body constructor (`ReadOnlyMemory<byte>` is a struct) - Remove extraneous blank line in `SessionBase.TransmitAsync` - Clarify `bodyOwner` XML docs to mention the parameter is optional and that `null` should be passed when no disposal is needed
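For readers following the API-shape discussion, this is a minimal sketch of how a `ReadOnlyMemory<byte>` plus nullable `IDisposable?` pair can be used from the caller's side. `Send` is a hypothetical stand-in for the internal send path, not the client's actual method; the point is only that a pooled buffer can be sliced without a wrapper object and that non-pooled callers pass `null`.

```csharp
#nullable enable
using System;
using System.Buffers;

static class BodyShapeDemo
{
    // Hypothetical stand-in: consumes `body`, then disposes `bodyOwner` if one was supplied.
    public static int Send(ReadOnlyMemory<byte> body, IDisposable? bodyOwner)
    {
        try
        {
            return body.Length;
        }
        finally
        {
            bodyOwner?.Dispose();
        }
    }

    static void Main()
    {
        // Pooled body: Rent may return more than requested, so slice to the payload length
        // and hand the owner over separately for disposal.
        IMemoryOwner<byte> owner = MemoryPool<byte>.Shared.Rent(100);
        int pooled = Send(owner.Memory.Slice(0, 100), owner);

        // Non-pooled body: nothing to dispose, so pass null.
        int plain = Send(new byte[] { 1, 2, 3 }, null);

        Console.WriteLine($"{pooled} {plain}");
    }
}
```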
@PauloHMattos I force pushed after a rebase |
public ValueTask TransmitAsync<TMethod, THeader>(in TMethod cmd, in THeader header, ReadOnlyMemory<byte> body, IDisposable? bodyOwner, CancellationToken cancellationToken = default)
    where TMethod : struct, IOutgoingAmqpMethod
    where THeader : IAmqpHeader
{
    if (!IsOpen && cmd.ProtocolCommandId != ProtocolCommandId.ChannelCloseOk)
    {
        bodyOwner?.Dispose();
        ThrowAlreadyClosedException();
    }

-   RentedMemory bytes = Framing.SerializeToFrames(ref Unsafe.AsRef(in cmd), ref Unsafe.AsRef(in header), body, ChannelNumber, Connection.MaxPayloadSize);
+   OutgoingFrame bytes = Framing.SerializeToFrames(ref Unsafe.AsRef(in cmd), ref Unsafe.AsRef(in header), body, bodyOwner, ChannelNumber, Connection.MaxPayloadSize);
    RabbitMQActivitySource.PopulateMessageEnvelopeSize(Activity.Current, bytes.Size);
    return Connection.WriteAsync(bytes, cancellationToken);
}
Isn't there the chance for leaking the bodyOwner on error?
public ValueTask TransmitAsync<TMethod, THeader>(..., IDisposable? bodyOwner, ...)
{
    if (!IsOpen && cmd.ProtocolCommandId != ProtocolCommandId.ChannelCloseOk)
    {
        bodyOwner?.Dispose();
        ThrowAlreadyClosedException();
    }
    OutgoingFrame bytes = default;
    try
    {
        bytes = Framing.SerializeToFrames(..., bodyOwner, ...);
        RabbitMQActivitySource.PopulateMessageEnvelopeSize(Activity.Current, bytes.Size);
        return Connection.WriteAsync(bytes, cancellationToken);
    }
    catch
    {
        // If SerializeToFrames failed: bytes is default → Dispose() is no-op,
        // but bodyOwner was never captured → dispose it directly.
        // If SerializeToFrames succeeded: bytes holds bodyOwner → Dispose() covers both.
        bytes.Dispose();
        if (bytes.Size == 0)
            bodyOwner?.Dispose();
        throw;
    }
}
@paulomorgado That would only capture synchronous exceptions. Returning a task or value task and expecting a try / catch to fire is a recipe for disaster. Let me have a closer look
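The point about synchronous versus asynchronous failures can be seen in a small standalone sketch. All names here are made up for the illustration: `Wrapper` returns the `ValueTask` without awaiting it, so its `catch` block does not run when the failure happens inside the asynchronous operation; the exception only surfaces where the task is awaited.

```csharp
using System;
using System.Threading.Tasks;

static class SyncCatchDemo
{
    static async ValueTask FailLaterAsync()
    {
        await Task.Yield();
        throw new InvalidOperationException("asynchronous failure");
    }

    // Non-async wrapper: the try/catch only covers code that runs before the task is returned.
    public static ValueTask Wrapper(Action onCatch)
    {
        try
        {
            return FailLaterAsync();
        }
        catch
        {
            onCatch(); // not reached for the asynchronous failure above
            throw;
        }
    }

    static async Task Main()
    {
        bool innerCatchRan = false;
        try
        {
            await Wrapper(() => innerCatchRan = true);
        }
        catch (InvalidOperationException)
        {
            // The exception is observed here, at the await site.
        }
        Console.WriteLine($"inner catch ran: {innerCatchRan}"); // inner catch ran: False
    }
}
```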
I don't think we have a problem here. The real guard against bodyOwner leaking from the BasicPublishCoreAsync to TransmitAsync path is already the bodyOwnerTransferred fix we put in BasicPublishCoreAsync.
For TransmitAsync itself: once SerializeToFrames succeeds, OutgoingFrame owns bodyOwner, and SocketFrameHandler is responsible for disposing of it in all paths (already fixed). The only true gap was PopulateMessageEnvelopeSize throwing after SerializeToFrames, but that's an Activity tagging call with no realistic throw surface, and patching it with an incorrect try/catch or unnecessarily introducing awaits here seems worse than leaving it as is.
I'm starting to think I analyzed this wrong.
Maybe it should be as simple as:
public ValueTask TransmitAsync<TMethod, THeader>(..., IDisposable? bodyOwner, ...)
{
    try
    {
        // ...
    }
    finally
    {
        bodyOwner?.Dispose();
    }
}
But that would require the method to be async.
Or not, with some extra work and creating an async local method if bodyOwner is not null and a sync one if it is.
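One way to read the "async local method only when there is an owner" idea is sketched below. This is a simplified illustration under a loud assumption: here the write (`WriteAsync`, a hypothetical stand-in) does not take ownership of the owner, so the `finally` is the only place it gets disposed. In the PR as described in this thread, `OutgoingFrame` and the write pipeline own `bodyOwner`, so this exact code would not drop in unchanged.

```csharp
#nullable enable
using System;
using System.Threading.Tasks;

static class SplitPathDemo
{
    // Hypothetical stand-in for the connection write; completes synchronously here.
    static ValueTask WriteAsync(ReadOnlyMemory<byte> payload) => default;

    public static ValueTask TransmitAsync(ReadOnlyMemory<byte> body, IDisposable? bodyOwner)
    {
        if (bodyOwner is null)
        {
            // No owner to release: return the write directly, no async state machine.
            return WriteAsync(body);
        }

        return WithOwnerAsync(body, bodyOwner);

        // Only this path pays for the state machine; the finally also covers
        // failures that happen after the first await.
        static async ValueTask WithOwnerAsync(ReadOnlyMemory<byte> payload, IDisposable owner)
        {
            try
            {
                await WriteAsync(payload).ConfigureAwait(false);
            }
            finally
            {
                owner.Dispose();
            }
        }
    }
}

// Hypothetical helper used only to observe disposal in the demo.
sealed class DisposeFlag : IDisposable
{
    public bool Disposed { get; private set; }
    public void Dispose() => Disposed = true;
}

static class Program
{
    static async Task Main()
    {
        var flag = new DisposeFlag();
        await SplitPathDemo.TransmitAsync(new byte[] { 1, 2, 3 }, flag);
        await SplitPathDemo.TransmitAsync(new byte[] { 4, 5 }, null);
        Console.WriteLine($"owner disposed: {flag.Disposed}");
    }
}
```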
Asked Copilot and this seems to be the recommendation:
Problem
TransmitAsync has no protection around the synchronous code between the !IsOpen guard and Connection.WriteAsync. If SerializeToFrames or PopulateMessageEnvelopeSize throws, bodyOwner (and possibly the rented method+header memory) leaks:
// Current code — no try/catch around these calls
OutgoingFrame bytes = Framing.SerializeToFrames(..., body, bodyOwner, ...); // can throw (OOM)
RabbitMQActivitySource.PopulateMessageEnvelopeSize(Activity.Current, bytes.Size); // can throw
return Connection.WriteAsync(bytes, cancellationToken); // async faults handled by write pipeline ✅
Meanwhile, BasicPublishCoreAsync has already set bodyOwnerTransferred = true before calling ModelSendAsync, so its outer finally won't clean up either.
Failure analysis
| Failure point | Who disposes? |
|---|---|
| !IsOpen early exit | bodyOwner?.Dispose() before throw ✅ |
| SerializeToFrames throws (e.g., OOM) | Nobody ❌ — frame never created, bodyOwner not captured |
| PopulateMessageEnvelopeSize throws | Nobody ❌ — entire OutgoingFrame (including bodyOwner) leaks |
| Connection.WriteAsync throws synchronously | Nobody ❌ — same as above |
| Connection.WriteAsync returns faulted ValueTask | WriteAsyncCore catch ✅ |
| WriteLoopAsync crashes | WriteLoopAsync finally drain ✅ |
Recommended fix
public ValueTask TransmitAsync<TMethod, THeader>(
    in TMethod cmd, in THeader header,
    ReadOnlyMemory<byte> body, IDisposable? bodyOwner,
    CancellationToken cancellationToken = default)
    where TMethod : struct, IOutgoingAmqpMethod
    where THeader : IAmqpHeader
{
    if (!IsOpen && cmd.ProtocolCommandId != ProtocolCommandId.ChannelCloseOk)
    {
        bodyOwner?.Dispose();
        ThrowAlreadyClosedException();
    }
    OutgoingFrame bytes = default;
    try
    {
        bytes = Framing.SerializeToFrames(
            ref Unsafe.AsRef(in cmd), ref Unsafe.AsRef(in header),
            body, bodyOwner, ChannelNumber, Connection.MaxPayloadSize);
        RabbitMQActivitySource.PopulateMessageEnvelopeSize(Activity.Current, bytes.Size);
        return Connection.WriteAsync(bytes, cancellationToken);
    }
    catch
    {
        // If SerializeToFrames succeeded (Size > 0), bytes owns both
        // the rented method+header memory and bodyOwner — Dispose covers both.
        // If SerializeToFrames failed, bytes is default — Dispose is a no-op,
        // so we must dispose bodyOwner directly.
        bytes.Dispose();
        if (bytes.Size == 0)
        {
            bodyOwner?.Dispose();
        }
        throw;
    }
}
Why this works
| Failure | What happens |
|---|---|
| !IsOpen | bodyOwner?.Dispose() before throw ✅ |
| SerializeToFrames throws | bytes is default → Dispose() is no-op → bodyOwner?.Dispose() ✅ |
| PopulateMessageEnvelopeSize throws | bytes.Dispose() frees frame + bodyOwner ✅ |
| Connection.WriteAsync throws synchronously | Same as above ✅ |
| Connection.WriteAsync returns faulted ValueTask | WriteAsyncCore catch disposes frame ✅ (try/catch not involved) |
| Happy path | Frame flows through write pipeline → WriteLoopAsync disposes ✅ |
Practical severity
Low-to-medium. SerializeToFrames is arithmetic + MemoryPool.Rent + buffer writes — only OOM or catastrophic failure triggers this. PopulateMessageEnvelopeSize is Activity?.SetTag — essentially cannot throw. But this is the last gap in an otherwise complete ownership chain.
Like I mentioned above, my assessment was that it is not worth it: when those edge cases happen you are in catastrophic territory, and returning buffers is the least of your worries since you need to restart anyway.
That being said I might be missing something and I'm happy to be convinced otherwise
Proposed Changes
Alternative to #1912. If the maintainers think this is the best approach, I will improve this PR body.
Types of Changes
What types of changes does your code introduce to this project?
Put an x in the boxes that apply
Checklist
Put an x in the boxes that apply. You can also fill these out after creating the PR. If you're unsure about any of them, don't hesitate to ask on the mailing list. We're here to help! This is simply a reminder of what we are going to look for before merging your code.
CONTRIBUTING.md document
Further Comments
Benchmark
Official version
Body size : 1 MB
Iterations : 500
Tasks : 16
Non-copying : False
Startup memory : 16 MB
--- Start ---
Memory usage: 17 MB
Memory usage: 19 MB
Memory usage: 19 MB
Memory usage: 21 MB
Memory usage: 21 MB
Memory usage: 2230 MB
Memory usage: 2236 MB
Memory usage: 2248 MB
Memory usage: 2249 MB
Memory usage: 2249 MB
Memory usage: 2249 MB
Memory usage: 2250 MB
Memory usage: 2249 MB
Memory usage: 2249 MB
--- Results ---
Avg time : 10948 ms
Min time : 8910 ms
Max time : 11633 ms
Memory : 2252 MB
Queue length : 8000 / 8000
Valid messages : 100 / 100 (first 100 of 8000)
PR + ReadOnlyMemory
Body size : 1 MB
Iterations : 500
Tasks : 16
Non-copying : False
Startup memory : 16 MB
--- Start ---
Memory usage: 17 MB
Memory usage: 19 MB
Memory usage: 19 MB
Memory usage: 22 MB
Memory usage: 22 MB
Memory usage: 2119 MB
Memory usage: 2129 MB
Memory usage: 2133 MB
Memory usage: 2134 MB
Memory usage: 2135 MB
Memory usage: 2135 MB
Memory usage: 1782 MB
--- Results ---
Avg time : 8835 ms
Min time : 8615 ms
Max time : 9216 ms
Memory : 1782 MB
Queue length : 8000 / 8000
Valid messages : 100 / 100 (first 100 of 8000)
PR + IMemoryOwner
Forked version
Body size : 1 MB
Iterations : 500
Tasks : 16
Non-copying : True
Startup memory : 16 MB
--- Start ---
Memory usage: 18 MB
Memory usage: 19 MB
Memory usage: 20 MB
Memory usage: 22 MB
Memory usage: 23 MB
Memory usage: 35 MB
Memory usage: 37 MB
Memory usage: 37 MB
Memory usage: 37 MB
Memory usage: 38 MB
Memory usage: 40 MB
Memory usage: 41 MB
--- Results ---
Avg time : 8940 ms
Min time : 8563 ms
Max time : 9373 ms
Memory : 41 MB
Queue length : 8000 / 8000
Valid messages : 100 / 100 (first 100 of 8000)