Skip to content

Commit 51f4aec

Browse files
committed
Kafka Admin Client: creating once instead of many times
1 parent e7008c8 commit 51f4aec

File tree

5 files changed

+57
-52
lines changed

5 files changed

+57
-52
lines changed

engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/source/flink/TopicSelectionStrategySpec.scala

+7-6
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ class TopicSelectionStrategySpec extends KafkaAvroSpecMixin with KafkaAvroSource
2424
private lazy val confluentClient = schemaRegistryClientFactory.create(kafkaConfig)
2525

2626
test("all topic strategy test") {
27-
val strategy = new TopicsWithExistingSubjectSelectionStrategy()
28-
strategy.getTopics(confluentClient, kafkaConfig).toList.map(_.toSet) shouldBe List(
27+
val strategy = new TopicsWithExistingSubjectSelectionStrategy(confluentClient)
28+
strategy.getTopics.toList.map(_.toSet) shouldBe List(
2929
Set(
3030
RecordTopic,
3131
RecordTopicWithKey,
@@ -40,8 +40,9 @@ class TopicSelectionStrategySpec extends KafkaAvroSpecMixin with KafkaAvroSource
4040
}
4141

4242
test("topic filtering strategy test") {
43-
val strategy = new TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(Pattern.compile(".*Record.*"))
44-
strategy.getTopics(confluentClient, kafkaConfig).toList shouldBe List(
43+
val strategy =
44+
new TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(confluentClient, Pattern.compile(".*Record.*"))
45+
strategy.getTopics.toList shouldBe List(
4546
List(ArrayOfRecordsTopic, RecordTopic, RecordTopicWithKey)
4647
)
4748
}
@@ -53,8 +54,8 @@ class TopicSelectionStrategySpec extends KafkaAvroSpecMixin with KafkaAvroSource
5354
testModelDependencies,
5455
new FlinkKafkaSourceImplFactory(None)
5556
) {
56-
override def topicSelectionStrategy =
57-
new TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(Pattern.compile("test-.*"))
57+
override lazy val topicSelectionStrategy =
58+
new TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(confluentClient, Pattern.compile("test-.*"))
5859
}
5960
}
6061

utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/KafkaUniversalComponentTransformer.scala

+15-5
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,13 @@ import pl.touk.nussknacker.engine.api.process.{ProcessObjectDependencies, TopicN
1313
import pl.touk.nussknacker.engine.api.validation.ValidationMode
1414
import pl.touk.nussknacker.engine.api.{NodeId, Params}
1515
import pl.touk.nussknacker.engine.kafka.validator.WithCachedTopicsExistenceValidator
16-
import pl.touk.nussknacker.engine.kafka.{KafkaComponentsUtils, KafkaConfig, PreparedKafkaTopic, UnspecializedTopicName}
16+
import pl.touk.nussknacker.engine.kafka.{
17+
KafkaComponentsUtils,
18+
KafkaConfig,
19+
KafkaUtils,
20+
PreparedKafkaTopic,
21+
UnspecializedTopicName
22+
}
1723
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry._
1824
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.UniversalSchemaSupportDispatcher
1925
import pl.touk.nussknacker.engine.kafka.UnspecializedTopicName._
@@ -47,10 +53,14 @@ abstract class KafkaUniversalComponentTransformer[T, TN <: TopicName: TopicValid
4753
@transient protected lazy val schemaRegistryClient: SchemaRegistryClient =
4854
schemaRegistryClientFactory.create(kafkaConfig)
4955

50-
protected def topicSelectionStrategy: TopicSelectionStrategy = {
56+
@transient protected lazy val topicSelectionStrategy: TopicSelectionStrategy = {
5157
if (kafkaConfig.showTopicsWithoutSchema) {
52-
new AllNonHiddenTopicsSelectionStrategy
53-
} else new TopicsWithExistingSubjectSelectionStrategy
58+
new AllNonHiddenTopicsSelectionStrategy(
59+
schemaRegistryClient,
60+
KafkaUtils.createKafkaAdminClient(kafkaConfig),
61+
kafkaConfig.topicsWithoutSchemaFetchTimeout
62+
)
63+
} else new TopicsWithExistingSubjectSelectionStrategy(schemaRegistryClient)
5464
}
5565

5666
@transient protected lazy val kafkaConfig: KafkaConfig = prepareKafkaConfig
@@ -67,7 +77,7 @@ abstract class KafkaUniversalComponentTransformer[T, TN <: TopicName: TopicValid
6777
protected def getTopicParam(
6878
implicit nodeId: NodeId
6979
): WithError[ParameterCreatorWithNoDependency with ParameterExtractor[String]] = {
70-
val topics = topicSelectionStrategy.getTopics(schemaRegistryClient, kafkaConfig)
80+
val topics = topicSelectionStrategy.getTopics
7181

7282
(topics match {
7383
case Valid(topics) => Writer[List[ProcessCompilationError], List[UnspecializedTopicName]](Nil, topics)
Original file line numberDiff line numberDiff line change
@@ -1,56 +1,53 @@
11
package pl.touk.nussknacker.engine.schemedkafka
22

33
import cats.data.Validated
4-
import org.apache.kafka.clients.admin.ListTopicsOptions
4+
import org.apache.kafka.clients.admin.{Admin, ListTopicsOptions}
55
import org.apache.kafka.common.KafkaException
66
import org.apache.kafka.common.errors.TimeoutException
7-
import pl.touk.nussknacker.engine.kafka.{KafkaConfig, KafkaUtils, UnspecializedTopicName}
7+
import pl.touk.nussknacker.engine.kafka.UnspecializedTopicName
88
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{SchemaRegistryClient, SchemaRegistryError}
99

1010
import java.util.concurrent.ExecutionException
1111
import java.util.regex.Pattern
12+
import scala.concurrent.duration.FiniteDuration
1213
import scala.jdk.CollectionConverters._
1314

1415
trait TopicSelectionStrategy extends Serializable {
1516

16-
def getTopics(
17-
schemaRegistryClient: SchemaRegistryClient,
18-
kafkaConfig: KafkaConfig
19-
): Validated[SchemaRegistryError, List[UnspecializedTopicName]]
17+
def getTopics: Validated[SchemaRegistryError, List[UnspecializedTopicName]]
2018

2119
}
2220

23-
class TopicsWithExistingSubjectSelectionStrategy extends TopicSelectionStrategy {
21+
class TopicsWithExistingSubjectSelectionStrategy(schemaRegistryClient: SchemaRegistryClient)
22+
extends TopicSelectionStrategy {
2423

25-
override def getTopics(
26-
schemaRegistryClient: SchemaRegistryClient,
27-
kafkaConfig: KafkaConfig
28-
): Validated[SchemaRegistryError, List[UnspecializedTopicName]] = {
24+
override def getTopics: Validated[SchemaRegistryError, List[UnspecializedTopicName]] = {
2925
schemaRegistryClient.getAllTopics
3026
}
3127

3228
}
3329

34-
class AllNonHiddenTopicsSelectionStrategy extends TopicSelectionStrategy {
30+
// TODO: Close client
31+
class AllNonHiddenTopicsSelectionStrategy(
32+
schemaRegistryClient: SchemaRegistryClient,
33+
kafkaAdminClient: Admin,
34+
fetchTimeout: FiniteDuration
35+
) extends TopicSelectionStrategy {
3536

36-
override def getTopics(
37-
schemaRegistryClient: SchemaRegistryClient,
38-
kafkaConfig: KafkaConfig
39-
): Validated[SchemaRegistryError, List[UnspecializedTopicName]] = {
37+
override def getTopics: Validated[SchemaRegistryError, List[UnspecializedTopicName]] = {
4038
val topicsFromSchemaRegistry = schemaRegistryClient.getAllTopics
4139

4240
val schemaLessTopics: List[UnspecializedTopicName] = {
4341
try {
44-
KafkaUtils.usingAdminClient(kafkaConfig) {
45-
_.listTopics(new ListTopicsOptions().timeoutMs(kafkaConfig.topicsWithoutSchemaFetchTimeout.toMillis.toInt))
46-
.names()
47-
.get()
48-
.asScala
49-
.toSet
50-
.map(UnspecializedTopicName.apply)
51-
.filterNot(topic => topic.name.startsWith("_"))
52-
.toList
53-
}
42+
kafkaAdminClient
43+
.listTopics(new ListTopicsOptions().timeoutMs(fetchTimeout.toMillis.toInt))
44+
.names()
45+
.get()
46+
.asScala
47+
.toSet
48+
.map(UnspecializedTopicName.apply)
49+
.filterNot(topic => topic.name.startsWith("_"))
50+
.toList
5451
} catch {
5552
// In some tests we pass dummy kafka address, so when we try to get topics from kafka it fails
5653
case err: ExecutionException =>
@@ -68,13 +65,12 @@ class AllNonHiddenTopicsSelectionStrategy extends TopicSelectionStrategy {
6865

6966
}
7067

71-
class TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(val topicPattern: Pattern)
72-
extends TopicSelectionStrategy {
68+
class TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(
69+
schemaRegistryClient: SchemaRegistryClient,
70+
topicPattern: Pattern
71+
) extends TopicSelectionStrategy {
7372

74-
override def getTopics(
75-
schemaRegistryClient: SchemaRegistryClient,
76-
kafkaConfig: KafkaConfig
77-
): Validated[SchemaRegistryError, List[UnspecializedTopicName]] =
73+
override def getTopics: Validated[SchemaRegistryError, List[UnspecializedTopicName]] =
7874
schemaRegistryClient.getAllTopics.map(_.filter(topic => topicPattern.matcher(topic.name).matches()))
7975

8076
}

utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaRegistryClient.scala

+2-1
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,12 @@ trait SchemaRegistryClient extends Serializable {
4040

4141
def getAllVersions(topic: UnspecializedTopicName, isKey: Boolean): Validated[SchemaRegistryError, List[Integer]]
4242

43+
// FIXME: strategy created once
4344
def isTopicWithSchema(topic: String, kafkaConfig: KafkaConfig): Boolean = {
4445
if (!kafkaConfig.showTopicsWithoutSchema) {
4546
true
4647
} else {
47-
val topicsWithSchema = new TopicsWithExistingSubjectSelectionStrategy().getTopics(this, kafkaConfig)
48+
val topicsWithSchema = new TopicsWithExistingSubjectSelectionStrategy(this).getTopics
4849
topicsWithSchema.exists(_.map(_.name).contains(topic))
4950
}
5051
}

utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/azure/AzureSchemaRegistryClientFactory.scala

+5-8
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ class AzureSchemaRegistryClient(config: SchemaRegistryClientKafkaConfig) extends
4444
.credential(credential)
4545
.buildClient()
4646

47+
// TODO: close it
48+
private val kafkaAdminClient = KafkaUtils.createKafkaAdminClient(KafkaConfig(Some(config.kafkaProperties), None))
49+
4750
// We need to create our own schemas service because some operations like schema listing are not exposed by default client
4851
// or even its Schemas inner class. Others like listing of versions are implemented incorrectly (it has wrong json field name in model)
4952
private val enhancedSchemasService = new EnhancedSchemasImpl(
@@ -105,7 +108,7 @@ class AzureSchemaRegistryClient(config: SchemaRegistryClientKafkaConfig) extends
105108
}
106109

107110
override def getAllTopics: Validated[SchemaRegistryError, List[UnspecializedTopicName]] = {
108-
val topics = fetchTopics(KafkaConfig(Some(config.kafkaProperties), None))
111+
val topics = fetchTopics
109112
val matchStrategy = SchemaNameTopicMatchStrategy(topics)
110113
getAllFullSchemaNames.map(matchStrategy.getAllMatchingTopics(_, isKey = false))
111114
}
@@ -117,13 +120,7 @@ class AzureSchemaRegistryClient(config: SchemaRegistryClientKafkaConfig) extends
117120
getOneMatchingSchemaName(topicName, isKey).andThen(getVersions)
118121
}
119122

120-
private def fetchTopics(kafkaConfig: KafkaConfig) = {
121-
KafkaUtils
122-
.usingAdminClient(kafkaConfig) { admin =>
123-
admin.listTopics().names().get().asScala.toList
124-
}
125-
.map(UnspecializedTopicName.apply)
126-
}
123+
private def fetchTopics = kafkaAdminClient.listTopics().names().get().asScala.toList.map(UnspecializedTopicName.apply)
127124

128125
private def getOneMatchingSchemaName(
129126
topicName: UnspecializedTopicName,

0 commit comments

Comments
 (0)