Skip to content

Conversation

@lillo42
Copy link
Contributor

@lillo42 lillo42 commented Jul 23, 2025

Add support to Apache Pulsar #3655

  • Apache Pulsar will create the Topic/Subscription if it doesn't exists
  • The current .NET implementation of Apache Pulsar doesn't have support to dead letter queue or retry queue

@lillo42 lillo42 self-assigned this Jul 23, 2025
@lillo42 lillo42 added 3 - Done feature request v10 .NET Pull requests that update .net code V10.X labels Jul 23, 2025
codescene-delta-analysis[bot]

This comment was marked as outdated.

codescene-delta-analysis[bot]

This comment was marked as outdated.

@iancooper iancooper removed the RC2 label Jul 25, 2025
codescene-delta-analysis[bot]

This comment was marked as outdated.

codescene-delta-analysis[bot]

This comment was marked as outdated.

lillo42 added 2 commits August 5, 2025 11:44
# Conflicts:
#	.github/workflows/ci.yml
#	Brighter.sln
#	Directory.Packages.props
codescene-delta-analysis[bot]

This comment was marked as outdated.

codescene-delta-analysis[bot]

This comment was marked as outdated.

@lillo42 lillo42 marked this pull request as ready for review August 15, 2025 16:27
codescene-delta-analysis[bot]

This comment was marked as outdated.

codescene-delta-analysis[bot]

This comment was marked as outdated.

codescene-delta-analysis[bot]

This comment was marked as outdated.

Copy link

@codescene-delta-analysis codescene-delta-analysis bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gates Failed
Enforce advisory code health rules (3 files with Constructor Over-Injection, Complex Method, Code Duplication)

Gates Passed
3 Quality Gates Passed

See analysis details in CodeScene

Reason for failure
Enforce advisory code health rules Violations Code Health Impact
PulsarMessageConsumer.cs 1 advisory rule 10.00 → 9.39 Suppress
PulsarSubscription.cs 1 advisory rule 10.00 → 9.69 Suppress
PulsarMessageProducer.cs 1 advisory rule 10.00 → 9.69 Suppress

Quality Gate Profile: Clean Code Collective
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.

Comment on lines +43 to +67
public PulsarSubscription(SubscriptionName subscriptionName, ChannelName channelName, RoutingKey routingKey,
Type? requestType = null, Func<Message, Type>? getRequestType = null, int bufferSize = 1, int noOfPerformers = 1,
TimeSpan? timeOut = null, int requeueCount = -1, TimeSpan? requeueDelay = null, int unacceptableMessageLimit = 0,
MessagePumpType messagePumpType = MessagePumpType.Unknown, IAmAChannelFactory? channelFactory = null,
OnMissingChannel makeChannels = OnMissingChannel.Create, TimeSpan? emptyChannelDelay = null,
TimeSpan? channelFailureDelay = null,
ISchema<ReadOnlySequence<byte>>? schema = null,
SubscriptionInitialPosition initialPosition = SubscriptionInitialPosition.Earliest,
int priorityLevel = 0,
bool readCompacted = false,
SubscriptionType subscriptionType = SubscriptionType.Exclusive,
bool allowOutOfOrderDeliver = false,
Action<IConsumerBuilder<ReadOnlySequence<byte>>>? configuration = null)
: base(subscriptionName, channelName, routingKey, requestType, getRequestType, bufferSize, noOfPerformers, timeOut, requeueCount,
requeueDelay, unacceptableMessageLimit, messagePumpType, channelFactory, makeChannels, emptyChannelDelay,
channelFailureDelay)
{
Schema = schema ?? DotPulsar.Schema.ByteSequence;
InitialPosition = initialPosition;
PriorityLevel = priorityLevel;
ReadCompacted = readCompacted;
SubscriptionType = subscriptionType;
AllowOutOfOrderDeliver = allowOutOfOrderDeliver;
Configuration = configuration;
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ New issue: Constructor Over-Injection
PulsarSubscription has 23 arguments, max arguments = 5

Suppress

Comment on lines +163 to +182
public PulsarSubscription(SubscriptionName subscriptionName, ChannelName channelName, RoutingKey routingKey,
Func<Message, Type>? getRequestType = null, int bufferSize = 1, int noOfPerformers = 1, TimeSpan? timeOut = null,
int requeueCount = -1, TimeSpan? requeueDelay = null, int unacceptableMessageLimit = 0,
MessagePumpType messagePumpType = MessagePumpType.Unknown, IAmAChannelFactory? channelFactory = null,
OnMissingChannel makeChannels = OnMissingChannel.Create, TimeSpan? emptyChannelDelay = null,
TimeSpan? channelFailureDelay = null,
ISchema<ReadOnlySequence<byte>>? schema = null,
SubscriptionInitialPosition initialPosition = SubscriptionInitialPosition.Earliest,
int priorityLevel = 0,
bool readCompacted = false,
SubscriptionType subscriptionType = SubscriptionType.Exclusive,
bool allowOutOfOrderDeliver = false,
Action<IConsumerBuilder<ReadOnlySequence<byte>>>? configuration = null)
: base(subscriptionName, channelName, routingKey, typeof(T), getRequestType, bufferSize, noOfPerformers, timeOut, requeueCount,
requeueDelay, unacceptableMessageLimit, messagePumpType, channelFactory, makeChannels, emptyChannelDelay,
channelFailureDelay, schema, initialPosition, priorityLevel, readCompacted, subscriptionType,
allowOutOfOrderDeliver, configuration)
{

}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ New issue: Constructor Over-Injection
PulsarSubscription has 22 arguments, max arguments = 5

Suppress

Copy link
Member

@iancooper iancooper left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am probably missing something here, but this feels like we are re-implementing our message pump inside the consumer, which doesn't feel like the "right thing" to be doing. Maybe we need a catch up conversation

throw new ConfigurationException("We expect PulsarSubscription or PulsarSubscription<T> as a parameter");
}

