Description
What happened?
When upgrading from Beam 2.33.0 to 2.61.0, I noticed that pipelines using FileIO read their input data twice.
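A minimal sketch of the kind of pipeline affected, assuming a plain text input (paths and the exact transform shape are illustrative):

```java
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class DoubleReadRepro {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    options.setRunner(SparkRunner.class);

    Pipeline p = Pipeline.create(options);
    p.apply(FileIO.match().filepattern("/tmp/input/*.txt"))
        .apply(FileIO.readMatches())
        // The read ParDo now carries an additional bad-record output
        // (see the explanation below).
        .apply(TextIO.readFiles())
        .apply(TextIO.write().to("/tmp/output/out"));
    // On 2.61.0 with SparkRunner the matched files are read twice;
    // on 2.33.0 they are read once.
    p.run().waitUntilFinish();
  }
}
```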
The cause appears to be the bad record error handlers introduced by #29670.
This changes the ParDo from a single-output to a multi-output transform, in which case the Spark translation forks the tagged RDD with one filter per output tag to split it into separate datasets:
https://github.com/apache/beam/blob/v2.61.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java#L452
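To illustrate the mechanism outside of Beam: forking an uncached Spark RDD with one filter per output re-evaluates the entire upstream lineage once per fork. A simplified, self-contained sketch in plain Spark (not the actual translator code; output tags are modeled as string prefixes):

```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ForkSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local[1]").setAppName("fork-sketch");
    try (JavaSparkContext sc = new JavaSparkContext(conf)) {
      // Stand-in for the expensive source read (e.g. the FileIO read).
      JavaRDD<String> source =
          sc.parallelize(Arrays.asList("a", "b", "c"))
              .map(x -> {
                // Printed once per evaluation of the lineage.
                System.out.println("reading " + x);
                return x;
              });

      // A multi-output ParDo tags each element with its output.
      JavaRDD<String> tagged = source.map(x -> "main:" + x);

      // The translation forks the tagged RDD with one filter per output tag.
      // The RDD is not cached, so each action re-runs the whole lineage.
      JavaRDD<String> main = tagged.filter(s -> s.startsWith("main:"));
      JavaRDD<String> bad = tagged.filter(s -> s.startsWith("bad:"));

      System.out.println("main count: " + main.count()); // triggers read #1
      System.out.println("bad count: " + bad.count());   // triggers read #2
    }
  }
}
```

Run locally, this prints the "reading ..." lines twice, once per count() action.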
This happens even with the default error handler, which is a no-op.
The Spark structured streaming runner does not suffer from this because it prunes outputs that are not consumed by other transforms, a behavior introduced by #25624.
We should do the same for SparkRunner as well.
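A sketch of the proposed pruning, again in plain Spark and under the assumption that the translator can tell which output tags have downstream consumers (the consumedTags set below is hypothetical, not an actual SparkRunner API):

```java
import java.util.Arrays;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class PruningSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local[1]").setAppName("pruning-sketch");
    try (JavaSparkContext sc = new JavaSparkContext(conf)) {
      JavaRDD<String> source =
          sc.parallelize(Arrays.asList("a", "b", "c"))
              .map(x -> {
                System.out.println("reading " + x);
                return x;
              });
      JavaRDD<String> tagged = source.map(x -> "main:" + x);

      // Hypothetical: the set of output tags that downstream transforms
      // actually consume. With the default no-op error handler, the
      // bad-record tag has no consumers.
      Set<String> consumedTags = Set.of("main");

      // Prune unconsumed outputs instead of forking a filter for every tag,
      // as the structured streaming runner does after #25624. The "bad"
      // dataset is never created, so the source lineage runs exactly once.
      if (consumedTags.contains("main")) {
        System.out.println("main count: "
            + tagged.filter(s -> s.startsWith("main:")).count());
      }
      if (consumedTags.contains("bad")) {
        System.out.println("bad count: "
            + tagged.filter(s -> s.startsWith("bad:")).count());
      }
    }
  }
}
```

Caching the tagged RDD before forking would also avoid the recomputation, but at a memory cost; pruning matches what the structured streaming runner already does.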
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- [ ] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Infrastructure
- [x] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner