cuddly-jelly-27016
07/27/2023, 10:40 PM

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/flytekit/exceptions/scopes.py", line 206, in user_entry_point
    return wrapped(*args, **kwargs)
  File "/root/test.py", line 33, in hello_spark
    sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1250, in reduce
    vals = self.mapPartitions(func).collect()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1197, in collect
    sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "/opt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/opt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
Message:
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure:
Aborting TaskSet 0.0 because task 1 (partition 1)
cannot run anywhere due to node and executor excludeOnFailure.
Most recent failure:
Lost task 1.1 in stage 0.0 (TID 3) (10.244.0.54 executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 668, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 85, in read_command
    command = serializer._read_with_length(file)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 173, in _read_with_length
    return self.loads(obj)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 471, in loads
    return cloudpickle.loads(obj, encoding=encoding)
AttributeError: Can't get attribute 'f' on <module 'test' from '/usr/lib/python3.9/test/__init__.py'>
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
Expected behavior
The task works when registered with non-fast registration, so it should also work with fast registration.
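For what it's worth, the AttributeError above shows the Spark worker resolving module 'test' to the standard library's test package (/usr/lib/python3.9/test/__init__.py) rather than to the fast-registered /root/test.py, so the module-level function f cannot be found when cloudpickle deserializes it by reference. A minimal workaround sketch, assuming that stdlib shadowing is the cause: define the mapped function inside the task body so cloudpickle serializes it by value instead of by reference (this variant of hello_spark is illustrative, not the reporter's code; renaming test.py so it does not shadow the stdlib test package should help for the same reason).

import random
from operator import add

import flytekit

def hello_spark(partitions: int) -> float:
    # Defining f inside the task makes it a closure, which cloudpickle
    # pickles by value, so the executor never needs to import 'test'
    # to resolve it.
    def f(_):
        x = random.random() * 2 - 1
        y = random.random() * 2 - 1
        return 1 if x**2 + y**2 <= 1 else 0

    n = 100000 * partitions
    sess = flytekit.current_context().spark_session
    count = sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    return 4.0 * count / n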
Additional context to reproduce
workflow.py
import datetime
import random
from operator import add

import flytekit
from flytekit import Resources, task, workflow
from flytekitplugins.spark import Spark
from flytekit.image_spec.image_spec import ImageSpec

spark_image = ImageSpec(registry="pingsutw")


@task(
    task_config=Spark(
        # this configuration is applied to the spark cluster
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
        }
    ),
    limits=Resources(mem="2000M"),
    cache_version="1",
    container_image=spark_image,
)
def hello_spark(partitions: int) -> float:
    print("Starting Spark with partitions: {}".format(partitions))
    n = 100000 * partitions
    sess = flytekit.current_context().spark_session
    count = (
        sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    )
    pi_val = 4.0 * count / n
    print("Pi val is: {}".format(pi_val))
    return pi_val


def f(_):
    x = random.random() * 2 - 1
    y = random.random() * 2 - 1
    return 1 if x**2 + y**2 <= 1 else 0


@task(cache_version="1", container_image=spark_image)
def print_every_time(value_to_print: float, date_triggered: datetime.datetime) -> int:
    print("My printed value: {} @ {}".format(value_to_print, date_triggered))
    return 1


@workflow
def wf(triggered_date: datetime.datetime = datetime.datetime.now()) -> float:
    """
    This workflow is used like any other workflow.
    Since the image is a property of the task, the workflow does not
    care about how the image is configured.
    """
    pi = hello_spark(partitions=50)
    print_every_time(value_to_print=pi, date_triggered=triggered_date)
    return pi
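The report does not show the registration command. Assuming the failing path is fast registration, a typical invocation would be something like the following (the exact command and flags used by the reporter are an assumption):

# Fast registration: code is pickled/shipped at run time, which is the
# failing path described above.
#   pyflyte run --remote workflow.py wf
#
# Non-fast registration: the code is baked into the container image
# instead, e.g. via:
#   pyflyte package
#   flytectl register files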