# ask-the-community
g
Hi, I'm trying to run the basic PySpark example with ImageSpec. When I run pyflyte run --remote --image imageSpec.yaml flyte3.py my_wf I get this from the web UI:
[1/1] currentAttempt done. Last Error: USER::core.py:760 in invoke          │
│                                                                              │
│ ❱  760 │   │   │   │   return __callback(*args, **kwargs)                    │
│                                                                              │
│ /usr/local/lib/python3.10/site-packages/flytekit/bin/entrypoint.py:508 in    │
│ fast_execute_task_cmd                                                        │
│                                                                              │
│ ❱ 508 │   subprocess.run(cmd, check=True)                                    │
│                                                                              │
│ /usr/local/lib/python3.10/subprocess.py:526 in run                           │
│                                                                              │
│ ❱  526 │   │   │   raise CalledProcessError(retcode, process.args,           │
╰──────────────────────────────────────────────────────────────────────────────╯
CalledProcessError: Command '['pyflyte-execute', '--inputs',
's3://my-s3-bucket/metadata/propeller/flytesnacks-development-fd967e080f61042fd86d/n0/data/inputs.pb',
'--output-prefix',
's3://my-s3-bucket/metadata/propeller/flytesnacks-development-fd967e080f61042fd86d/n0/data/0',
'--raw-output-data-prefix', 's3://my-s3-bucket/data/4d/fd967e080f61042fd86d-n0-0',
'--checkpoint-path', 's3://my-s3-bucket/data/4d/fd967e080f61042fd86d-n0-0/_flytecheckpoints',
'--prev-checkpoint', '""', '--dynamic-addl-distro',
's3://my-s3-bucket/flytesnacks/development/U67AW4X3WQLPZXY2HBRFM7GDWY======/script_mode.tar.gz',
'--dynamic-dest-dir', '/root', '--resolver',
'flytekit.core.python_auto_container.default_task_resolver', '--',
'task-module', 'flyte3', 'task-name', 'hello_spark']' returned non-zero exit status 1.
I'm not having issues without the --remote flag (pyflyte run flyte3.py my_wf runs fine). I'd be so grateful if you could guide me.
k
did you see any other error in the task pod?
kubectl logs <pod-name> -n flytesnacks-development
g
@Kevin Su
/bin/bash: warning: setlocale: LC_ALL: cannot change locale (en_US.UTF-8)
++ id -u
+ myuid=0
++ id -g
+ mygid=0
+ set +e
++ getent passwd 0
+ uidentry=root:x:0:0:root:/root:/bin/bash
+ set -e
+ '[' -z root:x:0:0:root:/root:/bin/bash ']'
+ '[' -z /usr/local/openjdk-11 ']'
+ SPARK_CLASSPATH=':/opt/spark/jars/*'
+ env
+ grep SPARK_JAVA_OPT_
+ sort -t_ -k4 -n
+ sed 's/[^=]*=\(.*\)/\1/g'
+ readarray -t SPARK_EXECUTOR_JAVA_OPTS
+ '[' -n '' ']'
+ '[' -z ']'
+ '[' -z ']'
+ '[' -n '' ']'
+ '[' -z ']'
+ '[' -z ']'
+ '[' -z x ']'
+ SPARK_CLASSPATH='/opt/spark/conf::/opt/spark/jars/*'
+ case "$1" in
Non-spark-on-k8s command provided, proceeding in pass-through mode...
+ echo 'Non-spark-on-k8s command provided, proceeding in pass-through mode...'
+ CMD=("$@")
+ exec /usr/bin/tini -s -- pyflyte-fast-execute --additional-distribution s3://my-s3-bucket/flytesnacks/development/U67AW4X3WQLPZXY2HBRFM7GDWY======/script_mode.tar.gz --dest-dir /root -- pyflyte-execute --inputs s3://my-s3-bucket/metadata/propeller/flytesnacks-development-fb6dd57bc977244b6b62/n0/data/inputs.pb --output-prefix s3://my-s3-bucket/metadata/propeller/flytesnacks-development-fb6dd57bc977244b6b62/n0/data/0 --raw-output-data-prefix s3://my-s3-bucket/data/nj/fb6dd57bc977244b6b62-n0-0 --checkpoint-path s3://my-s3-bucket/data/nj/fb6dd57bc977244b6b62-n0-0/_flytecheckpoints --prev-checkpoint '""' --resolver flytekit.core.python_auto_container.default_task_resolver -- task-module flyte3 task-name hello_spark
Traceback (most recent call last):
  File "/usr/local/bin/pyflyte-fast-execute", line 5, in <module>
    from flytekit.bin.entrypoint import fast_execute_task_cmd
  File "/usr/local/lib/python3.9/dist-packages/flytekit/__init__.py", line 207, in <module>
    from flytekit.core.base_sql_task import SQLTask
  File "/usr/local/lib/python3.9/dist-packages/flytekit/core/base_sql_task.py", line 4, in <module>
    from flytekit.core.base_task import PythonTask, TaskMetadata
  File "/usr/local/lib/python3.9/dist-packages/flytekit/core/base_task.py", line 34, in <module>
    from flytekit.core.interface import Interface, transform_interface_to_typed_interface
  File "/usr/local/lib/python3.9/dist-packages/flytekit/core/interface.py", line 14, in <module>
    from flytekit.core.type_engine import TypeEngine
  File "/usr/local/lib/python3.9/dist-packages/flytekit/core/type_engine.py", line 24, in <module>
    from marshmallow_enum import EnumField, LoadDumpOptions
ModuleNotFoundError: No module named 'marshmallow_enum'
k
you need to install marshmallow_enum in the image spec.
cc @Eduardo Apolinario (eapolinario) do we push the new flytekit image? the latest image should contain marshmallow_enum.
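(Aside, not from the thread: a minimal sketch of what Kevin's suggestion could look like if the image spec were written as a Python ImageSpec instead of the YAML shared later; the PyPI package name marshmallow-enum and the exact fields are assumptions mirroring that YAML, and the thread below ends up pointing to a flytekit upgrade instead.)

from flytekit.image_spec.image_spec import ImageSpec

# Rough sketch only: same fields as the user's imageSpec.yaml, plus the
# PyPI package that provides the missing marshmallow_enum module.
spark_image = ImageSpec(
    python_version="3.11",
    registry="gmolinah92",
    packages=["pyspark", "marshmallow-enum"],
    env={"Debug": "True"},
)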
e
yeah, there's something weird going on. I think it has to do with the alpha release we had last week. cc: @Yee
(driving to the office now, will take a look when I get there)
y
which version are you on?
this was fixed already https://github.com/flyteorg/flytekit/pull/1746 (basically an indirect dependency that stopped being an indirect dependency)
try v1.8.1?
g
@Yee I was using 1.6.1; this is the traceback using 1.8.1:
Traceback (most recent call last):

      File "/usr/local/lib/python3.9/dist-packages/flytekit/exceptions/scopes.py", line 206, in user_entry_point
        return wrapped(*args, **kwargs)
      File "/root/flyte3.py", line 33, in hello_spark
        sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
      File "/usr/local/lib/python3.9/dist-packages/pyspark/rdd.py", line 1905, in reduce
        vals = self.mapPartitions(func).collect()
      File "/usr/local/lib/python3.9/dist-packages/pyspark/rdd.py", line 1814, in collect
        sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
      File "/usr/local/lib/python3.9/dist-packages/pyspark/rdd.py", line 5441, in _jrdd
        wrapped_func = _wrap_function(
      File "/usr/local/lib/python3.9/dist-packages/pyspark/rdd.py", line 5243, in _wrap_function
        return sc._jvm.SimplePythonFunction(

Message:

    'JavaPackage' object is not callable

User error.
k
could you share your image spec?
g
@Kevin Su sure!
python_version: 3.11
registry: gmolinah92 
packages:
  - pyspark 
env:
  Debug: "True"
k
which version of flytekit are you using in the local venv?
g
1.8.1
k
could you try explicitly setting the Spark base_image in the image spec?
g
Sure, how can I do that? Should I run
docker pull ghcr.io/flyteorg/flytekit:py3.9-sqlalchemy-1.9.0a0
?
k
no, envd will pull that automatically
g
updated imageSpec
python_version: 3.11
registry: gmolinah92 
packages:
  - pyspark 
env:
  Debug: "True"
base_image: ghcr.io/flyteorg/flytekit:spark-latest
output:
Traceback (most recent call last):

      File "/usr/local/lib/python3.9/dist-packages/flytekit/exceptions/scopes.py", line 206, in user_entry_point
        return wrapped(*args, **kwargs)
      File "/root/flyte3.py", line 33, in hello_spark
        sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
      File "/usr/local/lib/python3.9/dist-packages/pyspark/rdd.py", line 1905, in reduce
        vals = self.mapPartitions(func).collect()
      File "/usr/local/lib/python3.9/dist-packages/pyspark/rdd.py", line 1814, in collect
        sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
      File "/usr/local/lib/python3.9/dist-packages/pyspark/rdd.py", line 5441, in _jrdd
        wrapped_func = _wrap_function(
      File "/usr/local/lib/python3.9/dist-packages/pyspark/rdd.py", line 5243, in _wrap_function
        return sc._jvm.SimplePythonFunction(

Message:

    'JavaPackage' object is not callable

User error.
k
we use this spark image as our base image + flytekit + flytekitplugins
do you use any jar that isn’t installed in the spark base image?
g
Thanks Kevin, no, I'm just trying the provided documentation example, but we plan to use a JDBC jar.
k
That might have some issues in the image, investigating
g
Just in case, this is the content of my flyte3.py file:
import datetime
import random
from operator import add

import flytekit
from flytekit import Resources, task, workflow
from flytekitplugins.spark import Spark
from flytekit.image_spec.image_spec import ImageSpec

spark_image = ImageSpec(registry="gmolinah92")


@task(
    task_config=Spark(
        # this configuration is applied to the spark cluster
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
        }
    ),
    limits=Resources(mem="2000M"),
    cache_version="1",
    container_image=spark_image,
)
def hello_spark(partitions: int) -> float:
    print("Starting Sparkfk wifth Partitions: {}".format(partitions))
    n = 100000 * partitions
    sess = flytekit.current_context().spark_session
    count = (
        sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    )
    pi_val = 4.0 * count / n
    print("Pi val is :{}".format(pi_val))
    return pi_val


def f(_):
    x = random.random() * 2 - 1
    y = random.random() * 2 - 1
    return 1 if x**2 + y**2 <= 1 else 0


@task(cache_version="1", container_image=spark_image)
def print_every_time(value_to_print: float, date_triggered: datetime.datetime) -> int:
    print("My printed value: {} @ {}".format(value_to_print, date_triggered))
    return 1


@workflow
def my_wf(triggered_date: datetime.datetime = datetime.datetime.now()) -> float:
    """
    Using the workflow is the same as using any other workflow.
    As the image is a property of the task, the workflow does not care
    about how the image is configured.
    """
    pi = hello_spark(partitions=50)
    print_every_time(value_to_print=pi, date_triggered=triggered_date)
    return pi
How should I use that Dockerfile? Can you please share the suggested documentation?
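(Aside, not from the thread: a minimal sketch of how the spark_image in flyte3.py could carry Kevin's suggestions directly in Python instead of a separate imageSpec.yaml. The base_image tag is the one discussed above; including flytekitplugins-spark is an assumption based on Kevin's "flytekit + flytekitplugins" comment, not a confirmed fix.)

from flytekit.image_spec.image_spec import ImageSpec

# Sketch only: mirrors the shared imageSpec.yaml, with the Spark base image
# set explicitly and the Spark plugin installed in the task image.
spark_image = ImageSpec(
    base_image="ghcr.io/flyteorg/flytekit:spark-latest",
    registry="gmolinah92",
    packages=["pyspark", "flytekitplugins-spark"],
    env={"Debug": "True"},
)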
k
thanks for sharing, let me test it
could you try
pyflyte register --version v1 --non-fast flyte3.py
without passing imageSpec.yaml
g
successfully registered,
Running pyflyte register from /home/gabriel/Documents with images ImageConfig(default_image=Image(name='default', fqn='cr.flyte.org/flyteorg/flytekit', tag='py3.10-1.8.1'), images=[Image(name='default', fqn='cr.flyte.org/flyteorg/flytekit', tag='py3.10-1.8.1')]) and image destination folder /root on 1 package(s) ('/home/gabriel/Documents/flyte3.py',)
Registering against localhost:30080
Detected Root /home/gabriel/Documents, using this to create deployable package...
Loading packages ['flyte3'] under source root /home/gabriel/Documents
Image gmolinah92/flytekit:Df4gvCXxRN3XkjdLXzPEVg.. found. Skip building.
Image gmolinah92/flytekit:Df4gvCXxRN3XkjdLXzPEVg.. found. Skip building.
Successfully serialized 4 flyte objects
[✔] Registration flyte3.hello_spark type TASK successful with version v1
[✔] Registration flyte3.print_every_time type TASK successful with version v1
[✔] Registration flyte3.my_wf type WORKFLOW successful with version v1
[✔] Registration flyte3.my_wf type LAUNCH_PLAN successful with version v1
Successfully registered 4 entities
but then run --remote generates the same error (did you try it successfully?)
Traceback (most recent call last):

      File "/usr/local/lib/python3.9/dist-packages/flytekit/exceptions/scopes.py", line 206, in user_entry_point
        return wrapped(*args, **kwargs)
      File "/root/flyte3.py", line 33, in hello_spark
        sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
      File "/usr/local/lib/python3.9/dist-packages/pyspark/rdd.py", line 1905, in reduce
        vals = self.mapPartitions(func).collect()
      File "/usr/local/lib/python3.9/dist-packages/pyspark/rdd.py", line 1814, in collect
        sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
      File "/usr/local/lib/python3.9/dist-packages/pyspark/rdd.py", line 5441, in _jrdd
        wrapped_func = _wrap_function(
      File "/usr/local/lib/python3.9/dist-packages/pyspark/rdd.py", line 5243, in _wrap_function
        return sc._jvm.SimplePythonFunction(

Message:

    'JavaPackage' object is not callable

User error.
k
hmm, it works for me. what image are you using for the spark task?
g
gmolinah92/flytekit:jauyJJTJIIQjfoWzFf3YaQ..
k
could you try my image.
pingsutw/flytekit:4dRZqjpM2KKZ6XNRUL76ew..
it works for me
g
How can I use it?
k
pyflyte run --image pingsutw/flytekit:4dRZqjpM2KKZ6XNRUL76ew.. wf.py my_wf
g
Apparently it's necessary to change
spark_image = ImageSpec(registry="gmolinah92")
to
spark_image = ImageSpec(registry="pingsutw")
in flyte3.py. Even that way, running run --remote uses a different image than the one specified (for both registry users, as you can see in the attached image). Both executions failed with the same error as before.