Visak
03/21/2023, 11:01 PM
from flytekit import task, workflow

@task
def add_1(n: float) -> float:
    return n + 1

@workflow
def some_name():
    # I want these two steps to run sequentially instead of in parallel
    c1 = add_1(n=1.0)
    c2 = add_1(n=2.0)
Jay Ganbat
03/21/2023, 11:06 PM
Visak
03/21/2023, 11:13 PM
@workflow
def some_name():
    # I want these two steps to run sequentially instead of in parallel
    for item in list:
        add_1(n=item)
Jay Ganbat
03/21/2023, 11:17 PM
Does list have predefined values, or will it be determined at runtime? I ask because a workflow is evaluated during registration.
You might be able to do that in a dynamic task.
@Ketan (kumare3) any insight?
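
To make the point about registration-time evaluation concrete, here is a minimal plain-Python sketch. It is a toy model, not flytekit's API: `Node`, `Graph`, `independent_loop`, and `chained_loop` are hypothetical names invented for illustration. It assumes only that a task call inside a workflow records a placeholder instead of running, and that an ordering edge exists only where one call's output is passed into another call.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class Node:
    """Placeholder for a task call that was recorded but not yet run."""
    name: str
    upstream: Tuple[str, ...] = ()


@dataclass
class Graph:
    nodes: List[Node] = field(default_factory=list)

    def call(self, task_name: str, *inputs) -> Node:
        # Only inputs that are themselves Nodes create an ordering edge.
        deps = tuple(i.name for i in inputs if isinstance(i, Node))
        node = Node(name=f"{task_name}-{len(self.nodes)}", upstream=deps)
        self.nodes.append(node)
        return node


def independent_loop(items) -> Graph:
    # The loop is unrolled while the graph is built; no call depends on another.
    g = Graph()
    for item in items:
        g.call("add_1", item)
    return g


def chained_loop(items) -> Graph:
    # Each call receives the previous call's placeholder, which forces an order.
    g = Graph()
    result = 0.0
    for item in items:
        result = g.call("add_1", item, result)
    return g


for node in chained_loop([1.0, 2.0, 3.0]).nodes:
    print(node.name, "runs after", node.upstream)
```

In this toy model the first loop yields three nodes with no edges between them, so a scheduler is free to run them in parallel. The second loop yields a chain, because each node lists the previous one as upstream.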
Visak
03/21/2023, 11:23 PM
task in a loop
Jay Ganbat
03/21/2023, 11:26 PM
result = SOME VALUE
for item in list:
    result = add_1(item, result)
never thought about this before
Visak
03/21/2023, 11:30 PM
varsha Parthasarathy
03/22/2023, 12:08 AM
Ketan (kumare3)
Visak
03/22/2023, 1:26 AM
varsha Parthasarathy
03/22/2023, 2:16 AM
x: will be different for every run
for i in range(x):
    splits = split_inputs(inputs=input_tasks, s=i)
    r = flytekit.map_task(
        run_maptask_fpo,
        concurrency=50,
        min_success_ratio=0.9)(
        curation_input=splits)
    result.append(r)
@Ketan (kumare3) - I want flytekit.map_task(..) to be launched once, wait for r, append r to result, and then call flytekit.map_task(..) again, x times in total.
But currently all x map_tasks are launched at once.
Ketan (kumare3)
from flytekit import task, workflow, map_task, dynamic
import typing


@task
def square(i: int) -> int:
    return i * i


@task
def splits(i: int) -> typing.List[int]:
    return list(range(i))


@task
def join(new_v: typing.List[int], v: typing.List[int]) -> typing.List[int]:
    v.extend(new_v)
    return v


@dynamic
def do(i: int, x: int, v: typing.List[int]) -> typing.List[int]:
    s = splits(i=i)
    new_v = map_task(square)(i=s)
    # join consumes new_v, so this round's map task must finish first
    v = join(new_v=new_v, v=v)
    # >= rather than == so the recursion also stops when x <= 1
    if i >= x - 1:
        return v
    # the next round is launched only after v is available
    return do(i=i + 1, x=x, v=v)


@workflow
def wf(x: int):
    do(i=1, x=x, v=[])
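
To see what the recursion accumulates, the same control flow can be traced in plain Python. This is a sketch only, not flytekit code: `do_plain` is a hypothetical stand-in, the list comprehension stands in for `map_task(square)`, and list concatenation stands in for `join`. The stop test is written as `>=` so that small values of x still terminate.

```python
from typing import List


def square(i: int) -> int:
    return i * i


def splits(i: int) -> List[int]:
    return list(range(i))


def do_plain(i: int, x: int, v: List[int]) -> List[int]:
    # One round: fan out over the split, then fold the results into v.
    new_v = [square(s) for s in splits(i)]  # stands in for map_task(square)
    v = v + new_v                           # stands in for join(...)
    if i >= x - 1:
        return v
    # The next round starts only after this round has produced v.
    return do_plain(i + 1, x, v)


print(do_plain(i=1, x=4, v=[]))  # prints [0, 0, 1, 0, 1, 4]
```

With x=4 there are three rounds (i = 1, 2, 3), and each round's results are appended before the next round begins. That is the sequential behaviour asked for above.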
Jay Ganbat
03/22/2023, 5:11 AM
Visak
03/22/2023, 5:39 AM
Ketan (kumare3)