diff --git a/docs/modules/kafka.md b/docs/modules/kafka.md index cd2a660c8e0..6eee68d4dbe 100644 --- a/docs/modules/kafka.md +++ b/docs/modules/kafka.md @@ -54,6 +54,18 @@ Create a `KafkaContainer` to use it in your tests: [Creating a KafkaContainer](../../modules/kafka/src/test/java/org/testcontainers/kafka/KafkaContainerTest.java) inside_block:constructorWithVersion +### withNetworkAliases + +The first alias host name defined using `withNetworkAliases` configures port 9092 on the network set by `withNetwork`. Example for `kafka:9092`: + + +[Creating a KafkaContainer](../../modules/kafka/src/test/java/org/testcontainers/kafka/KafkaContainerTest.java) inside_block:registerAlias + + +This works with `org.testcontainers.kafka.KafkaContainer` and `org.testcontainers.kafka.ConfluentKafkaContainer` but not with deprecated `org.testcontainers.containers.KafkaContainer`. + +It only works if `withListener` is not used and `KAFKA_BROKER_ID` environment variable is not set. + ## Options ### Using external Zookeeper diff --git a/modules/kafka/src/main/java/org/testcontainers/kafka/ConfluentKafkaContainer.java b/modules/kafka/src/main/java/org/testcontainers/kafka/ConfluentKafkaContainer.java index 42942cdd3a5..74c9aa1ea4d 100644 --- a/modules/kafka/src/main/java/org/testcontainers/kafka/ConfluentKafkaContainer.java +++ b/modules/kafka/src/main/java/org/testcontainers/kafka/ConfluentKafkaContainer.java @@ -48,13 +48,22 @@ protected void configure() { @Override protected void containerIsStarting(InspectContainerResponse containerInfo) { + List networkAliases = getNetworkAliases(); + String plaintextAdvertisedListener; + if (!getEnvMap().containsKey("KAFKA_BROKER_ID") && listeners.isEmpty() && networkAliases.size() > 1) { + // 0 is the random network alias generated by GenericContainer + plaintextAdvertisedListener = "PLAINTEXT://" + networkAliases.get(1) + ":" + KafkaHelper.KAFKA_PORT; + } else { + plaintextAdvertisedListener = "PLAINTEXT://" + getBootstrapServers(); + } + String brokerAdvertisedListener = String.format( "BROKER://%s:%s", containerInfo.getConfig().getHostName(), "9093" ); List advertisedListeners = new ArrayList<>(); - advertisedListeners.add("PLAINTEXT://" + getBootstrapServers()); + advertisedListeners.add(plaintextAdvertisedListener); advertisedListeners.add(brokerAdvertisedListener); advertisedListeners.addAll(KafkaHelper.resolveAdvertisedListeners(this.advertisedListeners)); @@ -87,8 +96,15 @@ protected void containerIsStarting(InspectContainerResponse containerInfo) { *

* Default advertised listeners: *

+ *

+ * Default advertised listeners if {@code withNetworkAliases} is used, {@code withListener} is not used, + * and {@code KAFKA_BROKER_ID} environment variable is not set: + *

