Skip to content

Commit f1b2e0b

Browse files
feat: Add network support to the Kafka container (#1316)
Co-authored-by: Andre Hofmeister <[email protected]>
1 parent 63422de commit f1b2e0b

9 files changed

+300
-5
lines changed

Directory.Packages.props

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@
3636
<PackageVersion Include="Azure.Storage.Blobs" Version="12.17.0"/>
3737
<PackageVersion Include="Azure.Storage.Queues" Version="12.15.0"/>
3838
<PackageVersion Include="ClickHouse.Client" Version="7.9.1"/>
39-
<PackageVersion Include="Confluent.Kafka" Version="2.0.2"/>
39+
<PackageVersion Include="Confluent.Kafka" Version="2.8.0"/>
40+
<PackageVersion Include="Confluent.SchemaRegistry.Serdes.Json" Version="2.8.0"/>
41+
<PackageVersion Include="Confluent.SchemaRegistry" Version="2.8.0"/>
4042
<PackageVersion Include="Consul" Version="1.6.10.9"/>
4143
<PackageVersion Include="CouchbaseNetClient" Version="3.6.4"/>
4244
<PackageVersion Include="DotPulsar" Version="3.3.2"/>

src/Testcontainers.Kafka/KafkaBuilder.cs

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,14 @@ public sealed class KafkaBuilder : ContainerBuilder<KafkaBuilder, KafkaContainer
1010

1111
public const ushort BrokerPort = 9093;
1212

13+
public const ushort ControllerPort = 9094;
14+
1315
public const ushort ZookeeperPort = 2181;
1416

1517
public const string StartupScriptFilePath = "/testcontainers.sh";
1618

19+
private const string ProtocolPrefix = "TC";
20+
1721
/// <summary>
1822
/// Initializes a new instance of the <see cref="KafkaBuilder" /> class.
1923
/// </summary>
@@ -43,6 +47,49 @@ public override KafkaContainer Build()
4347
return new KafkaContainer(DockerResourceConfiguration);
4448
}
4549

