Len Strnad
02/23/2024, 4:11 PM
map_task using dynamic. Sharing here to see if anyone has any “gotchas” or additional ideas. I wrap map_task with a dynamic task for two reasons:
1. We can parameterize the concurrency arg, which is helpful for us when the number of inputs varies a lot between executions.
2. If caching at the task level, we can also cache at the dynamic level (I just recently discovered that dynamic also supports caching). From what I can tell, this avoids checking the cache for every task in map_task and checks it only once, for the dynamic task. This can save a few minutes in some cases!
from flytekit import dynamic, map_task, task, workflow

@task(cache=True, cache_version="0.0.1")
def build_inputs() -> list[int]:
    return list(range(10))

@task(cache=True, cache_version="0.0.1")
def example_task(x: int) -> int:
    return x

# cache=True here caches the output of the whole mapped run.
@dynamic(cache=True, cache_version="0.0.1")
def dynamic_map_task(xs: list[int], concurrency: int) -> list[int]:
    # concurrency is a regular input, so it can differ per execution.
    return map_task(example_task, concurrency=concurrency)(x=xs)

@workflow
def example_workflow(concurrency: int = 16) -> list[int]:
    inputs = build_inputs()
    return dynamic_map_task(xs=inputs, concurrency=concurrency)
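Since the point of reason 1 is that the input count varies a lot between executions, one possible follow-up is to derive the concurrency from the number of inputs instead of passing it by hand. The sketch below is plain Python and not part of flytekit: the name pick_concurrency and the default cap of 16 are assumptions made for illustration.

```python
def pick_concurrency(n_inputs: int, max_concurrency: int = 16) -> int:
    """Hypothetical helper: choose a concurrency value from the input count.

    Not a flytekit API. The cap of 16 mirrors the workflow default above
    and is only an assumption.
    """
    if n_inputs < 0:
        raise ValueError("n_inputs must be non-negative")
    if max_concurrency < 1:
        raise ValueError("max_concurrency must be at least 1")
    # Never request more parallel slots than there are items,
    # and always return at least 1, even for an empty input list.
    return max(1, min(n_inputs, max_concurrency))


print(pick_concurrency(3))     # 3
print(pick_concurrency(5000))  # 16
```

Inside example_workflow, inputs is a promise rather than a list, so len(inputs) would not work there. As far as I understand, the inputs of a dynamic task are materialized at run time, so something like pick_concurrency(len(xs)) could be called inside dynamic_map_task, or the helper could be wrapped in its own small task.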