blue-ice-67112
09/30/2024, 10:51 AM

jolly-florist-66728
09/30/2024, 10:51 AM
You can set the number of retries for a Spark task with the `retries` parameter in the `@task` decorator. Here's an example:
import flytekit
from flytekit import task
from flytekitplugins.spark import Spark

@task(
    task_config=Spark(
        spark_conf={"spark": "1"},
        executor_path="/usr/bin/python3",
        applications_path="local:///usr/local/bin/entrypoint.py",
    ),
    retries=3,  # Set the number of retries
)
def my_spark_task(a: str) -> int:
    session = flytekit.current_context().spark_session
    assert session.sparkContext.appName == "FlyteSpark: ex:local:local:local"
    return 10
In this example, `my_spark_task` will be retried up to 3 times on failure. Because `retries` is set to 3, the task runs at most 4 times in total (1 initial attempt + 3 retries).
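The attempt arithmetic can be illustrated with a plain-Python loop. This is only a sketch of the semantics (`run_with_retries` is a made-up helper, not Flyte's scheduler):

```python
def run_with_retries(fn, retries: int):
    """Run fn up to 1 + retries times; return (result, attempts_used)."""
    attempts = 0
    while True:
        attempts += 1
        try:
            return fn(), attempts
        except Exception:
            if attempts > retries:  # initial attempt + `retries` retries exhausted
                raise

# A function that fails twice before succeeding.
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return 10

result, attempts = run_with_retries(flaky, retries=3)
# Succeeds on the 3rd attempt, within the budget of 1 + 3 = 4.
```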
Would you like more detail on this?
Sources:
- Flyte Task Retries
- Flyte Spark Task Example

blue-ice-67112
10/01/2024, 2:01 PM

jolly-florist-66728
10/01/2024, 2:01 PM
To make a failure retryable, catch the exception and re-raise it as a `FlyteRecoverableException`. Here's an example of how to do this for a Python task:
from flytekit import task
from flytekit.exceptions.user import FlyteRecoverableException

@task(retries=3)
def my_python_task():
    try:
        # Your task logic here
        raise Exception("Transient failure")
    except Exception as e:
        raise FlyteRecoverableException("Recoverable failure: " + str(e))
For Spark tasks, you can similarly catch exceptions and raise them as `FlyteRecoverableException`:
import flytekit
from flytekit import task
from flytekitplugins.spark import Spark
from flytekit.exceptions.user import FlyteRecoverableException

@task(
    task_config=Spark(
        spark_conf={"spark": "1"},
        executor_path="/usr/bin/python3",
        applications_path="local:///usr/local/bin/entrypoint.py",
    ),
    retries=3,
)
def my_spark_task(a: str) -> int:
    try:
        session = flytekit.current_context().spark_session
        assert session.sparkContext.appName == "FlyteSpark: ex:local:local:local"
        # Simulate a transient failure
        raise Exception("Transient failure")
    except Exception as e:
        raise FlyteRecoverableException("Recoverable failure: " + str(e))
    return 10
This way, Flyte will recognize the failure as recoverable and will retry the task according to the specified retry policy.
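The retry policy's distinction between recoverable and non-recoverable failures can be sketched in plain Python. The `Recoverable` class and `run_with_policy` helper below are illustrative stand-ins, not Flyte's actual implementation:

```python
class Recoverable(Exception):
    """Stand-in for flytekit's FlyteRecoverableException."""

def run_with_policy(fn, retries: int):
    """Retry only on Recoverable; let everything else propagate immediately."""
    for attempt in range(1 + retries):  # 1 initial attempt + `retries` retries
        try:
            return fn()
        except Recoverable:
            if attempt == retries:
                raise  # retry budget exhausted

# A function that raises a recoverable error once, then succeeds.
calls = []
def flaky():
    calls.append(1)
    if len(calls) < 2:
        raise Recoverable("transient")
    return "ok"

outcome = run_with_policy(flaky, retries=3)
```

A non-`Recoverable` exception raised by `fn` is not caught at all, so it fails the run on the first attempt, mirroring how Flyte treats non-recoverable errors.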
Would you like more detail on this?
Sources:
- Flyte Task Retries
- Flyte Recoverable Exceptions

blue-ice-67112
10/01/2024, 2:21 PM

jolly-florist-66728
10/01/2024, 2:21 PM
You can catch the exception and re-raise it as a `FlyteRecoverableException` to ensure that Flyte treats the failure as recoverable and retries the task. Here's an example of how you can do this:
import flytekit
from flytekit import task
from flytekitplugins.spark import Spark
from flytekit.exceptions.user import FlyteRecoverableException

@task(
    task_config=Spark(
        spark_conf={"spark": "1"},
        executor_path="/usr/bin/python3",
        applications_path="local:///usr/local/bin/entrypoint.py",
    ),
    retries=3,
)
def my_spark_task(a: str) -> int:
    try:
        session = flytekit.current_context().spark_session
        assert session.sparkContext.appName == "FlyteSpark: ex:local:local:local"
        # Your Spark logic here
        # Simulate a transient failure
        raise Exception("Transient failure")
    except Exception as e:
        raise FlyteRecoverableException("Recoverable failure: " + str(e))
    return 10
In this example, any exception raised within the `try` block is caught and re-raised as a `FlyteRecoverableException`, ensuring that Flyte will retry the task according to the specified retry policy.
Would you like more detail on this?
Sources:
- https://docs.flyte.org/en/latest/api/flytekit/design/authoring.html#exception-handling
- https://discuss.flyte.org/t/10365853/hi-all-i-m-seeing-retries-3-and-counting-on-a-workflow-that-/

blue-ice-67112
10/01/2024, 2:27 PM

jolly-florist-66728
10/01/2024, 2:27 PM
You can catch specific PySpark exceptions and re-raise them as `FlyteRecoverableException` to ensure that Flyte treats them as recoverable and retries the task.
Here's an example of how you can handle specific PySpark exceptions and mark them as recoverable:
import flytekit
from flytekit import task
from flytekitplugins.spark import Spark
from flytekit.exceptions.user import FlyteRecoverableException
from pyspark.sql.utils import AnalysisException
from py4j.protocol import Py4JJavaError  # Py4JJavaError lives in py4j, not pyspark.sql.utils

@task(
    task_config=Spark(
        spark_conf={"spark": "1"},
        executor_path="/usr/bin/python3",
        applications_path="local:///usr/local/bin/entrypoint.py",
    ),
    retries=3,
)
def my_spark_task(a: str) -> int:
    try:
        session = flytekit.current_context().spark_session
        assert session.sparkContext.appName == "FlyteSpark: ex:local:local:local"
        # Your Spark logic here
        # Simulate a transient failure
        raise AnalysisException("Simulated AnalysisException")
    except (AnalysisException, Py4JJavaError) as e:
        raise FlyteRecoverableException("Recoverable failure: " + str(e))
    except Exception:
        raise  # Re-raise other exceptions as non-recoverable
    return 10
In this example, `AnalysisException` and `Py4JJavaError` are caught and re-raised as `FlyteRecoverableException`, ensuring that Flyte will retry the task. Other exceptions propagate unchanged and are treated as non-recoverable.
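The catch-and-classify pattern can also be factored into a small helper, which keeps the retryable set in one place. This is a sketch: the exception classes here are made-up stand-ins for the PySpark and flytekit ones so the snippet is self-contained:

```python
class AnalysisError(Exception):
    """Stand-in for pyspark's AnalysisException."""

class JavaGatewayError(Exception):
    """Stand-in for py4j's Py4JJavaError."""

class RecoverableFailure(Exception):
    """Stand-in for flytekit's FlyteRecoverableException."""

# The single place to declare which failures are worth retrying.
RETRYABLE = (AnalysisError, JavaGatewayError)

def classify(exc: Exception) -> Exception:
    """Wrap known-transient exceptions; pass everything else through unchanged."""
    if isinstance(exc, RETRYABLE):
        return RecoverableFailure(f"Recoverable failure: {exc}")
    return exc

wrapped = classify(AnalysisError("simulated"))     # becomes RecoverableFailure
untouched = classify(ValueError("bad input"))      # stays a ValueError
```

Inside a task you would then write `raise classify(e)` in a single `except Exception as e:` clause instead of maintaining parallel `except` branches.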
Would you like more detail on this?
Sources:
- https://discuss.flyte.org/t/10365853/hi-all-i-m-seeing-retries-3-and-counting-on-a-workflow-that-/