Skip to content

Commit 63fa7eb

Browse files
feat: add support for Kafka Schema Registry serialization and deserialization
1 parent ec736e4 commit 63fa7eb

31 files changed

+3658
-37
lines changed

src/.idea/.idea.Take.Elephant/.idea/indexLayout.xml

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/.idea/.idea.Take.Elephant/.idea/modules.xml

Lines changed: 0 additions & 8 deletions
This file was deleted.

src/.idea/.idea.Take.Elephant/.idea/projectSettingsUpdater.xml

Lines changed: 3 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/Take.Elephant.Kafka/README.md

Lines changed: 34 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -23,35 +23,44 @@ Docker Compose
2323

2424
Create one file named `docker-compose.yml` with this content:
2525
```
26-
version: "3"
2726
services:
28-
zookeeper:
29-
image: "confluentinc/cp-zookeeper"
27+
broker:
28+
image: confluentinc/cp-kafka:8.0.0
29+
hostname: broker
30+
container_name: broker
3031
ports:
31-
- 2181:2181
32+
- "9092:9092"
33+
- "9101:9101"
3234
environment:
33-
- ZOOKEEPER_CLIENT_PORT=2181
34-
volumes:
35-
- zookeeperData:/var/lib/zookeeper/data
36-
- zookeeperLogs:/var/lib/zookeeper/log
37-
38-
kafka:
39-
image: "confluentinc/cp-kafka"
35+
KAFKA_NODE_ID: 1
36+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
37+
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
38+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
39+
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
40+
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
41+
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
42+
KAFKA_JMX_PORT: 9101
43+
KAFKA_JMX_HOSTNAME: localhost
44+
KAFKA_PROCESS_ROLES: 'broker,controller'
45+
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
46+
KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
47+
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
48+
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
49+
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
50+
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
51+
52+
schema-registry:
53+
image: confluentinc/cp-schema-registry:8.0.0
54+
hostname: schema-registry
55+
container_name: schema-registry
56+
depends_on:
57+
- broker
4058
ports:
41-
- 9092:9092
59+
- "8081:8081"
4260
environment:
43-
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
44-
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
45-
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
46-
volumes:
47-
- kafka:/var/lib/kafka/data
48-
depends_on:
49-
- zookeeper
50-
volumes:
51-
kafka:
52-
zookeeperData:
53-
zookeeperLogs:
54-
61+
SCHEMA_REGISTRY_HOST_NAME: schema-registry
62+
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
63+
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
5564
```
5665

