# ask-the-community
Chandramoulee K V:
Hello, I was exploring the Kubernetes Spark job plugin and tried to implement it by following this documentation. This is done in an EKS setup. I created a custom Docker image for Spark as specified in the documentation; the only thing I did differently was to comment out the following in the Dockerfile:
Copy code
# Copy the makefile targets to expose on the container. This makes it easier to register.
# Delete this after we update CI to not serialize inside the container
# COPY k8s_spark/sandbox.config /root
# Copy the actual code
# COPY k8s_spark/ /root/k8s_spark
# This tag is supplied by the build script and will be used to determine the version
# when registering tasks, workflows, and launch plans
# ARG tag
# ENV FLYTE_INTERNAL_IMAGE $tag
# Copy over the helper script that the SDK relies on
# RUN cp ${VENV}/bin/flytekit_venv /usr/local/bin/
# RUN chmod a+x /usr/local/bin/flytekit_venv
I registered the sample PySpark workflow with the image and I am facing this issue:
failed
SYSTEM ERROR! Contact platform administrators.
When looking at the logs in AWS, I found a warning that it was unable to load the native-hadoop library. Could this warning be the cause of the issue? Any ideas?
Copy code
{"log":"22/11/24 07:03:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
","stream":"stderr","docker":{"container_id":"XXX"},"kubernetes":{"container_name":"YYY","namespace_name":"flytesnacks-development","pod_name":"ZZZ","pod_id":"AAA","namespace_id":"BBB","namespace_labels":{"kubernetes_io/metadata_name":"flytesnacks-development"}}}
Attaching the full traceback:
Copy code
{"log":"{\"asctime\": \"2022-11-24 07:03:56,085\", \"name\": \"flytekit.entrypoint\", \"levelname\": \"ERROR\", \"message\": \"Traceback (most recent call last):\
\
      File \\\"/opt/venv/lib/python3.8/site-packages/flytekit/exceptions/scopes.py\\\", line 165, in system_entry_point\
        return wrapped(*args, **kwargs)\
      File \\\"/opt/venv/lib/python3.8/site-packages/flytekit/core/base_task.py\\\", line 462, in dispatch_execute\
        new_user_params = self.pre_execute(ctx.user_space_params)\
      File \\\"/opt/venv/lib/python3.8/site-packages/flytekitplugins/spark/task.py\\\", line 124, in pre_execute\
        self.sess = sess_builder.getOrCreate()\
      File \\\"/opt/venv/lib/python3.8/site-packages/pyspark/sql/session.py\\\", line 272, in getOrCreate\
        session = SparkSession(sc, options=self._options)\
      File \\\"/opt/venv/lib/python3.8/site-packages/pyspark/sql/session.py\\\", line 307, in __init__\
        jsparkSession = self._jvm.SparkSession(<http://self._jsc.sc|self._jsc.sc>(), options)\
      File \\\"/opt/venv/lib/python3.8/site-packages/py4j/java_gateway.py\\\", line 1585, in __call__\
        return_value = get_return_value(\
      File \\\"/opt/venv/lib/python3.8/site-packages/py4j/protocol.py\\\", line 330, in get_return_value\
        raise Py4JError(\
\
Message:\
\
    An error occurred while calling None.org.apache.spark.sql.SparkSession. Trace:\
py4j.Py4JException: Constructor org.apache.spark.sql.SparkSession([class org.apache.spark.SparkContext, class java.util.HashMap]) does not exist\
\\tat py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:179)\
\\tat py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:196)\
\\tat py4j.Gateway.invoke(Gateway.java:237)\
\\tat py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)\
\\tat py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)\
\\tat py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)\
\\tat py4j.ClientServerConnection.run(ClientServerConnection.java:106)\
\\tat java.lang.Thread.run(Thread.java:750)\
\
\
\
SYSTEM ERROR! Contact platform administrators.\"}
","stream":"stderr","docker":{"container_id":"58a24d3b7f9e5d9fcdd64e7e7404fa37f489beab0401ac93bd1953e88e90f116"},"kubernetes":{"container_name":"ablr8sc5p2mfvfwj6fxl-n0-1","namespace_name":"flytesnacks-development","pod_name":"ablr8sc5p2mfvfwj6fxl-n0-1","pod_id":"58a24d3b7f9e5d9fcdd64e7e7404fa37f489beab0401ac93bd1953e88e90f116","namespace_id":"2e48245d-3a0a-48e9-96db-897259d85785","namespace_labels":{"kubernetes_io/metadata_name":"flytesnacks-development"}}}
Samhita Alla:
I’m working on a PR to clean up the Spark Dockerfile: https://github.com/flyteorg/flytesnacks/pull/914/files. Would you be able to use it as-is? You can remove the code copy command, though.
I’m also working on using a non-root user if possible. It currently works only with the root user.
Chandramoulee K V:
OK, is enabling the Spark plugin mandatory in an EKS setup, or is only installing flytekitplugins-spark enough? Also, one of the warnings was:
NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
and because of this, I think this happened:
        return_value = get_return_value(
      File "/opt/venv/lib/python3.8/site-packages/py4j/protocol.py", line 330, in get_return_value
        raise Py4JError(

Message:

    An error occurred while calling None.org.apache.spark.sql.SparkSession. Trace:
py4j.Py4JException: Constructor org.apache.spark.sql.SparkSession([class org.apache.spark.SparkContext, class java.util.HashMap]) does not exist
	at
Should I rebuild the Docker image with what you are working on currently and check if it makes a difference?
Samhita Alla:
I believe the hadoop warning can be ignored. And yes, you need to set up spark: https://docs.flyte.org/en/latest/deployment/plugin_setup/k8s/index.html#deployment-plugin-setup-k8s.
Should I rebuild the Docker image with what you are working on currently and check if it makes a difference?
Yes, please!
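For context, setting up Spark here means more than installing flytekitplugins-spark in the image: the Spark operator has to be installed in the cluster and spark has to be listed as an enabled task plugin in the values-override.yaml used for the Helm upgrade. A rough sketch of that fragment is shown below; the key layout assumes the flyte-core chart and should be verified against the linked plugin setup guide for your chart and version.
Copy code
# Sketch only: enable the Spark backend plugin in the propeller configuration.
# Key names/nesting assume the flyte-core Helm chart; verify against the plugin setup docs.
configmap:
  enabled_plugins:
    tasks:
      task-plugins:
        enabled-plugins:
          - container
          - sidecar
          - k8s-array
          - spark
        default-for-task-types:
          container: container
          sidecar: sidecar
          container_array: k8s-array
          spark: spark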
Chandramoulee K V:
Hi, I have enabled the Spark operator following the documentation and created a Docker image for Spark as mentioned in the doc. Attaching the doc and the Dockerfile for further reference:
Copy code
FROM ubuntu:focal
LABEL org.opencontainers.image.source https://github.com/flyteorg/flytesnacks

WORKDIR /root
ENV VENV /opt/venv
ENV LANG C.UTF-8
ENV LC_ALL C.UTF-8
ENV PYTHONPATH /root
ENV DEBIAN_FRONTEND=noninteractive

# Install Python3 and other basics
RUN apt-get update && apt-get install -y python3.8 python3.8-venv make build-essential libssl-dev python3-pip curl

# Install AWS CLI to run on AWS (for GCS install GSutil). This will be removed
# in future versions to make it completely portable
RUN pip3 install awscli

WORKDIR /opt
RUN curl https://sdk.cloud.google.com > install.sh
RUN bash /opt/install.sh --install-dir=/opt
ENV PATH $PATH:/opt/google-cloud-sdk/bin
WORKDIR /root

ENV VENV /opt/venv
# Virtual environment
RUN python3 -m venv ${VENV}
ENV PATH="${VENV}/bin:$PATH"

RUN apt-get -y install git
RUN apt-get install -y wget
RUN pip3 install wheel

# Install Python dependencies
COPY requirements.txt /root
COPY flytekit_install_spark3.sh  /root/flytekit_install_spark3.sh
RUN pip install -r /root/requirements.txt

RUN flytekit_install_spark3.sh

# Adding Tini support for the spark pods
RUN wget https://github.com/krallin/tini/releases/download/v0.18.0/tini && \
    cp tini /sbin/tini && cp tini /usr/bin/tini && \
    chmod a+x /sbin/tini && chmod a+x /usr/bin/tini

# Setup Spark environment
ENV JAVA_HOME /usr/lib/jvm/java-8-openjdk-amd64
ENV SPARK_HOME /opt/spark
ENV SPARK_VERSION 3.2.1
ENV PYSPARK_PYTHON ${VENV}/bin/python3
ENV PYSPARK_DRIVER_PYTHON ${VENV}/bin/python3

# For spark we want to use the default entrypoint which is part of the
# distribution, also enable the virtualenv for this image.
ENTRYPOINT ["/opt/entrypoint.sh"]
Copy code
flytekit
awscli
pymysql
scikit-learn
xgboost
sqlalchemy
boto3
s3fs
mlflow
lxml
xlsxwriter
spark
flytekitplugins-spark
(requirements.txt above; Sparksampletest.py below)
Copy code
import flytekit
import pandas
from flytekit import Resources, kwtypes, task, workflow
from flytekit.types.structured.structured_dataset import StructuredDataset
from flytekitplugins.spark import Spark

try:
    from typing import Annotated
except ImportError:
    from typing_extensions import Annotated

columns = kwtypes(name=str, age=int)

@task(
    task_config=Spark(
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
        }
    ),
    limits=Resources(mem="2000M"),
    cache_version="1",
)
def create_spark_df() -> Annotated[StructuredDataset, columns]:
    """
    This task returns a Spark dataset that conforms to the defined schema. Failure to do so should result
    in a runtime error. TODO: runtime error enforcement
    """
    sess = flytekit.current_context().spark_session
    return StructuredDataset(
        dataframe=sess.createDataFrame(
            [
                ("Alice", 5),
                ("Bob", 10),
                ("Charlie", 15),
            ],
            ["name", "age"],
        )
    )

@task(cache_version="1")
def sum_of_all_ages(s: Annotated[StructuredDataset, columns]) -> int:
    df: pandas.DataFrame = s.open(pandas.DataFrame).all()
    return int(df["age"].sum())

@workflow
def my_smart_structured_dataset() -> int:
    """
    This workflow shows how a simple schema can be created in Spark and passed to a python function and accessed as a
    pandas DataFrame. Flyte Schemas are abstract DataFrames and not tied to a specific memory representation.
    """
    df = create_spark_df()
    return sum_of_all_ages(s=df)

if __name__ == "__main__":
    print(f"Running {__file__} main...")
    print(f"Running my_smart_schema()-> {my_smart_structured_dataset()}")
@Samhita Alla the error is:
Copy code
Traceback (most recent call last):

      File "/opt/venv/lib/python3.8/site-packages/flytekit/exceptions/scopes.py", line 165, in system_entry_point
        return wrapped(*args, **kwargs)
      File "/opt/venv/lib/python3.8/site-packages/flytekit/core/base_task.py", line 462, in dispatch_execute
        new_user_params = self.pre_execute(ctx.user_space_params)
      File "/opt/venv/lib/python3.8/site-packages/flytekitplugins/spark/task.py", line 124, in pre_execute
        self.sess = sess_builder.getOrCreate()
      File "/opt/venv/lib/python3.8/site-packages/pyspark/sql/session.py", line 272, in getOrCreate
        session = SparkSession(sc, options=self._options)
      File "/opt/venv/lib/python3.8/site-packages/pyspark/sql/session.py", line 307, in __init__
        jsparkSession = self._jvm.SparkSession(self._jsc.sc(), options)
      File "/opt/venv/lib/python3.8/site-packages/py4j/java_gateway.py", line 1585, in __call__
        return_value = get_return_value(
      File "/opt/venv/lib/python3.8/site-packages/py4j/protocol.py", line 330, in get_return_value
        raise Py4JError(

Message:

    An error occurred while calling None.org.apache.spark.sql.SparkSession. Trace:
py4j.Py4JException: Constructor org.apache.spark.sql.SparkSession([class org.apache.spark.SparkContext, class java.util.HashMap]) does not exist
	at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:179)
	at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:196)
	at py4j.Gateway.invoke(Gateway.java:237)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Samhita Alla:
Can you try the dockerfile I worked on in the PR?
Chandramoulee K V:
Sure, will try.
After trying with the new Dockerfile:
Copy code
this is the error:

Traceback (most recent call last):

      File "/opt/venv/lib/python3.9/site-packages/flytekit/exceptions/scopes.py", line 165, in system_entry_point
        return wrapped(*args, **kwargs)
      File "/opt/venv/lib/python3.9/site-packages/flytekit/core/base_task.py", line 525, in dispatch_execute
        raise TypeError(

Message:

    Failed to convert return value for var o0 for function sparksampletest.create_spark_df with error <class 'py4j.protocol.Py4JJavaError'>: An error occurred while calling o42.parquet.
: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.sql.execution.datasources.DataSource.planForWritingFileFormat(DataSource.scala:461)
	at org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:558)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:793)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Unknown Source)


SYSTEM ERROR! Contact platform administrators.
Samhita Alla:
@Chandramoulee K V, could you share the contents of your values-override.yaml file?
Chandramoulee K V:
Hi, when run locally it runs fine after we enabled the Spark plugin.
Samhita Alla:
Oh, so is it failing on the demo cluster?
Chandramoulee K V:
Hi, updates: since it was giving an error saying it was not able to convert the spark_df to StructuredDataset parquet format, I tried a different script with a simple map/reduce function with partitions, and that worked fine.
Copy code
Failed to convert return value for var o0 for function sparksampletest.create_spark_df with error <class 'py4j.protocol.Py4JJavaError'>: An error occurred while calling o42.parquet.
: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
This is the main error in the dataframe conversion workflow. What might be the cause? It's saying the s3 filesystem is not supported, so the parquet files are not being written?
Samhita Alla:
Have you installed hadoop-aws? I know it’s present in the dockerfile but want to confirm. Also, we use a values-override.yaml file to upgrade helm. Have you copied the content from the plugin setup guide?
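For reference, the UnsupportedFileSystemException above generally means Spark has no filesystem implementation registered for the s3:// scheme; the values snippet in the plugin setup guide typically maps the s3 scheme onto the S3A implementation provided by hadoop-aws. A rough sketch of that part of values-override.yaml is below; exact keys and the full list of defaults are assumptions here and should be taken from the plugin setup guide rather than from this sketch.
Copy code
# Sketch only: Spark plugin defaults that map s3:// URIs to the S3A filesystem.
# Assumes hadoop-aws (and a matching AWS SDK jar) is on the Spark classpath in the image.
plugins:
  spark:
    spark-config-default:
      - spark.hadoop.fs.s3a.aws.credentials.provider: "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"
      - spark.hadoop.fs.s3.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
      - spark.hadoop.fs.AbstractFileSystem.s3.impl: "org.apache.hadoop.fs.s3a.S3A"
      - spark.hadoop.fs.s3a.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
      - spark.hadoop.fs.AbstractFileSystem.s3a.impl: "org.apache.hadoop.fs.s3a.S3A"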
Chandramoulee K V:
Yes, we used the already-available values-eks.yaml and added the spark-operator part from the documentation. Will confirm whether hadoop-aws is present in the container.
Samhita Alla:
Please take a look at the https://docs.flyte.org/en/latest/deployment/plugin_setup/k8s/index.html doc and follow the Spark operator installation steps.
Chandramoulee K V:
This is done in the EKS cluster
Samhita Alla:
Oh, have you installed it already?
Chandramoulee K V:
Yes, after doing that I was able to run a simple map/reduce Spark task that squares the numbers in a range, sums them, and returns the value.
Samhita Alla:
Yeah, if the problem is with s3, then hadoop-aws hasn't been installed correctly.
Chandramoulee K V:
Is there any way to verify that in the container?
Samhita Alla:
Um I guess. Would you mind sharing all your steps? I’ll take a look at them and see if there’s anything erroneous.
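One rough way to verify from inside the task container, assuming the standard Spark layout under $SPARK_HOME (the jar directory is an assumption, not confirmed from this image):
Copy code
# Sketch only: look for the AWS Hadoop connector jars on the Spark classpath.
ls ${SPARK_HOME:-/opt/spark}/jars | grep -iE 'hadoop-aws|aws-java-sdk'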