Hi, I am trying to use kuberay operator and ray cl...
# ask-the-community
k
Hi, I am trying to use the KubeRay operator and a Ray cluster within Flyte by following this blog post: https://blog.flyte.org/ray-and-flyte#heading-launch-a-ray-cluster I deployed kuberay-operator and ray-cluster by following the Helm chart docs here: https://github.com/ray-project/kuberay/tree/master/helm-chart The pods are up and running, and I have enabled the Ray plugin in Flyte. When I run pyflyte remotely, I get the error below. (I tried deploying the Ray pods both in a separate namespace and in the namespace where the Flyte pods are running.) I need help with this error.
raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
        status = StatusCode.INTERNAL
        details = "failed to create workflow in propeller json: error calling MarshalJSON for type *v1alpha1.Inputs: Marshal called with nil"
        debug_error_string = "{"created":"@1666318439.715331527","description":"Error received from peer ipv4:35.172.9.181:443","file":"src/core/lib/surface/call.cc","file_line":1074,"grpc_message":"failed to create workflow in propeller json: error calling MarshalJSON for type *v1alpha1.Inputs: Marshal called with nil","grpc_status":13}"
d
Hi @karthikraj, this doesn't seem to be a Ray-specific error. Rather, it indicates that FlyteAdmin is unable to create the FlyteWorkflow CRD because of the error shown. Can you provide the workflow definition and the pyflyte command you are using? It seems that, for some reason, marshalling the input values is failing.
k
Thank you. Below are the code and the command. The code is taken from the blog post I linked above. Code:
import ray
import os
os.system("pip3 install flytekitplugins-ray")
from typing import *
import typing
from flytekit import task
from flytekitplugins.ray import HeadNodeConfig, RayJobConfig, WorkerNodeConfig

@ray.remote
def f(x):
    return x * x


@task(task_config=RayJobConfig(head_node_config=HeadNodeConfig(ray_start_params={"block": "true"}),worker_node_config=[WorkerNodeConfig(group_name="ray-group", replicas=5, min_replicas=2, max_replicas=10)],runtime_env={"pip": ["numpy", "pandas"]}, ))
def ray_task() -> typing.List[int]:
    futures = [f.remote(i) for i in range(10)]
    return ray.get(futures)
Command:
pyflyte run --remote kuberay_test.py ray_task
d
Can you try defining a workflow that calls the `ray_task` task? I do not believe `pyflyte run` supports executing tasks. You should be able to do something as simple as:
from flytekit import workflow

@workflow
def ray_wf():
    ray_task()
and then attempt to execute the workflow with
pyflyte run --remote kuberay_test.py ray_wf
k
It still gives the same error 😞
s
What’s your flytekit version, @karthikraj?
Also, tasks can be executed with `pyflyte run`.
k
My flytekit version is: 1.1.1
s
Can you upgrade to 1.2.1 and let me know if the same error’s cropping up?
k
Yeah, I am still getting the same error...
s
Works for me after I added an input to `ray_task`:
import ray

import typing
from flytekit import task
from flytekitplugins.ray import HeadNodeConfig, RayJobConfig, WorkerNodeConfig


@ray.remote
def f(x):
    return x * x


@task(
    task_config=RayJobConfig(
        head_node_config=HeadNodeConfig(ray_start_params={"block": "True"}),
        worker_node_config=[
            WorkerNodeConfig(
                group_name="ray-group", replicas=5, min_replicas=2, max_replicas=10
            )
        ],
        runtime_env={"pip": ["numpy", "pandas"]},
    )
)
def ray_task(n: int) -> typing.List[int]:
    futures = [f.remote(i) for i in range(n)]
    return ray.get(futures)
I’m not sure if we’re enforcing the usage of inputs; if so, we should improve the error message. This, however, seems like a bug. cc: @Eduardo Apolinario (eapolinario)
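For anyone wanting to sanity-check what the parameterized task should return before submitting it, here is a minimal local sketch in plain Python. It involves no Ray or Flyte at all; `square` and `ray_task_local` are hypothetical stand-ins for `f` and `ray_task`, not part of either library.

```python
import typing


def square(x: int) -> int:
    # Stand-in for the @ray.remote function f; runs in-process, no Ray.
    return x * x


def ray_task_local(n: int) -> typing.List[int]:
    # Mirrors ray_task(n): square every integer in range(n).
    # Hypothetical helper for local checking only; it does not touch Flyte.
    return [square(i) for i in range(n)]


if __name__ == "__main__":
    print(ray_task_local(5))
```

With the input added, the value would presumably be passed on the command line as an option named after the input, e.g. `pyflyte run --remote kuberay_test.py ray_task --n 5`.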
k
The workflow can now be registered with Flyte. Thank you for the help.