22using System . Linq ;
33using System . Threading ;
44using System . Threading . Tasks ;
5+ using Microsoft . Extensions . Logging ;
56using Microsoft . Extensions . Options ;
67using Motor . Extensions . Hosting . Abstractions ;
78using Motor . Extensions . Hosting . CloudEvents ;
89using Motor . Extensions . Hosting . PgMq . Options ;
910using Npgmq ;
11+ using Npgsql ;
1012
1113namespace Motor . Extensions . Hosting . PgMq ;
1214
@@ -23,16 +25,19 @@ public class PgMqMessageProducer<TOutput> : IRawMessagePublisher<TOutput>
2325 where TOutput : notnull
2426{
2527 private readonly PgMqPublisherOptions _options ;
28+ private readonly ILogger < PgMqMessageProducer < TOutput > > _logger ;
2629 private readonly PublisherOptions _publisherOptions ;
2730 private readonly INpgmqClient _npgmqClient ;
2831
2932 public PgMqMessageProducer (
3033 IOptions < PgMqPublisherOptions > options ,
34+ ILogger < PgMqMessageProducer < TOutput > > logger ,
3135 IOptions < PublisherOptions > publisherOptions ,
3236 INpgmqClient npgmqClient
3337 )
3438 {
3539 _options = options . Value ;
40+ _logger = logger ;
3641 _publisherOptions = publisherOptions . Value ;
3742 _npgmqClient = npgmqClient ;
3843 }
@@ -62,15 +67,13 @@ public async Task StartAsync(CancellationToken token = default)
6267 }
6368 catch ( NpgmqException e )
6469 {
65- // This is possibly a race with another extension initializing in parallel.
66- // Check if the extension is already present.
67- try
70+ if ( e . InnerException is PostgresException { SqlState : PostgresErrorCodes . DuplicateObject } )
6871 {
69- await _npgmqClient . GetPgmqVersionAsync ( token ) ;
72+ _logger . LogWarning ( e , "pgmq extension already exists, assuming it was created by another instance" ) ;
7073 }
71- catch ( NpgmqException )
74+ else
7275 {
73- throw e ;
76+ throw ;
7477 }
7578 }
7679
0 commit comments