Len Strnad
01/02/2024, 5:34 PM

```python
@dynamic
def dynamic_workflow(a: str, bs: list[str]) -> list[str]:
    # imagine each build_dataset_workflow runs a map_task
    # if bs is largish, we don't control the number of tasks well
    datasets = [build_dataset_workflow(a=a, b=b) for b in bs]
    # can we kick off each workflow in serial?
    # how can we achieve something like this programmatically:
    datasets[0] >> datasets[1]
    datasets[1] >> datasets[2]
    ...
```

or something like:

```python
@dynamic
def dynamic_workflow(a: str, bs: list[str]) -> list[str]:
    datasets = []
    for b in bs:
        # is there a wait function that can be used here?
        ds = wait(build_dataset_workflow(a=a, b=b))
        datasets.append(ds)
    return datasets
```

Any ideas?
Len Strnad
01/02/2024, 5:38 PM

Maybe I can use map_task to solve this like:

```python
@dynamic
def dynamic_workflow(bs: list[str]) -> list[str]:
    return map_task(build_dataset_workflow, concurrency=1)(b=bs)
```
Len Strnad
01/02/2024, 5:56 PM

"Map tasks can only compose of PythonFunction and PythonInstanceTasks currently". So, that does not look like an option.
Thomas Newton
01/02/2024, 6:14 PM

I assume you mean >>. I haven't seen >>> before.
Len Strnad
01/02/2024, 6:15 PM

Yes, I meant >> and corrected above.
Len Strnad
01/02/2024, 6:15 PM

That works when I know the length of bs, but what if it is unknown? Maybe I can map the bitwise shift operator over consecutive pairs? Trying that now
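For reference, consecutive pairs of an arbitrary-length list can be built by zipping the list with its own tail. A plain-Python sketch (the dataset names here are just placeholder strings, not Flyte promises):

```python
# Build consecutive pairs of a list of unknown length by zipping
# the list with its own tail.
datasets = ["ds0", "ds1", "ds2", "ds3"]
pairs = list(zip(datasets, datasets[1:]))
print(pairs)  # [('ds0', 'ds1'), ('ds1', 'ds2'), ('ds2', 'ds3')]
```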
Thomas Newton
01/02/2024, 6:17 PM

I think I've seen something like this done with a for loop before, but I can't remember it clearly.
Thomas Newton
01/02/2024, 6:19 PM

Len Strnad
01/02/2024, 6:20 PM

Len Strnad
01/02/2024, 10:52 PM

```python
@dynamic
def serial_workflows():
    datasets = [example_workflow(...) for _ in range(10)]
    outputs = [[datasets[i], datasets[i + 1]] for i in range(len(datasets) - 1)]
    return list(map(lambda x: x[0] >> x[1], outputs))
```
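A runnable plain-Python sketch of the pattern in that last snippet. The `Node` class and its `__rshift__` are illustrative stand-ins for Flyte promises and their chaining operator, not flytekit API:

```python
class Node:
    """Stand-in for a workflow node; ">>" records an ordering edge."""

    def __init__(self, name: str):
        self.name = name
        self.upstream = None  # node that must run before this one

    def __rshift__(self, other: "Node") -> "Node":
        # "a >> b" means: run a before b
        other.upstream = self
        return other


datasets = [Node(f"ds{i}") for i in range(4)]
# chain each consecutive pair, as in the snippet above
pairs = [[datasets[i], datasets[i + 1]] for i in range(len(datasets) - 1)]
list(map(lambda x: x[0] >> x[1], pairs))

# every node after the first now depends on its predecessor,
# which forces the workflows to execute serially
print([d.upstream.name if d.upstream else None for d in datasets])
# [None, 'ds0', 'ds1', 'ds2']
```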