future-notebook-79388
11/04/2022, 1:10 PMvictorious-park-53030
11/04/2022, 3:19 PMhigh-accountant-32689
11/04/2022, 6:42 PMglamorous-carpet-83516
11/04/2022, 11:22 PMfuture-notebook-79388
11/07/2022, 5:10 AMimport ray
from ray import tune
from ray.tune.schedulers import AsyncHyperBandScheduler
from ray.tune.suggest.optuna import OptunaSearch
from flytekit import Resources, task, workflow
from flytekitplugins.ray import HeadNodeConfig, RayJobConfig, WorkerNodeConfig
from sklearn.linear_model import LogisticRegression
from sklearn import model_selection
from sklearn.model_selection import train_test_split
import numpy as np
import pandas as pd
train_df = pd.read_csv('train.csv')
test_df = pd.read_csv('test.csv')
train_df.drop(['PassengerId', 'Name', 'SibSp', 'Parch', 'Ticket', 'Cabin', 'Embarked'], axis='columns', inplace=True)
target = train_df.Survived
inputs = train_df.drop('Survived', axis='columns')
encoded_sex = pd.get_dummies(inputs.Sex)
inputs = pd.concat([inputs, encoded_sex], axis='columns')
inputs.drop('Sex', axis='columns', inplace=True)
inputs.columns[inputs.isna().any()]
inputs.Age = inputs.Age.fillna(inputs.Age.mean())
ray_config = RayJobConfig(
head_node_config=HeadNodeConfig(ray_start_params={"log-color": "True"}),
worker_node_config=[WorkerNodeConfig(group_name="ray-group", replicas=2)],
runtime_env={"pip": ["numpy", "pandas"]},
)
@ray.remote
def objective(model_params: dict):
X_train, X_test, y_train, y_test = train_test_split(inputs, target, test_size=0.469)
classifier_obj = LogisticRegression(**model_params)
score = model_selection.cross_val_score(classifier_obj, X_test, y_test, n_jobs=-1, cv=3)
accuracy = score.mean()
return accuracy
@task(task_config=ray_config, limits=Resources(mem="450Mi", cpu="1"))
def ray_task(n: int) -> int:
model_params = {
"C": tune.loguniform(1e-10, 1e10)
}
algo = OptunaSearch()
scheduler = AsyncHyperBandScheduler()
tune.run(
objective,
mode="max",
search_alg=algo,
scheduler=scheduler,
num_samples=10,
max_concurrent_trials=concurrent_trials,
config=model_params,
)
@workflow
def ray_optuna_wf(n: int) -> int:
return ray_task(n=n)
this is the code i am trying to execute. and registering the workflow with an image consists of all these dependencies.
pip install optuna
pip install "ray[tune]" optuna
pip install ray
pip install flytekitplugins-ray
high-accountant-32689
11/07/2022, 4:44 PMfuture-notebook-79388
11/09/2022, 5:09 AMfuture-notebook-79388
11/09/2022, 5:12 AMimport typing
import ray
from ray import tune
from flytekit import Resources, task, workflow
from flytekitplugins.ray import HeadNodeConfig, RayJobConfig, WorkerNodeConfig
@ray.remote
def objective(config):
return (config["x"] * config["x"])
ray_config = RayJobConfig(
head_node_config=HeadNodeConfig(ray_start_params={"log-color": "True"}),
worker_node_config=[WorkerNodeConfig(group_name="ray-group", replicas=2)],
runtime_env={"pip": ["numpy", "pandas"]},
)
@task(task_config=ray_config, limits=Resources(mem="2000Mi", cpu="1"))
def ray_task(n: int) -> int:
model_params = {
"x": tune.randint(-10, 10)
}
tuner = tune.Tuner(
objective,
tune_config=tune.TuneConfig(
num_samples=10,
max_concurrent_trials=n,
),
param_space=model_params,
)
results = tuner.fit()
return results
@workflow
def ray_workflow(n: int) -> int:
return ray_task(n=n)
high-accountant-32689
11/10/2022, 6:54 PMtuner.fit()
.