[BUG] with maxBytesPerTrigger Spark job won't start because of ClassNotFoundException #170

Open
@ronaldbuit

Describe the bug
When we set the maxBytesPerTrigger option on a Spark job that reads from a Pulsar topic, the job does not start. It crashes with this error:

ERROR MicroBatchExecution: Query [id = f01bfeff-d1b8-415f-8592-c8e3211bead7, runId = e5d87131-b874-4072-8f3e-3532cbf74c45] terminated with error
org.apache.pulsar.client.admin.PulsarAdminException$TimeoutException: java.util.concurrent.TimeoutException

With DEBUG logging enabled, the following message appears earlier in the log:

DEBUG ObjectMapperFactory: Add LoadManagerReport deserializer failed because LoadManagerReport.class has been shaded
java.lang.ClassNotFoundException: org.apache.pulsar.shade.org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport

When we put pulsar-client-admin.jar on the classpath, the Spark job starts but cannot receive anything from the topic, because there is a conflict between shaded classes for GenericAvroRecord.

It looks related to apache/pulsar#15167.
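A quick way to check which of these classes are visible to the JVM, and from which jar, is a small standalone probe. This is only a minimal sketch and not part of the connector: `ShadeCheck` and its `locate` helper are hypothetical names, and the unshaded class name is an assumption obtained by dropping the `org.apache.pulsar.shade.` prefix from the name in the DEBUG message.

```java
import java.security.CodeSource;
import java.util.List;

public class ShadeCheck {

    /**
     * Returns the location (jar or directory URL) a class is loaded from,
     * "bootstrap/JDK" for classes without a code source, or "NOT FOUND"
     * if the class cannot be loaded from the current classpath.
     */
    static String locate(String className) {
        try {
            // initialize=false: only resolve the class, do not run static initializers
            Class<?> cls = Class.forName(className, false, ShadeCheck.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            return src == null ? "bootstrap/JDK" : src.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "NOT FOUND";
        }
    }

    public static void main(String[] args) {
        // Default probes: the shaded name copied from the DEBUG message, and the
        // unshaded name (assumption: the same name without the shade prefix).
        List<String> names = args.length > 0
                ? List.of(args)
                : List.of(
                    "org.apache.pulsar.shade.org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport",
                    "org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport");
        for (String name : names) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

Run it with the same classpath as the Spark driver. Further class names can be passed as arguments, for example the GenericAvroRecord implementation, to see whether two jars provide conflicting copies.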

To Reproduce
Steps to reproduce the behavior:

  1. Configure maxBytesPerTrigger with corresponding settings on a Spark job that reads from a Pulsar topic.
  2. Start the Spark job with DEBUG logging enabled.
  3. See the errors in the log.

Expected behavior
With maxBytesPerTrigger set, the Spark job should start normally.

Additional context
Stack trace for ClassNotFoundException:

DEBUG ObjectMapperFactory: Add LoadManagerReport deserializer failed because LoadManagerReport.class has been shaded
java.lang.ClassNotFoundException: org.apache.pulsar.shade.org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:587)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
	at java.base/java.lang.Class.forName0(Native Method)
	at java.base/java.lang.Class.forName(Class.java:467)
	at org.apache.pulsar.shade.org.apache.commons.lang3.ClassUtils.getClass(ClassUtils.java:1069)
	at org.apache.pulsar.shade.org.apache.commons.lang3.ClassUtils.getClass(ClassUtils.java:1135)
	at org.apache.pulsar.shade.org.apache.commons.lang3.ClassUtils.getClass(ClassUtils.java:1118)
	at org.apache.pulsar.common.util.ObjectMapperFactory.setAnnotationsModule(ObjectMapperFactory.java:203)
	at org.apache.pulsar.common.util.ObjectMapperFactory.create(ObjectMapperFactory.java:117)
	at org.apache.pulsar.client.admin.internal.JacksonConfigurator.<init>(JacksonConfigurator.java:38)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
	at org.apache.pulsar.shade.org.glassfish.hk2.utilities.reflection.ReflectionHelper.makeMe(ReflectionHelper.java:1356)
	at org.apache.pulsar.shade.org.jvnet.hk2.internal.ClazzCreator.createMe(ClazzCreator.java:248)
	at org.apache.pulsar.shade.org.jvnet.hk2.internal.ClazzCreator.create(ClazzCreator.java:342)
	at org.apache.pulsar.shade.org.jvnet.hk2.internal.SystemDescriptor.create(SystemDescriptor.java:463)
	at org.apache.pulsar.shade.org.jvnet.hk2.internal.SingletonContext$1.compute(SingletonContext.java:59)
	at org.apache.pulsar.shade.org.jvnet.hk2.internal.SingletonContext$1.compute(SingletonContext.java:47)
	at org.apache.pulsar.shade.org.glassfish.hk2.utilities.cache.Cache$OriginThreadAwareFuture$1.call(Cache.java:74)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at org.apache.pulsar.shade.org.glassfish.hk2.utilities.cache.Cache$OriginThreadAwareFuture.run(Cache.java:131)
	at org.apache.pulsar.shade.org.glassfish.hk2.utilities.cache.Cache.compute(Cache.java:176)
	at org.apache.pulsar.shade.org.jvnet.hk2.internal.SingletonContext.findOrCreate(SingletonContext.java:98)
	at org.apache.pulsar.shade.org.jvnet.hk2.internal.Utilities.createService(Utilities.java:2102)
	at org.apache.pulsar.shade.org.jvnet.hk2.internal.ServiceLocatorImpl.internalGetAllServiceHandles(ServiceLocatorImpl.java:1481)
	at org.apache.pulsar.shade.org.jvnet.hk2.internal.ServiceLocatorImpl.getAllServices(ServiceLocatorImpl.java:799)
	at org.apache.pulsar.shade.org.glassfish.jersey.inject.hk2.AbstractHk2InjectionManager.getAllInstances(AbstractHk2InjectionManager.java:170)
	at org.apache.pulsar.shade.org.glassfish.jersey.inject.hk2.ImmediateHk2InjectionManager.getAllInstances(ImmediateHk2InjectionManager.java:30)
	at org.apache.pulsar.shade.org.glassfish.jersey.internal.ContextResolverFactory$ContextResolversConfigurator.postInit(ContextResolverFactory.java:69)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientConfig$State.lambda$initRuntime$2(ClientConfig.java:461)
	at java.base/java.util.Arrays$ArrayList.forEach(Arrays.java:4204)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientConfig$State.initRuntime(ClientConfig.java:461)
	at org.apache.pulsar.shade.org.glassfish.jersey.internal.util.collection.Values$LazyValueImpl.get(Values.java:317)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientConfig.getRuntime(ClientConfig.java:819)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientRequest.getClientRuntime(ClientRequest.java:176)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientRequest.getInjectionManager(ClientRequest.java:567)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyWebTarget.onBuilder(JerseyWebTarget.java:371)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyWebTarget.request(JerseyWebTarget.java:206)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyWebTarget.request(JerseyWebTarget.java:38)
	at org.apache.pulsar.client.admin.internal.BaseResource.lambda$requestAsync$1(BaseResource.java:101)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887)
	at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2325)
	at org.apache.pulsar.client.admin.internal.BaseResource.requestAsync(BaseResource.java:92)
	at org.apache.pulsar.client.admin.internal.BaseResource.request(BaseResource.java:72)
	at org.apache.pulsar.client.admin.internal.BaseResource.asyncGetRequest(BaseResource.java:178)
	at org.apache.pulsar.client.admin.internal.BaseResource.asyncGetRequest(BaseResource.java:185)
	at org.apache.pulsar.client.admin.internal.TopicsImpl.getInternalStatsAsync(TopicsImpl.java:678)
	at org.apache.pulsar.client.admin.internal.TopicsImpl.lambda$getInternalStats$22(TopicsImpl.java:665)
	at org.apache.pulsar.client.admin.internal.BaseResource.sync(BaseResource.java:306)
	at org.apache.pulsar.client.admin.internal.TopicsImpl.getInternalStats(TopicsImpl.java:665)
	at org.apache.pulsar.client.admin.internal.TopicsImpl.getInternalStats(TopicsImpl.java:660)
	at org.apache.spark.sql.pulsar.PulsarAdmissionControlHelper.latestOffsetForTopicPartition(PulsarHelper.scala:542)
	at org.apache.spark.sql.pulsar.PulsarHelper.$anonfun$latestOffsets$2(PulsarHelper.scala:258)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:182)
	at org.apache.spark.sql.pulsar.PulsarHelper.latestOffsets(PulsarHelper.scala:254)
	at org.apache.spark.sql.pulsar.PulsarSource.latestOffset(PulsarSource.scala:90)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$4(MicroBatchExecution.scala:489)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:488)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:477)
	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:802)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:473)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:266)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)

Stack trace for TimeoutException:

ERROR MicroBatchExecution: Query [id = f01bfeff-d1b8-415f-8592-c8e3211bead7, runId = e5d87131-b874-4072-8f3e-3532cbf74c45] terminated with error
org.apache.pulsar.client.admin.PulsarAdminException$TimeoutException: java.util.concurrent.TimeoutException
	at org.apache.pulsar.client.admin.internal.BaseResource.sync(BaseResource.java:311)
	at org.apache.pulsar.client.admin.internal.TopicsImpl.getInternalStats(TopicsImpl.java:665)
	at org.apache.pulsar.client.admin.internal.TopicsImpl.getInternalStats(TopicsImpl.java:660)
	at org.apache.spark.sql.pulsar.PulsarAdmissionControlHelper.latestOffsetForTopicPartition(PulsarHelper.scala:542)
	at org.apache.spark.sql.pulsar.PulsarHelper.$anonfun$latestOffsets$2(PulsarHelper.scala:258)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:182)
	at org.apache.spark.sql.pulsar.PulsarHelper.latestOffsets(PulsarHelper.scala:254)
	at org.apache.spark.sql.pulsar.PulsarSource.latestOffset(PulsarSource.scala:90)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$4(MicroBatchExecution.scala:489)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:488)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:477)
	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:802)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:473)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:266)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
Caused by: java.util.concurrent.TimeoutException
	at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095)
	at org.apache.pulsar.client.admin.internal.BaseResource.sync(BaseResource.java:306)
	... 42 more
