nutritious-manchester-15545
11/29/2023, 10:32 AMhallowed-mouse-14616
11/29/2023, 1:18 PMhallowed-mouse-14616
11/29/2023, 1:20 PMhallowed-mouse-14616
11/29/2023, 1:22 PM@task
def sleep_task(idx: int, seconds: int) -> int:
time.sleep(seconds)
print(f"{idx}: Slept {seconds} seconds")
return seconds
@workflow
def fanout_sleep_sub_wf(length: int = 5000, sleep_seconds: int = 1) -> int:
    """Fan out `length` sleep tasks via map_task and aggregate the results.

    NOTE(review): depends on project helpers `generate_map` and `sum_task`
    defined elsewhere — presumably generate_map produces the list of `idx`
    inputs and sum_task totals the returned seconds; confirm against their
    definitions.
    """
    payload = generate_map(length=length)
    # Fix `seconds` via partial so the mapped task only varies over `idx`.
    partial_task = functools.partial(sleep_task, seconds=sleep_seconds)
    # Arbitrarily large concurrency value to see if we can scale up the nodes.
    map_func = map_task(partial_task, concurrency=10000)
    # Caching disabled on the map node so every run actually executes the sleeps.
    return sum_task(
        data=map_func(idx=payload).with_overrides(cache=False, cache_serialize=False)
    )
# Default launch plan for the sub-workflow, so the parent workflow below can
# spawn it as separate executions rather than inlining its nodes.
fanout_sleep_lp = LaunchPlan.get_default_launch_plan(
    current_context(), fanout_sleep_sub_wf
)
@workflow
def fanout_sleep_wf(length: int = 5000, sleep_seconds: int = 1) -> int:
    """Launch the sub-workflow launch plan 10 times; return the last result.

    With the defaults this schedules 10 x 5000 = 50k one-second sleep tasks.
    NOTE(review): the loop is unrolled at workflow-compile time; the launches
    presumably run concurrently since no data dependency links them — confirm
    against Flyte's workflow compilation semantics.
    """
    last_result = None
    for _ in range(
        10
    ):  # Arbitrarily chosen fanout. Keeping under default parallelism of 25.
        last_result = fanout_sleep_lp(length=length, sleep_seconds=sleep_seconds)
    return last_result
This is a load-testing workflow we use internally to help with scaling. The example launches an arbitrary workload (1-second sleeps) 50k times.nutritious-manchester-15545
11/29/2023, 1:37 PMhallowed-mouse-14616
11/29/2023, 1:52 PMhallowed-mouse-14616
11/29/2023, 1:53 PMhallowed-mouse-14616
11/29/2023, 1:54 PM