bored-beard-89967
02/23/2024, 4:11 PM
`map_task` using `dynamic`. Sharing here to see if anyone has any “gotchas” or additional ideas. I wrap `map_task` with a `dynamic` task for two reasons:
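1. We can parameterize the `concurrency` argument. This helps us when the number of inputs varies a lot between executions. Inside a `dynamic` task, `concurrency` arrives as a plain `int`, whereas in a static `@workflow` it is only a promise, and `map_task` needs a concrete value when it is built.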
2. If we already cache at the task level, we can also cache at the `dynamic` level (I only recently discovered that `dynamic` supports caching too). From what I can tell, on a rerun with the same inputs this checks the cache once for the `dynamic` task instead of once for every mapped task inside `map_task`. This can save a few minutes in some cases!
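To make the lookup counting in point 2 concrete, here is a toy model in plain Python. `ToyCache`, `mapped_only`, `dynamic_cached`, and `rerun_lookups` are hypothetical names, and the model assumes only that each cached task invocation costs one cache lookup. It says nothing about how Flyte's cache service is actually implemented.

```python
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical cache key: (task name, cache version, inputs).
CacheKey = Tuple[str, str, Tuple[int, ...]]


class ToyCache:
    """Toy memo store that counts lookups; NOT Flyte's real cache service."""

    def __init__(self) -> None:
        self.store: Dict[CacheKey, Any] = {}
        self.lookups = 0

    def get_or_run(self, key: CacheKey, fn: Callable[[], Any]) -> Any:
        self.lookups += 1  # assumption: each cached invocation costs one lookup
        if key not in self.store:
            self.store[key] = fn()  # cache miss: run and store the result
        return self.store[key]


def example_task(x: int) -> int:
    return x


def mapped_only(cache: ToyCache, xs: List[int]) -> List[int]:
    # Only the per-item tasks are cached, so every element costs a lookup.
    return [
        cache.get_or_run(("example_task", "0.0.1", (x,)), lambda x=x: example_task(x))
        for x in xs
    ]


def dynamic_cached(cache: ToyCache, xs: List[int]) -> List[int]:
    # The wrapper is cached too; on a hit the inner map is never entered.
    return cache.get_or_run(
        ("dynamic_map_task", "0.0.1", tuple(xs)), lambda: mapped_only(cache, xs)
    )


def rerun_lookups(run: Callable[[ToyCache, List[int]], List[int]], xs: List[int]) -> int:
    """Warm the cache with one run, then count lookups on an identical rerun."""
    cache = ToyCache()
    run(cache, xs)
    before = cache.lookups
    run(cache, xs)
    return cache.lookups - before


if __name__ == "__main__":
    xs = list(range(10))
    print(rerun_lookups(mapped_only, xs))     # 10: one lookup per mapped item
    print(rerun_lookups(dynamic_cached, xs))  # 1: a single lookup for the wrapper
```

In this model the saving grows with the number of mapped items, which matches the intuition that a big fan-out benefits most from caching the wrapper.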
```python
from flytekit import dynamic, map_task, task, workflow

@task(cache=True, cache_version="0.0.1")
def build_inputs() -> list[int]:
    return list(range(10))

@task(cache=True, cache_version="0.0.1")
def example_task(x: int) -> int:
    return x

# Caching the dynamic task lets a rerun with the same inputs hit a single cache
# entry instead of checking the cache for every mapped example_task.
@dynamic(cache=True, cache_version="0.0.1")
def dynamic_map_task(xs: list[int], concurrency: int) -> list[int]:
    # Inside a dynamic task, `concurrency` is a plain int, so map_task can use it.
    return map_task(example_task, concurrency=concurrency)(x=xs)

@workflow
def example_workflow(concurrency: int = 16) -> list[int]:
    inputs = build_inputs()
    return dynamic_map_task(xs=inputs, concurrency=concurrency)
```
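In the code above, `concurrency` is an ordinary workflow input (default 16) that the `dynamic` task hands to `map_task`, so each execution can choose its own limit. As a purely local illustration of what such a limit means (at most N mapped items running at the same time), here is a thread-pool sketch. `bounded_map` and `PeakTracker` are hypothetical helpers, and this is only an analogy: Flyte runs the mapped tasks on the cluster, not as threads in one process.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def bounded_map(fn: Callable[[T], R], xs: List[T], concurrency: int) -> List[R]:
    """Local analogy for a concurrency cap (not Flyte code): at most
    `concurrency` calls to fn run at once."""
    if concurrency < 1:
        raise ValueError("concurrency must be >= 1 in this sketch")
    # The pool never runs more than `concurrency` worker threads.
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        return list(pool.map(fn, xs))  # results come back in input order


class PeakTracker:
    """Records the largest number of calls that were in flight simultaneously."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.active = 0
        self.peak = 0

    def run(self, x: int) -> int:
        with self._lock:
            self.active += 1
            self.peak = max(self.peak, self.active)
        time.sleep(0.01)  # simulated work so calls overlap
        with self._lock:
            self.active -= 1
        return x


if __name__ == "__main__":
    # Same code, different limit per "execution", like passing concurrency=4 vs 16.
    for limit in (4, 16):
        tracker = PeakTracker()
        bounded_map(tracker.run, list(range(32)), concurrency=limit)
        print(limit, tracker.peak)  # peak never exceeds the limit
```

The point of the analogy is only that the cap is a runtime value: the mapped function stays the same while each run picks how many items may be in flight.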
freezing-airport-6809
freezing-airport-6809
freezing-airport-6809
bored-beard-89967
02/26/2024, 4:49 PM
freezing-airport-6809
bored-beard-89967
02/27/2024, 3:08 PM
freezing-airport-6809
bored-beard-89967
02/27/2024, 3:10 PM
faint-machine-61752
04/12/2024, 8:43 AM
freezing-airport-6809
bored-beard-89967
04/12/2024, 3:34 PM
faint-machine-61752
04/16/2024, 6:55 AM
bored-beard-89967
04/16/2024, 1:19 PM
faint-machine-61752
04/24/2024, 1:57 PM
freezing-airport-6809
bored-beard-89967
04/24/2024, 2:09 PM
bored-beard-89967
04/24/2024, 2:10 PM