#ask-the-community

Len Strnad

01/02/2024, 5:34 PM
I would like to kick off workflows using the dynamic workflow convention. However, each workflow runs a map_task, and I would like to control the number of tasks running at once. I would prefer to have the workflows run serially to accomplish this. A code snippet could look like:
@dynamic
def dynamic_workflow(a: str, bs: list[str]) -> list[str]:
    # imagine each build_dataset_workflow runs map_task
    # if bs is largish, we don't control the number of tasks well
    datasets = [build_dataset_workflow(a=a, b=b) for b in bs]

    # Can we kick off each workflow in serial?
    # How can we achieve something like this programmatically:
    datasets[0] >> datasets[1]
    datasets[1] >> datasets[2]
    ...
or something like:
@dynamic
def dynamic_workflow(a: str, bs: list[str]) -> list[str]:
    datasets = []
    for b in bs:
        # is there a wait function that can be used here?
        ds = wait(build_dataset_workflow(a=a, b=b))
        datasets.append(ds)
    return datasets
Any ideas?
Or maybe I can use `map_task` to solve this like:
@dynamic
def dynamic_workflow(bs: list[str]) -> list[str]:
    return map_task(build_dataset_workflow, concurrency=1)(b=bs)
I tried using map_task to accomplish this, but got `Map tasks can only compose of PythonFuncton and PythonInstanceTasks currently`. So, that does not look like an option.

Thomas Newton

01/02/2024, 6:14 PM
I think something like your first suggestion will work using `>>`. I haven't seen `>>>` before.

Len Strnad

01/02/2024, 6:15 PM
Apologies! I meant `>>` and corrected above.
The first approach would work if I knew the length of `bs`, but what if it is unknown? Maybe I can map the bitwise shift operator over consecutive pairs? Trying that now.

Thomas Newton

01/02/2024, 6:17 PM
Yes, I believe that is possible. I think I have done something like this with a `for` loop before, but I can't remember it clearly.

Len Strnad

01/02/2024, 6:20 PM
Awesome. Yes, I think this is likely the best solution. I'll report back.
Worked like a charm:
@dynamic
def serial_workflows():
    datasets = [example_workflow(...) for _ in range(10)]
    outputs = [[datasets[i], datasets[i + 1]] for i in range(len(datasets) - 1)]
    return list(map(lambda x: x[0] >> x[1], outputs))
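The pairwise chaining above can also be written as a plain `for` loop over consecutive pairs, which works for a list of any length. The following is a minimal sketch in plain Python, not flytekit code: `Node` and `chain_serially` are hypothetical stand-ins introduced only for illustration, with `Node.__rshift__` mimicking the "left runs before right" meaning of `>>` discussed in this thread.

```python
from dataclasses import dataclass, field


@dataclass
class Node:
    """Hypothetical stand-in for a workflow node; not part of flytekit."""

    name: str
    upstream: list = field(default_factory=list)

    def __rshift__(self, other: "Node") -> "Node":
        # `a >> b` records that b must wait for a, then returns b.
        other.upstream.append(self.name)
        return other


def chain_serially(nodes: list) -> list:
    """Apply `>>` to every consecutive pair so the nodes run one at a time."""
    for earlier, later in zip(nodes, nodes[1:]):
        earlier >> later
    return nodes


nodes = chain_serially([Node(f"n{i}") for i in range(4)])
dependencies = {n.name: n.upstream for n in nodes}
```

Because `zip(nodes, nodes[1:])` yields nothing for an empty or single-element list, the loop needs no special case when the length of the input is unknown.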