Excited about the gate node capability, however I ...
# ask-the-community
h
Excited about the gate node capability, however I think I found a bug in
remote.wait
when it comes to workflows with these nodes present. Calling
remote.wait
results in the following:
AttributeError: 'FlyteGateNode' object has no attribute 'interface'
. It looks like the underlying
sync_node_execution
here doesn't explicitly handle the gate node case, leading to the assumption that it's a normal node (and thus the error above). Full error & workaround in 🧵
Here's the full stack trace, if that's useful for anyone with the same issue:
Copy code
tests/e2e/test_regression.py:106: in _await_flyte_result
    execution = remote.wait(execution, timeout=REMOTE_FLYTE_WORKFLOW_TIMEOUT)
/usr/mambaforge/envs/sci-ops-dev/lib/python3.10/site-packages/flytekit/remote/remote.py:1488: in wait
    execution = self.sync_execution(execution, sync_nodes=sync_nodes)
/usr/mambaforge/envs/sci-ops-dev/lib/python3.10/site-packages/flytekit/remote/remote.py:1583: in sync_execution
    node_execs[n.id.node_id] = self.sync_node_execution(n, node_mapping)  # noqa
/usr/mambaforge/envs/sci-ops-dev/lib/python3.10/site-packages/flytekit/remote/remote.py:1699: in sync_node_execution
    execution._underlying_node_executions = [
/usr/mambaforge/envs/sci-ops-dev/lib/python3.10/site-packages/flytekit/remote/remote.py:1700: in <listcomp>
    self.sync_node_execution(FlyteNodeExecution.promote_from_model(cne), sub_node_mapping)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <flytekit.remote.remote.FlyteRemote object at 0x7fe5ec2963b0>
execution = <FlyteLiteral id { node_id: "n20-0-n0" execution_id { project: "sci-ops" domain: "development" name: "fc06d4dc3c128421...nos: 979750000 } updated_at { seconds: 1691751624 nanos: 75072000 } } metadata { retry_group: "0" spec_node_id: "n0" }>
node_mapping = {'n0': Node(ID: n0), 'n1': Node(ID: n1), 'n2': Node(ID: n2)}
    def sync_node_execution(
        self,
        execution: FlyteNodeExecution,
        node_mapping: typing.Dict[str, FlyteNode],
    ) -> FlyteNodeExecution:
        """
        Get data backing a node execution. These FlyteNodeExecution objects should've come from Admin with the model
        fields already populated correctly. For purposes of the remote experience, we'd like to supplement the object
        with some additional fields:
          - inputs/outputs
          - task/workflow executions, and/or underlying node executions in the case of parent nodes
          - TypedInterface (remote wrapper type)
        A node can have several different types of executions behind it. That is, the node could've run (perhaps
        multiple times because of retries):
          - A task
          - A static subworkflow
          - A dynamic subworkflow (which in turn may have run additional tasks, subwfs, and/or launch plans)
          - A launch plan
        The data model is complicated, so ascertaining which of these happened is a bit tricky. That logic is
        encapsulated in this function.
        """
        # For single task execution - the metadata spec node id is missing. In these cases, revert to regular node id
        node_id = execution.metadata.spec_node_id
        # This case supports single-task execution compiled workflows.
        if node_id and node_id not in node_mapping and execution.id.node_id in node_mapping:
            node_id = execution.id.node_id
            remote_logger.debug(
                f"Using node execution ID {node_id} instead of spec node id "
                f"{execution.metadata.spec_node_id}, single-task execution likely."
            )
        # This case supports single-task execution compiled workflows with older versions of admin/propeller
        if not node_id:
            node_id = execution.id.node_id
            remote_logger.debug(f"No metadata spec_node_id found, using {node_id}")
        # First see if it's a dummy node, if it is, we just skip it.
        if constants.START_NODE_ID in node_id or constants.END_NODE_ID in node_id:
            return execution
        # Look for the Node object in the mapping supplied
        if node_id in node_mapping:
            execution._node = node_mapping[node_id]
        else:
            raise Exception(f"Missing node from mapping: {node_id}")
        # Get the node execution data
        node_execution_get_data_response = self.client.get_node_execution_data(execution.id)
        # Calling a launch plan directly case
        # If a node ran a launch plan directly (i.e. not through a dynamic task or anything) then
        # the closure should have a workflow_node_metadata populated with the launched execution id.
        # The parent node flag should not be populated here
        # This is the simplest case
        if not execution.metadata.is_parent_node and execution.closure.workflow_node_metadata:
            launched_exec_id = execution.closure.workflow_node_metadata.execution_id
            # This is a recursive call, basically going through the same process that brought us here in the first
            # place, but on the launched execution.
            launched_exec = self.fetch_execution(
                project=launched_exec_id.project, domain=launched_exec_id.domain, name=launched_exec_id.name
            )
            self.sync_execution(launched_exec)
            if launched_exec.is_done:
                # The synced underlying execution should've had these populated.
                execution._inputs = launched_exec.inputs
                execution._outputs = launched_exec.outputs
            execution._workflow_executions.append(launched_exec)
            execution._interface = launched_exec._flyte_workflow.interface
            return execution
        # If a node ran a static subworkflow or a dynamic subworkflow then the parent flag will be set.
        if execution.metadata.is_parent_node:
            # We'll need to query child node executions regardless since this is a parent node
            child_node_executions = iterate_node_executions(
                self.client,
                workflow_execution_identifier=execution.id.execution_id,
                unique_parent_id=execution.id.node_id,
            )
            child_node_executions = [x for x in child_node_executions]
            # If this was a dynamic task, then there should be a CompiledWorkflowClosure inside the
            # NodeExecutionGetDataResponse
            if node_execution_get_data_response.dynamic_workflow is not None:
                compiled_wf = node_execution_get_data_response.dynamic_workflow.compiled_workflow
                node_launch_plans = {}
                # TODO: Inspect branch nodes for launch plans
                for node in FlyteWorkflow.get_non_system_nodes(compiled_wf.primary.template.nodes):
                    if (
                        node.workflow_node is not None
                        and node.workflow_node.launchplan_ref is not None
                        and node.workflow_node.launchplan_ref not in node_launch_plans
                    ):
                        node_launch_plans[node.workflow_node.launchplan_ref] = self.client.get_launch_plan(
                            node.workflow_node.launchplan_ref
                        ).spec
                dynamic_flyte_wf = FlyteWorkflow.promote_from_closure(compiled_wf, node_launch_plans)
                execution._underlying_node_executions = [
                    self.sync_node_execution(FlyteNodeExecution.promote_from_model(cne), dynamic_flyte_wf._node_map)
                    for cne in child_node_executions
                ]
                execution._task_executions = [
                    node_exes.task_executions for node_exes in execution.subworkflow_node_executions.values()
                ]
                execution._interface = dynamic_flyte_wf.interface
            # Handle the case where it's a static subworkflow
            elif isinstance(execution._node.flyte_entity, FlyteWorkflow):
                sub_flyte_workflow = execution._node.flyte_entity
                sub_node_mapping = {n.id: n for n in sub_flyte_workflow.flyte_nodes}
                execution._underlying_node_executions = [
                    self.sync_node_execution(FlyteNodeExecution.promote_from_model(cne), sub_node_mapping)
                    for cne in child_node_executions
                ]
                execution._interface = sub_flyte_workflow.interface
            # Handle the case where it's a branch node
            elif execution._node.branch_node is not None:
                <http://remote_logger.info|remote_logger.info>(
                    "Skipping branch node execution for now - branch nodes will "
                    "not have inputs and outputs filled in"
                )
                return execution
            else:
                remote_logger.error(f"NE {execution} undeterminable, {type(execution._node)}, {execution._node}")
                raise Exception(f"Node execution undeterminable, entity has type {type(execution._node)}")
        # This is the plain ol' task execution case
        else:
            execution._task_executions = [
                self.sync_task_execution(
                    FlyteTaskExecution.promote_from_model(t), node_mapping[node_id].task_node.flyte_task
                )
                for t in iterate_task_executions(self.client, execution.id)
            ]
>           execution._interface = execution._node.flyte_entity.interface
E           AttributeError: 'FlyteGateNode' object has no attribute 'interface'
/usr/mambaforge/envs/sci-ops-dev/lib/python3.10/site-packages/flytekit/remote/remote.py:1724: AttributeError
As a workaround, I just wrote my own simple wait function which periodically polls
execution.is_done
.
k
Cc @Yee thank you, can you file a big report please
y
i think that’s the issue
h
Thanks for filing the ticket @Yee. Not blocking for us, just wanted to surface 🙂
y
we’ll get it slated soon