#ask-the-community

Xin Shi

08/18/2023, 5:25 PM
Hi, I have a question about for loops in a workflow. I'd like to write code like the following:

from flytekit import task, workflow

@task
def example_task(num: int) -> None:
    ...

@workflow
def test_workflow() -> None:
    for i in range(5):
        example_task(num=i)

One way seems to be a dynamic workflow. However, in a dynamic workflow the tasks in the for loop run in parallel, and I want them to run in sequence. Is there a way to make the for loop run sequentially? Thanks!

Jay Ganbat

08/18/2023, 5:28 PM
Do you have many tasks in the loop? Maybe you can add chaining to your tasks, or add a dummy variable that feeds into the next iteration. I don't think a regular workflow will work, since everything needs to be defined at registration time.

Niels Bantilan

08/18/2023, 5:39 PM
see here for the chaining syntax

Xin Shi

08/18/2023, 5:52 PM
If I use the chaining syntax, the code looks something like this:

from flytekit import task, workflow

@task
def example_task(num: int) -> None:
    ...

@workflow
def test_workflow() -> None:
    # Create the five task nodes.
    example_test_wf1 = example_task(num=0)
    example_test_wf2 = example_task(num=1)
    example_test_wf3 = example_task(num=2)
    example_test_wf4 = example_task(num=3)
    example_test_wf5 = example_task(num=4)

    # Chain them so that each one runs after the previous one.
    example_test_wf1 >> example_test_wf2
    example_test_wf2 >> example_test_wf3
    example_test_wf3 >> example_test_wf4
    example_test_wf4 >> example_test_wf5

This is rather redundant, and it gets uglier as the number of tasks in the loop grows. I'm looking for a more concise way.
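One way to avoid writing each `>>` pair by hand is to build the chain inside the loop while keeping a reference to the previous node. The sketch below shows only the loop shape in plain Python: `Node`, `build_chain`, and this `example_task` are hypothetical stand-ins for the objects a task call returns inside a workflow, not flytekit classes, and whether the same loop works unchanged inside a `@workflow` is an assumption that was not tested in this thread.

```python
from typing import List, Optional


class Node:
    """Hypothetical stand-in for the object a task call returns in a workflow."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.upstream: List["Node"] = []

    def __rshift__(self, other: "Node") -> "Node":
        # `a >> b` records that b must wait for a, then returns b.
        other.upstream.append(self)
        return other


def example_task(num: int) -> Node:
    # Stand-in for calling a task: it only creates a node and runs nothing.
    return Node(f"example_task_{num}")


def build_chain(count: int) -> List[Node]:
    nodes: List[Node] = []
    previous: Optional[Node] = None
    for i in range(count):
        current = example_task(num=i)
        if previous is not None:
            previous >> current  # same effect as one hand-written pair
        nodes.append(current)
        previous = current
    return nodes


chain = build_chain(5)
for node in chain:
    print(node.name, "<-", [up.name for up in node.upstream])
```

The loop keeps the number of lines constant no matter how many tasks are chained; only the `range` bound changes.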

Jay Ganbat

08/18/2023, 6:02 PM
Maybe a dummy output works 🤔

dep_out = 0
for i in range(5):
    # Feed dep_out from the previous call into the next call.
    output, dep_out = example_test_wf(..., dep_out=dep_out)

and use a dynamic task. I've actually never tried this before, so let me know if it works 😅

Xin Shi

08/18/2023, 6:21 PM
Thanks for the suggestion! However, I also want these tasks to run in sequence. One issue with a dynamic task is that, as far as I know, the `example_test_wf` calls will run in parallel (please let me know if there is a configuration that makes them run in sequence).

Jay Ganbat

08/18/2023, 6:22 PM
Adding that dummy output should ensure sequential execution, right? The dep_out from the previous iteration is fed into the next task. Well, that's the idea, but I'm not sure it will work haha
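The idea can be sketched as follows: each call takes the previous call's output as an extra input, so the next task cannot start before the previous one has finished. The `dep` parameter and the name `sequential_workflow` are made up for illustration, writing it as a regular `@workflow` rather than a dynamic task is an assumption at this point in the thread, and the `try`/`except` fallback exists only so the sketch also runs as plain Python where flytekit is not installed.

```python
try:
    from flytekit import task, workflow
except ImportError:
    # Fallback so the sketch still runs without flytekit: plain functions,
    # which Python already executes one after another.
    def task(fn):
        return fn

    def workflow(fn):
        return fn


@task
def example_task(num: int, dep: int) -> int:
    # `dep` is never used; it exists only to receive the previous output.
    print(f"current num: {num}")
    return num


@workflow
def sequential_workflow() -> int:
    dep = example_task(num=0, dep=0)  # the first call gets a constant
    for i in range(1, 5):
        # Passing the previous output makes this call depend on that task.
        dep = example_task(num=i, dep=dep)
    return dep


if __name__ == "__main__":
    print(sequential_workflow())
```

The trade-off compared with `>>` chaining is that every task in the loop needs an extra input and a return value it would not otherwise have.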

Xin Shi

08/18/2023, 6:24 PM
Ah, got it now. Thanks! I'll try it and post the result here later.
It seems the above method works 👍. Actually, there is no need to use a dynamic task; a normal `workflow` works well. That's kind of surprising, as I thought for loops were not supported in workflows. It seems other people have the same impression (see the discussion here: https://flyte-org.slack.com/archives/CP2HDHKE1/p1685726254115599?thread_ts=1685464169.404789&cid=CP2HDHKE1). I guess the Flyte team made some improvements there. By the way, I am using the latest Flytekit, 1.9.0. To test that a for loop works in a workflow, I tried the following; it worked, and the tasks ran in parallel, just like in a dynamic task.

from flytekit import task, workflow

@task
def example_task(num: int) -> None:
    print(f"current num: {num}")


@workflow
def quick_test_workflow() -> None:
    for i in range(6):
        example_task(num=i)

Jay Ganbat

08/18/2023, 8:01 PM
Oh, that's good to hear. Yeah, I think I've observed that a for loop works only if the loop is deterministic at registration time; maybe Flyte is just expanding the loop directly.
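A toy model may help picture that guess. The code below is not Flyte's implementation; `Promise`, `trace`, and the recording `example_task` are invented for illustration. It runs the workflow body once, as registration would, and records one node per task call: a loop over a constant `range` simply leaves a fixed list of nodes, while a loop whose bound is a task output fails because that value does not exist yet.

```python
from typing import Callable, List


class Promise:
    """Placeholder for a task output that has no value until execution."""


recorded: List[str] = []


def example_task(num: int) -> Promise:
    # At "registration" nothing runs; the call is only recorded as a node.
    recorded.append(f"example_task(num={num})")
    return Promise()


def trace(body: Callable[[], None]) -> List[str]:
    # Run the workflow body once and return the nodes it produced.
    recorded.clear()
    body()
    return list(recorded)


def fixed_loop() -> None:
    for i in range(6):  # the bound is known while tracing
        example_task(num=i)


def runtime_bound_loop() -> None:
    count = example_task(num=0)  # a Promise, not an int
    for i in range(count):  # type: ignore[arg-type]
        example_task(num=i)


nodes = trace(fixed_loop)
print(len(nodes), "independent nodes:", nodes)

try:
    trace(runtime_bound_loop)
    failed = False
except TypeError as err:
    failed = True
    print("cannot unroll:", err)
```

In this model none of the six recorded nodes consumes another node's output, so nothing forces an order between them, which is consistent with the parallel runs reported above.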