Hello Flyte community I have a question regarding...
# ask-the-community
v
Hello Flyte community I have a question regarding programmatic workflow registration from python I successfully register my task and workflow, but when I run the workflow, it gives me
ModuleNotFoundError
saying my code’s module doesn’t exist. I found that when the workflow is executed the python files are not present inside the Kubernetes container. I tested this by catching the
kubectl get pod <pod-name -oyaml>
output of the task pod, editing its entrypoint to
sleep 99999
, then ran
kubectl exec -it <pod-name> bash
, and observed that the container starts in /root folder which is totally empty. I could not find the files anywhere else inside the container using tools like
find
and
grep
, it seems missing Should I use register_script instead? What is the difference? Code example attached as a comment in this thread
My code:
Copy code
from client.flyteclient import FlyteClient
from tasks.poc_flow import hello_world_wf, hello


if __name__ == "__main__":
    client = FlyteClient()

    print("Registering task hello...")
    client.register_task(hello)

    print("Registering workflow hello_world_wf...")
    client.register_workflow(hello_world_wf)

    print("Running workflow hello_world_wf...")
    client.run_workflow(hello_world_wf, inputs={"name": "World"})
Note that FlyteClient is a custom class that initializes the FlyteRemote and has functions to register and execute workflows, I moved that code into a class to workaround the empty module name issue. The class:
Copy code
import uuid
from flytekit.remote.entities import FlyteTask, FlyteWorkflow
from flytekit.remote.executions import FlyteWorkflowExecution
from flytekit.configuration import Config, Image, ImageConfig ,SerializationSettings
from flytekit.core.launch_plan import LaunchPlan
from flytekit.remote import FlyteRemote

class FlyteClient:
    def __init__(self, **kwargs):
        self._project = kwargs.get('project', "flytesnacks")
        self._domain = kwargs.get('domain', "development")
        self._uid = kwargs.get('uid', str(uuid.uuid4())[:8])
        self._endpoint = kwargs.get('endpoint', <hard-coded endpoint default>)

        self._remote = FlyteRemote(
            config=Config.for_endpoint(endpoint=self._endpoint),
            default_project=self._project,
            default_domain=self._domain,
        )

        self._serialization_settings = SerializationSettings(
            project=self._project,
            domain=self._domain,
            image_config=ImageConfig(default_image=Image(name='python',fqn='<http://ghcr.io/flyteorg/flytekit|ghcr.io/flyteorg/flytekit>', tag='py3.9-1.6.2b0')),
        )

    def register_task(self, task: FlyteTask) -> FlyteTask:
        return self._remote.register_task(
            entity=task,
            serialization_settings=self._serialization_settings,
            version=self._uid
        )

    def register_workflow(self, workflow: FlyteWorkflow) -> FlyteWorkflow:
        return self._remote.register_workflow(
            entity=workflow,
            serialization_settings=self._serialization_settings,
            version=self._uid
        )

    def run_workflow(self, workflow: FlyteWorkflow, **kwargs) -> FlyteWorkflowExecution:
        inputs = kwargs.get('inputs', {})
        wait = kwargs.get('wait', True)
        wf_name = "".join(c for c in workflow.name if c.isalnum())
        return self._remote.execute(
            LaunchPlan.get_or_create(workflow=workflow),
            inputs=inputs,
            execution_name=f"{wf_name}-{self._uid}",
            wait=wait
        )
The task and workflow:
Copy code
from flytekit import task, workflow

@task
def hello(name: str) -> str:
    return f"Hello, {name}!"

@workflow
def hello_world_wf(name: str) -> str:
    return hello(name=name)
Directory structure: ./main.py <-- I run this ./tasks/poc-flow.py ./client/flyteclient.py I tried adding empty
__init__.py
files in the module (tasks & client) directories, didn’t help.
The original entrypoint given by flyte/the code:
Copy code
spec:
  affinity: {}
  containers:
  - args:
    - pyflyte-execute
    - --inputs
    - gs://<bucket>/metadata/propeller/flytesnacks-development-taskspocflowhelloworldwf-1dc316a4/n0/data/inputs.pb
    - --output-prefix
    - gs://<bucket>/metadata/propeller/flytesnacks-development-taskspocflowhelloworldwf-1dc316a4/n0/data/0
    - --raw-output-data-prefix
    - gs://<bucket>/h0/taskspocflowhelloworldwf-1dc316a4-n0-0
    - --checkpoint-path
    - gs://<bucket>/h0/taskspocflowhelloworldwf-1dc316a4-n0-0/_flytecheckpoints
    - --prev-checkpoint
    - '""'
    - --resolver
    - flytekit.core.python_auto_container.default_task_resolver
    - --
    - task-module
    - tasks.poc_flow
    - task-name
    - hello
Looking into the possibility that the pyflyte-execute command is the one downloading it from GCS bucket, which would make the test I did with kubectl exec into different entrypoint invalid. However it still can’t find my code
If raw-output-data-prefix should contain the code, then it’s not there, just the inputs. There is no
0
folder in there
s
Can you move
tasks
to another parent folder? The structure can be something like:
parent-folder/tasks/***.py
. Please also add
__init__.py
file to the tasks folder.
v
Didn’t help, but I thought I’ll have more luck using register_script instead. Had the same issue for a while, until I got the input args right: (in my custom FlyteClient class)
Copy code
def register_script(self, workflow: FlyteWorkflow) -> FlyteWorkflow:
        return self._remote.register_script(
            entity=workflow,
            version=self._uid,
            # module_name=workflow.name,
            source_path=os.path.dirname(__main__.__file__),
            copy_all=True
        )
New flat directory structure, just for testing:
Copy code
./main.py
./flyteclient.py
./pocflow.py
This copies the entire folder and subfolders so its my responsibility to ensure it doesn’t copy extra stuff it doesn’t need, but I’ll take care of that. This works well Next I’ll move the task definitoons to their own file. Thanks
s
Okay. But the other two methods have to work as well. Not sure why that's erroring out. cc @Kevin Su @Eduardo Apolinario (eapolinario)
151 Views