Len Strnad
07/11/2023, 6:09 PMfrom flytekit.remote import FlyteRemote, FlyteWorkflowExecution, FlyteTaskExecution
from flytekit.configuration import Config
# Client for the Flyte backend. `endpoint`, `project` and `domain` are not
# defined in this snippet and must already exist in the session.
remote = FlyteRemote(
    config=Config.for_endpoint(endpoint=endpoint),
    default_project=project,
    default_domain=domain,
)

# Fetch one specific execution by name, then sync it with sync_nodes=True so
# that `node_executions` can be read from the result.
execution = remote.fetch_execution(name="ffc0e12bb3e314fae9aa")
ex = remote.sync_execution(execution=execution, sync_nodes=True)

# 'n0' is the node of interest: map each execution listed under it to its
# duration in minutes, keyed by `metadata.spec_node_id`.
node_times = {
    child.metadata.spec_node_id: child.closure.duration.total_seconds() / 60
    for child in ex.node_executions['n0'].executions
}
{'end-node': 0.0,
'n0': 2.698262133333333,
'n1': 2.202614066666667,
'n2': 1.59931375,
'n3': 1.8271749,
'n4': 30.83562241666667,
'n5': 0.2830597833333333,
'n6': 0.3337019666666666,
'start-node': 0.0}
However, I am really struggling to map the node IDs to the task names. Any thoughts?
Jay Ganbat
07/11/2023, 8:24 PMLen Strnad
07/11/2023, 8:25 PMexample = ex.node_executions['n0'].executions[2]
example.metadata
gives
retry_group: "0"
spec_node_id: "end-node"
example.id
gives
node_id: "n0-0-n1"
execution_id {
project: "..."
domain: "..."
name: "ffc0e12bb3e314fae9aa"
}
Jay Ganbat
07/11/2023, 8:29 PMtask_node_metadata
Len Strnad
07/11/2023, 8:30 PMexample
?Jay Ganbat
07/11/2023, 8:31 PMexecution.node_executions.get("n1").closure.task_node_metadata
Len Strnad
07/11/2023, 8:35 PMexecution.node_executions.get('n0').closure.task_node_metadata
is None for me.
Jay Ganbat
07/11/2023, 8:44 PMLen Strnad
07/11/2023, 8:45 PMJay Ganbat
07/11/2023, 8:46 PMLen Strnad
07/11/2023, 8:47 PMexecution = remote.fetch_execution(name="ffc0e12bb3e314fae9aa")
references a workflow execution where there might be a combination of subworkflows and tasks.
{'end-node': 0.0,
'n0': 2.698262133333333,
'n1': 2.202614066666667,
'n2': 1.59931375,
'n3': 1.8271749,
'n4': 30.83562241666667,
'n5': 0.2830597833333333,
'n6': 0.3337019666666666,
'start-node': 0.0}
can be either a subworkflow or a task, to add clarity.
Jay Ganbat
07/11/2023, 8:49 PMLen Strnad
07/11/2023, 8:53 PMJay Ganbat
07/11/2023, 8:58 PMLen Strnad
07/11/2023, 9:07 PMfrom flytekit.remote import FlyteRemote
from flytekit.models.core.identifier import NodeExecutionIdentifier
from flytekit.clients.helpers import iterate_task_executions
from flytekit.configuration import Config
# Client for the Flyte backend. `endpoint`, `project` and `domain` are not
# defined in this snippet and must already exist in the session.
remote = FlyteRemote(
    config=Config.for_endpoint(endpoint=endpoint),
    default_project=project,
    default_domain=domain,
)

# Fetch the execution by name and sync it with sync_nodes=True.
execution = remote.fetch_execution(name="ffc0e12bb3e314fae9aa")
execution = remote.sync_execution(execution=execution, sync_nodes=True)

# Task name -> task execution duration, for every execution listed under 'n0'.
# NOTE(review): keyed by task name only, so a later task execution with the
# same name replaces the earlier entry.
profiles = {}
for child in execution.node_executions['n0'].executions:
    child_id = NodeExecutionIdentifier(node_id=child.id.node_id, execution_id=execution.id)
    profiles.update(
        (task_exec.id.task_id.name, task_exec.closure.duration)
        for task_exec in iterate_task_executions(client=remote.client, node_execution_identifier=child_id)
    )
Jay Ganbat
07/11/2023, 9:20 PMLen Strnad
07/11/2023, 11:10 PMfrom flytekit.remote import FlyteRemote
from flytekit.models.core.identifier import NodeExecutionIdentifier
from flytekit.clients.helpers import iterate_task_executions
from flytekit.configuration import Config
# Client for the Flyte backend. `endpoint`, `project` and `domain` are not
# defined in this snippet and must already exist in the session.
remote = FlyteRemote(
config=Config.for_endpoint(endpoint=endpoint),
default_project=project,
default_domain=domain,
)
# Fetch a specific execution by name, then sync it with sync_nodes=True.
execution = remote.fetch_execution(name="ffc0e12bb3e314fae9aa")
execution = remote.sync(execution=execution, sync_nodes=True)
# Filled in by nested_task_info below: task name -> task execution duration.
profiles = {}
def nested_task_info(node_executions):
    """Record task durations for every non-parent node, descending into parent nodes.

    Walks the values of ``node_executions`` (a mapping of node executions).
    The ``start-node`` and ``end-node`` entries are skipped. A node whose
    ``metadata.is_parent_node`` is truthy is not looked up itself; the
    function recurses into its ``subworkflow_node_executions`` instead. For
    any other node, each task execution yielded by
    ``iterate_task_executions`` is stored in the module-level ``profiles``
    dict as ``task name -> closure.duration``.

    Relies on the module-level ``remote``, ``execution`` and ``profiles``.
    Returns nothing.

    NOTE(review): entries are keyed by task name only, so a later task
    execution with the same name replaces the earlier one - confirm that
    this is intended.
    """
    boundary_ids = ("start-node", "end-node")
    for node in node_executions.values():
        node_id = node.id.node_id
        if node_id in boundary_ids:
            continue
        if node.metadata.is_parent_node:
            nested_task_info(node.subworkflow_node_executions)
            continue
        # The identifier is rebuilt from the node id and the top-level execution id.
        identifier = NodeExecutionIdentifier(node_id=node_id, execution_id=execution.id)
        task_executions = iterate_task_executions(client=remote.client, node_execution_identifier=identifier)
        for task_execution in task_executions:
            profiles[task_execution.id.task_id.name] = task_execution.closure.duration
# Start the traversal from the top-level node executions; results land in `profiles`.
nested_task_info(execution.node_executions)
Jay Ganbat
07/11/2023, 11:54 PMKetan (kumare3)
Len Strnad
07/12/2023, 1:16 PMKetan (kumare3)
Len Strnad
07/12/2023, 9:56 PMDan Rammer (hamersaw)
07/13/2023, 1:30 AMpyflyte metrics explain
command (code here). Basically, it fetches all of the nested workflows with a configurable depth and breaks down the runtime into multiple spans to better attribute time spent during node execution.
Len Strnad
07/13/2023, 1:11 PM