Skip to content

IPublisher XML doc #114

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 30 additions & 17 deletions RabbitMQ.AMQP.Client/IPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,40 @@
// and the Mozilla Public License, version 2.0.
// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

using System;
using System.Threading;
using System.Threading.Tasks;

namespace RabbitMQ.AMQP.Client
{
public class PublisherException : Exception
{
public PublisherException(string message) : base(message)
{
}
}

/// <summary>
/// Represents the status of a publish operation.
/// Accepted: The message was accepted for publication.
/// Rejected: The message was rejected by the broker.
/// Released: The message was released by the broker.
/// See <see href="https://www.rabbitmq.com/docs/amqp#outcomes">AMQP Outcomes</see>.
/// </summary>
public enum OutcomeState
{
/// <summary>
/// The message has been accepted by the broker.
/// </summary>
Accepted,

/// <summary>
/// At least one queue the message was routed to rejected the message. This happens when the
/// queue length is exceeded and the queue's overflow behaviour is set to reject-publish or when
/// a target classic queue is unavailable.
/// </summary>
Rejected,
Released,

/// <summary>
/// The broker could not route the message to any queue.
/// This is likely to be due to a topology misconfiguration.
/// </summary>
Released
}

/// <summary>
/// PublishOutcome represents the outcome of a publish operation.
/// It contains the state of the outcome and an error if the outcome is not successful.
/// Represents the outcome of a publish operation.
/// It contains the state of the outcome and an error if the outcome is not successful.
/// </summary>

public class PublishOutcome
{
public PublishOutcome(OutcomeState state, Error? error)
Expand All @@ -41,11 +44,21 @@ public PublishOutcome(OutcomeState state, Error? error)
Error = error;
}

/// <summary>
/// The <see cref="OutcomeState"/>.
/// </summary>
public OutcomeState State { get; }

/// <summary>
/// The <see cref="Error"/>, if any.
/// </summary>
public Error? Error { get; }
}

/// <summary>
/// Represents the result of a publish operation.
/// It contains the <see cref="PublishOutcome"/> and the original <see cref="IMessage"/>.
/// </summary>
public class PublishResult
{
public PublishResult(IMessage message, PublishOutcome outcome)
Expand All @@ -60,8 +73,8 @@ public PublishResult(IMessage message, PublishOutcome outcome)
}

