# ask-the-community
v
Hi team, I have a dynamic task which launches a launch plan (workflow). When one of the workflows fails, all of the other launches get aborted. How do I tell Flyte not to do this? It's okay if one launched workflow fails; the rest should continue.
m
You can set a failure policy on a workflow to allow it to continue if tasks fail, e.g.
```python
@workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
```
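To make the difference between the two policies concrete, here is a toy model in plain Python. It is not Flyte's scheduler: the `FailurePolicy` enum only borrows the member names of flytekit's `WorkflowFailurePolicy`, `run_children` is a hypothetical helper, and the children run one after another instead of in parallel. Under `FAIL_IMMEDIATELY` (the default), the first failure aborts whatever has not finished; under `FAIL_AFTER_EXECUTABLE_NODES_COMPLETE`, the remaining children still run, and the parent is still reported as failed at the end.

```python
from enum import Enum
from typing import Callable, Dict, List


class FailurePolicy(Enum):
    # Hypothetical stand-in that reuses the member names of flytekit's WorkflowFailurePolicy.
    FAIL_IMMEDIATELY = "FAIL_IMMEDIATELY"
    FAIL_AFTER_EXECUTABLE_NODES_COMPLETE = "FAIL_AFTER_EXECUTABLE_NODES_COMPLETE"


def run_children(children: List[Callable[[], None]], policy: FailurePolicy) -> Dict[str, object]:
    """Toy model: run child launches in order and record each child's final status."""
    statuses: List[str] = []
    failed = False
    for child in children:
        if failed and policy is FailurePolicy.FAIL_IMMEDIATELY:
            # After the first failure, everything that has not finished is aborted.
            statuses.append("ABORTED")
            continue
        try:
            child()
            statuses.append("SUCCEEDED")
        except Exception:
            statuses.append("FAILED")
            failed = True
    # Under either policy, the parent ends up FAILED if any child failed.
    return {"statuses": statuses, "parent": "FAILED" if failed else "SUCCEEDED"}


def ok() -> None:
    pass


def boom() -> None:
    raise RuntimeError("step failed")


children = [ok, boom, ok, ok]
print(run_children(children, FailurePolicy.FAIL_IMMEDIATELY)["statuses"])
# → ['SUCCEEDED', 'FAILED', 'ABORTED', 'ABORTED']
print(run_children(children, FailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)["statuses"])
# → ['SUCCEEDED', 'FAILED', 'SUCCEEDED', 'SUCCEEDED']
```

The second policy does not make the overall run succeed; it only stops one failed child from cancelling its siblings.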
v
Thanks @Maarten de Jong, you mean on the workflow that contains the dynamic task? My dynamic task launches, say, 100 workflows. If a step in one of them fails, it's okay for that workflow to fail, but I don't want the other 99 workflows to get “aborted”.
```python
ground_truth_workflow = flytekit.LaunchPlan.get_or_create(
    name="ground_truth_workflow",
    workflow=GroundTruthOfflinePCPWorkFlow,
)
```
The dynamic task calls the launch plan:
```python
@flytekit.dynamic
def run():
    # Figures out the number of iterations, then for each one calls:
    # ground_truth_workflow(...)
    ...
```
If one of the `ground_truth_workflow(...)` calls fails, the current behavior is that all of the other “`RUNNING`” workflows get aborted.
@Ketan (kumare3), I get this error when I add a failure policy:
```
TypeError: isinstance() arg 2 must be a type or tuple of types
```
cc @Dan Rammer (hamersaw) in case you have time 🙏
cc @Jay Ganbat - in case you know 🙏
d
Can you say more about where you see the error? What is the workflow def?
this should be specified on the workflow that calls the dynamic task
j
I think Maarten's suggestion will work.
v
```python
@flytekit.workflow
def GroundTruthSchedulerWorkflows(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE):

    logger.info("scheduling ground truth workflows")
    scheduler_tasks.run()
```
This is the workflow that calls the dynamic task.
```python
@flytekit.dynamic(
    cache=False,
    cache_version="0.12.10",
    retries=1,
    requests=flytekit.Resources(mem="16Gi", cpu="1"),
    limits=flytekit.Resources(mem="16Gi", cpu="1"),
)
def run():
    ...
```
```
  File "/app/src/perception/scene_workflows/ground_truth/workflows/py_run_ground_truth_scheduler_workflows.binary.runfiles/com_lyft_avsoftware/src/perception/scene_workflows/ground_truth/workflows/ground_truth_scheduler_workflows.py", line 14, in <module>
    def GroundTruthSchedulerWorkflows(failure_policy=flytekit.WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE) :
  File "/app/src/perception/scene_workflows/ground_truth/workflows/py_run_ground_truth_scheduler_workflows.binary.runfiles/pyreqs_requirements_py3_pypi__flytekit_1_0_1/flytekit/core/workflow.py", line 739, in workflow
    return wrapper(_workflow_function)
  File "/app/src/perception/scene_workflows/ground_truth/workflows/py_run_ground_truth_scheduler_workflows.binary.runfiles/pyreqs_requirements_py3_pypi__flytekit_1_0_1/flytekit/core/workflow.py", line 734, in wrapper
    workflow_instance.compile()
  File "/app/src/perception/scene_workflows/ground_truth/workflows/py_run_ground_truth_scheduler_workflows.binary.runfiles/pyreqs_requirements_py3_pypi__flytekit_1_0_1/flytekit/core/workflow.py", line 604, in compile
    self._input_parameters = transform_inputs_to_parameters(ctx, self.python_interface)
  File "/app/src/perception/scene_workflows/ground_truth/workflows/py_run_ground_truth_scheduler_workflows.binary.runfiles/pyreqs_requirements_py3_pypi__flytekit_1_0_1/flytekit/core/interface.py", line 188, in transform_inputs_to_parameters
    default_lv = TypeEngine.to_literal(ctx, _default, python_type=interface.inputs[k], expected=v.type)
  File "/app/src/perception/scene_workflows/ground_truth/workflows/py_run_ground_truth_scheduler_workflows.binary.runfiles/pyreqs_requirements_py3_pypi__flytekit_1_0_1/flytekit/core/type_engine.py", line 680, in to_literal
    transformer.assert_type(python_type, python_val)
  File "/app/src/perception/scene_workflows/ground_truth/workflows/py_run_ground_truth_scheduler_workflows.binary.runfiles/pyreqs_requirements_py3_pypi__flytekit_1_0_1/flytekit/core/type_engine.py", line 96, in assert_type
    if not hasattr(t, "__origin__") and not isinstance(v, t):
```
d
the failure policy needs to be defined in the workflow decorator rather than as an argument to the workflow
```python
@flytekit.workflow(failure_policy=...)
```
And then, as I mentioned, it should be used on the top-level workflow to define how that workflow handles failures in the dynamic subtasks:
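```python
import flytekit
from flytekit import WorkflowFailurePolicy

# GroundTruthOfflinePCPWorkFlow is the existing child workflow, defined elsewhere.
ground_truth_workflow = flytekit.LaunchPlan.get_or_create(
    name="ground_truth_workflow",
    workflow=GroundTruthOfflinePCPWorkFlow,
)


@flytekit.dynamic(
    cache=False,
    cache_version="0.12.10",
    retries=1,
    requests=flytekit.Resources(mem="16Gi", cpu="1"),
    limits=flytekit.Resources(mem="16Gi", cpu="1"),
)
def run():
    # Each call launches one child execution of the launch plan.
    for _ in range(2):
        ground_truth_workflow()


# The policy goes in the decorator of the top-level workflow, not in its parameters.
# Defined after `run` because the workflow body is compiled when the decorator runs.
@flytekit.workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
def foo():
    run()
```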
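The distinction between a decorator argument and a workflow argument can be reproduced in plain Python. The traceback above shows flytekit converting the parameter's default value to a literal in `transform_inputs_to_parameters`, i.e. treating `failure_policy` as a workflow input rather than as configuration. The sketch below does not use flytekit: `toy_workflow` is a hypothetical decorator and the policies are plain strings, chosen only to show where each spelling ends up.

```python
import inspect
from typing import Callable, Optional


def toy_workflow(_fn: Optional[Callable] = None, *, failure_policy: str = "FAIL_IMMEDIATELY"):
    """Hypothetical decorator usable as @toy_workflow or @toy_workflow(failure_policy=...)."""

    def wrapper(fn: Callable) -> Callable:
        # Keyword arguments given to the decorator are workflow-level settings.
        fn.failure_policy = failure_policy
        # Every parameter of the decorated function becomes an input, defaults included.
        fn.inputs = {name: p.default for name, p in inspect.signature(fn).parameters.items()}
        return fn

    # Bare @toy_workflow passes the function in directly; @toy_workflow(...) does not.
    return wrapper(_fn) if _fn is not None else wrapper


@toy_workflow
def wrong(failure_policy="FAIL_AFTER_EXECUTABLE_NODES_COMPLETE"):
    pass


@toy_workflow(failure_policy="FAIL_AFTER_EXECUTABLE_NODES_COMPLETE")
def right():
    pass


print(wrong.failure_policy, wrong.inputs)
# → FAIL_IMMEDIATELY {'failure_policy': 'FAIL_AFTER_EXECUTABLE_NODES_COMPLETE'}
print(right.failure_policy, right.inputs)
# → FAIL_AFTER_EXECUTABLE_NODES_COMPLETE {}
```

In the `wrong` spelling the policy is just another input with a default value, and the default policy still applies; flytekit additionally tries to type-check that default, which is where the `TypeError` came from.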
v
I am so sorry about this 🤦 🤯 I don't know why I made this mistake and couldn't debug it! 🤦‍♀️
Thanks @Dan Rammer (hamersaw)
d
😄 no problem!