Hello guys I'm having a new error. I packaged a p...
# ask-the-community
f
Hello guys I'm having a new error. I packaged a project to Docker hub. And then used FlyteRemote to register the workflow to a remote Flyte cluster on AWD EKS -- adding the IMAGE_STR. However, when I fetch and execute the workflow, it raises a
ModuleNotFoundError
. What could I be doing wrong please?
s
Can you paste the exact error?
So you've used registration instead of fast registration right?
f
This is what I'm getting:
Copy code
[1/1] currentAttempt done. Last Error: USER::Pod failed. No message received from kubernetes.
[fde22ae742e034d5c9b1-n0-0] terminated with exit code (1). Reason [Error]. Message: 
.8/site-packages/click/core.py", line 1130, in __call__
    return self.main(*args, **kwargs)
  File "/opt/venv/lib/python3.8/site-packages/click/core.py", line 1055, in main
    rv = self.invoke(ctx)
  File "/opt/venv/lib/python3.8/site-packages/click/core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/opt/venv/lib/python3.8/site-packages/click/core.py", line 760, in invoke
    return __callback(*args, **kwargs)
  File "/opt/venv/lib/python3.8/site-packages/flytekit/bin/entrypoint.py", line 476, in execute_task_cmd
    _execute_task(
  File "/opt/venv/lib/python3.8/site-packages/flytekit/exceptions/scopes.py", line 160, in system_entry_point
    return wrapped(*args, **kwargs)
  File "/opt/venv/lib/python3.8/site-packages/flytekit/bin/entrypoint.py", line 348, in _execute_task
    _task_def = resolver_obj.load_task(loader_args=resolver_args)
  File "/opt/venv/lib/python3.8/site-packages/flytekit/core/python_auto_container.py", line 280, in load_task
    task_module = importlib.import_module(task_module)
  File "/usr/local/lib/python3.8/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
  File "<frozen importlib._bootstrap>", line 991, in _find_and_load
  File "<frozen importlib._bootstrap>", line 961, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
  File "<frozen importlib._bootstrap>", line 991, in _find_and_load
  File "<frozen importlib._bootstrap>", line 961, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
  File "<frozen importlib._bootstrap>", line 991, in _find_and_load
  File "<frozen importlib._bootstrap>", line 973, in _find_and_load_unlocked
ModuleNotFoundError: No module named 'flexs_app'
I used FlyteRemote and ran
register_workflow
method. Also, the script that runs FlyteRemote isn't in packaged along with the project that has the workflow. Could this cause an issue?
s
FlyteRemote code needn't be packaged. Could you share the directory structure?
f
Copy code
flyte-demo
├── Dockerfile      
├── LICENSE
├── README.md
├── flyte_remote.py   # the flyte remote script
├── requirements.txt
└── flexs_app         # app directory
    ├── __init__.py
    └── app_service.py
    └── tasks.py
    └── workflows.py
Look into the pod (kubectl describe), under Args, this is what I'm getting:
Copy code
Args:
      pyflyte-execute
      --inputs
      <s3://s3-bucket/metadata/propeller/flexs-app-development-f2eb08b12a2b84bb591e/n0/data/inputs.pb>
      --output-prefix
      <s3://s3-bucket/metadata/propeller/flexs-app-development-f2eb08b12a2b84bb591e/n0/data/0>
      --raw-output-data-prefix
      <s3://s3-bucket/data/h4/f2eb08b12a2b84bb591e-n0-0>
      --checkpoint-path
      <s3://s3-bucket/data/h4/f2eb08b12a2b84bb591e-n0-0/_flytecheckpoints>
      --prev-checkpoint
      ""
      --resolver
      flytekit.core.python_auto_container.default_task_resolver
      --
      task-module
      flexs_app.tasks
      task-name
      get_fitness
    State:      Terminated
      Reason:   Error
      Message:
Seems it's failing because of the task-module
s
Which module is present in the error? As per your logs, it's
my_project
. So what is it exactly with regard to your directory structure?
Can you also try doing a relative import?
from .your-module ...
f
That was a mistake, the missing package is
flexs_app
. I've corrected the error in the first code block in this thread. I am actually using relative import. But the error still persists.
s
Can you share your Dockerfile?
f
Copy code
FROM python:3.10-slim-buster
WORKDIR /code
Copy code
EXPOSE 8000
RUN pip install --upgrade pip
COPY ./requirements.txt ./
RUN pip install -r requirements.txt
Copy code
COPY . /code
When I run
docker run -it image_id bash
to check the content of the image, everything is there, along with the flexs_app directory.
d
@Fhuad Balogun can you pls share the flyte remote script you're using?
f
Copy code
"""Flyte Remote"""

from flytekit.remote import FlyteRemote
from flytekit.configuration import Config, SerializationSettings, ImageConfig
from .workflows import train_wf
from typing import Dict
from flytekit.configuration import Config, PlatformConfig
import flytekit
import uuid


def get_get_version():
    """To avoid version conflicts, 
    we generate a unique version prefix for each run of this script.
    """
    _VERSION_PREFIX = "flexs-app_v" + uuid.uuid4().hex[:3]
    # logger.warning(f"Test version prefix is {_VERSION_PREFIX}")
    print(f"Flexs-app version prefix is {_VERSION_PREFIX}")

    def fn(suffix: str = "") -> str:
        return _VERSION_PREFIX + (f"_{suffix}" if suffix else "")

    return fn

get_version = get_get_version()

IMAGE_STR = "docker_user/repo_name:tag"

image_config = ImageConfig.auto(img_name=IMAGE_STR)

remote = FlyteRemote(config=Config(
    platform=PlatformConfig(
        endpoint= "localhost:8089",
        insecure=True,
        insecure_skip_verify=True,
    )
))
# remote = FlyteRemote(config=Config.auto())

def register_and_run_workflow(workflow_name: flytekit.core.workflow.WorkflowBase, inputs: Dict):
    version = get_version("1")
    serialization_settings = SerializationSettings(
        project="flexs-app",
        domain="development",
        version=version,
        image_config=image_config,
    )
    remote.register_workflow(workflow_name, serialization_settings)
    fetched_wf = remote.fetch_workflow(
        name=workflow_name.name,
        project="flexs-app",
        domain="development",
        version=version,
    )
    # print("fetched_wf", fetched_wf)
    remote.execute(
        entity=fetched_wf,
        inputs=inputs,
        project="flexs-app",
        domain="development",
        version=version,
        image_config=image_config,
    )
    return "done"


if __name__ == "__main__":
    inputs = {
        "items": [
        'FTLIE',
        'FTLIE',
        'FTLIE',
        'FTLIE',
        'FTLIE',
        ]
    }
    print(register_and_run_workflow(train_wf, inputs))
thank you
Hi @David Espejo (he/him) Any update, please?
d
Hi Fhuad, sorry but still unable to find a root cause for your issue Wondering if @Kevin Su / @Eduardo Apolinario (eapolinario) have any hints?
e
@Fhuad Balogun, can you also set the
PYTHONPATH
env var to
/code
in your dockerfile?
f
@Eduardo Apolinario (eapolinario) thank you. Now I'm getting this error:
flytekit.exceptions.user.FlyteAssertion: Failed to put data from /tmp/flyte-t0xpsb9t/sandbox/local_flytekit/engine_dir to <s3://flyte-bucket/metadata/propeller/flexs> (recursive=True)
. And it says to `Original exception: AWS CLI not found! Please install it with
pip install awscli
. Is it to be installed via dockerfile or in the cluster?
s
You can install it via your dockerfile.
f
This worked. Thank you very much guys. However, another issue arises. I am only running a simple python workflow, that comprises of 3 tasks that a processing simple python calculations. However, Flyte complains of
2 Insufficient cpu, 0/2 nodes are available: 2 No preemption victims found for incoming pod.
If I add limits to each task via the
Resources
module, it raises
OOMKilled
out-of-memory error. It seem to be requiring more resources than expected. Please what could be the issue? I am running a two-node cluster with 2 CPUs and 4GB RAM per node.
d
@Fhuad Balogun 2 CPUs could be not enough. Is it possible for you to allocate 4 CPUs to the node? https://docs.flyte.org/projects/cookbook/en/latest/index.html#running-workflows-in-a-flyte-cluster
f
Thank you. I could. I'm just wondering why it isn't enough for a minute Python function. Wanted to be sure I am not missing anything.
d
cc @Eduardo Apolinario (eapolinario)
e
@Fhuad Balogun, sorry, didn't quite catch the question about the python function. Can you give an example?
f
This is an example of a functions and tasks in the workflow:
Copy code
def get_scores(seqs: List[str]) -> np.ndarray:
    time.sleep(60)
    scores = np.random.rand(len(sequences))
    return scores
the task:
Copy code
@task
def get_fitness(seqs: List[str]) -> np.ndarray:
    fitnesses = get_scores(seqs=seqs)
    return fitnesses
Also, it might be helpful to note that
get_scores
is actually a method of a class. I took it out and make it a function in this example.
e
oh, I see now, @Fhuad Balogun, thanks for explaining. Without knowing the details of the Flyte deployment it's hard to debug, but from the error message you shared (insufficient gpu, etc), it looks like you're schedulign this on a cluster that doesn't have enough resources. Are you running this on flyte sandbox by any chance? We've seen cases of people running this on Mac M1s and just needing to allocate more memory to the Linux VM.
d
right @Eduardo Apolinario (eapolinario) what's probably not clear enough is why 2 CPUs are not sufficient to run a Task like the one @Fhuad Balogun is trying to execute before getting an`OOMKilled` ?. I know the node itself consumes ~1.5CPUs and according to K3s benchmarks, a single workload should consume only 10% of a core (per the error logs, I see it's a sandbox deployment) so in theory should be just enough
e
by default all tasks require 2 CPUs. Those can be customized on a per-task level or changed across a Flyte deployment. That said, we're changing the default task resources to require 0 cpus and 0 memory (meaning there will be no limit) in https://github.com/flyteorg/flyteadmin/pull/530, so it should be out in the next Flyte release. @Fhuad Balogun, OOMKilled is an indication that whatever memory limit you're setting is too low for that code to run. How big are the lists you're passing in?
f
4 items in the list, with about 300 string characters in each item.
s
@Eduardo Apolinario (eapolinario), does a large Docker image result in increased usage of memory in a pod? What factors contribute to an increased usage of mem besides code execution?
e
Large docker images shouldn't impact resources used at runtime. As for what contributes to memory usage, we have the python runtime and any imported modules might be allocating memory. Without the code it's hard to say. @Fhuad Balogun, you mentioned you set limits in your task. What limits you did you set?
f
I set a limit of cpu: 200m and mem: 150mi However, when I increased it to 200m, 500mi respectively, it seemed to work because I then got a different error (yet to be fixed though).
Could this error
RuntimeExecutionError: failed during plugin execution, caused by: failed to check existence of futures file: [User] Failed to do HEAD on futures file
with
Forbidden: Forbidden
status code: 403
still be as a result of insufficient resources? Because the metadata could be found on s3 but it complains of this? @Eduardo Apolinario (eapolinario)
e
@Fhuad Balogun, sorry for the delay. Answering your question, no, this should not be related to insufficient resources. This is a backend error. Can you double-check which versions of
flytekit
and potentially other plugins are you using? We had an issue some time ago where one of flytekit's dependencies (
fsspec
) released a new version which broke flytekit (and we fixed with https://github.com/flyteorg/flytekit/commit/be24c52f3fb8f7949db172f490cca95fa0f0e413).
158 Views