# ray-integration
p
Hi, I was trying distributed training using Ray in Flyte, and I am getting this error while running it.
```python
from flytekit import Resources, task, workflow
from flytekitplugins.ray import HeadNodeConfig, RayJobConfig, WorkerNodeConfig
from sklearn.datasets import load_breast_cancer
# RayParams, RayDMatrix and train come from xgboost_ray
from xgboost_ray import RayDMatrix, RayParams, train
import ray
from ray import tune

#ray.init()
#ray.init("auto", ignore_reinit_error=True)

# Ray cluster that Flyte starts for the task: one head node plus a worker group of 2 replicas
ray_config = RayJobConfig(
    head_node_config=HeadNodeConfig(ray_start_params={"log-color": "True"}),
    worker_node_config=[WorkerNodeConfig(group_name="ray-group", replicas=2)],
)

# Distributed XGBoost settings used inside every Tune trial
num_actors = 4
num_cpus_per_actor = 1

ray_params = RayParams(
    num_actors=num_actors, cpus_per_actor=num_cpus_per_actor)


def train_model(config):
    # Each trial trains on the breast-cancer dataset using num_actors training actors
    train_x, train_y = load_breast_cancer(return_X_y=True)
    train_set = RayDMatrix(train_x, train_y)

    evals_result = {}
    bst = train(
        params=config,
        dtrain=train_set,
        evals_result=evals_result,
        evals=[(train_set, "train")],
        verbose_eval=False,
        ray_params=ray_params)
    bst.save_model("model.xgb")


@task(task_config=ray_config, limits=Resources(mem="2000Mi", cpu="1"))
def train_model_task() -> dict:
    # Hyperparameter search space sampled by Tune
    config = {
        "tree_method": "approx",
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
        "eta": tune.loguniform(1e-4, 1e-1),
        "subsample": tune.uniform(0.5, 1.0),
        "max_depth": tune.randint(1, 9)
    }

    analysis = tune.run(
        train_model,
        config=config,
        metric="train-error",
        mode="min",
        num_samples=4,
        resources_per_trial=ray_params.get_tune_resources())
    return analysis.best_config


@workflow
def train_model_wf() -> dict:
    return train_model_task()
```
k
Running out of disk. Please request more.
s
```python
@task(task_config=ray_config, limits=Resources(mem="2000Mi", cpu="1", ephemeral_storage="500Mi"))
```
p
```python
from sklearn.datasets import load_breast_cancer
from flytekit import Resources, task, workflow
from flytekitplugins.ray import HeadNodeConfig, RayJobConfig, WorkerNodeConfig
# RayParams, RayDMatrix and train come from xgboost_ray
from xgboost_ray import RayDMatrix, RayParams, train
import ray
from ray import tune

#ray.shutdown()
#ray.init()
#ray.init("auto", ignore_reinit_error=True)

ray_config = RayJobConfig(
    head_node_config=HeadNodeConfig(ray_start_params={"log-color": "True"}),
    worker_node_config=[WorkerNodeConfig(group_name="ray-group", replicas=3)],
)

num_actors = 2
num_cpus_per_actor = 1

ray_params = RayParams(
    num_actors=num_actors, cpus_per_actor=num_cpus_per_actor)


def train_model(config):
    train_x, train_y = load_breast_cancer(return_X_y=True)
    train_set = RayDMatrix(train_x, train_y)

    evals_result = {}
    bst = train(
        params=config,
        dtrain=train_set,
        evals_result=evals_result,
        evals=[(train_set, "train")],
        verbose_eval=False,
        ray_params=ray_params)
    bst.save_model("model.xgb")


#@task(limits=Resources(mem="2000Mi", cpu="1"))
@task(task_config=ray_config, limits=Resources(mem="3000Mi", cpu="1", ephemeral_storage="3000Mi"))
def train_model_task() -> dict:
    config = {
        "tree_method": "approx",
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
        "eta": tune.loguniform(1e-4, 1e-1),
        "subsample": tune.uniform(0.5, 1.0),
        "max_depth": tune.randint(1, 9)
    }

    analysis = tune.run(
        train_model,
        config=config,
        metric="train-error",
        mode="min",
        num_samples=4,
        max_concurrent_trials=1,  # run at most one trial at a time
        resources_per_trial=ray_params.get_tune_resources())
    return analysis.best_config


@workflow
def train_model_wf() -> dict:
    return train_model_task()
```
I’m still getting this error even when I specify the `ephemeral_storage` value. Do you have any suggested limits for CPU and memory?
s
If you’re using the demo cluster, I think 1Gi is the limit.
p
I am trying it on an EKS cluster.
Can you check yours? Please try increasing the memory. I believe `kubectl -n flyte edit cm flyte-admin-base-config` is the command, but I’m not very sure. Let me know if this doesn’t work.
s
Nice. Please increase your mem and try again.
p
I increased the memory in the task. The execution gets queued, but it stays in the pending state for a long time. Even in a remote run, the workflow has been running for more than 4 hours for 4 trials, but the trials never actually execute.
s
Have you seen the message saying you asked for 3 cpu and 0 gpu but the cluster has 2 cpu and 0 gpu?
p
Yes, but I have requested only 1 CPU. Should I change it anywhere else?
```python
@task(task_config=ray_config, limits=Resources(mem="5000Mi", cpu="1", ephemeral_storage="3000Mi"))
```
s
I think it’s because of `get_tune_resources()`.
I’m assuming you’re training an XGBoost model.
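A quick way to see why a trial asks for more than the task’s `cpu="1"`: the CPUs a Tune trial reserves come from `resources_per_trial`, not from the Flyte `Resources`. The sketch below assumes that xgboost_ray’s `RayParams.get_tune_resources()` reserves 1 CPU for the Tune trainable itself plus `cpus_per_actor` CPUs for each of the `num_actors` training actors; the function names are hypothetical. Under that assumption, the second snippet above (`num_actors = 2`, `num_cpus_per_actor = 1`) needs 3 CPUs per trial, which matches the "asked for 3 cpu" warning on a 2-CPU cluster.

```python
def cpus_per_trial(num_actors: int, cpus_per_actor: int) -> int:
    """CPUs one Tune trial reserves under the assumed model:
    1 CPU for the trainable + cpus_per_actor for each training actor."""
    trainable_cpus = 1
    return trainable_cpus + num_actors * cpus_per_actor


def trial_fits(cluster_cpus: int, num_actors: int, cpus_per_actor: int) -> bool:
    """True if the cluster has enough CPUs in total for a single trial."""
    return cpus_per_trial(num_actors, cpus_per_actor) <= cluster_cpus


if __name__ == "__main__":
    # Second snippet in the thread: num_actors=2, num_cpus_per_actor=1
    print(cpus_per_trial(2, 1))   # 3
    print(trial_fits(2, 2, 1))    # False: the trial cannot be placed on 2 CPUs
```

Note that this counts CPUs only; it ignores how the reserved CPUs are spread across Ray nodes.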
p
```python
ray_config = RayJobConfig(
    head_node_config=HeadNodeConfig(ray_start_params={"log-color": "True"}),
    worker_node_config=[WorkerNodeConfig(group_name="ray-group", replicas=3)],
)
```
Do we have any way to specify the number of CPUs in the Ray cluster config? Something like this?
```python
ray_config = RayJobConfig(
    head_node_config=HeadNodeConfig(ray_start_params={"log-color": "True"}),
    worker_node_config=[WorkerNodeConfig(group_name="ray-group", replicas=3)],
    num_cpus=4,  # proposed parameter (hypothetical)
)
```
I ask because, as mentioned above, we have 64 CPUs in the EKS cluster, but the warning says the Ray cluster has only 2 CPUs. How can we increase the CPU limit in the Ray cluster config?
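Ray schedules trials against the logical CPUs that each Ray node advertises when it starts (the `--num-cpus` value of `ray start`), not against the CPUs of the underlying EKS nodes. Assuming that, the Ray cluster total is the head node’s CPUs plus those of every worker that has actually joined. The helper below is a hypothetical sketch of that arithmetic; its inputs stand for values you would read from the running cluster, and the scenarios in it are illustrative, not a diagnosis of this thread. If flytekitplugins.ray forwards `ray_start_params` to KubeRay’s `rayStartParams` (which KubeRay turns into `ray start` flags), something like `WorkerNodeConfig(group_name="ray-group", replicas=3, ray_start_params={"num-cpus": "4"})` might be one way to raise the per-node value, but that is an untested assumption.

```python
from typing import Optional


def ray_cluster_cpus(head_cpus: float, worker_cpus: float, replicas: int,
                     joined: Optional[int] = None) -> float:
    """Total logical CPUs Ray can schedule on (hypothetical helper).

    head_cpus / worker_cpus: the num-cpus each Ray node advertises.
    joined: worker pods that actually started and joined (defaults to all).
    """
    if joined is None:
        joined = replicas
    joined = min(joined, replicas)  # never count more workers than requested
    return head_cpus + joined * worker_cpus


if __name__ == "__main__":
    # Head advertising 2 CPUs and no worker joined: Ray sees 2 CPUs,
    # however many CPUs the EKS nodes have.
    print(ray_cluster_cpus(2, 4, replicas=3, joined=0))
    # Head and 3 workers, each advertising 4 CPUs.
    print(ray_cluster_cpus(4, 4, replicas=3))
```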
s
I believe you can set them in `RayParams`.
p
Yeah, but in `RayParams` we can only specify the number of CPUs to be used for each trial (`cpus_per_actor`). Is there any config that needs to be changed to increase the CPUs of the Ray cluster as a whole? Even when I increased `cpus_per_actor`, the requested CPU count was still 2, and the warning still said the cluster has only 2 CPUs.
s
@Kevin Su, any idea how we can set the Ray cluster resources? As per the docs, it should be possible with `init()`, but in this case, since Flyte initializes the cluster, how can a user modify those values?
k
p
```python
@task(
    task_config=ray_config,
    requests=Resources(mem="5000Mi", cpu="5", ephemeral_storage="1000Mi"),
    limits=Resources(mem="7000Mi", cpu="9", ephemeral_storage="2000Mi"),
)
```
I have requested 5 CPUs, but when it executes, it shows only 2 requested CPUs, and it shows the same warning that we have only 2 CPUs in the cluster.
s
I’m wondering where it’s picking “you asked for 9.0 cpu” from. Is it from your `limits`?
p
I think it is based on the resources requested per trial. When I set `cpus_per_actor` and `num_actors` to 2 and 4, it showed 9 requested CPUs. When I decreased them to 2 and 1, it showed 3.
When `cpus_per_actor` and `num_actors` are both 1, the actual CPU request is 2, and the execution runs fine since the cluster has exactly 2 CPUs. When `num_actors` is increased, the trial requests more CPUs, so the execution never starts.
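The figures reported here (9, 3 and 2 CPUs) fit a trial that needs 1 CPU for the trainable plus `cpus_per_actor` for each actor. Assuming that relationship holds, inverting it gives the largest `num_actors` a trial can use before it no longer fits on the cluster. `max_actors` is a hypothetical helper; on the 2-CPU cluster it allows a single 1-CPU actor, which is the only configuration that ran.

```python
def max_actors(cluster_cpus: int, cpus_per_actor: int) -> int:
    """Largest num_actors whose trial still fits, assuming a trial needs
    1 CPU for the trainable plus cpus_per_actor for each actor."""
    if cpus_per_actor <= 0:
        raise ValueError("cpus_per_actor must be positive")
    spare = cluster_cpus - 1  # one CPU is assumed to go to the trainable itself
    return max(0, spare // cpus_per_actor)


if __name__ == "__main__":
    print(max_actors(2, 1))  # 1: one 1-CPU actor fits on 2 CPUs
    print(max_actors(2, 2))  # 0: even one 2-CPU actor does not fit
```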
s
Um, got it. We need to find a way to increase the cluster resources. Not sure why `requests` isn’t assigning the requested resources to the cluster.
p
Yeah. Kindly let us know if there is any way to do so.
s
@Kevin Su, do you have any ideas?
k
@Padma Priya M Could you describe the RayJob (`kubectl describe`) and check whether the resources are the same as what you specified in the `@task`? I guess the head node doesn’t use all the CPUs in the pod. In other words, the head pod could have 10 CPUs, but the head node process in the pod could be using only 2.
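This guess can be checked by putting two numbers side by side: the pod’s CPU from `kubectl describe` and the CPU count Ray reports in its warning. Kubernetes writes CPU quantities either as cores (`"9"`, `"0.5"`) or as millicores (`"500m"`); the hypothetical helpers below normalise them and flag the case where Ray advertises fewer CPUs than the pod was given. Only the plain and `m` forms are handled, an assumption that covers the values used in this thread.

```python
def cpu_cores(quantity: str) -> float:
    """Convert a Kubernetes CPU quantity like "9", "0.5" or "500m" to cores."""
    q = quantity.strip()
    if q.endswith("m"):  # millicores: 1000m == 1 core
        return int(q[:-1]) / 1000
    return float(q)


def ray_underuses_pod(pod_cpu: str, ray_cpus: float) -> bool:
    """True if Ray advertises fewer CPUs than the pod was allocated."""
    return ray_cpus < cpu_cores(pod_cpu)


if __name__ == "__main__":
    print(cpu_cores("9"), cpu_cores("500m"))   # 9.0 0.5
    print(ray_underuses_pod("10", 2))          # True
```

The last call mirrors the example of a 10-CPU head pod whose Ray process uses only 2 CPUs.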
p
I have attached the allocated memory shown when we describe the node.
```python
@task(task_config=ray_config, requests=Resources(mem="5000Mi", cpu="5"), limits=Resources(mem="7000Mi", cpu="9"))
```
These are the requested resources.
k
Sorry, could you describe the RayJob you are running?
p
Is there any command for this?
This is what is shown when we describe the kuberay-operator while it is running.
k
```shell
kubectl describe RayJobs <name> -n <namespace>
```