kind-kite-58745
06/04/2023, 8:32 AM
Getting a ModuleNotFoundError saying my code's module doesn't exist.
I found that when the workflow is executed, the Python files are not present inside the Kubernetes container. I tested this by capturing the kubectl get pod <pod-name> -oyaml output of the task pod, editing its entrypoint to sleep 99999, and then running kubectl exec -it <pod-name> bash. The container starts in the /root folder, which is completely empty. I could not find the files anywhere else inside the container using tools like find and grep; they seem to be missing.
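Instead of searching with find and grep, you can ask the container's own Python interpreter whether the module can be located and which directories it searches. This is a minimal sketch; module_importable is a hypothetical helper, and the module path tasks.poc_flow is taken from the imports in this thread. Run it with python inside the pod (for example from the kubectl exec shell).

```python
import importlib.util
import sys


def module_importable(name: str) -> bool:
    """Return True if `name` can be located on the current sys.path."""
    try:
        # find_spec imports parent packages first, so a missing parent
        # (e.g. "tasks" in "tasks.poc_flow") raises instead of returning None.
        return importlib.util.find_spec(name) is not None
    except ModuleNotFoundError:
        return False


if __name__ == "__main__":
    print("sys.path:")
    for entry in sys.path:
        print("  ", entry or "(current directory)")
    print("tasks.poc_flow importable:", module_importable("tasks.poc_flow"))
```

If the last line prints False while sys.path includes the working directory, the code simply is not in the image.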
Should I use register_script instead? What is the difference?
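On the difference, roughly: register_task and register_workflow record only a reference to the container image plus the module path and name of each task, so the image itself must already contain tasks/poc_flow.py. register_script additionally packages your source directory and uploads it, and (as we understand flytekit's fast registration) prefixes the container command with pyflyte-fast-execute, which downloads and unpacks that archive before handing off to pyflyte-execute. The sketch below only illustrates that difference in the container command; the helper with_fast_execute, the archive URI, and the exact flag set are assumptions, so compare against the YAML of a real fast-registered pod.

```python
from typing import List


def with_fast_execute(task_args: List[str], archive_uri: str, dest_dir: str = ".") -> List[str]:
    """Hypothetical helper: wrap a pyflyte-execute command the way fast
    registration is understood to, so the code archive is fetched first."""
    return [
        "pyflyte-fast-execute",
        "--additional-distribution", archive_uri,  # uploaded source archive
        "--dest-dir", dest_dir,                    # where it is unpacked in the pod
        "--",                                      # everything after this is the task command
        *task_args,
    ]


# Container args as registered with register_task (abridged from this thread's pod spec).
plain = ["pyflyte-execute", "--resolver",
         "flytekit.core.python_auto_container.default_task_resolver",
         "--", "task-module", "tasks.poc_flow", "task-name", "hello"]

# The same task as it might look after register_script (archive URI is a made-up placeholder).
fast = with_fast_execute(plain, "gs://<bucket>/fast/example.tar.gz")
print(" ".join(fast))
```

With plain registration nothing in the command fetches your code; with fast registration the first step does.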
Code example attached as a comment in this thread.
kind-kite-58745
06/04/2023, 8:34 AM
```python
from client.flyteclient import FlyteClient
from tasks.poc_flow import hello_world_wf, hello

if __name__ == "__main__":
    client = FlyteClient()
    print("Registering task hello...")
    client.register_task(hello)
    print("Registering workflow hello_world_wf...")
    client.register_workflow(hello_world_wf)
    print("Running workflow hello_world_wf...")
    client.run_workflow(hello_world_wf, inputs={"name": "World"})
```
Note that FlyteClient is a custom class that initializes the FlyteRemote and has functions to register and execute workflows. I moved that code into a class to work around the empty module name issue.
The class:
```python
import uuid

from flytekit.configuration import Config, Image, ImageConfig, SerializationSettings
from flytekit.core.base_task import PythonTask
from flytekit.core.launch_plan import LaunchPlan
from flytekit.core.workflow import WorkflowBase
from flytekit.remote import FlyteRemote
from flytekit.remote.entities import FlyteTask, FlyteWorkflow
from flytekit.remote.executions import FlyteWorkflowExecution


class FlyteClient:
    def __init__(self, **kwargs):
        self._project = kwargs.get('project', "flytesnacks")
        self._domain = kwargs.get('domain', "development")
        # Short random id, reused as the registration version and execution-name suffix.
        self._uid = kwargs.get('uid', str(uuid.uuid4())[:8])
        self._endpoint = kwargs.get('endpoint', "<hard-coded endpoint default>")  # real default elided
        self._remote = FlyteRemote(
            config=Config.for_endpoint(endpoint=self._endpoint),
            default_project=self._project,
            default_domain=self._domain,
        )
        # Tasks will run in this stock flytekit image.
        self._serialization_settings = SerializationSettings(
            project=self._project,
            domain=self._domain,
            image_config=ImageConfig(default_image=Image(name='python', fqn='ghcr.io/flyteorg/flytekit', tag='py3.9-1.6.2b0')),
        )

    # Local @task / @workflow objects go in; registered remote entities come back.
    def register_task(self, task: PythonTask) -> FlyteTask:
        return self._remote.register_task(
            entity=task,
            serialization_settings=self._serialization_settings,
            version=self._uid,
        )

    def register_workflow(self, workflow: WorkflowBase) -> FlyteWorkflow:
        return self._remote.register_workflow(
            entity=workflow,
            serialization_settings=self._serialization_settings,
            version=self._uid,
        )

    def run_workflow(self, workflow: WorkflowBase, **kwargs) -> FlyteWorkflowExecution:
        inputs = kwargs.get('inputs', {})
        wait = kwargs.get('wait', True)
        # Keep only alphanumerics, e.g. tasks.poc_flow.hello_world_wf -> taskspocflowhelloworldwf
        wf_name = "".join(c for c in workflow.name if c.isalnum())
        return self._remote.execute(
            LaunchPlan.get_or_create(workflow=workflow),
            inputs=inputs,
            execution_name=f"{wf_name}-{self._uid}",
            wait=wait,
        )
```
The task and workflow:
```python
from flytekit import task, workflow


@task
def hello(name: str) -> str:
    return f"Hello, {name}!"


@workflow
def hello_world_wf(name: str) -> str:
    return hello(name=name)
```
Directory structure:
./main.py <-- I run this
./tasks/poc_flow.py
./client/flyteclient.py
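For reference, the dotted name a file gets depends on which directory is treated as the import root: tasks/poc_flow.py becomes tasks.poc_flow only if tasks is a regular package (has __init__.py) or the directory containing tasks is on sys.path, and that has to hold inside the container too, not just on the machine that registers. The stdlib sketch below applies the usual convention of climbing parent directories while they contain __init__.py; dotted_module_name is a hypothetical helper, and whether flytekit derives module names by exactly this rule is an assumption.

```python
import tempfile
from pathlib import Path


def dotted_module_name(py_file: str) -> str:
    """Derive a dotted module name by climbing parent directories for as long
    as they contain __init__.py (i.e. are regular packages)."""
    path = Path(py_file).resolve()
    parts = [path.stem]
    parent = path.parent
    while (parent / "__init__.py").is_file():
        parts.append(parent.name)
        parent = parent.parent
    return ".".join(reversed(parts))


with tempfile.TemporaryDirectory() as root:
    pkg = Path(root) / "project" / "tasks"
    pkg.mkdir(parents=True)
    (pkg / "poc_flow.py").write_text("")
    without_init = dotted_module_name(str(pkg / "poc_flow.py"))
    (pkg / "__init__.py").write_text("")
    with_init = dotted_module_name(str(pkg / "poc_flow.py"))

print(without_init)  # poc_flow
print(with_init)     # tasks.poc_flow
```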
I tried adding empty __init__.py files to the package directories (tasks and client), but it didn't help.
kind-kite-58745
06/04/2023, 8:37 AM
```yaml
spec:
  affinity: {}
  containers:
  - args:
    - pyflyte-execute
    - --inputs
    - gs://<bucket>/metadata/propeller/flytesnacks-development-taskspocflowhelloworldwf-1dc316a4/n0/data/inputs.pb
    - --output-prefix
    - gs://<bucket>/metadata/propeller/flytesnacks-development-taskspocflowhelloworldwf-1dc316a4/n0/data/0
    - --raw-output-data-prefix
    - gs://<bucket>/h0/taskspocflowhelloworldwf-1dc316a4-n0-0
    - --checkpoint-path
    - gs://<bucket>/h0/taskspocflowhelloworldwf-1dc316a4-n0-0/_flytecheckpoints
    - --prev-checkpoint
    - '""'
    - --resolver
    - flytekit.core.python_auto_container.default_task_resolver
    - --
    - task-module
    - tasks.poc_flow
    - task-name
    - hello
```
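The arguments after the -- in this spec are handed to the task resolver. As far as we understand flytekit's default resolver, it does not download code: it imports the module named by task-module and looks up task-name on it, so the module must already be importable from the container's working directory or sys.path. Fetching code is what pyflyte-fast-execute does, and it only appears when a task was fast-registered. A stdlib sketch of that lookup follows; resolver_args and resolve_task are hypothetical names, not flytekit functions.

```python
import importlib
from typing import Any, Dict, List


def resolver_args(container_args: List[str]) -> Dict[str, str]:
    """Pair up the key/value tokens after the last '--' in the container args.
    Raises ValueError if there is no '--'."""
    last_sep = max(i for i, tok in enumerate(container_args) if tok == "--")
    tail = container_args[last_sep + 1:]
    return dict(zip(tail[0::2], tail[1::2]))


def resolve_task(container_args: List[str]) -> Any:
    """Import task-module and return its task-name attribute. Raises
    ModuleNotFoundError when the code is not importable in this environment."""
    spec = resolver_args(container_args)
    module = importlib.import_module(spec["task-module"])
    return getattr(module, spec["task-name"])


args = ["pyflyte-execute", "--resolver",
        "flytekit.core.python_auto_container.default_task_resolver",
        "--", "task-module", "tasks.poc_flow", "task-name", "hello"]
print(resolver_args(args))  # {'task-module': 'tasks.poc_flow', 'task-name': 'hello'}
try:
    resolve_task(args)
except ModuleNotFoundError as err:
    print("not importable here:", err)
```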
I'm looking into the possibility that the pyflyte-execute command is what downloads the code from the GCS bucket, which would invalidate my kubectl exec test with the overridden entrypoint. However, it still can't find my code.
kind-kite-58745
06/04/2023, 8:52 AM
0 folder in there
tall-lock-23197
Could you move tasks to another parent folder? The structure can be something like parent-folder/tasks/***.py. Please also add an __init__.py file to the tasks folder.
kind-kite-58745
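06/05/2023, 9:51 AM
```python
import os        # needed at the top of flyteclient.py
import __main__  # module object of the script being run (main.py)

from flytekit.core.workflow import WorkflowBase


# New method on FlyteClient
def register_script(self, workflow: WorkflowBase) -> FlyteWorkflow:
    return self._remote.register_script(
        entity=workflow,
        version=self._uid,
        # module_name=workflow.name,
        source_path=os.path.dirname(__main__.__file__),
        copy_all=True,
    )
```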
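Here source_path=os.path.dirname(__main__.__file__) resolves to the directory of the script being run. Two caveats worth guarding against: in a REPL, notebook, or python -c session __main__ has no __file__, and on Python versions before 3.9 __file__ can be relative, so dirname("main.py") would be the empty string. A defensive variant, sketched with a hypothetical helper name source_dir_of_main:

```python
import os
import sys


def source_dir_of_main() -> str:
    """Absolute directory of the script being run.

    Falls back to the current working directory when __main__ has no
    __file__ (REPL, notebooks, `python -c`)."""
    main_file = getattr(sys.modules["__main__"], "__file__", None)
    if main_file is None:
        return os.getcwd()
    # abspath guards against a relative __file__, where dirname() could be "".
    return os.path.dirname(os.path.abspath(main_file))


print(source_dir_of_main())
```

It could then be passed as source_path=source_dir_of_main() in place of the inline expression.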
New flat directory structure, just for testing:
./main.py
./flyteclient.py
./pocflow.py
This copies the entire folder and its subfolders, so it's my responsibility to make sure it doesn't copy anything it doesn't need, but I'll take care of that. This works well.
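Since copy_all=True ships everything under source_path, it can help to preview what an archive of that folder would contain. The stdlib sketch below approximates the package step with an explicit skip list; it is not flytekit's implementation (flytekit applies its own ignore rules and archive naming, which may differ), and package_source and SKIP_DIRS are hypothetical.

```python
import tarfile
import tempfile
from pathlib import Path

# Hypothetical exclusion list; adjust to whatever should not ship.
SKIP_DIRS = {"__pycache__", ".git", ".venv"}


def package_source(source_dir: str, archive_path: str) -> list:
    """Write every file under source_dir, except those inside SKIP_DIRS, into a
    .tar.gz using paths relative to source_dir. Returns the archived names."""
    root = Path(source_dir)
    names = []
    with tarfile.open(archive_path, "w:gz") as tar:
        for path in sorted(root.rglob("*")):
            rel = path.relative_to(root)
            if path.is_file() and not SKIP_DIRS.intersection(rel.parts):
                tar.add(str(path), arcname=rel.as_posix())
                names.append(rel.as_posix())
    return names


with tempfile.TemporaryDirectory() as src, tempfile.TemporaryDirectory() as out:
    Path(src, "main.py").write_text("")
    Path(src, "pocflow.py").write_text("")
    Path(src, "__pycache__").mkdir()
    Path(src, "__pycache__", "pocflow.cpython-39.pyc").write_bytes(b"")
    archive = str(Path(out, "source.tar.gz"))
    shipped = package_source(src, archive)
    with tarfile.open(archive) as tar:
        archived = sorted(tar.getnames())

print(shipped)  # ['main.py', 'pocflow.py']
```

Running package_source against the real project folder before registering shows exactly which files a full copy would include.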
Next I'll move the task definitions to their own file. Thanks!
tall-lock-23197