5766
This file starts two services on your machine: the Kafka `broker` (running in KRaft mode, so no Zookeeper is needed) and the `schema-registry`. Inside the folder where you created the Docker Compose file, run the command `docker-compose up -d` (the `-d` flag runs the services detached, in the background). After the command runs successfully, run `docker-compose ps` to check whether the services are running on your machine. You should see:
@@ -86,4 +95,4 @@ kafka_1 | [2019-11-25 16:39:34,220] INFO [KafkaServer id=1] started (kafka.
8695
```
8796
#### Check logs on kafka to see events of consuming and producing
8897

89-
`docker-compose logs --f`
98+
`docker-compose logs -f`
Lines changed: 260 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
using System;
2+
using System.Diagnostics;
3+
using System.Linq;
4+
using System.Threading;
5+
using System.Threading.Channels;
6+
using System.Threading.Tasks;
7+
using Confluent.Kafka;
8+
using Confluent.SchemaRegistry;
9+
10+
namespace Take.Elephant.Kafka.SchemaRegistry
{
    /// <summary>
    /// A Kafka event stream consumer that uses Schema Registry for value deserialization.
    /// Messages are pulled from Kafka on a dedicated background task and handed to callers
    /// through a bounded (capacity 1) channel, so consumption is paced by the caller.
    /// </summary>
    /// <typeparam name="TKey">The type of the message key.</typeparam>
    /// <typeparam name="TEvent">The type of the event.</typeparam>
    public class KafkaSchemaRegistryEventStreamConsumer<TKey, TEvent> : IEventStreamConsumer<TKey, TEvent>, IOpenable, ICloseable, IDisposable
        where TEvent : class
    {
        private readonly IConsumer<TKey, TEvent> _consumer;
        private readonly ISchemaRegistryClient _schemaRegistryClient;
        // True only when this instance created the registry client and therefore must dispose it.
        private readonly bool _ownsSchemaRegistryClient;
        // Guards the lazy, once-only start of the background consume task.
        private readonly SemaphoreSlim _consumerStartSemaphore;
        private readonly CancellationTokenSource _cts;
        // Bounded to 1 so the background loop applies back-pressure instead of buffering messages.
        private readonly Channel<Message<TKey, TEvent>> _channel;
        private Task _consumerTask;
        private bool _closed;

        /// <summary>
        /// Creates a new instance of <see cref="KafkaSchemaRegistryEventStreamConsumer{TKey, TEvent}"/>.
        /// </summary>
        /// <param name="bootstrapServers">The Kafka bootstrap servers.</param>
        /// <param name="topic">The topic name.</param>
        /// <param name="groupId">The consumer group ID.</param>
        /// <param name="schemaRegistryOptions">The Schema Registry options.</param>
        public KafkaSchemaRegistryEventStreamConsumer(
            string bootstrapServers,
            string topic,
            string groupId,
            SchemaRegistryOptions schemaRegistryOptions)
            : this(
                new ConsumerConfig { BootstrapServers = bootstrapServers, GroupId = groupId },
                topic,
                schemaRegistryOptions)
        {
        }

        /// <summary>
        /// Creates a new instance of <see cref="KafkaSchemaRegistryEventStreamConsumer{TKey, TEvent}"/>.
        /// The Schema Registry client is created (and owned) by this instance.
        /// </summary>
        /// <param name="consumerConfig">The consumer configuration.</param>
        /// <param name="topic">The topic name.</param>
        /// <param name="schemaRegistryOptions">The Schema Registry options.</param>
        public KafkaSchemaRegistryEventStreamConsumer(
            ConsumerConfig consumerConfig,
            string topic,
            SchemaRegistryOptions schemaRegistryOptions)
            : this(
                consumerConfig,
                topic,
                new CachedSchemaRegistryClient(schemaRegistryOptions.SchemaRegistryConfig),
                schemaRegistryOptions,
                ownsSchemaRegistryClient: true)
        {
        }

        /// <summary>
        /// Creates a new instance of <see cref="KafkaSchemaRegistryEventStreamConsumer{TKey, TEvent}"/>.
        /// The caller retains ownership of <paramref name="schemaRegistryClient"/> and must dispose it.
        /// </summary>
        /// <param name="consumerConfig">The consumer configuration.</param>
        /// <param name="topic">The topic name.</param>
        /// <param name="schemaRegistryClient">The Schema Registry client.</param>
        /// <param name="schemaRegistryOptions">The Schema Registry options.</param>
        public KafkaSchemaRegistryEventStreamConsumer(
            ConsumerConfig consumerConfig,
            string topic,
            ISchemaRegistryClient schemaRegistryClient,
            SchemaRegistryOptions schemaRegistryOptions)
            : this(consumerConfig, topic, schemaRegistryClient, schemaRegistryOptions, ownsSchemaRegistryClient: false)
        {
        }

        private KafkaSchemaRegistryEventStreamConsumer(
            ConsumerConfig consumerConfig,
            string topic,
            ISchemaRegistryClient schemaRegistryClient,
            SchemaRegistryOptions schemaRegistryOptions,
            bool ownsSchemaRegistryClient)
        {
            if (string.IsNullOrWhiteSpace(topic))
                throw new ArgumentException("Value cannot be null or whitespace.", nameof(topic));

            Topic = topic;
            _schemaRegistryClient = schemaRegistryClient ?? throw new ArgumentNullException(nameof(schemaRegistryClient));
            _ownsSchemaRegistryClient = ownsSchemaRegistryClient;

            var deserializer = SchemaRegistrySerializerFactory.CreateDeserializer<TEvent>(schemaRegistryClient, schemaRegistryOptions);
            _consumer = new ConsumerBuilder<TKey, TEvent>(consumerConfig)
                .SetValueDeserializer(deserializer)
                .Build();

            _consumerStartSemaphore = new SemaphoreSlim(1, 1);
            _cts = new CancellationTokenSource();
            _channel = Channel.CreateBounded<Message<TKey, TEvent>>(1);
        }

        /// <summary>
        /// Creates a new instance of <see cref="KafkaSchemaRegistryEventStreamConsumer{TKey, TEvent}"/> using a pre-built consumer.
        /// </summary>
        /// <param name="consumer">The Kafka consumer, already configured with a Schema Registry deserializer.</param>
        /// <param name="topic">The topic name.</param>
        /// <param name="schemaRegistryClient">The Schema Registry client (optional, for lifecycle management).</param>
        public KafkaSchemaRegistryEventStreamConsumer(
            IConsumer<TKey, TEvent> consumer,
            string topic,
            ISchemaRegistryClient schemaRegistryClient = null)
        {
            if (string.IsNullOrWhiteSpace(topic))
                throw new ArgumentException("Value cannot be null or whitespace.", nameof(topic));

            _consumer = consumer ?? throw new ArgumentNullException(nameof(consumer));
            Topic = topic;
            _schemaRegistryClient = schemaRegistryClient;
            // A caller-supplied client is never owned here, even when non-null.
            _ownsSchemaRegistryClient = false;
            _consumerStartSemaphore = new SemaphoreSlim(1, 1);
            _cts = new CancellationTokenSource();
            _channel = Channel.CreateBounded<Message<TKey, TEvent>>(1);
        }

        /// <summary>
        /// Gets the topic name.
        /// </summary>
        public string Topic { get; }

        /// <summary>
        /// Occurs when the consumer fails. If no handler is attached, failures are traced instead.
        /// </summary>
        public event EventHandler<ExceptionEventArgs> ConsumerFailed;

        /// <inheritdoc />
        public virtual async Task<(TKey key, TEvent item)> ConsumeOrDefaultAsync(CancellationToken cancellationToken)
        {
            // Lazily starts the background consume loop on first call.
            await StartConsumerTaskIfNotAsync(cancellationToken);

            // Non-blocking read: returns default when no message is currently buffered.
            // NOTE(review): confirm this polling semantic matches the IEventStreamConsumer
            // contract — callers that expect to await the next message will need to retry.
            if (_channel.Reader.TryRead(out var message))
            {
                return (message.Key, message.Value);
            }

            return default;
        }

        /// <inheritdoc />
        public Task OpenAsync(CancellationToken cancellationToken)
        {
            return StartConsumerTaskIfNotAsync(cancellationToken);
        }

        /// <inheritdoc />
        public virtual async Task CloseAsync(CancellationToken cancellationToken)
        {
            if (!_closed)
            {
                _closed = true;
                // Cancelling makes the consume loop break out of Consume/WriteAsync,
                // unsubscribe and complete the channel before the task finishes.
                await _cts.CancelAsync();
                if (_consumerTask != null)
                {
                    await _consumerTask;
                }
                _consumer.Close();
            }
        }

        /// <inheritdoc />
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (disposing)
            {
                // Stop the background consume loop before touching the consumer:
                // IConsumer is not thread-safe, so closing/disposing it while the
                // loop is still inside Consume would be a race. Cancellation makes
                // the loop terminate promptly, so this wait is bounded.
                if (!_cts.IsCancellationRequested)
                {
                    _cts.Cancel();
                }

                try
                {
                    _consumerTask?.GetAwaiter().GetResult();
                }
                catch (Exception ex)
                {
                    // Best-effort shutdown; the loop already surfaces errors via ConsumerFailed.
                    Trace.TraceError("Error awaiting consumer task shutdown on KafkaSchemaRegistryEventStreamConsumer: {0}", ex);
                }

                if (!_closed)
                {
                    _closed = true;
                    _consumer.Close();
                }

                _consumer.Dispose();
                _cts.Dispose();
                _consumerStartSemaphore.Dispose();

                // Only dispose the registry client when this instance created it.
                if (_ownsSchemaRegistryClient)
                {
                    _schemaRegistryClient?.Dispose();
                }
            }
        }

        /// <summary>
        /// Starts the background consume task exactly once (double-checked under the semaphore).
        /// </summary>
        private async Task StartConsumerTaskIfNotAsync(CancellationToken cancellationToken)
        {
            // Fast path: already started.
            if (_consumerTask != null) return;

            await _consumerStartSemaphore.WaitAsync(cancellationToken);
            try
            {
                if (_consumerTask == null)
                {
                    // LongRunning hints the scheduler to use a dedicated thread, since
                    // Consume blocks synchronously inside the loop.
                    _consumerTask = Task
                        .Factory
                        .StartNew(
                            () => ConsumeAsync(_cts.Token),
                            TaskCreationOptions.LongRunning)
                        .Unwrap();
                }
            }
            finally
            {
                _consumerStartSemaphore.Release();
            }
        }

        /// <summary>
        /// Background loop: consumes messages from the topic and publishes them to the
        /// bounded channel until cancellation is requested.
        /// </summary>
        private async Task ConsumeAsync(CancellationToken cancellationToken)
        {
            // Subscribe only if a previous run has not already done so.
            if (_consumer.Subscription.All(s => s != Topic))
            {
                _consumer.Subscribe(Topic);
            }

            while (!cancellationToken.IsCancellationRequested)
            {
                try
                {
                    var result = _consumer.Consume(cancellationToken);
                    // Blocks here when the channel already holds a message (back-pressure).
                    await _channel.Writer.WriteAsync(result.Message, cancellationToken);
                }
                catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
                {
                    break;
                }
                catch (Exception ex)
                {
                    // Surface the failure to subscribers, or trace it so it is not lost.
                    var handler = ConsumerFailed;
                    if (handler != null)
                    {
                        handler.Invoke(this, new ExceptionEventArgs(ex));
                    }
                    else
                    {
                        Trace.TraceError("An unhandled exception occurred on KafkaSchemaRegistryEventStreamConsumer: {0}", ex);
                    }
                }
            }

            _consumer.Unsubscribe();
            // Completing the writer lets readers drain remaining items and then observe completion.
            _channel.Writer.Complete();
        }
    }
}
260+

0 commit comments

Comments
 (0)