# ask-the-community

Tarmily Wen

12/12/2022, 11:32 PM
I am currently trying to combine dynamic overrides with Ray to determine the Ray cluster size at run time. But when the task under the dynamic runs, it is treated as a plain Python task and never creates a Ray cluster.
```python
@dynamic
def request_train_resources_task(
    compressed_dataset: FlyteFile,
    train_parameters: TrainParameters,
) -> TrainOutputs:
    task_config = RayJobConfig(
        head_node_config=HeadNodeConfig(
            ray_start_params={
                "block": "true",
            }
        ),
        worker_node_config=[
            WorkerNodeConfig(
                group_name="ray-group",
                replicas=train_parameters.num_nodes - 1,
            )
        ],
    )
    resources_request = Resources(
        cpu=str(train_parameters.cpus_per_gpu * train_parameters.num_gpus),
        mem=f"{train_parameters.memory_per_gpu * train_parameters.num_gpus}G",
        gpu=str(train_parameters.num_gpus),
        ephemeral_storage=f"{train_parameters.ephemeral_storage}G",
    )
    print(f"task_config: {task_config}")
    print(f"resources_request: {resources_request}")
    return pl_mnist_train_task(
        compressed_dataset=compressed_dataset,
        train_parameters=train_parameters,
    ).with_overrides(
        task_config=task_config,
        requests=resources_request,
    )
```

Yee

12/13/2022, 1:56 AM
we’ll follow up tomorrow a bit more, but the `task_config` isn’t actually read by the `with_overrides` call.

Tarmily Wen

12/13/2022, 2:04 AM
I noticed something interesting and I'm guessing it is a bug: when a ray task requests resources, it overwrites the tolerations set in values_gcp.yaml
I changed the task config for the underlying task to ray and noticed it never autoscaled my GPU node pool, but the python one did

Yee

12/13/2022, 2:06 AM
we’ll get back to you tomorrow… it’s a bit tricky. the issue is that typically, the normal resources like gpu/cpu are part of the task node overrides

Tarmily Wen

12/13/2022, 2:06 AM
This is the code change I made to test it:
```python
@dynamic(requests=Resources(ephemeral_storage="50G"))
def request_train_resources_task(
    compressed_dataset: FlyteFile,
    train_parameters: TrainParameters,
) -> TrainOutputs:
    if train_parameters.use_ray:
        task_config = RayJobConfig(
            head_node_config=HeadNodeConfig(
                ray_start_params={
                    "block": "true",
                }
            ),
            worker_node_config=[
                WorkerNodeConfig(
                    group_name="ray-group",
                    replicas=train_parameters.num_nodes - 1,
                )
            ],
        )
    else:
        task_config = None
    resources_request = Resources(
        cpu=str(train_parameters.cpus_per_gpu * train_parameters.num_gpus),
        mem=f"{train_parameters.memory_per_gpu * train_parameters.num_gpus}G",
        gpu=str(train_parameters.num_gpus),
        ephemeral_storage=f"{train_parameters.ephemeral_storage}G",
    )
    print(f"task_config: {task_config}")
    print(f"resources_request: {resources_request}")
    if train_parameters.use_ray:
        return pl_mnist_train_ray_task(
            compressed_dataset=compressed_dataset,
            train_parameters=train_parameters,
        ).with_overrides(
            task_config=task_config,
            requests=resources_request,
        )
    else:
        return pl_mnist_train_python_task(
            compressed_dataset=compressed_dataset,
            train_parameters=train_parameters,
        ).with_overrides(
            task_config=task_config,
            requests=resources_request,
        )
```

Yee

12/13/2022, 2:06 AM
when changing the ray cluster resources, you’re now changing part of the task definition. this is okay, but let us get back to you on the best way to do it

Tarmily Wen

12/13/2022, 2:06 AM
Alright no rush

Yee

12/13/2022, 2:08 AM
yes so this is not possible, not directly at least:
"changed the task config for the underlying task to ray"
the config for the task determines its type. it’s best to leave that the same after you create it.
but it is possible to change which task is called in a dynamic task

Tarmily Wen

12/13/2022, 2:10 AM
What I mean is that I made two tasks: one whose default task type is ray, with a basically dummy config that will be replaced later, and another whose default task type is python. I did this just to test how dynamically calling the task works.
```python
@task(
    task_config=RayJobConfig(
        head_node_config=HeadNodeConfig(
            ray_start_params={
                "block": "true",
            }
        ),
        worker_node_config=[
            WorkerNodeConfig(
                group_name="ray-group",
                replicas=0,
            )
        ],
    ),
    secret_requests=[Secret(group="wandb-secrets", key="API_KEY")],
)
def pl_mnist_train_ray_task(...

@task(
    secret_requests=[Secret(group="wandb-secrets", key="API_KEY")],
)
def pl_mnist_train_python_task(
```
And the ray task that has the dummy config uses its own tolerations

Yee

12/13/2022, 2:13 AM
tolerations are a separate concern right?

Tarmily Wen

12/13/2022, 2:13 AM
yes

Yee

12/13/2022, 2:13 AM
you’re saying the tolerations
```
Tolerations:     gpu-type=t4:NoSchedule
                 node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
                 node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
                 num-gpus=1:NoSchedule
                 nvidia.com/gpu:NoSchedule op=Exists
```
are not correct?

Tarmily Wen

12/13/2022, 2:14 AM
That is the correct toleration
but the ray task will exclude my own custom-defined keys: num-gpus and gpu-type

Yee

12/13/2022, 2:15 AM
but they’ve been set as resource tolerations?

Tarmily Wen

12/13/2022, 2:16 AM
correct

Yee

12/13/2022, 2:19 AM
kevin will take a look, maybe the plugin is not calling that somehow.

Tarmily Wen

12/13/2022, 8:09 PM
Another thing I found is that when running a ray task, they cannot find the kube secrets, but the python task can

Kevin Su

12/13/2022, 9:14 PM
I found that the ray plugin in propeller doesn’t update the tolerations and secrets. I will update it and create a PR.

Tarmily Wen

12/13/2022, 9:26 PM
thanks, so I think the only issue now is the dynamic resources. I tried having a dynamic number of worker replicas, and only the dummy-config number of workers was created, which is 0 in my case; when I changed the placeholder value to 1, it created a worker. So it needs to be able to create the correct number of workers dynamically.
Hi, is there any progress on this?

Kevin Su

12/16/2022, 8:12 PM
@Tarmily Wen I’m working on this today, will get back to you
@Tarmily Wen I just created a pr, mind taking a look when you get a chance

Tarmily Wen

12/21/2022, 1:27 AM
sorry I was away. Where is the PR?

Tarmily Wen

12/21/2022, 2:22 AM
The pr looks good to me, is there any way for me to test the plugin before merging?

Kevin Su

12/21/2022, 2:35 AM
you could update the plugin version in the flytepropeller, build an image with `make docker_build`, and then update the propeller image in your cluster.
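For reference, that build-and-deploy loop might look something like the following. This is only a sketch: the registry, image tag, repo layout, and the `go mod edit -replace` step are illustrative assumptions, not commands from this thread; `make docker_build` is the target Kevin mentions.

```shell
# Point propeller at a local plugin checkout (go.mod replace directive is an
# assumption about the repo layout), then build a custom propeller image.
cd flytepropeller
go mod edit -replace github.com/flyteorg/flyteplugins=../flyteplugins
make docker_build

# Push the image somewhere the cluster can pull it (registry/tag are placeholders).
docker tag flytepropeller:latest <your-registry>/flytepropeller:test
docker push <your-registry>/flytepropeller:test

# Swap the image into the running deployment (assumes the standard flyte namespace).
kubectl -n flyte set image deployment/flytepropeller \
    flytepropeller=<your-registry>/flytepropeller:test
```

Rolling the deployment this way avoids re-installing the whole Helm chart just to test one component.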

Tarmily Wen

12/21/2022, 3:41 PM
I think it is good to merge in
@Yee @Kevin Su Have you been able to identify a way to dynamically define the ray cluster size?

Yee

12/21/2022, 4:39 PM
sorry we had a couple tickets come up on the databricks integration we’re working on.
i’ll take a look at this this morning

Kevin Su

12/21/2022, 7:56 PM
@Tarmily Wen have you tested it? If not, I’ve built a propeller image you can test on your side:
pingsutw/flytepropeller:34c4e43574a7fded751c362733508b9f11b8637c

Yee

12/21/2022, 8:52 PM
@Tarmily Wen how widespread will this be?
there’s the quick way of doing this and there’s the correct way to do this. we’re still trying to work out the latter (but one of kevin’s suggestions that could very well work would be to make the dynamic task itself a task resolver, so that it can load tasks)
for the quick way, you should be able to just override the `_task_config` object: set `pl_mnist_train_python_task._task_config` to the config you want.
it should get picked up in the compilation process.
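The "quick way" pattern can be illustrated with a minimal self-contained sketch. Note these dataclasses are simplified stand-ins, not the real flytekit / flytekitplugins-ray classes; the point is only the shape of the trick: mutate the private `_task_config` before the task call inside the dynamic, so run-time compilation sees the new value.

```python
from dataclasses import dataclass
from typing import List

# Simplified stand-ins for the plugin's config classes (illustration only).
@dataclass
class WorkerNodeConfig:
    group_name: str
    replicas: int

@dataclass
class RayJobConfig:
    worker_node_config: List[WorkerNodeConfig]

@dataclass
class RayTask:
    _task_config: RayJobConfig

# Registered with a placeholder replica count, as in the snippets above.
pl_mnist_train_ray_task = RayTask(
    RayJobConfig([WorkerNodeConfig(group_name="ray-group", replicas=0)])
)

def request_train_resources_task(num_nodes: int) -> RayJobConfig:
    # Mutate the private attribute before "calling" the task, so the
    # run-time compilation inside the @dynamic picks up the real count.
    pl_mnist_train_ray_task._task_config.worker_node_config[0].replicas = num_nodes - 1
    return pl_mnist_train_ray_task._task_config

cfg = request_train_resources_task(num_nodes=3)
print(cfg.worker_node_config[0].replicas)  # → 2
```

Because the mutation happens on the module-level task object, it only takes effect for compilations that run after the assignment, which is exactly the dynamic-task case.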

Tarmily Wen

12/21/2022, 8:55 PM
so the with_overrides(task_config=...) call doesn't do the same thing?
how would it get picked up in the compilation if the input determines the cluster size?

Yee

12/21/2022, 9:12 PM
it should
because in dynamic, the compilation happens at run-time
and the task that’s run is not actually the task that’s registered with admin, it’s the one included in the dynamic job spec
(side note for the acutely aware who noticed that the spec does not include launch plans, this trick of modifying the object in the dynamic function doesn’t work for launch plans, as those are fetched post-compilation)
also this will affect local execution, though local execution shouldn’t actually reference the values you’re changing
does this make sense?

Tarmily Wen

12/21/2022, 9:17 PM
So these are my tasks
```python
@task(
    task_config=RayJobConfig(
        head_node_config=HeadNodeConfig(
            ray_start_params={
                "block": "true",
            }
        ),
        worker_node_config=[
            WorkerNodeConfig(
                group_name="ray-group",
                replicas=0,
            )
        ],
    ),
    secret_requests=[Secret(group="wandb-secrets", key="API_KEY")],
    requests=Resources(cpu="22", mem="80G", gpu="1", ephemeral_storage="700G"),
)
def pl_mnist_train_ray_task(
...

@dynamic(requests=Resources(ephemeral_storage="50G"))
def request_train_resources_task(
    compressed_dataset: FlyteFile,
    train_parameters: TrainParameters,
) -> TrainOutputs:
    if train_parameters.use_ray:
        task_config = RayJobConfig(
            head_node_config=HeadNodeConfig(
                ray_start_params={
                    "block": "true",
                }
            ),
            worker_node_config=[
                WorkerNodeConfig(
                    group_name="ray-group",
                    replicas=train_parameters.num_nodes - 1,
                )
            ],
        )
    else:
        task_config = None
    resources_request = Resources(
        cpu=str(train_parameters.cpus_per_gpu * train_parameters.num_gpus),
        mem=f"{train_parameters.memory_per_gpu * train_parameters.num_gpus}G",
        gpu=str(train_parameters.num_gpus),
        ephemeral_storage=f"{train_parameters.ephemeral_storage}G",
    )
    print(f"task_config: {task_config}")
    print(f"resources_request: {resources_request}")
    if train_parameters.use_ray:
        return pl_mnist_train_ray_task(
            compressed_dataset=compressed_dataset,
            train_parameters=train_parameters,
        ).with_overrides(
            task_config=task_config,
            requests=resources_request,
        )
```
I just give the task a ray config so that it will show up as a ray task, with a dummy value of 0 worker replicas. But when it is actually executed from the dynamic, it still has 0 workers even when the override task_config specifies 1 worker. I have tested it the other way around as well, where the dummy value is replicas=1 and the dynamic sets it to 0: 1 worker is created but never used.

Kevin Su

12/21/2022, 9:24 PM
@Tarmily Wen one option is to override `task_config` directly, like:
```python
@dynamic
def dynamic_ray(n: int) -> typing.List[int]:
    if n > 2:
        ray_task._task_config.worker_node_config[0].replicas = 6
    else:
        ray_task._task_config.worker_node_config[0].replicas = 8
    return ray_task(n=5)
```
`with_overrides` can’t change the config attribute for now, so just directly change the task variable. it works, I’ve tested it

Tarmily Wen

12/21/2022, 10:47 PM
Okay, that workaround works for me; I just tested it. When do you think this will be fixed?

Kevin Su

01/12/2023, 10:31 PM
I just created a pr to support it. https://github.com/flyteorg/flytekit/pull/1410

Tarmily Wen

01/12/2023, 11:17 PM
Thank you!