Tarmily Wen
12/12/2022, 11:32 PM
@dynamic
def request_train_resources_task(
    compressed_dataset: FlyteFile,
    train_parameters: TrainParameters,
) -> TrainOutputs:
    task_config = RayJobConfig(
        head_node_config=HeadNodeConfig(
            ray_start_params={
                "block": "true",
            }
        ),
        worker_node_config=[
            WorkerNodeConfig(
                group_name="ray-group",
                replicas=train_parameters.num_nodes - 1,
            )
        ],
    )
    resources_request = Resources(
        cpu=str(train_parameters.cpus_per_gpu * train_parameters.num_gpus),
        mem=f"{train_parameters.memory_per_gpu * train_parameters.num_gpus}G",
        gpu=str(train_parameters.num_gpus),
        ephemeral_storage=f"{train_parameters.ephemeral_storage}G",
    )
    print(f"task_config: {task_config}")
    print(f"resources_request: {resources_request}")
    return pl_mnist_train_task(
        compressed_dataset=compressed_dataset,
        train_parameters=train_parameters,
    ).with_overrides(
        task_config=task_config,
        requests=resources_request,
    )
Yee
task_config isn't actually read by the with_overrides call.
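For background: flytekit's with_overrides applies node-level overrides such as resource requests and limits, but a task_config keyword was silently ignored at the time. A minimal sketch of the distinction, using generic names that are not from this thread:

# Minimal sketch (generic names, assumptions): with_overrides honors
# resource overrides on the node, but not task_config.
from flytekit import Resources, dynamic, task

@task
def train() -> int:
    return 0

@dynamic
def wf() -> int:
    return train().with_overrides(
        requests=Resources(cpu="4", mem="8G"),  # honored
        limits=Resources(cpu="4", mem="8G"),    # honored
        # task_config=...                       # ignored at the time of this thread
    )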
Tarmily Wen
12/13/2022, 2:04 AM
Yee
Tarmily Wen
12/13/2022, 2:06 AM
@dynamic(requests=Resources(ephemeral_storage="50G"))
def request_train_resources_task(
    compressed_dataset: FlyteFile,
    train_parameters: TrainParameters,
) -> TrainOutputs:
    if train_parameters.use_ray:
        task_config = RayJobConfig(
            head_node_config=HeadNodeConfig(
                ray_start_params={
                    "block": "true",
                }
            ),
            worker_node_config=[
                WorkerNodeConfig(
                    group_name="ray-group",
                    replicas=train_parameters.num_nodes - 1,
                )
            ],
        )
    else:
        task_config = None
    resources_request = Resources(
        cpu=str(train_parameters.cpus_per_gpu * train_parameters.num_gpus),
        mem=f"{train_parameters.memory_per_gpu * train_parameters.num_gpus}G",
        gpu=str(train_parameters.num_gpus),
        ephemeral_storage=f"{train_parameters.ephemeral_storage}G",
    )
    print(f"task_config: {task_config}")
    print(f"resources_request: {resources_request}")
    if train_parameters.use_ray:
        return pl_mnist_train_ray_task(
            compressed_dataset=compressed_dataset,
            train_parameters=train_parameters,
        ).with_overrides(
            task_config=task_config,
            requests=resources_request,
        )
    else:
        return pl_mnist_train_python_task(
            compressed_dataset=compressed_dataset,
            train_parameters=train_parameters,
        ).with_overrides(
            task_config=task_config,
            requests=resources_request,
        )
Yee
Tarmily Wen
12/13/2022, 2:06 AM
Yee
"changed the task config for the under lying task to ray"
Tarmily Wen
12/13/2022, 2:10 AM
@task(
    task_config=RayJobConfig(
        head_node_config=HeadNodeConfig(
            ray_start_params={
                "block": "true",
            }
        ),
        worker_node_config=[
            WorkerNodeConfig(
                group_name="ray-group",
                replicas=0,
            )
        ],
    ),
    secret_requests=[Secret(group="wandb-secrets", key="API_KEY")],
)
def pl_mnist_train_ray_task(...

@task(
    secret_requests=[Secret(group="wandb-secrets", key="API_KEY")],
)
def pl_mnist_train_python_task(...
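Both tasks request the same wandb secret. A minimal sketch of reading it at runtime with flytekit's secrets API; the helper name and the wandb usage are assumptions, not from the thread:

# Minimal sketch (assumption, not from the thread): reading the requested
# secret inside a task body via flytekit's secrets API.
import flytekit

def _get_wandb_key() -> str:
    # group/key must match Secret(group="wandb-secrets", key="API_KEY")
    return flytekit.current_context().secrets.get("wandb-secrets", "API_KEY")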
Yee
Tarmily Wen
12/13/2022, 2:13 AM
Yee
Tolerations: gpu-type=t4:NoSchedule
node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
num-gpus=1:NoSchedule
nvidia.com/gpu:NoSchedule op=Exists
are not correct?
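For comparison, similar tolerations can be declared per task via flytekit's pod template support. A rough sketch, assuming flytekit's PodTemplate and the kubernetes Python client are available; this is not something used in the thread:

# Rough sketch (assumption): declaring the same tolerations on a task's pod
# spec via PodTemplate and the kubernetes client models.
from flytekit import PodTemplate, task
from kubernetes.client import V1PodSpec, V1Toleration

gpu_pod = PodTemplate(
    pod_spec=V1PodSpec(
        containers=[],
        tolerations=[
            V1Toleration(key="gpu-type", operator="Equal", value="t4", effect="NoSchedule"),
            V1Toleration(key="num-gpus", operator="Equal", value="1", effect="NoSchedule"),
            V1Toleration(key="nvidia.com/gpu", operator="Exists", effect="NoSchedule"),
        ],
    )
)

@task(pod_template=gpu_pod)
def gpu_probe() -> None:
    ...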
Tarmily Wen
12/13/2022, 2:14 AM
Yee
Tarmily Wen
12/13/2022, 2:16 AM
Yee
Tarmily Wen
12/13/2022, 8:09 PM
Kevin Su
12/13/2022, 9:14 PM
Tarmily Wen
12/13/2022, 9:26 PM
Kevin Su
12/16/2022, 8:12 PM
Tarmily Wen
12/21/2022, 1:27 AM
Kevin Su
12/21/2022, 1:29 AM
Tarmily Wen
12/21/2022, 2:22 AM
Kevin Su
12/21/2022, 2:35 AM
make docker_build. After that, update the propeller image in your cluster.
Tarmily Wen
12/21/2022, 3:41 PM
Yee
Kevin Su
12/21/2022, 7:56 PM
pingsutw/flytepropeller:34c4e43574a7fded751c362733508b9f11b8637c
Yee
Set pl_mnist_train_python_task._task_config to the config you want.
Tarmily Wen
12/21/2022, 8:55 PM
Yee
Tarmily Wen
12/21/2022, 9:17 PM
@task(
    task_config=RayJobConfig(
        head_node_config=HeadNodeConfig(
            ray_start_params={
                "block": "true",
            }
        ),
        worker_node_config=[
            WorkerNodeConfig(
                group_name="ray-group",
                replicas=0,
            )
        ],
    ),
    secret_requests=[Secret(group="wandb-secrets", key="API_KEY")],
    requests=Resources(cpu="22", mem="80G", gpu="1", ephemeral_storage="700G"),
)
def pl_mnist_train_ray_task(
...

@dynamic(requests=Resources(ephemeral_storage="50G"))
def request_train_resources_task(
    compressed_dataset: FlyteFile,
    train_parameters: TrainParameters,
) -> TrainOutputs:
    if train_parameters.use_ray:
        task_config = RayJobConfig(
            head_node_config=HeadNodeConfig(
                ray_start_params={
                    "block": "true",
                }
            ),
            worker_node_config=[
                WorkerNodeConfig(
                    group_name="ray-group",
                    replicas=train_parameters.num_nodes - 1,
                )
            ],
        )
    else:
        task_config = None
    resources_request = Resources(
        cpu=str(train_parameters.cpus_per_gpu * train_parameters.num_gpus),
        mem=f"{train_parameters.memory_per_gpu * train_parameters.num_gpus}G",
        gpu=str(train_parameters.num_gpus),
        ephemeral_storage=f"{train_parameters.ephemeral_storage}G",
    )
    print(f"task_config: {task_config}")
    print(f"resources_request: {resources_request}")
    if train_parameters.use_ray:
        return pl_mnist_train_ray_task(
            compressed_dataset=compressed_dataset,
            train_parameters=train_parameters,
        ).with_overrides(
            task_config=task_config,
            requests=resources_request,
        )
I just give the task a Ray config so that it will show up as a Ray task, with a dummy value of 0 worker replicas. But when it is actually executed from the dynamic it still has 0 workers, even when the override task_config specifies workers=1. I have tested this the other way around as well, where the dummy value is replicas=1 and the dynamic sets it to 0: 1 worker is created but never used.
Kevin Su
12/21/2022, 9:24 PM
You can directly change the task_config, like:
@dynamic
def dynamic_ray(n: int) -> typing.List[int]:
    if n > 2:
        ray_task._task_config.worker_node_config[0].replicas = 6
    else:
        ray_task._task_config.worker_node_config[0].replicas = 8
    return ray_task(n=5)
with_overrides can't change the config attribute for now, so just directly change the task variable. It works, I've tested it.
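Applied to this thread's task names, the workaround would look roughly like the sketch below. It assumes the definitions shown earlier in the thread; note _task_config is a private flytekit attribute, so this leans on internals as they were at the time:

# Sketch of the workaround with this thread's names: mutate the task's Ray
# config before calling it, instead of passing task_config to with_overrides.
@dynamic(requests=Resources(ephemeral_storage="50G"))
def request_train_resources_task(
    compressed_dataset: FlyteFile,
    train_parameters: TrainParameters,
) -> TrainOutputs:
    # Directly change the worker replica count on the task's config
    pl_mnist_train_ray_task._task_config.worker_node_config[0].replicas = (
        train_parameters.num_nodes - 1
    )
    return pl_mnist_train_ray_task(
        compressed_dataset=compressed_dataset,
        train_parameters=train_parameters,
    )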
Tarmily Wen
12/21/2022, 10:47 PM
Kevin Su
01/12/2023, 10:31 PM
Tarmily Wen
01/12/2023, 11:17 PM