Skip to content

Commit ce74914

Browse files
committed
(#301) Adjust channel scaling + recovery
This commit fixes a potential DevidedByZeroException when channel workload is calculated in the auto scaling channel pool. It also passes a cancellation token to the autoscaling, making it so that it the channel is not created within a reasonable time span, the task is cancelled.
1 parent 311ff71 commit ce74914

File tree

2 files changed

+18
-6
lines changed

2 files changed

+18
-6
lines changed

src/RawRabbit/Channel/AutoScalingChannelPool.cs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,20 +65,25 @@ public void SetupScaling()
6565

6666
_timer = new Timer(state =>
6767
{
68-
var workPerChannel = ChannelRequestQueue.Count / Pool.Count;
68+
var workPerChannel = Pool.Count == 0 ? int.MaxValue : ChannelRequestQueue.Count / Pool.Count;
6969
var scaleUp = Pool.Count < _options.MaximumPoolSize;
7070
var scaleDown = _options.MinimunPoolSize < Pool.Count;
7171

7272
_logger.Debug("Channel pool currently has {channelCount} channels open and a total workload of {totalWorkload}", Pool.Count, ChannelRequestQueue.Count);
7373
if (scaleUp && _options.DesiredAverageWorkload < workPerChannel)
7474
{
7575
_logger.Debug("The estimated workload is {averageWorkload} operations/channel, which is higher than the desired workload ({desiredAverageWorkload}). Creating channel.", workPerChannel, _options.DesiredAverageWorkload);
76+
77+
var channelCancellation = new CancellationTokenSource(_options.RefreshInterval);
7678
_factory
77-
.CreateChannelAsync()
79+
.CreateChannelAsync(channelCancellation.Token)
7880
.ContinueWith(tChannel =>
7981
{
80-
Add(tChannel.Result);
81-
});
82+
if (tChannel.Status == TaskStatus.RanToCompletion)
83+
{
84+
Add(tChannel.Result);
85+
}
86+
}, CancellationToken.None);
8287
return;
8388
}
8489

src/RawRabbit/Channel/ChannelFactory.cs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ public ChannelFactory(IConnectionFactory connectionFactory, RawRabbitConfigurati
3232
{
3333
_logger.Debug("Creating a new connection for {hostNameCount} hosts.", ClientConfig.Hostnames.Count);
3434
Connection = ConnectionFactory.CreateConnection(ClientConfig.Hostnames);
35+
Connection.ConnectionShutdown += (sender, args) =>
36+
_logger.Warn("Connection was shutdown by {Initiator}. ReplyText {ReplyText}", args.Initiator, args.ReplyText);
3537
}
3638
catch (BrokerUnreachableException e)
3739
{
@@ -43,8 +45,8 @@ public ChannelFactory(IConnectionFactory connectionFactory, RawRabbitConfigurati
4345

4446
public virtual async Task<IModel> CreateChannelAsync(CancellationToken token = default(CancellationToken))
4547
{
46-
token.ThrowIfCancellationRequested();
4748
var connection = await GetConnectionAsync(token);
49+
token.ThrowIfCancellationRequested();
4850
var channel = connection.CreateModel();
4951
Channels.Add(channel);
5052
return channel;
@@ -80,11 +82,16 @@ public ChannelFactory(IConnectionFactory connectionFactory, RawRabbitConfigurati
8082

8183
_logger.Debug("Connection is recoverable. Waiting for 'Recovery' event to be triggered. ");
8284
var recoverTcs = new TaskCompletionSource<IConnection>();
85+
token.Register(() => recoverTcs.SetCanceled());
8386

8487
EventHandler<EventArgs> completeTask = null;
8588
completeTask = (sender, args) =>
8689
{
87-
_logger.Debug("Connection has been recovered!");
90+
if (recoverTcs.Task.IsCanceled)
91+
{
92+
return;
93+
}
94+
_logger.Info("Connection has been recovered!");
8895
recoverTcs.TrySetResult(recoverable as IConnection);
8996
recoverable.Recovery -= completeTask;
9097
};

0 commit comments

Comments
 (0)