
Spark finished after the first stage and returned an empty dataframe #10

@i-Hun

Description

Thanks for your work, but I wasn't able to run this plugin successfully. The query finished after the first stage and returned an empty dataframe, without any error.

Code to reproduce.

  1. I installed Spark in a Docker image based on python:3.8-bullseye with openjdk_version="17", like this:
ARG scala_version="2.12"

ENV APACHE_SPARK_VERSION="3.3.0" \
    HADOOP_VERSION="3" \
    SPARK_HOME=/usr/local/spark \
    SPARK_OPTS="--driver-java-options=-Xms1024M --driver-java-options=-Xmx4096M --driver-java-options=-Dlog4j.logLevel=info" \
    PATH="${PATH}:${SPARK_HOME}/bin"

WORKDIR /tmp
RUN wget -q "https://archive.apache.org/dist/spark/spark-${APACHE_SPARK_VERSION}/spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" && \
    tar xzf "spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" -C /usr/local --owner root --group root --no-same-owner && \
    rm "spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" && \
    ln -s "/usr/local/spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}" $SPARK_HOME

WORKDIR /usr/local

# to read s3a
RUN wget -P "${SPARK_HOME}/jars" https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.2/hadoop-aws-3.2.2.jar && \
    wget -P "${SPARK_HOME}/jars" https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.375/aws-java-sdk-bundle-1.11.375.jar

RUN wget -P "${SPARK_HOME}/jars" https://github.com/IBM/spark-s3-shuffle/releases/download/v0.5/spark-s3-shuffle_${scala_version}-${APACHE_SPARK_VERSION}_0.5.jar

# Add a link in the before_notebook hook in order to source automatically PYTHONPATH
RUN mkdir -p /usr/local/bin/before-notebook.d && \
    ln -s "${SPARK_HOME}/sbin/spark-config.sh" /usr/local/bin/before-notebook.d/spark-config.sh

# Fix Spark installation for Java 11 and Apache Arrow library
# see: https://github.com/apache/spark/pull/27356, https://spark.apache.org/docs/latest/#downloading
RUN cp -p "${SPARK_HOME}/conf/spark-defaults.conf.template" "${SPARK_HOME}/conf/spark-defaults.conf" && \
    echo $'\n\
spark.driver.extraJavaOptions -Dio.netty.tryReflectionSetAccessible=true\n\
spark.executor.extraJavaOptions -Dio.netty.tryReflectionSetAccessible=true\n\
spark.driver.memory 200g\n\
spark.kryoserializer.buffer.max 2047\n\
spark.sql.shuffle.partitions 300\n\
spark.sql.execution.arrow.pyspark.fallback.enabled true\n\
spark.driver.maxResultSize 120g' >> "${SPARK_HOME}/conf/spark-defaults.conf"

RUN pip install pyspark
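
For reference, a quick sanity check that can be run inside the resulting container (not part of the original setup) to confirm that the shuffle jar is on the classpath and that the pip-installed pyspark matches the Spark 3.3.0 tarball:

import glob
import os

import pyspark

# The shuffle jar was downloaded into $SPARK_HOME/jars in the Dockerfile above;
# list it and print the pyspark version (expected: 3.3.0).
spark_home = os.environ["SPARK_HOME"]
print(glob.glob(os.path.join(spark_home, "jars", "spark-s3-shuffle_*.jar")))
print(pyspark.__version__)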

  2. Configured Spark in Python:

from os import getenv

# set env vars in jupyter notebook
%env AWS_ACCESS_KEY_ID={getenv('S3_SMM_TEST_ACCESS_ID')}
%env AWS_SECRET_ACCESS_KEY={getenv('S3_SMM_TEST_ACCESS_KEY')}
%env S3_ENDPOINT_URL={getenv('S3_ADVANCED_ENDPOINT')}
%env S3_SHUFFLE_ROOT=s3a://smm-test/personal/s3-shuffle

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("name").master('local[*]').config(
    "spark.sql.execution.arrow.pyspark.enabled", "false"
).config(
    "spark.shuffle.manager", "org.apache.spark.shuffle.S3ShuffleManager"
).config(
    "spark.shuffle.sort.io.plugin.class", "org.apache.spark.shuffle.S3ShuffleDataIO"
).config(
    "spark.shuffle.s3.rootDir", "s3a://smm-test/personal/s3-shuffle"
).config(
    "spark.dynamicAllocation.enabled", "true"
).config(
    "spark.dynamicAllocation.shuffleTracking.enabled", "true"
).config(
    "spark.fs.s3a.path.style.access", "true"
).config(
    "spark.fs.s3a.fast.upload", "true"
).config(
    "spark.driver.extraClassPath", f'{getenv("SPARK_HOME")}/jars/aws-java-sdk-bundle-1.11.375.jar,/opt/spark/jars/hadoop-aws-3.2.2.jar'
).config(
    "spark.executor.extraClassPath", f'{getenv("SPARK_HOME")}/jars/aws-java-sdk-bundle-1.11.375.jar,/opt/spark/jars/hadoop-aws-3.2.2.jar'
).config(
    "spark.shuffle.s3.sort.cloneRecords", "true"
).config(
    "spark.shuffle.s3.useBlockManager", "true"
).config(
    "spark.shuffle.s3.forceBatchFetch", "true"
).config(
    "spark.shuffle.s3.supportsUnbuffer", "true"
).config(
    'spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider'
).getOrCreate()
config_spark_s3 = {
    'access_id': getenv("S3_SMM_TEST_ACCESS_ID"),
    'access_key': getenv("S3_SMM_TEST_ACCESS_KEY"),
    'impl': 'org.apache.hadoop.fs.s3a.S3AFileSystem',
    'endpoint': getenv("S3_ADVANCED_ENDPOINT")
}
spark._jsc.hadoopConfiguration().set("fs.s3a.impl", config_spark_s3["impl"])
spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", config_spark_s3["endpoint"])
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", config_spark_s3["access_id"])
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", config_spark_s3["access_key"])

  3. Tried to execute a heavy query (a stand-in sketch of the kind of query I mean is below).
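
The real query isn't something I can paste here; a shuffle-heavy stand-in of roughly the same shape would be (the s3a path and column name are placeholders, not the real dataset):

# Hypothetical stand-in: a wide aggregation that forces a large shuffle.
# The path and the column name below are placeholders.
df = spark.read.parquet("s3a://smm-test/some/large/table")
result = (
    df.groupBy("some_key")
      .count()
      .orderBy("count", ascending=False)
)
result.show(20)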

It ended within a couple of minutes and the result looked like this:

[screenshot: the query returns an empty dataframe]

despite the fact that without spark-s3-shuffle the same query runs through many stages in about an hour and returns a massive dataframe. The spark.shuffle.s3.rootDir was filled with a couple of GBs of data, but I would have expected much more.
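
(One way to quantify what actually lands under the shuffle root, straight from the notebook, is to walk the configured rootDir through the Hadoop FileSystem API; the snippet below does only that and assumes nothing beyond the settings shown above.)

# Sum up what landed under spark.shuffle.s3.rootDir via the Hadoop
# FileSystem API, reached through the py4j gateway.
hadoop_conf = spark._jsc.hadoopConfiguration()
root = spark._jvm.org.apache.hadoop.fs.Path("s3a://smm-test/personal/s3-shuffle")
fs = root.getFileSystem(hadoop_conf)
summary = fs.getContentSummary(root)
print("bytes:", summary.getLength(), "files:", summary.getFileCount())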

Do you have any thoughts on what I can do to make it work?
Thanks in advance!
