Skip to content

Commit 7b09635

Browse files
inoa-pmattosPauloHMattos
authored andcommitted
Add BasicPublishAsync overloads that accept IMemoryOwner<byte> as the body
1 parent 039aa87 commit 7b09635

File tree

11 files changed

+334
-16
lines changed

11 files changed

+334
-16
lines changed

projects/RabbitMQ.Client/IChannel.cs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33+
using System.Buffers;
3334
using System.Collections.Generic;
3435
using System.Threading;
3536
using System.Threading.Tasks;
@@ -211,6 +212,25 @@ ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
211212
CancellationToken cancellationToken = default)
212213
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
213214

215+
/// <summary>
216+
/// Asynchronously publishes a message.
217+
/// </summary>
218+
/// <param name="exchange">The exchange.</param>
219+
/// <param name="routingKey">The routing key.</param>
220+
/// <param name="mandatory">If set to <c>true</c>, the message must route to a queue.</param>
221+
/// <param name="basicProperties">The message properties.</param>
222+
/// <param name="body">The message body length.</param>
223+
/// <param name="bodyLength">The message body.</param>
224+
/// <param name="cancellationToken">CancellationToken for this operation.</param>
225+
/// <remarks>
226+
/// Routing key must be shorter than 255 bytes.
227+
/// Throws <see cref="Exceptions.PublishException"/> if a nack or basic.return is returned for the message.
228+
/// </remarks>
229+
ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
230+
bool mandatory, TProperties basicProperties, IMemoryOwner<byte> body, int bodyLength,
231+
CancellationToken cancellationToken = default)
232+
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
233+
214234
/// <summary>
215235
/// Asynchronously publishes a message.
216236
/// </summary>
@@ -229,6 +249,25 @@ ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString rou
229249
CancellationToken cancellationToken = default)
230250
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
231251

252+
/// <summary>
253+
/// Asynchronously publishes a message.
254+
/// </summary>
255+
/// <param name="exchange">The exchange.</param>
256+
/// <param name="routingKey">The routing key.</param>
257+
/// <param name="mandatory">If set to <c>true</c>, the message must route to a queue.</param>
258+
/// <param name="basicProperties">The message properties.</param>
259+
/// <param name="body">The message body.</param>
260+
/// <param name="bodyLength">The message body length.</param>
261+
/// <param name="cancellationToken">CancellationToken for this operation.</param>
262+
/// <remarks>
263+
/// Routing key must be shorter than 255 bytes.
264+
/// Throws <see cref="Exceptions.PublishException"/> if a nack or basic.return is returned for the message.
265+
/// </remarks>
266+
ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey,
267+
bool mandatory, TProperties basicProperties, IMemoryOwner<byte> body, int bodyLength,
268+
CancellationToken cancellationToken = default)
269+
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
270+
232271
/// <summary>
233272
/// Configures QoS parameters of the Basic content-class.
234273
/// </summary>

projects/RabbitMQ.Client/IChannelExtensions.cs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33+
using System.Buffers;
3334
using System.Collections.Generic;
3435
using System.Threading;
3536
using System.Threading.Tasks;
@@ -88,6 +89,23 @@ public static ValueTask BasicPublishAsync<T>(this IChannel channel,
8889
mandatory: false, basicProperties: basicProperties, body: body,
8990
cancellationToken);
9091

92+
/// <summary>
93+
/// (Extension method) Convenience overload of <see cref="IChannel.BasicPublishAsync{TProperties}(string, string, bool, TProperties, ReadOnlyMemory{byte}, CancellationToken)"/>
94+
/// </summary>
95+
/// <remarks>
96+
/// The publication occurs with mandatory=false.
97+
/// </remarks>
98+
public static ValueTask BasicPublishAsync<T>(this IChannel channel,
99+
PublicationAddress addr,
100+
T basicProperties,
101+
IMemoryOwner<byte> body,
102+
int bodyLength,
103+
CancellationToken cancellationToken = default)
104+
where T : IReadOnlyBasicProperties, IAmqpHeader =>
105+
channel.BasicPublishAsync(exchange: addr.ExchangeName, routingKey: addr.RoutingKey,
106+
mandatory: false, basicProperties: basicProperties, body: body, bodyLength: bodyLength,
107+
cancellationToken);
108+
91109
/// <summary>
92110
/// (Extension method) Convenience overload of <see cref="IChannel.BasicPublishAsync{TProperties}(string, string, bool, TProperties, ReadOnlyMemory{byte}, CancellationToken)"/>
93111
/// </summary>
@@ -103,6 +121,22 @@ public static ValueTask BasicPublishAsync(this IChannel channel,
103121
mandatory: false, basicProperties: EmptyBasicProperty.Empty, body: body,
104122
cancellationToken);
105123

124+
/// <summary>
125+
/// (Extension method) Convenience overload of <see cref="IChannel.BasicPublishAsync{TProperties}(string, string, bool, TProperties, ReadOnlyMemory{byte}, CancellationToken)"/>
126+
/// </summary>
127+
/// <remarks>
128+
/// The publication occurs with mandatory=false and empty BasicProperties
129+
/// </remarks>
130+
public static ValueTask BasicPublishAsync(this IChannel channel,
131+
string exchange,
132+
string routingKey,
133+
IMemoryOwner<byte> body,
134+
int bodyLength,
135+
CancellationToken cancellationToken = default) =>
136+
channel.BasicPublishAsync(exchange: exchange, routingKey: routingKey,
137+
mandatory: false, basicProperties: EmptyBasicProperty.Empty, body: body, bodyLength: bodyLength,
138+
cancellationToken);
139+
106140
/// <summary>
107141
/// (Extension method) Convenience overload of <see cref="IChannel.BasicPublishAsync{TProperties}(CachedString, CachedString, bool, TProperties, ReadOnlyMemory{byte}, CancellationToken)" />
108142
/// </summary>
@@ -118,6 +152,22 @@ public static ValueTask BasicPublishAsync(this IChannel channel,
118152
mandatory: false, basicProperties: EmptyBasicProperty.Empty, body: body,
119153
cancellationToken);
120154

155+
/// <summary>
156+
/// (Extension method) Convenience overload of <see cref="IChannel.BasicPublishAsync{TProperties}(CachedString, CachedString, bool, TProperties, ReadOnlyMemory{byte}, CancellationToken)" />
157+
/// </summary>
158+
/// <remarks>
159+
/// The publication occurs with mandatory=false and empty BasicProperties
160+
/// </remarks>
161+
public static ValueTask BasicPublishAsync(this IChannel channel,
162+
CachedString exchange,
163+
CachedString routingKey,
164+
IMemoryOwner<byte> body,
165+
int bodyLength,
166+
CancellationToken cancellationToken = default) =>
167+
channel.BasicPublishAsync(exchange: exchange, routingKey: routingKey,
168+
mandatory: false, basicProperties: EmptyBasicProperty.Empty, body: body, bodyLength: bodyLength,
169+
cancellationToken);
170+
121171
/// <summary>
122172
/// (Extension method) Convenience overload of <see cref="IChannel.BasicPublishAsync{TProperties}(string, string, bool, TProperties, ReadOnlyMemory{byte}, CancellationToken)"/>
123173
/// </summary>
@@ -134,6 +184,23 @@ public static ValueTask BasicPublishAsync(this IChannel channel,
134184
mandatory: mandatory, basicProperties: EmptyBasicProperty.Empty, body: body,
135185
cancellationToken);
136186

187+
/// <summary>
188+
/// (Extension method) Convenience overload of <see cref="IChannel.BasicPublishAsync{TProperties}(string, string, bool, TProperties, ReadOnlyMemory{byte}, CancellationToken)"/>
189+
/// </summary>
190+
/// <remarks>
191+
/// The publication occurs with empty BasicProperties
192+
/// </remarks>
193+
public static ValueTask BasicPublishAsync(this IChannel channel,
194+
string exchange,
195+
string routingKey,
196+
bool mandatory,
197+
IMemoryOwner<byte> body,
198+
int bodyLength,
199+
CancellationToken cancellationToken = default) =>
200+
channel.BasicPublishAsync(exchange: exchange, routingKey: routingKey,
201+
mandatory: mandatory, basicProperties: EmptyBasicProperty.Empty, body: body, bodyLength: bodyLength,
202+
cancellationToken);
203+
137204
/// <summary>
138205
/// (Extension method) Convenience overload of <see cref="IChannel.BasicPublishAsync{TProperties}(CachedString, CachedString, bool, TProperties, ReadOnlyMemory{byte}, CancellationToken)" />
139206
/// </summary>
@@ -150,6 +217,23 @@ public static ValueTask BasicPublishAsync(this IChannel channel,
150217
mandatory: mandatory, basicProperties: EmptyBasicProperty.Empty, body: body,
151218
cancellationToken);
152219

