Description
Describe the bug
When I have a topic and a schema defined in my resource file, and the schema registry I use has 1000s of subjects, the commands A/ take a very long time B/ fail with a NPE on topic management.
Additional info: I have observed, via DEBUG, that the client seems to list all the subjects, and retrieve all the schemas, every single time. Not sure why that is. The SR management should only mean to retrieve the existing subject, versions, if not found, create the first version, if found, try update to have a new version with the new schema definition, along with checking compatibility, and possibly setting compatibility on the subject, etc. But it does not need to pull all the schemas.
To Reproduce
Try to create/update a topic on a cluster and small (empty/few) schema registry -> Works
Try the same with a schema registry that has 1000s of subjects -> fails with NPE below
Remove/comment out the Schema from the resource file-> works fine for the topic management.
Expected behavior
My topic is created/updated and the operations for the schema should work too.
Screenshots/Configs
---
apiVersion: kafka.jikkou.io/v1beta2
kind: KafkaTopic
metadata:
name: my-topic-name.v1
labels: {}
annotations: {}
spec:
partitions: 1
replicas: 3
configs:
min.insync.replicas: 2
#---
#apiVersion: schemaregistry.jikkou.io/v1beta2
#kind: SchemaRegistrySubject
#metadata:
# name: my-topic-name.v1-value
# labels:
# annotations:
# schemaregistry.jikkou.io/normalize-schema: true
#spec:
# compatibilityLevel: FULL_TRANSITIVE
# schemaType: AVRO
# schema:
# $ref: ./schemas/user-test.avsc
jikkou apply -f topics/private.yaml
java.lang.NullPointerException
at io.streamthoughts.jikkou.kafka.change.topics.CreateTopicChangeHandler.toNewTopic(CreateTopicChangeHandler.java:118)
at [email protected]/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at [email protected]/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
at [email protected]/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at [email protected]/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at [email protected]/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
at [email protected]/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at [email protected]/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
at io.streamthoughts.jikkou.kafka.change.topics.CreateTopicChangeHandler.handleChanges(CreateTopicChangeHandler.java:74)
at io.streamthoughts.jikkou.core.reconciler.DefaultChangeExecutor.execute(DefaultChangeExecutor.java:104)
at io.streamthoughts.jikkou.core.reconciler.DefaultChangeExecutor.lambda$execute$3(DefaultChangeExecutor.java:97)
at [email protected]/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:273)
at [email protected]/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1858)
at [email protected]/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at [email protected]/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at [email protected]/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
at [email protected]/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at [email protected]/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
at io.streamthoughts.jikkou.core.reconciler.DefaultChangeExecutor.execute(DefaultChangeExecutor.java:99)
at io.streamthoughts.jikkou.core.reconciler.DefaultChangeExecutor.applyChanges(DefaultChangeExecutor.java:72)
at io.streamthoughts.jikkou.kafka.reconciler.AdminClientKafkaTopicController.execute(AdminClientKafkaTopicController.java:104)
at io.streamthoughts.jikkou.core.reconciler.Reconciler.apply(Reconciler.java:57)
at io.streamthoughts.jikkou.core.DefaultApi.patch(DefaultApi.java:426)
at io.streamthoughts.jikkou.core.DefaultApi.reconcile(DefaultApi.java:396)
at io.streamthoughts.jikkou.client.command.reconcile.BaseResourceCommand.call(BaseResourceCommand.java:54)
at io.streamthoughts.jikkou.client.command.reconcile.BaseResourceCommand.call(BaseResourceCommand.java:30)
at picocli.CommandLine.executeUserObject(CommandLine.java:2045)
at picocli.CommandLine.access$1500(CommandLine.java:148)
at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2465)
at picocli.CommandLine$RunLast.handle(CommandLine.java:2457)
at picocli.CommandLine$RunLast.handle(CommandLine.java:2419)
at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2277)
at picocli.CommandLine$RunLast.execute(CommandLine.java:2421)
at io.streamthoughts.jikkou.client.Jikkou.executionStrategy(Jikkou.java:150)
at picocli.CommandLine.execute(CommandLine.java:2174)
at io.streamthoughts.jikkou.client.Jikkou.execute(Jikkou.java:140)
at io.streamthoughts.jikkou.client.Jikkou.main(Jikkou.java:128)
at [email protected]/java.lang.invoke.LambdaForm$DMH/sa346b79c.invokeStaticInit(LambdaForm$DMH)
Error: NullPointerException: null
Configuration is as you'd expect, with SASL_SSL/PLAIN and the registry details set to use Confluent Cloud URL, basic auth.
Runtime environment
- OS: Fedora
- Jikkou: 0.35.4
- Kafka Cluster Version: 3.6+
- SR: Confluent Cloud