50+
/// <summary>
51+
/// Adds a listener to the Kafka configuration in the format <c>host:port</c>.
52+
/// </summary>
53+
/// <remarks>
54+
/// The host will be included as a network alias, allowing additional connections
55+
/// to the Kafka broker within the same container network.
56+
///
57+
/// This method is useful for registering custom listeners beyond the default ones,
58+
/// enabling specific connection points for Kafka brokers.
59+
///
60+
/// Default listeners include:
61+
/// - <c>PLAINTEXT://0.0.0.0:9092</c>
62+
/// - <c>BROKER://0.0.0.0:9093</c>
63+
/// - <c>CONTROLLER://0.0.0.0:9094</c>
64+
/// </remarks>
65+
/// <param name="kafka">The MsSql database.</param>
66+
/// <returns>A configured instance of <see cref="KafkaBuilder" />.</returns>
67+
public KafkaBuilder WithListener(string kafka)
68+
{
69+
var index = DockerResourceConfiguration.Listeners?.Count() ?? 0;
70+
var protocol = $"{ProtocolPrefix}-{index}";
71+
var listener = $"{protocol}://{kafka}";
72+
var listenerSecurityProtocolMap = $"{protocol}:PLAINTEXT";
73+
74+
var listeners = new[] { listener };
75+
var listenersSecurityProtocolMap = new[] { listenerSecurityProtocolMap };
76+
77+
var host = kafka.Split(':')[0];
78+
79+
var updatedListeners = DockerResourceConfiguration.Environments["KAFKA_LISTENERS"]
80+
.Split(',')
81+
.Concat(listeners);
82+
83+
var updatedListenersSecurityProtocolMap = DockerResourceConfiguration.Environments["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"]
84+
.Split(',')
85+
.Concat(listenersSecurityProtocolMap);
86+
87+
return Merge(DockerResourceConfiguration, new KafkaConfiguration(listeners, listeners))
88+
.WithEnvironment("KAFKA_LISTENERS", string.Join(",", updatedListeners))
89+
.WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", string.Join(",", updatedListenersSecurityProtocolMap))
90+
.WithNetworkAliases(host);
91+
}
92+
4693
/// <inheritdoc />
4794
protected override KafkaBuilder Init()
4895
{
@@ -51,10 +98,12 @@ protected override KafkaBuilder Init()
5198
.WithPortBinding(KafkaPort, true)
5299
.WithPortBinding(BrokerPort, true)
53100
.WithPortBinding(ZookeeperPort, true)
54-
.WithEnvironment("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + KafkaPort + ",BROKER://0.0.0.0:" + BrokerPort)
55-
.WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
101+
.WithEnvironment("KAFKA_LISTENERS", $"PLAINTEXT://0.0.0.0:{KafkaPort},BROKER://0.0.0.0:{BrokerPort},CONTROLLER://0.0.0.0:{ControllerPort}")
102+
.WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
56103
.WithEnvironment("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER")
57104
.WithEnvironment("KAFKA_BROKER_ID", "1")
105+
.WithEnvironment("KAFKA_NODE_ID", "1")
106+
.WithEnvironment("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@localhost:" + ControllerPort)
58107
.WithEnvironment("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
59108
.WithEnvironment("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1")
60109
.WithEnvironment("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
@@ -68,6 +117,7 @@ protected override KafkaBuilder Init()
68117
.WithStartupCallback((container, ct) =>
69118
{
70119
const char lf = '\n';
120+
var additionalAdvertisedListeners = string.Join(",", container.AdvertisedListeners ?? Array.Empty<string>());
71121
var startupScript = new StringBuilder();
72122
startupScript.Append("#!/bin/bash");
73123
startupScript.Append(lf);
@@ -79,7 +129,7 @@ protected override KafkaBuilder Init()
79129
startupScript.Append(lf);
80130
startupScript.Append("zookeeper-server-start zookeeper.properties &");
81131
startupScript.Append(lf);
82-
startupScript.Append("export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://" + container.Hostname + ":" + container.GetMappedPublicPort(KafkaPort) + ",BROKER://" + container.IpAddress + ":" + BrokerPort);
132+
startupScript.Append("export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://" + container.Hostname + ":" + container.GetMappedPublicPort(KafkaPort) + ",BROKER://" + container.IpAddress + ":" + BrokerPort + "," + additionalAdvertisedListeners);
83133
startupScript.Append(lf);
84134
startupScript.Append("echo '' > /etc/confluent/docker/ensure");
85135
startupScript.Append(lf);

src/Testcontainers.Kafka/KafkaConfiguration.cs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,14 @@ public sealed class KafkaConfiguration : ContainerConfiguration
77
/// <summary>
88
/// Initializes a new instance of the <see cref="KafkaConfiguration" /> class.
99
/// </summary>
10-
public KafkaConfiguration()
10+
/// <param name="listeners">A list of listeners.</param>
11+
/// <param name="advertisedListeners">A list of advertised listeners.</param>
12+
public KafkaConfiguration(
13+
IEnumerable<string> listeners = null,
14+
IEnumerable<string> advertisedListeners = null)
1115
{
16+
Listeners = listeners;
17+
AdvertisedListeners = advertisedListeners;
1218
}
1319

1420
/// <summary>
@@ -49,5 +55,17 @@ public KafkaConfiguration(KafkaConfiguration resourceConfiguration)
4955
public KafkaConfiguration(KafkaConfiguration oldValue, KafkaConfiguration newValue)
5056
: base(oldValue, newValue)
5157
{
58+
Listeners = BuildConfiguration.Combine(oldValue.Listeners, newValue.Listeners);
59+
AdvertisedListeners = BuildConfiguration.Combine(oldValue.AdvertisedListeners, newValue.AdvertisedListeners);
5260
}
61+
62+
/// <summary>
63+
/// Gets a list of listeners.
64+
/// </summary>
65+
public IEnumerable<string> Listeners { get; }
66+
67+
/// <summary>
68+
/// Gets a list of advertised listeners.
69+
/// </summary>
70+
public IEnumerable<string> AdvertisedListeners { get; }
5371
}

src/Testcontainers.Kafka/KafkaContainer.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,16 @@ namespace Testcontainers.Kafka;
44
[PublicAPI]
55
public sealed class KafkaContainer : DockerContainer
66
{
7+
private readonly KafkaConfiguration _configuration;
8+
79
/// <summary>
810
/// Initializes a new instance of the <see cref="KafkaContainer" /> class.
911
/// </summary>
1012
/// <param name="configuration">The container configuration.</param>
1113
public KafkaContainer(KafkaConfiguration configuration)
1214
: base(configuration)
1315
{
16+
_configuration = configuration;
1417
}
1518

1619
/// <summary>
@@ -21,4 +24,15 @@ public string GetBootstrapAddress()
2124
{
2225
return new UriBuilder("PLAINTEXT", Hostname, GetMappedPublicPort(KafkaBuilder.KafkaPort)).ToString();
2326
}
27+
28+
/// <summary>
29+
/// Gets a list of advertised listeners.
30+
/// </summary>
31+
public IEnumerable<string> AdvertisedListeners
32+
{
33+
get
34+
{
35+
return _configuration.AdvertisedListeners;
36+
}
37+
}
2438
}

src/Testcontainers.Kafka/Usings.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
global using System;
2+
global using System.Collections.Generic;
3+
global using System.Linq;
24
global using System.Text;
35
global using Docker.DotNet.Models;
46
global using DotNet.Testcontainers.Builders;
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
namespace Testcontainers.Kafka;
2+
3+
public sealed class KafkaContainerNetworkTest : IAsyncLifetime
4+
{
5+
private const string Message = "Message produced by kafkacat";
6+
7+
private const string Listener = "kafka:19092";
8+
9+
private const string DataFilePath = "/data/msgs.txt";
10+
11+
private readonly INetwork _network;
12+
13+
private readonly IContainer _kafkaContainer;
14+
15+
private readonly IContainer _kCatContainer;
16+
17+
public KafkaContainerNetworkTest()
18+
{
19+
_network = new NetworkBuilder()
20+
.Build();
21+
22+
_kafkaContainer = new KafkaBuilder()
23+
.WithImage("confluentinc/cp-kafka:6.1.9")
24+
.WithNetwork(_network)
25+
.WithListener(Listener)
26+
.Build();
27+
28+
_kCatContainer = new ContainerBuilder()
29+
.WithImage("confluentinc/cp-kafkacat:6.1.9")
30+
.WithNetwork(_network)
31+
.WithEntrypoint(CommonCommands.SleepInfinity)
32+
.WithResourceMapping(Encoding.Default.GetBytes(Message), DataFilePath)
33+
.Build();
34+
}
35+
36+
public async Task InitializeAsync()
37+
{
38+
await _kafkaContainer.StartAsync()
39+
.ConfigureAwait(false);
40+
41+
await _kCatContainer.StartAsync()
42+
.ConfigureAwait(false);
43+
}
44+
45+
public async Task DisposeAsync()
46+
{
47+
await _kafkaContainer.StartAsync()
48+
.ConfigureAwait(false);
49+
50+
await _kCatContainer.StartAsync()
51+
.ConfigureAwait(false);
52+
53+
await _network.DisposeAsync()
54+
.ConfigureAwait(false);
55+
}
56+
57+
[Fact]
58+
public async Task ConsumesProducedKafkaMessage()
59+
{
60+
_ = await _kCatContainer.ExecAsync(new[] { "kafkacat", "-b", Listener, "-t", "msgs", "-P", "-l", DataFilePath })
61+
.ConfigureAwait(true);
62+
63+
var execResult = await _kCatContainer.ExecAsync(new[] { "kafkacat", "-b", Listener, "-C", "-t", "msgs", "-c", "1" })
64+
.ConfigureAwait(true);
65+
66+
Assert.Equal(Message, execResult.Stdout.Trim());
67+
}
68+
}
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
namespace Testcontainers.Kafka;
2+
3+
public sealed class KafkaContainerRegistryTest : IAsyncLifetime
4+
{
5+
private const string Schema = @"
6+
{
7+
""$schema"": ""http://json-schema.org/draft-04/schema#"",
8+
""title"": ""User"",
9+
""type"": ""object"",
10+
""additionalProperties"": false,
11+
""properties"": {
12+
""FirstName"": {
13+
""type"": [""null"", ""string""]
14+
},
15+
""LastName"": {
16+
""type"": [""null"", ""string""]
17+
}
18+
}
19+
}";
20+
21+
private const ushort RestPort = 8085;
22+
23+
private const string SchemaRegistryNetworkAlias = "schema-registry";
24+
25+
private const string Listener = "kafka:19092";
26+
27+
private readonly INetwork _network;
28+
29+
private readonly KafkaContainer _kafkaContainer;
30+
31+
private readonly IContainer _schemaRegistryContainer;
32+
33+
public KafkaContainerRegistryTest()
34+
{
35+
_network = new NetworkBuilder()
36+
.Build();
37+
38+
_kafkaContainer = new KafkaBuilder()
39+
.WithImage("confluentinc/cp-kafka:6.1.9")
40+
.WithNetwork(_network)
41+
.WithListener(Listener)
42+
.Build();
43+
44+
_schemaRegistryContainer = new ContainerBuilder()
45+
.WithImage("confluentinc/cp-schema-registry:6.1.9")
46+
.WithPortBinding(RestPort, true)
47+
.WithNetwork(_network)
48+
.WithNetworkAliases(SchemaRegistryNetworkAlias)
49+
.WithEnvironment("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:" + RestPort)
50+
.WithEnvironment("SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL", "PLAINTEXT")
51+
.WithEnvironment("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://" + Listener)
52+
.WithEnvironment("SCHEMA_REGISTRY_HOST_NAME", SchemaRegistryNetworkAlias)
53+
.WithWaitStrategy(Wait.ForUnixContainer().UntilHttpRequestIsSucceeded(request =>
54+
request.ForPort(RestPort).ForPath("/subjects")))
55+
.Build();
56+
}
57+
58+
public async Task InitializeAsync()
59+
{
60+
await _kafkaContainer.StartAsync()
61+
.ConfigureAwait(false);
62+
63+
await _schemaRegistryContainer.StartAsync()
64+
.ConfigureAwait(false);
65+
}
66+
67+
public async Task DisposeAsync()
68+
{
69+
await _kafkaContainer.StartAsync()
70+
.ConfigureAwait(false);
71+
72+
await _schemaRegistryContainer.StartAsync()
73+
.ConfigureAwait(false);
74+
75+
await _network.DisposeAsync()
76+
.ConfigureAwait(false);
77+
}
78+
79+
[Fact]
80+
public async Task ConsumerReturnsProducerMessage()
81+
{
82+
// Given
83+
const string topic = "user";
84+
85+
var subject = SubjectNameStrategy.Topic.ConstructValueSubjectName(topic);
86+
87+
var bootstrapServer = _kafkaContainer.GetBootstrapAddress();
88+
89+
var producerConfig = new ProducerConfig();
90+
producerConfig.BootstrapServers = bootstrapServer;
91+
92+
var consumerConfig = new ConsumerConfig();
93+
consumerConfig.BootstrapServers = bootstrapServer;
94+
consumerConfig.GroupId = "sample-consumer";
95+
consumerConfig.AutoOffsetReset = AutoOffsetReset.Earliest;
96+
97+
var message = new Message<string, User>();
98+
message.Value = new User("John", "Doe");
99+
100+
var schemaRegistryConfig = new SchemaRegistryConfig();
101+
schemaRegistryConfig.Url = new UriBuilder(Uri.UriSchemeHttp, _schemaRegistryContainer.Hostname, _schemaRegistryContainer.GetMappedPublicPort(RestPort)).ToString();
102+
103+
// When
104+
using var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
105+
_ = await schemaRegistry.RegisterSchemaAsync(subject, new Schema(Schema, SchemaType.Json))
106+
.ConfigureAwait(true);
107+
108+
using var producer = new ProducerBuilder<string, User>(producerConfig)
109+
.SetValueSerializer(new JsonSerializer<User>(schemaRegistry))
110+
.Build();
111+
112+
_ = await producer.ProduceAsync(topic, message)
113+
.ConfigureAwait(true);
114+
115+
using var consumer = new ConsumerBuilder<string, User>(consumerConfig)
116+
.SetValueDeserializer(new JsonDeserializer<User>().AsSyncOverAsync())
117+
.Build();
118+
119+
consumer.Subscribe(topic);
120+
121+
var result = consumer.Consume(TimeSpan.FromSeconds(15));
122+
123+
// Then
124+
Assert.NotNull(result);
125+
Assert.Equal(message.Value, result.Message.Value);
126+
}
127+
128+
private record User(string FirstName, string LastName);
129+
}

tests/Testcontainers.Kafka.Tests/Testcontainers.Kafka.Tests.csproj

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
<PackageReference Include="xunit.runner.visualstudio"/>
1111
<PackageReference Include="xunit"/>
1212
<PackageReference Include="Confluent.Kafka"/>
13+
<PackageReference Include="Confluent.SchemaRegistry.Serdes.Json"/>
14+
<PackageReference Include="Confluent.SchemaRegistry"/>
1315
</ItemGroup>
1416
<ItemGroup>
1517
<ProjectReference Include="../../src/Testcontainers.Kafka/Testcontainers.Kafka.csproj"/>

0 commit comments

Comments
 (0)