
[Bug]: Failed to restore state on Flink runner #32000

Open
@rubensacoustic

What happened?

Sometimes I run into issues when Flink tries to restore from a previous state (using a savepoint).

I'm not sure this is the right place to report it, but it seems to be related to the schema coder.

This is the exception I'm getting:

org.apache.flink.runtime.taskmanager.AsynchronousException: Caught exception while processing timer.
	at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1605)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1580)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1734)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$deferCallbackToMailbox$24(StreamTask.java:1723)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:731)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:955)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:924)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:748)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:564)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: TimerException{org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: Error reading state.}
	... 15 more
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: Error reading state.
	at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
	at org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn$OnTimerInvoker$tsendOfBuffering$dHMtZW5kT2ZCdWZmZXJpbmc.invokeOnTimer(Unknown Source)
	at org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DoFnInvokerBase.invokeOnTimer(ByteBuddyDoFnInvokerFactory.java:242)
	at org.apache.beam.runners.core.SimpleDoFnRunner.onTimer(SimpleDoFnRunner.java:206)
	at org.apache.beam.runners.core.StatefulDoFnRunner.onTimer(StatefulDoFnRunner.java:233)
	at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.onTimer(DoFnRunnerWithMetricsUpdate.java:79)
	at org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.onTimer(SimplePushbackSideInputDoFnRunner.java:119)
	at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimer(DoFnOperator.java:1165)
	at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:1134)
	at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onProcessingTime(DoFnOperator.java:1128)
	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:292)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1732)
	... 14 more
Caused by: java.lang.RuntimeException: Error reading state.
	at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkBagState.read(FlinkStateInternals.java:652)
	at org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn.flushBatch(GroupIntoBatches.java:657)
	at org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn.onBufferingTimer(GroupIntoBatches.java:601)
Caused by: org.apache.flink.util.FlinkRuntimeException: Unexpected list element deserialization failure
	at org.apache.flink.runtime.state.ListDelimitedSerializer.deserializeNextElement(ListDelimitedSerializer.java:89)
	at org.apache.flink.runtime.state.ListDelimitedSerializer.deserializeList(ListDelimitedSerializer.java:51)
	at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:120)
	at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:112)
	at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:61)
	at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkBagState.read(FlinkStateInternals.java:629)
	at org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn.flushBatch(GroupIntoBatches.java:657)
	at org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn.onBufferingTimer(GroupIntoBatches.java:601)
	at org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn$OnTimerInvoker$tsendOfBuffering$dHMtZW5kT2ZCdWZmZXJpbmc.invokeOnTimer(Unknown Source)
	at org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DoFnInvokerBase.invokeOnTimer(ByteBuddyDoFnInvokerFactory.java:242)
	at org.apache.beam.runners.core.SimpleDoFnRunner.onTimer(SimpleDoFnRunner.java:206)
	at org.apache.beam.runners.core.StatefulDoFnRunner.onTimer(StatefulDoFnRunner.java:233)
	at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.onTimer(DoFnRunnerWithMetricsUpdate.java:79)
	at org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.onTimer(SimplePushbackSideInputDoFnRunner.java:119)
	at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimer(DoFnOperator.java:1165)
	at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:1134)
	at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onProcessingTime(DoFnOperator.java:1128)
	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:292)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1732)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$deferCallbackToMailbox$24(StreamTask.java:1723)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:731)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:955)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:924)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:748)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:564)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: varint overflow 1721980800000
	at org.apache.beam.sdk.util.VarInt.decodeInt(VarInt.java:106)
	at org.apache.beam.sdk.coders.StringUtf8Coder.readString(StringUtf8Coder.java:55)
	at org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:100)
	at org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:90)
	at org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:37)
	at org.apache.beam.sdk.coders.RowCoderGenerator$DecodeInstruction.decodeDelegate(RowCoderGenerator.java:431)
	at org.apache.beam.sdk.coders.Coder$ByteBuddy$IzJYxo36.decode(Unknown Source)
	at org.apache.beam.sdk.coders.Coder$ByteBuddy$IzJYxo36.decode(Unknown Source)
	at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:126)
	at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:116)
	at org.apache.flink.runtime.state.ListDelimitedSerializer.deserializeNextElement(ListDelimitedSerializer.java:82)
	... 32 more

This does not happen on every restore; it appears to be random, or it may be a race condition.

Apache Beam: 2.57.0
Flink: tested with 1.15 and 1.18, with the same result
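
A note on the innermost cause, "varint overflow 1721980800000": the value is far too large to be a string length, and it happens to equal the epoch-millisecond timestamp for 2024-07-26T08:00:00Z. This may indicate that the decoder is reading the bytes of a 64-bit field at the position where it expects a string's length prefix, for example because the field layout used to write the state differs from the one used to read it. That is only a guess. The sketch below is a simplified, self-contained reimplementation of base-128 varint encoding, not Beam's actual `VarInt` class; the class name `VarIntOverflowSketch` and the helper `describeDecodeAsInt` are hypothetical. It only shows how a long written as a varint produces this kind of message when it is read back as an int.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.time.Instant;

// Illustrative sketch only: a simplified varint codec, not Beam's implementation.
final class VarIntOverflowSketch {

  // Writes v as a base-128 varint: 7 payload bits per byte, high bit set when more bytes follow.
  static void encodeLong(long v, OutputStream out) throws IOException {
    do {
      int b = (int) (v & 0x7F);
      v >>>= 7;
      if (v != 0) {
        b |= 0x80;
      }
      out.write(b);
    } while (v != 0);
  }

  // Reads a base-128 varint back into a long.
  static long decodeLong(InputStream in) throws IOException {
    long result = 0;
    int shift = 0;
    int b;
    do {
      b = in.read();
      if (b < 0) {
        throw new EOFException();
      }
      if (shift >= 64) {
        throw new IOException("varint too long");
      }
      result |= (long) (b & 0x7F) << shift;
      shift += 7;
    } while ((b & 0x80) != 0);
    return result;
  }

  // Reads a varint that is expected to fit in 32 bits, e.g. a string length prefix.
  static int decodeInt(InputStream in) throws IOException {
    long r = decodeLong(in);
    if (r < 0 || r >= 1L << 32) {
      throw new IOException("varint overflow " + r);
    }
    return (int) r;
  }

  // Hypothetical helper: encodes a long, then tries to read it back as an int.
  static String describeDecodeAsInt(long value) {
    try {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      encodeLong(value, out);
      return "ok " + decodeInt(new ByteArrayInputStream(out.toByteArray()));
    } catch (IOException e) {
      return e.getMessage();
    }
  }

  public static void main(String[] args) {
    long suspicious = 1721980800000L;
    // Interpreted as epoch milliseconds, the value is a round timestamp.
    System.out.println(Instant.ofEpochMilli(suspicious)); // prints 2024-07-26T08:00:00Z
    // Read where a 32-bit length is expected, it overflows.
    System.out.println(describeDecodeAsInt(suspicious)); // prints varint overflow 1721980800000
  }
}
```

If this reading is right, it would be worth checking whether the schema of the elements buffered by `GroupIntoBatches` (field order or field types) can differ between the job that took the savepoint and the job that restores from it.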

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
