Anirudh Sridhar
09/11/2023, 10:56 AM
https://flyte-org.slack.com/files/U05RR32SN00/F05RNV5KE4D/screenshot_2023-09-11_at_2.11.01_pm.png
Defaulted container "ray-worker" out of: ray-worker, init-myservice (init)
Samhita Alla
Anirudh Sridhar
09/11/2023, 12:41 PM
import pandas as pd
from sklearn.datasets import load_wine
from sklearn.linear_model import LogisticRegression
import flytekit.extras.sklearn
from flytekit import task, workflow, dynamic
@task
def get_data() -> pd.DataFrame:
    """Get the wine dataset.

    Returns:
        The ``frame`` attribute of ``load_wine(as_frame=True)``, i.e. the
        dataset as a single DataFrame (it includes the ``target`` column
        that the downstream tasks read).
    """
    return load_wine(as_frame=True).frame
@task
def process_data(data: pd.DataFrame) -> pd.DataFrame:
    """Simplify the task from a 3-class to a binary classification problem.

    Args:
        data: DataFrame containing a ``target`` column.

    Returns:
        A DataFrame produced by ``assign`` in which ``target`` keeps the
        value 0 where it was 0 and is replaced by 1 everywhere else.
    """
    # Series.where keeps entries where the condition holds and substitutes
    # the second argument (1) elsewhere, collapsing all non-zero classes.
    return data.assign(target=lambda x: x["target"].where(x["target"] == 0, 1))
@task
def train_model(data: pd.DataFrame, hyperparameters: dict) -> LogisticRegression:
    """Train a model on the wine dataset.

    Args:
        data: DataFrame with feature columns plus a ``target`` column.
        hyperparameters: Keyword arguments forwarded to the
            ``LogisticRegression`` constructor alongside ``max_iter=3000``.

    Returns:
        The ``LogisticRegression`` estimator after fitting on the features
        (every column except ``target``) against ``target``.
    """
    features = data.drop("target", axis="columns")
    target = data["target"]
    return LogisticRegression(max_iter=3000, **hyperparameters).fit(features, target)
@workflow
def training_workflow(hyperparameters: dict) -> LogisticRegression:
    """Put all of the steps together into a single workflow.

    Chains ``get_data`` -> ``process_data`` -> ``train_model``.

    Args:
        hyperparameters: Passed through unchanged to ``train_model``.

    Returns:
        The model returned by ``train_model``.
    """
    data = get_data()
    processed_data = process_data(data=data)
    return train_model(
        data=processed_data,
        hyperparameters=hyperparameters,
    )
Not working
import typing
from flytekit import ImageSpec, Resources, task, workflow
# Image specification used as the container image for the Ray task below;
# it installs the flytekitplugins-ray package.
custom_image = ImageSpec(
    name="ray-flyte-plugin",
    registry="anirudh1905",
    packages=["flytekitplugins-ray"],
)
# ray and the Ray plugin are imported only when is_container() reports true.
# NOTE(review): presumably they are installed only inside the image built from
# custom_image; the paste lost its indentation, so it is unknown whether the
# definitions that follow were originally nested under this `if` as well.
if custom_image.is_container():
    import ray
    from flytekitplugins.ray import HeadNodeConfig, RayJobConfig, WorkerNodeConfig
@ray.remote
def f1(x):
    """Return ``x`` squared (registered as a Ray remote function)."""
    return x * x
@ray.remote
def f2(x):
    """Return ``x`` modulo 2 (registered as a Ray remote function)."""
    return x % 2
# Ray job configuration attached to ray_task through its task_config argument:
# a head node started with log-color enabled, one worker group named
# "ray-group" with 2 replicas, and a runtime environment that pip-installs
# numpy and pandas.
ray_config = RayJobConfig(
    head_node_config=HeadNodeConfig(ray_start_params={"log-color": "True"}),
    worker_node_config=[WorkerNodeConfig(group_name="ray-group", replicas=2)],
    runtime_env={"pip": ["numpy", "pandas"]},  # or runtime_env="./requirements.txt"
)
@task(
    cache=True,
    cache_version="0.2",
    task_config=ray_config,
    requests=Resources(mem="2Gi", cpu="1"),
    container_image=custom_image,
)
def ray_task(n: int) -> int:
    """Sum ``f2(f1(i))`` over ``i`` in ``range(n)`` using Ray remote calls.

    Args:
        n: Number of integers (0 .. n-1) to process.

    Returns:
        The sum of ``(i * i) % 2`` for every ``i`` in ``range(n)``.
    """
    # Each f1 result is handed straight to f2 as a remote reference; the
    # values are only materialized by the single ray.get call below.
    futures = [f2.remote(f1.remote(i)) for i in range(n)]
    return sum(ray.get(futures))
@workflow
def ray_workflow(n: int) -> int:
    """Run ``ray_task`` with the given ``n`` and return its result."""
    return ray_task(n=n)
Samhita Alla
Anirudh Sridhar
09/11/2023, 12:55 PMSamhita Alla
Anirudh Sridhar
09/11/2023, 12:56 PM
Warning FailedScheduling 97s default-scheduler 0/1 nodes are available: 1 Insufficient cpu. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod.
Samhita Alla
Ketan (kumare3)
Samhita Alla
Anirudh Sridhar
09/11/2023, 1:28 PM
Samhita Alla
`n` as well. Set it to 1.
Anirudh Sridhar
09/11/2023, 1:29 PM
Samhita Alla
`n=1` is working?
Anirudh Sridhar
09/11/2023, 1:31 PMSamhita Alla
Anirudh Sridhar
09/11/2023, 1:39 PMSamhita Alla
Kevin Su
09/11/2023, 2:40 PMAnirudh Sridhar
09/11/2023, 3:19 PM