Skip to content

Commit e149f9c

Browse files
committed
feat: implement kafka headers support
1 parent d7b2d01 commit e149f9c

20 files changed

+1405
-231
lines changed
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
using System.Collections.Generic;
using Confluent.Kafka;

namespace Take.Elephant.Kafka
{
    /// <summary>
    /// Supplies the Kafka message headers to be attached to every produced message.
    /// Implementations are invoked at produce time, so the returned headers may be
    /// dynamic (e.g. trace IDs) or static (e.g. deployment metadata read once at startup).
    /// </summary>
    public interface IKafkaHeaderProvider
    {
        /// <summary>
        /// Returns the headers to add to an outgoing message. A null sequence, or
        /// entries with a null key, are ignored by the publisher.
        /// </summary>
        IEnumerable<IHeader> GetHeaders();
    }
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
using System.Threading;
using System.Threading.Tasks;

namespace Take.Elephant.Kafka
{
    /// <summary>
    /// Optional Kafka receiver contract that allows reading message headers together with the payload.
    /// </summary>
    /// <typeparam name="T">Payload type.</typeparam>
    public interface IKafkaReceiverQueue<T> : IReceiverQueue<T>, IBlockingReceiverQueue<T>
    {
        /// <summary>
        /// Dequeues a payload together with its Kafka headers, if one is available.
        /// </summary>
        Task<KafkaConsumedMessage<T>> DequeueWithHeadersOrDefaultAsync(
            CancellationToken cancellationToken = default);

        /// <summary>
        /// Dequeues a payload together with its Kafka headers, awaiting a message if needed.
        /// </summary>
        Task<KafkaConsumedMessage<T>> DequeueWithHeadersAsync(CancellationToken cancellationToken);
    }
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Text;

namespace Take.Elephant.Kafka
{
    internal static class KafkaConsumedMessageDefaults
    {
        // Shared, immutable empty header map so "no headers" never allocates per message.
        internal static readonly IReadOnlyDictionary<string, byte[]> EmptyHeaders =
            new ReadOnlyDictionary<string, byte[]>(new Dictionary<string, byte[]>(0));
    }

    /// <summary>
    /// Represents a consumed Kafka payload with its message headers.
    /// </summary>
    /// <typeparam name="T">Payload type.</typeparam>
    public sealed class KafkaConsumedMessage<T>
    {
        // Snapshot taken at construction: the dictionary itself is copied, but the
        // byte[] values keep the caller's references (the consumer path clones them
        // in KafkaHeadersConverter before reaching here).
        private readonly IReadOnlyDictionary<string, byte[]> _headers;

        /// <summary>
        /// Creates a consumed-message wrapper.
        /// </summary>
        /// <param name="item">The deserialized payload.</param>
        /// <param name="headers">Header key/value pairs; null is normalized to an empty map.</param>
        public KafkaConsumedMessage(T item, IReadOnlyDictionary<string, byte[]> headers)
        {
            Item = item;
            _headers = headers is null
                ? KafkaConsumedMessageDefaults.EmptyHeaders
                : new ReadOnlyDictionary<string, byte[]>(new Dictionary<string, byte[]>(headers));
        }

        /// <summary>The deserialized payload.</summary>
        public T Item { get; }

        /// <summary>Read-only snapshot of the message headers.</summary>
        public IReadOnlyDictionary<string, byte[]> Headers => _headers;

        /// <summary>
        /// Tries to get the raw value of the header with the given key.
        /// Only a null key short-circuits (Dictionary cannot look it up); whitespace-only
        /// keys are legal Kafka header keys and may be present in <see cref="Headers"/>,
        /// so rejecting them here would make stored headers unretrievable.
        /// </summary>
        public bool TryGetHeader(string key, out byte[] value)
        {
            if (key == null)
            {
                value = null;
                return false;
            }

            return Headers.TryGetValue(key, out value);
        }

