[Bug]: NPE in SolaceIO getWatermark #32660

Open
@ppawel

What happened?

During maintenance windows, our Solace broker instance sometimes resets its connections, and our Beam jobs have to reconnect. Normally this is not a big issue, but I noticed the following exception in one of the SolaceIO-based jobs that we are currently testing:

java.lang.NullPointerException
	at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:903)
	at org.apache.beam.sdk.io.solace.read.UnboundedSolaceReader.getWatermark(UnboundedSolaceReader.java:128)
	at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.currentRestriction(Read.java:953)
	at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.getProgress(Read.java:1017)
	at org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers$RestrictionTrackerObserverWithProgress.getProgress(RestrictionTrackers.java:105)
	at org.apache.beam.fn.harness.FnApiDoFnRunner.getProgress(FnApiDoFnRunner.java:1186)
	at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1700(FnApiDoFnRunner.java:145)
	at org.apache.beam.fn.harness.FnApiDoFnRunner$SplittableFnDataReceiver.getProgress(FnApiDoFnRunner.java:1171)
	at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$SplittingMetricTrackingFnDataReceiver.getProgress(PCollectionConsumerRegistry.java:482)
	at org.apache.beam.fn.harness.BeamFnDataReadRunner.trySplit(BeamFnDataReadRunner.java:266)
	at org.apache.beam.fn.harness.control.ProcessBundleHandler.trySplit(ProcessBundleHandler.java:741)
	at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
	at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)

The exception occurs around the time of reconnection failures (network-related: connection refused, so the broker was probably down briefly).

Afterwards, the pipeline seems to recover (a subsequent reconnection to the broker succeeds), but it would still be nice to handle such an event more gracefully than with an NPE.
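
To illustrate what "more gracefully" could look like, here is a minimal sketch, not the actual SolaceIO code. It assumes the NPE comes from a reference that is temporarily null while the reader is reconnecting, which the stack trace suggests but does not prove. All names (`NullSafeWatermarkSketch`, `Session`, `setSession`) are hypothetical, and it uses `java.time.Instant` rather than Beam's Joda `Instant` so that it runs without any dependencies. The idea is to hold the last reported watermark while no session is available instead of failing a `checkNotNull`.

```java
import java.time.Instant;

/** Hypothetical stand-in for a reader whose broker session can vanish during a reconnect. */
final class NullSafeWatermarkSketch {

    /** Hypothetical session abstraction; this is NOT the SolaceIO API. */
    interface Session {
        Instant currentWatermark();
    }

    // Null while the reader is disconnected or reconnecting.
    private Session session;

    // Last watermark reported while a session was available.
    private Instant lastWatermark = Instant.EPOCH;

    synchronized void setSession(Session newSession) {
        this.session = newSession;
    }

    /**
     * Returns the last known watermark when no session is present, instead of throwing.
     * The returned value never moves backwards.
     */
    synchronized Instant getWatermark() {
        if (session == null) {
            // Disconnected: hold the watermark rather than fail the progress/split request.
            return lastWatermark;
        }
        Instant current = session.currentWatermark();
        if (current.isAfter(lastWatermark)) {
            lastWatermark = current;
        }
        return lastWatermark;
    }
}
```

Holding the watermark is the conservative choice here: it may delay downstream windows slightly during an outage, but it should not advance past data that has not been read yet. Whether this fits the real reader depends on which field is actually null at `UnboundedSolaceReader.java:128`.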

FYI @bzablocki

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
