Skip to content

Commit 0b0acef

Browse files
authored
Merge pull request #1384 from danielgerlag/copilot/fix-rabbitmq-client-support-issue
Add RabbitMQ.Client v7 support by migrating to async API
2 parents b78a79f + 7f96f67 commit 0b0acef

File tree

4 files changed

+36
-21
lines changed

4 files changed

+36
-21
lines changed

src/providers/WorkflowCore.QueueProviders.RabbitMQ/ServiceCollectionExtensions.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
using System;
33
using System.Collections.Generic;
44
using System.Linq;
5+
using System.Threading;
6+
using System.Threading.Tasks;
57
using Microsoft.Extensions.DependencyInjection.Extensions;
68
using WorkflowCore.Interface;
79
using WorkflowCore.Models;
@@ -10,7 +12,7 @@
1012

1113
namespace Microsoft.Extensions.DependencyInjection
1214
{
13-
public delegate IConnection RabbitMqConnectionFactory(IServiceProvider sp, string clientProvidedName);
15+
public delegate Task<IConnection> RabbitMqConnectionFactory(IServiceProvider sp, string clientProvidedName, CancellationToken cancellationToken = default);
1416

1517
public static class ServiceCollectionExtensions
1618
{
@@ -20,7 +22,7 @@ public static WorkflowOptions UseRabbitMQ(this WorkflowOptions options, IConnect
2022
if (connectionFactory == null) throw new ArgumentNullException(nameof(connectionFactory));
2123

2224
return options
23-
.UseRabbitMQ((sp, name) => connectionFactory.CreateConnection(name));
25+
.UseRabbitMQ(async (sp, name, cancellationToken) => await connectionFactory.CreateConnectionAsync(name, cancellationToken));
2426
}
2527

2628
public static WorkflowOptions UseRabbitMQ(this WorkflowOptions options,
@@ -32,7 +34,7 @@ public static WorkflowOptions UseRabbitMQ(this WorkflowOptions options,
3234
if (hostnames == null) throw new ArgumentNullException(nameof(hostnames));
3335

3436
return options
35-
.UseRabbitMQ((sp, name) => connectionFactory.CreateConnection(hostnames.ToList(), name));
37+
.UseRabbitMQ(async (sp, name, cancellationToken) => await connectionFactory.CreateConnectionAsync(hostnames, name, cancellationToken));
3638
}
3739

3840
public static WorkflowOptions UseRabbitMQ(this WorkflowOptions options, RabbitMqConnectionFactory rabbitMqConnectionFactory)

src/providers/WorkflowCore.QueueProviders.RabbitMQ/Services/RabbitMQProvider.cs

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,16 @@ public async Task QueueWork(string id, QueueType queue)
3737
if (_connection == null)
3838
throw new InvalidOperationException("RabbitMQ provider not running");
3939

40-
using (var channel = _connection.CreateModel())
40+
var channel = await _connection.CreateChannelAsync(new CreateChannelOptions(publisherConfirmationsEnabled: false, publisherConfirmationTrackingEnabled: false), CancellationToken.None);
41+
try
4142
{
42-
channel.QueueDeclare(queue: _queueNameProvider.GetQueueName(queue), durable: true, exclusive: false, autoDelete: false, arguments: null);
43-
var body = Encoding.UTF8.GetBytes(id);
44-
channel.BasicPublish(exchange: "", routingKey: _queueNameProvider.GetQueueName(queue), basicProperties: null, body: body);
43+
await channel.QueueDeclareAsync(queue: _queueNameProvider.GetQueueName(queue), durable: true, exclusive: false, autoDelete: false, arguments: null, passive: false, noWait: false, CancellationToken.None);
44+
var body = new ReadOnlyMemory<byte>(Encoding.UTF8.GetBytes(id));
45+
await channel.BasicPublishAsync(exchange: "", routingKey: _queueNameProvider.GetQueueName(queue), mandatory: false, basicProperties: new BasicProperties(), body: body, CancellationToken.None);
46+
}
47+
finally
48+
{
49+
await channel.CloseAsync(200, "OK", abort: false, CancellationToken.None);
4550
}
4651
}
4752

@@ -50,46 +55,54 @@ public async Task<string> DequeueWork(QueueType queue, CancellationToken cancell
5055
if (_connection == null)
5156
throw new InvalidOperationException("RabbitMQ provider not running");
5257

53-
using (var channel = _connection.CreateModel())
58+
var channel = await _connection.CreateChannelAsync(new CreateChannelOptions(publisherConfirmationsEnabled: false, publisherConfirmationTrackingEnabled: false), CancellationToken.None);
59+
try
5460
{
55-
channel.QueueDeclare(queue: _queueNameProvider.GetQueueName(queue),
56-
durable: true,
57-
exclusive: false,
58-
autoDelete: false,
59-
arguments: null);
61+
await channel.QueueDeclareAsync(queue: _queueNameProvider.GetQueueName(queue),
62+
durable: true,
63+
exclusive: false,
64+
autoDelete: false,
65+
arguments: null,
66+
passive: false,
67+
noWait: false,
68+
CancellationToken.None);
6069

61-
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
70+
await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false, CancellationToken.None);
6271

63-
var msg = channel.BasicGet(_queueNameProvider.GetQueueName(queue), false);
72+
var msg = await channel.BasicGetAsync(_queueNameProvider.GetQueueName(queue), autoAck: false, CancellationToken.None);
6473
if (msg != null)
6574
{
6675
var data = Encoding.UTF8.GetString(msg.Body.ToArray());
67-
channel.BasicAck(msg.DeliveryTag, false);
76+
await channel.BasicAckAsync(msg.DeliveryTag, multiple: false, CancellationToken.None);
6877
return data;
6978
}
7079
return null;
7180
}
81+
finally
82+
{
83+
await channel.CloseAsync(200, "OK", abort: false, CancellationToken.None);
84+
}
7285
}
7386

7487
public void Dispose()
7588
{
7689
if (_connection != null)
7790
{
7891
if (_connection.IsOpen)
79-
_connection.Close();
92+
_connection.CloseAsync(200, "OK", TimeSpan.FromSeconds(10), abort: false, CancellationToken.None).GetAwaiter().GetResult();
8093
}
8194
}
8295

8396
public async Task Start()
8497
{
85-
_connection = _rabbitMqConnectionFactory(_serviceProvider, "Workflow-Core");
98+
_connection = await _rabbitMqConnectionFactory(_serviceProvider, "Workflow-Core");
8699
}
87100

88101
public async Task Stop()
89102
{
90103
if (_connection != null)
91104
{
92-
_connection.Close();
105+
await _connection.CloseAsync(200, "OK", TimeSpan.FromSeconds(10), abort: false, CancellationToken.None);
93106
_connection = null;
94107
}
95108
}

src/providers/WorkflowCore.QueueProviders.RabbitMQ/WorkflowCore.QueueProviders.RabbitMQ.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
<ItemGroup>
2525
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
26-
<PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
26+
<PackageReference Include="RabbitMQ.Client" Version="7.0.0" />
2727
</ItemGroup>
2828

2929
</Project>

test/WorkflowCore.Tests.QueueProviders.RabbitMQ/WorkflowCore.Tests.QueueProviders.RabbitMQ.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<Project Sdk="Microsoft.NET.Sdk">
22

33
<PropertyGroup>
4-
<TargetFrameworks>net6.0</TargetFrameworks>
4+
<TargetFrameworks>net8.0</TargetFrameworks>
55
</PropertyGroup>
66

77
<ItemGroup>

0 commit comments

Comments
 (0)