# flytekit
#3904 [BUG] Failed to run spark task if using fast-register

Issue created by pingsutw

Describe the bug

Failed to run the spark task with the errors below:
Traceback (most recent call last):

      File "/usr/local/lib/python3.9/dist-packages/flytekit/exceptions/scopes.py", line 206, in user_entry_point
        return wrapped(*args, **kwargs)
      File "/root/test.py", line 33, in hello_spark
        sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
      File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1250, in reduce
        vals = self.mapPartitions(func).collect()
      File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1197, in collect
        sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
      File "/opt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
        return_value = get_return_value(
      File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
        return f(*a, **kw)
      File "/opt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
        raise Py4JJavaError(

Message:

    An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: 
Aborting TaskSet 0.0 because task 1 (partition 1)
cannot run anywhere due to node and executor excludeOnFailure.
Most recent failure:
Lost task 1.1 in stage 0.0 (TID 3) (10.244.0.54 executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 668, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 85, in read_command
    command = serializer._read_with_length(file)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 173, in _read_with_length
    return self.loads(obj)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 471, in loads
    return cloudpickle.loads(obj, encoding=encoding)
AttributeError: Can't get attribute 'f' on <module 'test' from '/usr/lib/python3.9/test/__init__.py'>

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
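
The final `AttributeError` suggests that the executor tried to unpickle `f` by name: the payload refers to a module called `test`, and on the executor that name resolved to the standard library's `test` package instead of the user's `test.py`. Below is a minimal sketch of that failure mode using only the standard `pickle` module, not Spark or cloudpickle. The module name `demo_user_module` and both helper functions are made up for illustration, and the sketch does not claim to reproduce what fast registration does internally.

```python
import pickle
import sys
import types


def make_module(name: str, with_f: bool) -> types.ModuleType:
    """Build a throwaway in-memory module, optionally defining a function `f`."""
    mod = types.ModuleType(name)
    if with_f:
        # Functions defined via exec pick up __module__ from the module's __name__.
        exec("def f(_):\n    return 1\n", mod.__dict__)
    return mod


def roundtrip_with_shadowed_module(name: str = "demo_user_module") -> str:
    """Pickle a function, then unpickle it where `name` maps to another module."""
    # "Driver" side: the module that defines f is importable under `name`.
    driver_mod = make_module(name, with_f=True)
    sys.modules[name] = driver_mod
    # A module-level function is pickled by reference: module name plus "f".
    payload = pickle.dumps(driver_mod.f)

    # "Executor" side: the same name now resolves to a module without f.
    sys.modules[name] = make_module(name, with_f=False)
    try:
        pickle.loads(payload)
    except AttributeError as exc:
        return str(exc)
    finally:
        del sys.modules[name]
    return "loaded"


if __name__ == "__main__":
    print(roundtrip_with_shadowed_module())
```

The payload carries no function body, so unpickling succeeds only if the receiving process can import a module of the same name that actually defines `f`.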
Expected behavior

It works with non-fast registration, so it should also work with fast registration.

pyflyte run --remote flyte-example/test.py wf
<- fails
pyflyte register --non-fast --version v11 flyte-example/test.py
<- works

Additional context to reproduce

workflow.py

    import datetime
    import random
    from operator import add

    import flytekit
    from flytekit import Resources, task, workflow
    from flytekitplugins.spark import Spark
    from flytekit.image_spec.image_spec import ImageSpec

    spark_image = ImageSpec(registry="pingsutw")


    @task(
        task_config=Spark(
            # this configuration is applied to the spark cluster
            spark_conf={
                "spark.driver.memory": "1000M",
                "spark.executor.memory": "1000M",
                "spark.executor.cores": "1",
                "spark.executor.instances": "2",
                "spark.driver.cores": "1",
            }
        ),
        limits=Resources(mem="2000M"),
        cache_version="1",
        container_image=spark_image,
    )
    def hello_spark(partitions: int) -> float:
        print("Starting Spark with Partitions: {}".format(partitions))
        n = 100000 * partitions
        sess = flytekit.current_context().spark_session
        count = (
            sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
        )
        pi_val = 4.0 * count / n
        print("Pi val is :{}".format(pi_val))
        return pi_val


    def f(_):
        x = random.random() * 2 - 1
        y = random.random() * 2 - 1
        return 1 if x**2 + y**2 <= 1 else 0


    @task(cache_version="1", container_image=spark_image)
    def print_every_time(value_to_print: float, date_triggered: datetime.datetime) -> int:
        print("My printed value: {} @ {}".format(value_to_print, date_triggered))
        return 1


    @workflow
    def wf(triggered_date: datetime.datetime = datetime.datetime.now()) -> float:
        """
        Using the workflow is still as any other workflow. As image is a property of the task,
        the workflow does not care about how the image is configured.
        """
        pi = hello_spark(partitions=50)
        print_every_time(value_to_print=pi, date_triggered=triggered_date)
        return pi

Screenshots

No response

Are you sure this issue hasn't been raised already?
• Yes

Have you read the Code of Conduct?
• Yes

flyteorg/flyte
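
Because the script is run as `test.py` and the traceback shows `test` resolving to `/usr/lib/python3.9/test/__init__.py`, it may help to check, inside the executor image, which file a given module name would be imported from. A small diagnostic sketch using the standard library's `importlib.util.find_spec`; the helper name `module_origin` is hypothetical and not part of flytekit or Spark.

```python
import importlib.util
from typing import Optional


def module_origin(name: str) -> Optional[str]:
    """Return the location a top-level module name would be imported from.

    Returns None when the name cannot be found on the current sys.path.
    """
    spec = importlib.util.find_spec(name)
    return spec.origin if spec is not None else None


if __name__ == "__main__":
    # In the report, the executor resolved "test" to the stdlib package.
    print(module_origin("test"))
```

Running this on both the driver and an executor and comparing the two results would show whether the same name points at different files in the two processes.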