Skip to content

Commit 1d6ef19

Browse files
committed
Refactored
1 parent b8c64b6 commit 1d6ef19

File tree

3 files changed

+14
-44
lines changed

3 files changed

+14
-44
lines changed

src/integrationTest/scala/com/segence/kafka/connect/kafka/EmbeddedKafkaSpecSupport.scala

-33
This file was deleted.

src/integrationTest/scala/com/segence/kafka/connect/kafka/KafkaConnectorSpec.scala

+3-1
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,16 @@ import io.confluent.kafka.serializers.{AbstractKafkaSchemaSerDeConfig, KafkaAvro
66
import io.github.embeddedkafka.schemaregistry.connect.EmbeddedKafkaConnect._
77
import io.github.embeddedkafka.schemaregistry.EmbeddedKafkaConfig
88
import org.apache.kafka.common.serialization.{Deserializer, Serializer}
9+
import org.scalatest.wordspec.AnyWordSpecLike
10+
import org.scalatest.matchers.should.Matchers._
911

1012
import java.nio.file.Files
1113
import scala.jdk.CollectionConverters._
1214
import scala.util.{Failure, Success}
1315
import scala.concurrent.duration._
1416
import scala.language.postfixOps
1517

16-
class KafkaConnectorSpec extends EmbeddedKafkaSpecSupport {
18+
class KafkaConnectorSpec extends AnyWordSpecLike {
1719

1820
implicit val kafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(
1921
schemaRegistryPort = 6002

src/main/java/com/segence/kafka/connect/kafka/KafkaSinkTask.java

+11-10
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,13 @@ public class KafkaSinkTask extends SinkTask {
3535
private Converter keyConverter;
3636
private Converter valueConverter;
3737

38+
private Converter instantiateConverter(Map<String, String> configuration, ConnectorConfigurationEntry configKeyName) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
39+
final var clazz = getClass().getClassLoader().loadClass(
40+
configuration.get(configKeyName.getConfigKeyName())
41+
);
42+
return (Converter) clazz.getDeclaredConstructor().newInstance();
43+
}
44+
3845
/**
3946
* Returns the Kafka topic set
4047
*
@@ -91,24 +98,18 @@ public void start(Map<String, String> configuration) {
9198
}
9299

93100
try {
94-
final var clazz = getClass().getClassLoader().loadClass(
95-
configuration.get(ConnectorConfigurationEntry.KEY_CONVERTER_CLASS.getConfigKeyName())
96-
);
97-
keyConverter = (Converter) clazz.getDeclaredConstructor().newInstance();
101+
keyConverter = instantiateConverter(configuration, ConnectorConfigurationEntry.KEY_CONVERTER_CLASS);
98102
keyConverter.configure(ConnectorConfiguration.getKeyConverterProperties(configuration), true);
99-
LOGGER.debug("Instantiated Key Converter class: {}", clazz);
103+
LOGGER.debug("Instantiated Key Converter class: {}", keyConverter.getClass().getCanonicalName());
100104
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
101105
| InstantiationException | InvocationTargetException e) {
102106
throw new IllegalArgumentException(e);
103107
}
104108

105109
try {
106-
final var clazz = getClass().getClassLoader().loadClass(
107-
configuration.get(ConnectorConfigurationEntry.VALUE_CONVERTER_CLASS.getConfigKeyName())
108-
);
109-
valueConverter = (Converter) clazz.getDeclaredConstructor().newInstance();
110+
valueConverter = instantiateConverter(configuration, ConnectorConfigurationEntry.VALUE_CONVERTER_CLASS);
110111
valueConverter.configure(ConnectorConfiguration.getValueConverterProperties(configuration), false);
111-
LOGGER.debug("Instantiated Value Converter class: {}", clazz);
112+
LOGGER.debug("Instantiated Value Converter class: {}", valueConverter.getClass().getCanonicalName());
112113
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
113114
| InstantiationException | InvocationTargetException e) {
114115
throw new IllegalArgumentException(e);

0 commit comments

Comments
 (0)