Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Azure ServiceBus persistent container support #7136

Merged
merged 23 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
3be1e3d
Fix Azure ServiceBus persistent container support
sebastienros Jan 17, 2025
86513bf
Refactor state persistence
sebastienros Jan 18, 2025
1e9de41
Fix tests
sebastienros Jan 22, 2025
97ce49c
PR feedback
sebastienros Jan 23, 2025
ac1e1f5
Fix build
sebastienros Jan 24, 2025
3ebd0da
Merge remote-tracking branch 'origin/main' into sebros/sbpersist
sebastienros Jan 28, 2025
4ffcda7
Test AspireStore
sebastienros Jan 29, 2025
650e3df
Refactor KeyValueStore
sebastienros Jan 29, 2025
d929346
Add tests
sebastienros Jan 29, 2025
fcb23e7
Update src/Aspire.Hosting.Azure.ServiceBus/AzureServiceBusExtensions.cs
sebastienros Jan 29, 2025
d031cd2
Use /obj folder to store files
sebastienros Jan 30, 2025
376fcca
Create ResourcesPreparingEvent
sebastienros Jan 31, 2025
5620c61
Merge remote-tracking branch 'origin/main' into sebros/sbpersist
sebastienros Feb 3, 2025
b8a1025
Remove unused AddPersistentParameter
sebastienros Feb 3, 2025
727f0f0
Only fallback folder on ENV
sebastienros Feb 3, 2025
417c0ff
Fix method documentation
sebastienros Feb 3, 2025
663ee7a
Remove newly added event
sebastienros Feb 4, 2025
33868bc
Merge remote-tracking branch 'origin/main' into sebros/sbpersist
sebastienros Feb 4, 2025
ec769ed
Fix tests
sebastienros Feb 4, 2025
e691cb9
Use temp path for store in functional tests
sebastienros Feb 4, 2025
21808c5
PR feedback
sebastienros Feb 4, 2025
ad785ef
Moving things
sebastienros Feb 4, 2025
1bd23ae
Merge remote-tracking branch 'origin/main' into sebros/sbpersist
sebastienros Feb 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 118 additions & 112 deletions src/Aspire.Hosting.Azure.ServiceBus/AzureServiceBusExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,11 @@ public static IResourceBuilder<AzureServiceBusResource> AddSubscription(this IRe
/// </example>
public static IResourceBuilder<AzureServiceBusResource> RunAsEmulator(this IResourceBuilder<AzureServiceBusResource> builder, Action<IResourceBuilder<AzureServiceBusEmulatorResource>>? configureContainer = null)
{
if (builder.Resource.IsEmulator)
{
throw new InvalidOperationException("The Azure Service Bus resource is already configured to run as an emulator.");
}
Comment on lines +244 to +247
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this override the behavior? Why would it throw?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without this there is an exception when we call RunAsEmulator twice (I did this mistake at some point):

Unhandled exception. Aspire.Hosting.DistributedApplicationException: Endpoint with name 'emulator' already exists. Endpoint name may not have been explicitly specified and was derived automatically from scheme argument (e.g. 'http', 'https', or 'tcp'). Multiple calls to WithEndpoint (and related methods) may result in a conflict if name argument is not specified. Each endpoint must have a unique name. For more information on networking in .NET Aspire see: https://aka.ms/dotnet/aspire/networking

It took me a while to realize the problem.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we'll want to handle this differently but this is ok for now.


if (builder.ApplicationBuilder.ExecutionContext.IsPublishMode)
{
return builder;
Expand Down Expand Up @@ -275,9 +280,6 @@ public static IResourceBuilder<AzureServiceBusResource> RunAsEmulator(this IReso

var aspireStore = AspireStore.Create(builder.ApplicationBuilder);

// Deterministic file path for the configuration file
var configHostFile = aspireStore.GetOrCreateFile($"{builder.Resource.Name}-Config.json");

if (configureContainer != null)
{
var surrogate = new AzureServiceBusEmulatorResource(builder.Resource);
Expand All @@ -292,131 +294,35 @@ public static IResourceBuilder<AzureServiceBusResource> RunAsEmulator(this IReso

sqlEdgeResource = sqlEdgeResource.WithLifetime(lifetime);

var defaultConfigFileMount = new ContainerMountAnnotation(
configHostFile,
AzureServiceBusEmulatorResource.EmulatorConfigJsonPath,
ContainerMountType.BindMount,
isReadOnly: true);

var hasCustomConfigJson = builder.Resource.Annotations.OfType<ContainerMountAnnotation>().Any(v => v.Target == AzureServiceBusEmulatorResource.EmulatorConfigJsonPath);

if (!hasCustomConfigJson)
{
builder.WithAnnotation(defaultConfigFileMount);
}
// RunAsEmulator() can be followed by custom model configuration so we need to delay the creation of the Config.json file
// until all resources are about to be prepared and annotations can't be updated anymore.

builder.ApplicationBuilder.Eventing.Subscribe<BeforeResourceStartedEvent>(builder.Resource, async (@event, ct) =>
builder.ApplicationBuilder.Eventing.Subscribe<BeforeResourcesPreparedEvent>((@event, ct) =>
{
var serviceBusEmulatorResources = builder.ApplicationBuilder.Resources.OfType<AzureServiceBusResource>().Where(x => x is { } serviceBusResource && serviceBusResource.IsEmulator);

if (!serviceBusEmulatorResources.Any())
{
// No-op if there is no Azure Service Bus emulator resource.
return;
}
// Create JSON configuration file

var connectionString = await builder.Resource.ConnectionStringExpression.GetValueAsync(ct).ConfigureAwait(false);
var hasCustomConfigJson = builder.Resource.Annotations.OfType<ContainerMountAnnotation>().Any(v => v.Target == AzureServiceBusEmulatorResource.EmulatorConfigJsonPath);

if (connectionString == null)
if (hasCustomConfigJson)
{
throw new DistributedApplicationException($"ConnectionStringAvailableEvent was published for the '{builder.Resource.Name}' resource but the connection string was null.");
return Task.CompletedTask;
}

// Retrieve a queue/topic name to configure the health check

var noRetryOptions = new ServiceBusClientOptions { RetryOptions = new ServiceBusRetryOptions { MaxRetries = 0 } };
serviceBusClient = new ServiceBusClient(connectionString, noRetryOptions);

queueOrTopicName =
serviceBusEmulatorResources.SelectMany(x => x.Queues).Select(x => x.Name).FirstOrDefault()
?? serviceBusEmulatorResources.SelectMany(x => x.Topics).Select(x => x.Name).FirstOrDefault();

// Create JSON configuration file
// Create Config.json file content and its alterations in a temporary file
var tempConfigFile = WriteEmulatorConfigJson(builder.Resource);

foreach (var emulatorResource in serviceBusEmulatorResources)
try
{
var configFileMount = emulatorResource.Annotations.OfType<ContainerMountAnnotation>().Single(v => v.Target == AzureServiceBusEmulatorResource.EmulatorConfigJsonPath);

// If there is a custom mount for EmulatorConfigJsonPath we don't need to create the Config.json file.
if (configFileMount != defaultConfigFileMount)
{
continue;
}

// Truncate the file since we are going to write to it.
var fileStreamOptions = new FileStreamOptions() { Mode = FileMode.Truncate, Access = FileAccess.Write };

using (var stream = new FileStream(configFileMount.Source!, fileStreamOptions))
{
using var writer = new Utf8JsonWriter(stream, new JsonWriterOptions { Indented = true });

writer.WriteStartObject(); // {
writer.WriteStartObject("UserConfig"); // "UserConfig": {
writer.WriteStartArray("Namespaces"); // "Namespaces": [
writer.WriteStartObject(); // {
writer.WriteString("Name", emulatorResource.Name);
writer.WriteStartArray("Queues"); // "Queues": [

foreach (var queue in emulatorResource.Queues)
{
writer.WriteStartObject();
queue.WriteJsonObjectProperties(writer);
writer.WriteEndObject();
}

writer.WriteEndArray(); // ] (/Queues)

writer.WriteStartArray("Topics"); // "Topics": [
foreach (var topic in emulatorResource.Topics)
{
writer.WriteStartObject(); // "{ (Topic)"
topic.WriteJsonObjectProperties(writer);

writer.WriteStartArray("Subscriptions"); // "Subscriptions": [
foreach (var subscription in topic.Subscriptions)
{
writer.WriteStartObject(); // "{ (Subscription)"
subscription.WriteJsonObjectProperties(writer);

writer.WriteStartArray("Rules"); // "Rules": [
foreach (var rule in subscription.Rules)
{
writer.WriteStartObject();
rule.WriteJsonObjectProperties(writer);
writer.WriteEndObject();
}

writer.WriteEndArray(); // ] (/Rules)

writer.WriteEndObject(); // } (/Subscription)
}

writer.WriteEndArray(); // ] (/Subscriptions)

writer.WriteEndObject(); // } (/Topic)
}
writer.WriteEndArray(); // ] (/Topics)

writer.WriteEndObject(); // } (/Namespace)
writer.WriteEndArray(); // ], (/Namespaces)
writer.WriteStartObject("Logging"); // "Logging": {
writer.WriteString("Type", "File"); // "Type": "File"
writer.WriteEndObject(); // } (/LoggingConfig)

writer.WriteEndObject(); // } (/UserConfig)
writer.WriteEndObject(); // } (/Root)
}

// Apply ConfigJsonAnnotation modifications
var configJsonAnnotations = emulatorResource.Annotations.OfType<ConfigJsonAnnotation>();
var configJsonAnnotations = builder.Resource.Annotations.OfType<ConfigJsonAnnotation>();

foreach (var annotation in configJsonAnnotations)
{
using var readStream = new FileStream(configFileMount.Source!, FileMode.Open, FileAccess.Read);
using var readStream = new FileStream(tempConfigFile, FileMode.Open, FileAccess.Read);
var jsonObject = JsonNode.Parse(readStream);
readStream.Close();

using var writeStream = new FileStream(configFileMount.Source!, FileMode.Open, FileAccess.Write);
using var writeStream = new FileStream(tempConfigFile, FileMode.Open, FileAccess.Write);
using var writer = new Utf8JsonWriter(writeStream, new JsonWriterOptions { Indented = true });

if (jsonObject == null)
Expand All @@ -426,7 +332,41 @@ public static IResourceBuilder<AzureServiceBusResource> RunAsEmulator(this IReso
annotation.Configure(jsonObject);
jsonObject.WriteTo(writer);
}

// Deterministic file path for the configuration file based on its content
var configJsonPath = aspireStore.GetFileNameWithContent($"{builder.Resource.Name}-Config.json", tempConfigFile);

builder.WithAnnotation(new ContainerMountAnnotation(
configJsonPath,
AzureServiceBusEmulatorResource.EmulatorConfigJsonPath,
ContainerMountType.BindMount,
isReadOnly: true));
}
finally
{
File.Delete(tempConfigFile);
}

return Task.CompletedTask;
});

builder.ApplicationBuilder.Eventing.Subscribe<BeforeResourceStartedEvent>(builder.Resource, async (@event, ct) =>
{
var connectionString = await builder.Resource.ConnectionStringExpression.GetValueAsync(ct).ConfigureAwait(false);

if (connectionString == null)
{
throw new DistributedApplicationException($"ConnectionStringAvailableEvent was published for the '{builder.Resource.Name}' resource but the connection string was null.");
}

// Retrieve a queue/topic name to configure the health check

var noRetryOptions = new ServiceBusClientOptions { RetryOptions = new ServiceBusRetryOptions { MaxRetries = 0 } };
serviceBusClient = new ServiceBusClient(connectionString, noRetryOptions);

queueOrTopicName =
builder.Resource.Queues.Select(x => x.Name).FirstOrDefault()
?? builder.Resource.Topics.Select(x => x.Name).FirstOrDefault();
});

var healthCheckKey = $"{builder.Resource.Name}_check";
Expand Down Expand Up @@ -497,4 +437,70 @@ public static IResourceBuilder<AzureServiceBusEmulatorResource> WithHostPort(thi
endpoint.Port = port;
});
}

private static string WriteEmulatorConfigJson(AzureServiceBusResource emulatorResource)
{
var filePath = Path.GetTempFileName();

using var stream = new FileStream(filePath, FileMode.Open, FileAccess.Write);
using var writer = new Utf8JsonWriter(stream, new JsonWriterOptions { Indented = true });

writer.WriteStartObject(); // {
writer.WriteStartObject("UserConfig"); // "UserConfig": {
writer.WriteStartArray("Namespaces"); // "Namespaces": [
writer.WriteStartObject(); // {
writer.WriteString("Name", emulatorResource.Name);
writer.WriteStartArray("Queues"); // "Queues": [

foreach (var queue in emulatorResource.Queues)
{
writer.WriteStartObject();
queue.WriteJsonObjectProperties(writer);
writer.WriteEndObject();
}

writer.WriteEndArray(); // ] (/Queues)

writer.WriteStartArray("Topics"); // "Topics": [
foreach (var topic in emulatorResource.Topics)
{
writer.WriteStartObject(); // "{ (Topic)"
topic.WriteJsonObjectProperties(writer);

writer.WriteStartArray("Subscriptions"); // "Subscriptions": [
foreach (var subscription in topic.Subscriptions)
{
writer.WriteStartObject(); // "{ (Subscription)"
subscription.WriteJsonObjectProperties(writer);

writer.WriteStartArray("Rules"); // "Rules": [
foreach (var rule in subscription.Rules)
{
writer.WriteStartObject();
rule.WriteJsonObjectProperties(writer);
writer.WriteEndObject();
}

writer.WriteEndArray(); // ] (/Rules)

writer.WriteEndObject(); // } (/Subscription)
}

writer.WriteEndArray(); // ] (/Subscriptions)

writer.WriteEndObject(); // } (/Topic)
}
writer.WriteEndArray(); // ] (/Topics)

writer.WriteEndObject(); // } (/Namespace)
writer.WriteEndArray(); // ], (/Namespaces)
writer.WriteStartObject("Logging"); // "Logging": {
writer.WriteString("Type", "File"); // "Type": "File"
writer.WriteEndObject(); // } (/LoggingConfig)

writer.WriteEndObject(); // } (/UserConfig)
writer.WriteEndObject(); // } (/Root)

return filePath;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using Aspire.Hosting.Eventing;

namespace Aspire.Hosting.ApplicationModel;

/// <summary>
/// This event is raised by orchestrators before the resources are prepared.
/// </summary>
public class BeforeResourcesPreparedEvent() : IDistributedApplicationEvent
{
}
2 changes: 2 additions & 0 deletions src/Aspire.Hosting/Dcp/DcpExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ public async Task RunApplicationAsync(CancellationToken cancellationToken = defa

try
{
await _executorEvents.PublishAsync(new OnResourcesPreparingContext(cancellationToken)).ConfigureAwait(false);

PrepareServices();
PrepareContainers();
PrepareExecutables();
Expand Down
1 change: 1 addition & 0 deletions src/Aspire.Hosting/Dcp/DcpExecutorEvents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace Aspire.Hosting.Dcp;
internal record ResourceStatus(string? State, DateTime? StartupTimestamp, DateTime? FinishedTimestamp);
internal record OnEndpointsAllocatedContext(CancellationToken CancellationToken);
internal record OnResourceStartingContext(CancellationToken CancellationToken, string ResourceType, IResource Resource, string? DcpResourceName);
internal record OnResourcesPreparingContext(CancellationToken CancellationToken);
internal record OnResourcesPreparedContext(CancellationToken CancellationToken);
internal record OnResourceChangedContext(CancellationToken CancellationToken, string ResourceType, IResource Resource, string DcpResourceName, ResourceStatus Status, Func<CustomResourceSnapshot, CustomResourceSnapshot> UpdateSnapshot);
internal record OnResourceFailedToStartContext(CancellationToken CancellationToken, string ResourceType, IResource Resource, string? DcpResourceName);
Expand Down
7 changes: 7 additions & 0 deletions src/Aspire.Hosting/Orchestrator/ApplicationOrchestrator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public ApplicationOrchestrator(DistributedApplicationModel model,

dcpExecutorEvents.Subscribe<OnEndpointsAllocatedContext>(OnEndpointsAllocated);
dcpExecutorEvents.Subscribe<OnResourceStartingContext>(OnResourceStarting);
dcpExecutorEvents.Subscribe<OnResourcesPreparingContext>(OnResourcesPreparing);
dcpExecutorEvents.Subscribe<OnResourcesPreparedContext>(OnResourcesPrepared);
dcpExecutorEvents.Subscribe<OnResourceChangedContext>(OnResourceChanged);
dcpExecutorEvents.Subscribe<OnResourceFailedToStartContext>(OnResourceFailedToStart);
Expand Down Expand Up @@ -94,6 +95,12 @@ await _notificationService.PublishUpdateAsync(context.Resource, s => s with
await _eventing.PublishAsync(beforeResourceStartedEvent, context.CancellationToken).ConfigureAwait(false);
}

private async Task OnResourcesPreparing(OnResourcesPreparingContext context)
{
var beforeResourcePreparedEvent = new BeforeResourcesPreparedEvent();
await _eventing.PublishAsync(beforeResourcePreparedEvent, context.CancellationToken).ConfigureAwait(false);
}

private async Task OnResourcesPrepared(OnResourcesPreparedContext _)
{
await PublishResourcesWithInitialStateAsync().ConfigureAwait(false);
Expand Down
2 changes: 2 additions & 0 deletions src/Aspire.Hosting/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#nullable enable
Aspire.Hosting.ApplicationModel.BeforeResourcesPreparedEvent
Aspire.Hosting.ApplicationModel.BeforeResourcesPreparedEvent.BeforeResourcesPreparedEvent() -> void
Aspire.Hosting.ApplicationModel.ContainerLifetime.Session = 0 -> Aspire.Hosting.ApplicationModel.ContainerLifetime
Aspire.Hosting.ApplicationModel.CustomResourceSnapshot.HealthStatus.get -> Microsoft.Extensions.Diagnostics.HealthChecks.HealthStatus?
Aspire.Hosting.ApplicationModel.CustomResourceSnapshot.Relationships.get -> System.Collections.Immutable.ImmutableArray<Aspire.Hosting.ApplicationModel.RelationshipSnapshot!>
Expand Down
Loading
Loading