Okay, it took me a bit of time to reproduce this and get it down to a minimal example. With just one task created in the loop, I did not observe the issue. When I add another (`task_some_other_task` in this example), the issue occurs.
from flytekit import Resources, WorkflowFailurePolicy, dynamic, task, workflow
@task(
    limits=Resources(mem="500Mi"),
)
def some_task(
    val: str,
):
    """Print ``val`` to standard output.

    Declared as a flytekit task with a memory limit of 500Mi.

    Args:
        val: The string to print.
    """
    print(val)
@dynamic(limits=Resources(mem="500Mi"))
def base_workflow():
    """Create a pair of chained ``some_task`` nodes for each of three values.

    For every value in ``["one", "two", "three"]`` two ``some_task`` calls are
    made with the same input, each given an explicit node name derived from
    the value, and the first is chained to the second with ``>>``.
    """
    # set up task promises
    for val in ["one", "two", "three"]:
        task_some_task = some_task(
            val=val,
        ).with_overrides(node_name=f"{val}-some-task")
        task_some_other_task = some_task(
            val=val,
        ).with_overrides(node_name=f"{val}-some-other-task")
        # Chain the pair so the second node is ordered after the first.
        # NOTE(review): the original paste lost its indentation; this line is
        # assumed to belong inside the loop because both names are rebound on
        # every iteration -- confirm against the original script.
        task_some_task >> task_some_other_task
@workflow(
    failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE,
)
def wf():
    """Entry-point workflow that invokes ``base_workflow``.

    Registered with the ``FAIL_AFTER_EXECUTABLE_NODES_COMPLETE`` failure
    policy.
    """
    base_workflow()