* @param listener a listener with format {@code host:port} * @return this {@link ConfluentKafkaContainer} instance @@ -118,8 +134,15 @@ public ConfluentKafkaContainer withListener(String listener) { *

* Default advertised listeners: *

+ *

+ * Default advertised listeners if {@code withNetworkAliases} is used, {@code withListener} is not used, + * and {@code KAFKA_BROKER_ID} environment variable is not set: + *

* @param listener a supplier that will provide a listener * @param advertisedListener a supplier that will provide a listener diff --git a/modules/kafka/src/main/java/org/testcontainers/kafka/KafkaContainer.java b/modules/kafka/src/main/java/org/testcontainers/kafka/KafkaContainer.java index e946e0a8992..2f60bb11e78 100644 --- a/modules/kafka/src/main/java/org/testcontainers/kafka/KafkaContainer.java +++ b/modules/kafka/src/main/java/org/testcontainers/kafka/KafkaContainer.java @@ -54,18 +54,27 @@ protected void configure() { @Override protected void containerIsStarting(InspectContainerResponse containerInfo) { + List networkAliases = getNetworkAliases(); + String plaintextAdvertisedListener; + if (!getEnvMap().containsKey("KAFKA_BROKER_ID") && listeners.isEmpty() && networkAliases.size() > 1) { + // 0 is the random network alias generated by GenericContainer + plaintextAdvertisedListener = "PLAINTEXT://" + networkAliases.get(1) + ":" + KafkaHelper.KAFKA_PORT; + } else { + plaintextAdvertisedListener = "PLAINTEXT://" + getBootstrapServers(); + } + String brokerAdvertisedListener = String.format( "BROKER://%s:%s", containerInfo.getConfig().getHostName(), "9093" ); + List advertisedListeners = new ArrayList<>(); - advertisedListeners.add("PLAINTEXT://" + getBootstrapServers()); + advertisedListeners.add(plaintextAdvertisedListener); advertisedListeners.add(brokerAdvertisedListener); advertisedListeners.addAll(KafkaHelper.resolveAdvertisedListeners(this.advertisedListeners)); String kafkaAdvertisedListeners = String.join(",", advertisedListeners); - String command = "#!/bin/bash\n"; // exporting KAFKA_ADVERTISED_LISTENERS with the container hostname command += String.format("export KAFKA_ADVERTISED_LISTENERS=%s\n", kafkaAdvertisedListeners); @@ -93,8 +102,15 @@ protected void containerIsStarting(InspectContainerResponse containerInfo) { *

* Default advertised listeners: *

    - *
  • {@code container.getConfig().getHostName():9092}
  • - *
  • {@code container.getHost():container.getMappedPort(9093)}
  • + *
  • {@code container.getConfig().getHostName():container.getMappedPort(9092)}
  • + *
  • {@code container.getHost():9093}
  • + *
+ *

+ * Default advertised listeners if {@code withNetworkAliases} is used, {@code withListener} is not used, + * and {@code KAFKA_BROKER_ID} environment variable is not set: + *

    + *
  • {@code first network alias:9092}
  • + *
  • {@code container.getHost():9093}
  • *
* @param listener a listener with format {@code host:port} * @return this {@link KafkaContainer} instance @@ -124,8 +140,15 @@ public KafkaContainer withListener(String listener) { *

* Default advertised listeners: *

    - *
  • {@code container.getConfig().getHostName():9092}
  • - *
  • {@code container.getHost():container.getMappedPort(9093)}
  • + *
  • {@code container.getConfig().getHostName():container.getMappedPort(9092)}
  • + *
  • {@code container.getHost():9093}
  • + *
+ *

+ * Default advertised listeners if {@code withNetworkAliases} is used, {@code withListener} is not used, + * and {@code KAFKA_BROKER_ID} environment variable is not set: + *

    + *
  • {@code first network alias:9092}
  • + *
  • {@code container.getHost():9093}
  • *
* @param listener a supplier that will provide a listener * @param advertisedListener a supplier that will provide a listener diff --git a/modules/kafka/src/test/java/org/testcontainers/AbstractKafka.java b/modules/kafka/src/test/java/org/testcontainers/AbstractKafka.java index 90977e61c18..c3d50e025b8 100644 --- a/modules/kafka/src/test/java/org/testcontainers/AbstractKafka.java +++ b/modules/kafka/src/test/java/org/testcontainers/AbstractKafka.java @@ -1,6 +1,7 @@ package org.testcontainers; import com.google.common.collect.ImmutableMap; +import lombok.SneakyThrows; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; @@ -15,6 +16,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.awaitility.Awaitility; +import org.testcontainers.containers.Network; import java.time.Duration; import java.util.Collection; @@ -150,4 +152,16 @@ protected static String getJaasConfig() { "user_test=\"secret\";"; return jaasConfig; } + + @SneakyThrows + protected void assertKafka(String listener, Network network) { + try (KCatContainer kcat = new KCatContainer().withNetwork(network)) { + kcat.start(); + + kcat.execInContainer("kcat", "-b", listener, "-t", "msgs", "-P", "-l", "/data/msgs.txt"); + String stdout = kcat.execInContainer("kcat", "-b", listener, "-C", "-t", "msgs", "-c", "1").getStdout(); + + assertThat(stdout).contains("Message produced by kcat"); + } + } } diff --git a/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java b/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java index 49b21108da8..7fddc3ce7bc 100644 --- a/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java +++ b/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java @@ -185,6 +185,19 @@ public void testKraftPrecedenceOverEmbeddedZookeeper() throws Exception { } } + @Test + public void testUsageWithNetworkAlias() { + try ( + Network network = Network.newNetwork(); + KafkaContainer kafka = new KafkaContainer(KAFKA_KRAFT_TEST_IMAGE) + .withNetworkAliases("mykafka") + .withNetwork(network) + ) { + kafka.start(); + assertKafka("mykafka:9092", network); + } + } + @Test public void testUsageWithListener() throws Exception { try ( diff --git a/modules/kafka/src/test/java/org/testcontainers/kafka/ConfluentKafkaContainerTest.java b/modules/kafka/src/test/java/org/testcontainers/kafka/ConfluentKafkaContainerTest.java index 725dc2d2a62..26371402e32 100644 --- a/modules/kafka/src/test/java/org/testcontainers/kafka/ConfluentKafkaContainerTest.java +++ b/modules/kafka/src/test/java/org/testcontainers/kafka/ConfluentKafkaContainerTest.java @@ -9,8 +9,6 @@ import org.testcontainers.containers.SocatContainer; import org.testcontainers.utility.MountableFile; -import static org.assertj.core.api.Assertions.assertThat; - public class ConfluentKafkaContainerTest extends AbstractKafka { @Test @@ -24,6 +22,19 @@ public void testUsage() throws Exception { } } + @Test + public void testUsageWithNetworkAlias() { + try ( + Network network = Network.newNetwork(); + ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.4.0") + .withNetworkAliases("mykafka") + .withNetwork(network) + ) { + kafka.start(); + assertKafka("mykafka:9092", network); + } + } + @Test public void testUsageWithListener() throws Exception { try ( @@ -36,14 +47,7 @@ public void testUsageWithListener() throws Exception { KCatContainer kcat = new KCatContainer().withNetwork(network) ) { kafka.start(); - kcat.start(); - - kcat.execInContainer("kcat", "-b", "kafka:19092", "-t", "msgs", "-P", "-l", "/data/msgs.txt"); - String stdout = kcat - .execInContainer("kcat", "-b", "kafka:19092", "-C", "-t", "msgs", "-c", "1") - .getStdout(); - - assertThat(stdout).contains("Message produced by kcat"); + assertKafka("kafka:19092", network); } } diff --git a/modules/kafka/src/test/java/org/testcontainers/kafka/KafkaContainerTest.java b/modules/kafka/src/test/java/org/testcontainers/kafka/KafkaContainerTest.java index e81b52574ef..f56709c5ec3 100644 --- a/modules/kafka/src/test/java/org/testcontainers/kafka/KafkaContainerTest.java +++ b/modules/kafka/src/test/java/org/testcontainers/kafka/KafkaContainerTest.java @@ -2,12 +2,9 @@ import org.junit.Test; import org.testcontainers.AbstractKafka; -import org.testcontainers.KCatContainer; import org.testcontainers.containers.Network; import org.testcontainers.containers.SocatContainer; -import static org.assertj.core.api.Assertions.assertThat; - public class KafkaContainerTest extends AbstractKafka { @Test @@ -22,7 +19,22 @@ public void testUsage() throws Exception { } @Test - public void testUsageWithListener() throws Exception { + public void testUsageWithNetworkAlias() { + try ( + // registerAlias { + Network network = Network.newNetwork(); + KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0") + .withNetworkAliases("kafka") + .withNetwork(network); + // } + ) { + kafka.start(); + assertKafka("kafka:9092", network); + } + } + + @Test + public void testUsageWithListener() { try ( Network network = Network.newNetwork(); // registerListener { @@ -30,17 +42,9 @@ public void testUsageWithListener() throws Exception { .withListener("kafka:19092") .withNetwork(network); // } - KCatContainer kcat = new KCatContainer().withNetwork(network) ) { kafka.start(); - kcat.start(); - - kcat.execInContainer("kcat", "-b", "kafka:19092", "-t", "msgs", "-P", "-l", "/data/msgs.txt"); - String stdout = kcat - .execInContainer("kcat", "-b", "kafka:19092", "-C", "-t", "msgs", "-c", "1") - .getStdout(); - - assertThat(stdout).contains("Message produced by kcat"); + assertKafka("kafka:19092", network); } }