Skip to content

Commit 6656e10

Browse files
Get PerformanceTest up and running (#55)
* Get PerformanceTest up and running * change perf test Signed-off-by: Gabriele Santomaggio <[email protected]> * Remove MaxInflight parameter. It is not necessary anymore Signed-off-by: Gabriele Santomaggio <[email protected]> --------- Signed-off-by: Gabriele Santomaggio <[email protected]> Co-authored-by: Gabriele Santomaggio <[email protected]>
1 parent 7faf7e0 commit 6656e10

File tree

7 files changed

+213
-63
lines changed

7 files changed

+213
-63
lines changed

RabbitMQ.AMQP.Client/IPublisherBuilder.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@ public interface IPublisherBuilder : IAddressBuilder<IPublisherBuilder>
1212
{
1313
IPublisherBuilder PublishTimeout(TimeSpan timeout);
1414

15-
IPublisherBuilder MaxInflightMessages(int maxInFlight);
16-
1715
Task<IPublisher> BuildAsync(CancellationToken cancellationToken = default);
1816
}
1917
}

RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,15 @@ public class AmqpPublisher : AbstractReconnectLifeCycle, IPublisher
1717
private readonly AmqpConnection _connection;
1818
private readonly TimeSpan _timeout;
1919
private readonly string _address;
20-
private readonly int _maxInFlight;
2120
private readonly Guid _id = Guid.NewGuid();
2221

2322
private SenderLink? _senderLink = null;
2423

