clean-glass-36808
03/10/2025, 9:35 PM
We can't get WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE to work in any capacity.
Our workflow/task is specified as follows:
import time

from flytekit import WorkflowFailurePolicy, task, workflow

# ResourceFactory is a project-specific helper (not shown here).


@workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)  # type: ignore [misc]
def fail_after_workflow() -> str:
    """Foo."""
    return randomly_fail_workflow()


@workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)  # type:ignore [misc]
def randomly_fail_workflow() -> str:
    """Foo."""
    # Four independent nodes; none consumes another's output.
    maybe_fail_task(idx=0)
    maybe_fail_task(idx=1)
    maybe_fail_task(idx=2)
    maybe_fail_task(idx=3)
    return "test"


@task(
    requests=ResourceFactory.gig_resources(1, 1, 1),
)  # type:ignore [misc]
def maybe_fail_task(idx: int) -> str:
    """Foo."""
    # Stagger the tasks: idx 0 returns at once, idx 3 sleeps for 60 seconds.
    time.sleep(20 * float(idx))
    # Even indices (0 and 2) fail; odd indices (1 and 3) succeed.
    if idx % 2 == 0:
        raise ValueError("failed")
    print(idx)
    return str(idx)
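For reference, here is a rough plain-Python sketch of the behavior this policy is expected to give: independent nodes keep running after one of them fails, and the failure is only reported once everything that can run has finished. This is an analogy built on the standard library, not Flyte code. The names maybe_fail and run_all_then_fail are hypothetical, and the sleeps are scaled down from 20-second steps to 10 ms.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Tuple


def maybe_fail(idx: int) -> str:
    """Hypothetical stand-in for maybe_fail_task: even indices fail."""
    time.sleep(0.01 * idx)  # scaled-down version of the 20-second steps
    if idx % 2 == 0:
        raise ValueError(f"task {idx} failed")
    return str(idx)


def run_all_then_fail(indices: List[int]) -> Tuple[List[str], List[str]]:
    """Run every task to completion, then return (successes, failures)."""
    succeeded: List[str] = []
    failed: List[str] = []
    with ThreadPoolExecutor(max_workers=len(indices)) as pool:
        futures = [pool.submit(maybe_fail, i) for i in indices]
        # A failure in one future does not cancel the others; each result
        # is collected as it arrives and errors are recorded, not re-raised.
        for future in as_completed(futures):
            try:
                succeeded.append(future.result())
            except ValueError as exc:
                failed.append(str(exc))
    return sorted(succeeded), sorted(failed)


succeeded, failed = run_all_then_fail([0, 1, 2, 3])
print("succeeded:", succeeded)  # succeeded: ['1', '3']
print("failed:", failed)        # failed: ['task 0 failed', 'task 2 failed']
```

In this sketch tasks 1 and 3 still finish even though task 0 fails immediately, which is the outcome we were hoping to see from the workflow above.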
However, when one of the tasks fails it always causes the rest of the in-flight tasks to be aborted. What are we doing wrong here?
clean-glass-36808
03/10/2025, 9:55 PM
gentle-tomato-480
03/12/2025, 4:19 PM
failure_policy on the "main" workflow (so in this case that'd be fail_after_workflow). Or make randomly_fail_workflow your main.
Wonder if setting failure_policy on subworkflows is supported.
clean-glass-36808
03/12/2025, 5:12 PM
gentle-tomato-480
03/12/2025, 5:50 PM
clean-glass-36808
03/12/2025, 10:50 PM
gentle-tomato-480
03/12/2025, 10:52 PM
careful-australia-19356
03/13/2025, 6:05 PM
clean-glass-36808
03/13/2025, 6:20 PM