c
Hello, we cannot seem to get `WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE` to work in any capacity. Our workflow/task is specified as follows:
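import time

from flytekit import WorkflowFailurePolicy, task, workflow

# ResourceFactory is not part of flytekit; its import is omitted here.


@workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)  # type: ignore [misc]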
def fail_after_workflow() -> str:
    """Foo."""
    return randomly_fail_workflow()


@workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)  # type:ignore [misc]
def randomly_fail_workflow() -> str:
    """Foo."""
    maybe_fail_task(idx=0)
    maybe_fail_task(idx=1)
    maybe_fail_task(idx=2)
    maybe_fail_task(idx=3)

    return "test"

@task(
    requests=ResourceFactory.gig_resources(1, 1, 1),
)  # type:ignore [misc]
def maybe_fail_task(idx: int) -> str:
    """Foo."""
    time.sleep(20 * float(idx))
    if idx % 2 == 0:
        raise ValueError("failed")
    print(idx)
    return str(idx)
However, when one of the tasks fails, it always causes the rest of the in-flight tasks to be aborted. What are we doing wrong here?
I made an example that more closely resembled the docs and it also doesn't seem to work.
g
I don't think this matters, but it might be worth a try if you haven't got it to work yet: I only set the `failure_policy` on the "main" workflow (so in this case that'd be `fail_after_workflow`). Or make `randomly_fail_workflow` your main. I wonder if setting `failure_policy` on sub-workflows is supported.
c
Are you on v1.14 as well?
g
I'm on v1.15
c
Yeah, looks like it just doesn't work on sub-workflows.
g
Makes sense. I suppose Flyte has some kind of graph compilation step, takes the `failure_policy` of the root node, and then sees sub-workflows as just other nodes in the graph.
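To make that guess concrete, here is a toy model in plain Python. It is not Flyte's code and does not use flytekit: `FailurePolicy`, `TaskNode`, `WorkflowNode`, `run_workflow`, `execute`, and the `propagate` flag are all made-up names for illustration. It also runs children one after another, so "skipped" stands in for "aborted while in flight". The assumption being modelled is that a sub-workflow's children run under the default fail-immediately behaviour unless the policy is explicitly passed down.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable, List, Tuple, Union


class FailurePolicy(Enum):
    FAIL_IMMEDIATELY = 0
    FAIL_AFTER_EXECUTABLE_NODES_COMPLETE = 1


@dataclass
class TaskNode:
    name: str
    fn: Callable[[], str]


@dataclass
class WorkflowNode:
    name: str
    policy: FailurePolicy  # the policy declared on this (sub-)workflow
    children: List[Union[TaskNode, "WorkflowNode"]] = field(default_factory=list)


Result = Tuple[List[str], List[str], List[str]]  # (completed, failed, skipped)


def leaf_names(node: Union[TaskNode, WorkflowNode]) -> List[str]:
    """Names of all tasks at or below `node`."""
    if isinstance(node, TaskNode):
        return [node.name]
    return [name for child in node.children for name in leaf_names(child)]


def run_workflow(wf: WorkflowNode, policy: FailurePolicy, propagate: bool) -> Result:
    """Run `wf`'s children in order under `policy` (hypothetical executor)."""
    completed: List[str] = []
    failed: List[str] = []
    skipped: List[str] = []
    aborted = False
    for child in wf.children:
        if aborted:
            skipped.extend(leaf_names(child))
            continue
        if isinstance(child, WorkflowNode):
            # Assumed behaviour: without propagation, the sub-workflow's
            # declared policy is ignored and the default is used instead.
            child_policy = child.policy if propagate else FailurePolicy.FAIL_IMMEDIATELY
            done, bad, skip = run_workflow(child, child_policy, propagate)
            completed += done
            failed += bad
            skipped += skip
            child_failed = bool(bad)
        else:
            try:
                child.fn()
                completed.append(child.name)
                child_failed = False
            except Exception:
                failed.append(child.name)
                child_failed = True
        if child_failed and policy is FailurePolicy.FAIL_IMMEDIATELY:
            aborted = True
    return completed, failed, skipped


def execute(root: WorkflowNode, propagate: bool) -> Result:
    """The root workflow always runs under its own declared policy."""
    return run_workflow(root, root.policy, propagate)


def make_task(idx: int) -> TaskNode:
    """Mirror of maybe_fail_task without the sleep: even indices fail."""
    def fn() -> str:
        if idx % 2 == 0:
            raise ValueError("failed")
        return str(idx)
    return TaskNode(f"t{idx}", fn)
```

In this model, wrapping the four tasks in a sub-workflow with `propagate=False` stops at `t0` and skips the rest, even though both workflows declare `FAIL_AFTER_EXECUTABLE_NODES_COMPLETE`. Putting the same tasks directly under the root lets `t1` and `t3` finish, which lines up with what we each saw.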
c
We're probably not passing the policy down in the case of sub-workflows. Mind opening a GitHub issue?