# ask-the-community
t
I am currently trying to combine dynamic overrides with Ray to dynamically determine the Ray cluster size when running a task. But when the task under the dynamic runs, it is treated as a python task and never creates a Ray cluster.
Copy code
@dynamic
def request_train_resources_task(
    compressed_dataset: FlyteFile,
    train_parameters: TrainParameters,
) -> TrainOutputs:
    task_config = RayJobConfig(
        head_node_config=HeadNodeConfig(
            ray_start_params={
                "block": "true",
            }
        ),
        worker_node_config=[
            WorkerNodeConfig(
                group_name="ray-group",
                replicas=train_parameters.num_nodes - 1,
            )
        ],
    )
    resources_request = Resources(
        cpu=str(train_parameters.cpus_per_gpu * train_parameters.num_gpus),
        mem=f"{train_parameters.memory_per_gpu * train_parameters.num_gpus}G",
        gpu=str(train_parameters.num_gpus),
        ephemeral_storage=f"{train_parameters.ephemeral_storage}G",
    )
    print(f"task_config: {task_config}")
    print(f"resources_request: {resources_request}")
    return pl_mnist_train_task(
        compressed_dataset=compressed_dataset,
        train_parameters=train_parameters,
    ).with_overrides(
        task_config=task_config,
        requests=resources_request,
    )
y
we’ll follow up tomorrow a bit more, but the task_config isn’t actually read by the with_overrides call.
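For illustration, here is a minimal sketch (with hypothetical train_task and launch names) of the overrides that with_overrides does pick up inside a dynamic, namely node-level resource requests and limits, as opposed to the task_config kwarg used above:
Copy code
from flytekit import Resources, dynamic, task


@task
def train_task(n: int) -> int:
    return n


@dynamic
def launch(n: int) -> int:
    # Node-level resource overrides such as requests/limits are honored here,
    # but there is no node-level override that reaches task_config, which is
    # why the Ray cluster settings never change.
    return train_task(n=n).with_overrides(
        requests=Resources(cpu="2", mem="4Gi"),
        limits=Resources(cpu="2", mem="4Gi"),
    )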
t
I noticed something interesting and I'm guessing it is a bug: when a ray task requests resources, it overwrites the tolerations set in values_gcp.yaml.
I changed the task config for the underlying task to ray and noticed it never autoscaled my gpu node pool, but the python one did.
y
we’ll get back to you tomorrow… it’s a bit tricky. the issue is that typically, the normal resources like gpu/cpu are part of the task node overrides
t
This is the code change i made to test it
Copy code
@dynamic(requests=Resources(ephemeral_storage="50G"))
def request_train_resources_task(
    compressed_dataset: FlyteFile,
    train_parameters: TrainParameters,
) -> TrainOutputs:
    if train_parameters.use_ray:
        task_config = RayJobConfig(
            head_node_config=HeadNodeConfig(
                ray_start_params={
                    "block": "true",
                }
            ),
            worker_node_config=[
                WorkerNodeConfig(
                    group_name="ray-group",
                    replicas=train_parameters.num_nodes - 1,
                )
            ],
        )
    else:
        task_config = None
    resources_request = Resources(
        cpu=str(train_parameters.cpus_per_gpu * train_parameters.num_gpus),
        mem=f"{train_parameters.memory_per_gpu * train_parameters.num_gpus}G",
        gpu=str(train_parameters.num_gpus),
        ephemeral_storage=f"{train_parameters.ephemeral_storage}G",
    )
    print(f"task_config: {task_config}")
    print(f"resources_request: {resources_request}")
    if train_parameters.use_ray:
        return pl_mnist_train_ray_task(
            compressed_dataset=compressed_dataset,
            train_parameters=train_parameters,
        ).with_overrides(
            task_config=task_config,
            requests=resources_request,
        )
    else:
        return pl_mnist_train_python_task(
            compressed_dataset=compressed_dataset,
            train_parameters=train_parameters,
        ).with_overrides(
            task_config=task_config,
            requests=resources_request,
        )
y
when changing the ray cluster resources, you’re now changing part of the task definition. this is okay, but let us get back to you on the best way to do it
t
Alright no rush
y
yes so this is not possible, not directly at least -
"changed the task config for the under lying task to ray"
the config for the task determines its type. it’s best to leave that the same after you create it.
but it is possible to change which task is called in a dynamic task
t
So what I mean by that is I made 2 tasks: one whose default task type is ray, with basically a dummy config that will be replaced later, and another whose default task type is python. I did this just to test how dynamically calling the task will work.
Copy code
@task(
    task_config=RayJobConfig(
        head_node_config=HeadNodeConfig(
            ray_start_params={
                "block": "true",
            }
        ),
        worker_node_config=[
            WorkerNodeConfig(
                group_name="ray-group",
                replicas=0,
            )
        ],
    ),
    secret_requests=[Secret(group="wandb-secrets", key="API_KEY")],
)
def pl_mnist_train_ray_task(...

@task(
    secret_requests=[Secret(group="wandb-secrets", key="API_KEY")],
)
def pl_mnist_train_python_task(
And the ray task that has the dummy config uses its own tolerations
y
tolerations are a separate concern right?
t
yes
y
you’re saying the tolerations
Copy code
Tolerations:     gpu-type=t4:NoSchedule
                 node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
                 node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
                 num-gpus=1:NoSchedule
                 nvidia.com/gpu:NoSchedule op=Exists
are not correct?
t
That is the correct toleration
but the ray task will exclude my own custom-defined keys: num-gpus and gpu-type
y
but they’ve been set as resource tolerations?
t
correct
y
Kevin will take a look, maybe the plugin is not applying that somehow.
t
Another thing I found is that when running a ray task, it cannot find the kube secrets, but the python task can
k
I found that the ray plugin in propeller doesn't update the tolerations and secrets. I will update it and create a PR.
t
thanks, so I think the only issue now is the dynamic resources. I tried having a dynamic number of worker replicas, but only the dummy config's number of workers is created, which is 0 in my case; when I changed the placeholder value to 1, it created a worker. So it would need to be able to dynamically create the correct number of workers.
Hi, is there any progress on this?
k
@Tarmily Wen I’m working on this today, will get back to you
@Tarmily Wen I just created a PR, mind taking a look when you get a chance?
t
sorry I was away. Where is the PR?
t
The PR looks good to me, is there any way for me to test the plugin before merging?
k
you could update the plugin version in flytepropeller and then build an image with make docker_build. after that, update the propeller image in your cluster.
t
I think it is good to merge in
@Yee @Kevin Su Have you been able to identify a way to dynamically define the ray cluster size?
y
sorry we had a couple tickets come up on the databricks integration we’re working on.
i’ll take a look at this this morning
k
@Tarmily Wen have you tested it? If not, I've built a propeller image you can test on your side.
pingsutw/flytepropeller:34c4e43574a7fded751c362733508b9f11b8637c
y
@Tarmily Wen how widespread will this be?
there’s the quick way of doing this and there’s the correct way to do this. we’re still trying to work out the latter (but one of kevin’s suggestions that could very well work would be to make the dynamic task itself a task resolver, so that it can load tasks)
for the quick way, you should be able to just override the _task_config object
set pl_mnist_train_python_task._task_config to the config you want. it should get picked up in the compilation process.
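A minimal sketch of that quick way, assuming the flytekitplugins-ray imports plus the pl_mnist_train_ray_task and train_parameters defined elsewhere in this thread:
Copy code
from flytekitplugins.ray import HeadNodeConfig, RayJobConfig, WorkerNodeConfig

# Inside the dynamic body, before calling the task: replace the private
# _task_config object so that run-time compilation sees the real cluster size.
pl_mnist_train_ray_task._task_config = RayJobConfig(
    head_node_config=HeadNodeConfig(ray_start_params={"block": "true"}),
    worker_node_config=[
        WorkerNodeConfig(
            group_name="ray-group",
            replicas=train_parameters.num_nodes - 1,
        )
    ],
)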
t
so the with_overrides(task_config=...) call doesn't do the same thing?
how would it get picked up in the compilation if the input determines the cluster size?
y
it should
because in dynamic, the compilation happens at run-time
and the task that’s run is not actually the task that’s registered with admin, it’s the one included in the dynamic job spec
(side note for the acutely aware who noticed that the spec does not include launch plans: this trick of modifying the object in the dynamic function doesn't work for launch plans, as those are fetched post-compilation)
also this will affect local execution, though local execution shouldn’t actually reference the values you’re changing
does this make sense?
t
So these are my tasks
Copy code
@task(
    task_config=RayJobConfig(
        head_node_config=HeadNodeConfig(
            ray_start_params={
                "block": "true",
            }
        ),
        worker_node_config=[
            WorkerNodeConfig(
                group_name="ray-group",
                replicas=0,
            )
        ],
    ),
    secret_requests=[Secret(group="wandb-secrets", key="API_KEY")],
    requests=Resources(cpu="22", mem="80G", gpu="1", ephemeral_storage="700G"),
)
def pl_mnist_train_ray_task(
...

@dynamic(requests=Resources(ephemeral_storage="50G"))
def request_train_resources_task(
    compressed_dataset: FlyteFile,
    train_parameters: TrainParameters,
) -> TrainOutputs:
    if train_parameters.use_ray:
        task_config = RayJobConfig(
            head_node_config=HeadNodeConfig(
                ray_start_params={
                    "block": "true",
                }
            ),
            worker_node_config=[
                WorkerNodeConfig(
                    group_name="ray-group",
                    replicas=train_parameters.num_nodes - 1,
                )
            ],
        )
    else:
        task_config = None
    resources_request = Resources(
        cpu=str(train_parameters.cpus_per_gpu * train_parameters.num_gpus),
        mem=f"{train_parameters.memory_per_gpu * train_parameters.num_gpus}G",
        gpu=str(train_parameters.num_gpus),
        ephemeral_storage=f"{train_parameters.ephemeral_storage}G",
    )
    print(f"task_config: {task_config}")
    print(f"resources_request: {resources_request}")
    if train_parameters.use_ray:
        return pl_mnist_train_ray_task(
            compressed_dataset=compressed_dataset,
            train_parameters=train_parameters,
        ).with_overrides(
            task_config=task_config,
            requests=resources_request,
        )
I just give the task a ray config so that it will show up as a ray task, with a dummy value of 0 worker replicas. But when it is actually executed from the dynamic, it still has 0 workers even when the override task_config specifies 1 worker. I have tested this the other way around as well, where the dummy value is replicas=1 and the dynamic sets it to 0: 1 worker is created but never used.
k
@Tarmily Wen one option is to override the task_config directly, like
Copy code
@dynamic
def dynamic_ray(n: int) -> typing.List[int]:
    if n > 2:
        ray_task._task_config.worker_node_config[0].replicas = 6
    else:
        ray_task._task_config.worker_node_config[0].replicas = 8
    return ray_task(n=5)
with_overrides can't change the config attribute for now, so just directly change the task variable. it works, I've tested it
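Applied to the tasks in this thread, the workaround would look roughly like the sketch below (assuming the pl_mnist_train_ray_task, TrainParameters, and TrainOutputs definitions from earlier):
Copy code
from flytekit import Resources, dynamic
from flytekit.types.file import FlyteFile


@dynamic(requests=Resources(ephemeral_storage="50G"))
def request_train_resources_task(
    compressed_dataset: FlyteFile,
    train_parameters: TrainParameters,
) -> TrainOutputs:
    # Mutate the task object's config in place; dynamic tasks are compiled at
    # run time, so this value ends up in the dynamic job spec that propeller runs.
    pl_mnist_train_ray_task._task_config.worker_node_config[0].replicas = (
        train_parameters.num_nodes - 1
    )
    return pl_mnist_train_ray_task(
        compressed_dataset=compressed_dataset,
        train_parameters=train_parameters,
    )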
t
Okay that workaround works for me just tested it. When do you think this will be fixed?
k
I just created a PR to support it: https://github.com/flyteorg/flytekit/pull/1410
t
Thank you!