delightful-computer-49028
12/12/2022, 11:32 PM
@dynamic
def request_train_resources_task(
    compressed_dataset: FlyteFile,
    train_parameters: TrainParameters,
) -> TrainOutputs:
    task_config = RayJobConfig(
        head_node_config=HeadNodeConfig(
            ray_start_params={
                "block": "true",
            }
        ),
        worker_node_config=[
            WorkerNodeConfig(
                group_name="ray-group",
                replicas=train_parameters.num_nodes - 1,
            )
        ],
    )
    resources_request = Resources(
        cpu=str(train_parameters.cpus_per_gpu * train_parameters.num_gpus),
        mem=f"{train_parameters.memory_per_gpu * train_parameters.num_gpus}G",
        gpu=str(train_parameters.num_gpus),
        ephemeral_storage=f"{train_parameters.ephemeral_storage}G",
    )
    print(f"task_config: {task_config}")
    print(f"resources_request: {resources_request}")
    return pl_mnist_train_task(
        compressed_dataset=compressed_dataset,
        train_parameters=train_parameters,
    ).with_overrides(
        task_config=task_config,
        requests=resources_request,
    )
thankful-minister-83577
task_config isn’t actually read by the with_overrides call.
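For reference, a minimal sketch (not posted in the thread) of the same call kept to an override that with_overrides does honor, the per-node resource requests; it reuses the names from the snippet above and simply drops the ignored task_config kwarg:

return pl_mnist_train_task(
    compressed_dataset=compressed_dataset,
    train_parameters=train_parameters,
).with_overrides(
    # honored: per-node resource requests for this execution
    requests=resources_request,
    # task_config=task_config,  # not read by with_overrides, so it has no effect
)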
delightful-computer-49028
12/13/2022, 2:04 AM
delightful-computer-49028
12/13/2022, 2:04 AM
delightful-computer-49028
12/13/2022, 2:05 AM
thankful-minister-83577
delightful-computer-49028
12/13/2022, 2:06 AM
@dynamic(requests=Resources(ephemeral_storage="50G"))
def request_train_resources_task(
    compressed_dataset: FlyteFile,
    train_parameters: TrainParameters,
) -> TrainOutputs:
    if train_parameters.use_ray:
        task_config = RayJobConfig(
            head_node_config=HeadNodeConfig(
                ray_start_params={
                    "block": "true",
                }
            ),
            worker_node_config=[
                WorkerNodeConfig(
                    group_name="ray-group",
                    replicas=train_parameters.num_nodes - 1,
                )
            ],
        )
    else:
        task_config = None
    resources_request = Resources(
        cpu=str(train_parameters.cpus_per_gpu * train_parameters.num_gpus),
        mem=f"{train_parameters.memory_per_gpu * train_parameters.num_gpus}G",
        gpu=str(train_parameters.num_gpus),
        ephemeral_storage=f"{train_parameters.ephemeral_storage}G",
    )
    print(f"task_config: {task_config}")
    print(f"resources_request: {resources_request}")
    if train_parameters.use_ray:
        return pl_mnist_train_ray_task(
            compressed_dataset=compressed_dataset,
            train_parameters=train_parameters,
        ).with_overrides(
            task_config=task_config,
            requests=resources_request,
        )
    else:
        return pl_mnist_train_python_task(
            compressed_dataset=compressed_dataset,
            train_parameters=train_parameters,
        ).with_overrides(
            task_config=task_config,
            requests=resources_request,
        )
thankful-minister-83577
delightful-computer-49028
12/13/2022, 2:06 AM
thankful-minister-83577
"changed the task config for the under lying task to ray"
thankful-minister-83577
thankful-minister-83577
delightful-computer-49028
12/13/2022, 2:10 AM
@task(
    task_config=RayJobConfig(
        head_node_config=HeadNodeConfig(
            ray_start_params={
                "block": "true",
            }
        ),
        worker_node_config=[
            WorkerNodeConfig(
                group_name="ray-group",
                replicas=0,
            )
        ],
    ),
    secret_requests=[Secret(group="wandb-secrets", key="API_KEY")],
)
def pl_mnist_train_ray_task(...

@task(
    secret_requests=[Secret(group="wandb-secrets", key="API_KEY")],
)
def pl_mnist_train_python_task(
delightful-computer-49028
12/13/2022, 2:10 AM
thankful-minister-83577
delightful-computer-49028
12/13/2022, 2:13 AM
thankful-minister-83577
Tolerations: gpu-type=t4:NoSchedule
             node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
             node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
             num-gpus=1:NoSchedule
             nvidia.com/gpu:NoSchedule op=Exists
are not correct?
delightful-computer-49028
12/13/2022, 2:14 AM
delightful-computer-49028
12/13/2022, 2:14 AM
thankful-minister-83577
delightful-computer-49028
12/13/2022, 2:16 AM
thankful-minister-83577
delightful-computer-49028
12/13/2022, 8:09 PM
glamorous-carpet-83516
12/13/2022, 9:14 PM
delightful-computer-49028
12/13/2022, 9:26 PM
delightful-computer-49028
12/16/2022, 3:50 PM
glamorous-carpet-83516
12/16/2022, 8:12 PM
glamorous-carpet-83516
12/17/2022, 2:40 AM
delightful-computer-49028
12/21/2022, 1:27 AM
glamorous-carpet-83516
12/21/2022, 1:29 AM
delightful-computer-49028
12/21/2022, 2:22 AM
glamorous-carpet-83516
12/21/2022, 2:35 AM
make docker_build. After that, update the propeller image in your cluster.
delightful-computer-49028
12/21/2022, 3:41 PM
delightful-computer-49028
12/21/2022, 4:38 PM
thankful-minister-83577
thankful-minister-83577
glamorous-carpet-83516
12/21/2022, 7:56 PM
pingsutw/flytepropeller:34c4e43574a7fded751c362733508b9f11b8637c
thankful-minister-83577
thankful-minister-83577
thankful-minister-83577
thankful-minister-83577
set pl_mnist_train_python_task._task_config to the config you want.
thankful-minister-83577
delightful-computer-49028
12/21/2022, 8:55 PM
delightful-computer-49028
12/21/2022, 8:55 PM
thankful-minister-83577
thankful-minister-83577
thankful-minister-83577
thankful-minister-83577
thankful-minister-83577
thankful-minister-83577
delightful-computer-49028
12/21/2022, 9:17 PM
@task(
    task_config=RayJobConfig(
        head_node_config=HeadNodeConfig(
            ray_start_params={
                "block": "true",
            }
        ),
        worker_node_config=[
            WorkerNodeConfig(
                group_name="ray-group",
                replicas=0,
            )
        ],
    ),
    secret_requests=[Secret(group="wandb-secrets", key="API_KEY")],
    requests=Resources(cpu="22", mem="80G", gpu="1", ephemeral_storage="700G"),
)
def pl_mnist_train_ray_task(
    ...

@dynamic(requests=Resources(ephemeral_storage="50G"))
def request_train_resources_task(
    compressed_dataset: FlyteFile,
    train_parameters: TrainParameters,
) -> TrainOutputs:
    if train_parameters.use_ray:
        task_config = RayJobConfig(
            head_node_config=HeadNodeConfig(
                ray_start_params={
                    "block": "true",
                }
            ),
            worker_node_config=[
                WorkerNodeConfig(
                    group_name="ray-group",
                    replicas=train_parameters.num_nodes - 1,
                )
            ],
        )
    else:
        task_config = None
    resources_request = Resources(
        cpu=str(train_parameters.cpus_per_gpu * train_parameters.num_gpus),
        mem=f"{train_parameters.memory_per_gpu * train_parameters.num_gpus}G",
        gpu=str(train_parameters.num_gpus),
        ephemeral_storage=f"{train_parameters.ephemeral_storage}G",
    )
    print(f"task_config: {task_config}")
    print(f"resources_request: {resources_request}")
    if train_parameters.use_ray:
        return pl_mnist_train_ray_task(
            compressed_dataset=compressed_dataset,
            train_parameters=train_parameters,
        ).with_overrides(
            task_config=task_config,
            requests=resources_request,
        )
I just give the task a Ray config so that it shows up as a Ray task, with a dummy value of 0 worker replicas. But when it is actually executed from the dynamic, it still has 0 workers even though the override task_config specifies workers=1. I have tested it the other way around as well, where the dummy value is replicas=1 and the dynamic sets it to 0: 1 worker is created but never used.
glamorous-carpet-83516
12/21/2022, 9:24 PM
You can directly change the task_config, like:
@dynamic
def dynamic_ray(n: int) -> typing.List[int]:
    if n > 2:
        ray_task._task_config.worker_node_config[0].replicas = 6
    else:
        ray_task._task_config.worker_node_config[0].replicas = 8
    return ray_task(n=5)
glamorous-carpet-83516
12/21/2022, 9:25 PM
with_overrides can’t change the config attribute for now, so just directly change the task variable. It works, I’ve tested it.
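Applied to the tasks in this thread, that workaround might look like the sketch below. This is a hypothetical adaptation, not code posted in the thread: it assumes pl_mnist_train_ray_task and pl_mnist_train_python_task are declared as in the earlier snippets (with TrainParameters and TrainOutputs defined there), and that mutating _task_config before calling the task from the dynamic workflow is picked up the same way as in the dynamic_ray example above.

from flytekit import Resources, dynamic
from flytekit.types.file import FlyteFile

@dynamic(requests=Resources(ephemeral_storage="50G"))
def request_train_resources_task(
    compressed_dataset: FlyteFile,
    train_parameters: TrainParameters,
) -> TrainOutputs:
    resources_request = Resources(
        cpu=str(train_parameters.cpus_per_gpu * train_parameters.num_gpus),
        mem=f"{train_parameters.memory_per_gpu * train_parameters.num_gpus}G",
        gpu=str(train_parameters.num_gpus),
        ephemeral_storage=f"{train_parameters.ephemeral_storage}G",
    )
    if train_parameters.use_ray:
        # Change the worker replica count on the task object itself instead of
        # passing task_config to with_overrides (which does not read it).
        pl_mnist_train_ray_task._task_config.worker_node_config[0].replicas = (
            train_parameters.num_nodes - 1
        )
        return pl_mnist_train_ray_task(
            compressed_dataset=compressed_dataset,
            train_parameters=train_parameters,
        ).with_overrides(requests=resources_request)
    return pl_mnist_train_python_task(
        compressed_dataset=compressed_dataset,
        train_parameters=train_parameters,
    ).with_overrides(requests=resources_request)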
delightful-computer-49028
12/21/2022, 10:47 PM
glamorous-carpet-83516
01/12/2023, 10:31 PM
delightful-computer-49028
01/12/2023, 11:17 PM