# flyte-support
s
Hi 👋 I’m setting up the integration with Ray, and it seems to work nicely when creating a fresh RayCluster (using `@task(task_config=RayJobConfig(worker_node_config=[WorkerNodeConfig(…)]))`). I can see the cluster starting, the job getting scheduled and distributed, and completing successfully.

I’m having trouble using an existing RayCluster (in the same Kubernetes cluster), though. What is the correct approach for that? From the docs here I read that I should be able to use `@task(task_config=RayJobConfig(address="<RAY_CLUSTER_ADDRESS>"))`. However, when I try that, `worker_node_config` turns out to be a required argument. I tried using an empty list instead:
```python
@task(
    container_image=...,
    task_config=RayJobConfig(
        worker_node_config=[],  # No need to create a Ray cluster but argument is required, maybe just setting to empty list helps?
        address="http://kuberay-cluster-head-svc.kuberay.svc.cluster.local:8265",
        runtime_env=...
    ),
)
```
But then it still tries to start a new RayCluster instead of using the existing one found at `address`:
```
❯ k get rayclusters.ray.io -A
NAMESPACE                        NAME                                         DESIRED WORKERS   AVAILABLE WORKERS   CPUS   MEMORY   GPUS   STATUS   AGE
<flyte-project>-<flyte-domain>   ahvfr924w8k2vgvf97wp-n0-0-raycluster-crb9z                                         100m   500Mi    0      ready    2m25s
kuberay                          kuberay-cluster                              1                 1                   2      3G       0      ready    3h37m
...
```
The address works fine if I just run:
```
k run kuberay-test --rm --tty -i --restart='Never' --image ... --command -- ray job submit --address http://kuberay-cluster-head-svc.kuberay.svc.cluster.local:8265 -- python -c "import ray; ray.init(); print(ray.cluster_resources())"
```
It looks like `worker_node_config` has been required since the initial commit? Not sure how the docs example has ever worked.
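For reference, the fresh-cluster variant that does work for me looks roughly like this (a minimal sketch along the lines of the plugin docs example; the worker group name, replica count, image and runtime_env contents are just placeholders):
```python
import typing

import ray
from flytekit import task
from flytekitplugins.ray import RayJobConfig, WorkerNodeConfig


@ray.remote
def f(x: int) -> int:  # example remote function
    return x * x


@task(
    container_image="<RAY_IMAGE>",  # placeholder
    task_config=RayJobConfig(
        # A single worker group; the plugin spins up a fresh RayCluster for it.
        worker_node_config=[WorkerNodeConfig(group_name="ray-group", replicas=2)],
        runtime_env={"pip": ["numpy"]},  # placeholder
    ),
)
def ray_fresh_cluster_task(n: int) -> typing.List[int]:
    futures = [f.remote(i) for i in range(n)]
    return ray.get(futures)
```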
I guess we can just submit through the Ray Python SDK directly. This seems to work:
```python
import typing

import ray
from flytekit import task


@ray.remote
def f(x: int) -> int:  # example remote function; replace with your own
    return x * x


@task(
    container_image=<RAY_IMAGE>,
)
def ray_task_job_submit(n: int) -> typing.List[int]:
    # Connect to the existing cluster over the Ray Client port (10001).
    ray.init(address="ray://kuberay-cluster-head-svc.kuberay.svc.cluster.local:10001")
    futures = [f.remote(i) for i in range(n)]
    return ray.get(futures)
```
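As far as I can tell, this bypasses the plugin entirely: the task pod just connects to the existing cluster over the Ray Client port (10001) rather than the dashboard/jobs port (8265) that `ray job submit` uses, and whatever was in `RayJobConfig(runtime_env=...)` can be passed to `ray.init(..., runtime_env=...)` instead.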
c
Interesting. I’m using Ray myself and will likely need to fix some issues with the plugin so I might be able to take a look at this if you file an issue.
a
This is odd, I don't seem to find any logic that handles an empty `worker_node_config` list and avoids creating a new cluster. @shy-evening-51366 please report this in an issue so we can track it.
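Roughly, this is the behaviour I'd expect from the config handling (a purely illustrative sketch with a simplified stand-in type, not actual plugin code):
```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class RayJobConfigSketch:  # simplified stand-in for the real RayJobConfig
    worker_node_config: List[dict] = field(default_factory=list)
    address: Optional[str] = None


def should_create_new_cluster(cfg: RayJobConfigSketch) -> bool:
    # Expected: an address with no worker groups means "submit to the existing
    # cluster at `address`", not "create a new RayCluster".
    return not (cfg.address and not cfg.worker_node_config)
```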
s
Morning 👋 sure thing. Workaround is very simple so it’s definitely not critical by the way.
Added it to an issue here, thanks for the fast response @clean-glass-36808 @average-finland-92144!