# ask-the-community
c
Does anyone know if it's possible to get a full workflow graph for a dynamic workflow? Using either
FlyteRemote
or from the console? I want to get the fully expanded workflow after the dynamic expansion. Thanks!
I've tried fetching the execution associated with a workflow, but when I check out a
flytekit.remote.executions.FlyteWorkflowExecution
associated with a dynamic workflow, I don't see any attributes that provide this information, including `node_executions`:
In [17]: execution.node_executions
Out[17]: {}
j
enable
sync_nodes=True
when you fetch the execution
remote.sync_execution(exec_id, sync_nodes=True)
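Put together, the fetch-then-sync step might look like the sketch below. It assumes `remote` is an already configured `FlyteRemote`; the helper name `fetch_with_nodes` is made up for illustration, and the default project and domain are just the ones that appear later in this thread.

```python
def fetch_with_nodes(remote, name, project="flytesnacks", domain="development"):
    """Fetch a workflow execution by name and sync it together with its nodes.

    `remote` is expected to be a configured flytekit FlyteRemote instance.
    """
    execution = remote.fetch_execution(project=project, domain=domain, name=name)
    # Without sync_nodes=True, execution.node_executions stays empty ({}).
    return remote.sync_execution(execution, sync_nodes=True)
```

After this call, `execution.node_executions` should hold the top-level nodes keyed by node id.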
c
Great, this gave me the node executions. Trying to get at the actual computation graph is still a little bit tricky from these but I'm seeing what I can do
j
are you trying to get like actual runtime etc? probably need to write a recursive method based on how complicated your subworkflows are
c
yeah, that's what I'm doing now
j
our workflows are quite nested so i had to go through the same thing 😅
c
were you able to publish your method?
j
i ended up with this 😅
def ignore_node(node_name):
    return "start" in node_name or "end" in node_name


def construct_exec_time_diagram(exec_obj, remote):
    exec_times = {}
    if isinstance(exec_obj, FlyteWorkflowExecution):
        wf_id = exec_obj.id.name
        if not exec_obj.node_executions:
            print(f"Found workflow execution syncing nodes... {wf_id}")
            remote.sync_execution(exec_obj, sync_nodes=True)
            print(f"Done syncing {wf_id}")

        for node_name, node_exec in exec_obj.node_executions.items():
            if ignore_node(node_exec.metadata.spec_node_id):
                continue
            exec_times[f"{wf_id}_{node_name}"] = construct_exec_time_diagram(
                node_exec, remote
            )

    elif isinstance(exec_obj, FlyteNodeExecution):
        node_name = f"{exec_obj.id.execution_id.name}_{exec_obj.id.node_id}"
        if not ignore_node(exec_obj.metadata.spec_node_id):
            # If node launches a new workflow or LP
            if exec_obj.workflow_executions:
                exec_times[f"wf_{node_name}"] = construct_exec_time_diagram(
                    exec_obj.workflow_executions[0], remote
                )
            else:
                # If dynamic task below will get dynamic task execution time not underlying task
                # THIS IS THE ONLY PLACE WE SHOULD BE PUTTING EXECUTION TIME SINCE IT IS THE TASK EXECUTION TIME THAT MATTERS
                exec_times[
                    f"{node_name}_{exec_obj.task_executions[0].id.task_id.name}"
                ] = exec_obj.task_executions[0].closure.duration.seconds
                for (
                    subnode_name,
                    subnode_exec_obj,
                ) in exec_obj.subworkflow_node_executions.items():
                    if ignore_node(node_name):
                        continue
                    exec_times[
                        f"{node_name}_{subnode_name}"
                    ] = construct_exec_time_diagram(subnode_exec_obj, remote)
    return exec_times
so
construct_exec_time_diagram
takes in a workflow execution, then recursively goes through each nested execution and collects runtime metrics
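Since the function returns a nested dict, a small follow-up helper can flatten it into per-task rows. This is only a sketch: `flatten_exec_times` and the sample keys are hypothetical, shaped like the return value described above.

```python
def flatten_exec_times(exec_times, path=()):
    """Yield (path, seconds) for every numeric leaf of the nested dict."""
    for key, value in exec_times.items():
        if isinstance(value, dict):
            # A nested dict is a sub-workflow or dynamic node: descend into it.
            yield from flatten_exec_times(value, path + (key,))
        else:
            yield path + (key,), value


# Hypothetical result, shaped like construct_exec_time_diagram's return value.
sample = {
    "exec1_n0": {"exec1_n0_task_a": 20},
    "exec1_n1": {
        "wf_exec1_n1": {"exec2_n0": {"exec2_n0_task_b": 7}},
    },
}

flat = list(flatten_exec_times(sample))
total_seconds = sum(seconds for _, seconds in flat)
```

Each path keeps the chain of node keys, so nested launches stay distinguishable after flattening.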
c
ah I see. This is nice, but I guess what I'm trying to get is like, an unfurled graph of tasks. Which I'm not sure I'm even able to get. I think what you've got is more practical though (you're keeping track of the actual workflow nodes and capturing runtimes per node).
ahh maybe I missed the recursive call in there
that's definitely helpful to see
j
i think you should be able to construct it, flyteconsole must be doing something similar since we can see it in the graph tab
yeah i was only interested in the actual execution time and discarding any queue times
c
I agree that it should be possible to construct it somewhere. I'm actually not trying to figure out execution times (yet), but the full set of tasks that were executed. I'll try your method out though now that I realize it's probably closer to what I'm looking for
maybe I can help expand it as well
hey @Jay Ganbat, your code didn't work for me when I initially used it. I would get this error:
╭────────────────────────── Traceback (most recent call last) ───────────────────────────╮
│ in <module>:1                                                                          │
│                                                                                        │
│ ❱ 1 construct_exec_time_diagram(execution, remote)                                     │
│                                                                                        │
│ in construct_exec_time_diagram:17                                                      │
│                                                                                        │
│ ❱ 17 │   │   │   exec_times[f"{wf_id}_{node_name}"] = construct_exec_time_diagram(     │
│                                                                                        │
│ in construct_exec_time_diagram:34                                                      │
│                                                                                        │
│ ❱ 34 │   │   │   │   ] = exec_obj.task_executions[0].closure.duration.seconds          │
╰────────────────────────────────────────────────────────────────────────────────────────╯
IndexError: list index out of range

> /var/folders/pn/3sv25zqj1g55w5q8mj13qrq40000gp/T/ipykernel_41805/558582152.py(34)construct_exec_time_diagram()
     32                 exec_times[
     33                     f"{node_name}_{exec_obj.task_executions[0].id.task_id.name}"
---> 34                 ] = exec_obj.task_executions[0].closure.duration.seconds
     35                 for (
     36                     subnode_name,
So I changed this bit under this comment like this:
# If dynamic task below will get dynamic task execution time not underlying task
                # THIS IS THE ONLY PLACE WE SHOULD BE PUTTING EXECUTION TIME SINCE IT IS THE TASK EXECUTION TIME THAT MATTERS
                if exec_obj.task_executions:
                    exec_times[
                        f"{node_name}_{exec_obj.task_executions[0].id.task_id.name}"
                    ] = exec_obj.task_executions[0].closure.duration.seconds
                for (
...
but now I realize that when I run it, I am getting a strange result that looks like this:
{'agtr6558tfzk4xjkst5c_n0': {'agtr6558tfzk4xjkst5c_n0_flyte_workflows.flyte_workflows.workflows.paginate_filesystem.test_dynamic_workflow.fill_filesystem': 20},
 'agtr6558tfzk4xjkst5c_n2': {'agtr6558tfzk4xjkst5c_n2_flyte_workflows.flyte_workflows.workflows.paginate_filesystem.test_dynamic_workflow.get_length_of_list': 13},
 'agtr6558tfzk4xjkst5c_n1': {'agtr6558tfzk4xjkst5c_n1_flyte_workflows.flyte_workflows.workflows.paginate_filesystem.test_dynamic_workflow.paginate_bucket': 17},
 'agtr6558tfzk4xjkst5c_n3': {'agtr6558tfzk4xjkst5c_n3_n3-0-start-node': {},
  'agtr6558tfzk4xjkst5c_n3_n3-0-n0': {},
  'agtr6558tfzk4xjkst5c_n3_n3-0-end-node': {}}}
with these empty tasks at the end. I'm wondering if it's because I changed that line, but either way, when I try a different method to get at the subworkflow executions, I still can't find task executions in the subworkflow executions I'm looking at.
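To spot these empty entries quickly in a larger result, something like the following could help. It is a sketch only: `find_empty_nodes` is a made-up helper, and the sample reuses the keys from the output above with the task name shortened.

```python
def find_empty_nodes(exec_times, path=()):
    """Return the key paths whose value is an empty dict (no task time found)."""
    empty = []
    for key, value in exec_times.items():
        if isinstance(value, dict):
            if not value:
                empty.append(path + (key,))
            else:
                empty.extend(find_empty_nodes(value, path + (key,)))
    return empty


# Keys taken from the output above; the task name is abbreviated.
result = {
    "agtr6558tfzk4xjkst5c_n0": {"agtr6558tfzk4xjkst5c_n0_fill_filesystem": 20},
    "agtr6558tfzk4xjkst5c_n3": {
        "agtr6558tfzk4xjkst5c_n3_n3-0-start-node": {},
        "agtr6558tfzk4xjkst5c_n3_n3-0-n0": {},
        "agtr6558tfzk4xjkst5c_n3_n3-0-end-node": {},
    },
}

empty_paths = find_empty_nodes(result)
```

Every path it returns is a node the traversal reached without finding task executions or further children.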
j
yeah it did take some trial and error. do you know what type of node yours is? there is a weird case where the start node is not actually named start node but is named by the execution id instead
c
in that case the node name is
agtr6558tfzk4xjkst5c_n3
looks like it should be the
n3
node for the task
Maybe that could be the start node? The start node seems to be named, though. The other issue is that there are actually other tasks here that I was able to unearth by looking at the network calls in the Flyte UI.
So there should be a bunch of other tasks, like
n3-0-n0-0-n0-n1
, but I can't get them from the remote
j
Are those tasks subtasks or sub workflows? This looks like a 4-level nested dynamic task 🤔
I wonder if we need another case that will request another node sync call
c
I think there are both subtasks and subworkflows, but when I call
NodeExecution.subworkflow_node_executions
, the results are
NodeExecution
, not
WorkflowExecution
, so I can't
remote.sync_execution
them
also since they're
NodeExecution
s they don't have nodes themselves.
j
Do you have a screenshot of the expanded execution so we can try to map what should correspond to what
c
I do, but more interesting might be the network trace of the Flyte console. When I look at that, there are a bunch of calls to what are the expanded steps in the dynamic workflow, but I'm not sure where those could be coming from.
I'm trying to get both of these
j
I see, yeah I think you should look for subworkflow or workflow executions on those nodes to see if they are populated. You could infer from the web UI where the dynamic task should have a subworkflow or workflow associated with it
c
I did check this and didn't find anything for those nodes.
the node would be
n3
j
That could be a bug then. I think you can manually inspect the object 😅 otherwise I'm kinda stumped
c
sorry this took so long, here is the fully unfurled task from the
nodes
page in the Flyte UI:
image.png
and here is as far as I get from subworkflow_executions
image.png
but looking at the network tab when I inspect this workflow in the flyte UI, it looks for this
<http://localhost:30080/api/v1/data/node_executions/flytesnacks/development/al7jsw45lf4q5kpkkvgs/n3-0-n0-0-n0-n1>
image.png
which has a
dynamic_workflows
attribute
image.png
but I have no idea how to find this
n3-0-n0-0-n0-n1
node in the remote
I've figured it out, but I think the only way to access this will be via the API (unless maybe there's some work to be done in the pyflyte client)
Using the API, here's a little client that gets what I was looking for (at least the start of it):
import requests as rq
from urllib.parse import urljoin


class FlyteAPIClient:
    def __init__(self, base: str = "http://localhost:30080/api/v1/"):
        self.base = base

    def request(self, method: str, endpoint: str):
        url = urljoin(self.base, endpoint)
        return rq.request(method, url).json()

    def get(self, endpoint):
        return self.request("get", endpoint)

    def get_subnode_executions(self, execution_id: str, parent_node: dict, project: str = "flytesnacks", environment: str = "development", depth=1):
        # parent_node is a node execution dict from the API; its children are listed via uniqueParentId
        nodes = []
        req_url = f"node_executions/{project}/{environment}/{execution_id}?uniqueParentId={parent_node['id']['node_id']}&limit=10000"
        res = self.get(req_url)
        for ele in res["node_executions"]:
            nodes.append((depth, ele))
            if ele.get("metadata", {}).get("is_parent_node"):
                # recurse into nested parents, keeping the same project and environment
                nodes += self.get_subnode_executions(execution_id=execution_id, parent_node=ele, project=project, environment=environment, depth=depth+1)
        return nodes

    def get_node_executions(self, execution_id: str, project: str = "flytesnacks", environment: str = "development"):
        # top-level nodes get depth 0; children are collected recursively
        nodes = []
        res = self.get(f"node_executions/{project}/{environment}/{execution_id}?limit=1000")
        for ele in res["node_executions"]:
            nodes.append((0, ele))
            if ele.get("metadata", {}).get("is_parent_node"):
                nodes += self.get_subnode_executions(execution_id=execution_id, parent_node=ele, project=project, environment=environment)
        return nodes


flyte_client = FlyteAPIClient()
node_executions = flyte_client.get_node_executions("al7jsw45lf4q5kpkkvgs")
and here's an output from this (it's a text snippet since it's too long)
output
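For a quick look at the `(depth, node_execution)` tuples that `get_node_executions` returns, an indented outline is probably the simplest rendering. The sketch below relies on `id.node_id` as used in the client above; the `closure.phase` field and the sample data are assumptions for illustration.

```python
def render_outline(nodes):
    """Render (depth, node_execution) tuples as an indented text outline."""
    lines = []
    for depth, node in nodes:
        node_id = node["id"]["node_id"]
        # closure.phase is assumed to be present; fall back to UNKNOWN if not.
        phase = node.get("closure", {}).get("phase", "UNKNOWN")
        lines.append(f"{'  ' * depth}{node_id} [{phase}]")
    return "\n".join(lines)


# Hypothetical data in the same shape as the client's return value.
sample_nodes = [
    (0, {"id": {"node_id": "n0"}, "closure": {"phase": "SUCCEEDED"}}),
    (0, {"id": {"node_id": "n3"}, "closure": {"phase": "SUCCEEDED"}}),
    (1, {"id": {"node_id": "n3-0-n0"}, "closure": {"phase": "SUCCEEDED"}}),
    (2, {"id": {"node_id": "n3-0-n0-0-n0"}}),
]

outline = render_outline(sample_nodes)
```

Because the client appends children right after their parent, the list is already in depth-first order and the indentation alone conveys the hierarchy.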
I was also eventually able to get a kind of visualization. It's not quite what I would like but it's pretty close:
image.png
k
Ohh you can use the get data endpoint
And given the parent node id it will work
c
It doesn't quite work
Well, it does eventually, you have to specify a parent node, like you said
That's what I did in the client above. I still want to figure out how to get task names back out, or how to do this with flyte remote
k
Flyteremote has a client handle that has the get data method
c
Yeah but it doesn't seem to have a way to get the dynamic workflow tasks out
j
Hmm, didn't know about that client, but glad it sort of worked out. Looks awesome! That task expansion looks crazy complex 😅
k
The Ui uses the same api
c
That's good to know
the API also has the node startup times and durations so you can get execution times from it as well, to get more granular execution times maybe
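A rough way to pull durations out of the same node list is sketched below. It assumes each node's `closure.duration` is serialized as a seconds string with an `s` suffix (the usual JSON form of a protobuf duration), which should be checked against the actual response; the helper names and sample data are hypothetical.

```python
def duration_seconds(node):
    """Parse closure.duration (assumed to look like '20.5s') into a float."""
    raw = node.get("closure", {}).get("duration")
    if not raw:
        return None
    return float(raw.rstrip("s"))


def slowest_nodes(nodes, top=3):
    """Return up to `top` (seconds, node_id) pairs, slowest first."""
    timed = []
    for _depth, node in nodes:
        seconds = duration_seconds(node)
        if seconds is not None:
            timed.append((seconds, node["id"]["node_id"]))
    return sorted(timed, reverse=True)[:top]


# Hypothetical (depth, node_execution) tuples, as returned by the client above.
sample_timed_nodes = [
    (0, {"id": {"node_id": "n0"}, "closure": {"duration": "20.5s"}}),
    (0, {"id": {"node_id": "n3"}, "closure": {"duration": "95s"}}),
    (1, {"id": {"node_id": "n3-0-n0"}, "closure": {}}),
]

slowest = slowest_nodes(sample_timed_nodes, top=2)
```

Nodes without a recorded duration are skipped rather than counted as zero, so parent nodes and unfinished nodes do not distort the ranking.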