Chandramoulee K V
11/24/2022, 8:05 AM
# Copy the makefile targets to expose on the container. This makes it easier to register.
# Delete this after we update CI to not serialize inside the container
# COPY k8s_spark/sandbox.config /root
# Copy the actual code
# COPY k8s_spark/ /root/k8s_spark
# This tag is supplied by the build script and will be used to determine the version
# when registering tasks, workflows, and launch plans
# ARG tag
# ENV FLYTE_INTERNAL_IMAGE $tag
# Copy over the helper script that the SDK relies on
# RUN cp ${VENV}/bin/flytekit_venv /usr/local/bin/
# RUN chmod a+x /usr/local/bin/flytekit_venv
I registered the sample PySpark workflow with this image, and I am facing this issue:
failed
SYSTEM ERROR! Contact platform administrators.
When looking at the logs in AWS, I found an "unable to load native-hadoop library" warning. Could this be the cause of the issue? Any idea?
{"log":"22/11/24 07:03:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
","stream":"stderr","docker":{"container_id":"XXX"},"kubernetes":{"container_name":"YYY","namespace_name":"flytesnacks-development","pod_name":"ZZZ","pod_id":"AAA","namespace_id":"BBB","namespace_labels":{"kubernetes_io/metadata_name":"flytesnacks-development"}}}
{"log":"{\"asctime\": \"2022-11-24 07:03:56,085\", \"name\": \"flytekit.entrypoint\", \"levelname\": \"ERROR\", \"message\": \"Traceback (most recent call last):\
\
File \\\"/opt/venv/lib/python3.8/site-packages/flytekit/exceptions/scopes.py\\\", line 165, in system_entry_point\
return wrapped(*args, **kwargs)\
File \\\"/opt/venv/lib/python3.8/site-packages/flytekit/core/base_task.py\\\", line 462, in dispatch_execute\
new_user_params = self.pre_execute(ctx.user_space_params)\
File \\\"/opt/venv/lib/python3.8/site-packages/flytekitplugins/spark/task.py\\\", line 124, in pre_execute\
self.sess = sess_builder.getOrCreate()\
File \\\"/opt/venv/lib/python3.8/site-packages/pyspark/sql/session.py\\\", line 272, in getOrCreate\
session = SparkSession(sc, options=self._options)\
File \\\"/opt/venv/lib/python3.8/site-packages/pyspark/sql/session.py\\\", line 307, in __init__\
jsparkSession = self._jvm.SparkSession(<http://self._jsc.sc|self._jsc.sc>(), options)\
File \\\"/opt/venv/lib/python3.8/site-packages/py4j/java_gateway.py\\\", line 1585, in __call__\
return_value = get_return_value(\
File \\\"/opt/venv/lib/python3.8/site-packages/py4j/protocol.py\\\", line 330, in get_return_value\
raise Py4JError(\
\
Message:\
\
An error occurred while calling None.org.apache.spark.sql.SparkSession. Trace:\
py4j.Py4JException: Constructor org.apache.spark.sql.SparkSession([class org.apache.spark.SparkContext, class java.util.HashMap]) does not exist\
\\tat py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:179)\
\\tat py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:196)\
\\tat py4j.Gateway.invoke(Gateway.java:237)\
\\tat py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)\
\\tat py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)\
\\tat py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)\
\\tat py4j.ClientServerConnection.run(ClientServerConnection.java:106)\
\\tat java.lang.Thread.run(Thread.java:750)\
\
\
\
SYSTEM ERROR! Contact platform administrators.\"}
","stream":"stderr","docker":{"container_id":"58a24d3b7f9e5d9fcdd64e7e7404fa37f489beab0401ac93bd1953e88e90f116"},"kubernetes":{"container_name":"ablr8sc5p2mfvfwj6fxl-n0-1","namespace_name":"flytesnacks-development","pod_name":"ablr8sc5p2mfvfwj6fxl-n0-1","pod_id":"58a24d3b7f9e5d9fcdd64e7e7404fa37f489beab0401ac93bd1953e88e90f116","namespace_id":"2e48245d-3a0a-48e9-96db-897259d85785","namespace_labels":{"kubernetes_io/metadata_name":"flytesnacks-development"}}}
Samhita Alla
Chandramoulee K V
11/24/2022, 12:02 PM
NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
and because of this, I think this happened
return_value = get_return_value(
  File "/opt/venv/lib/python3.8/site-packages/py4j/protocol.py", line 330, in get_return_value
    raise Py4JError(

Message:

An error occurred while calling None.org.apache.spark.sql.SparkSession. Trace:
py4j.Py4JException: Constructor org.apache.spark.sql.SparkSession([class org.apache.spark.SparkContext, class java.util.HashMap]) does not exist
	at
Samhita Alla
Should I rebuild the Docker image with what you are working on currently and check if it makes a difference?
Yes, please!
Chandramoulee K V
11/30/2022, 1:17 PM
FROM ubuntu:focal
LABEL org.opencontainers.image.source https://github.com/flyteorg/flytesnacks
WORKDIR /root
ENV VENV /opt/venv
ENV LANG C.UTF-8
ENV LC_ALL C.UTF-8
ENV PYTHONPATH /root
ENV DEBIAN_FRONTEND=noninteractive
# Install Python3 and other basics
RUN apt-get update && apt-get install -y python3.8 python3.8-venv make build-essential libssl-dev python3-pip curl
# Install AWS CLI to run on AWS (for GCS install GSutil). This will be removed
# in future versions to make it completely portable
RUN pip3 install awscli
WORKDIR /opt
RUN curl https://sdk.cloud.google.com > install.sh
RUN bash /opt/install.sh --install-dir=/opt
ENV PATH $PATH:/opt/google-cloud-sdk/bin
WORKDIR /root
ENV VENV /opt/venv
# Virtual environment
RUN python3 -m venv ${VENV}
ENV PATH="${VENV}/bin:$PATH"
RUN apt-get -y install git
RUN apt-get install -y wget
RUN pip3 install wheel
# Install Python dependencies
COPY requirements.txt /root
COPY flytekit_install_spark3.sh /root/flytekit_install_spark3.sh
RUN pip install -r /root/requirements.txt
RUN /root/flytekit_install_spark3.sh
# Adding Tini support for the spark pods
RUN wget https://github.com/krallin/tini/releases/download/v0.18.0/tini && \
cp tini /sbin/tini && cp tini /usr/bin/tini && \
chmod a+x /sbin/tini && chmod a+x /usr/bin/tini
# Setup Spark environment
ENV JAVA_HOME /usr/lib/jvm/java-8-openjdk-amd64
ENV SPARK_HOME /opt/spark
ENV SPARK_VERSION 3.2.1
ENV PYSPARK_PYTHON ${VENV}/bin/python3
ENV PYSPARK_DRIVER_PYTHON ${VENV}/bin/python3
# For spark we want to use the default entrypoint which is part of the
# distribution, also enable the virtualenv for this image.
ENTRYPOINT ["/opt/entrypoint.sh"]
requirements.txt:
flytekit
awscli
pymysql
scikit-learn
xgboost
sqlalchemy
boto3
s3fs
mlflow
lxml
xlsxwriter
spark
flytekitplugins-spark
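A hedged aside on versions: the Dockerfile pins the distribution via SPARK_VERSION 3.2.1, while requirements.txt leaves the Python side unpinned (flytekitplugins-spark pulls in whichever pyspark it resolves, and the bare spark entry is an unrelated PyPI package, not PySpark). If the constructor error comes from that skew, pinning the Python package to the jar version is one candidate fix; a sketch of the relevant requirements.txt lines, with the version assumed from the Dockerfile:

pyspark==3.2.1
flytekitplugins-spark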
Sparksampletest.py:
import flytekit
import pandas
from flytekit import Resources, kwtypes, task, workflow
from flytekit.types.structured.structured_dataset import StructuredDataset
from flytekitplugins.spark import Spark

try:
    from typing import Annotated
except ImportError:
    from typing_extensions import Annotated

columns = kwtypes(name=str, age=int)


@task(
    task_config=Spark(
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
        }
    ),
    limits=Resources(mem="2000M"),
    cache_version="1",
)
def create_spark_df() -> Annotated[StructuredDataset, columns]:
    """
    This task returns a Spark dataset that conforms to the defined schema. Failure to do so should result
    in a runtime error. TODO: runtime error enforcement
    """
    sess = flytekit.current_context().spark_session
    return StructuredDataset(
        dataframe=sess.createDataFrame(
            [
                ("Alice", 5),
                ("Bob", 10),
                ("Charlie", 15),
            ],
            ["name", "age"],
        )
    )


@task(cache_version="1")
def sum_of_all_ages(s: Annotated[StructuredDataset, columns]) -> int:
    df: pandas.DataFrame = s.open(pandas.DataFrame).all()
    return int(df["age"].sum())


@workflow
def my_smart_structured_dataset() -> int:
    """
    This workflow shows how a simple schema can be created in Spark and passed to a python function and accessed as a
    pandas DataFrame. Flyte Schemas are abstract DataFrames and not tied to a specific memory representation.
    """
    df = create_spark_df()
    return sum_of_all_ages(s=df)


if __name__ == "__main__":
    print(f"Running {__file__} main...")
    print(f"Running my_smart_schema()-> {my_smart_structured_dataset()}")
Traceback (most recent call last):
  File "/opt/venv/lib/python3.8/site-packages/flytekit/exceptions/scopes.py", line 165, in system_entry_point
    return wrapped(*args, **kwargs)
  File "/opt/venv/lib/python3.8/site-packages/flytekit/core/base_task.py", line 462, in dispatch_execute
    new_user_params = self.pre_execute(ctx.user_space_params)
  File "/opt/venv/lib/python3.8/site-packages/flytekitplugins/spark/task.py", line 124, in pre_execute
    self.sess = sess_builder.getOrCreate()
  File "/opt/venv/lib/python3.8/site-packages/pyspark/sql/session.py", line 272, in getOrCreate
    session = SparkSession(sc, options=self._options)
  File "/opt/venv/lib/python3.8/site-packages/pyspark/sql/session.py", line 307, in __init__
    jsparkSession = self._jvm.SparkSession(self._jsc.sc(), options)
  File "/opt/venv/lib/python3.8/site-packages/py4j/java_gateway.py", line 1585, in __call__
    return_value = get_return_value(
  File "/opt/venv/lib/python3.8/site-packages/py4j/protocol.py", line 330, in get_return_value
    raise Py4JError(
Message:
An error occurred while calling None.org.apache.spark.sql.SparkSession. Trace:
py4j.Py4JException: Constructor org.apache.spark.sql.SparkSession([class org.apache.spark.SparkContext, class java.util.HashMap]) does not exist
	at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:179)
	at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:196)
	at py4j.Gateway.invoke(Gateway.java:237)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Samhita Alla
Chandramoulee K V
11/30/2022, 1:23 PM
This is the error:
Traceback (most recent call last):
  File "/opt/venv/lib/python3.9/site-packages/flytekit/exceptions/scopes.py", line 165, in system_entry_point
    return wrapped(*args, **kwargs)
  File "/opt/venv/lib/python3.9/site-packages/flytekit/core/base_task.py", line 525, in dispatch_execute
    raise TypeError(
Message:
Failed to convert return value for var o0 for function sparksampletest.create_spark_df with error <class 'py4j.protocol.Py4JJavaError'>: An error occurred while calling o42.parquet.
: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.sql.execution.datasources.DataSource.planForWritingFileFormat(DataSource.scala:461)
	at org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:558)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:793)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Unknown Source)
SYSTEM ERROR! Contact platform administrators.
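A hedged reading of this trace (a sketch, not flytekit's literal internals): the failing o42.parquet call is the Spark plugin persisting the returned StructuredDataset to blob storage as Parquet, and the Spark JVM has no Hadoop FileSystem implementation bound to the "s3" scheme. Approximately, with a hypothetical bucket path:

import flytekit

# Inside a Spark task, flytekit exposes the session built in pre_execute.
sess = flytekit.current_context().spark_session

# Returning a StructuredDataset makes flytekit write the dataframe out as
# Parquet to the configured blob store -- roughly this call:
df = sess.createDataFrame([("Alice", 5), ("Bob", 10)], ["name", "age"])
df.write.parquet("s3://my-bucket/flyte/outputs/o0")  # hypothetical path
# -> UnsupportedFileSystemException: No FileSystem for scheme "s3",
#    because nothing on the JVM classpath claims the "s3" scheme.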
Samhita Alla
Chandramoulee K V
12/01/2022, 5:07 AM
Samhita Alla
Chandramoulee K V
12/01/2022, 10:35 AM
Failed to convert return value for var o0 for function sparksampletest.create_spark_df with error <class 'py4j.protocol.Py4JJavaError'>: An error occurred while calling o42.parquet.
: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
This is the main error in the dataframe-conversion workflow. What might be the cause of this? It says the s3 filesystem is not supported, so the Parquet files are not being read?
Samhita Alla
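A hedged pointer rather than a confirmed fix: the usual remedy for No FileSystem for scheme "s3" is to put the hadoop-aws (S3A) bindings on the Spark classpath and map the bare "s3" scheme to S3AFileSystem. In the task's spark_conf that could look like the sketch below; the jar coordinates and the task name are assumptions (they must match the Hadoop version in the image), and the jars can equally be baked into the image instead of fetched at startup via spark.jars.packages.

from flytekit import task
from flytekitplugins.spark import Spark

@task(
    task_config=Spark(
        spark_conf={
            # Assumed coordinates: pick the hadoop-aws release that matches
            # the Hadoop libraries bundled with the Spark distribution.
            "spark.jars.packages": "org.apache.hadoop:hadoop-aws:3.3.1,"
            "com.amazonaws:aws-java-sdk-bundle:1.11.901",
            # Bind the bare "s3" scheme (used by the output URI above) to the
            # S3A implementation that hadoop-aws provides.
            "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        }
    ),
)
def create_spark_df_s3_ready() -> None:  # hypothetical task name
    ...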