Eric Song
08/14/2023, 9:28 PM
map_task
but my subtasks are stuck in the "Queued" state. I'm seeing the following in the FlyteAdmin logs:
```
Request failed due to [rpc error: code = NotFound desc = Failed to get existing node execution id: [node_id:"dn0" execution_id:<project:"flytetester" domain:"development" name:"eric-vtph869yqo6qyxstzp2diw" > ] with err: entry not found]. If it's an unauthenticated error, we will attempt to establish an authenticated context.
ArrayNode event recording failed: [ExecutionNotFound: The execution that the event belongs to does not exist, caused by [rpc error: code = NotFound desc = Failed to get existing node execution id: [node_id:"dn0" execution_id:<project:"flytetester" domain:"development" name:"eric-vtph869yqo6qyxstzp2diw" > ] with err: entry not found]]
failed Execute for node. Error: ExecutionNotFound: The execution that the event belongs to does not exist, caused by [rpc error: code = NotFound desc = Failed to get existing node execution id: [node_id:"dn0" execution_id:<project:"flytetester" domain:"development" name:"eric-vtph869yqo6qyxstzp2diw" > ] with err: entry not found]
```
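The NotFound errors above come from an event being recorded against a node execution ID that FlyteAdmin has no entry for. Later in this thread the cause is described as ArrayNode event reporting not matching node IDs when the ArrayNode is a child node. The toy model below only pictures that failure mode: the store, `NodeExecutionKey`, `qualify`, and the parent-prefixed ID scheme `"n0-dn0"` are all hypothetical and are not FlyteAdmin's real schema or ID format.

```python
from __future__ import annotations

from dataclasses import dataclass


class NotFound(Exception):
    """Stands in for the gRPC NotFound error in the logs."""


@dataclass(frozen=True)
class NodeExecutionKey:
    project: str
    domain: str
    execution: str
    node_id: str


class ToyNodeExecutionStore:
    """Hypothetical in-memory node-execution table, not FlyteAdmin's real schema."""

    def __init__(self) -> None:
        self.phases: dict[NodeExecutionKey, str] = {}

    def create(self, key: NodeExecutionKey) -> None:
        self.phases[key] = "QUEUED"

    def record_event(self, key: NodeExecutionKey, phase: str) -> None:
        # Events are only accepted for node executions that already exist.
        if key not in self.phases:
            raise NotFound(f'node_id:"{key.node_id}" with err: entry not found')
        self.phases[key] = phase


def qualify(parent_id: str, child_id: str) -> str:
    # Hypothetical ID scheme: a child node ID is prefixed with its parent's ID.
    return f"{parent_id}-{child_id}"


def key(node_id: str) -> NodeExecutionKey:
    return NodeExecutionKey("flytetester", "development",
                            "eric-vtph869yqo6qyxstzp2diw", node_id)


store = ToyNodeExecutionStore()
# The child node inside the dynamic parent is registered under a qualified ID...
store.create(key(qualify("n0", "dn0")))

# ...so an event reported under the bare child ID is rejected.
try:
    store.record_event(key("dn0"), "RUNNING")
except NotFound as err:
    print(f"NotFound: {err}")

# Reporting under the matching qualified ID is accepted.
store.record_event(key(qualify("n0", "dn0")), "RUNNING")
print(store.phases[key(qualify("n0", "dn0"))])
```

The point of the sketch is only that both sides must agree on the child node's ID; which side is "wrong" in the real system is not stated in this thread.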
Yee
Eric Song
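08/14/2023, 10:45 PM
```python
import logging
from time import sleep

from flytekit import dynamic
# New ArrayNode-based map_task (the old one is flytekit.map_task)
from flytekit.experimental import map_task


@dynamic
def my_subtask(input: str) -> str:
    logging.info('Starting my_subtask')
    i = 0
    # Simulate ~10 minutes of work, logging once every 10 seconds
    while i < 60:
        logging.info(f'subtask Iteration - {i}')
        sleep(10)
        i += 1
    return f"subtask - {input}"


@dynamic
def example_task(input: str) -> str:
    logging.info('Starting example_task')
    # Fan out one my_subtask per element of the list
    map_task(my_subtask)(
        input=["a", "b", "c", "d", "e"]
    )
    return "hi"
```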
This works if I just swap in the old map_task
.MUhwjR9VF3K2QJhL2VvNGoFY
08/14/2023, 10:46 PM
Yee
my_subtask
to a normal task instead of a dynamic task?
Eric Song
08/14/2023, 10:58 PM
example_task
(the parent) to be task
but it's fine if the subtask stays as dynamic
Yee
Eduardo Apolinario (eapolinario)
08/15/2023, 12:09 AM
Eric Song
08/15/2023, 12:10 AM
example_task set to @task instead of @dynamic, I lose the parallelism of the map_task and the subtasks are executed serially. I tried setting concurrency, but that didn't fix it.
So I have the following situations:
• old map_task + example_task as @task -> executes serially
• old map_task + example_task as @dynamic -> executes in parallel (what I want)
• new map_task + example_task as @task -> executes serially
• new map_task + example_task as @dynamic -> doesn't launch subtask pods
Eduardo Apolinario (eapolinario)
08/15/2023, 12:29 AM
@dynamic + array node map tasks.
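The serial behavior reported in the list above for example_task as @task can be pictured with a plain-Python analogy. No Flyte code is involved: threads stand in for the separate pods a real fan-out would get, the 0.05 s sleep stands in for the subtask's work, and `ConcurrencyProbe` is a hypothetical helper that records how many calls overlap. It illustrates the difference between a loop inside one container and a real fan-out, not how Flyte schedules pods.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class ConcurrencyProbe:
    """Tracks the peak number of `work` calls running at the same moment."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._active = 0
        self.peak = 0

    def work(self, item: str) -> str:
        with self._lock:
            self._active += 1
            self.peak = max(self.peak, self._active)
        time.sleep(0.05)  # stand-in for the subtask's real work
        with self._lock:
            self._active -= 1
        return f"subtask - {item}"


inputs = ["a", "b", "c", "d", "e"]

# Nested inside one task: a plain loop in a single process, one item at a time.
serial = ConcurrencyProbe()
serial_results = [serial.work(x) for x in inputs]

# Fanned out: each item gets its own worker (threads stand in for pods here).
fanned = ConcurrencyProbe()
with ThreadPoolExecutor(max_workers=len(inputs)) as pool:
    fanned_results = list(pool.map(fanned.work, inputs))

print(serial.peak, fanned.peak)
```

Both runs produce the same results in the same order; only the peak overlap differs, which is always 1 for the loop.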
And just to clarify a misconception: nested tasks (in this case, a map task running as part of another @task) are not distributed, i.e. they all run serially in the original container. Starting with this latest flytekit release, you'll see a warning that explains this (specifically, a message in the logs that says "You are not supposed to nest @Task/@Workflow inside a @Task!").
Eric Song
08/15/2023, 1:13 AM
Eduardo Apolinario (eapolinario)
08/15/2023, 1:54 AM
Dan Rammer (hamersaw)
08/15/2023, 7:53 PM
ArrayNode event reporting does not match node IDs when the ArrayNode is a child node. So in addition to the @dynamic case you mentioned, this doesn't work in subworkflows or conditionals either. I have a small PR that corrects the execution, but it is not meshing well with the UI. We're having some deeper internal discussions to sort all of this out.
ArrayNode, but we are very excited about all of the feature improvements they can bring to map tasks!
Eric Song
08/15/2023, 10:58 PM