#ask-the-community

Adrian Loy

11/29/2023, 10:32 AM
Let's say you have a task that you need to run 100,000 times or more. How would you do it? We thought about MapTask after reading this blogpost, but there seem to be issues with parallelizing map tasks over very large lists: we already encountered problems when trying to parallelize over 20,000 keys with the experimental MapTask. It seems the non-experimental MapTask has a hard limit of 5,000 tasks. I am a bit surprised by this, as neither the blogpost nor the MapTask documentation mentions anything in that direction. Is a dynamic workflow that schedules 100,000 tasks a better idea?

Dan Rammer (hamersaw)

11/29/2023, 1:18 PM
Can you elaborate a bit on the issues you found with the experimental maptask? The ultimate goal is to transition here (advantages out of scope), so just want to make sure we're successfully covering use cases.
The 5k limit is a configuration option, but we don't recommend going much larger. I believe users have successfully executed up to 10k. This limit was added as a result of state storage size restrictions. As you may know, Flyte uses etcd to store workflow execution state. There is a limit on object sizes in etcd (1.5MB on EKS), so the 5k limit exists so that we do not run into issues there.
A workaround here is to batch your 100k inputs into multiple lists and execute over multiple maptasks using different launchplans. For example:
# Imports were not shown in the original message; the flytekit import path is assumed.
import functools
import time

from flytekit import LaunchPlan, current_context, map_task, task, workflow

# generate_map and sum_task are helper tasks that are not shown in this snippet.


@task
def sleep_task(idx: int, seconds: int) -> int:
    time.sleep(seconds)
    print(f"{idx}: Slept {seconds} seconds")
    return seconds


@workflow
def fanout_sleep_sub_wf(length: int = 5000, sleep_seconds: int = 1) -> int:
    payload = generate_map(length=length)
    # Bind the sleep duration so that only idx is mapped over.
    partial_task = functools.partial(sleep_task, seconds=sleep_seconds)
    # Arbitrarily large concurrency value to see if we can scale up the nodes.
    map_func = map_task(partial_task, concurrency=10000)
    return sum_task(
        data=map_func(idx=payload).with_overrides(cache=False, cache_serialize=False)
    )


fanout_sleep_lp = LaunchPlan.get_default_launch_plan(
    current_context(), fanout_sleep_sub_wf
)


@workflow
def fanout_sleep_wf(length: int = 5000, sleep_seconds: int = 1) -> int:
    last_result = None
    # Arbitrarily chosen fanout. Keeping under default parallelism of 25.
    for _ in range(10):
        last_result = fanout_sleep_lp(length=length, sleep_seconds=sleep_seconds)
    return last_result
This is a load-testing workflow we use internally to help scale. The example launches an arbitrary workload (1s sleeps) 50k times: 10 launch plan executions of 5k map task entries each.
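For the batching step itself, a minimal sketch in plain Python might look like the following. The helper name `chunk_inputs` and the constant `MAX_MAP_TASK_SIZE` are hypothetical and not part of flytekit; the 5,000 value is simply the default limit mentioned above. Each resulting batch would then be passed to its own map task execution.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")

# Assumption: mirrors the default 5k map task limit discussed in this thread.
MAX_MAP_TASK_SIZE = 5000


def chunk_inputs(items: Sequence[T], chunk_size: int = MAX_MAP_TASK_SIZE) -> List[List[T]]:
    """Split items into consecutive batches of at most chunk_size elements."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    return [list(items[i:i + chunk_size]) for i in range(0, len(items), chunk_size)]


# 100k inputs become 20 batches of 5k, each small enough for one map task.
batches = chunk_inputs(range(100_000))
print(len(batches), len(batches[0]), len(batches[-1]))  # 20 5000 5000
```

The last batch is simply shorter when the input length is not a multiple of the chunk size, so no padding is needed.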

Adrian Loy

11/29/2023, 1:37 PM
Thank you for the answer. We are indeed now pursuing a chunking approach; I just felt like I was missing something. Would you agree that a dynamic workflow launching that many tasks is also a bad idea? I read somewhere in the docs that the metadata associated with that workflow would be a reason to avoid that setup.

Dan Rammer (hamersaw)

11/29/2023, 1:52 PM
Yeah, so two things about executing dynamic tasks at scale: (1) They start a pod initially to compile the dynamic workflow closure before executing all of the internal tasks. In this scenario that additional overhead will likely be unnoticeable given the cost of executing 100k tasks, but it is something to be cognizant of. (2) Maptasks use bitarrays to store a subset of metadata for each task execution as an optimization. This means that the etcd object for executing 5k tasks using a maptask will be significantly smaller than the one for a dynamic workflow running the same 5k tasks. Basically, maptasks can scale larger.
Was the experimental maptask issue just a problem with scaling to 100k tasks? I'm wondering if it works fine for smaller inputs and the missing 5k safeguard exposes some of the issues.
sorry, just saw the tag on the other thread!
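To make the bitarray point above concrete, here is a rough sketch comparing a one-bit-per-task array with a list of per-task records. This is not Flyte's actual storage layout: the `PhaseBitArray` class and the record fields (`id`, `phase`) are hypothetical and only illustrate why packed per-task state stays small.

```python
import json

NUM_TASKS = 5000


class PhaseBitArray:
    """Illustrative packed array: one bit per subtask (1 = succeeded)."""

    def __init__(self, size: int) -> None:
        self.size = size
        self._bits = bytearray((size + 7) // 8)  # round up to whole bytes

    def set(self, idx: int) -> None:
        if not 0 <= idx < self.size:
            raise IndexError(idx)
        self._bits[idx // 8] |= 1 << (idx % 8)

    def get(self, idx: int) -> bool:
        if not 0 <= idx < self.size:
            raise IndexError(idx)
        return bool(self._bits[idx // 8] & (1 << (idx % 8)))

    def nbytes(self) -> int:
        return len(self._bits)


bits = PhaseBitArray(NUM_TASKS)
for i in range(0, NUM_TASKS, 2):  # mark every even-indexed subtask as succeeded
    bits.set(i)

# Hypothetical per-task records, standing in for richer per-node status.
per_node = [
    {"id": f"n{i}", "phase": "SUCCEEDED" if i % 2 == 0 else "RUNNING"}
    for i in range(NUM_TASKS)
]
per_node_bytes = len(json.dumps(per_node).encode())

print(bits.nbytes(), per_node_bytes)
```

The packed array needs 625 bytes for 5,000 subtasks, while even this minimal per-task record list is far larger once serialized.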