Skip to content

Commit c78188b

Browse files
committed
admin client once for autoCreateSettingCache
1 parent 31e1acb commit c78188b

File tree

1 file changed

+14
-8
lines changed

1 file changed

+14
-8
lines changed

utils/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/validator/CachedTopicsExistenceValidator.scala

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -95,14 +95,20 @@ class CachedTopicsExistenceValidator(kafkaConfig: KafkaConfig) extends TopicsExi
9595

9696
private def isAutoCreateEnabled: Boolean = autoCreateSettingCache.getOrCreate {
9797
val timeout = validatorConfig.adminClientTimeout.toMillis.toInt
98-
val randomKafkaNodeId = usingAdminClient {
99-
_.describeCluster(new DescribeClusterOptions().timeoutMs(timeout)).nodes().get().asScala.head.id().toString
100-
}
101-
usingAdminClient {
102-
_.describeConfigs(
103-
List(new ConfigResource(ConfigResource.Type.BROKER, randomKafkaNodeId)).asJava,
104-
new DescribeConfigsOptions().timeoutMs(validatorConfig.adminClientTimeout.toMillis.toInt)
105-
)
98+
usingAdminClient { admin =>
99+
val randomKafkaNodeId = admin
100+
.describeCluster(new DescribeClusterOptions().timeoutMs(timeout))
101+
.nodes()
102+
.get()
103+
.asScala
104+
.head
105+
.id()
106+
.toString
107+
admin
108+
.describeConfigs(
109+
List(new ConfigResource(ConfigResource.Type.BROKER, randomKafkaNodeId)).asJava,
110+
new DescribeConfigsOptions().timeoutMs(timeout)
111+
)
106112
.values()
107113
.values()
108114
.asScala

0 commit comments

Comments
 (0)