Skip to content

Commit 37e3d6d

Browse files
Merge pull request #41 from Aiven-Open/update-testkit
added overrides to kafak startup
2 parents 4fa9dc0 + 6c1bff9 commit 37e3d6d

3 files changed

Lines changed: 19 additions & 16 deletions

File tree

kafka-testkit/src/main/java/io/aiven/commons/kafka/testkit/KafkaConnectRunner.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -148,10 +148,10 @@ public String getGroupId() {
148148
* @throws IOException
149149
* if listener ports can not be found.
150150
*/
151-
public void startConnectCluster(final String clusterName, final Class<? extends Connector> connectorClass)
152-
throws IOException {
151+
public void startConnectCluster(final String clusterName, final Class<? extends Connector> connectorClass,
152+
Map<String, String> configOverrides) throws IOException {
153153
final List<Integer> ports = findListenerPorts();
154-
startConnectCluster(clusterName, ports.get(0), ports.get(1), connectorClass);
154+
startConnectCluster(clusterName, ports.get(0), ports.get(1), connectorClass, configOverrides);
155155
}
156156

157157
/**
@@ -167,7 +167,7 @@ public void startConnectCluster(final String clusterName, final Class<? extends
167167
* the class for the connector.
168168
*/
169169
public void startConnectCluster(final String clusterName, final int localPort, final int containerPort,
170-
final Class<? extends Connector> connectorClass) {
170+
final Class<? extends Connector> connectorClass, Map<String, String> configOverrides) {
171171
this.clusterName = clusterName;
172172
this.containerListenerPort = containerPort;
173173
final Properties brokerProperties = new Properties();
@@ -178,7 +178,7 @@ public void startConnectCluster(final String clusterName, final int localPort, f
178178
brokerProperties.put("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,TESTCONTAINERS:PLAINTEXT");
179179

180180
connectCluster = new EmbeddedConnectCluster.Builder().name(clusterName).brokerProps(brokerProperties)
181-
.workerProps(getWorkerProperties(connectorClass)).numWorkers(1).build();
181+
.workerProps(getWorkerProperties(connectorClass, configOverrides)).numWorkers(1).build();
182182
connectCluster.start();
183183
LOGGER.info("connectCluster {} started", clusterName);
184184
}
@@ -286,14 +286,15 @@ public void restartConnector(final String connectorName) {
286286
* the connector class to start, may be {@code null}.
287287
* @return the default set of worker properties.
288288
*/
289-
public Map<String, String> getWorkerProperties(final Class<? extends Connector> connectorClass) {
289+
public Map<String, String> getWorkerProperties(final Class<? extends Connector> connectorClass,
290+
final Map<String, String> configOverrides) {
290291
Map<String, String> workerProperties = new HashMap<>();
291292
workerProperties.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ByteArrayConverter.class.getName());
292293
workerProperties.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ByteArrayConverter.class.getCanonicalName());
293294
workerProperties.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG,
294295
Long.toString(offsetFlushInterval.toMillis()));
295296
workerProperties.put("plugin.discovery", "HYBRID_WARN");
296-
297+
workerProperties.putAll(configOverrides);
297298
if (connectorClass != null) {
298299
workerProperties.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
299300
}

kafka-testkit/src/main/java/io/aiven/commons/kafka/testkit/KafkaIntegrationTestBase.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.time.Duration;
3131
import java.util.Collection;
3232
import java.util.Locale;
33+
import java.util.Map;
3334
import java.util.UUID;
3435
import java.util.concurrent.ExecutionException;
3536
import java.util.function.Supplier;
@@ -181,9 +182,9 @@ void captureTestInfo(final TestInfo testInfo) {
181182
* @throws InterruptedException
182183
* on interrupted thread.
183184
*/
184-
final protected KafkaManager setupKafka(final Class<? extends Connector> connectorClass)
185-
throws IOException, ExecutionException, InterruptedException {
186-
return setupKafka(false, connectorClass);
185+
final protected KafkaManager setupKafka(final Class<? extends Connector> connectorClass,
186+
Map<String, String> configOverrides) throws IOException, ExecutionException, InterruptedException {
187+
return setupKafka(false, connectorClass, configOverrides);
187188
}
188189

189190
/**
@@ -201,8 +202,8 @@ final protected KafkaManager setupKafka(final Class<? extends Connector> connect
201202
* @throws IOException
202203
* on IO error.
203204
*/
204-
final protected KafkaManager setupKafka(final boolean forceRestart, final Class<? extends Connector> connectorClass)
205-
throws IOException {
205+
final protected KafkaManager setupKafka(final boolean forceRestart, final Class<? extends Connector> connectorClass,
206+
Map<String, String> configOverrides) throws IOException {
206207
KafkaManager kafkaManager = KAFKA_MANAGER_THREAD_LOCAL.get();
207208
if (kafkaManager != null && forceRestart) {
208209
tearDownKafka();
@@ -212,7 +213,7 @@ final protected KafkaManager setupKafka(final boolean forceRestart, final Class<
212213
final String clusterName = new CasedString(CasedString.StringCase.CAMEL,
213214
testInfo.getTestClass().get().getSimpleName()).toCase(CasedString.StringCase.KEBAB)
214215
.toLowerCase(Locale.ROOT);
215-
kafkaManager = new KafkaManager(clusterName, getOffsetFlushInterval(), connectorClass);
216+
kafkaManager = new KafkaManager(clusterName, getOffsetFlushInterval(), connectorClass, configOverrides);
216217
KAFKA_MANAGER_THREAD_LOCAL.set(kafkaManager);
217218
}
218219
return kafkaManager;
@@ -301,11 +302,12 @@ protected final <K> void waitForStorage(final Duration timeout, final Supplier<C
301302
* @param <K>
302303
* the data type of the storage value. (must implement equals).
303304
*/
305+
@SuppressWarnings("PMD.ClassCastExceptionWithToArray")
304306
protected final <K> void waitForStorage(final Duration timeout, final Supplier<Collection<K>> storageList,
305307
final Collection<K> expectedStorage) {
306308
// wait for them to show up.
307309
await().atMost(timeout).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
308-
assertThat(storageList.get()).containsExactlyInAnyOrderElementsOf(expectedStorage);
310+
assertThat(storageList.get()).contains((K[]) expectedStorage.toArray());
309311
});
310312
}
311313
}

kafka-testkit/src/main/java/io/aiven/commons/kafka/testkit/KafkaManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,9 @@ public final class KafkaManager {
6161
* if the cluster can not be started.
6262
*/
6363
public KafkaManager(final String clusterName, final Duration offsetFlushInterval,
64-
final Class<? extends Connector> connectorClass) throws IOException {
64+
final Class<? extends Connector> connectorClass, Map<String, String> configOverrides) throws IOException {
6565
connectRunner = new KafkaConnectRunner(offsetFlushInterval);
66-
connectRunner.startConnectCluster(clusterName, connectorClass);
66+
connectRunner.startConnectCluster(clusterName, connectorClass, configOverrides);
6767

6868
final Map<String, Object> adminClientConfig = new HashMap<>();
6969
adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, connectRunner.getBootstrapServers());

0 commit comments

Comments
 (0)