# ray-integration
a
Hey team, when I run a Flyte workflow with the Ray integration, the pods tend to stay in a Pending state, and in the Flyte console the task keeps on running.

https://flyte-org.slack.com/files/U05RR32SN00/F05RNV5KE4D/screenshot_2023-09-11_at_2.11.01_pm.png

This is what the logs of the pending pods show:
Defaulted container "ray-worker" out of: ray-worker, init-myservice (init)
s
Can you share the code that's working for you and the code that isn't?
a
Working:
import pandas as pd
from sklearn.datasets import load_wine
from sklearn.linear_model import LogisticRegression

import flytekit.extras.sklearn
from flytekit import task, workflow, dynamic


@task
def get_data() -> pd.DataFrame:
    """Get the wine dataset."""
    return load_wine(as_frame=True).frame

@task
def process_data(data: pd.DataFrame) -> pd.DataFrame:
    """Simplify the task from a 3-class to a binary classification problem."""
    return data.assign(target=lambda x: x["target"].where(x["target"] == 0, 1))

@task
def train_model(data: pd.DataFrame, hyperparameters: dict) -> LogisticRegression:
    """Train a model on the wine dataset."""
    features = data.drop("target", axis="columns")
    target = data["target"]
    return LogisticRegression(max_iter=3000, **hyperparameters).fit(features, target)

@workflow
def training_workflow(hyperparameters: dict) -> LogisticRegression:
    """Put all of the steps together into a single workflow."""
    data = get_data()
    processed_data = process_data(data=data)
    return train_model(
        data=processed_data,
        hyperparameters=hyperparameters,
    )
Not working:
import typing

from flytekit import ImageSpec, Resources, task, workflow

custom_image = ImageSpec(
    name="ray-flyte-plugin",
    registry="anirudh1905",
    packages=["flytekitplugins-ray"],
)

if custom_image.is_container():
    import ray
    from flytekitplugins.ray import HeadNodeConfig, RayJobConfig, WorkerNodeConfig

@ray.remote
def f1(x):
    return x * x

@ray.remote
def f2(x):
    return x % 2

ray_config = RayJobConfig(
    head_node_config=HeadNodeConfig(ray_start_params={"log-color": "True"}),
    worker_node_config=[WorkerNodeConfig(group_name="ray-group", replicas=2)],
    runtime_env={"pip": ["numpy", "pandas"]},  # or runtime_env="./requirements.txt"
)

@task(
    cache=True,
    cache_version="0.2",
    task_config=ray_config,
    requests=Resources(mem="2Gi", cpu="1"),
    container_image=custom_image,
)
def ray_task(n: int) -> int:
    futures = [f2.remote(f1.remote(i)) for i in range(n)]
    return sum(ray.get(futures))


@workflow
def ray_workflow(n: int) -> int:
    return ray_task(n=n)
s
Can you share the working Ray code?
a
This code was working earlier with n=10 but was failing at n=120. Now it's failing for both.
s
Can you paste the exact error you're seeing?
a
Warning  FailedScheduling  97s   default-scheduler  0/1 nodes are available: 1 Insufficient cpu. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod.
s
@Kevin Su, for some reason, the Ray code isn't working but a simple workflow is. Anirudh requested 1 CPU in the Ray task, but it is still failing. Any idea why?
k
This says there isn't enough capacity; one CPU is not available on the node.
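A rough back-of-envelope for why the scheduler reports "Insufficient cpu" (the per-pod inheritance of the task's request is an assumption about the Ray plugin's defaults, not something confirmed in this thread):

```python
# Sketch: estimate the aggregate CPU a RayJob asks for at once.
# Assumption: the submitter pod, the head pod, and each worker pod all
# inherit the task's request (1 CPU, 2Gi in the snippet above).

def total_cpu_request(cpu_per_pod: float, worker_replicas: int) -> float:
    """Submitter + head + workers, each requesting cpu_per_pod CPUs."""
    submitter = 1  # pod running the Flyte task that submits the RayJob
    head = 1       # Ray head node pod
    return (submitter + head + worker_replicas) * cpu_per_pod

# With replicas=2 and 1 CPU per pod, the job needs roughly 4 CPUs free
# at the same time, which a single-node demo cluster that also runs
# system pods may not have.
print(total_cpu_request(1.0, 2))  # → 4.0
```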
s
But no executions are running on the demo cluster and 5 CPUs have been assigned in the Docker config.
@Anirudh Sridhar, can you set replicas to 1?
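Concretely, that would be a one-line change to the config from the snippet above (names reused from it; the imports come from the earlier `flytekitplugins.ray` block):

```python
# Shrink the Ray cluster so the whole job fits on a small demo cluster.
ray_config = RayJobConfig(
    head_node_config=HeadNodeConfig(ray_start_params={"log-color": "True"}),
    worker_node_config=[WorkerNodeConfig(group_name="ray-group", replicas=1)],  # was replicas=2
    runtime_env={"pip": ["numpy", "pandas"]},
)
```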
a
same issue
s
I believe you're seeing the error because of insufficient resources only. Try reducing the value of n as well. Set it to 1.
a
I don't think so, because it was working earlier.
s
Why don't you give it a try and see if n=1 is working?
a
All pods are running, but I'm not getting any output.
s
Have you checked the pod's status?
a
Yes, they are running.
s
Have you checked the pods' logs?
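For reference, the usual way to inspect this on a cluster (the namespace and pod names below are placeholders, not from this thread):

```shell
# List the Ray pods and their status in the execution namespace.
kubectl get pods -n <execution-namespace>

# Check scheduling events for a stuck pod (shows "Insufficient cpu" etc.).
kubectl describe pod <ray-worker-pod> -n <execution-namespace>

# Tail the worker container's logs (the container name matches the
# "Defaulted container \"ray-worker\"" message earlier in this thread).
kubectl logs <ray-worker-pod> -n <execution-namespace> -c ray-worker
```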
k
Which KubeRay version are you using?
a
0.5.2