mysterious-painter-66441
06/04/2025, 3:49 PM
from typing import List
from flytekit import LaunchPlan, WorkflowFailurePolicy, dynamic, task, workflow
@task
def retrieve_a_list() -> List[int]:
    """Return the integers 0, 1 and 2 as a list."""
    return list(range(3))
@task
def print_num_task(num: int):
    """Print ``num``, failing when it is even.

    Args:
        num: The integer to print.

    Raises:
        Exception: If ``num`` is even.
    """
    # ``num % 2 == 0`` selects even numbers, so the message says "even";
    # the previous text wrongly reported the number as odd.
    # NOTE(review): the pasted source lost its indentation; ``print`` is
    # assumed to sit outside the ``if`` -- confirm against the original file.
    if num % 2 == 0:
        raise Exception(f"{num} is even")
    print(num)
@workflow
def print_num(num: int):
    """Workflow that forwards ``num`` to ``print_num_task``."""
    print_num_task(num=num)
# Launch plan for the print_num workflow; print_list invokes it once per number.
launch_plan = LaunchPlan.get_or_create(workflow=print_num)
@dynamic
def print_list(num_list: List[int]):
    """Invoke ``launch_plan`` once for each entry of ``num_list``.

    Each invocation is given the override name ``number_<num>``.
    """
    for num in num_list:
        override_name = f"number_{num}"
        launch_plan(num=num).with_overrides(name=override_name)
@workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
def print_number_wf():
    """Fetch the number list and pass it to ``print_list``.

    The workflow is declared with the
    ``FAIL_AFTER_EXECUTABLE_NODES_COMPLETE`` failure policy.
    """
    numbers = retrieve_a_list()
    print_list(num_list=numbers)
# TODO: Add a task that summarizes the results of print_list, returning a dictionary of the failed/successful workflows it launched.
if __name__ == "__main__":
    # Run the top-level workflow when this file is executed as a script.
    print_number_wf()