Traceback (most recent call last):
File "/usr/local/lib/python3.9/dist-packages/flytekit/exceptions/scopes.py", line 206, in user_entry_point
return wrapped(*args, **kwargs)
File "/root/test.py", line 33, in hello_spark
sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1250, in reduce
vals = self.mapPartitions(func).collect()
File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1197, in collect
sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "/opt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
return_value = get_return_value(
File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
return f(*a, **kw)
File "/opt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
Message:
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure:
Aborting TaskSet 0.0 because task 1 (partition 1)
cannot run anywhere due to node and executor excludeOnFailure.
Most recent failure:
Lost task 1.1 in stage 0.0 (TID 3) (10.244.0.54 executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 668, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 85, in read_command
    command = serializer._read_with_length(file)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 173, in _read_with_length
    return self.loads(obj)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 471, in loads
    return cloudpickle.loads(obj, encoding=encoding)
AttributeError: Can't get attribute 'f' on <module 'test' from '/usr/lib/python3.9/test/__init__.py'>
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
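The final AttributeError is the key failure: when the executor unpickles the task, cloudpickle resolves the module name test to the interpreter's standard-library test package (/usr/lib/python3.9/test/__init__.py) rather than the fast-registered /root/test.py, so the module-level function f cannot be found there. A minimal diagnostic sketch (hypothetical, not part of the failing workflow) that could be run on an executor to confirm which module shadows the script:

# Hypothetical diagnostic: check which file the module name "test" resolves to
# on the executor. The stdlib path here means the user script is shadowed.
import importlib

mod = importlib.import_module("test")
print(mod.__file__)  # stdlib: /usr/lib/python3.9/test/__init__.py vs user script: /root/test.py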
Expected behavior
The same workflow works with non-fast register, so it should also work with fast register.
pyflyte run --remote flyte-example/test.py wf                      <- fails (fast register)
pyflyte register --non-fast --version v11 flyte-example/test.py    <- works
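A possible workaround (an assumption, not a verified fix): avoid the name collision with the standard-library test package, either by renaming the script, or by defining the mapped function inside the task so that cloudpickle serializes it by value instead of looking it up on a module named test on the executor. A minimal sketch of the second option:

# Workaround sketch (assumption): a locally defined function is pickled by value,
# so the executor never needs to import a module named "test" to find it.
import random
from operator import add

import flytekit
from flytekit import task
from flytekitplugins.spark import Spark


@task(task_config=Spark(spark_conf={"spark.executor.instances": "2"}))
def hello_spark(partitions: int) -> float:
    def sample(_):
        x = random.random() * 2 - 1
        y = random.random() * 2 - 1
        return 1 if x**2 + y**2 <= 1 else 0

    n = 100000 * partitions
    sess = flytekit.current_context().spark_session
    count = (
        sess.sparkContext.parallelize(range(1, n + 1), partitions)
        .map(sample)
        .reduce(add)
    )
    return 4.0 * count / n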
Additional context to reproduce
test.py
import datetime
import random
from operator import add

import flytekit
from flytekit import Resources, task, workflow
from flytekitplugins.spark import Spark
from flytekit.image_spec.image_spec import ImageSpec

spark_image = ImageSpec(registry="pingsutw")


@task(
    task_config=Spark(
        # this configuration is applied to the spark cluster
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
        }
    ),
    limits=Resources(mem="2000M"),
    cache_version="1",
    container_image=spark_image,
)
def hello_spark(partitions: int) -> float:
    print("Starting Spark with partitions: {}".format(partitions))
    n = 100000 * partitions
    sess = flytekit.current_context().spark_session
    count = (
        sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    )
    pi_val = 4.0 * count / n
    print("Pi val is: {}".format(pi_val))
    return pi_val


def f(_):
    x = random.random() * 2 - 1
    y = random.random() * 2 - 1
    return 1 if x**2 + y**2 <= 1 else 0


@task(cache_version="1", container_image=spark_image)
def print_every_time(value_to_print: float, date_triggered: datetime.datetime) -> int:
    print("My printed value: {} @ {}".format(value_to_print, date_triggered))
    return 1


@workflow
def wf(triggered_date: datetime.datetime = datetime.datetime.now()) -> float:
    """
    Using this workflow is the same as using any other workflow.
    As the image is a property of the task, the workflow does not care
    about how the image is configured.
    """
    pi = hello_spark(partitions=50)
    print_every_time(value_to_print=pi, date_triggered=triggered_date)
    return pi
Screenshots
No response
Are you sure this issue hasn't been raised already?
• Yes
Have you read the Code of Conduct?
• Yes