/// <summary>
/// Interface for publishing messages to an AMQP broker.
/// Implementations of this interface are expected to be thread-safe.
/// Interface for publishing messages to an AMQP broker.
/// Implementations of this interface are expected to be thread-safe.
/// </summary>
public interface IPublisher : ILifeCycle
{
Expand Down
3 changes: 3 additions & 0 deletions RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

namespace RabbitMQ.AMQP.Client.Impl
{
/// <summary>
/// Implementation of <see cref="IConsumer"/>.
/// </summary>
public class AmqpConsumer : AbstractReconnectLifeCycle, IConsumer
{
private enum PauseStatus
Expand Down
137 changes: 73 additions & 64 deletions RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@

namespace RabbitMQ.AMQP.Client.Impl
{
/// <summary>
/// Implementation of <see cref="IPublisher"/>.
/// </summary>
public class AmqpPublisher : AbstractReconnectLifeCycle, IPublisher
{
private readonly AmqpConnection _connection;
Expand All @@ -30,6 +33,10 @@ public AmqpPublisher(AmqpConnection connection, string? address, IMetricsReporte
_connection.AddPublisher(_id, this);
}

/// <summary>
/// Open this publisher
/// </summary>
/// <returns>A <see cref="Task"/> representing the async operation.</returns>
public override async Task OpenAsync()
{
try
Expand Down Expand Up @@ -91,13 +98,11 @@ await base.OpenAsync()
}

/// <summary>
/// Publishes a message to the broker in an asynchronous manner.
/// The PublishResult is synchronous. In order to increase the performance
/// you can use more tasks to publish messages in parallel
/// Publishes a message to the broker asynchronously.
/// </summary>
/// <param name="message"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
/// <returns>A <see cref="Task"/> representating the await-able result of the publish operation.</returns>
/// <exception cref="InvalidOperationException"></exception>
/// <exception cref="NotSupportedException"></exception>
/// <exception cref="PublisherException"></exception>
Expand All @@ -122,85 +127,89 @@ public Task<PublishResult> PublishAsync(IMessage message, CancellationToken canc
TaskCompletionSource<PublishResult> publishResultTcs =
Utils.CreateTaskCompletionSource<PublishResult>();

try
Message nativeMessage = ((AmqpMessage)message).NativeMessage;

void OutcomeCallback(ILink sender, Message inMessage, Outcome outcome, object state)
{
Message nativeMessage = ((AmqpMessage)message).NativeMessage;
// Note: sometimes `inMessage` is null 🤔
Debug.Assert(Object.ReferenceEquals(this, state));

void OutcomeCallback(ILink sender, Message inMessage, Outcome outcome, object state)
if (false == Object.ReferenceEquals(_senderLink, sender))
{
// Note: sometimes `message` is null 🤔
Debug.Assert(Object.ReferenceEquals(this, state));

if (false == Object.ReferenceEquals(_senderLink, sender))
{
// TODO log this case?
}

PublishOutcome publishOutcome;
switch (outcome)
{
case Rejected rejectedOutcome:
{
const OutcomeState publishState = OutcomeState.Rejected;
publishOutcome = new PublishOutcome(publishState,
Utils.ConvertError(rejectedOutcome.Error));
_metricsReporter?.PublishDisposition(IMetricsReporter.PublishDispositionValue.REJECTED);
break;
}
case Released:
{
const OutcomeState publishState = OutcomeState.Released;
publishOutcome = new PublishOutcome(publishState, null);
_metricsReporter?.PublishDisposition(IMetricsReporter.PublishDispositionValue.RELEASED);
break;
}
case Accepted:
{
const OutcomeState publishState = OutcomeState.Accepted;
publishOutcome = new PublishOutcome(publishState, null);
_metricsReporter?.PublishDisposition(IMetricsReporter.PublishDispositionValue.ACCEPTED);
break;
}
default:
{
throw new NotSupportedException();
}
}
// TODO log this case?
}

// TODO cancellation token
if (_metricsReporter is not null && stopwatch is not null)
{
stopwatch.Stop();
_metricsReporter.Published(stopwatch.Elapsed);
}
PublishOutcome publishOutcome;
switch (outcome)
{
case Rejected rejectedOutcome:
{
const OutcomeState publishState = OutcomeState.Rejected;
publishOutcome = new PublishOutcome(publishState,
Utils.ConvertError(rejectedOutcome.Error));
_metricsReporter?.PublishDisposition(IMetricsReporter.PublishDispositionValue.REJECTED);
break;
}
case Released:
{
const OutcomeState publishState = OutcomeState.Released;
publishOutcome = new PublishOutcome(publishState, null);
_metricsReporter?.PublishDisposition(IMetricsReporter.PublishDispositionValue.RELEASED);
break;
}
case Accepted:
{
const OutcomeState publishState = OutcomeState.Accepted;
publishOutcome = new PublishOutcome(publishState, null);
_metricsReporter?.PublishDisposition(IMetricsReporter.PublishDispositionValue.ACCEPTED);
break;
}
default:
{
throw new NotSupportedException();
}
}

var publishResult = new PublishResult(message, publishOutcome);
publishResultTcs.SetResult(publishResult);
// TODO cancellation token
if (_metricsReporter is not null && stopwatch is not null)
{
stopwatch.Stop();
_metricsReporter.Published(stopwatch.Elapsed);
}

/*
* Note: do NOT use SendAsync here as it prevents the Closed event from
* firing on the native connection. Bizarre, I know!
*/
_senderLink.Send(nativeMessage, OutcomeCallback, this);
var publishResult = new PublishResult(message, publishOutcome);
publishResultTcs.SetResult(publishResult);
}

return publishResultTcs.Task;
/*
* Note: do NOT use SendAsync here as it prevents the Closed event from
* firing on the native connection. Bizarre, I know!
*/
try
{
_senderLink.Send(nativeMessage, OutcomeCallback, this);
}
catch (AmqpException ex)
catch (AmqpException amqpException)
{
stopwatch?.Stop();
_metricsReporter?.PublishDisposition(IMetricsReporter.PublishDispositionValue.REJECTED);
var publishOutcome = new PublishOutcome(OutcomeState.Rejected, Utils.ConvertError(ex.Error));
var publishOutcome = new PublishOutcome(OutcomeState.Rejected, Utils.ConvertError(amqpException.Error));
var publishResult = new PublishResult(message, publishOutcome);
publishResultTcs.SetResult(publishResult);
return publishResultTcs.Task;
}
catch (Exception e)
catch (Exception ex)
{
throw new PublisherException($"{ToString()} Failed to publish message, {e}");
var publisherException = new PublisherException($"{ToString()} Failed to publish message", ex);
publishResultTcs.SetException(publisherException);
}

return publishResultTcs.Task;
}

/// <summary>
/// Close this publisher
/// </summary>
/// <returns>A <see cref="Task"/> representing the async operation.</returns>
public override async Task CloseAsync()
{
if (_senderLink is null)
Expand Down
1 change: 1 addition & 0 deletions RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,7 @@ RabbitMQ.AMQP.Client.PreconditionFailedException
RabbitMQ.AMQP.Client.PreconditionFailedException.PreconditionFailedException(string! message) -> void
RabbitMQ.AMQP.Client.PublisherException
RabbitMQ.AMQP.Client.PublisherException.PublisherException(string! message) -> void
RabbitMQ.AMQP.Client.PublisherException.PublisherException(string! message, System.Exception! innerException) -> void
RabbitMQ.AMQP.Client.PublishOutcome
RabbitMQ.AMQP.Client.PublishOutcome.Error.get -> RabbitMQ.AMQP.Client.Error?
RabbitMQ.AMQP.Client.PublishOutcome.PublishOutcome(RabbitMQ.AMQP.Client.OutcomeState state, RabbitMQ.AMQP.Client.Error? error) -> void
Expand Down
19 changes: 19 additions & 0 deletions RabbitMQ.AMQP.Client/PublisherException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// This source code is dual-licensed under the Apache License, version 2.0,
// and the Mozilla Public License, version 2.0.
// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

using System;

namespace RabbitMQ.AMQP.Client
{
public class PublisherException : Exception
{
public PublisherException(string message) : base(message)
{
}

public PublisherException(string message, Exception innerException) : base(message, innerException)
{
}
}
}