Skip to content

Commit ad3f4a4

Browse files
authored
Merge pull request #112 from rabbitmq/rabbitmq-amqp-dotnet-client-111
Make `PublishAsync` actually async.
2 parents 60194c1 + c3d3e20 commit ad3f4a4

File tree

1 file changed

+18
-19
lines changed

1 file changed

+18
-19
lines changed

RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ await base.OpenAsync()
101101
/// <exception cref="InvalidOperationException"></exception>
102102
/// <exception cref="NotSupportedException"></exception>
103103
/// <exception cref="PublisherException"></exception>
104-
public async Task<PublishResult> PublishAsync(IMessage message, CancellationToken cancellationToken = default)
104+
public Task<PublishResult> PublishAsync(IMessage message, CancellationToken cancellationToken = default)
105105
{
106106
ThrowIfClosed();
107107

@@ -119,17 +119,17 @@ public async Task<PublishResult> PublishAsync(IMessage message, CancellationToke
119119
stopwatch.Start();
120120
}
121121

122+
TaskCompletionSource<PublishResult> publishResultTcs =
123+
Utils.CreateTaskCompletionSource<PublishResult>();
124+
122125
try
123126
{
124-
TaskCompletionSource<PublishOutcome> messagePublishedTcs =
125-
Utils.CreateTaskCompletionSource<PublishOutcome>();
126-
127127
Message nativeMessage = ((AmqpMessage)message).NativeMessage;
128128

129129
void OutcomeCallback(ILink sender, Message inMessage, Outcome outcome, object state)
130130
{
131131
// Note: sometimes `message` is null 🤔
132-
System.Diagnostics.Debug.Assert(Object.ReferenceEquals(this, state));
132+
Debug.Assert(Object.ReferenceEquals(this, state));
133133

134134
if (false == Object.ReferenceEquals(_senderLink, sender))
135135
{
@@ -167,7 +167,15 @@ void OutcomeCallback(ILink sender, Message inMessage, Outcome outcome, object st
167167
}
168168
}
169169

170-
messagePublishedTcs.SetResult(publishOutcome);
170+
// TODO cancellation token
171+
if (_metricsReporter is not null && stopwatch is not null)
172+
{
173+
stopwatch.Stop();
174+
_metricsReporter.Published(stopwatch.Elapsed);
175+
}
176+
177+
var publishResult = new PublishResult(message, publishOutcome);
178+
publishResultTcs.SetResult(publishResult);
171179
}
172180

173181
/*
@@ -176,25 +184,16 @@ void OutcomeCallback(ILink sender, Message inMessage, Outcome outcome, object st
176184
*/
177185
_senderLink.Send(nativeMessage, OutcomeCallback, this);
178186

179-
// TODO cancellation token
180-
// PublishOutcome publishOutcome = await messagePublishedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5), cancellationToken)
181-
PublishOutcome publishOutcome = await messagePublishedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5))
182-
.ConfigureAwait(false);
183-
184-
if (_metricsReporter is not null && stopwatch is not null)
185-
{
186-
stopwatch.Stop();
187-
_metricsReporter.Published(stopwatch.Elapsed);
188-
}
189-
190-
return new PublishResult(message, publishOutcome);
187+
return publishResultTcs.Task;
191188
}
192189
catch (AmqpException ex)
193190
{
194191
stopwatch?.Stop();
195192
_metricsReporter?.PublishDisposition(IMetricsReporter.PublishDispositionValue.REJECTED);
196193
var publishOutcome = new PublishOutcome(OutcomeState.Rejected, Utils.ConvertError(ex.Error));
197-
return new PublishResult(message, publishOutcome);
194+
var publishResult = new PublishResult(message, publishOutcome);
195+
publishResultTcs.SetResult(publishResult);
196+
return publishResultTcs.Task;
198197
}
199198
catch (Exception e)
200199
{

0 commit comments

Comments
 (0)