var background = s_backgroundConsumers.GetOrAdd(pulsarSubscription, CreateConsumerBackground);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to chat about what this is, as I don't know Pulsar? But I might need to understand it to see how this works with our pump


var consumer = builder.Create();

return new PulsarBackgroundMessageConsumer(pulsarSubscription.NoOfPerformers, consumer);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We tend to be single threaded and rely on our performers to start multiple pumps. This would seen to counteract that, but, again I don't know Pulsar well enough to understand that.

/// <item><description>Subsequent calls increment the reference count but don't start additional loops</description></item>
/// </list>
/// </remarks>
public void Start()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we have a background loop here? Why not just consume as we do elsewhere and rely on the single-threaded pump to scale performers?

{
try
{
var message = await consumer.Receive(cancellationToken);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels like a pump within our pump?

@lillo42 lillo42 added 2 - In Progress Draft This is a work in progress and removed 3 - Done labels Dec 1, 2025
# Conflicts:
#	Brighter.sln
#	src/Paramore.Brighter/Observability/MessagingSystem.cs
Copy link

@codescene-delta-analysis codescene-delta-analysis bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gates Failed
Enforce advisory code health rules (3 files with Constructor Over-Injection, Complex Method, Code Duplication)

Gates Passed
3 Quality Gates Passed

See analysis details in CodeScene

Reason for failure
Enforce advisory code health rules Violations Code Health Impact
PulsarMessageConsumer.cs 1 advisory rule 9.39 Suppress
PulsarSubscription.cs 1 advisory rule 9.69 Suppress
PulsarMessageProducer.cs 1 advisory rule 9.69 Suppress

Quality Gate Profile: Clean Code Collective
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.

Comment on lines +43 to +67
public PulsarSubscription(SubscriptionName subscriptionName, ChannelName channelName, RoutingKey routingKey,
Type? requestType = null, Func<Message, Type>? getRequestType = null, int bufferSize = 1, int noOfPerformers = 1,
TimeSpan? timeOut = null, int requeueCount = -1, TimeSpan? requeueDelay = null, int unacceptableMessageLimit = 0,
MessagePumpType messagePumpType = MessagePumpType.Unknown, IAmAChannelFactory? channelFactory = null,
OnMissingChannel makeChannels = OnMissingChannel.Create, TimeSpan? emptyChannelDelay = null,
TimeSpan? channelFailureDelay = null,
ISchema<ReadOnlySequence<byte>>? schema = null,
SubscriptionInitialPosition initialPosition = SubscriptionInitialPosition.Earliest,
int priorityLevel = 0,
bool readCompacted = false,
SubscriptionType subscriptionType = SubscriptionType.Exclusive,
bool allowOutOfOrderDeliver = false,
Action<IConsumerBuilder<ReadOnlySequence<byte>>>? configuration = null)
: base(subscriptionName, channelName, routingKey, requestType, getRequestType, bufferSize, noOfPerformers, timeOut, requeueCount,
requeueDelay, unacceptableMessageLimit, messagePumpType, channelFactory, makeChannels, emptyChannelDelay,
channelFailureDelay)
{
Schema = schema ?? DotPulsar.Schema.ByteSequence;
InitialPosition = initialPosition;
PriorityLevel = priorityLevel;
ReadCompacted = readCompacted;
SubscriptionType = subscriptionType;
AllowOutOfOrderDeliver = allowOutOfOrderDeliver;
Configuration = configuration;
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ New issue: Constructor Over-Injection
PulsarSubscription has 23 arguments, max arguments = 5

Suppress

Comment on lines +163 to +182
public PulsarSubscription(SubscriptionName subscriptionName, ChannelName channelName, RoutingKey routingKey,
Func<Message, Type>? getRequestType = null, int bufferSize = 1, int noOfPerformers = 1, TimeSpan? timeOut = null,
int requeueCount = -1, TimeSpan? requeueDelay = null, int unacceptableMessageLimit = 0,
MessagePumpType messagePumpType = MessagePumpType.Unknown, IAmAChannelFactory? channelFactory = null,
OnMissingChannel makeChannels = OnMissingChannel.Create, TimeSpan? emptyChannelDelay = null,
TimeSpan? channelFailureDelay = null,
ISchema<ReadOnlySequence<byte>>? schema = null,
SubscriptionInitialPosition initialPosition = SubscriptionInitialPosition.Earliest,
int priorityLevel = 0,
bool readCompacted = false,
SubscriptionType subscriptionType = SubscriptionType.Exclusive,
bool allowOutOfOrderDeliver = false,
Action<IConsumerBuilder<ReadOnlySequence<byte>>>? configuration = null)
: base(subscriptionName, channelName, routingKey, typeof(T), getRequestType, bufferSize, noOfPerformers, timeOut, requeueCount,
requeueDelay, unacceptableMessageLimit, messagePumpType, channelFactory, makeChannels, emptyChannelDelay,
channelFailureDelay, schema, initialPosition, priorityLevel, readCompacted, subscriptionType,
allowOutOfOrderDeliver, configuration)
{

}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ New issue: Constructor Over-Injection
PulsarSubscription has 22 arguments, max arguments = 5

Suppress

Comment on lines +92 to +149
private MessageMetadata CreateMessageMetadata(Message message, TimeSpan? delay)
{
var metadata = new MessageMetadata
{
Key = PartitionKey.IsNullOrEmpty(message.Header.PartitionKey) ? null : message.Header.PartitionKey.Value,
EventTimeAsDateTimeOffset = message.Header.TimeStamp,
SchemaVersion = publication.SchemaVersion,
SequenceId = publication.GenerateSequenceId(message),
};

if (delay.HasValue && delay.Value != TimeSpan.Zero)
{
metadata.DeliverAtTimeAsDateTimeOffset = time.GetUtcNow() + delay.Value;
}

metadata[HeaderNames.ContentType] = message.Header.ContentType.ToString();
metadata[HeaderNames.CorrelationId] = message.Header.CorrelationId;
metadata[HeaderNames.MessageType] = message.Header.MessageType.ToString();
metadata[HeaderNames.MessageId] = message.Header.MessageId;
metadata[HeaderNames.SpecVersion] = message.Header.SpecVersion;
metadata[HeaderNames.Type] = message.Header.Type;
metadata[HeaderNames.Time] = message.Header.TimeStamp.ToRfc3339();
metadata[HeaderNames.Topic] = message.Header.Topic;
metadata[HeaderNames.Source] = message.Header.Source.ToString();
metadata[HeaderNames.Baggage] = message.Header.Baggage.ToString();

if (!RoutingKey.IsNullOrEmpty(message.Header.ReplyTo))
{
metadata[HeaderNames.ReplyTo] = message.Header.ReplyTo;
}

if (!string.IsNullOrEmpty(message.Header.Subject))
{
metadata[HeaderNames.Subject] = message.Header.Subject;
}

if (message.Header.DataSchema != null)
{
metadata[HeaderNames.DataSchema] = message.Header.DataSchema.ToString();
}

if (!TraceParent.IsNullOrEmpty(message.Header.TraceParent))
{
metadata[HeaderNames.TraceParent] = message.Header.TraceParent;
}

if (!TraceState.IsNullOrEmpty(message.Header.TraceState))
{
metadata[HeaderNames.TraceState] = message.Header.TraceState;
}

foreach (var pair in message.Header.Bag)
{
metadata[pair.Key] = pair.Value.ToString();
}

return metadata;
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ New issue: Complex Method
CreateMessageMetadata has a cyclomatic complexity of 10, threshold = 9

Suppress

Comment on lines +51 to +73
public async Task AcknowledgeAsync(Message message, CancellationToken cancellationToken = default)
{
if (!message.Header.Bag.TryGetValue("ReceiptHandle", out var receiptHandle))
{
return;
}

if (receiptHandle is not MessageId messageId)
{
return;
}

try
{
await backgroundPulsarConsumer.Consumer.Acknowledge(messageId, cancellationToken);
Log.AcknowledgedMessage(s_logger, message.Id, messageId.ToString());
}
catch (Exception ex)
{
Log.ErrorAcknowledgingMessage(s_logger, ex, message.Id, messageId.ToString());
throw;
}
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ New issue: Code Duplication
The module contains 3 functions with similar structure: AcknowledgeAsync,RejectAsync,RequeueAsync

Suppress

@iancooper iancooper removed the v10 label Dec 31, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

2 - In Progress Draft This is a work in progress feature request .NET Pull requests that update .net code V10.X

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants