acoustic-carpenter-78188
05/28/2024, 5:47 PM
remote.wait(ex) on an execution that contains an ArrayNode errors out:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../../miniconda3/envs/unionai/lib/python3.11/site-packages/flytekit/remote/remote.py:1891: in wait
    execution = self.sync_execution(execution, sync_nodes=sync_nodes)
../../../../miniconda3/envs/unionai/lib/python3.11/site-packages/flytekit/remote/remote.py:1986: in sync_execution
    node_execs[n.id.node_id] = self.sync_node_execution(n, node_mapping) # noqa
../../../../miniconda3/envs/unionai/lib/python3.11/site-packages/flytekit/remote/remote.py:2116: in sync_node_execution
    logger.error(f"NE {execution} undeterminable, {type(execution._node)}, {execution._node}")
../../../../miniconda3/envs/unionai/lib/python3.11/site-packages/flytekit/models/common.py:54: in __str__
    return self.verbose_string()
../../../../miniconda3/envs/unionai/lib/python3.11/site-packages/flytekit/models/common.py:71: in verbose_string
    return self.short_string()
../../../../miniconda3/envs/unionai/lib/python3.11/site-packages/flytekit/models/common.py:63: in short_string
    literal_str = re.sub(r"\s+", " ", str(self.to_flyte_idl())).strip()
../../../../miniconda3/envs/unionai/lib/python3.11/site-packages/flytekit/models/core/workflow.py:569: in to_flyte_idl
    array_node=self.array_node.to_flyte_idl() if self.array_node else None,
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'int' object has no attribute 'to_flyte_idl'") raised in repr()] FlyteArrayNode object at 0x111393750>

    def to_flyte_idl(self) -> _core_workflow.ArrayNode:
        return _core_workflow.ArrayNode(
>           node=self._node.to_flyte_idl() if self._node is not None else None,
            parallelism=self._parallelism,
            min_successes=self._min_successes,
            min_success_ratio=self._min_success_ratio,
        )
E       AttributeError: 'int' object has no attribute 'to_flyte_idl'

../../../../miniconda3/envs/unionai/lib/python3.11/site-packages/flytekit/models/core/workflow.py:400: AttributeError
This should work.
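For readers skimming the traceback, here is a minimal sketch of the failure pattern it shows: the exception is raised while formatting a log message, because str() on the model serializes it first, and the array node's inner node turns out to be an int. FakeNode, FakeArrayNode and describe are hypothetical stand-ins written for illustration, not flytekit classes, and the sketch does not say how the int ended up there.

```python
import re


class FakeNode:
    """Hypothetical stand-in for a node model that can serialize itself."""

    def to_flyte_idl(self) -> dict:
        return {"id": "n0"}


class FakeArrayNode:
    """Hypothetical stand-in for the array node model in the traceback."""

    def __init__(self, node, parallelism=None):
        self._node = node
        self._parallelism = parallelism

    def to_flyte_idl(self) -> dict:
        # Same shape as the failing line: assumes _node has to_flyte_idl().
        return {
            "node": self._node.to_flyte_idl() if self._node is not None else None,
            "parallelism": self._parallelism,
        }

    def __str__(self) -> str:
        # Like short_string() in the traceback, str() serializes first.
        return re.sub(r"\s+", " ", str(self.to_flyte_idl())).strip()


def describe(array_node) -> str:
    """Format a log message; formatting calls __str__, which can raise."""
    return f"NE undeterminable, {array_node}"


good = FakeArrayNode(FakeNode(), parallelism=2)
print(describe(good))

bad = FakeArrayNode(5)  # an int where a node model was expected
try:
    describe(bad)
except AttributeError as err:
    print(f"AttributeError: {err}")
```

In this sketch the error comes out of the logging path rather than the sync logic itself, which matches the order of frames in the traceback above.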
Expected behavior
Users should be able to use FlyteRemote to fetch/sync/wait on executions containing ArrayNodes.
Additional context to reproduce
Workflow file:
from functools import partial

from flytekit import task, map_task, workflow


@task
def fn(x: int, y: int) -> int:
    return x + y


@workflow
def workflow_with_maptask(data: list[int], y: int) -> list[int]:
    partial_fn = partial(fn, y=y)
    return map_task(partial_fn)(x=data)
pytest test to reproduce:
import re
from datetime import timedelta
from subprocess import run
from time import sleep

from flytekit import WorkflowExecutionPhase
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config


def test_workflow_with_maptask(workflows_dir):
    """Check simple hello world example.

    1. Run map_tasks.py
    2. Checks output is a list of [x + y, ...]
    """
    result = run(
        [
            "unionai",
            "--config",
            "<path/to/config.yaml>",
            "run",
            "--remote",
            "map_tasks.py",
            "workflow_with_maptask",
            "--data",
            "[1, 2, 3, 4, 5]",
            "--y",
            "1",
        ],
        cwd=workflows_dir, text=True, check=True, capture_output=True
    )
    match = re.search(r"executions/(\w+)", result.stdout)
    execution_id = match.group(1)
    remote = FlyteRemote(
        config=Config.for_endpoint("dns:///demo.hosted.unionai.cloud"),
        default_project="flytesnacks",
        default_domain="development",
    )
    ex = remote.fetch_execution(name=execution_id)
    ex = remote.wait(ex, poll_interval=timedelta(seconds=1))  # error happens here
    assert ex.closure.phase == WorkflowExecutionPhase.SUCCEEDED
    assert ex.outputs["o0"] == [2, 3, 4, 5, 6]
Screenshots
No response
Are you sure this issue hasn't been raised already?
Yes
Have you read the Code of Conduct?
Yes
flyteorg/flyte