Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/kafka add kraft support #1353

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 181 additions & 37 deletions src/Testcontainers.Kafka/KafkaBuilder.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
using Testcontainers.Kafka.Vendors;

namespace Testcontainers.Kafka;

/// <inheritdoc cref="ContainerBuilder{TBuilderEntity, TContainerEntity, TConfigurationEntity}" />
[PublicAPI]
public sealed class KafkaBuilder : ContainerBuilder<KafkaBuilder, KafkaContainer, KafkaConfiguration>
{
public const string KafkaImage = "confluentinc/cp-kafka:6.1.9";
public const string KafkaImage = "confluentinc/cp-kafka:7.5.1";

public const string KafkaNodeId = "1";

public const ushort KafkaPort = 9092;

Expand All @@ -14,10 +18,18 @@ public sealed class KafkaBuilder : ContainerBuilder<KafkaBuilder, KafkaContainer

public const ushort ZookeeperPort = 2181;

public const string ClusterId = "4L6g3nShT-eMCtK--X86sw";

public const string StartupScriptFilePath = "/testcontainers.sh";

private const string ProtocolPrefix = "TC";

private static readonly IKafkaVendor[] Vendors =
[
new ConfluentVendor(),
new ApacheKafkaVendor(),
];

/// <summary>
/// Initializes a new instance of the <see cref="KafkaBuilder" /> class.
/// </summary>
Expand All @@ -44,25 +56,58 @@ private KafkaBuilder(KafkaConfiguration resourceConfiguration)
public override KafkaContainer Build()
{
Validate();
return new KafkaContainer(DockerResourceConfiguration);

var kafkaVendor = GetKafkaVendor();

var kafkaBuilder = new KafkaBuilder(DockerResourceConfiguration);
if (!DockerResourceConfiguration.ConsensusProtocol.HasValue)
{
kafkaBuilder = kafkaBuilder
.WithConsensusProtocol(kafkaVendor.DefaultConsensusProtocol);
}

kafkaVendor.ValidateConfigurationAndThrow(kafkaBuilder.DockerResourceConfiguration);

kafkaBuilder = kafkaBuilder.DockerResourceConfiguration.ConsensusProtocol switch
{
KafkaConsensusProtocol.KRaft => kafkaBuilder.WithKRaftSupport(kafkaVendor),
KafkaConsensusProtocol.Zookeeper => kafkaBuilder.WithZookeeperSupport(kafkaVendor),
_ => throw new ArgumentOutOfRangeException(nameof(DockerResourceConfiguration.ConsensusProtocol)),
};
return new KafkaContainer(kafkaBuilder.DockerResourceConfiguration);
}

/// <summary>
/// Adds a listener to the Kafka configuration in the format <c>host:port</c>.
/// </summary>
/// <remarks>
/// <para>
/// The host will be included as a network alias, allowing additional connections
/// to the Kafka broker within the same container network.
///
/// </para>
///
/// <para>
/// This method is useful for registering custom listeners beyond the default ones,
/// enabling specific connection points for Kafka brokers.
///
/// </para>
///
/// <para>
/// Default listeners include:
/// - <c>PLAINTEXT://0.0.0.0:9092</c>
/// - <c>BROKER://0.0.0.0:9093</c>
/// - <c>CONTROLLER://0.0.0.0:9094</c>
/// </para>
///
/// <list type="bullet">
/// <item>
/// <description><c>PLAINTEXT://0.0.0.0:9092</c></description>
/// </item>
/// <item>
/// <description><c>BROKER://0.0.0.0:9093</c> (if Zookeeper is used)</description>
/// </item>
/// <item>
/// <description><c>CONTROLLER://0.0.0.0:9094</c></description>
/// </item>
/// </list>
/// </remarks>
/// <param name="kafka">The MsSql database.</param>
/// <param name="kafka">Kafka connection string</param>
/// <returns>A configured instance of <see cref="KafkaBuilder" />.</returns>
public KafkaBuilder WithListener(string kafka)
{
Expand Down Expand Up @@ -90,52 +135,151 @@ public KafkaBuilder WithListener(string kafka)
.WithNetworkAliases(host);
}

/// <summary>
/// Configures the Kafka to use the KRaft consensus protocol instead of Zookeeper.
/// </summary>
/// <returns>An updated instance of the <see cref="KafkaBuilder"/> configured with KRaft.</returns>
public KafkaBuilder WithKRaft()
{
return Merge(DockerResourceConfiguration, new KafkaConfiguration(
consensusProtocol: KafkaConsensusProtocol.KRaft));
}

/// <summary>
/// Configures the Kafka to use Zookeeper as the consensus protocol.<br/>
/// If no external Zookeeper connection string is provided, a default local Zookeeper instance will be set up within the container
/// if supported.
/// </summary>
/// <param name="connectionString">The optional external Zookeeper connection string. If <c>null</c>, a default local setup will be used.</param>
/// <returns>An updated instance of the <see cref="KafkaBuilder"/> configured with Zookeeper.</returns>
public KafkaBuilder WithZookeeper(string? connectionString = null)
{
return Merge(DockerResourceConfiguration, new KafkaConfiguration(
consensusProtocol: KafkaConsensusProtocol.Zookeeper,
externalZookeeperConnectionString: connectionString));
}

private KafkaBuilder WithConsensusProtocol(KafkaConsensusProtocol consensusProtocol)
{
return Merge(DockerResourceConfiguration, new KafkaConfiguration(
consensusProtocol: consensusProtocol));
}

/// <summary>
/// Configures the Kafka container with the specified cluster ID.
/// </summary>
/// <param name="clusterId">The unique identifier for the Kafka cluster.</param>
/// <returns>The current <see cref="KafkaBuilder"/> instance configured with the specified cluster ID.</returns>
public KafkaBuilder WithClusterId(string clusterId)
{
return Merge(DockerResourceConfiguration, new KafkaConfiguration())
.WithEnvironment("CLUSTER_ID", clusterId);
}

/// <summary>
/// Explicitly sets the Kafka Docker image vendor. Use this method only when the image vendor cannot be automatically detected
/// from the provided image name. This allows the container to be set up properly since different vendors have different base configurations.
/// </summary>
/// <remarks>
/// This method is typically required when using a custom Kafka image that is built on top of an existing base image,
/// such as a Confluent or Apache Kafka base image. By specifying the vendor, the appropriate configurations
/// for the Kafka container can be applied.
/// <para>
/// Automatic detection of the vendor is based on the image name. However, if the image name does not clearly
/// indicate the vendor, this method allows you to manually specify the correct one.
/// </para>
/// </remarks>
/// <param name="imageVendor">The Kafka image vendor to use for configuration.</param>
/// <returns>
/// A configured instance of the <see cref="KafkaBuilder"/> class with the supplied image vendor settings.
/// </returns>
public KafkaBuilder WithImageVendor(KafkaImageVendor imageVendor)
{
return Merge(DockerResourceConfiguration, new KafkaConfiguration(imageVendor: imageVendor));
}

private IKafkaVendor GetKafkaVendor()
{
if (!DockerResourceConfiguration.ImageVendor.HasValue)
{
var detectedVendor = Vendors.FirstOrDefault(v => v.ImageBelongsToVendor(DockerResourceConfiguration.Image));
if (detectedVendor is not null)
{
return detectedVendor;
}

// Using Confluent one for backward compatibility
return Vendors.Single(x => x.ImageVendor == KafkaImageVendor.Confluent);
}

return Vendors.Single(x => x.ImageVendor == DockerResourceConfiguration.ImageVendor);
}

private KafkaBuilder WithKRaftSupport(IKafkaVendor kafkaVendor)
{
return Merge(DockerResourceConfiguration, new KafkaConfiguration())
.WithEnvironment("KAFKA_PROCESS_ROLES", "broker,controller")
.WithEnvironment("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER")
.WithWaitStrategy(Wait.ForUnixContainer().UntilMessageIsLogged(".*Transitioning from RECOVERY to RUNNING.*"))
.WithStartupCallback((container, ct) =>
{
var startupScript = kafkaVendor.GetStartupScript(new StartupScriptContext
{
Container = container,
Configuration = DockerResourceConfiguration,
});
return container.CopyAsync(Encoding.Default.GetBytes(startupScript), StartupScriptFilePath, Unix.FileMode755, ct);
});
}

private KafkaBuilder WithZookeeperSupport(IKafkaVendor kafkaVendor)
{
var kafkaBuilder = Merge(DockerResourceConfiguration, new KafkaConfiguration())
.WithWaitStrategy(Wait.ForUnixContainer().UntilMessageIsLogged(@"\[KafkaServer id=\d+\] started"))
.WithStartupCallback((container, ct) =>
{
var startupScript = kafkaVendor.GetStartupScript(new StartupScriptContext
{
Container = container,
Configuration = DockerResourceConfiguration,
});
return container.CopyAsync(Encoding.Default.GetBytes(startupScript), StartupScriptFilePath, Unix.FileMode755, ct);
});

if (DockerResourceConfiguration.ExternalZookeeperConnectionString is null)
{
return kafkaBuilder
.WithPortBinding(ZookeeperPort, true)
.WithEnvironment("KAFKA_ZOOKEEPER_CONNECT", $"localhost:{ZookeeperPort}");
}

// External Zookeeper instance is provided. There is no need to expose Zookeeper port ourselves.
return kafkaBuilder
.WithEnvironment("KAFKA_ZOOKEEPER_CONNECT", DockerResourceConfiguration.ExternalZookeeperConnectionString);
}

/// <inheritdoc />
protected override KafkaBuilder Init()
{
return base.Init()
.WithImage(KafkaImage)
.WithPortBinding(KafkaPort, true)
.WithPortBinding(BrokerPort, true)
.WithPortBinding(ZookeeperPort, true)
.WithEnvironment("KAFKA_LISTENERS", $"PLAINTEXT://0.0.0.0:{KafkaPort},BROKER://0.0.0.0:{BrokerPort},CONTROLLER://0.0.0.0:{ControllerPort}")
.WithEnvironment("KAFKA_LISTENERS", $"PLAINTEXT://:{KafkaPort},BROKER://:{BrokerPort},CONTROLLER://:{ControllerPort}")
.WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
.WithEnvironment("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER")
.WithEnvironment("KAFKA_BROKER_ID", "1")
.WithEnvironment("KAFKA_NODE_ID", "1")
.WithEnvironment("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@localhost:" + ControllerPort)
.WithEnvironment("KAFKA_NODE_ID", KafkaNodeId)
.WithEnvironment("KAFKA_CONTROLLER_QUORUM_VOTERS", $"{KafkaNodeId}@localhost:{ControllerPort}")
.WithEnvironment("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
.WithEnvironment("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1")
.WithEnvironment("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
.WithEnvironment("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
.WithEnvironment("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", long.MaxValue.ToString())
.WithEnvironment("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0")
.WithEnvironment("KAFKA_ZOOKEEPER_CONNECT", "localhost:" + ZookeeperPort)
.WithEnvironment("CLUSTER_ID", ClusterId)
.WithEntrypoint("/bin/sh", "-c")
.WithCommand("while [ ! -f " + StartupScriptFilePath + " ]; do sleep 0.1; done; " + StartupScriptFilePath)
.WithWaitStrategy(Wait.ForUnixContainer().UntilMessageIsLogged("\\[KafkaServer id=\\d+\\] started"))
.WithStartupCallback((container, ct) =>
{
const char lf = '\n';
var additionalAdvertisedListeners = string.Join(",", container.AdvertisedListeners ?? Array.Empty<string>());
var startupScript = new StringBuilder();
startupScript.Append("#!/bin/bash");
startupScript.Append(lf);
startupScript.Append("echo 'clientPort=" + ZookeeperPort + "' > zookeeper.properties");
startupScript.Append(lf);
startupScript.Append("echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties");
startupScript.Append(lf);
startupScript.Append("echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties");
startupScript.Append(lf);
startupScript.Append("zookeeper-server-start zookeeper.properties &");
startupScript.Append(lf);
startupScript.Append("export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://" + container.Hostname + ":" + container.GetMappedPublicPort(KafkaPort) + ",BROKER://" + container.IpAddress + ":" + BrokerPort + "," + additionalAdvertisedListeners);
startupScript.Append(lf);
startupScript.Append("echo '' > /etc/confluent/docker/ensure");
startupScript.Append(lf);
startupScript.Append("exec /etc/confluent/docker/run");
return container.CopyAsync(Encoding.Default.GetBytes(startupScript.ToString()), StartupScriptFilePath, Unix.FileMode755, ct);
});
.WithCommand($"while [ ! -f {StartupScriptFilePath} ]; do sleep 0.1; done; {StartupScriptFilePath}");
}

/// <inheritdoc />
Expand Down
35 changes: 31 additions & 4 deletions src/Testcontainers.Kafka/KafkaConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,21 @@ public sealed class KafkaConfiguration : ContainerConfiguration
/// </summary>
/// <param name="listeners">A list of listeners.</param>
/// <param name="advertisedListeners">A list of advertised listeners.</param>
/// <param name="imageVendor">Kafka image vendor.</param>
/// <param name="consensusProtocol">A consensus protocol to use.</param>
/// <param name="externalZookeeperConnectionString">A connection string to an external Zookeeper.</param>
public KafkaConfiguration(
IEnumerable<string> listeners = null,
IEnumerable<string> advertisedListeners = null)
IEnumerable<string>? listeners = null,
IEnumerable<string>? advertisedListeners = null,
KafkaImageVendor? imageVendor = null,
KafkaConsensusProtocol? consensusProtocol = null,
string? externalZookeeperConnectionString = null)
{
Listeners = listeners;
AdvertisedListeners = advertisedListeners;
ImageVendor = imageVendor;
ConsensusProtocol = consensusProtocol;
ExternalZookeeperConnectionString = externalZookeeperConnectionString;
}

/// <summary>
Expand Down Expand Up @@ -57,15 +66,33 @@ public KafkaConfiguration(KafkaConfiguration oldValue, KafkaConfiguration newVal
{
Listeners = BuildConfiguration.Combine(oldValue.Listeners, newValue.Listeners);
AdvertisedListeners = BuildConfiguration.Combine(oldValue.AdvertisedListeners, newValue.AdvertisedListeners);
ImageVendor = BuildConfiguration.Combine(oldValue.ImageVendor, newValue.ImageVendor);
ConsensusProtocol = BuildConfiguration.Combine(oldValue.ConsensusProtocol, newValue.ConsensusProtocol);
ExternalZookeeperConnectionString = BuildConfiguration.Combine(oldValue.ExternalZookeeperConnectionString, newValue.ExternalZookeeperConnectionString);
}

/// <summary>
/// Gets a list of listeners.
/// </summary>
public IEnumerable<string> Listeners { get; }
public IEnumerable<string>? Listeners { get; }

/// <summary>
/// Gets a list of advertised listeners.
/// </summary>
public IEnumerable<string> AdvertisedListeners { get; }
public IEnumerable<string>? AdvertisedListeners { get; }

/// <summary>
/// Gets the Kafka image vendor.
/// </summary>
public KafkaImageVendor? ImageVendor { get; }

/// <summary>
/// Gets consensus protocol to use in Kafka.
/// </summary>
public KafkaConsensusProtocol? ConsensusProtocol { get; }

/// <summary>
/// Gets the external Zookeeper connection string.
/// </summary>
public string? ExternalZookeeperConnectionString { get; }
}
7 changes: 7 additions & 0 deletions src/Testcontainers.Kafka/KafkaConsensusProtocol.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Testcontainers.Kafka;

public enum KafkaConsensusProtocol
{
Zookeeper,
KRaft,
}
8 changes: 1 addition & 7 deletions src/Testcontainers.Kafka/KafkaContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,5 @@ public string GetBootstrapAddress()
/// <summary>
/// Gets a list of advertised listeners.
/// </summary>
public IEnumerable<string> AdvertisedListeners
{
get
{
return _configuration.AdvertisedListeners;
}
}
public IEnumerable<string>? AdvertisedListeners => _configuration.AdvertisedListeners;
}
8 changes: 8 additions & 0 deletions src/Testcontainers.Kafka/KafkaImageVendor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Testcontainers.Kafka;

[PublicAPI]
public enum KafkaImageVendor
{
Apache,
Confluent,
}
1 change: 1 addition & 0 deletions src/Testcontainers.Kafka/Testcontainers.Kafka.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
<PropertyGroup>
<TargetFrameworks>net8.0;net9.0;netstandard2.0;netstandard2.1</TargetFrameworks>
<LangVersion>latest</LangVersion>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="JetBrains.Annotations" VersionOverride="2023.3.0" PrivateAssets="All"/>
Expand Down
Loading