karthikraj
10/21/2022, 2:59 AMraise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.INTERNAL
details = "failed to create workflow in propeller json: error calling MarshalJSON for type *v1alpha1.Inputs: Marshal called with nil"
debug_error_string = "{"created":"@1666318439.715331527","description":"Error received from peer ipv4:35.172.9.181:443","file":"src/core/lib/surface/call.cc","file_line":1074,"grpc_message":"failed to create workflow in propeller json: error calling MarshalJSON for type *v1alpha1.Inputs: Marshal called with nil","grpc_status":13}"
Dan Rammer (hamersaw)
10/21/2022, 3:07 AMkarthikraj
10/21/2022, 3:35 AMimport ray
import os
os.system("pip3 install flytekitplugins-ray")
from typing import *
import typing
from flytekit import task
from flytekitplugins.ray import HeadNodeConfig, RayJobConfig, WorkerNodeConfig
@ray.remote
def f(x):
return x * x
@task(task_config=RayJobConfig(head_node_config=HeadNodeConfig(ray_start_params={"block": "true"}),worker_node_config=[WorkerNodeConfig(group_name="ray-group", replicas=5, min_replicas=2, max_replicas=10)],runtime_env={"pip": ["numpy", "pandas"]}, ))
def ray_task() -> typing.List[int]:
futures = [f.remote(i) for i in range(10)]
return ray.get(futures)
Command:
pyflyte run --remote kuberay_test.py ray_task
Dan Rammer (hamersaw)
10/21/2022, 3:42 AMray_task
task? I do not believe pyflyte run
supports executing tasks. You should be able to do something as simple as:
@workflow
def ray_wf():
ray_task()
and then attempt to execute the workflow with pyflyte run --remote kuberay_test.py ray_wf
karthikraj
10/21/2022, 4:58 AMSamhita Alla
pyflyte run
.karthikraj
10/21/2022, 5:16 AMSamhita Alla
karthikraj
10/21/2022, 5:41 AMSamhita Alla
import ray
import typing
from flytekit import task
from flytekitplugins.ray import HeadNodeConfig, RayJobConfig, WorkerNodeConfig
@ray.remote
def f(x):
return x * x
@task(
task_config=RayJobConfig(
head_node_config=HeadNodeConfig(ray_start_params={"block": "True"}),
worker_node_config=[
WorkerNodeConfig(
group_name="ray-group", replicas=5, min_replicas=2, max_replicas=10
)
],
runtime_env={"pip": ["numpy", "pandas"]},
)
)
def ray_task(n: int) -> typing.List[int]:
futures = [f.remote(i) for i in range(n)]
return ray.get(futures)
karthikraj
10/22/2022, 3:04 AM