# ask-the-community
l
I have been pretty happy with this pattern around `map_task` using `dynamic`. Sharing here to see if anyone has any “gotchas” or additional ideas. I wrap `map_task` with a `dynamic` task for two reasons:
1. We can parameterize the concurrency arg, which is helpful for us when we have a big variety in the number of inputs between executions.
2. If caching at the task level, we can also cache at the dynamic level (I just recently discovered `dynamic` also supports caching). From what I can tell, this avoids the need to check the cache on every task in the `map_task` and instead checks the cache just once for the dynamic task. This can save a few minutes in some cases!
```python
from flytekit import map_task, task, workflow, dynamic


@task(cache=True, cache_version="0.0.1")
def build_inputs() -> list[int]:
    return list(range(10))


@task(cache=True, cache_version="0.0.1")
def example_task(x: int) -> int:
    return x


# Wrapping map_task in a dynamic task lets us (1) pass concurrency in as a
# runtime input and (2) cache at the dynamic level, so a re-run does one
# cache lookup instead of one per mapped task.
@dynamic(cache=True, cache_version="0.0.1")
def dynamic_map_task(xs: list[int], concurrency: int) -> list[int]:
    return map_task(example_task, concurrency=concurrency)(x=xs)


@workflow
def example_workflow(concurrency: int = 16) -> list[int]:
    inputs = build_inputs()
    return dynamic_map_task(xs=inputs, concurrency=concurrency)
```
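For anyone who wants to poke at this, it also runs locally by calling the workflow like a regular function (locally, I believe the map just executes in-process, so this is only useful as a sanity check):
```python
# Quick local sanity check of the pattern above; no cluster needed.
if __name__ == "__main__":
    print(example_workflow(concurrency=4))  # prints [0, 1, ..., 9]
```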
k
Dynamic does support caching
Your approach is indeed supported
Can you write a blog to educate others 😍
l
Where could I get started, and is there anything else worth adding?
k
No, just write up your experience and use case - where do you work? You can write it there or on Flyte.org, but preferably on your company’s blog. Use cases and education together are better.
l
I am at Pachama. I am not sure writing a blog like this is a priority for us, so I would probably do it outside of working hours. Maybe I’ll write a short Medium article and make sure to add use case examples and differences in cache-hit times.
k
Also happy to peer review / suggest edits. Cc @David Espejo (he/him) / @Sage Elliott
l
Sounds good! No promises, but I’ll put it on the docket!
h
Hi @Len Strnad, we tried your approach of wrapping `map_task` with `dynamic` in one of our long-running pipelines. It actually helped us save time by avoiding the need to read cached results from each task within a `map_task`. Thanks for sharing this! 🙏
k
Why were you reading cached results in a map task? Would love to understand the problem.
l
I think they mean they saved time in their workflow by checking one cache (the dynamic task) instead of checking each task in the map task individually. I could be wrong. That is why I often do it. I’ll do the same thing when using stubs from workflows in other projects that don’t have the most performant caching in sub-tasks.
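A minimal sketch of what I mean by the stub case; all of the project/domain/name/version values are placeholders for a task registered out of another project:
```python
from flytekit import dynamic, reference_task


# Placeholder coordinates for a task that lives in another project.
@reference_task(
    project="other-project",
    domain="production",
    name="pipelines.tasks.featurize",
    version="abc123",
)
def featurize(x: int) -> int:
    ...


# Caching the dynamic wrapper means one cache check for the whole fan-out
# instead of one check per underlying sub-task.
@dynamic(cache=True, cache_version="0.0.1")
def cached_featurize_all(xs: list[int]) -> list[int]:
    return [featurize(x=x) for x in xs]
```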
h
@Ketan (kumare3) We have a cron schedule associated with the launch plan, and we prefer not to rerun the map tasks on every run, hence we have enabled caching on the input task. We had a different problem with the map_tasks too: on re-runs they would get stuck initializing for a very long time, 1h+, and we couldn’t figure out why, as no pods were getting spawned and no logs were present. So an immediate solution for us was to avoid the attempt to rerun the map task and wrap it with dynamic, so that it only checks the cache at the dynamic level. @Len Strnad’s solution saved the time for our workflows.
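Roughly, our scheduling setup looks like this (names, schedule, and default inputs are illustrative placeholders, reusing `example_workflow` from Len’s snippet above):
```python
from flytekit import CronSchedule, LaunchPlan

# Hypothetical launch plan; our real one wraps the training pipeline.
nightly_lp = LaunchPlan.get_or_create(
    workflow=example_workflow,       # from the earlier snippet
    name="example_workflow_nightly",
    schedule=CronSchedule(schedule="0 2 * * *"),  # every day at 02:00
    default_inputs={"concurrency": 16},
)
```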
l
@Hema Jayachandran How many tasks, and what types of objects are being cached? Just curious.
h
Hey @Len Strnad, those are tensor tasks used by our model training. The number of tasks varies between the different entities that we run: the minimum is 40 and the maximum we have is 1600.
k
We can support caching at the map level itself.
l
Gotcha. For data like this we usually write out to Zarr, TFRecords, etc., and return the paths to the artifacts. We end up writing the files with a plugin that lets us scale up within the task. My most recent preference has been to use DataFrames to manage the list of training files and return that, since Flyte handles DataFrames nicely.
Retrieval of cached outputs is then much more lightweight, since it is basically a list of URLs. Something like the sketch below.
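Here `write_shard` and the bucket paths are made-up stand-ins for our real writer plugin:
```python
import pandas as pd
from flytekit import task


def write_shard(uri: str, value: int) -> None:
    # Stand-in for our real writer; the actual version writes tensors out
    # to object storage (Zarr, TFRecords, etc.) via a plugin that scales
    # up inside the task.
    ...


@task(cache=True, cache_version="0.0.1")
def build_training_files(xs: list[int]) -> pd.DataFrame:
    paths = []
    for i, x in enumerate(xs):
        uri = f"s3://my-bucket/training/shard-{i}.tfrecord"  # hypothetical
        write_shard(uri, x)
        paths.append(uri)
    # Returning a DataFrame of URIs keeps the cached output tiny: a cache
    # hit only fetches this small table, not the tensors themselves.
    return pd.DataFrame({"path": paths})
```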