220+
/// <summary>
221+
/// (Extension method) Convenience overload of <see cref="IChannel.BasicPublishAsync{TProperties}(CachedString, CachedString, bool, TProperties, ReadOnlyMemory{byte}, CancellationToken)" />
222+
/// </summary>
223+
/// <remarks>
224+
/// The publication occurs with empty BasicProperties
225+
/// </remarks>
226+
public static ValueTask BasicPublishAsync(this IChannel channel,
227+
CachedString exchange,
228+
CachedString routingKey,
229+
bool mandatory,
230+
IMemoryOwner<byte> body,
231+
int bodyLength,
232+
CancellationToken cancellationToken = default) =>
233+
channel.BasicPublishAsync(exchange: exchange, routingKey: routingKey,
234+
mandatory: mandatory, basicProperties: EmptyBasicProperty.Empty, body: body, bodyLength: bodyLength,
235+
cancellationToken);
236+
153237
/// <summary>
154238
/// Asynchronously declare a queue.
155239
/// </summary>

projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33+
using System.Buffers;
3334
using System.Collections.Generic;
3435
using System.Diagnostics.CodeAnalysis;
3536
using System.Runtime.CompilerServices;
@@ -349,6 +350,15 @@ public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingK
349350
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
350351
=> InnerChannel.BasicPublishAsync(exchange, routingKey, mandatory, basicProperties, body, cancellationToken);
351352

353+
public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
354+
bool mandatory,
355+
TProperties basicProperties,
356+
IMemoryOwner<byte> body,
357+
int bodyLength,
358+
CancellationToken cancellationToken = default)
359+
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
360+
=> InnerChannel.BasicPublishAsync(exchange, routingKey, mandatory, basicProperties, body, bodyLength, cancellationToken);
361+
352362
public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey,
353363
bool mandatory,
354364
TProperties basicProperties,
@@ -357,6 +367,15 @@ public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedStr
357367
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
358368
=> InnerChannel.BasicPublishAsync(exchange, routingKey, mandatory, basicProperties, body, cancellationToken);
359369

370+
public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey,
371+
bool mandatory,
372+
TProperties basicProperties,
373+
IMemoryOwner<byte> body,
374+
int bodyLength,
375+
CancellationToken cancellationToken = default)
376+
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
377+
=> InnerChannel.BasicPublishAsync(exchange, routingKey, mandatory, basicProperties, body, bodyLength, cancellationToken);
378+
360379
public Task BasicQosAsync(uint prefetchSize, ushort prefetchCount, bool global,
361380
CancellationToken cancellationToken)
362381
{

projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33+
using System.Buffers;
3334
using System.Collections.Generic;
3435
using System.Diagnostics;
3536
using System.Runtime.CompilerServices;
@@ -101,6 +102,63 @@ await MaybeEndPublisherConfirmationTrackingAsync(publisherConfirmationInfo, canc
101102
}
102103
}
103104

105+
public async ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
106+
bool mandatory, TProperties basicProperties, IMemoryOwner<byte> body, int bodyLength,
107+
CancellationToken cancellationToken = default)
108+
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
109+
{
110+
PublisherConfirmationInfo? publisherConfirmationInfo = null;
111+
RateLimitLease? lease =
112+
await MaybeAcquirePublisherConfirmationLockAsync(cancellationToken)
113+
.ConfigureAwait(false);
114+
try
115+
{
116+
publisherConfirmationInfo = MaybeStartPublisherConfirmationTracking();
117+
118+
await MaybeEnforceFlowControlAsync(cancellationToken)
119+
.ConfigureAwait(false);
120+
121+
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
122+
123+
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
124+
? RabbitMQActivitySource.BasicPublish(routingKey, exchange, bodyLength, basicProperties)
125+
: default;
126+
127+
ulong publishSequenceNumber = 0;
128+
if (publisherConfirmationInfo is not null)
129+
{
130+
publishSequenceNumber = publisherConfirmationInfo.PublishSequenceNumber;
131+
}
132+
133+
BasicProperties? props = PopulateBasicPropertiesHeaders(basicProperties, sendActivity, publishSequenceNumber);
134+
if (props is null)
135+
{
136+
await ModelSendAsync(in cmd, in basicProperties, body, bodyLength, cancellationToken)
137+
.ConfigureAwait(false);
138+
}
139+
else
140+
{
141+
await ModelSendAsync(in cmd, in props, body, bodyLength, cancellationToken)
142+
.ConfigureAwait(false);
143+
}
144+
}
145+
catch (Exception ex)
146+
{
147+
bool exceptionWasHandled =
148+
MaybeHandleExceptionWithEnabledPublisherConfirmations(publisherConfirmationInfo, ex);
149+
if (!exceptionWasHandled)
150+
{
151+
throw;
152+
}
153+
}
154+
finally
155+
{
156+
MaybeReleasePublisherConfirmationLock(lease);
157+
await MaybeEndPublisherConfirmationTrackingAsync(publisherConfirmationInfo, cancellationToken)
158+
.ConfigureAwait(false);
159+
}
160+
}
161+
104162
public async ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey,
105163
bool mandatory, TProperties basicProperties, ReadOnlyMemory<byte> body,
106164
CancellationToken cancellationToken = default)
@@ -158,6 +216,64 @@ await MaybeEndPublisherConfirmationTrackingAsync(publisherConfirmationInfo, canc
158216
}
159217
}
160218

