bored-beard-89967
01/02/2024, 5:34 PM
@dynamic
def dynamic_workflow(a: str, bs: list[str]) -> list[str]:
    # imagine each build_dataset_workflow runs map_task
    # if bs is largish, we don't control the number of tasks well
    datasets = [build_dataset_workflow(a=a, b=b) for b in bs]
    # Can we kick off each workflow in serial?
    # How can we achieve something like this programmatically:
    datasets[0] >> datasets[1]
    datasets[1] >> datasets[2]
    ...
or something like:
@dynamic
def dynamic_workflow(a: str, bs: list[str]) -> list[str]:
    datasets = []
    for b in bs:
        # is there a wait function that can be used here?
        ds = wait(build_dataset_workflow(a=a, b=b))
        datasets.append(ds)
    return datasets
Any ideas?

bored-beard-89967
01/02/2024, 5:38 PM
Maybe I could use map_task to solve this, like:
@dynamic
def dynamic_workflow(bs: list[str]) -> list[str]:
    return map_task(build_dataset_workflow, concurrency=1)(b=bs)

bored-beard-89967
01/02/2024, 5:56 PM
Map tasks can only be composed of PythonFunctionTasks and PythonInstanceTasks currently. So, that does not look like an option.

calm-pilot-2010
01/02/2024, 6:14 PM
I assume you mean >>. I haven't seen >>> before.

bored-beard-89967
01/02/2024, 6:15 PM
Yes, I meant >> and corrected it above.

bored-beard-89967
01/02/2024, 6:15 PM
That works when I know the length of bs, but what if it is unknown? Maybe I can map the bitwise shift operator over consecutive pairs? Trying that now.
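A minimal sketch of that consecutive-pairs idea, assuming the promises returned by build_dataset_workflow (the sub-workflow from the first message) support the same >> chaining used above; this version does not need to know the length of bs:

from flytekit import dynamic

@dynamic
def dynamic_workflow(a: str, bs: list[str]) -> list[str]:
    # launch one sub-workflow per element of bs
    datasets = [build_dataset_workflow(a=a, b=b) for b in bs]
    # chain each promise to its successor so the sub-workflows
    # run serially, however long bs is
    for upstream, downstream in zip(datasets, datasets[1:]):
        upstream >> downstream
    return datasets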
calm-pilot-2010
01/02/2024, 6:17 PM
I think I've seen it done with a for loop before, but I can't remember it clearly.

bored-beard-89967
01/02/2024, 10:52 PM
@dynamic
def serial_workflows():
    # kick off 10 copies of the workflow
    datasets = [example_workflow(...) for _ in range(10)]
    # build the consecutive pairs (0, 1), (1, 2), ...
    pairs = [[datasets[i], datasets[i + 1]] for i in range(len(datasets) - 1)]
    # chain each pair with >> so the workflows execute serially
    return list(map(lambda x: x[0] >> x[1], pairs))
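One caveat with this mapped version, assuming >> evaluates to its right-hand promise (which the return value above relies on): the resulting list has len(datasets) - 1 entries, so the first workflow's output is dropped from the outputs. Chaining with a plain loop and returning datasets, as in the sketch further up, keeps all of them.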