        /// <summary>
        /// Tries to get the header value decoded as a UTF-8 string.
        /// Returns false when the header is absent or its value is null.
        /// </summary>
        public bool TryGetHeaderAsUtf8String(string key, out string value)
        {
            value = null;
            if (!TryGetHeader(key, out var bytes) || bytes == null)
            {
                return false;
            }

            value = Encoding.UTF8.GetString(bytes);
            return true;
        }
    }
}

src/Take.Elephant.Kafka/KafkaEventStreamPublisher.cs

Lines changed: 87 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
using System;
1+
using System;
22
using System.Text;
33
using System.Threading;
44
using System.Threading.Tasks;
@@ -9,6 +9,7 @@ namespace Take.Elephant.Kafka
99
public class KafkaEventStreamPublisher<TKey, TEvent> : IEventStreamPublisher<TKey, TEvent>, IDisposable
1010
{
1111
private readonly IProducer<TKey, TEvent> _producer;
12+
private readonly IKafkaHeaderProvider _headerProvider;
1213

1314
public KafkaEventStreamPublisher(string bootstrapServers, string topic, ISerializer<TEvent> serializer)
1415
: this(new ProducerConfig() { BootstrapServers = bootstrapServers }, topic, serializer)
@@ -21,14 +22,29 @@ public KafkaEventStreamPublisher(string bootstrapServers, string topic, Confluen
2122
}
2223

2324
public KafkaEventStreamPublisher(
24-
ProducerConfig producerConfig,
25+
ProducerConfig producerConfig,
2526
string topic,
2627
ISerializer<TEvent> serializer)
2728
: this(
2829
new ProducerBuilder<TKey, TEvent>(producerConfig)
2930
.SetValueSerializer(new EventSerializer(serializer))
3031
.Build(),
31-
topic)
32+
topic,
33+
null)
34+
{
35+
}
36+
37+
public KafkaEventStreamPublisher(
38+
ProducerConfig producerConfig,
39+
string topic,
40+
ISerializer<TEvent> serializer,
41+
IKafkaHeaderProvider headerProvider)
42+
: this(
43+
new ProducerBuilder<TKey, TEvent>(producerConfig)
44+
.SetValueSerializer(new EventSerializer(serializer))
45+
.Build(),
46+
topic,
47+
headerProvider)
3248
{
3349
}
3450

@@ -40,7 +56,22 @@ public KafkaEventStreamPublisher(
4056
new ProducerBuilder<TKey, TEvent>(producerConfig)
4157
.SetValueSerializer(kafkaSerializer)
4258
.Build(),
43-
topic)
59+
topic,
60+
null)
61+
{
62+
}
63+
64+
public KafkaEventStreamPublisher(
65+
ProducerConfig producerConfig,
66+
string topic,
67+
Confluent.Kafka.ISerializer<TEvent> kafkaSerializer,
68+
IKafkaHeaderProvider headerProvider)
69+
: this(
70+
new ProducerBuilder<TKey, TEvent>(producerConfig)
71+
.SetValueSerializer(kafkaSerializer)
72+
.Build(),
73+
topic,
74+
headerProvider)
4475
{
4576
}
4677

@@ -53,22 +84,64 @@ public KafkaEventStreamPublisher(
5384
throw new ArgumentException("Value cannot be null or whitespace.", nameof(topic));
5485
}
5586

87+
if (producer == null)
88+
{
89+
throw new ArgumentNullException(nameof(producer));
90+
}
91+
92+
_producer = producer;
93+
Topic = topic;
94+
_headerProvider = null;
95+
}
96+
97+
public KafkaEventStreamPublisher(
98+
IProducer<TKey, TEvent> producer,
99+
string topic,
100+
IKafkaHeaderProvider headerProvider)
101+
{
102+
if (string.IsNullOrWhiteSpace(topic))
103+
{
104+
throw new ArgumentException("Value cannot be null or whitespace.", nameof(topic));
105+
}
106+
107+
if (producer == null)
108+
{
109+
throw new ArgumentNullException(nameof(producer));
110+
}
111+
56112
_producer = producer;
57113
Topic = topic;
114+
_headerProvider = headerProvider;
58115
}
59116

60117
public string Topic { get; }
61118

62119
public async Task PublishAsync(TKey key, TEvent item, CancellationToken cancellationToken)
63120
{
64-
await _producer.ProduceAsync(
65-
Topic,
66-
new Message<TKey, TEvent>
121+
var message = new Message<TKey, TEvent>
122+
{
123+
Key = key,
124+
Value = item
125+
};
126+
127+
if (_headerProvider != null)
128+
{
129+
message.Headers = [];
130+
var headers = _headerProvider.GetHeaders();
131+
if (headers != null)
67132
{
68-
Key = key,
69-
Value = item
70-
},
71-
cancellationToken);
133+
foreach (var header in headers)
134+
{
135+
if (header == null || header.Key == null)
136+
{
137+
continue;
138+
}
139+
message.Headers.Add(header.Key, header.GetValueBytes());
140+
}
141+
}
142+
}
143+
144+
await _producer.ProduceAsync(Topic, message, cancellationToken);
72145
}
73146

74147
protected virtual void Dispose(bool disposing)
@@ -85,18 +158,12 @@ public void Dispose()
85158
GC.SuppressFinalize(this);
86159
}
87160

88-
public class EventSerializer : Confluent.Kafka.ISerializer<TEvent>
161+
public class EventSerializer(ISerializer<TEvent> serializer) : Confluent.Kafka.ISerializer<TEvent>
89162
{
90-
private readonly ISerializer<TEvent> _serializer;
91-
92-
public EventSerializer(ISerializer<TEvent> serializer)
93-
{
94-
_serializer = serializer;
95-
}
96163
public byte[] Serialize(TEvent data, SerializationContext context)
97164
{
98-
return Encoding.UTF8.GetBytes(_serializer.Serialize(data));
165+
return Encoding.UTF8.GetBytes(serializer.Serialize(data));
99166
}
100167
}
101168
}
102-
}
169+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
using System.Collections.Generic;
using Confluent.Kafka;

