Skip to content

Commit 039aa87

Browse files
inoa-pmattosPauloHMattos
authored andcommitted
Refactor OutgoingFrameMemory for zero-copy AMQP payload streaming
This commit eliminates large contiguous buffer allocations and redundant payload copying during AMQP frame serialization by fully leveraging System.IO.Pipelines.
1 parent ebe5ae5 commit 039aa87

File tree

3 files changed

+118
-30
lines changed

3 files changed

+118
-30
lines changed

projects/RabbitMQ.Client/Impl/Frame.cs

Lines changed: 48 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,12 @@
3131

3232
using System;
3333
using System.Buffers;
34+
using System.Buffers.Binary;
3435
using System.IO;
3536
using System.IO.Pipelines;
3637
using System.Runtime.CompilerServices;
3738
using System.Threading;
39+
using System.Threading.Channels;
3840
using System.Threading.Tasks;
3941
using RabbitMQ.Client.Exceptions;
4042
using RabbitMQ.Client.Framing;
@@ -113,17 +115,30 @@ internal static class BodySegment
113115
* +--------------+
114116
* | x bytes |
115117
* +--------------+ */
118+
public const int HeaderSize = StartPayload;
119+
public const int FooterSize = 1;
116120
public const int FrameSize = BaseFrameSize;
117121

118122
[MethodImpl(MethodImplOptions.AggressiveInlining)]
119123
public static int WriteTo(Span<byte> span, ushort channel, ReadOnlySpan<byte> body)
120124
{
121125
const int StartBodyArgument = StartPayload;
122-
NetworkOrderSerializer.WriteUInt64(ref span.GetStart(), ((ulong)Constants.FrameBody << 56) | ((ulong)channel << 40) | ((ulong)body.Length << 8));
126+
WriteHeader(span, channel, body.Length);
123127
body.CopyTo(span.Slice(StartBodyArgument));
124128
span[StartPayload + body.Length] = Constants.FrameEnd;
129+
WriteFooter(span.Slice(StartPayload + body.Length));
125130
return body.Length + BaseFrameSize;
126131
}
132+
133+
public static void WriteHeader(Span<byte> span, ushort channel, int bodyLength)
134+
{
135+
NetworkOrderSerializer.WriteUInt64(ref span.GetStart(), ((ulong)Constants.FrameBody << 56) | ((ulong)channel << 40) | ((ulong)bodyLength << 8));
136+
}
137+
138+
public static void WriteFooter(Span<byte> span)
139+
{
140+
span[0] = Constants.FrameEnd;
141+
}
127142
}
128143

129144
internal static class Heartbeat
@@ -170,27 +185,42 @@ public static OutgoingFrameMemory SerializeToFrames<TMethod, THeader>(ref TMetho
170185
where TMethod : struct, IOutgoingAmqpMethod
171186
where THeader : IAmqpHeader
172187
{
173-
int remainingBodyBytes = body.Length;
174-
int size = Method.FrameSize + Header.FrameSize +
175-
method.GetRequiredBufferSize() + header.GetRequiredBufferSize() +
176-
BodySegment.FrameSize * GetBodyFrameCount(maxBodyPayloadBytes, remainingBodyBytes) + remainingBodyBytes;
177-
178188
// Will be returned by SocketFrameWriter.WriteLoop
179-
IMemoryOwner<byte> buffer = MemoryPool<byte>.Shared.Rent(size);
189+
IMemoryOwner<byte> bodyCopy = MemoryPool<byte>.Shared.Rent(body.Length);
190+
body.CopyTo(bodyCopy.Memory);
191+
return SerializeToFrames(ref method, ref header, bodyCopy, body.Length, channelNumber, maxBodyPayloadBytes);
192+
}
193+
194+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
195+
public static OutgoingFrameMemory SerializeToFrames<TMethod, THeader>(ref TMethod method, ref THeader header, IMemoryOwner<byte> body, int bodyLength, ushort channelNumber, int maxBodyPayloadBytes)
196+
where TMethod : struct, IOutgoingAmqpMethod
197+
where THeader : IAmqpHeader
198+
{
199+
// Calculate ONLY the Method and Header framing size
200+
int framingSize = Method.FrameSize + Header.FrameSize +
201+
method.GetRequiredBufferSize() + header.GetRequiredBufferSize();
202+
203+
// Pre-calculate total final sequence size
204+
int bodyFramesCount = GetBodyFrameCount(maxBodyPayloadBytes, bodyLength);
205+
int totalSize = framingSize + bodyLength + (BodySegment.FrameSize * bodyFramesCount);
206+
207+
// Rent a smaller buffer exclusively for the Method and Header
208+
IMemoryOwner<byte> buffer = MemoryPool<byte>.Shared.Rent(framingSize);
180209
Span<byte> bufferSpan = buffer.Memory.Span;
181210

182211
int offset = Method.WriteTo(bufferSpan, channelNumber, ref method);
183-
offset += Header.WriteTo(bufferSpan.Slice(offset), channelNumber, ref header, remainingBodyBytes);
184-
ReadOnlySpan<byte> bodySpan = body.Span;
185-
while (remainingBodyBytes > 0)
186-
{
187-
int frameSize = remainingBodyBytes > maxBodyPayloadBytes ? maxBodyPayloadBytes : remainingBodyBytes;
188-
offset += BodySegment.WriteTo(bufferSpan.Slice(offset), channelNumber, bodySpan.Slice(bodySpan.Length - remainingBodyBytes, frameSize));
189-
remainingBodyBytes -= frameSize;
190-
}
191-
192-
System.Diagnostics.Debug.Assert(offset == size, $"Serialized to wrong size, expect {size}, offset {offset}");
193-
return new OutgoingFrameMemory(buffer, size);
212+
offset += Header.WriteTo(bufferSpan.Slice(offset), channelNumber, ref header, bodyLength);
213+
214+
System.Diagnostics.Debug.Assert(offset == framingSize, $"Serialized to wrong size, expect {framingSize}, offset {offset}");
215+
216+
return new OutgoingFrameMemory(
217+
buffer,
218+
framingSize,
219+
body,
220+
bodyLength,
221+
channelNumber,
222+
maxBodyPayloadBytes,
223+
totalSize);
194224
}
195225

196226
[MethodImpl(MethodImplOptions.AggressiveInlining)]

projects/RabbitMQ.Client/Impl/SocketFrameHandler.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,8 @@ private async Task WriteLoopAsync()
251251
try
252252
{
253253
frames.WriteTo(_pipeWriter);
254-
await _pipeWriter.FlushAsync().ConfigureAwait(false);
254+
await _pipeWriter.FlushAsync()
255+
.ConfigureAwait(false);
255256
RabbitMqClientEventSource.Log.CommandSent(frames.Size);
256257
}
257258
finally

projects/RabbitMQ.Client/OutgoingFrameMemory.cs

Lines changed: 68 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,35 +31,92 @@
3131

3232
using System;
3333
using System.Buffers;
34+
using System.Diagnostics;
35+
using static RabbitMQ.Client.Impl.Framing;
3436

3537
namespace RabbitMQ.Client
3638
{
3739
internal struct OutgoingFrameMemory : IDisposable
3840
{
39-
internal OutgoingFrameMemory(IMemoryOwner<byte> memoryOwner, int length)
41+
private IMemoryOwner<byte>? _methodAndHeader;
42+
private readonly int _methodAndHeaderLength;
43+
private IMemoryOwner<byte>? _body;
44+
private readonly int _bodyLength;
45+
private readonly int _maxBodyPayloadBytes;
46+
private readonly ushort _channelNumber;
47+
48+
internal OutgoingFrameMemory(
49+
IMemoryOwner<byte> methodAndHeader,
50+
int methodAndHeaderLength)
4051
{
41-
_memory = memoryOwner.Memory.Slice(0, length);
42-
_memoryOwner = memoryOwner;
52+
_methodAndHeader = methodAndHeader;
53+
_methodAndHeaderLength = methodAndHeaderLength;
54+
_body = null;
55+
_bodyLength = 0;
56+
_channelNumber = 0;
57+
_maxBodyPayloadBytes = 0;
58+
Size = methodAndHeaderLength;
4359
}
4460

45-
internal readonly int Size => _memory.Length;
61+
internal OutgoingFrameMemory(
62+
IMemoryOwner<byte> methodAndHeader,
63+
int methodAndHeaderLength,
64+
IMemoryOwner<byte> body,
65+
int bodyLength,
66+
ushort channelNumber,
67+
int maxBodyPayloadBytes,
68+
int totalSize)
69+
{
70+
_methodAndHeader = methodAndHeader;
71+
_methodAndHeaderLength = methodAndHeaderLength;
72+
_body = body;
73+
_bodyLength = bodyLength;
74+
_channelNumber = channelNumber;
75+
_maxBodyPayloadBytes = maxBodyPayloadBytes;
76+
Size = totalSize;
77+
}
4678

47-
private ReadOnlyMemory<byte> _memory;
48-
private IMemoryOwner<byte>? _memoryOwner;
79+
internal readonly int Size { get; }
4980

50-
internal readonly void WriteTo(IBufferWriter<byte> pipeWriter)
81+
internal readonly void WriteTo(IBufferWriter<byte> writer)
5182
{
52-
pipeWriter.Write(_memory.Span);
83+
Debug.Assert(_methodAndHeader is not null);
84+
ReadOnlySpan<byte> methodAndHeader = _methodAndHeader!.Memory.Span.Slice(0, _methodAndHeaderLength);
85+
writer.Write(methodAndHeader);
86+
87+
if (_bodyLength == 0)
88+
{
89+
return;
90+
}
91+
92+
Debug.Assert(_body is not null);
93+
ReadOnlySpan<byte> bodySpan = _body!.Memory.Span.Slice(0, _bodyLength);
94+
int remainingBodyBytes = bodySpan.Length;
95+
int bodyOffset = 0;
96+
97+
while (remainingBodyBytes > 0)
98+
{
99+
int payloadSize = remainingBodyBytes > _maxBodyPayloadBytes ? _maxBodyPayloadBytes : remainingBodyBytes;
100+
101+
Span<byte> span = writer.GetSpan(BodySegment.HeaderSize + payloadSize + BodySegment.FooterSize);
102+
int offset = BodySegment.WriteTo(span, _channelNumber, bodySpan.Slice(bodyOffset, payloadSize));
103+
writer.Advance(offset);
104+
105+
remainingBodyBytes -= payloadSize;
106+
bodyOffset += payloadSize;
107+
}
53108
}
54109

55110
public void Dispose()
56111
{
57-
IMemoryOwner<byte>? memoryOwner = _memoryOwner;
58-
_memoryOwner = null;
112+
IMemoryOwner<byte>? memoryOwner = _methodAndHeader;
113+
_methodAndHeader = null;
59114
if (memoryOwner != null)
60115
{
61116
memoryOwner.Dispose();
62-
_memory = default;
117+
_methodAndHeader = default;
118+
_body?.Dispose();
119+
_body = null;
63120
}
64121
}
65122
}

0 commit comments

Comments
 (0)