219+
220+
public async ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey,
221+
bool mandatory, TProperties basicProperties, IMemoryOwner<byte> body, int bodyLength,
222+
CancellationToken cancellationToken = default)
223+
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
224+
{
225+
PublisherConfirmationInfo? publisherConfirmationInfo = null;
226+
RateLimitLease? lease =
227+
await MaybeAcquirePublisherConfirmationLockAsync(cancellationToken)
228+
.ConfigureAwait(false);
229+
try
230+
{
231+
publisherConfirmationInfo = MaybeStartPublisherConfirmationTracking();
232+
233+
await MaybeEnforceFlowControlAsync(cancellationToken)
234+
.ConfigureAwait(false);
235+
236+
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
237+
238+
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
239+
? RabbitMQActivitySource.BasicPublish(routingKey.Value, exchange.Value, bodyLength, basicProperties)
240+
: default;
241+
242+
ulong publishSequenceNumber = 0;
243+
if (publisherConfirmationInfo is not null)
244+
{
245+
publishSequenceNumber = publisherConfirmationInfo.PublishSequenceNumber;
246+
}
247+
248+
BasicProperties? props = PopulateBasicPropertiesHeaders(basicProperties, sendActivity, publishSequenceNumber);
249+
if (props is null)
250+
{
251+
await ModelSendAsync(in cmd, in basicProperties, body, bodyLength, cancellationToken)
252+
.ConfigureAwait(false);
253+
}
254+
else
255+
{
256+
await ModelSendAsync(in cmd, in props, body, bodyLength, cancellationToken)
257+
.ConfigureAwait(false);
258+
}
259+
}
260+
catch (Exception ex)
261+
{
262+
bool exceptionWasHandled =
263+
MaybeHandleExceptionWithEnabledPublisherConfirmations(publisherConfirmationInfo, ex);
264+
if (!exceptionWasHandled)
265+
{
266+
throw;
267+
}
268+
}
269+
finally
270+
{
271+
MaybeReleasePublisherConfirmationLock(lease);
272+
await MaybeEndPublisherConfirmationTrackingAsync(publisherConfirmationInfo, cancellationToken)
273+
.ConfigureAwait(false);
274+
}
275+
}
276+
161277
private BasicProperties? PopulateBasicPropertiesHeaders<TProperties>(TProperties basicProperties,
162278
Activity? sendActivity, ulong publishSequenceNumber)
163279
where TProperties : IReadOnlyBasicProperties, IAmqpHeader

projects/RabbitMQ.Client/Impl/Channel.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33+
using System.Buffers;
3334
using System.Collections.Generic;
3435
using System.Diagnostics;
3536
using System.Diagnostics.CodeAnalysis;
@@ -485,6 +486,14 @@ protected ValueTask ModelSendAsync<TMethod, THeader>(in TMethod method, in THead
485486
return Session.TransmitAsync(in method, in header, body, cancellationToken);
486487
}
487488

489+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
490+
protected ValueTask ModelSendAsync<TMethod, THeader>(in TMethod method, in THeader header, IMemoryOwner<byte> body, int bodyLength, CancellationToken cancellationToken)
491+
where TMethod : struct, IOutgoingAmqpMethod
492+
where THeader : IAmqpHeader
493+
{
494+
return Session.TransmitAsync(in method, in header, body, bodyLength, cancellationToken);
495+
}
496+
488497
internal Task OnCallbackExceptionAsync(CallbackExceptionEventArgs args)
489498
{
490499
return _callbackExceptionAsyncWrapper.InvokeAsync(this, args);

0 commit comments

Comments
 (0)