-
Notifications
You must be signed in to change notification settings - Fork 47
/
Copy pathRabbitMqTransportFactory.cs
132 lines (111 loc) · 4.3 KB
/
RabbitMqTransportFactory.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
using System;
using System.Collections.Generic;
using RabbitMQ.Client;
using Rebus.Extensions;
using Rebus.Logging;
using Rebus.Tests.Contracts.Transports;
using Rebus.Transport;
namespace Rebus.RabbitMq.Tests
{
public class RabbitMqTransportFactory : ITransportFactory
{
// connection string for default docker instance
public const string ConnectionString = "amqp://guest:guest@localhost:5672";
readonly List<IDisposable> _disposables = new List<IDisposable>();
readonly HashSet<string> _queuesToDelete = new HashSet<string>();
public ITransport CreateOneWayClient()
{
return Create(null);
}
public ITransport Create(string inputQueueAddress)
{
var transport = new RabbitMqTransport(ConnectionString, inputQueueAddress, new ConsoleLoggerFactory(false));
_disposables.Add(transport);
if (inputQueueAddress != null)
{
transport.PurgeInputQueue();
}
transport.Initialize();
if (inputQueueAddress != null)
{
_queuesToDelete.Add(inputQueueAddress);
}
return transport;
}
public void CleanUp()
{
_disposables.ForEach(d => d.Dispose());
_disposables.Clear();
_queuesToDelete.ForEach(DeleteQueue);
_queuesToDelete.Clear();
}
public static void DeleteQueue(string queueName)
{
var connectionFactory = new ConnectionFactory { Uri = new Uri(ConnectionString) };
using (var connection = connectionFactory.CreateConnection())
using (var model = connection.CreateModel())
{
model.QueueDelete(queueName);
}
}
public static void DeleteExchange(string exchangeName)
{
var connectionFactory = new ConnectionFactory { Uri = new Uri(ConnectionString) };
using (var connection = connectionFactory.CreateConnection())
using (var model = connection.CreateModel())
{
model.ExchangeDelete(exchangeName);
}
}
/// <summary>
/// We check for the existence of the exchange with the name <paramref name="exchangeName"/> by creating another
/// randomly named exchange and trying to bind from the randomly named one to the one we want to check the existence of.
/// This causes an exception if the exchange with the name <paramref name="exchangeName"/> does not exists.
/// </summary>
public static bool ExchangeExists(string exchangeName)
{
var connectionFactory = new ConnectionFactory { Uri = new Uri(ConnectionString) };
using (var connection = connectionFactory.CreateConnection())
using (var model = connection.CreateModel())
{
try
{
const string nonExistentTopic = "6BE38CB8-089A-4B65-BA86-0801BBC064E9------DELETE-ME";
const string fakeExchange = "FEBC2512-CEC6-46EB-A058-37F1A9642B35------DELETE-ME";
model.ExchangeDeclare(fakeExchange, ExchangeType.Direct);
try
{
model.ExchangeBind(exchangeName, fakeExchange, nonExistentTopic);
model.ExchangeUnbind(exchangeName, fakeExchange, nonExistentTopic);
return true;
}
finally
{
model.ExchangeDelete(fakeExchange);
}
}
catch
{
return false;
}
}
}
public static bool QueueExists(string name)
{
var connectionFactory = new ConnectionFactory { Uri = new Uri(ConnectionString) };
using (var connection = connectionFactory.CreateConnection())
using (var model = connection.CreateModel())
{
try
{
model.QueueDeclarePassive(name);
return true;
}
catch
{
return false;
}
}
}
}
}