Skip to content

Commit db73768

Browse files
Alex-KumGermanCoding
authored andcommitted
add already exists check to pgmq initialization
1 parent cca3c07 commit db73768

4 files changed

Lines changed: 22 additions & 13 deletions

File tree

src/Motor.Extensions.Hosting.PgMq/PgMqHostBuilderExtensions.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public static void AddPgMqWithConfig<T>(this IPublisherBuilder<T> builder, IConf
5252
builder.Configure<PgMqPublisherOptions>(config);
5353
builder.AddPublisher(sp => new PgMqMessageProducer<T>(
5454
MSOptions.Create(options),
55+
sp.GetRequiredService<ILogger<PgMqMessageProducer<T>>>(),
5556
sp.GetRequiredService<IOptions<PublisherOptions>>(),
5657
new NpgmqClient(options.ToConnectionString())
5758
));

src/Motor.Extensions.Hosting.PgMq/PgMqMessageConsumer.cs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
using Motor.Extensions.Hosting.CloudEvents;
99
using Motor.Extensions.Hosting.PgMq.Options;
1010
using Npgmq;
11+
using Npgsql;
1112

1213
namespace Motor.Extensions.Hosting.PgMq;
1314

@@ -83,15 +84,13 @@ public async Task StartAsync(CancellationToken token = default)
8384
}
8485
catch (NpgmqException e)
8586
{
86-
// This is possibly a race with another extension initializing in parallel.
87-
// Check if the extension is already present.
88-
try
87+
if (e.InnerException is PostgresException { SqlState: PostgresErrorCodes.UniqueViolation })
8988
{
90-
await _npgmqClient.GetPgmqVersionAsync(token);
89+
_logger.LogWarning(e, "pgmq extension already exists, assuming it was created by another instance");
9190
}
92-
catch (NpgmqException)
91+
else
9392
{
94-
throw e;
93+
throw;
9594
}
9695
}
9796

src/Motor.Extensions.Hosting.PgMq/PgMqMessageProducer.cs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@
22
using System.Linq;
33
using System.Threading;
44
using System.Threading.Tasks;
5+
using Microsoft.Extensions.Logging;
56
using Microsoft.Extensions.Options;
67
using Motor.Extensions.Hosting.Abstractions;
78
using Motor.Extensions.Hosting.CloudEvents;
89
using Motor.Extensions.Hosting.PgMq.Options;
910
using Npgmq;
11+
using Npgsql;
1012

1113
namespace Motor.Extensions.Hosting.PgMq;
1214

@@ -23,16 +25,19 @@ public class PgMqMessageProducer<TOutput> : IRawMessagePublisher<TOutput>
2325
where TOutput : notnull
2426
{
2527
private readonly PgMqPublisherOptions _options;
28+
private readonly ILogger<PgMqMessageProducer<TOutput>> _logger;
2629
private readonly PublisherOptions _publisherOptions;
2730
private readonly INpgmqClient _npgmqClient;
2831

2932
public PgMqMessageProducer(
3033
IOptions<PgMqPublisherOptions> options,
34+
ILogger<PgMqMessageProducer<TOutput>> logger,
3135
IOptions<PublisherOptions> publisherOptions,
3236
INpgmqClient npgmqClient
3337
)
3438
{
3539
_options = options.Value;
40+
_logger = logger;
3641
_publisherOptions = publisherOptions.Value;
3742
_npgmqClient = npgmqClient;
3843
}
@@ -62,15 +67,13 @@ public async Task StartAsync(CancellationToken token = default)
6267
}
6368
catch (NpgmqException e)
6469
{
65-
// This is possibly a race with another extension initializing in parallel.
66-
// Check if the extension is already present.
67-
try
70+
if (e.InnerException is PostgresException { SqlState: PostgresErrorCodes.UniqueViolation })
6871
{
69-
await _npgmqClient.GetPgmqVersionAsync(token);
72+
_logger.LogWarning(e, "pgmq extension already exists, assuming it was created by another instance");
7073
}
71-
catch (NpgmqException)
74+
else
7275
{
73-
throw e;
76+
throw;
7477
}
7578
}
7679

test/Motor.Extensions.Hosting.PgMq_IntegrationTest/PgMqIntegrationTests.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,11 @@ await producer.PublishMessageAsync(
320320
Assert.NotNull(requeued);
321321
}
322322

323-
private PgMqMessageProducer<T> GetProducer<T>(string queueName, CloudEventFormat cloudEventFormat)
323+
private PgMqMessageProducer<T> GetProducer<T>(
324+
string queueName,
325+
CloudEventFormat cloudEventFormat,
326+
ILogger<PgMqMessageProducer<T>>? logger = null
327+
)
324328
where T : notnull
325329
{
326330
var options = BuildPgOptions(
@@ -334,8 +338,10 @@ private PgMqMessageProducer<T> GetProducer<T>(string queueName, CloudEventFormat
334338
}
335339
);
336340
var producerOptions = MSOptions.Create(new PublisherOptions { CloudEventFormat = cloudEventFormat });
341+
logger ??= Mock.Of<ILogger<PgMqMessageProducer<T>>>();
337342
return new PgMqMessageProducer<T>(
338343
MSOptions.Create(options),
344+
logger,
339345
producerOptions,
340346
new NpgmqClient(options.ToConnectionString())
341347
);

0 commit comments

Comments
 (0)