freezing-shampoo-67249
04/24/2023, 1:13 PM
import time
from typing import List

from flytekit import map_task, task, workflow
from flytekit.core.node_creation import create_node

USE_CACHE = True
CACHE_VERSION = "0.0.27"
SERIALIZE_CACHE = False


@task(
    cache=USE_CACHE,
    cache_version=CACHE_VERSION,
    cache_serialize=SERIALIZE_CACHE,
)
def t1() -> str:
    print("t1")
    time.sleep(1)
    return "t1"


@task(
    cache=USE_CACHE,
    cache_version=CACHE_VERSION,
    cache_serialize=SERIALIZE_CACHE,
)
def t2(inp: str) -> str:
    print("t2")
    time.sleep(1)
    return "t2"


@task(
    cache=USE_CACHE,
    cache_version=CACHE_VERSION,
    cache_serialize=SERIALIZE_CACHE,
)
def t3_prep(inp: str) -> List[str]:
    print("t3_prep")
    return ["a", "b", "c"]


@task(
    cache=USE_CACHE,
    cache_version=CACHE_VERSION,
    cache_serialize=SERIALIZE_CACHE,
)
def t3(inp: str) -> str:
    print(f"t3 :: {inp}")
    time.sleep(1)
    return "t3"


@workflow
def wf1():
    t1_out = t1()
    t2_out = t2(inp=t1_out)
    t3_prep_out = t3_prep(inp=t2_out)
    t3_out = map_task(t3)(inp=t3_prep_out)


@workflow
def wf2():
    wf1()
    t1_out = t1()
    t2_out = t2(inp=t1_out)


@workflow
def wf():
    n1 = create_node(wf1)
    n2 = create_node(wf2)
    n1 >> n2


if __name__ == "__main__":
    wf()
I get the following (unexpected) output when running locally:
t1
t2
t3_prep
t3 :: a
t3 :: b
t3 :: c
t3 :: a
t3 :: b
t3 :: c
Why is "t3" executed twice? I would expect it to be executed only once.
hallowed-mouse-14616
04/24/2023, 1:24 PM
cache_serialize is not respected when executing map tasks. This is a consequence of map tasks being implemented as a plugin rather than as a core component of Flyte, so they are essentially excluded from the core logic.
Probably more information than you want, but we are currently working on an ArrayNode type (RFC here) that can be tracked with this issue. At a high level, this means that map tasks will be included in the core Flyte execution logic, enabling things like cache_serialize and the cache overwrite during execution functionality, and also allowing mapping over all plugin types and even different Flyte node types.
freezing-shampoo-67249
04/24/2023, 1:37 PM
The ArrayNode feature is exactly what I need. Looking forward to having this in one of the next releases of Flyte. 🙂