Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 53 additions & 16 deletions Brighter.sln
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TaskStatusSender", "samples
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.RocketMQ.Tests", "tests\Paramore.Brighter.RocketMQ.Tests\Paramore.Brighter.RocketMQ.Tests\Paramore.Brighter.RocketMQ.Tests.csproj", "{9063F17B-5636-4AD5-999B-C894517DB5FD}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.MessagingGateway.Pulsar", "src\Paramore.Brighter.MessagingGateway.Pulsar\Paramore.Brighter.MessagingGateway.Pulsar.csproj", "{525E736D-A262-8E2E-0C85-1A9B1C9BD3F3}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.MessagingGateway.Pulsar.Tests", "tests\Paramore.Brighter.MessagingGateway.Pulsar.Tests\Paramore.Brighter.MessagingGateway.Pulsar.Tests.csproj", "{44DBA505-FB28-428F-AD4F-3BBF3BD12DB3}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -2046,6 +2050,18 @@ Global
{5D57A811-A3C2-42BA-A086-A5BFE8B590C7}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{5D57A811-A3C2-42BA-A086-A5BFE8B590C7}.Release|x86.ActiveCfg = Release|Any CPU
{5D57A811-A3C2-42BA-A086-A5BFE8B590C7}.Release|x86.Build.0 = Release|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Debug|x86.ActiveCfg = Debug|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Debug|x86.Build.0 = Debug|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Release|Any CPU.Build.0 = Release|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Release|x86.ActiveCfg = Release|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Release|x86.Build.0 = Release|Any CPU
{022AD920-4E8D-4370-9C6D-CA4D8DA3DB6F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{022AD920-4E8D-4370-9C6D-CA4D8DA3DB6F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{022AD920-4E8D-4370-9C6D-CA4D8DA3DB6F}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
Expand Down Expand Up @@ -2274,6 +2290,18 @@ Global
{D530B147-067A-408D-BB1B-A4290324012F}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{D530B147-067A-408D-BB1B-A4290324012F}.Release|x86.ActiveCfg = Release|Any CPU
{D530B147-067A-408D-BB1B-A4290324012F}.Release|x86.Build.0 = Release|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Debug|Any CPU.Build.0 = Debug|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Debug|x86.ActiveCfg = Debug|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Debug|x86.Build.0 = Debug|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Release|Any CPU.ActiveCfg = Release|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Release|Any CPU.Build.0 = Release|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Release|x86.ActiveCfg = Release|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Release|x86.Build.0 = Release|Any CPU
{7AA5B0BF-3520-45C4-9B8A-7F131EFDA227}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{7AA5B0BF-3520-45C4-9B8A-7F131EFDA227}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7AA5B0BF-3520-45C4-9B8A-7F131EFDA227}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
Expand Down Expand Up @@ -2406,18 +2434,6 @@ Global
{3B6D084F-C034-49C6-A8C4-3C23DCC83CF2}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{3B6D084F-C034-49C6-A8C4-3C23DCC83CF2}.Release|x86.ActiveCfg = Release|Any CPU
{3B6D084F-C034-49C6-A8C4-3C23DCC83CF2}.Release|x86.Build.0 = Release|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Debug|Any CPU.Build.0 = Debug|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Debug|x86.ActiveCfg = Debug|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Debug|x86.Build.0 = Debug|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Release|Any CPU.ActiveCfg = Release|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Release|Any CPU.Build.0 = Release|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Release|x86.ActiveCfg = Release|Any CPU
{62D9AF6E-B671-4444-82A4-8416B8049E14}.Release|x86.Build.0 = Release|Any CPU
{4EA5F196-DDA8-4941-956B-D413B03051C9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4EA5F196-DDA8-4941-956B-D413B03051C9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4EA5F196-DDA8-4941-956B-D413B03051C9}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
Expand Down Expand Up @@ -2490,8 +2506,6 @@ Global
{0E6A0B80-58B7-4AA2-9E40-EE0AA5D4719E}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{0E6A0B80-58B7-4AA2-9E40-EE0AA5D4719E}.Release|x86.ActiveCfg = Release|Any CPU
{0E6A0B80-58B7-4AA2-9E40-EE0AA5D4719E}.Release|x86.Build.0 = Release|Any CPU
{F00B137A-C187-4C33-A37B-22AD40B71600}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F00B137A-C187-4C33-A37B-22AD40B71600}.Debug|Any CPU.Build.0 = Debug|Any CPU
{79CA356E-B08C-4D88-88C9-653EC8D8BF4D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{79CA356E-B08C-4D88-88C9-653EC8D8BF4D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{79CA356E-B08C-4D88-88C9-653EC8D8BF4D}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
Expand Down Expand Up @@ -2528,8 +2542,6 @@ Global
{24360989-A956-45E9-BF07-7FD9E7553C7D}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{24360989-A956-45E9-BF07-7FD9E7553C7D}.Release|x86.ActiveCfg = Release|Any CPU
{24360989-A956-45E9-BF07-7FD9E7553C7D}.Release|x86.Build.0 = Release|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9D7DFB30-1E56-46C7-A56E-2FBC6C58EB96}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9063F17B-5636-4AD5-999B-C894517DB5FD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9063F17B-5636-4AD5-999B-C894517DB5FD}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9063F17B-5636-4AD5-999B-C894517DB5FD}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
Expand All @@ -2542,6 +2554,30 @@ Global
{9063F17B-5636-4AD5-999B-C894517DB5FD}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{9063F17B-5636-4AD5-999B-C894517DB5FD}.Release|x86.ActiveCfg = Release|Any CPU
{9063F17B-5636-4AD5-999B-C894517DB5FD}.Release|x86.Build.0 = Release|Any CPU
{525E736D-A262-8E2E-0C85-1A9B1C9BD3F3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{525E736D-A262-8E2E-0C85-1A9B1C9BD3F3}.Debug|Any CPU.Build.0 = Debug|Any CPU
{525E736D-A262-8E2E-0C85-1A9B1C9BD3F3}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
{525E736D-A262-8E2E-0C85-1A9B1C9BD3F3}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
{525E736D-A262-8E2E-0C85-1A9B1C9BD3F3}.Debug|x86.ActiveCfg = Debug|Any CPU
{525E736D-A262-8E2E-0C85-1A9B1C9BD3F3}.Debug|x86.Build.0 = Debug|Any CPU
{525E736D-A262-8E2E-0C85-1A9B1C9BD3F3}.Release|Any CPU.ActiveCfg = Release|Any CPU
{525E736D-A262-8E2E-0C85-1A9B1C9BD3F3}.Release|Any CPU.Build.0 = Release|Any CPU
{525E736D-A262-8E2E-0C85-1A9B1C9BD3F3}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
{525E736D-A262-8E2E-0C85-1A9B1C9BD3F3}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{525E736D-A262-8E2E-0C85-1A9B1C9BD3F3}.Release|x86.ActiveCfg = Release|Any CPU
{525E736D-A262-8E2E-0C85-1A9B1C9BD3F3}.Release|x86.Build.0 = Release|Any CPU
{44DBA505-FB28-428F-AD4F-3BBF3BD12DB3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{44DBA505-FB28-428F-AD4F-3BBF3BD12DB3}.Debug|Any CPU.Build.0 = Debug|Any CPU
{44DBA505-FB28-428F-AD4F-3BBF3BD12DB3}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
{44DBA505-FB28-428F-AD4F-3BBF3BD12DB3}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
{44DBA505-FB28-428F-AD4F-3BBF3BD12DB3}.Debug|x86.ActiveCfg = Debug|Any CPU
{44DBA505-FB28-428F-AD4F-3BBF3BD12DB3}.Debug|x86.Build.0 = Debug|Any CPU
{44DBA505-FB28-428F-AD4F-3BBF3BD12DB3}.Release|Any CPU.ActiveCfg = Release|Any CPU
{44DBA505-FB28-428F-AD4F-3BBF3BD12DB3}.Release|Any CPU.Build.0 = Release|Any CPU
{44DBA505-FB28-428F-AD4F-3BBF3BD12DB3}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
{44DBA505-FB28-428F-AD4F-3BBF3BD12DB3}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{44DBA505-FB28-428F-AD4F-3BBF3BD12DB3}.Release|x86.ActiveCfg = Release|Any CPU
{44DBA505-FB28-428F-AD4F-3BBF3BD12DB3}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -2681,6 +2717,7 @@ Global
{43F23C67-6C9D-44BB-B3A8-9313E570879D} = {9347BD22-1E8A-4A79-9D86-D76D076E566F}
{24360989-A956-45E9-BF07-7FD9E7553C7D} = {9347BD22-1E8A-4A79-9D86-D76D076E566F}
{9063F17B-5636-4AD5-999B-C894517DB5FD} = {329736D2-BF92-4D06-A7BF-19F4B6B64EDD}
{44DBA505-FB28-428F-AD4F-3BBF3BD12DB3} = {329736D2-BF92-4D06-A7BF-19F4B6B64EDD}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {8B7C7E31-2E32-4E0D-9426-BC9AF22E9F4C}
Expand Down
4 changes: 4 additions & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
<PackageVersion Include="AWSSDK.Scheduler" Version="[3.7.500.5, 4)" />
<PackageVersion Include="AWSSDK.SimpleNotificationService" Version="[3.7.500.5, 4)" />
<PackageVersion Include="AWSSDK.SQS" Version="[3.7.500.5, 4)" />
<PackageVersion Include="DotPulsar" Version="3.1.0" />
<PackageVersion Include="Azure.Identity" Version="1.14.2" />
<PackageVersion Include="Azure.Messaging.ServiceBus" Version="7.20.1" />
<PackageVersion Include="Azure.Storage.Blobs" Version="12.25.0" />
Expand Down Expand Up @@ -104,6 +105,7 @@
<PackageVersion Include="Serilog.Sinks.TestCorrelator" Version="4.0.0" />
<PackageVersion Include="ServiceStack.Redis.Core" Version="8.8.0" />
<PackageVersion Include="Shouldly" Version="4.3.0" />
<PackageVersion Include="Moq" Version="4.20.69" />
<PackageVersion Include="Swashbuckle.AspNetCore" Version="9.0.3" />
<PackageVersion Include="System.Data.SqlClient" Version="4.9.0" />
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="9.0.8" />
Expand All @@ -116,6 +118,8 @@
<PackageVersion Include="System.Text.RegularExpressions" Version="4.3.1" />
<PackageVersion Include="System.Threading.Channels" Version="9.0.8" />
<PackageVersion Include="xunit" Version="2.9.3" />
<PackageVersion Include="xunit.abstractions" Version="2.0.3" />
<PackageVersion Include="xunit.extensibility.core" Version="2.9.3" />
<PackageVersion Include="xunit.runner.visualstudio" Version="3.1.3">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<Description>Brighter Messaging Gateway for Apache Pulsar</Description>
<Authors>Your Name</Authors>
<TargetFrameworks>$(BrighterTargetFrameworks)</TargetFrameworks>
<LangVersion>latest</LangVersion>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<!-- Reference tới core Brighter -->
<ProjectReference Include="..\Paramore.Brighter\Paramore.Brighter.csproj" />

<!-- Apache Pulsar .NET Client -->
<PackageReference Include="DotPulsar"/>
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
using System;
using System.Buffers;
using System.Collections.Concurrent;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using DotPulsar;
using DotPulsar.Abstractions;
using DotPulsar.Extensions;
using Paramore.Brighter;

namespace Paramore.Brighter.MessagingGateway.Pulsar
{
/// <summary>
/// Brighter message consumer implementation using DotPulsar.
/// Responsible for receiving, acknowledging, and requeuing messages from Pulsar.
/// </summary>
public sealed class PulsarMessageConsumer : IAmAMessageConsumerAsync
{
private readonly PulsarMessagingGatewayConfiguration _config;
private readonly IPulsarClient _client;
private IConsumer<byte[]>? _consumer;

// IMPORTANT: Map between Brighter's Message.Id -> Pulsar IMessage
// Needed to ACK or REQUEUE the exact message later.
private readonly ConcurrentDictionary<Id, IMessage<byte[]>> _inflight = new();

public PulsarMessageConsumer(PulsarMessagingGatewayConfiguration config)
: this(config, PulsarClient.Builder().ServiceUrl(new Uri(config.ServiceUrl)).Build())
{ }

public PulsarMessageConsumer(PulsarMessagingGatewayConfiguration config, IPulsarClient client)
{
_config = config ?? throw new ArgumentNullException(nameof(config));
_config.Validate();
_client = client ?? throw new ArgumentNullException(nameof(client));
}

/// <summary>
/// Ensure that the Pulsar consumer is created only once.
/// </summary>
private void EnsureConsumer()
{
if (_consumer != null)
return;

_consumer = _client
.NewConsumer(Schema.ByteArray)
.Topic(_config.Topic)
.SubscriptionName(_config.SubscriptionName)
.Create();
}

/// <summary>
/// Receive a single message from Pulsar and return it as a Brighter Message.
/// Note: Brighter's channel.Receive() often works in small batches; returning one message here is sufficient.
/// </summary>
public async Task<Message[]> ReceiveAsync(TimeSpan? timeOut = null, CancellationToken cancellationToken = default)
{
EnsureConsumer();

using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(timeOut ?? TimeSpan.FromMilliseconds(1000));

try
{
await foreach (var pulsarMsg in _consumer!.Messages(cts.Token))
{
// Convert byte[] payload -> UTF8 string body
var payload = pulsarMsg.Data.ToArray();
var bodyText = Encoding.UTF8.GetString(payload);

// Create a Brighter message with a new unique Id
var brighterId = new Id(Guid.NewGuid().ToString());
var brighterMsg = new Message(
new MessageHeader(brighterId, _config.Topic, MessageType.MT_EVENT),
new MessageBody(bodyText)
);

// Track message for later ACK/REQUEUE
_inflight[brighterId] = pulsarMsg;

return new[] { brighterMsg };
}
}
catch (OperationCanceledException)
{
// IMPORTANT: Timeout reached -> return empty array (no messages)
}
catch (Exception ex)
{
// TODO: Replace Console.Error with ILogger
Console.Error.WriteLine($"[Pulsar] Receive error: {ex}");
}

return Array.Empty<Message>();
}

/// <summary>
/// Acknowledge a previously received message.
/// </summary>
public async Task AcknowledgeAsync(Message message, CancellationToken cancellationToken = default)
{
if (message == null)
throw new ArgumentNullException(nameof(message));
if (_consumer == null)
return;

if (_inflight.TryRemove(message.Id, out var pulsarMsg))
{
// DotPulsar.Extensions – ACK must be called with the exact IMessage received.
await _consumer.Acknowledge(pulsarMsg, cancellationToken);
}
}

/// <summary>
/// Reject is not natively supported by Pulsar; we simulate by requesting re-delivery.
/// </summary>
public async Task<bool> RejectAsync(Message message, CancellationToken cancellationToken = default)
{
return await RequeueAsync(message, null, cancellationToken);
}

/// <summary>
/// Pulsar does not support purging messages directly for a topic/subscription.
/// </summary>
public async Task PurgeAsync(CancellationToken cancellationToken = default)
{
await Task.Yield();
throw new NotSupportedException("Apache Pulsar does not support purging messages directly.");
}

/// <summary>
/// Request re-delivery for a specific unacknowledged message.
/// NOTE: Pulsar will redeliver; delay parameter is ignored (not supported natively).
/// </summary>
public async Task<bool> RequeueAsync(Message message, TimeSpan? delay = null, CancellationToken cancellationToken = default)
{
if (message == null)
throw new ArgumentNullException(nameof(message));
if (_consumer == null)
return false;

if (_inflight.TryRemove(message.Id, out var pulsarMsg))
{
// IMPORTANT: Correct way to ask Pulsar to redeliver a specific message.
await _consumer.RedeliverUnacknowledgedMessages(new[] { pulsarMsg.MessageId }, cancellationToken);
return true;
}

// If message is not in _inflight, we cannot requeue it specifically.
return false;
}

/// <summary>
/// Dispose Pulsar consumer and client gracefully.
/// </summary>
public async ValueTask DisposeAsync()
{
try
{
if (_consumer is not null)
await _consumer.DisposeAsync();
}
finally
{
await _client.DisposeAsync();
}
}
}
}
Loading
Loading