delightful-lion-57360
10/21/2022, 2:59 AMraise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.INTERNAL
details = "failed to create workflow in propeller json: error calling MarshalJSON for type *v1alpha1.Inputs: Marshal called with nil"
debug_error_string = "{"created":"@1666318439.715331527","description":"Error received from peer ipv4:35.172.9.181:443","file":"src/core/lib/surface/call.cc","file_line":1074,"grpc_message":"failed to create workflow in propeller json: error calling MarshalJSON for type *v1alpha1.Inputs: Marshal called with nil","grpc_status":13}"
hallowed-mouse-14616
10/21/2022, 3:07 AMdelightful-lion-57360
10/21/2022, 3:35 AMimport ray
import os
os.system("pip3 install flytekitplugins-ray")
from typing import *
import typing
from flytekit import task
from flytekitplugins.ray import HeadNodeConfig, RayJobConfig, WorkerNodeConfig
@ray.remote
def f(x):
return x * x
@task(task_config=RayJobConfig(head_node_config=HeadNodeConfig(ray_start_params={"block": "true"}),worker_node_config=[WorkerNodeConfig(group_name="ray-group", replicas=5, min_replicas=2, max_replicas=10)],runtime_env={"pip": ["numpy", "pandas"]}, ))
def ray_task() -> typing.List[int]:
futures = [f.remote(i) for i in range(10)]
return ray.get(futures)
Command:
pyflyte run --remote kuberay_test.py ray_task
hallowed-mouse-14616
10/21/2022, 3:42 AMray_task
task? I do not believe pyflyte run
supports executing tasks. You should be able to do something as simple as:
@workflow
def ray_wf():
ray_task()
and then attempt to execute the workflow with pyflyte run --remote kuberay_test.py ray_wf
delightful-lion-57360
10/21/2022, 4:58 AMtall-lock-23197
tall-lock-23197
pyflyte run
.delightful-lion-57360
10/21/2022, 5:16 AMtall-lock-23197
delightful-lion-57360
10/21/2022, 5:41 AMtall-lock-23197
import ray
import typing
from flytekit import task
from flytekitplugins.ray import HeadNodeConfig, RayJobConfig, WorkerNodeConfig
@ray.remote
def f(x):
return x * x
@task(
task_config=RayJobConfig(
head_node_config=HeadNodeConfig(ray_start_params={"block": "True"}),
worker_node_config=[
WorkerNodeConfig(
group_name="ray-group", replicas=5, min_replicas=2, max_replicas=10
)
],
runtime_env={"pip": ["numpy", "pandas"]},
)
)
def ray_task(n: int) -> typing.List[int]:
futures = [f.remote(i) for i in range(n)]
return ray.get(futures)
tall-lock-23197
high-accountant-32689
10/21/2022, 7:12 AMtyping.List
as the return type for the ray task?delightful-lion-57360
10/22/2022, 3:04 AM