acoustic-carpenter-78188
04/07/2023, 7:12 PM
SKIPPED due to an upstream node failure, these tasks are not retried, and the execution is incorrectly marked as SUCCEEDED anyway.
Expected behavior
SKIPPED tasks should also be retried on recovery if the upstream node succeeds.
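To make the expected behavior concrete, here is a minimal sketch of the rule being asked for. It is a plain-Python model, not Flyte's actual recovery implementation; the Phase enum and the nodes_to_rerun helper are hypothetical names used only for illustration.

```python
from enum import Enum


class Phase(Enum):
    # Hypothetical stand-ins for the node phases shown in the Flyte console.
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"
    SKIPPED = "SKIPPED"


def nodes_to_rerun(previous_phases: dict[str, Phase]) -> set[str]:
    """Assumed rule: recovery reuses only SUCCEEDED nodes and re-runs everything else."""
    return {
        name
        for name, phase in previous_phases.items()
        if phase is not Phase.SUCCEEDED
    }


# Phases after the failed run of wf: a = fail(...), b = pass_through(...), c = pass_through(a)
previous = {"a": Phase.FAILED, "b": Phase.SUCCEEDED, "c": Phase.SKIPPED}
rerun = nodes_to_rerun(previous)
print(sorted(rerun))  # ['a', 'c']
```

Under this rule the SKIPPED node c is scheduled again together with the FAILED node a, while the result of b is reused.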
Additional context to reproduce
import random

from flytekit import task, workflow
from flytekit.core.workflow import WorkflowFailurePolicy


@task
def pass_through(input1: int) -> int:
    return input1


@task
def fail(input1: int) -> int:
    if random.randint(0, 10) < 7:
        assert False
    return input1


@workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
def wf(wf_input: int) -> tuple[int, int]:
    a = fail(input1=wf_input)
    b = pass_through(input1=wf_input)
    c = pass_through(input1=a)
    return b, c
Execute against a flyte-sandbox like so:
pyflyte run --remote --image cr.flyte.org/flyteorg/flytekit:py3.10-latest test/recovery.py wf --wf_input 3
Since the task failure is non-deterministic, keep re-running the workflow until the first node fails. The last node should now be marked as SKIPPED. Then recover the execution, repeating as needed, until the first node succeeds, and observe the behavior.
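As a rough guide to how many attempts the reproduction takes, the failure probability of the fail task can be worked out from its condition. This is a small sketch of that arithmetic; it assumes each launch and each recovery attempt draws independently.

```python
from fractions import Fraction

# random.randint(0, 10) is inclusive on both ends: 11 equally likely outcomes.
outcomes = range(0, 11)

# The fail task raises when the drawn value is below 7.
p_fail = Fraction(sum(1 for x in outcomes if x < 7), len(outcomes))
p_success = 1 - p_fail

# Mean number of attempts until the first occurrence (geometric distribution).
launches_until_failure = 1 / p_fail
recoveries_until_success = 1 / p_success

print(p_fail, float(launches_until_failure), float(recoveries_until_success))
```

So the first node fails on about 7 of 11 attempts, and on average it takes fewer than two launches to hit the failure and just under three recoveries to see it succeed.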
Screenshots
Initial failure:
Screenshot 2023-04-07 at 11 57 41 AM
Screenshot 2023-04-07 at 11 58 02 AM
Screenshot 2023-04-07 at 12 03 29 PM
acoustic-carpenter-78188
04/07/2023, 10:12 PM