Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 10 additions & 21 deletions src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ private sealed record MessageSource(
// the document and which message it points at.
private readonly record struct ChannelMessageKey(string ChannelId, string MessageName);

// The action + channel-id pair forms an operation's identity in the document.
private readonly record struct OperationKey(string Action, string ChannelId);

private readonly AsyncApiOptions _options;
private readonly IAmASchemaGenerator _schemaGenerator;
Expand Down Expand Up @@ -177,15 +175,20 @@ private async Task ProcessSourceAsync(
CancellationToken ct)
{
var channelId = SanitizeChannelId(source.Address);
var actionString = source.Action == V3OperationAction.Send ? "send" : "receive";

// Dedup by (channel, action). First source wins — covers producer-registry vs
// SupplementalPublications collisions and assembly-scan vs producer-registry
// collisions alike. Same channel + different action (e.g. a subscription and
// a publication on the same routing key) is not a duplicate and proceeds.
if (!context.CoveredChannelActions.Add((channelId, actionString))) return;

EnsureChannel(context.Channels, new ChannelDescriptor(channelId, source.Address));

var messageName = await EnsureMessageForSourceAsync(source, channelId, context.Messages, ct).ConfigureAwait(false);
AddChannelMessageRef(context.Channels, new ChannelMessageKey(channelId, messageName));

var actionString = source.Action == V3OperationAction.Send ? "send" : "receive";
context.CoveredChannelActions.Add((channelId, actionString));

var operationId = GetUniqueOperationId(context.Operations, new OperationKey(actionString, channelId));
var operationId = $"{actionString}_{channelId}";
context.Operations[operationId] = BuildOperation(source.Action, channelId, messageName);
}

Expand Down Expand Up @@ -228,8 +231,7 @@ private async Task AddFromAssemblyScanningAsync(
{
foreach (var (type, topic) in GetPublicationTopicTypes(assembly))
{
if (context.CoveredChannelActions.Contains((SanitizeChannelId(topic), "send"))) continue;

// ProcessSourceAsync dedups by (channel, action) so no early-skip needed here.
await ProcessSourceAsync(
new MessageSource(topic, V3OperationAction.Send, type),
context, ct).ConfigureAwait(false);
Expand Down Expand Up @@ -341,19 +343,6 @@ private static void AddChannelMessageRef(Dictionary<string, V3ChannelDefinition>
});
}

private static string GetUniqueOperationId(Dictionary<string, V3OperationDefinition> operations, OperationKey key)
{
var baseId = $"{key.Action}_{key.ChannelId}";
if (!operations.TryGetValue(baseId, out _))
return baseId;

var counter = 2;
while (operations.TryGetValue($"{baseId}_{counter}", out _))
counter++;

return $"{baseId}_{counter}";
}

private static V3SchemaDefinition EmptyObjectSchema()
{
using var doc = JsonDocument.Parse("{}");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,11 @@ public When_Deduplicating_Channels_And_Messages()
}

[Fact]
public async Task It_Should_Produce_One_Channel_And_Two_Operations_For_Duplicate_Subscriptions()
public async Task It_Should_Dedup_Duplicate_Subscriptions_On_The_Same_Topic()
{
// Two subscriptions for the same routing key collapse into one channel + one
// receive operation. The first subscription wins; the duplicate is dropped
// rather than emitted as a misleading "_2"-suffixed second operation.
var subscriptions = new[]
{
new Subscription(
Expand All @@ -79,16 +82,13 @@ public async Task It_Should_Produce_One_Channel_And_Two_Operations_For_Duplicate
var generator = new AsyncApiDocumentGenerator(_options, _schemaGenerator, subscriptions, null);
var result = await generator.GenerateAsync();

// One channel for the shared topic
Assert.NotNull(result.Channels);
Assert.Single(result.Channels);
Assert.True(result.Channels.ContainsKey("shared_topic"));

// Two receive operations with unique IDs
Assert.NotNull(result.Operations);
Assert.Equal(2, result.Operations.Count);
Assert.Single(result.Operations);
Assert.True(result.Operations.ContainsKey("receive_shared_topic"));
Assert.True(result.Operations.ContainsKey("receive_shared_topic_2"));

// Only one message component
Assert.NotNull(result.Components?.Messages);
Expand Down Expand Up @@ -155,7 +155,9 @@ public async Task It_Should_Produce_One_Message_Component_For_Same_Type_Across_M
[Fact]
public async Task It_Should_Dedup_Producer_Registry_Over_Supplemental_Publications()
{
// Both producer registry and supplemental have the same topic
// Both producer registry and supplemental declare the same (topic, action). The
// producer-registry entry comes first and wins; the supplemental duplicate is
// dropped rather than emitted as a misleading "_2"-suffixed second operation.
var producerPubs = new[]
{
new Publication { Topic = new RoutingKey("dedup.topic"), RequestType = typeof(SharedEvent) }
Expand All @@ -166,16 +168,14 @@ public async Task It_Should_Dedup_Producer_Registry_Over_Supplemental_Publicatio
new Publication { Topic = new RoutingKey("dedup.topic"), RequestType = typeof(SharedEvent) }
};

// Combine: producer registry publications first, supplemental second
var allPubs = producerPubs.Concat(supplementalPubs).ToArray();

var generator = new AsyncApiDocumentGenerator(_options, _schemaGenerator, null, allPubs);
var result = await generator.GenerateAsync();

Assert.NotNull(result.Operations);
// Should have send_dedup_topic and send_dedup_topic_2 since both get added
// The producer registry "wins" by being first; the supplemental gets a unique ID
Assert.Equal(2, result.Operations.Count);
Assert.Single(result.Operations);
Assert.True(result.Operations.ContainsKey("send_dedup_topic"));
}

public class SharedEvent : Event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public async Task It_Should_Let_DI_Source_Win_Dedup()
}

[Fact]
public async Task It_Should_Skip_Assembly_Scanned_Type_When_DI_Has_Uniqueified_Operation_Id()
public async Task It_Should_Collapse_Duplicate_DI_And_Scanned_Publications_To_One_Operation()
{
var options = new AsyncApiOptions
{
Expand All @@ -128,10 +128,11 @@ public async Task It_Should_Skip_Assembly_Scanned_Type_When_DI_Has_Uniqueified_O
AssembliesToScan = new[] { typeof(ScannableEvent).Assembly }
};

// Two DI publications on the same topic as the [PublicationTopic]-decorated ScannableEvent.
// The first gets send_scannable_topic, the second gets send_scannable_topic_2.
// Assembly scanning must still detect that a send operation for scannable_topic
// already exists from DI and skip the scanned type.
// Two DI publications on the same topic as the [PublicationTopic]-decorated
// ScannableEvent. Deduplication is by (channel, action): the first DI
// publication wins, the second DI publication is dropped, and the
// assembly-scanned type is also dropped. The document ends up with a single
// send operation rather than misleading "_2"-suffixed duplicates.
var publications = new[]
{
new Publication
Expand All @@ -150,11 +151,8 @@ public async Task It_Should_Skip_Assembly_Scanned_Type_When_DI_Has_Uniqueified_O
var result = await generator.GenerateAsync();

Assert.NotNull(result.Operations);
// Only the two DI publications should produce operations (send_scannable_topic and send_scannable_topic_2)
// The assembly-scanned type must NOT add a third operation
Assert.Equal(2, result.Operations.Count);
Assert.Single(result.Operations);
Assert.True(result.Operations.ContainsKey("send_scannable_topic"));
Assert.True(result.Operations.ContainsKey("send_scannable_topic_2"));
}

[PublicationTopic("scannable.topic")]
Expand Down
Loading