Description
What happened?
The usual pattern to read a collection of GCS paths (example 2 in the FileIO javadoc) is:
```java
Pipeline p = ...;
// E.g. the filenames might be computed from other data in the pipeline, or
// read from a data source.
PCollection<String> filenames = ...;
// Read all files in the collection.
PCollection<String> lines =
    filenames
        .apply(FileIO.matchAll())
        .apply(FileIO.readMatches())
        .apply(TextIO.readFiles());
```
FileIO.matchAll() injects a Reshuffle step.
This works as intended when a few paths (with wildcards) expand to many files, since the reshuffle is meant to increase parallelism.
When the list of paths is large and does not expand (no wildcards, e.g. 1M literal filenames), the reshuffle prevents the matchAll step from fusing with the next steps, leaving a high-IO step isolated and never triggering scale-up.
Tested on GCS with 1M filenames: 1 worker, CPU < 2%, and a small backlog (just the filenames). The pipeline does not scale; without the reshuffle, the match would fuse with the next steps into a more CPU-intensive stage that triggers autoscaling.
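To illustrate the fusion claim, here is a hedged sketch of a reshuffle-free match written by hand: each filename is matched inside a plain ParDo via FileSystems.match, so the step can fuse with the downstream reads. It reuses the `filenames` collection from the example above; EmptyMatchTreatment.DISALLOW is just an illustrative choice.

```java
import java.io.IOException;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

// Sketch: match each filename in a plain ParDo (no Reshuffle is inserted),
// so this step can fuse with readMatches/readFiles downstream.
PCollection<String> lines =
    filenames
        .apply(
            "MatchWithoutReshuffle",
            ParDo.of(
                new DoFn<String, MatchResult.Metadata>() {
                  @ProcessElement
                  public void process(
                      @Element String spec, OutputReceiver<MatchResult.Metadata> out)
                      throws IOException {
                    for (MatchResult.Metadata m :
                        FileSystems.match(spec, EmptyMatchTreatment.DISALLOW).metadata()) {
                      out.output(m);
                    }
                  }
                }))
        .apply(FileIO.readMatches())
        .apply(TextIO.readFiles());
```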
Workaround: start with numWorkers > 1. This may not be ideal: if the number of input files varies, it can waste resources.
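For reference, a hedged sketch of applying this workaround on the Dataflow runner through the standard worker-pool options (the concrete values are illustrative, not a recommendation):

```java
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
// Start with more than one worker so the isolated matchAll stage has
// capacity even though fusion (and thus autoscaling) never kicks in.
options.setNumWorkers(8);      // illustrative value
options.setMaxNumWorkers(32);  // illustrative value
```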
Desired solution: add an optional configuration parameter to FileIO.matchAll() to disable this reshuffle step. The default should keep today's behavior (reshuffle enabled) to avoid any regression for existing pipelines:
```java
FileIO.matchAll().withReshuffle(false)
```
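For illustration, a minimal sketch of how the option might be wired into MatchAll's expansion, assuming the current expansion ends with Reshuffle.viaRandomKey(); `MatchFn` and the `getWithReshuffle()` accessor are assumed names, and the actual internals of FileIO.MatchAll may differ:

```java
// Hedged sketch (assumed names: MatchFn, getWithReshuffle) of how
// FileIO.MatchAll.expand() could make the Reshuffle optional.
@Override
public PCollection<MatchResult.Metadata> expand(PCollection<String> input) {
  PCollection<MatchResult.Metadata> res =
      input.apply(
          "Match filepatterns",
          ParDo.of(new MatchFn(getConfiguration().getEmptyMatchTreatment())));
  // Default true preserves today's behavior; false lets the match step
  // fuse with downstream transforms.
  return getConfiguration().getWithReshuffle()
      ? res.apply(Reshuffle.viaRandomKey())
      : res;
}
```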
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam YAML
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Infrastructure
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner