Skip to content

Commit 3ae2be7

Browse files
committed
Added some flexible behavior to retry broker initialization on startup. Really only applies to Rabbit MQ. Closes GH-1987
1 parent a0ffc3e commit 3ae2be7

File tree

2 files changed

+51
-8
lines changed

2 files changed

+51
-8
lines changed

src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransport.cs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -157,17 +157,26 @@ public override async ValueTask ConnectAsync(IWolverineRuntime runtime)
157157
new ConnectionFactory { HostName = "localhost" };
158158

159159
configureDefaults(ConnectionFactory);
160-
161-
if (_listenerConnection == null && !UseSenderConnectionOnly)
160+
161+
try
162162
{
163-
_listenerConnection = BuildConnection(ConnectionRole.Listening);
164-
await _listenerConnection.ConnectAsync();
165-
}
163+
if (_listenerConnection == null && !UseSenderConnectionOnly)
164+
{
165+
_listenerConnection = BuildConnection(ConnectionRole.Listening);
166+
await _listenerConnection.ConnectAsync();
167+
}
166168

167-
if (_sendingConnection == null && !UseListenerConnectionOnly)
169+
if (_sendingConnection == null && !UseListenerConnectionOnly)
170+
{
171+
_sendingConnection = BuildConnection(ConnectionRole.Sending);
172+
await _sendingConnection.ConnectAsync();
173+
}
174+
}
175+
catch (Exception)
168176
{
169-
_sendingConnection = BuildConnection(ConnectionRole.Sending);
170-
await _sendingConnection.ConnectAsync();
177+
_listenerConnection = null;
178+
_sendingConnection = null;
179+
throw;
171180
}
172181

173182
foreach (var tenant in Tenants)

src/Wolverine/Transports/BrokerTransport.cs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,32 @@ public sealed override async ValueTask InitializeAsync(IWolverineRuntime runtime
9191

9292
tryBuildSystemEndpoints(runtime);
9393

94+
var attempts = 1;
95+
96+
for (int i = 0; i < 20; i++)
97+
{
98+
try
99+
{
100+
await startupAsync(runtime);
101+
return;
102+
}
103+
catch (Exception e)
104+
{
105+
runtime.Logger.LogError(e, "Error trying to start message broker {Broker} on Attempt {Attempt} of 20", Protocol, i + 1);
106+
if (i < 19)
107+
{
108+
runtime.Logger.LogInformation("Will retry to start broker {Broker} in 5 seconds", Protocol);
109+
await Task.Delay(5.Seconds());
110+
}
111+
}
112+
}
113+
114+
throw new BrokerInitializationException(this);
115+
116+
}
117+
118+
private async ValueTask startupAsync(IWolverineRuntime runtime)
119+
{
94120
await ConnectAsync(runtime);
95121

96122
foreach (var endpoint in endpoints())
@@ -118,5 +144,13 @@ protected virtual IEnumerable<Endpoint> explicitEndpoints()
118144
/// <param name="runtime"></param>
119145
protected virtual void tryBuildSystemEndpoints(IWolverineRuntime runtime)
120146
{
147+
}
148+
}
149+
150+
public class BrokerInitializationException : Exception
151+
{
152+
public BrokerInitializationException(IBrokerTransport transport) : base($"Unable to initialize the Broker {transport.Protocol} in time")
153+
{
154+
121155
}
122156
}

0 commit comments

Comments
 (0)