25-
public AmqpPublisher(AmqpConnection connection, string address, TimeSpan timeout, int maxInFlight)
24+
public AmqpPublisher(AmqpConnection connection, string address, TimeSpan timeout)
2625
{
2726
_connection = connection;
2827
_address = address;
2928
_timeout = timeout;
30-
_maxInFlight = maxInFlight;
3129

3230
if (false == _connection.Publishers.TryAdd(_id, this))
3331
{

RabbitMQ.AMQP.Client/Impl/AmqpPublisherBuilder.cs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@ public class AmqpPublisherBuilder : IPublisherBuilder
9292
private string? _key;
9393
private string? _queue;
9494
private TimeSpan _timeout = TimeSpan.FromSeconds(10);
95-
private int _maxInFlight = 1000;
9695

9796
public AmqpPublisherBuilder(AmqpConnection connection)
9897
{
@@ -133,17 +132,12 @@ public IPublisherBuilder PublishTimeout(TimeSpan timeout)
133132
return this;
134133
}
135134

136-
public IPublisherBuilder MaxInflightMessages(int maxInFlight)
137-
{
138-
_maxInFlight = maxInFlight;
139-
return this;
140-
}
141135

142136
public async Task<IPublisher> BuildAsync(CancellationToken cancellationToken = default)
143137
{
144138
string address = new AddressBuilder().Exchange(_exchange).Queue(_queue).Key(_key).Address();
145139

146-
AmqpPublisher publisher = new(_connection, address, _timeout, _maxInFlight);
140+
AmqpPublisher publisher = new(_connection, address, _timeout);
147141

148142
// TODO pass cancellationToken
149143
await publisher.OpenAsync()

RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -281,15 +281,14 @@ RabbitMQ.AMQP.Client.Impl.AmqpMessage.Subject(string! subject) -> RabbitMQ.AMQP.
281281
RabbitMQ.AMQP.Client.Impl.AmqpNotOpenException
282282
RabbitMQ.AMQP.Client.Impl.AmqpNotOpenException.AmqpNotOpenException(string! message) -> void
283283
RabbitMQ.AMQP.Client.Impl.AmqpPublisher
284-
RabbitMQ.AMQP.Client.Impl.AmqpPublisher.AmqpPublisher(RabbitMQ.AMQP.Client.Impl.AmqpConnection! connection, string! address, System.TimeSpan timeout, int maxInFlight) -> void
284+
RabbitMQ.AMQP.Client.Impl.AmqpPublisher.AmqpPublisher(RabbitMQ.AMQP.Client.Impl.AmqpConnection! connection, string! address, System.TimeSpan timeout) -> void
285285
RabbitMQ.AMQP.Client.Impl.AmqpPublisher.PublishAsync(RabbitMQ.AMQP.Client.IMessage! message, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.PublishResult!>!
286286
RabbitMQ.AMQP.Client.Impl.AmqpPublisherBuilder
287287
RabbitMQ.AMQP.Client.Impl.AmqpPublisherBuilder.AmqpPublisherBuilder(RabbitMQ.AMQP.Client.Impl.AmqpConnection! connection) -> void
288288
RabbitMQ.AMQP.Client.Impl.AmqpPublisherBuilder.BuildAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IPublisher!>!
289289
RabbitMQ.AMQP.Client.Impl.AmqpPublisherBuilder.Exchange(RabbitMQ.AMQP.Client.IExchangeSpecification! exchangeSpec) -> RabbitMQ.AMQP.Client.IPublisherBuilder!
290290
RabbitMQ.AMQP.Client.Impl.AmqpPublisherBuilder.Exchange(string! exchangeName) -> RabbitMQ.AMQP.Client.IPublisherBuilder!
291291
RabbitMQ.AMQP.Client.Impl.AmqpPublisherBuilder.Key(string! key) -> RabbitMQ.AMQP.Client.IPublisherBuilder!
292-
RabbitMQ.AMQP.Client.Impl.AmqpPublisherBuilder.MaxInflightMessages(int maxInFlight) -> RabbitMQ.AMQP.Client.IPublisherBuilder!
293292
RabbitMQ.AMQP.Client.Impl.AmqpPublisherBuilder.PublishTimeout(System.TimeSpan timeout) -> RabbitMQ.AMQP.Client.IPublisherBuilder!
294293
RabbitMQ.AMQP.Client.Impl.AmqpPublisherBuilder.Queue(RabbitMQ.AMQP.Client.IQueueSpecification! queueSpec) -> RabbitMQ.AMQP.Client.IPublisherBuilder!
295294
RabbitMQ.AMQP.Client.Impl.AmqpPublisherBuilder.Queue(string! queueName) -> RabbitMQ.AMQP.Client.IPublisherBuilder!
@@ -425,7 +424,6 @@ RabbitMQ.AMQP.Client.IPublisher
425424
RabbitMQ.AMQP.Client.IPublisher.PublishAsync(RabbitMQ.AMQP.Client.IMessage! message, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.PublishResult!>!
426425
RabbitMQ.AMQP.Client.IPublisherBuilder
427426
RabbitMQ.AMQP.Client.IPublisherBuilder.BuildAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IPublisher!>!
428-
RabbitMQ.AMQP.Client.IPublisherBuilder.MaxInflightMessages(int maxInFlight) -> RabbitMQ.AMQP.Client.IPublisherBuilder!
429427
RabbitMQ.AMQP.Client.IPublisherBuilder.PublishTimeout(System.TimeSpan timeout) -> RabbitMQ.AMQP.Client.IPublisherBuilder!
430428
RabbitMQ.AMQP.Client.IQueueInfo
431429
RabbitMQ.AMQP.Client.IQueueInfo.Arguments() -> System.Collections.Generic.Dictionary<string!, object!>!

docs/Examples/HAClient/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
await queueSpec.DeleteAsync();
5858
await queueSpec.DeclareAsync();
5959

60-
IPublisher publisher = await connection.PublisherBuilder().Queue(queueName).MaxInflightMessages(2000).BuildAsync();
60+
IPublisher publisher = await connection.PublisherBuilder().Queue(queueName).BuildAsync();
6161

6262
ManualResetEvent pausePublishing = new(true);
6363
publisher.ChangeState += (sender, fromState, toState, e) =>

docs/Examples/PerformanceTest/Program.cs

Lines changed: 93 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,26 @@
33
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
44

55
using System.Diagnostics;
6+
using PerformanceTest;
67
using RabbitMQ.AMQP.Client;
78
using RabbitMQ.AMQP.Client.Impl;
89
using Trace = Amqp.Trace;
910
using TraceLevel = Amqp.TraceLevel;
1011

12+
// ---- Configuration ----
13+
const int total = 5_000_000;
14+
const int tasksSize = 200;
15+
bool enableConsumer = true;
16+
// -----------------------
17+
18+
1119
Trace.TraceLevel = TraceLevel.Verbose;
1220

1321
ConsoleTraceListener consoleListener = new();
1422
Trace.TraceListener = (l, f, a) =>
1523
consoleListener.WriteLine($"[{DateTime.Now}] [{l}] - {f}");
1624

17-
Trace.WriteLine(TraceLevel.Information, "Starting");
25+
Trace.WriteLine(TraceLevel.Information, "Starting performance test...");
1826
const string containerId = "performance-test-connection";
1927

2028
IEnvironment environment = await AmqpEnvironment
@@ -30,86 +38,124 @@
3038
IQueueSpecification queueSpec = management.Queue(queueName).Type(QueueType.QUORUM);
3139
await queueSpec.DeleteAsync();
3240
await queueSpec.DeclareAsync();
33-
Trace.WriteLine(TraceLevel.Information, "Queue Created");
34-
35-
IPublisher publisher = await connection.PublisherBuilder().Queue(queueName).MaxInflightMessages(5000).BuildAsync();
36-
37-
int received = 0;
38-
DateTime start = DateTime.Now;
41+
Trace.WriteLine(TraceLevel.Information, $"Queue {queueName} recreated");
42+
Stats stats = new();
43+
IPublisher publisher = await connection.PublisherBuilder().Queue(queueName).BuildAsync();
3944

4045
async Task MessageHandler(IContext context, IMessage message)
4146
{
4247
await context.AcceptAsync();
48+
stats.IncrementConsumed();
49+
}
50+
51+
IConsumer? consumer = null;
52+
53+
if (enableConsumer)
54+
{
55+
consumer = await connection.ConsumerBuilder()
56+
.Queue(queueName)
57+
.InitialCredits(1000)
58+
.MessageHandler(MessageHandler)
59+
.BuildAsync();
60+
}
61+
4362

44-
if (Interlocked.Increment(ref received) % 200_000 == 0)
63+
stats.Start();
64+
_ = Task.Run(async () =>
65+
{
66+
while (stats.IsRunning())
4567
{
46-
DateTime end = DateTime.Now;
47-
Console.WriteLine($"Received Time: {end - start} {received}");
68+
await Task.Delay(1000);
69+
Trace.WriteLine(TraceLevel.Information, $"{stats.Report()}");
4870
}
49-
};
71+
});
5072

51-
IConsumer consumer = await connection.ConsumerBuilder()
52-
.Queue(queueName)
53-
.InitialCredits(1000)
54-
.MessageHandler(MessageHandler)
55-
.Stream().Offset(1).Builder().BuildAsync();
73+
List<Task<PublishResult>> tasks = [];
5674

5775
try
5876
{
59-
int confirmed = 0;
60-
61-
const int total = 1_000_000;
62-
for (int i = 0; i < total; i++)
77+
for (int i = 0; i < (total / tasksSize); i++)
6378
{
6479
try
6580
{
66-
if (i % 200_000 == 0)
81+
for (int j = 0; j < tasksSize; j++)
6782
{
68-
DateTime endp = DateTime.Now;
69-
Console.WriteLine($"Sending Time: {endp - start} - messages {i}");
83+
var message = new AmqpMessage(new byte[10]);
84+
tasks.Add(publisher.PublishAsync(message));
85+
stats.IncrementPublished();
7086
}
7187

72-
var message = new AmqpMessage(new byte[10]);
73-
PublishResult pr = await publisher.PublishAsync(message);
74-
if (pr.Outcome.State == OutcomeState.Accepted)
88+
foreach (var result in await Task.WhenAll(tasks))
7589
{
76-
if (Interlocked.Increment(ref confirmed) % 200_000 != 0)
90+
if (result.Outcome.State == OutcomeState.Accepted)
7791
{
78-
return;
92+
stats.IncrementAccepted();
93+
}
94+
else
95+
{
96+
stats.IncrementFailed();
7997
}
80-
81-
DateTime confirmEnd = DateTime.Now;
82-
Console.WriteLine($"Confirmed Time: {confirmEnd - start} {confirmed}");
83-
}
84-
else
85-
{
86-
Console.WriteLine(
87-
$"outcome result, state: {pr.Outcome.State}, message_id: " +
88-
$"{message.MessageId()}, error: {pr.Outcome.Error}");
8998
}
99+
100+
tasks.Clear();
90101
}
91102
catch (Exception e)
92103
{
93104
Trace.WriteLine(TraceLevel.Error, $"{e.Message}");
94105
}
95106
}
96107

97-
DateTime end = DateTime.Now;
98-
Console.WriteLine($"Total Sent Time: {end - start}");
108+
stats.Stop();
109+
await Task.Delay(1000);
110+
Trace.WriteLine(TraceLevel.Information, $"Consumer: {enableConsumer} - TaskSize: {tasksSize} - {stats.Report(true)}");
99111
}
100112
catch (Exception e)
101113
{
102114
Trace.WriteLine(TraceLevel.Error, $"{e.Message}");
103115
}
116+
finally
117+
{
118+
Console.WriteLine("Press any key to delete the queue and close the connection.");
119+
Console.ReadKey();
104120

105-
Console.WriteLine("Press any key to delete the queue and close the connection.");
106-
Console.ReadKey();
121+
try
122+
{
123+
await publisher.CloseAsync();
124+
publisher.Dispose();
125+
}
126+
catch (Exception ex)
127+
{
128+
Console.WriteLine("[ERROR] unexpected exception while closing publisher: {0}", ex);
129+
}
107130

108-
await publisher.CloseAsync();
109-
publisher.Dispose();
131+
try
132+
{
133+
if (consumer != null)
134+
{
135+
await consumer.CloseAsync();
136+
consumer.Dispose();
137+
}
138+
}
139+
catch (Exception ex)
140+
{
141+
Console.WriteLine("[ERROR] unexpected exception while closing consumer: {0}", ex);
142+
}
110143

111-
await consumer.CloseAsync();
112-
consumer.Dispose();
144+
try
145+
{
146+
await queueSpec.DeleteAsync();
147+
}
148+
catch (Exception ex)
149+
{
150+
Console.WriteLine("[ERROR] unexpected exception while deleting queue: {0}", ex);
151+
}
113152

114-
await queueSpec.DeleteAsync();
115-
await environment.CloseAsync();
153+
try
154+
{
155+
await environment.CloseAsync();
156+
}
157+
catch (Exception ex)
158+
{
159+
Console.WriteLine("[ERROR] unexpected exception while closing environment: {0}", ex);
160+
}
161+
}

0 commit comments

Comments
 (0)