Fabio Grätz
04/08/2023, 10:32 PM
class MultiNodePytorchElasticFunctionTask(PythonFunctionTask[Elastic]):
    _ELASTIC_TASK_TYPE = "torch-elastic"

    def __init__(self, task_config: Elastic, task_function: Callable, **kwargs):
        super(MultiNodePytorchElasticFunctionTask, self).__init__(
            task_config=task_config,
            task_type=self._ELASTIC_TASK_TYPE,
            task_function=task_function,
            **kwargs,
        )

    def get_custom(...): ...
I also added this to helm values:
enabled_plugins:
  tasks:
    task-plugins:
      enabled-plugins:
        - ...
        - pytorch
      default-for-task-types:
        - ...
        pytorch: pytorch
        torch-elastic: pytorch
Propeller says:
{
  "json": {
    "exec_id": "f1c678faf0fd74fad828",
    "node": "n0",
    "ns": "flytesnacks-development",
    "res_ver": "23058",
    "routine": "worker-2",
    "tasktype": "torch-elastic",
    "wf": "flytesnacks:development:wf.wf"
  },
  "level": "warning",
  "msg": "No plugin found for Handler-type [torch-elastic], defaulting to [container]",
  "ts": "2023-04-08T22:26:30Z"
}
Do I need to configure this somewhere else as well?
The existing pytorch plugin in flyteplugins just needs an additional if/else for whether to configure an ElasticPolicy.
Ketan (kumare3)
standalone / single node pytorch operator right?
Ketan (kumare3)
if num replicas is 1, then the plugin type is torch-elastic-standalone,
else it is torch-elastic,
and the backend config is set for torch-elastic to use pytorch-operator?
Fabio Grätz
04/09/2023, 8:58 PM
pip install flytekitplugins-kfpytorch

from flytekitplugins.kfpytorch import PyTorch

@task(
    task_config=PyTorch(...)
)

But people could pip install flytekitplugins-kfpytorch[elastic]
(for the torch dependency) and then:
from flytekitplugins.kfpytorch import ElasticPytorch

@task(
    task_config=ElasticPytorch(nnodes=1)  # single pod, no operator
)
@task(
    task_config=ElasticPytorch(nnodes=2)  # pytorch operator
)

And in flyteplugins all the pytorch code can be reused as well, just an if for whether we need to set the elastic config in the pytorchjob.
Fabio Grätz
04/09/2023, 9:03 PM
class PytorchElasticFunctionTask(PythonFunctionTask[Elastic]):
    _ELASTIC_TASK_TYPE = "pytorch"
    _ELASTIC_TASK_TYPE_STANDALONE = "container"

    def __init__(self, task_config: Elastic, task_function: Callable, **kwargs):
        task_type = self._ELASTIC_TASK_TYPE_STANDALONE if task_config.nnodes == 1 else self._ELASTIC_TASK_TYPE
        super(PytorchElasticFunctionTask, self).__init__(
            task_config=task_config,
            task_type=task_type,
            ...
        )

    def get_custom(self, settings: SerializationSettings) -> Optional[Dict[str, Any]]:
        if self.task_config.nnodes == 1:
            """
            Torch elastic distributed training is executed in a normal k8s pod so that this
            works without the kubeflow train operator.
            """
            return super().get_custom(settings)
        else:
            from flytekitplugins.kfpytorch.models import PyTorchJob
            job = PyTorchJob(...)
Fabio Grätz
04/09/2023, 9:04 PM
Every 2.0s: kubectl get pods -n flytesnacks-development          Fabios-MacBook-Pro.local: Sun Apr  9 23:04:03 2023

NAME                                 READY   STATUS    RESTARTS   AGE
f91014ed8990b4c79b32-n0-0-master-0   1/1     Running   0          23s
f91014ed8990b4c79b32-n0-0-worker-0   1/1     Running   0          22s
f91014ed8990b4c79b32-n0-0-worker-1   1/1     Running   0          16s
f7e922a78842044aba46-n0-0            1/1     Running   0          7s
Fabio Grätz
04/09/2023, 9:04 PM
The only difference is nnodes being 1 or not.
Ketan (kumare3)
pip install flytekitplugins-kfpytorch[elastic]
Maybe we can simply add an import gate: if the module is not found, raise an error that torch should be installed.
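A minimal sketch of what such an import gate could look like; the helper name and the error wording are assumptions, and it would only be called from the Elastic code path so that the plain PyTorch config keeps working without torch installed.

def _check_torch_installed() -> None:
    # Only the elastic task type needs torch at serialization/run time (assumption).
    try:
        import torch  # noqa: F401
    except ImportError as e:
        raise ImportError(
            "torch is required for the Elastic task config. "
            "Install it with: pip install flytekitplugins-kfpytorch[elastic]"
        ) from e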
Fabio Grätz
04/11/2023, 7:27 AM
rdzv_endpoint=os.environ.get("PET_RDZV_ENDPOINT", "localhost:0"),
Here, localhost:0 means torchrun picks a free port (see docs).
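For context, that rdzv_endpoint value would feed into torch's elastic launcher roughly as sketched below. LaunchConfig and elastic_launch are the actual torch APIs; everything around them (the entrypoint, the concrete numbers, the c10d backend choice) is an assumption for illustration.

import os

from torch.distributed.launcher.api import LaunchConfig, elastic_launch

def _train_entrypoint() -> None:
    # placeholder for the wrapped task function
    ...

config = LaunchConfig(
    min_nodes=1,
    max_nodes=1,
    nproc_per_node=2,
    rdzv_backend="c10d",
    # "localhost:0" lets torchrun bind a free port on the local node;
    # on the training operator, PET_RDZV_ENDPOINT would point at the master pod instead.
    rdzv_endpoint=os.environ.get("PET_RDZV_ENDPOINT", "localhost:0"),
    start_method="spawn",
)

# elastic_launch spawns nproc_per_node worker processes and runs the entrypoint in each.
results = elastic_launch(config, _train_entrypoint)()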
I’m currently working on making this minimal elastic example from the kubeflow training operator repo work:
import os
import logging

from flytekit import task, workflow
from flytekitplugins.kfpytorch import PyTorch, Elastic

logging.basicConfig(level=logging.INFO)  # To see torchrun trying to establish the rendezvous


@task(
    task_config=Elastic(
        nnodes=2,
        nproc_per_node=2,
        start_method="fork",
    )
    # task_config=PyTorch(num_workers=2)
)
def train() -> str:
    import io
    import os
    import pprint
    import sys
    import time
    import torch.distributed as dist

    env_dict = {
        k: os.environ[k]
        for k in (
            "LOCAL_RANK",
            "RANK",
            "GROUP_RANK",
            "WORLD_SIZE",
            "MASTER_ADDR",
            "MASTER_PORT",
            "TORCHELASTIC_RESTART_COUNT",
            "TORCHELASTIC_MAX_RESTARTS",
        )
    }

    with io.StringIO() as buff:
        print("======================================================", file=buff)
        print(
            f"Environment variables set by the agent on PID {os.getpid()}:", file=buff
        )
        pprint.pprint(env_dict, stream=buff)
        print("======================================================", file=buff)
        print(buff.getvalue())
    sys.stdout.flush()

    dist.init_process_group(backend="gloo")
    dist.barrier()
    rank = dist.get_rank()
    print(
        (
            f"On PID {os.getpid()}, after init process group, "
            f"rank={dist.get_rank()}, world_size = {dist.get_world_size()}\n"
        )
    )
    dist.destroy_process_group()
    return f"foo-{rank}"


@workflow
def wf():
    train()


if __name__ == "__main__":
    print(f"Parent {os.getpid()}")
    print(wf())
Rendezvous sometimes fails, sometimes works; currently debugging why. Just as an fyi where I'm at…