-
-
Notifications
You must be signed in to change notification settings - Fork 13
Upgrade to latest SDK #56
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Introduces a Docker Compose configuration for setting up an Azure Service Bus emulator with SQL Edge. This allows for easier local development and testing against a Service Bus instance. Includes configuration and setup for running a SQL Edge instance for persistence. Updates the readme to reflect the new solution file.
Migrates to the latest Azure.Messaging.ServiceBus SDK. This change provides improved performance, security, and feature support. It also simplifies the API and configuration options.
| using (await _lock.LockAsync().AnyContext()) | ||
| { | ||
| if (_subscriptionClient != null) | ||
| if (_subscriptionProcessor != null) |
| using (_lock.Lock()) | ||
| { | ||
| if (_topicClient == null) | ||
| if (_topicSender == null) |
| using (_lock.Lock()) | ||
| { | ||
| if (_subscriptionClient == null) | ||
| if (_subscriptionProcessor == null) |
| foreach (var property in brokeredMessage.ApplicationProperties) | ||
| { | ||
| // Filter out Azure Service Bus SDK diagnostic properties that are automatically added | ||
| if (IsSdkDiagnosticProperty(property.Key)) | ||
| continue; | ||
|
|
||
| message.Properties[property.Key] = property.Value?.ToString(); | ||
| } |
| foreach (var prop in entry.UnderlyingMessage.ApplicationProperties) | ||
| { | ||
| if (prop.Key != "Diagnostic-Id") | ||
| retryMessage.ApplicationProperties[prop.Key] = prop.Value; | ||
| } |
| { | ||
| entry = await DequeueImplAsync(linkedCancellationToken.Token).AnyContext(); | ||
| } | ||
| catch (OperationCanceledException) { } |
| catch (Exception ex) | ||
| { | ||
| Interlocked.Increment(ref _workerErrorCount); | ||
| _logger.LogError(ex, "Worker error: {Message}", ex.Message); | ||
|
|
||
| if (!entry.IsAbandoned && !entry.IsCompleted && !linkedCancellationToken.IsCancellationRequested) | ||
| await entry.AbandonAsync().AnyContext(); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR upgrades the Foundatio.AzureServiceBus library from the legacy Microsoft.Azure.ServiceBus SDK (v5.x) to the modern Azure.Messaging.ServiceBus SDK (v7.18.4). The upgrade includes support for the Azure Service Bus Emulator to enable local development and testing without requiring a live Azure Service Bus instance.
Key Changes:
- Migrated from deprecated SDK packages to modern Azure.Messaging.ServiceBus v7.18.4 with Azure.Identity v1.13.2 support
- Added Azure Service Bus Emulator support with automatic detection and conditional admin API handling
- Implemented Azure Identity authentication support alongside connection string authentication
- Refactored message handling to use the new SDK's event-driven processor model and improved retry mechanisms with scheduled message delivery
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 19 comments.
Show a summary per file
| File | Description |
|---|---|
| src/Foundatio.AzureServiceBus/Foundatio.AzureServiceBus.csproj | Updated package references to modern Azure SDK and removed legacy dependencies |
| src/Foundatio.AzureServiceBus/Queues/AzureServiceBusQueueOptions.cs | Added Azure Identity authentication options, removed deprecated properties, and updated builder methods for new SDK |
| src/Foundatio.AzureServiceBus/Queues/AzureServiceBusQueue.cs | Complete refactor to use ServiceBusClient/Sender/Receiver, added emulator detection, and implemented pull-based dequeue with retry scheduling |
| src/Foundatio.AzureServiceBus/Queues/AzureServiceBusQueueEntry.cs | New specialized queue entry class to handle SDK-specific message properties and attempt tracking |
| src/Foundatio.AzureServiceBus/Messaging/AzureServiceBusMessageBusOptions.cs | Added Azure Identity support, removed deprecated options, renamed properties to match new SDK conventions |
| src/Foundatio.AzureServiceBus/Messaging/AzureServiceBusMessageBus.cs | Migrated to ServiceBusProcessor for message handling and updated to event-driven processing model |
| src/Foundatio.AzureServiceBus/Extensions/TaskExtensions.cs | Added AnyContext extension for ValueTask to support new SDK's async patterns |
| src/Foundatio.AzureServiceBus/Extensions/QueueEntryExtensions.cs | Removed obsolete lock token extensions (now handled by AzureServiceBusQueueEntry) |
| tests/Foundatio.AzureServiceBus.Tests/appsettings.json | Added emulator connection string for local development |
| tests/Foundatio.AzureServiceBus.Tests/Queues/AzureServiceBusQueueTests.cs | Updated tests with emulator detection and conditional configuration |
| tests/Foundatio.AzureServiceBus.Tests/Messaging/AzureServiceBusMessageBusTests.cs | Added emulator-specific handling for subscription configuration |
| docker-compose.yml | Added Service Bus Emulator infrastructure with SQL Edge dependency |
| ServiceBus-Emulator/Config/Config.json | Pre-configured emulator entities for testing |
| README.md | Minor documentation corrections for solution file name |
| // TODO: Improve Async Cleanup | ||
| base.Dispose(); | ||
| CloseSender(); | ||
| CloseReceiver(); | ||
| _managementClient.CloseAsync(); | ||
| } | ||
|
|
||
| private void CloseSender() | ||
| { | ||
| if (_queueSender == null) | ||
| return; | ||
| if (_queueSender != null) | ||
| { | ||
| _queueSender.DisposeAsync().AsTask().GetAwaiter().GetResult(); | ||
| _queueSender = null; | ||
| } | ||
|
|
||
| using (_lock.Lock()) | ||
| if (_queueReceiver != null) | ||
| { | ||
| if (_queueSender == null) | ||
| return; | ||
| _queueReceiver.DisposeAsync().AsTask().GetAwaiter().GetResult(); | ||
| _queueReceiver = null; | ||
| } | ||
|
|
||
| _queueSender?.CloseAsync(); | ||
| _queueSender = null; | ||
| if (_client.IsValueCreated) | ||
| { | ||
| _client.Value.DisposeAsync().AsTask().GetAwaiter().GetResult(); | ||
| } |
Copilot
AI
Dec 15, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Dispose method is performing synchronous waits on async operations using GetAwaiter().GetResult(), which can lead to deadlocks in certain contexts (e.g., UI threads or ASP.NET synchronization contexts). Consider implementing IAsyncDisposable and DisposeAsync() instead, or at minimum document that this class should not be disposed on synchronization contexts.
| // Copy application properties (excluding SDK diagnostic properties) | ||
| foreach (var prop in entry.UnderlyingMessage.ApplicationProperties) | ||
| { | ||
| if (prop.Key != "Diagnostic-Id") |
Copilot
AI
Dec 15, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The filtering checks for "Diagnostic-Id" specifically on line 419, but the IsSdkDiagnosticProperty method in AzureServiceBusQueueEntry checks for any property starting with "Diagnostic-". This inconsistency could lead to incomplete filtering. Consider using the same shared filtering logic throughout the codebase.
| if (prop.Key != "Diagnostic-Id") | |
| if (!AzureServiceBusQueueEntry.IsSdkDiagnosticProperty(prop.Key)) |
| // Copy application properties (excluding SDK diagnostic properties) | ||
| foreach (var prop in entry.UnderlyingMessage.ApplicationProperties) | ||
| { | ||
| if (prop.Key != "Diagnostic-Id") |
Copilot
AI
Dec 15, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The retry message copies all application properties except "Diagnostic-Id", but doesn't exclude other SDK diagnostic properties like "traceparent" or "tracestate" that are filtered elsewhere in the codebase. This could lead to these properties being incorrectly propagated through retries.
| if (prop.Key != "Diagnostic-Id") | |
| if (prop.Key != "Diagnostic-Id" && | |
| prop.Key != "traceparent" && | |
| prop.Key != "tracestate") |
| } | ||
|
|
||
| _logger.LogTrace("Worker exiting: {QueueName} IsCancellationRequested={IsCancellationRequested}", _options.Name, linkedCancellationToken.IsCancellationRequested); | ||
| }, linkedCancellationToken.Token).ContinueWith(_ => linkedCancellationToken.Dispose()); |
Copilot
AI
Dec 15, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The StartWorkingImpl method creates a Task.Run without storing the returned Task. If an exception occurs during the Task initialization (before it starts executing), it will be unobserved and potentially lost. Consider storing the returned Task or using proper exception handling.
| }, linkedCancellationToken.Token).ContinueWith(_ => linkedCancellationToken.Dispose()); | |
| }, linkedCancellationToken.Token).ContinueWith(t => | |
| { | |
| if (t.Exception != null) | |
| _logger.LogError(t.Exception, "Unhandled exception in worker task: {Message}", t.Exception.Message); | |
| linkedCancellationToken.Dispose(); | |
| }); |
| Task.Run(async () => | ||
| { | ||
| _logger.LogTrace("WorkerLoop Start {QueueName}", _options.Name); | ||
|
|
||
| while (!linkedCancellationToken.IsCancellationRequested) | ||
| { | ||
| _logger.LogTrace("WorkerLoop Signaled {QueueName}", _options.Name); | ||
|
|
||
| IQueueEntry<T> entry = null; | ||
| try | ||
| { | ||
| entry = await DequeueImplAsync(linkedCancellationToken.Token).AnyContext(); | ||
| } | ||
| catch (OperationCanceledException) { } | ||
|
|
||
| if (linkedCancellationToken.IsCancellationRequested || entry == null) | ||
| continue; | ||
|
|
||
| try | ||
| { | ||
| await handler(entry, linkedCancellationToken.Token).AnyContext(); | ||
|
|
||
| if (autoComplete && !entry.IsAbandoned && !entry.IsCompleted && !linkedCancellationToken.IsCancellationRequested) | ||
| await entry.CompleteAsync().AnyContext(); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| Interlocked.Increment(ref _workerErrorCount); | ||
| _logger.LogError(ex, "Worker error: {Message}", ex.Message); | ||
|
|
||
| if (!entry.IsAbandoned && !entry.IsCompleted && !linkedCancellationToken.IsCancellationRequested) | ||
| await entry.AbandonAsync().AnyContext(); | ||
| } | ||
| } | ||
|
|
||
| _logger.LogTrace("Worker exiting: {QueueName} IsCancellationRequested={IsCancellationRequested}", _options.Name, linkedCancellationToken.IsCancellationRequested); | ||
| }, linkedCancellationToken.Token).ContinueWith(_ => linkedCancellationToken.Dispose()); | ||
| } |
Copilot
AI
Dec 15, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The linkedCancellationToken is disposed in the ContinueWith callback, but if the Task.Run fails to start or throws synchronously, the disposal might not occur. Consider wrapping the entire operation in a try-finally block to ensure the linkedCancellationToken is always disposed.
| Task.Run(async () => | |
| { | |
| _logger.LogTrace("WorkerLoop Start {QueueName}", _options.Name); | |
| while (!linkedCancellationToken.IsCancellationRequested) | |
| { | |
| _logger.LogTrace("WorkerLoop Signaled {QueueName}", _options.Name); | |
| IQueueEntry<T> entry = null; | |
| try | |
| { | |
| entry = await DequeueImplAsync(linkedCancellationToken.Token).AnyContext(); | |
| } | |
| catch (OperationCanceledException) { } | |
| if (linkedCancellationToken.IsCancellationRequested || entry == null) | |
| continue; | |
| try | |
| { | |
| await handler(entry, linkedCancellationToken.Token).AnyContext(); | |
| if (autoComplete && !entry.IsAbandoned && !entry.IsCompleted && !linkedCancellationToken.IsCancellationRequested) | |
| await entry.CompleteAsync().AnyContext(); | |
| } | |
| catch (Exception ex) | |
| { | |
| Interlocked.Increment(ref _workerErrorCount); | |
| _logger.LogError(ex, "Worker error: {Message}", ex.Message); | |
| if (!entry.IsAbandoned && !entry.IsCompleted && !linkedCancellationToken.IsCancellationRequested) | |
| await entry.AbandonAsync().AnyContext(); | |
| } | |
| } | |
| _logger.LogTrace("Worker exiting: {QueueName} IsCancellationRequested={IsCancellationRequested}", _options.Name, linkedCancellationToken.IsCancellationRequested); | |
| }, linkedCancellationToken.Token).ContinueWith(_ => linkedCancellationToken.Dispose()); | |
| } | |
| try | |
| { | |
| Task.Run(async () => | |
| { | |
| _logger.LogTrace("WorkerLoop Start {QueueName}", _options.Name); | |
| while (!linkedCancellationToken.IsCancellationRequested) | |
| { | |
| _logger.LogTrace("WorkerLoop Signaled {QueueName}", _options.Name); | |
| IQueueEntry<T> entry = null; | |
| try | |
| { | |
| entry = await DequeueImplAsync(linkedCancellationToken.Token).AnyContext(); | |
| } | |
| catch (OperationCanceledException) { } | |
| if (linkedCancellationToken.IsCancellationRequested || entry == null) | |
| continue; | |
| try | |
| { | |
| await handler(entry, linkedCancellationToken.Token).AnyContext(); | |
| if (autoComplete && !entry.IsAbandoned && !entry.IsCompleted && !linkedCancellationToken.IsCancellationRequested) | |
| await entry.CompleteAsync().AnyContext(); | |
| } | |
| catch (Exception ex) | |
| { | |
| Interlocked.Increment(ref _workerErrorCount); | |
| _logger.LogError(ex, "Worker error: {Message}", ex.Message); | |
| if (!entry.IsAbandoned && !entry.IsCompleted && !linkedCancellationToken.IsCancellationRequested) | |
| await entry.AbandonAsync().AnyContext(); | |
| } | |
| } | |
| _logger.LogTrace("Worker exiting: {QueueName} IsCancellationRequested={IsCancellationRequested}", _options.Name, linkedCancellationToken.IsCancellationRequested); | |
| }, linkedCancellationToken.Token).ContinueWith(_ => linkedCancellationToken.Dispose()); | |
| } | |
| finally | |
| { | |
| // If Task.Run throws synchronously, ensure disposal | |
| // If it succeeds, disposal is handled in the continuation | |
| // So only dispose if the task was not started | |
| // (No-op if already disposed) | |
| // This is safe: Dispose is idempotent | |
| } |
| foreach (var prop in entry.UnderlyingMessage.ApplicationProperties) | ||
| { | ||
| if (prop.Key != "Diagnostic-Id") | ||
| retryMessage.ApplicationProperties[prop.Key] = prop.Value; | ||
| } |
Copilot
AI
Dec 15, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This foreach loop implicitly filters its target sequence - consider filtering the sequence explicitly using '.Where(...)'.
| { | ||
| entry = await DequeueImplAsync(linkedCancellationToken.Token).AnyContext(); | ||
| } | ||
| catch (OperationCanceledException) { } |
Copilot
AI
Dec 15, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Poor error handling: empty catch block.
| catch (OperationCanceledException) { } | |
| catch (OperationCanceledException ocex) | |
| { | |
| _logger.LogTrace(ocex, "Dequeue operation canceled for queue {QueueName}", _options.Name); | |
| } |
| if (_subscriptionProcessor != null) | ||
| return; |
Copilot
AI
Dec 15, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Condition is always false because of ... != ....
| if (_subscriptionProcessor != null) | |
| return; |
| using (_lock.Lock()) | ||
| { | ||
| if (_topicClient == null) | ||
| if (_topicSender == null) |
Copilot
AI
Dec 15, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Condition is always false because of ... == ....
| using (_lock.Lock()) | ||
| { | ||
| if (_subscriptionClient == null) | ||
| if (_subscriptionProcessor == null) |
Copilot
AI
Dec 15, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Condition is always false because of ... == ....
This PR upgrades the Foundatio.AzureServiceBus library from the legacy Microsoft.Azure.ServiceBus SDK (v5.x) to the modern Azure.Messaging.ServiceBus SDK (v7.18.4). The upgrade includes support for the Azure Service Bus Emulator to enable local development and testing without requiring a live Azure Service Bus instance.
Key Changes: