cuddly-jelly-27016
02/07/2025, 10:09 PMfailed to compile workflow
and TaskReferenceNotFound
. Till version flytekit==1.11.0 everything was working fine. But when we have a new clean installation with flytekit==1.13.0
or higher versions then we get the following error:
debug_error_string = "UNKNOWN:Error received from peer {grpc_message:"failed to compile workflow for [resource_type:WORKFLOW project:\"playground\" domain:\"dev\" name:\"hello.hello_wf\" version:\"c2d8jj\"] with err failed to compile workflow with err Collected Errors: 1\n\tError 0: Code: TaskReferenceNotFound, Node Id: start-node, Description: Referenced Task [resource_type:TASK name:\"hello.hello_world\" version:\"c2d8jj\"] not found.\n", grpc_status:3, created_time:"2024-08-28T14:11:52.293396625+00:00"}"
Expected behavior
In expected behavior the workflow should be compiled and TaskReference should be found. The workflow should execute properly without any error.
Additional context to reproduce
Flytekit Issues
There is a bug in Flytekit SDK when registering a script.
Reproduce issue
Required software
• Docker
• flytectl for Sandbox
Steps-to-reproduce
• Start Flyte demo cluster on your host: flytectl demo start
.
• Create a clean Docker container and switch to it. It is being used to submit a workflow from which to submit a workflow. All commands are now being executed from inside the docker container: docker run -it --network="host" python:3.11 /bin/bash
.
• Install nano editor: apt update && apt install -y nano
• Install flytekit: pip install flytekite==1.13.4
.
• Create a directory in `/tmp`: mkdir /tmp/hello_world && cd /tmp/hello_world
• Create an example workflow touch hello_world.py
and edit the following content:
from flytekit import task, workflow
@task
def say_hello() -> str:
return "Hello, World!"
@workflow
def hello_world_wf() -> str:
res = say_hello()
return res
• Create a script from which to run the workflow `touch flyte_runner.py`:
"""Runs a Flyte workflow called directly from Python."""
import json
import logging
import os
import random
import string
import sys
from importlib import import_module
from pathlib import Path
from typing import Any, List
import keyring
from flytekit import WorkflowExecutionPhase
from flytekit.configuration import Config as FlyteConfig
from flytekit.configuration import ImageConfig, PlatformConfig
from flytekit.core.notification import Email
from flytekit.models.security import Identity, SecurityContext
from flytekit.remote.remote import FlyteRemote
from flytekit.tools.translator import Options
def _create_flyte_remote(project: str, domain: str, url: str) -> FlyteRemote:
"""Creates a flyte config file."""
return FlyteRemote(
config=FlyteConfig(platform=PlatformConfig(endpoint=url, insecure=True)),
default_domain=domain,
default_project=project,
)
def load_function(python_file: Path, function: str) -> Any:
"""Loads a function by name."""
module_path = python_file.parent.resolve()
module = os.path.basename(python_file).replace(".py", "")
sys.path.append(str(module_path))
module = import_module(module)
return getattr(module, function)
def run_workflow(
project: str,
domain: str,
url: str,
workflow_file: str,
workflow: str,
):
"""Runs a Flyte workflow."""
try:
options = Options()
workflow_file_path = Path(workflow_file)
flyte_remote = _create_flyte_remote(project=project, domain=domain, url=url)
entity_wf = load_function(workflow_file_path, workflow)
random_suffix = "".join(random.choice(string.ascii_lowercase + string.digits) for _ in range(6)) # noqa: S311
execution_id = f"{workflow.replace('_', '-')}-{random_suffix}"
flyte_remote.register_script(
entity=entity_wf,
source_path=workflow_file_path.parent.resolve(),
options=options,
version=random_suffix,
copy_all=True,
)
flyte_remote.execute(
entity=entity_wf,
image_config=ImageConfig.auto_default_image(),
options=options,
version=random_suffix,
wait=False,
inputs=inputs,
execution_name=execution_id,
)
<http://logging.info|logging.info>(f"Flyte execution ID: {execution_id}")
except Exception as e:
logging.error(f"Error in Flyte Runner: {e}")
sys.exit(1)
if __name__ == "__main__":
run_workflow(
project="flytesnacks",
domain="development",
url="localhost:30080",
workflow_file="hello_world.py",
workflow="hello_world_wf",
)
• Now try to run submit the workflow: python flyte_runner.py
Screenshots
image
Are you sure this issue hasn't been raised already?
• Yes
Have you read the Code of Conduct?
• Yes
flyteorg/flytecuddly-jelly-27016
02/07/2025, 10:09 PM