namespace Take.Elephant.Kafka
{
    /// <summary>
    /// Translates Confluent <see cref="Headers"/> into the plain dictionary form
    /// consumed by <see cref="KafkaConsumedMessage{T}"/>.
    /// </summary>
    internal static class KafkaHeadersConverter
    {
        /// <summary>Wraps a payload and its headers into a consumed-message snapshot.</summary>
        internal static KafkaConsumedMessage<T> BuildConsumedMessage<T>(T item, Headers headers)
        {
            return new KafkaConsumedMessage<T>(item, ToDictionary(headers));
        }

        /// <summary>
        /// Copies the headers into a dictionary, cloning each value byte array so the
        /// snapshot is isolated from the original buffers. Returns null when there is
        /// nothing to copy (KafkaConsumedMessage substitutes an empty map for null).
        /// Headers with a null key are skipped; on duplicate keys the last value wins.
        /// </summary>
        internal static Dictionary<string, byte[]> ToDictionary(Headers headers)
        {
            if (headers == null || headers.Count == 0)
            {
                return null;
            }

            var snapshot = new Dictionary<string, byte[]>(headers.Count);
            foreach (var header in headers)
            {
                var key = header?.Key;
                if (key == null)
                {
                    continue;
                }

                var valueBytes = header.GetValueBytes();
                snapshot[key] = valueBytes == null ? null : (byte[])valueBytes.Clone();
            }

            return snapshot;
        }
    }
}
Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
using System;
2-
using System.Collections.Generic;
3-
using System.Text;
42
using System.Threading;
53
using System.Threading.Tasks;
64
using Confluent.Kafka;
75

86
namespace Take.Elephant.Kafka
97
{
10-
public class KafkaPartitionQueue<T> : IReceiverQueue<T>, IPartitionSenderQueue<T>, ICloseable, IDisposable
8+
public class KafkaPartitionQueue<T> : IKafkaReceiverQueue<T>, IPartitionSenderQueue<T>, ICloseable, IDisposable
119
{
1210
private readonly KafkaPartitionSenderQueue<T> _senderQueue;
1311
private readonly KafkaReceiverQueue<T> _receiverQueue;
@@ -16,12 +14,13 @@ public KafkaPartitionQueue(
1614
ProducerConfig producerConfig,
1715
ConsumerConfig consumerConfig,
1816
string topic,
19-
Take.Elephant.ISerializer<T> serializer,
17+
ISerializer<T> serializer,
2018
Confluent.Kafka.ISerializer<string> kafkaSerializer = null,
21-
IDeserializer<string> kafkaDeserializer = null)
19+
IDeserializer<string> kafkaDeserializer = null,
20+
IKafkaHeaderProvider headerProvider = null)
2221
{
23-
_senderQueue = new KafkaPartitionSenderQueue<T>(producerConfig, topic, serializer, kafkaSerializer);
24-
_receiverQueue = new KafkaReceiverQueue<T>(consumerConfig, topic, serializer, kafkaDeserializer);
22+
_senderQueue = new(producerConfig, topic, serializer, kafkaSerializer, headerProvider);
23+
_receiverQueue = new(consumerConfig, topic, serializer, kafkaDeserializer);
2524
}
2625

2726
public Task EnqueueAsync(T item, string key, CancellationToken cancellationToken = default)
@@ -34,11 +33,25 @@ public Task<T> DequeueOrDefaultAsync(CancellationToken cancellationToken = defau
3433
return _receiverQueue.DequeueOrDefaultAsync(cancellationToken);
3534
}
3635

37-
public Task<T> DequeueAsync(CancellationToken cancellationToken = default)
36+
public Task<KafkaConsumedMessage<T>> DequeueWithHeadersOrDefaultAsync(
37+
CancellationToken cancellationToken = default
38+
)
39+
{
40+
return _receiverQueue.DequeueWithHeadersOrDefaultAsync(cancellationToken);
41+
}
42+
43+
public Task<T> DequeueAsync(CancellationToken cancellationToken)
3844
{
3945
return _receiverQueue.DequeueAsync(cancellationToken);
4046
}
4147

48+
public Task<KafkaConsumedMessage<T>> DequeueWithHeadersAsync(
49+
CancellationToken cancellationToken
50+
)
51+
{
52+
return _receiverQueue.DequeueWithHeadersAsync(cancellationToken);
53+
}
54+
4255
public Task CloseAsync(CancellationToken cancellationToken) => _receiverQueue.CloseAsync(cancellationToken);
4356

4457
public void Dispose()
@@ -47,13 +60,11 @@ public void Dispose()
4760
GC.SuppressFinalize(this);
4861
}
4962

50-
protected void Dispose(bool disposing)
63+
protected virtual void Dispose(bool disposing)
5164
{
52-
if (disposing)
53-
{
54-
_senderQueue?.Dispose();
55-
_receiverQueue?.Dispose();
56-
}
65+
if (!disposing) return;
66+
_senderQueue?.Dispose();
67+
_receiverQueue?.Dispose();
5768
}
5869
}
5970
}

0 commit comments

Comments
 (0)