
Commit 3ac857a

[NU-2021] Improved Kafka metadata caching: common cache and caching topics when schemaless topics are enabled (#8116)
1 parent 57f9130 commit 3ac857a

20 files changed (+271 additions, -204 deletions)

designer/server/src/test/scala/pl/touk/nussknacker/ui/api/testing/SchemalessKafkaJsonTypeTests.scala

Lines changed: 4 additions & 0 deletions
@@ -208,6 +208,10 @@ class SchemalessKafkaJsonTypeTests
         "scenarioTypes.streaming.modelConfig.components.kafka.config.useDataSampleParamForSchemalessJsonTopicBasedKafkaSource",
         ConfigValueFactory.fromAnyRef(true)
       )
+      .withValue(
+        "scenarioTypes.streaming.modelConfig.components.kafka.config.kafkaAdminConfig.cacheConfig.topicsExpirationTime",
+        ConfigValueFactory.fromAnyRef(0)
+      )
 
   lazy val defaultKafkaConfig: KafkaConfig = KafkaConfig(
     kafkaProperties = Some(Map("bootstrap.servers" -> kafkaContainer.bootstrapServers)),
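
The test above sets kafkaAdminConfig.cacheConfig.topicsExpirationTime to 0, presumably so that cached topics expire immediately and every validation sees a freshly fetched topic list. A minimal sketch of the same override with Typesafe Config; the helper name and the bootstrap address are illustrative assumptions, not part of the commit:

    import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}

    // Hypothetical test helper: expire cached topics immediately, so every validation asks the
    // broker for a fresh topic list instead of reusing the shared cache.
    def withImmediatelyExpiringTopicsCache(kafkaComponentConfig: Config): Config =
      kafkaComponentConfig.withValue(
        "kafkaAdminConfig.cacheConfig.topicsExpirationTime",
        ConfigValueFactory.fromAnyRef(0)
      )

    // Illustrative base config; the bootstrap address is an assumption.
    val testKafkaConfig: Config = withImmediatelyExpiringTopicsCache(
      ConfigFactory.parseString("""kafkaProperties { "bootstrap.servers": "localhost:9092" }""")
    )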

docs/Changelog.md

Lines changed: 1 addition & 0 deletions
@@ -189,6 +189,7 @@ description: Stay informed with detailed changelogs covering new features, impro
 }
 ```
 * [#7982](https://github.com/TouK/nussknacker/pull/7982) Mock expression added to enrichers (except decision-table) which can be used to hardcode enricher output in tests without calling external services.
+* [#8116](https://github.com/TouK/nussknacker/pull/8116) Improved Kafka metadata caching: common cache and caching topics when schemaless topics are enabled
 
 ## 1.18

docs/MigrationGuide.md

Lines changed: 9 additions & 0 deletions
@@ -29,6 +29,15 @@ To see the biggest differences please consult the [changelog](Changelog.md).
   should be added to distinguish the components. In the new version of the component, OpenAPI `Service` should be
   selected. For this reason, the existing scenario may not compile. The automatic migration for this parameter is not
   easy and has to be done manually or through updates in the DB if you can read the swagger service name.
+* [#8116](https://github.com/TouK/nussknacker/pull/8116) Improved Kafka metadata caching
+  * Dependencies for `TopicSelectionStrategy` (`SchemaRegistryClient`, and `KafkaConfig`) should be passed via constructor.
+  * `kafkaConfig.topicsExistenceValidationConfig.validatorConfig` and `kafkaConfig.topicsWithoutSchemaFetchTimeout` were moved to
+    common Kafka admin configuration `kafkaConfig.kafkaAdminConfig` - see [Kafka integration](integration/KafkaIntegration.md)
+    for more details. The properties were moved as follows:
+    * `topicsExistenceValidationConfig.validatorConfig.autoCreateFlagFetchCacheTtl` -> `kafkaAdminConfig.cacheConfig.autoCreateTopicSettingExpirationTime`
+    * `topicsExistenceValidationConfig.validatorConfig.topicsFetchCacheTtl` -> `kafkaAdminConfig.cacheConfig.topicsExpirationTime`
+    * `topicsExistenceValidationConfig.validatorConfig.adminClientTimeout` -> `kafkaAdminConfig.clientTimeout`
+    * `topicsWithoutSchemaFetchTimeout` -> `kafkaAdminConfig.clientTimeout`
 
 ### REST API changes
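
A minimal before/after sketch of the configuration move described above, written as HOCON strings parsed with Typesafe Config; the keys are relative to the Kafka configuration root (`kafkaConfig`) and the concrete duration values are assumptions for illustration only:

    import com.typesafe.config.ConfigFactory

    // Old layout (before #8116) - duration values are assumed, shown only to illustrate the key paths.
    val oldKafkaConfig = ConfigFactory.parseString("""
      topicsExistenceValidationConfig.validatorConfig {
        autoCreateFlagFetchCacheTtl: 5 minutes
        topicsFetchCacheTtl: 30 seconds
        adminClientTimeout: 500 milliseconds
      }
      topicsWithoutSchemaFetchTimeout: 1 second
    """)

    // New layout (after #8116): both old timeouts collapse into the single kafkaAdminConfig.clientTimeout,
    // and the cache TTLs move under kafkaAdminConfig.cacheConfig.
    val newKafkaConfig = ConfigFactory.parseString("""
      kafkaAdminConfig {
        clientTimeout: 500 milliseconds
        cacheConfig {
          autoCreateTopicSettingExpirationTime: 5 minutes
          topicsExpirationTime: 30 seconds
        }
      }
    """)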

docs/integration/KafkaIntegration.md

Lines changed: 24 additions & 25 deletions
Large diffs are not rendered by default.

engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/source/flink/TopicSelectionStrategySpec.scala

Lines changed: 7 additions & 6 deletions
@@ -24,8 +24,8 @@ class TopicSelectionStrategySpec extends KafkaAvroSpecMixin with KafkaAvroSource
   private lazy val confluentClient = schemaRegistryClientFactory.create(kafkaConfig)
 
   test("all topic strategy test") {
-    val strategy = new TopicsWithExistingSubjectSelectionStrategy()
-    strategy.getTopics(confluentClient, kafkaConfig).toList.map(_.toSet) shouldBe List(
+    val strategy = new TopicsWithExistingSubjectSelectionStrategy(confluentClient)
+    strategy.getTopics.toList.map(_.toSet) shouldBe List(
       Set(
         RecordTopic,
         RecordTopicWithKey,
@@ -40,8 +40,9 @@ class TopicSelectionStrategySpec extends KafkaAvroSpecMixin with KafkaAvroSource
   }
 
   test("topic filtering strategy test") {
-    val strategy = new TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(Pattern.compile(".*Record.*"))
-    strategy.getTopics(confluentClient, kafkaConfig).toList shouldBe List(
+    val strategy =
+      new TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(Pattern.compile(".*Record.*"), confluentClient)
+    strategy.getTopics.toList shouldBe List(
       List(ArrayOfRecordsTopic, RecordTopic, RecordTopicWithKey)
     )
   }
@@ -53,8 +54,8 @@ class TopicSelectionStrategySpec extends KafkaAvroSpecMixin with KafkaAvroSource
       testModelConfig,
       new FlinkKafkaSourceImplFactory(None)
     ) {
-      override def topicSelectionStrategy =
-        new TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(Pattern.compile("test-.*"))
+      override lazy val topicSelectionStrategy =
+        new TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(Pattern.compile("test-.*"), confluentClient)
     }
   }
 
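
As the migration guide notes, the schema registry client is now a constructor dependency of the strategy, so it is no longer passed to `getTopics`. A minimal sketch of the change from a caller's perspective; it assumes a `schemaRegistryClient` in scope (like `confluentClient` in the spec above), that the strategy classes are imported, and an illustrative topic pattern:

    import java.util.regex.Pattern

    // After #8116: the schema registry client is injected via the constructor and getTopics takes no arguments.
    // `schemaRegistryClient` is assumed to be in scope (like `confluentClient` in the spec above).
    val allSubjectsStrategy = new TopicsWithExistingSubjectSelectionStrategy(schemaRegistryClient)
    val patternStrategy =
      new TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(Pattern.compile("transactions-.*"), schemaRegistryClient)

    // Before #8116 the same call site looked like:
    //   new TopicsWithExistingSubjectSelectionStrategy().getTopics(schemaRegistryClient, kafkaConfig)
    val matchingTopics = patternStrategy.getTopics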

engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/FlinkWithKafkaSuite.scala

Lines changed: 1 addition & 0 deletions
@@ -141,6 +141,7 @@ abstract class FlinkWithKafkaSuite
         s"config.kafkaEspProperties.${AvroSerializersRegistrar.autoRegisterRecordSchemaIdSerializationProperty}",
         fromAnyRef(false)
       )
+      .withValue("config.kafkaAdminConfig.cacheConfig.topicsExpirationTime", fromAnyRef(0))
     maybeAddSchemaRegistryUrl(config)
   }
 

utils/kafka-components-utils/src/it/scala/pl/touk/nussknacker/engine/kafka/validator/CachedTopicsExistenceValidatorTest.scala

Lines changed: 12 additions & 13 deletions
@@ -20,32 +20,32 @@ class CachedTopicsExistenceValidatorWhenAutoCreateDisabledTest
 
   test("should validate existing source topic") {
     val topic = createUniqueSourceTopic()
-    val validator = new CachedTopicsExistenceValidator(defaultKafkaConfig)
+    val validator = CachedTopicsExistenceValidator(defaultKafkaConfig)
     validator.validateTopic(topic) shouldBe Valid(topic)
   }
 
   test("should validate not existing source topic") {
-    val validator = new CachedTopicsExistenceValidator(defaultKafkaConfig)
+    val validator = CachedTopicsExistenceValidator(defaultKafkaConfig)
     validator.validateTopic(notExistingSourceTopic) shouldBe Invalid(
       TopicExistenceValidationException(NonEmptyList.one(notExistingSourceTopic))
     )
   }
 
   test("should validate existing sink topic") {
     val topic = createUniqueSinkTopic()
-    val validator = new CachedTopicsExistenceValidator(defaultKafkaConfig)
+    val validator = CachedTopicsExistenceValidator(defaultKafkaConfig)
     validator.validateTopic(topic) shouldBe Valid(topic)
   }
 
   test("should validate not existing sink topic") {
-    val validator = new CachedTopicsExistenceValidator(defaultKafkaConfig)
+    val validator = CachedTopicsExistenceValidator(defaultKafkaConfig)
     validator.validateTopic(notExistingSinkTopic) shouldBe Invalid(
       TopicExistenceValidationException(NonEmptyList.one(notExistingSinkTopic))
     )
   }
 
   test("should not validate not existing topic when validation disabled") {
-    val validator = new CachedTopicsExistenceValidator(
+    val validator = CachedTopicsExistenceValidator(
       defaultKafkaConfig.copy(
         topicsExistenceValidationConfig = TopicsExistenceValidationConfig(enabled = false)
       )
@@ -55,7 +55,7 @@ class CachedTopicsExistenceValidatorWhenAutoCreateDisabledTest
 
   test("should fetch topics every time when not valid using cache") {
     val notExistingYetTopicName = createUniqueSourceTopicName()
-    val validator = new CachedTopicsExistenceValidator(defaultKafkaConfig)
+    val validator = CachedTopicsExistenceValidator(defaultKafkaConfig)
 
     validator.validateTopic(notExistingYetTopicName) shouldBe Invalid(
       TopicExistenceValidationException(NonEmptyList.one(notExistingYetTopicName))
@@ -75,30 +75,30 @@ class CachedTopicsExistenceValidatorWhenAutoCreateEnabledTest
 
   test("should validate existing source topic") {
     val topic = createUniqueSourceTopic()
-    val validator = new CachedTopicsExistenceValidator(defaultKafkaConfig)
+    val validator = CachedTopicsExistenceValidator(defaultKafkaConfig)
     validator.validateTopic(topic) shouldBe Valid(topic)
   }
 
   test("should not validate not existing source topic") {
-    val validator = new CachedTopicsExistenceValidator(defaultKafkaConfig)
+    val validator = CachedTopicsExistenceValidator(defaultKafkaConfig)
     validator.validateTopic(notExistingSourceTopic) shouldBe Invalid(
       TopicExistenceValidationException(NonEmptyList.one(notExistingSourceTopic))
     )
   }
 
   test("should validate existing sink topic") {
     val topic = createUniqueSinkTopic()
-    val validator = new CachedTopicsExistenceValidator(defaultKafkaConfig)
+    val validator = CachedTopicsExistenceValidator(defaultKafkaConfig)
     validator.validateTopic(topic) shouldBe Valid(topic)
   }
 
   test("should validate not existing sink topic") {
-    val validator = new CachedTopicsExistenceValidator(defaultKafkaConfig)
+    val validator = CachedTopicsExistenceValidator(defaultKafkaConfig)
     validator.validateTopic(notExistingSinkTopic) shouldBe Valid(notExistingSinkTopic)
   }
 
   test("should not validate not existing topic when validation disabled") {
-    val validator = new CachedTopicsExistenceValidator(
+    val validator = CachedTopicsExistenceValidator(
       defaultKafkaConfig.copy(
         topicsExistenceValidationConfig = TopicsExistenceValidationConfig(enabled = false)
       )
@@ -107,7 +107,7 @@ class CachedTopicsExistenceValidatorWhenAutoCreateEnabledTest
   }
 
   test("should use cache when validating") {
-    val validator = new CachedTopicsExistenceValidator(defaultKafkaConfig)
+    val validator = CachedTopicsExistenceValidator(defaultKafkaConfig)
     validator.validateTopic(notExistingSinkTopic) shouldBe Valid(notExistingSinkTopic)
     container.stop()
     validator.validateTopic(notExistingSinkTopic) shouldBe Valid(notExistingSinkTopic)
@@ -134,7 +134,6 @@ abstract class BaseCachedTopicsExistenceValidatorTest(kafkaAutoCreateEnabled: Bo
     // longer timeout, as container might need some time to make initial assignments etc.
     topicsExistenceValidationConfig = TopicsExistenceValidationConfig(
       enabled = true,
-      validatorConfig = CachedTopicsExistenceValidatorConfig.DefaultConfig.copy(adminClientTimeout = 30 seconds)
     )
   )
 

utils/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/KafkaComponentsUtils.scala

Lines changed: 1 addition & 1 deletion
@@ -12,7 +12,7 @@ object KafkaComponentsUtils extends KafkaUtils {
       topics: NonEmptyList[PreparedKafkaTopic[T]],
       kafkaConfig: KafkaConfig
   ): Unit = {
-    new CachedTopicsExistenceValidator(kafkaConfig = kafkaConfig)
+    CachedTopicsExistenceValidator(kafkaConfig)
      .validateTopics(topics.map(_.prepared))
      .valueOr(err => throw err)
  }
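
For callers that want to handle the result explicitly instead of `valueOr`, a minimal sketch of using the new companion-object factory; the topic name and the `kafkaConfig` value in scope are illustrative assumptions:

    import cats.data.NonEmptyList
    import cats.data.Validated.{Invalid, Valid}
    import pl.touk.nussknacker.engine.api.process.TopicName
    import pl.touk.nussknacker.engine.kafka.validator.CachedTopicsExistenceValidator

    // After #8116 the validator is obtained from the companion object, which also wires in the shared
    // caching admin client. `kafkaConfig` is assumed to be a KafkaConfig already in scope.
    val validator = CachedTopicsExistenceValidator(kafkaConfig)

    val result = validator.validateTopics(NonEmptyList.one(TopicName.ForSource("transactions"))) match {
      case Valid(topics) => topics    // all topics exist (or validation is disabled)
      case Invalid(err)  => throw err // TopicExistenceValidationException listing the missing topics
    }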

utils/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/source/KafkaSourceFactory.scala

Lines changed: 9 additions & 4 deletions
@@ -19,7 +19,7 @@ import pl.touk.nussknacker.engine.kafka._
 import pl.touk.nussknacker.engine.kafka.KafkaFactory.TopicParamName
 import pl.touk.nussknacker.engine.kafka.serialization.{KafkaDeserializationSchema, KafkaDeserializationSchemaFactory}
 import pl.touk.nussknacker.engine.kafka.source.KafkaSourceFactory._
-import pl.touk.nussknacker.engine.kafka.validator.WithCachedTopicsExistenceValidator
+import pl.touk.nussknacker.engine.kafka.validator.CachedTopicsExistenceValidator
 
 import scala.reflect.ClassTag
 
@@ -45,7 +45,6 @@ class KafkaSourceFactory[K: ClassTag, V: ClassTag](
     protected val implProvider: KafkaSourceImplFactory[K, V]
 ) extends SourceFactory
     with SingleInputDynamicComponent[Source]
-    with WithCachedTopicsExistenceValidator
     with WithExplicitTypesToExtract
     with UnboundedStreamComponent {
 
@@ -78,7 +77,11 @@ class KafkaSourceFactory[K: ClassTag, V: ClassTag](
   )(implicit nodeId: NodeId): List[ProcessCompilationError.CustomNodeError] = {
     val topics = topicNamesFrom(topicsString)
     val preparedTopics = topics.map(KafkaComponentsUtils.prepareKafkaTopic(_, modelConfig)).map(_.prepared)
-    validateTopics(preparedTopics).swap.toList.map(_.toCustomNodeError(nodeId.id, Some(TopicParamName)))
+    cachedTopicsExistenceValidator
+      .validateTopics(preparedTopics)
+      .swap
+      .toList
+      .map(_.toCustomNodeError(nodeId.id, Some(TopicParamName)))
   }
 
   protected def nextSteps(context: ValidationContext, dependencies: List[NodeDependencyValue])(
@@ -199,7 +202,9 @@ class KafkaSourceFactory[K: ClassTag, V: ClassTag](
   override def nodeDependencies: List[NodeDependency] =
     List(TypedNodeDependency[MetaData], TypedNodeDependency[NodeId], OutputVariableNameDependency)
 
-  override protected val kafkaConfig: KafkaConfig = KafkaConfig.parseConfig(modelConfig.underlyingConfig)
+  protected val kafkaConfig: KafkaConfig = KafkaConfig.parseConfig(modelConfig.underlyingConfig)
+
+  protected lazy val cachedTopicsExistenceValidator = CachedTopicsExistenceValidator(kafkaConfig)
 
   private def topicNamesFrom(value: String) = {
     val topicsList = value.split(topicNameSeparator).map(_.trim).map(TopicName.ForSource.apply).toList

utils/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/validator/CachedTopicsExistenceValidator.scala

Lines changed: 20 additions & 69 deletions
@@ -3,42 +3,25 @@ package pl.touk.nussknacker.engine.kafka.validator
 import cats.data.{NonEmptyList, Validated}
 import cats.data.Validated.{Invalid, Valid}
 import com.typesafe.scalalogging.LazyLogging
-import org.apache.kafka.clients.admin.{Admin, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions}
-import org.apache.kafka.common.config.ConfigResource
 import pl.touk.nussknacker.engine.api.process.TopicName
-import pl.touk.nussknacker.engine.kafka.{KafkaConfig, KafkaUtils, UnspecializedTopicName}
+import pl.touk.nussknacker.engine.kafka.{CachingKafkaAdminClient, KafkaConfig, KafkaUtils, UnspecializedTopicName}
 import pl.touk.nussknacker.engine.kafka.UnspecializedTopicName.ToUnspecializedTopicName
 import pl.touk.nussknacker.engine.kafka.validator.TopicsExistenceValidator.TopicValidationType
-import pl.touk.nussknacker.engine.util.cache.SingleValueCache
 
-import scala.jdk.CollectionConverters._
+object CachedTopicsExistenceValidator {
 
-trait WithCachedTopicsExistenceValidator extends TopicsExistenceValidator {
-
-  protected val kafkaConfig: KafkaConfig
-
-  private lazy val validator = new CachedTopicsExistenceValidator(kafkaConfig)
-
-  final override def validateTopics[T <: TopicName: TopicValidationType](
-      topics: NonEmptyList[T]
-  ): Validated[TopicExistenceValidationException[T], NonEmptyList[T]] =
-    validator.validateTopics(topics)
+  def apply(kafkaConfig: KafkaConfig): CachedTopicsExistenceValidator = {
+    new CachedTopicsExistenceValidator(
+      kafkaConfig.topicsExistenceValidationConfig.enabled,
+      KafkaUtils.createCachingAdminClient(kafkaConfig)
+    )
+  }
 
 }
 
-class CachedTopicsExistenceValidator(kafkaConfig: KafkaConfig) extends TopicsExistenceValidator with LazyLogging {
-
-  private val validatorConfig = kafkaConfig.topicsExistenceValidationConfig.validatorConfig
-
-  @transient private lazy val autoCreateSettingCache = new SingleValueCache[Boolean](
-    expireAfterAccess = None,
-    expireAfterWrite = Some(validatorConfig.autoCreateFlagFetchCacheTtl)
-  )
-
-  @transient private lazy val topicsCache = new SingleValueCache[Set[UnspecializedTopicName]](
-    expireAfterAccess = None,
-    expireAfterWrite = Some(validatorConfig.topicsFetchCacheTtl)
-  )
+class CachedTopicsExistenceValidator(enabled: Boolean, cachingKafkaAdminClient: CachingKafkaAdminClient)
+    extends TopicsExistenceValidator
+    with LazyLogging {
 
   def validateTopics[T <: TopicName: TopicValidationType](
       topics: NonEmptyList[T]
@@ -50,26 +33,30 @@ class CachedTopicsExistenceValidator(kafkaConfig: KafkaConfig) extends TopicsExi
   }
 
   private def validateSourceTopics[T <: TopicName: TopicValidationType](topics: NonEmptyList[T]) = {
-    if (kafkaConfig.topicsExistenceValidationConfig.enabled) {
+    if (enabled) {
       doValidate(topics)
     } else {
       Valid(topics)
     }
   }
 
   private def validateSinkTopics[T <: TopicName: TopicValidationType](topics: NonEmptyList[T]) = {
-    if (kafkaConfig.topicsExistenceValidationConfig.enabled && !isAutoCreateEnabled) {
+    if (enabled && !cachingKafkaAdminClient.getOrFetchAutoCreateTopicsSetting) {
      doValidate(topics)
     } else {
      Valid(topics)
    }
  }
 
   private def doValidate[T <: TopicName](topics: NonEmptyList[T]) = {
-    topicsCache.get() match {
+    cachingKafkaAdminClient.getCachedTopics match {
       case Some(cachedTopics) if doAllExist(topics, cachedTopics).isRight =>
         Valid(topics)
-      case Some(_) | None =>
+      case Some(_) =>
+        logger.debug("Topics do not exist, fetch topics from Kafka and validate")
+        fetchTopicsAndValidate(topics)
+      case None =>
+        logger.debug("Empty topics cache, fetch topics from Kafka and validate")
         fetchTopicsAndValidate(topics)
     }
   }
@@ -86,47 +73,11 @@ class CachedTopicsExistenceValidator(kafkaConfig: KafkaConfig) extends TopicsExi
   }
 
   private def fetchTopicsAndValidate[T <: TopicName](requestedTopics: NonEmptyList[T]) = {
-    val existingTopics = fetchAllTopicsAndCache()
+    val existingTopics = cachingKafkaAdminClient.fetchFreshTopicsAndCache
     doAllExist(requestedTopics, existingTopics) match {
       case Right(()) => Valid(requestedTopics)
       case Left(notExistingTopics) => Invalid(TopicExistenceValidationException(notExistingTopics))
     }
   }
 
-  private def fetchAllTopicsAndCache() = {
-    val existingTopics = usingAdminClient {
-      _.listTopics(new ListTopicsOptions().timeoutMs(validatorConfig.adminClientTimeout.toMillis.toInt))
-        .names()
-        .get()
-        .asScala
-        .toSet
-        .map(UnspecializedTopicName.apply)
-    }
-    topicsCache.put(existingTopics)
-    existingTopics
-  }
-
-  private def isAutoCreateEnabled: Boolean = autoCreateSettingCache.getOrCreate {
-    val timeout = validatorConfig.adminClientTimeout.toMillis.toInt
-    val randomKafkaNodeId = usingAdminClient {
-      _.describeCluster(new DescribeClusterOptions().timeoutMs(timeout)).nodes().get().asScala.head.id().toString
-    }
-    usingAdminClient {
-      _.describeConfigs(
-        List(new ConfigResource(ConfigResource.Type.BROKER, randomKafkaNodeId)).asJava,
-        new DescribeConfigsOptions().timeoutMs(validatorConfig.adminClientTimeout.toMillis.toInt)
-      )
-        .values()
-        .values()
-        .asScala
-        .map(_.get())
-        .head // we ask for config of one node, but `describeConfigs` api have `List` of nodes, so here we got single element list
-        .get("auto.create.topics.enable")
-        .value()
-        .toBoolean
-    }
-  }
-
-  private def usingAdminClient[T]: (Admin => T) => T = KafkaUtils.usingAdminClient[T](kafkaConfig)
-
 }
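
The `CachingKafkaAdminClient` itself is not part of this diff; judging from the call sites above and the new `kafkaAdminConfig` settings, its interface presumably looks roughly like the sketch below (member names are taken from the calls above, everything else is an assumption):

    import pl.touk.nussknacker.engine.kafka.UnspecializedTopicName

    // Inferred sketch only - the real CachingKafkaAdminClient lives in pl.touk.nussknacker.engine.kafka
    // and is not shown in this commit excerpt; member names come from the call sites above.
    trait CachingKafkaAdminClientSketch {
      // Cached broker setting `auto.create.topics.enable`, refreshed after
      // kafkaAdminConfig.cacheConfig.autoCreateTopicSettingExpirationTime elapses.
      def getOrFetchAutoCreateTopicsSetting: Boolean

      // Topic names currently held in the cache; None when the cache is empty or expired.
      def getCachedTopics: Option[Set[UnspecializedTopicName]]

      // Lists topics through the Kafka admin client (bounded by kafkaAdminConfig.clientTimeout),
      // refreshes the cache and returns the fresh set.
      def fetchFreshTopicsAndCache: Set[UnspecializedTopicName]
    }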
