Cannot create custom artifact and schema resolvers from Quarkus based camel-kafka application #4514

Open
@shuawest

Description

A Quarkus-based camel-kafka application fails at startup with a ClassNotFoundException when custom schema and artifact resolvers are specified for the Apicurio serializer (see the trace in [3]).

Expected behavior: the client handles class loading for Quarkus-based applications, or the constraints are documented, or an example of using Quarkus + Camel-Kafka + Apicurio is provided.

[1] Example Quarkus camel-kafka application:
https://github.com/shuawest/apicurio-sandbox/tree/main/streaming-app/src/main/java

[2] Example Kafka routes with Apicurio configured for custom resolvers:

        String kdestA = KafkaUriBuilder.create(ProducerService.TOPIC_A)
            .appendProperty("brokers", "{{aregsandbox.kafka.brokers}}")
            .appendProperty("clientId", "producerA")
            .appendProperty("valueSerializer", MyProtobufKafkaSerializer.class.getName())
            .appendProperty("maxRequestSize", "5242880")
            .appendAdditional(SerdeConfig.REGISTRY_URL, "{{registryurl}}")
            .appendAdditional(SerdeConfig.SCHEMA_RESOLVER, MySchemaResolver.class.getName())
            .appendAdditional(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, MyArtifactResolverStrategy.class.getName())
            .appendAdditional(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, "aregsandbox")
            .appendAdditional(SerdeConfig.EXPLICIT_ARTIFACT_ID, "samplea")
            .appendAdditional(SerdeConfig.FIND_LATEST_ARTIFACT, "true")
            //.appendAdditional(SerdeConfig.CHECK_PERIOD_MS, "60000")
            .value();
    
        String kdestB = KafkaUriBuilder.create(ProducerService.TOPIC_B)
            .appendProperty("brokers", "{{aregsandbox.kafka.brokers}}")
            .appendProperty("clientId", "producerB")
            .appendProperty("valueSerializer", MyProtobufKafkaSerializer.class.getName())
            .appendProperty("maxRequestSize", "5242880")
            .appendAdditional(SerdeConfig.REGISTRY_URL, "{{registryurl}}")
            .appendAdditional(SerdeConfig.SCHEMA_RESOLVER, MySchemaResolver.class.getName())
            .appendAdditional(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, MyArtifactResolverStrategy.class.getName())
            .appendAdditional(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, "aregsandbox")
            .appendAdditional(SerdeConfig.EXPLICIT_ARTIFACT_ID, "sampleb")
            .appendAdditional(SerdeConfig.FIND_LATEST_ARTIFACT, "true")
            //.appendAdditional(SerdeConfig.CHECK_PERIOD_MS, "60000")
            .value();
    
        log.info("Kafka connection A: {}\n\n\n", kdestA);
        log.info("Kafka connection B: {}\n\n\n", kdestB);

        from("timer:producerTimerA?repeatCount=1000&delay=1s&period=1s")
            .bean(this, "genA")
            //.log("producer timer fired ${headers.genCount}:\n${body}");
            .to(kdestA);

        from("timer:producerTimerB?repeatCount=1000&delay=1s&period=1s")
            .bean(this, "genB")
            //.log("producer timer fired ${headers.genCount}:\n${body}");
            .to(kdestB);

[3] Classloading exception:

2021-07-14 16:55:15,979 ERROR [org.apa.cam.imp.eng.AbstractCamelContext] (Quarkus Main Thread) Error starting CamelContext (camel-1) due to exception thrown: Failed to start route route3 because of null: org.apache.camel.FailedToStartRouteException: Failed to start route route3 because of null
	at org.apache.camel.impl.engine.RouteService.warmUp(RouteService.java:123)
	at org.apache.camel.impl.engine.InternalRouteStartupManager.doWarmUpRoutes(InternalRouteStartupManager.java:306)
	at org.apache.camel.impl.engine.InternalRouteStartupManager.safelyStartRouteServices(InternalRouteStartupManager.java:189)
	at org.apache.camel.impl.engine.InternalRouteStartupManager.doStartOrResumeRoutes(InternalRouteStartupManager.java:147)
	at org.apache.camel.impl.engine.AbstractCamelContext.doStartCamel(AbstractCamelContext.java:3150)
	at org.apache.camel.impl.engine.AbstractCamelContext.doStartContext(AbstractCamelContext.java:2846)
	at org.apache.camel.impl.engine.AbstractCamelContext.doStart(AbstractCamelContext.java:2797)
	at org.apache.camel.support.service.BaseService.start(BaseService.java:115)
	at org.apache.camel.impl.engine.AbstractCamelContext.start(AbstractCamelContext.java:2492)
	at org.apache.camel.quarkus.core.CamelContextRuntime.start(CamelContextRuntime.java:57)
	at org.apache.camel.quarkus.core.CamelBootstrapRecorder.start(CamelBootstrapRecorder.java:45)
	at io.quarkus.deployment.steps.CamelBootstrapProcessor$boot-173480958.deploy_0(CamelBootstrapProcessor$boot-173480958.zig:101)
	at io.quarkus.deployment.steps.CamelBootstrapProcessor$boot-173480958.deploy(CamelBootstrapProcessor$boot-173480958.zig:40)
	at io.quarkus.runner.ApplicationImpl.doStart(ApplicationImpl.zig:806)
	at io.quarkus.runtime.Application.start(Application.java:90)
	at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:100)
	at io.quarkus.runtime.Quarkus.run(Quarkus.java:66)
	at io.quarkus.runtime.Quarkus.run(Quarkus.java:42)
	at io.quarkus.runtime.Quarkus.run(Quarkus.java:119)
	at io.quarkus.runner.GeneratedMain.main(GeneratedMain.zig:29)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at io.quarkus.runner.bootstrap.StartupActionImpl$3.run(StartupActionImpl.java:134)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.camel.RuntimeCamelException: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
	at org.apache.camel.RuntimeCamelException.wrapRuntimeCamelException(RuntimeCamelException.java:51)
	at org.apache.camel.support.ChildServiceSupport.start(ChildServiceSupport.java:67)
	at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:84)
	at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:101)
	at org.apache.camel.impl.engine.DefaultChannel.doStart(DefaultChannel.java:126)
	at org.apache.camel.support.service.BaseService.start(BaseService.java:115)
	at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:84)
	at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:87)
	at org.apache.camel.processor.Pipeline.doStart(Pipeline.java:154)
	at org.apache.camel.support.service.BaseService.start(BaseService.java:115)
	at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:84)
	at org.apache.camel.support.processor.DelegateAsyncProcessor.doStart(DelegateAsyncProcessor.java:79)
	at org.apache.camel.support.service.BaseService.start(BaseService.java:115)
	at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:84)
	at org.apache.camel.impl.engine.RouteService.startChildServices(RouteService.java:398)
	at org.apache.camel.impl.engine.RouteService.doWarmUp(RouteService.java:195)
	at org.apache.camel.impl.engine.RouteService.warmUp(RouteService.java:121)
	... 25 more
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:434)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
	at org.apache.camel.component.kafka.DefaultKafkaClientFactory.getProducer(DefaultKafkaClientFactory.java:29)
	at org.apache.camel.component.kafka.KafkaProducer.doStart(KafkaProducer.java:114)
	at org.apache.camel.support.service.BaseService.start(BaseService.java:115)
	at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:84)
	at org.apache.camel.impl.engine.AbstractCamelContext.internalAddService(AbstractCamelContext.java:1467)
	at org.apache.camel.impl.engine.AbstractCamelContext.addService(AbstractCamelContext.java:1385)
	at org.apache.camel.processor.SendProcessor.doStart(SendProcessor.java:236)
	at org.apache.camel.support.service.BaseService.start(BaseService.java:115)
	at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:84)
	at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:101)
	at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler.doStart(RedeliveryErrorHandler.java:1487)
	at org.apache.camel.support.ChildServiceSupport.start(ChildServiceSupport.java:60)
	... 40 more
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: io.apicurio.registry.serde.strategy.MySchemaResolver
	at io.apicurio.registry.serde.utils.Utils.loadClass(Utils.java:35)
	at io.apicurio.registry.serde.utils.Utils.instantiate(Utils.java:50)
	at io.apicurio.registry.serde.SchemaResolverConfigurer.configure(SchemaResolverConfigurer.java:72)
	at io.apicurio.registry.serde.AbstractKafkaSerDe.configure(AbstractKafkaSerDe.java:68)
	at io.apicurio.registry.serde.protobuf.ProtobufKafkaSerializer.configure(ProtobufKafkaSerializer.java:78)
	at io.apicurio.registry.serde.strategy.MyProtobufKafkaSerializer.configure(MyProtobufKafkaSerializer.java:49)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:369)
	... 53 more
Caused by: java.lang.ClassNotFoundException: io.apicurio.registry.serde.strategy.MySchemaResolver
	at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
	at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
	at io.quarkus.bootstrap.classloading.QuarkusClassLoader.loadClass(QuarkusClassLoader.java:428)
	at io.quarkus.bootstrap.classloading.QuarkusClassLoader.loadClass(QuarkusClassLoader.java:378)
	at java.base/java.lang.Class.forName0(Native Method)
	at java.base/java.lang.Class.forName(Class.java:315)
	at io.apicurio.registry.serde.utils.Utils.loadClass(Utils.java:33)
	... 59 more
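
The innermost frames show `Utils.loadClass` calling the one-argument `Class.forName`, which resolves the name against the class loader of the calling class. A possible direction, sketched below, is to try the thread context class loader first and fall back to the helper's own loader. This is a minimal sketch, not Apicurio's implementation: `ContextAwareClassLoading` and its methods are hypothetical names, and it assumes that the application's resolver classes are visible to the thread context class loader at the time the serializer is configured.

```java
// Hypothetical helper (not part of Apicurio): shows loading a class by name
// through the thread context class loader before falling back.
final class ContextAwareClassLoading {

    private ContextAwareClassLoading() {
    }

    // Tries the thread context class loader first, then this class's own loader.
    static Class<?> loadClass(String className) throws ClassNotFoundException {
        ClassLoader tccl = Thread.currentThread().getContextClassLoader();
        if (tccl != null) {
            try {
                return Class.forName(className, true, tccl);
            } catch (ClassNotFoundException ignored) {
                // Not visible to the context loader; fall through to the fallback.
            }
        }
        return Class.forName(className, true, ContextAwareClassLoading.class.getClassLoader());
    }

    // Loads the class, calls its no-argument constructor, and checks the type.
    static <T> T instantiate(String className, Class<T> type) throws ReflectiveOperationException {
        Object instance = loadClass(className).getDeclaredConstructor().newInstance();
        return type.cast(instance);
    }
}
```

With a helper like this, a name such as `MySchemaResolver.class.getName()` would be looked up through whichever loader the calling thread carries, rather than only through the loader that defined the library class. Whether that is sufficient here depends on which loader Quarkus sets as the context class loader when the Kafka producer is constructed.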
