# flyte-github
#3904 [BUG] Failed to run spark task if using fast-register
Issue created by pingsutw
Describe the bug
The Spark task fails with the errors below when it is registered with fast-register.
Traceback (most recent call last):

      File "/usr/local/lib/python3.9/dist-packages/flytekit/exceptions/scopes.py", line 206, in user_entry_point
        return wrapped(*args, **kwargs)
      File "/root/test.py", line 33, in hello_spark
        sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
      File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1250, in reduce
        vals = self.mapPartitions(func).collect()
      File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1197, in collect
        sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
      File "/opt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
        return_value = get_return_value(
      File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
        return f(*a, **kw)
      File "/opt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
        raise Py4JJavaError(

Message:

    An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: 
Aborting TaskSet 0.0 because task 1 (partition 1)
cannot run anywhere due to node and executor excludeOnFailure.
Most recent failure:
Lost task 1.1 in stage 0.0 (TID 3) (10.244.0.54 executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 668, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 85, in read_command
    command = serializer._read_with_length(file)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 173, in _read_with_length
    return self.loads(obj)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 471, in loads
    return cloudpickle.loads(obj, encoding=encoding)
AttributeError: Can't get attribute 'f' on <module 'test' from '/usr/lib/python3.9/test/__init__.py'>

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
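The last Python frame suggests that the executor resolved the module name `test` to the standard-library package under `/usr/lib/python3.9/test/` rather than to the user's `/root/test.py`. Pickle stores a module-level function by reference (module name plus qualified name), and cloudpickle is understood to do the same for functions in importable modules, so loading fails if the receiving side imports a different module under that name. A minimal sketch of that mechanism with the standard `pickle` module follows; the module name `user_workflow` and the in-memory modules are hypothetical stand-ins, not part of the report, and this is not a confirmed root cause.

```python
import pickle
import sys
import types

# Hypothetical stand-in for the user's module (the report's /root/test.py).
user_mod = types.ModuleType("user_workflow")
exec("def f(_):\n    return 1\n", user_mod.__dict__)
sys.modules["user_workflow"] = user_mod

# "Driver" side: the function is pickled by reference, so the payload
# carries only the module name and the function name, not the code.
payload = pickle.dumps(user_mod.f)

# "Executor" side: a different module is found under the same name,
# and it has no attribute `f`.
sys.modules["user_workflow"] = types.ModuleType("user_workflow")

try:
    pickle.loads(payload)
    error = None
except AttributeError as exc:
    # Same class of failure as the AttributeError in the traceback above.
    error = str(exc)

print(error)
```

If this is what happens here, the failure would depend on which `test` module is importable on the executors, not on the body of `f`.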
Expected behavior
The task works with non-fast registration, so it should also work with fast registration.
Additional context to reproduce
workflow.py
import datetime
import random
from operator import add

import flytekit
from flytekit import Resources, task, workflow
from flytekitplugins.spark import Spark
from flytekit.image_spec.image_spec import ImageSpec

spark_image = ImageSpec(registry="pingsutw")


@task(
    task_config=Spark(
        # this configuration is applied to the spark cluster
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
        }
    ),
    limits=Resources(mem="2000M"),
    cache_version="1",
    container_image=spark_image,
)
def hello_spark(partitions: int) -> float:
    print("Starting Spark with Partitions: {}".format(partitions))
    n = 100000 * partitions
    sess = flytekit.current_context().spark_session
    count = (
        sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    )
    pi_val = 4.0 * count / n
    print("Pi val is :{}".format(pi_val))
    return pi_val


def f(_):
    x = random.random() * 2 - 1
    y = random.random() * 2 - 1
    return 1 if x**2 + y**2 <= 1 else 0


@task(cache_version="1", container_image=spark_image)
def print_every_time(value_to_print: float, date_triggered: datetime.datetime) -> int:
    print("My printed value: {} @ {}".format(value_to_print, date_triggered))
    return 1


@workflow
def wf(triggered_date: datetime.datetime = datetime.datetime.now()) -> float:
    """
    Using the workflow is still as any other workflow. 
    As image is a property of the task, the workflow does not care
    about how the image is configured.
    """
    pi = hello_spark(partitions=50)
    print_every_time(value_to_print=pi, date_triggered=triggered_date)
    return pi
Screenshots
No response
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte