Hi, I am getting error while executing ray with op...
# ask-the-community
p
Hi, I am getting error while executing ray with optuna in flyte.
s
cc @Eduardo Apolinario (eapolinario) / @Kevin Su
e
can you add more details? How are you running this? Maybe share the code you're running? If it's easier to collect all this information, please open a ticket: https://github.com/flyteorg/flyte/issues/new/choose
k
I guess you run into this issue, the pod is OOM, but flytekit doesn’t show enough info about OOM. Could you increase memory, and run it again? if not, just create a ticket, we’ll take a look
p
Copy code
import ray
from ray import tune
from ray.tune.schedulers import AsyncHyperBandScheduler
from ray.tune.suggest.optuna import OptunaSearch

from flytekit import Resources, task, workflow
from flytekitplugins.ray import HeadNodeConfig, RayJobConfig, WorkerNodeConfig

from sklearn.linear_model import LogisticRegression
from sklearn import model_selection
from sklearn.model_selection import train_test_split
import numpy as np
import pandas as pd

train_df = pd.read_csv('train.csv')
test_df = pd.read_csv('test.csv')
train_df.drop(['PassengerId', 'Name', 'SibSp', 'Parch', 'Ticket', 'Cabin', 'Embarked'], axis='columns', inplace=True)
target = train_df.Survived
inputs = train_df.drop('Survived', axis='columns')

encoded_sex = pd.get_dummies(inputs.Sex)
inputs = pd.concat([inputs, encoded_sex], axis='columns')
inputs.drop('Sex', axis='columns', inplace=True)


inputs.columns[inputs.isna().any()]
inputs.Age = inputs.Age.fillna(inputs.Age.mean())

ray_config = RayJobConfig(
    head_node_config=HeadNodeConfig(ray_start_params={"log-color": "True"}),
    worker_node_config=[WorkerNodeConfig(group_name="ray-group", replicas=2)],
    runtime_env={"pip": ["numpy", "pandas"]},
)

@ray.remote
def objective(model_params: dict):
    X_train, X_test, y_train, y_test = train_test_split(inputs, target, test_size=0.469)

    classifier_obj = LogisticRegression(**model_params)
     score = model_selection.cross_val_score(classifier_obj, X_test, y_test, n_jobs=-1, cv=3)
    accuracy = score.mean()
    return accuracy

@task(task_config=ray_config, limits=Resources(mem="450Mi", cpu="1"))
def ray_task(n: int) -> int:
    model_params = {
        "C": tune.loguniform(1e-10, 1e10)
    }
    algo = OptunaSearch()
    scheduler = AsyncHyperBandScheduler()
    tune.run(
        objective,
        mode="max",
        search_alg=algo,
        scheduler=scheduler,
        num_samples=10,
        max_concurrent_trials=concurrent_trials,
        config=model_params,
    )
    

@workflow
def ray_optuna_wf(n: int) -> int:
    return ray_task(n=n)
this is the code i am trying to execute. and registering the workflow with an image consists of all these dependencies.
Copy code
pip install optuna
pip install "ray[tune]" optuna
pip install ray
pip install flytekitplugins-ray
e
@Padma Priya M, thanks for sharing the code you're trying. Can you add more details about the environment you're using? A few questions: 1. Is this on a sandbox? 2. Are you on a Mac M1/M2? 3. How are you building the image? 4. How are you scheduling the execution?
p
I am working on AWS EKS setup. we are using dockerfile and pushing the image to dockerhub and the image has all the requirements that i mentioned above. Executing the script by registering the workflow using the image and then launch execution from UI.
now i have simplified my script for demo purpose. now i am facing this issue.
Copy code
import typing

import ray
from ray import tune
from flytekit import Resources, task, workflow
from flytekitplugins.ray import HeadNodeConfig, RayJobConfig, WorkerNodeConfig


@ray.remote
def objective(config):
    return (config["x"] * config["x"])


ray_config = RayJobConfig(
    head_node_config=HeadNodeConfig(ray_start_params={"log-color": "True"}),
    worker_node_config=[WorkerNodeConfig(group_name="ray-group", replicas=2)],
    runtime_env={"pip": ["numpy", "pandas"]},
)


@task(task_config=ray_config, limits=Resources(mem="2000Mi", cpu="1"))
def ray_task(n: int) -> int:
    model_params = {
        "x": tune.randint(-10, 10)
    }

    tuner = tune.Tuner(
        objective,
        tune_config=tune.TuneConfig(
            num_samples=10,
            max_concurrent_trials=n,
        ),

        param_space=model_params,
    )
    results = tuner.fit()
    return results


@workflow
def ray_workflow(n: int) -> int:
    return ray_task(n=n)
e
@Padma Priya M, does this work locally? It doesn't seem like this is specific to flytekit as the error is coming from the call to
tuner.fit()
.
158 Views