Clemente Cuevas
01/03/2024, 9:26 PM
`FlyteRemote` or from the console? I want to get the fully expanded workflow after the dynamic expansion. Thanks!
Clemente Cuevas
01/03/2024, 9:28 PM
`flytekit.remote.executions.FlyteWorkflowExecution` associated with a dynamic workflow, I don't see any attributes that provide this information, including `node_executions`:
In [17]: execution.node_executions
Out[17]: {}
Jay Ganbat
01/03/2024, 10:24 PM
`sync_nodes=True` when you fetch the execution
Jay Ganbat
01/03/2024, 10:24 PM
`remote.sync_execution(exec_id, sync_nodes=True)`
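A minimal sketch of the suggestion above, assuming `remote` is an already configured `flytekit.remote.FlyteRemote`. The helper name `fetch_with_nodes` and the default project and domain values are illustrative, not part of flytekit. Note that `sync_execution` is given the execution object returned by `fetch_execution`, which is also how the longer snippet later in this thread calls it.

```python
def fetch_with_nodes(remote, name, project="flytesnacks", domain="development"):
    """Fetch a workflow execution and ask the remote to populate its node executions.

    `remote` is assumed to be a configured flytekit.remote.FlyteRemote;
    `fetch_with_nodes` itself is a hypothetical helper for illustration.
    """
    execution = remote.fetch_execution(project=project, domain=domain, name=name)
    # sync_nodes=True requests node-level data in addition to the workflow-level state.
    return remote.sync_execution(execution, sync_nodes=True)
```

After this call, `execution.node_executions` should be a dict keyed by node ID rather than the empty dict shown earlier.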
Clemente Cuevas
01/03/2024, 10:44 PM
Jay Ganbat
01/03/2024, 10:46 PM
Clemente Cuevas
01/03/2024, 10:46 PM
Jay Ganbat
01/03/2024, 10:47 PM
Clemente Cuevas
01/03/2024, 10:47 PM
Jay Ganbat
01/03/2024, 10:48 PM
def ignore_node(node_name):
    return "start" in node_name or "end" in node_name


def construct_exec_time_diagram(exec_obj, remote):
    exec_times = {}
    if isinstance(exec_obj, FlyteWorkflowExecution):
        wf_id = exec_obj.id.name
        if not exec_obj.node_executions:
            print(f"Found workflow execution syncing nodes... {wf_id}")
            remote.sync_execution(exec_obj, sync_nodes=True)
            print(f"Done syncing {wf_id}")
        for node_name, node_exec in exec_obj.node_executions.items():
            if ignore_node(node_exec.metadata.spec_node_id):
                continue
            exec_times[f"{wf_id}_{node_name}"] = construct_exec_time_diagram(
                node_exec, remote
            )
    elif isinstance(exec_obj, FlyteNodeExecution):
        node_name = f"{exec_obj.id.execution_id.name}_{exec_obj.id.node_id}"
        if not ignore_node(exec_obj.metadata.spec_node_id):
            # If node launches a new workflow or LP
            if exec_obj.workflow_executions:
                exec_times[f"wf_{node_name}"] = construct_exec_time_diagram(
                    exec_obj.workflow_executions[0], remote
                )
            else:
                # If dynamic task below will get dynamic task execution time not underlying task
                # THIS IS THE ONLY PLACE WE SHOULD BE PUTTING EXECUTION TIME SINCE IT IS THE TASK EXECUTION TIME THAT MATTERS
                exec_times[
                    f"{node_name}_{exec_obj.task_executions[0].id.task_id.name}"
                ] = exec_obj.task_executions[0].closure.duration.seconds
                for (
                    subnode_name,
                    subnode_exec_obj,
                ) in exec_obj.subworkflow_node_executions.items():
                    if ignore_node(node_name):
                        continue
                    exec_times[
                        f"{node_name}_{subnode_name}"
                    ] = construct_exec_time_diagram(subnode_exec_obj, remote)
    return exec_times
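The function above returns a nested dictionary whose leaves are durations in seconds. As a rough sketch of how that result could be consumed, the helper below flattens it into `(path, seconds)` rows and drops branches that contain no timings. The name `flatten_exec_times` and the sample data are made up for illustration.

```python
def flatten_exec_times(exec_times, prefix=()):
    """Yield (path, seconds) pairs from a nested timing dict.

    Branches that are empty dicts produce no rows.
    """
    for key, value in exec_times.items():
        path = prefix + (key,)
        if isinstance(value, dict):
            yield from flatten_exec_times(value, path)
        else:
            yield path, value


# Hypothetical result in the same shape as construct_exec_time_diagram's output.
nested = {
    "exec_n0": {"exec_n0_task_a": 20},
    "exec_n1": {"exec_n1_sub": {"exec_n1_sub_task_b": 7}, "exec_n1_empty": {}},
}

# Longest-running tasks first.
rows = sorted(flatten_exec_times(nested), key=lambda row: row[1], reverse=True)
for path, seconds in rows:
    print(f"{seconds:>4}s  {' / '.join(path)}")
```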
Jay Ganbat
01/03/2024, 10:49 PM
`construct_exec_time_diagram` takes in a workflow execution, then goes through each execution and collects runtime metrics
Clemente Cuevas
01/03/2024, 10:52 PM
Clemente Cuevas
01/03/2024, 10:52 PM
Clemente Cuevas
01/03/2024, 10:52 PM
Jay Ganbat
01/03/2024, 10:52 PM
Jay Ganbat
01/03/2024, 10:53 PM
Clemente Cuevas
01/03/2024, 10:54 PM
Clemente Cuevas
01/03/2024, 10:54 PM
Clemente Cuevas
01/04/2024, 12:29 AM
╭───────────────────────── Traceback (most recent call last) ──────────────────────────╮
│ in <module>:1                                                                        │
│                                                                                      │
│ ❱ 1 construct_exec_time_diagram(execution, remote)                                   │
│                                                                                      │
│ in construct_exec_time_diagram:17                                                    │
│                                                                                      │
│ ❱ 17 │   │   │   exec_times[f"{wf_id}_{node_name}"] = construct_exec_time_diagram(   │
│                                                                                      │
│ in construct_exec_time_diagram:34                                                    │
│                                                                                      │
│ ❱ 34 │   │   │   │   ] = exec_obj.task_executions[0].closure.duration.seconds        │
╰──────────────────────────────────────────────────────────────────────────────────────╯
IndexError: list index out of range
> /var/folders/pn/3sv25zqj1g55w5q8mj13qrq40000gp/T/ipykernel_41805/558582152.py(34)construct_exec_time_diagram()
     32                 exec_times[
     33                     f"{node_name}_{exec_obj.task_executions[0].id.task_id.name}"
---> 34                 ] = exec_obj.task_executions[0].closure.duration.seconds
     35                 for (
     36                     subnode_name,
So I changed the bit under this comment like this:
# If dynamic task below will get dynamic task execution time not underlying task
# THIS IS THE ONLY PLACE WE SHOULD BE PUTTING EXECUTION TIME SINCE IT IS THE TASK EXECUTION TIME THAT MATTERS
if exec_obj.task_executions:
    exec_times[
        f"{node_name}_{exec_obj.task_executions[0].id.task_id.name}"
    ] = exec_obj.task_executions[0].closure.duration.seconds
for (
...
but now when I run it, I get a strange result that looks like this:
{'agtr6558tfzk4xjkst5c_n0': {'agtr6558tfzk4xjkst5c_n0_flyte_workflows.flyte_workflows.workflows.paginate_filesystem.test_dynamic_workflow.fill_filesystem': 20},
 'agtr6558tfzk4xjkst5c_n2': {'agtr6558tfzk4xjkst5c_n2_flyte_workflows.flyte_workflows.workflows.paginate_filesystem.test_dynamic_workflow.get_length_of_list': 13},
 'agtr6558tfzk4xjkst5c_n1': {'agtr6558tfzk4xjkst5c_n1_flyte_workflows.flyte_workflows.workflows.paginate_filesystem.test_dynamic_workflow.paginate_bucket': 17},
 'agtr6558tfzk4xjkst5c_n3': {'agtr6558tfzk4xjkst5c_n3_n3-0-start-node': {},
  'agtr6558tfzk4xjkst5c_n3_n3-0-n0': {},
  'agtr6558tfzk4xjkst5c_n3_n3-0-end-node': {}}}
with these empty tasks at the end. I'm wondering if it's because I changed that line, but either way, even when I try a different method to get at the subworkflow executions, I still can't find task executions in the subworkflow executions I'm looking at.
Jay Ganbat
01/04/2024, 12:34 AM
Clemente Cuevas
01/04/2024, 12:48 AM
`agtr6558tfzk4xjkst5c_n3`
Clemente Cuevas
01/04/2024, 12:48 AM
`n3` node for the task
Clemente Cuevas
01/04/2024, 12:49 AM
Clemente Cuevas
01/04/2024, 12:50 AM
`n3-0-n0-0-n0-n1`, but I can't get them from the remote
Jay Ganbat
01/04/2024, 12:51 AM
Jay Ganbat
01/04/2024, 12:52 AM
Clemente Cuevas
01/04/2024, 12:54 AM
`NodeExecution.subworkflow_node_executions`, the results are `NodeExecution`, not `WorkflowExecution`, so I can't `remote.sync_execution` them
Clemente Cuevas
01/04/2024, 12:54 AM
`NodeExecution`s they don't have nodes themselves.
Jay Ganbat
01/04/2024, 12:57 AM
Clemente Cuevas
01/04/2024, 1:00 AM
Clemente Cuevas
01/04/2024, 1:01 AM
Jay Ganbat
01/04/2024, 1:07 AM
Clemente Cuevas
01/04/2024, 1:09 AM
Clemente Cuevas
01/04/2024, 1:09 AM
`n3`
Jay Ganbat
01/04/2024, 1:32 AM
Clemente Cuevas
01/04/2024, 2:02 AM
`nodes` page in the Flyte UI:
Clemente Cuevas
01/04/2024, 2:02 AM
Clemente Cuevas
01/04/2024, 2:07 AM
Clemente Cuevas
01/04/2024, 2:09 AM
Clemente Cuevas
01/04/2024, 2:11 AM
http://localhost:30080/api/v1/data/node_executions/flytesnacks/development/al7jsw45lf4q5kpkkvgs/n3-0-n0-0-n0-n1
Clemente Cuevas
01/04/2024, 2:12 AM
Clemente Cuevas
01/04/2024, 2:12 AM
`dynamic_workflows` attribute
Clemente Cuevas
01/04/2024, 2:12 AM
Clemente Cuevas
01/04/2024, 2:13 AM
`n3-0-n0-0-n0-n1` node in the remote
Clemente Cuevas
01/04/2024, 2:31 AM
Clemente Cuevas
01/04/2024, 4:31 AM
import requests as rq
from urllib.parse import urljoin


class FlyteAPIClient:
    def __init__(self, base: str = "http://localhost:30080/api/v1/"):
        self.base = base

    def request(self, method: str, endpoint: str):
        url = urljoin(self.base, endpoint)
        return rq.request(method, url).json()

    def get(self, endpoint):
        return self.request("get", endpoint)

    def get_subnode_executions(self, execution_id: str, parent_node: dict, project: str = "flytesnacks", environment: str = "development", depth=1):
        # parent_node is a node execution dict as returned by the API, not a string.
        nodes = []
        req_url = f"node_executions/{project}/{environment}/{execution_id}?uniqueParentId={parent_node['id']['node_id']}&limit=10000"
        res = self.get(req_url)
        for ele in res["node_executions"]:
            nodes.append((depth, ele))
            if ele.get("metadata", {}).get("is_parent_node"):
                # Pass project and environment along so the recursion does not fall back to the defaults.
                nodes += self.get_subnode_executions(execution_id=execution_id, parent_node=ele, project=project, environment=environment, depth=depth + 1)
        return nodes

    def get_node_executions(self, execution_id: str, project: str = "flytesnacks", environment: str = "development"):
        nodes = []
        res = self.get(f"node_executions/{project}/{environment}/{execution_id}?limit=1000")
        for ele in res["node_executions"]:
            nodes.append((0, ele))
            if ele.get("metadata", {}).get("is_parent_node"):
                nodes += self.get_subnode_executions(execution_id=execution_id, parent_node=ele, project=project, environment=environment)
        return nodes


flyte_client = FlyteAPIClient()
node_executions = flyte_client.get_node_executions("al7jsw45lf4q5kpkkvgs")
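Because the client above appends each node before recursing into its children, the resulting list of `(depth, node)` pairs is already in depth-first order. A small sketch of rendering it as an indented tree follows. It relies only on the `id.node_id` field used above; the helper name `format_node_tree` and the sample node IDs are illustrative.

```python
def format_node_tree(node_executions):
    """Return one indented line per (depth, node) pair, in traversal order."""
    return [
        f"{'  ' * depth}{node['id']['node_id']}"
        for depth, node in node_executions
    ]


# Hypothetical sample in the same shape as get_node_executions' return value.
sample = [
    (0, {"id": {"node_id": "n0"}}),
    (0, {"id": {"node_id": "n3"}, "metadata": {"is_parent_node": True}}),
    (1, {"id": {"node_id": "n3-0-n0"}, "metadata": {"is_parent_node": True}}),
    (2, {"id": {"node_id": "n3-0-n0-0-n0"}}),
]

print("\n".join(format_node_tree(sample)))
```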
Clemente Cuevas
01/04/2024, 4:32 AM
Clemente Cuevas
01/04/2024, 4:32 AM
Clemente Cuevas
01/04/2024, 4:41 AM
Clemente Cuevas
01/04/2024, 4:42 AM
Ketan (kumare3)
Ketan (kumare3)
Clemente Cuevas
01/04/2024, 4:44 AM
Clemente Cuevas
01/04/2024, 4:44 AM
Clemente Cuevas
01/04/2024, 4:45 AM
Ketan (kumare3)
Clemente Cuevas
01/04/2024, 4:46 AM
Jay Ganbat
01/04/2024, 7:25 AM
Ketan (kumare3)
Clemente Cuevas
01/04/2024, 6:43 PM
Clemente Cuevas
01/04/2024, 6:44 PM