
[Bug]: No parallelism using WriteToParquet in Apache Spark #24365

Open
@cozos

Description

What happened?

When running Beam on Spark using WriteToParquet without num_shards, files seem to be written with no parallelism. In https://beam.apache.org/releases/pydoc/2.11.0/apache_beam.io.parquetio.html it says:

num_shards – The number of files (shards) used for output. If not set, the service will decide on the optimal number of shards.

However, in Spark, my tasks look like this:

[Screenshot: Spark UI task view, 2022-11-24]

I believe that this is happening because iobase.WriteImpl here is doing:

    ...
    | 'Pair' >> core.Map(lambda x: (None, x))
    | core.GroupByKey()

which was added in this PR: #958

If I understand correctly, the PCollection's elements will all have the same key, None, and GroupByKey will group all of them into a single "partition" (in Spark terms). This "None" partition is massively skewed: it can only be written by one thread/task and takes forever.
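To make the skew concrete, here is a minimal, self-contained pipeline (a sketch, not Beam's actual internals) showing that pairing every element with the same key collapses the whole PCollection into one group:

    import apache_beam as beam

    # Every element gets the key None, so GroupByKey produces exactly one
    # group, and a runner that partitions by key hashes everything into a
    # single partition.
    with beam.Pipeline() as p:
        (p
         | beam.Create(range(1000))
         | 'Pair' >> beam.Map(lambda x: (None, x))
         | beam.GroupByKey()
         | beam.Map(lambda kv: print(kv[0], len(list(kv[1])))))
    # prints: None 1000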

Issue Priority

Priority: 2

Issue Component

Component: io-py-parquet

Activity

Abacn (Contributor) commented on Nov 28, 2022

Thanks for reporting and triaging the issue. I'm surprised by the "add a None key, then GroupByKey" pattern, which makes no sense today; presumably how GBK works has changed since then. We should be able to replace the change from #958 with a Reshuffle(). Would you mind testing whether that resolves your issue, and opening a PR if it does?
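To illustrate, the substitution would look roughly like this on a toy pipeline (a sketch, not the actual patch to iobase.WriteImpl):

    import apache_beam as beam

    # Sketch of the suggested substitution (illustrative only): Reshuffle
    # redistributes elements across workers without funneling them through
    # a single None key.
    with beam.Pipeline() as p:
        (p
         | beam.Create(range(1000))
         # instead of:
         #   | 'Pair' >> beam.Map(lambda x: (None, x))
         #   | beam.GroupByKey()
         | 'Redistribute' >> beam.Reshuffle()
         | beam.Map(lambda x: x))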

cozos (Contributor, Author) commented on Nov 28, 2022

Hi @Abacn, thanks for your response.

Upon a closer reading of iobase._WriteBundleDoFn, I realized that it does not actually return a PCollection of all the elements, but rather a PCollection of the file paths that the elements were written to. This makes the None GroupByKey a bit better: the shuffle skew only applies to the number of files (several hundred or thousands), which is orders of magnitude smaller than the number of elements/rows (millions).
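A simplified sketch of that shape (hypothetical names, not Beam's actual code): rows fan into a per-bundle writer that emits only the written file's path, so the later GroupByKey on None shuffles paths rather than rows:

    import uuid
    import apache_beam as beam
    from apache_beam.transforms.window import GlobalWindow
    from apache_beam.utils.timestamp import MIN_TIMESTAMP
    from apache_beam.utils.windowed_value import WindowedValue

    class WriteBundleSketch(beam.DoFn):
        """Buffers a bundle's rows and emits one file path per bundle."""

        def start_bundle(self):
            self._buffer = []

        def process(self, element):
            self._buffer.append(element)  # buffer rows; emit nothing per row

        def finish_bundle(self):
            # Pretend the buffered rows were written to this shard file.
            path = '/tmp/shard-%s.parquet' % uuid.uuid4().hex
            # finish_bundle must emit WindowedValues explicitly.
            yield WindowedValue(path, MIN_TIMESTAMP, [GlobalWindow()])

    with beam.Pipeline() as p:
        (p
         | beam.Create(range(10000))          # "millions" of rows in a real job
         | beam.ParDo(WriteBundleSketch())    # -> one path per bundle
         | 'Pair' >> beam.Map(lambda x: (None, x))
         | beam.GroupByKey()                  # shuffles only the paths
         | beam.FlatMap(lambda kv: kv[1]))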

With this in mind, the poor performance from the GroupByKey is perplexing, especially since it seems to work fine on GCP Dataflow but not on Spark. Any ideas?

Here is where my Spark job is stuck in the Beam-to-Spark translation:

org.apache.spark.api.java.AbstractJavaRDDLike.mapToPair(JavaRDDLike.scala:45)
org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions.groupByKeyInGlobalWindow(GroupNonMergingWindowsFunctions.java:272)
org.apache.beam.runners.spark.translation.SparkBatchPortablePipelineTranslator.translateGroupByKey(SparkBatchPortablePipelineTranslator.java:203)
org.apache.beam.runners.spark.translation.SparkBatchPortablePipelineTranslator.translate(SparkBatchPortablePipelineTranslator.java:158)
org.apache.beam.runners.spark.SparkPipelineRunner.lambda$run$2(SparkPipelineRunner.java:189)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)

We should be able to replace the change from #958 with a Reshuffle(). Would you mind testing whether that resolves your issue, and opening a PR if it does?

I will give this a try.

Thanks

Abacn (Contributor) commented on Nov 28, 2022

@cozos This issue caught my attention partly because Python text IO write has noticeably worse performance than the Java SDK:

Java metrics: http://104.154.241.245/d/bnlHKP3Wz/java-io-it-tests-dataflow?orgId=1&viewPanel=4
Python metrics: http://104.154.241.245/d/gP7vMPqZz/python-io-it-tests-dataflow?orgId=1&viewPanel=5

Java Read ~20s; Java Write ~30s; Python Read ~100s; Python Write ~350s

I recently implemented this Python performance test and noticed the gap, and I'm now trying to figure out the performance bottlenecks in Python file-based IOs.

That said, Dataflow may also be affected.

cozos (Contributor, Author) commented on Nov 29, 2022

I see, interesting. What I am experiencing in Beam on Spark is not "Python is much slower than Java"; it's more like "WriteToParquet does not work at all for moderate-sized data". Nevertheless, please keep me posted on your Python performance investigations.

By the way, I tried replacing the GroupByKey with a Reshuffle and it did not help my Spark pipeline. I am now trying to remove all shuffles before finalization entirely.

cozos (Contributor, Author) commented on Nov 29, 2022

@Abacn Can you shed some light on why we want to trigger a reshuffle here in the first place?

Abacn (Contributor) commented on Nov 29, 2022

@cozos Thanks for the follow-up. I don't have much knowledge of the Spark runner, but improving Python file-based IO is ongoing work. I will keep you updated, of course.

mosche (Member) commented on Nov 30, 2022

Similarly, for the read side, see #24422.

kennknowles (Member) commented on Dec 1, 2022

Looking at this, is the assumption that there are very few elements coming out of the write fn?

cozos (Contributor, Author) commented on Dec 2, 2022

Thinking about this further, I believe the bottleneck came from the reader problem I described in #24422.

Basically, all Reads happen on a single partition on runners that don't support SDF. But this was obscured by the Spark UI showing the stage stuck at the shuffle boundary that came from WriteToParquet, when in reality the bottleneck was the Read.
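A common workaround sketch for that read bottleneck (an assumption on my part, not the fix from #24422): insert a Reshuffle right after the read so rows get redistributed before any downstream work. The input path below is hypothetical:

    import apache_beam as beam

    # Workaround sketch (assumption): on runners where the read is not
    # split, a Reshuffle right after it breaks fusion and spreads the rows
    # across tasks.
    with beam.Pipeline() as p:
        (p
         | beam.io.ReadFromParquet('gs://my-bucket/input/*.parquet')
         | 'Fanout' >> beam.Reshuffle()
         | beam.Map(lambda row: row))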

We can close this issue, but I don't know whether the GroupByKey on None is still a problem we want to track (it could also cause a bottleneck).
