Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
159ee9c
Init support to pulsar
Jul 23, 2025
91083e5
Merge branch 'master' into add-support-pulsar
Jul 23, 2025
892f467
Add comment
Jul 23, 2025
65c3d93
Add Pulsar unit test
Jul 23, 2025
2581d23
Add comments to header name
Jul 23, 2025
48e1889
feat: Update csproj
Jul 23, 2025
215f017
fix: build
Jul 23, 2025
211bd26
Merge branch 'master' into add-support-pulsar
Jul 28, 2025
5ebf049
feat: Add Pulsar CI
Jul 28, 2025
641f783
fix: error to run pulsar
Jul 28, 2025
ccd37c7
Merge branch 'master' into add-support-pulsar
lillo42 Aug 5, 2025
309f4c6
fix: build
lillo42 Aug 5, 2025
cb86df3
fix: Remove RocketMQ build & build tests
lillo42 Aug 5, 2025
1800a5f
fix: pulsar tests
lillo42 Aug 5, 2025
9a3d34d
Merge branch 'master' into add-support-pulsar
lillo42 Aug 7, 2025
f656cb9
fix: Try to fix unit tests
lillo42 Aug 8, 2025
fb17289
fix: Remove unnecessary vars
lillo42 Aug 8, 2025
9f88177
Merge branch 'master' into add-support-pulsar
lillo42 Aug 8, 2025
6cecb71
fix: Build
lillo42 Aug 8, 2025
233f841
fix: unit tests
lillo42 Aug 8, 2025
68fba51
Merge branch 'master' into add-support-pulsar
Aug 15, 2025
5ed1cc1
Update Pulsar docs
Aug 15, 2025
7cf9466
Merge branch 'master' into add-support-pulsar
lillo42 Sep 16, 2025
afe0fa3
Change the max of message same as NoOfPerformermers
lillo42 Sep 16, 2025
bbf95bf
Merge branch 'master' into add-support-pulsar
lillo42 Sep 16, 2025
afe30ba
Merge branch 'master' into add-support-pulsar
Sep 26, 2025
777734e
Merge branch 'master' into add-support-pulsar
Sep 26, 2025
00ecbbc
Merge branch 'master' into add-support-pulsar
lillo42 Oct 3, 2025
f302def
Revert unnecessary changes
lillo42 Oct 3, 2025
5f44003
Merge branch 'master' into add-support-pulsar
lillo42 Dec 2, 2025
34efccd
Merge with master
lillo42 Dec 2, 2025
443f065
Updata pacakges
lillo42 Dec 2, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,35 @@ jobs:
run: dotnet restore
- name: Azure Tests
run: dotnet test ./tests/Paramore.Brighter.AzureServiceBus.Tests/Paramore.Brighter.AzureServiceBus.Tests.csproj --filter "Fragile!=CI" --configuration Release --logger "console;verbosity=normal" --logger GitHubActions --blame -v n

pulsar-ci:
runs-on: ubuntu-latest
timeout-minutes: 5
needs: [build]
steps:
- uses: actions/checkout@v5
with:
fetch-depth: 0 # Required to fetch the Git tags
filter: tree:0
show-progress: false
- name: Setup dotnet
uses: actions/setup-dotnet@v5
with:
dotnet-version: |
8.0.x
9.0.x
- uses: actions/cache@v4
with:
path: ~/.nuget/packages
key: Linux-nuget-${{ hashFiles('**/Directory.Packages.props') }}
restore-keys: |
Linux-nuget

- name: Set up Apache Pulsar
uses: reugn/github-action-pulsar@v1

- name: Pulsar Tests
run: dotnet test ./tests/Paramore.Brighter.Pulsar.Tests/Paramore.Brighter.Pulsar.Tests.csproj --filter "Fragile!=CI" --configuration Release --logger "console;verbosity=normal" --logger GitHubActions --blame -v n

mongodb-ci:
runs-on: ubuntu-latest
Expand Down
29 changes: 29 additions & 0 deletions Brighter.sln
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TaskReceiverConsole", "samp
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TaskStatusSender", "samples\TaskQueue\KafkaDynamicEventStream\TaskStatusSender\TaskStatusSender.csproj", "{24360989-A956-45E9-BF07-7FD9E7553C7D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.MessagingGateway.Pulsar", "src\Paramore.Brighter.MessagingGateway.Pulsar\Paramore.Brighter.MessagingGateway.Pulsar.csproj", "{BDC154A4-5978-4D39-BEEC-4E2F41DF334A}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.Pulsar.Tests", "tests\Paramore.Brighter.Pulsar.Tests\Paramore.Brighter.Pulsar.Tests.csproj", "{94C2C616-E5B1-439A-AD78-1146180AEC84}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.RocketMQ.Tests", "tests\Paramore.Brighter.RocketMQ.Tests\Paramore.Brighter.RocketMQ.Tests\Paramore.Brighter.RocketMQ.Tests.csproj", "{9063F17B-5636-4AD5-999B-C894517DB5FD}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.Transformers.Gcp", "src\Paramore.Brighter.Transformers.Gcp\Paramore.Brighter.Transformers.Gcp.csproj", "{231C1680-1590-458A-B9B2-38A3EDEE5E24}"
Expand Down Expand Up @@ -2732,6 +2736,30 @@ Global
{261E1392-7713-525F-2859-7B40CA416A50}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{261E1392-7713-525F-2859-7B40CA416A50}.Release|x86.ActiveCfg = Release|Any CPU
{261E1392-7713-525F-2859-7B40CA416A50}.Release|x86.Build.0 = Release|Any CPU
{BDC154A4-5978-4D39-BEEC-4E2F41DF334A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{BDC154A4-5978-4D39-BEEC-4E2F41DF334A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BDC154A4-5978-4D39-BEEC-4E2F41DF334A}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
{BDC154A4-5978-4D39-BEEC-4E2F41DF334A}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
{BDC154A4-5978-4D39-BEEC-4E2F41DF334A}.Debug|x86.ActiveCfg = Debug|Any CPU
{BDC154A4-5978-4D39-BEEC-4E2F41DF334A}.Debug|x86.Build.0 = Debug|Any CPU
{BDC154A4-5978-4D39-BEEC-4E2F41DF334A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BDC154A4-5978-4D39-BEEC-4E2F41DF334A}.Release|Any CPU.Build.0 = Release|Any CPU
{BDC154A4-5978-4D39-BEEC-4E2F41DF334A}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
{BDC154A4-5978-4D39-BEEC-4E2F41DF334A}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{BDC154A4-5978-4D39-BEEC-4E2F41DF334A}.Release|x86.ActiveCfg = Release|Any CPU
{BDC154A4-5978-4D39-BEEC-4E2F41DF334A}.Release|x86.Build.0 = Release|Any CPU
{94C2C616-E5B1-439A-AD78-1146180AEC84}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{94C2C616-E5B1-439A-AD78-1146180AEC84}.Debug|Any CPU.Build.0 = Debug|Any CPU
{94C2C616-E5B1-439A-AD78-1146180AEC84}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
{94C2C616-E5B1-439A-AD78-1146180AEC84}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
{94C2C616-E5B1-439A-AD78-1146180AEC84}.Debug|x86.ActiveCfg = Debug|Any CPU
{94C2C616-E5B1-439A-AD78-1146180AEC84}.Debug|x86.Build.0 = Debug|Any CPU
{94C2C616-E5B1-439A-AD78-1146180AEC84}.Release|Any CPU.ActiveCfg = Release|Any CPU
{94C2C616-E5B1-439A-AD78-1146180AEC84}.Release|Any CPU.Build.0 = Release|Any CPU
{94C2C616-E5B1-439A-AD78-1146180AEC84}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
{94C2C616-E5B1-439A-AD78-1146180AEC84}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{94C2C616-E5B1-439A-AD78-1146180AEC84}.Release|x86.ActiveCfg = Release|Any CPU
{94C2C616-E5B1-439A-AD78-1146180AEC84}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -2871,6 +2899,7 @@ Global
{79CA356E-B08C-4D88-88C9-653EC8D8BF4D} = {9347BD22-1E8A-4A79-9D86-D76D076E566F}
{43F23C67-6C9D-44BB-B3A8-9313E570879D} = {9347BD22-1E8A-4A79-9D86-D76D076E566F}
{24360989-A956-45E9-BF07-7FD9E7553C7D} = {9347BD22-1E8A-4A79-9D86-D76D076E566F}
{94C2C616-E5B1-439A-AD78-1146180AEC84} = {329736D2-BF92-4D06-A7BF-19F4B6B64EDD}
{9063F17B-5636-4AD5-999B-C894517DB5FD} = {329736D2-BF92-4D06-A7BF-19F4B6B64EDD}
{39B7CFF4-4CA9-4B1F-B9C4-EED3A657D00D} = {329736D2-BF92-4D06-A7BF-19F4B6B64EDD}
{261E1392-7713-525F-2859-7B40CA416A50} = {329736D2-BF92-4D06-A7BF-19F4B6B64EDD}
Expand Down
1 change: 1 addition & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
<PackageVersion Include="Dapper" Version="2.1.66" />
<PackageVersion Include="Dapper.Contrib" Version="2.0.78" />
<PackageVersion Include="DapperExtensions" Version="1.7.0" />
<PackageVersion Include="DotPulsar" Version="5.1.0" />
<PackageVersion Include="EventStore.ClientAPI.NetCore" Version="4.1.0.23" />
<PackageVersion Include="FluentMigrator" Version="7.1.0" />
<PackageVersion Include="FluentMigrator.Runner" Version="7.1.0" />
Expand Down
87 changes: 87 additions & 0 deletions docker-compose-pulsar.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
services:
# Start zookeeper
zookeeper:
image: apachepulsar/pulsar:latest
container_name: zookeeper
restart: on-failure
environment:
- metadataStoreUrl=zk:zookeeper:2181
- PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=256m
command:
- bash
- -c
- |
bin/apply-config-from-env.py conf/zookeeper.conf && \
bin/generate-zookeeper-config.sh conf/zookeeper.conf && \
exec bin/pulsar zookeeper
healthcheck:
test: ["CMD", "bin/pulsar-zookeeper-ruok.sh"]
interval: 10s
timeout: 5s
retries: 30

# Init cluster metadata
pulsar-init:
container_name: pulsar-init
hostname: pulsar-init
image: apachepulsar/pulsar:latest
command:
- bash
- -c
- |
bin/pulsar initialize-cluster-metadata \
--cluster cluster-a \
--zookeeper zookeeper:2181 \
--configuration-store zookeeper:2181 \
--web-service-url http://broker:8080 \
--broker-service-url pulsar://broker:6650
depends_on:
zookeeper:
condition: service_healthy

# Start bookie
bookie:
image: apachepulsar/pulsar:latest
container_name: bookie
restart: on-failure
environment:
- clusterName=cluster-a
- zkServers=zookeeper:2181
- metadataServiceUri=metadata-store:zk:zookeeper:2181
# otherwise every time we run docker compose uo or down we fail to start due to Cookie
# See: https://github.com/apache/bookkeeper/blob/405e72acf42bb1104296447ea8840d805094c787/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java#L57-68
- advertisedAddress=bookie
- BOOKIE_MEM=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256m
depends_on:
zookeeper:
condition: service_healthy
pulsar-init:
condition: service_completed_successfully
# Map the local directory to the container to avoid bookie startup failure due to insufficient container disks.
command: bash -c "bin/apply-config-from-env.py conf/bookkeeper.conf && exec bin/pulsar bookie"

# Start broker
broker:
image: apachepulsar/pulsar:latest
container_name: broker
hostname: broker
restart: on-failure
environment:
- metadataStoreUrl=zk:zookeeper:2181
- zookeeperServers=zookeeper:2181
- clusterName=cluster-a
- managedLedgerDefaultEnsembleSize=1
- managedLedgerDefaultWriteQuorum=1
- managedLedgerDefaultAckQuorum=1
- advertisedAddress=broker
- advertisedListeners=external:pulsar://127.0.0.1:6650
- PULSAR_MEM=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256m
depends_on:
zookeeper:
condition: service_healthy
bookie:
condition: service_started
ports:
- "6650:6650"
- "8080:8080"
command: bash -c "bin/apply-config-from-env.py conf/broker.conf && exec bin/pulsar broker"
63 changes: 63 additions & 0 deletions src/Paramore.Brighter.MessagingGateway.Pulsar/HeaderNames.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
namespace Paramore.Brighter.MessagingGateway.Pulsar;

/// <summary>
/// Contains constant definitions for header names used in messaging systems,
/// particularly for Apache Pulsar integration with Brighter.
/// Includes standard headers, CloudEvents (CE) headers, and custom Brighter-Pulsar headers.
/// </summary>
public static class HeaderNames
{
/// <summary>Content type of the message payload (e.g., application/json)</summary>
public const string ContentType = "ContentType";

/// <summary>Correlation ID for tracing related messages</summary>
public const string CorrelationId = "CorrelationId";

/// <summary>CloudEvents-formatted unique message identifier</summary>
public const string MessageId = "CE-EventId";

/// <summary>Number of times a message has been processed/requeued</summary>
public const string HandledCount = "HandledCount";

/// <summary>Brighter message type classification (e.g., MT_COMMAND, MT_EVENT)</summary>
public const string MessageType = "MessageType";

/// <summary>Reply destination for request-reply patterns</summary>
public const string ReplyTo = "ReplyTo";

/// <summary>CloudEvents specification version (e.g., "1.0")</summary>
public const string SpecVersion = "CE-SpecVersion";

/// <summary>CloudEvents event type descriptor</summary>
public const string Type = "CE-EventType";

/// <summary>Timestamp of event occurrence in RFC3339 format</summary>
public const string Time = "CE-EventTime";

/// <summary>CloudEvents subject describing event content</summary>
public const string Subject = "CE-Subject";

/// <summary>CloudEvents schema URL for payload validation</summary>
public const string DataSchema = "CE-DataSchema";

/// <summary>CloudEvents source URI identifying event origin</summary>
public const string Source = "CE-Source";

/// <summary>W3C Trace Context traceparent value</summary>
public const string TraceParent = "CE-X-TraceParent";

/// <summary>W3C Trace Context tracestate value</summary>
public const string TraceState = "CE-X-TraceState";

/// <summary>OpenTelemetry baggage items (key-value pairs)</summary>
public const string Baggage = "CE-X-Baggage";

/// <summary>Original message topic/routing key</summary>
public const string Topic = "Topic";

/// <summary>Pulsar schema version identifier</summary>
public const string SchemaVersion = "Brighter-Pulsar-SchemaVersion";

/// <summary>Pulsar message sequence identifier</summary>
public const string SequenceId = "Brighter-Pulsar-SequenceId";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>$(BrighterTargetFrameworks)</TargetFrameworks>
<Description>Provides an implementation of the messaging gateway for decoupled invocation in the Paramore.Brighter pipeline, using Apache Pulsar</Description>
<PackageTags>Apache Pulsar;Pub/Sub;Command;Event;Service Activator;Decoupled;Invocation;Messaging;Remote;Command Dispatcher;Command Processor;Request;Service;Task Queue;Work Queue;Retry;Circuit Breaker;Availability</PackageTags>
<Authors>Rafael Andrade</Authors>
<Nullable>enable</Nullable>
<SignAssembly>false</SignAssembly>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\Paramore.Brighter\Paramore.Brighter.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="DotPulsar" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
<PackageReference Include="System.Threading.Channels" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
using System.Buffers;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using DotPulsar.Abstractions;

namespace Paramore.Brighter.MessagingGateway.Pulsar;

/// <summary>
/// Background message consumer for Apache Pulsar that buffers messages in a bounded channel.
/// </summary>
/// <remarks>
/// This class manages a background message consumption loop that:
/// <list type="bullet">
/// <item><description>Receives messages from Pulsar using an <see cref="IConsumer{T}"/></description></item>
/// <item><description>Writes messages to a bounded channel for consumption by other components</description></item>
/// <item><description>Implements reference counting for safe start/stop operations</description></item>
/// </list>
///
/// The consumer uses a fire-and-forget pattern where the consumption loop runs independently once started.
/// </remarks>
/// <param name="maxLenght">Maximum number of messages to buffer in the channel</param>
/// <param name="consumer">Pulsar message consumer implementation</param>
public sealed class PulsarBackgroundMessageConsumer(int maxLenght, IConsumer<ReadOnlySequence<byte>> consumer)
{
private int _total;
private CancellationTokenSource? _cancellationTokenSource;
private readonly Channel<IMessage<ReadOnlySequence<byte>>> _channel = System.Threading.Channels.Channel.CreateBounded<IMessage<ReadOnlySequence<byte>>>(new BoundedChannelOptions(maxLenght)
{
SingleReader = false, SingleWriter = true
});

/// <summary>
/// Provides read access to the message channel
/// </summary>
public ChannelReader<IMessage<ReadOnlySequence<byte>>> Reader => _channel.Reader;

/// <summary>
/// Gets the underlying Pulsar consumer instance
/// </summary>
public IConsumer<ReadOnlySequence<byte>> Consumer => consumer;

/// <summary>
/// Starts the background message consumption loop
/// </summary>
/// <remarks>
/// Implements reference counting:
/// <list type="bullet">
/// <item><description>First call starts the background loop</description></item>
/// <item><description>Subsequent calls increment the reference count but don't start additional loops</description></item>
/// </list>
/// </remarks>
public void Start()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we have a background loop here? Why not just consume as we do elsewhere and rely on the single-threaded pump to scale performers?

{
var total = Interlocked.Increment(ref _total);
if (total == 1)
{
_cancellationTokenSource = new CancellationTokenSource();
_ = ExecuteAsync(_cancellationTokenSource.Token);
}
}

/// <summary>
/// Background message consumption loop
/// </summary>
/// <param name="cancellationToken">Cancellation token to stop the loop</param>
/// <remarks>
/// Continuously performs:
/// <list type="number">
/// <item><description>Receive message from Pulsar</description></item>
/// <item><description>Write message to output channel</description></item>
/// <item><description>Wait for channel write availability</description></item>
/// </list>
///
/// <para>Errors during message reception are silently ignored to maintain loop continuity.</para>
/// </remarks>
private async Task ExecuteAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
var message = await consumer.Receive(cancellationToken);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels like a pump within our pump?

if (message is null)
{
continue;
}

await _channel.Writer.WriteAsync(message, cancellationToken);
await _channel.Writer.WaitToWriteAsync(cancellationToken);
}
catch
{
// Ignoring any errors
}
}
}

/// <summary>
/// Stops the background message consumption loop
/// </summary>
/// <remarks>
/// Implements reference counting:
/// <list type="bullet">
/// <item><description>Decrements the reference count</description></item>
/// <item><description>Stops the loop when reference count reaches zero</description></item>
/// </list>
///
/// Safe to call multiple times - only the last call that brings the count to zero will stop the loop.
/// </remarks>
public void Stop()
{
var total = Interlocked.Decrement(ref _total);
if (total == 0 && _cancellationTokenSource != null)
{
_cancellationTokenSource.Cancel();
}
}
}
Loading
Loading