Fabio Grätz
04/08/2023, 10:32 PMclass MultiNodePytorchElasticFunctionTask(PythonFunctionTask[Elastic]):
_ELASTIC_TASK_TYPE = "torch-elastic"
def __init__(self, task_config: Elastic, task_function: Callable, **kwargs):
super(MultiNodePytorchElasticFunctionTask, self).__init__(
task_type=self._ELASTIC_TASK_TYPE,
**kwargs,
def get_custom(...): ...
I also added this to helm values:
enabled_plugins:
  tasks:
    task-plugins:
      enabled-plugins:
        - ...
        - pytorch
      default-for-task-types:
        - ...
        pytorch: pytorch
        torch-elastic: pytorch
Propeller says:
{
  "json": {
    "exec_id": "f1c678faf0fd74fad828",
    "node": "n0",
    "ns": "flytesnacks-development",
    "res_ver": "23058",
    "routine": "worker-2",
    "tasktype": "torch-elastic",
    "wf": "flytesnacks:development:wf.wf"
  },
  "level": "warning",
  "msg": "No plugin found for Handler-type [torch-elastic], defaulting to [container]",
  "ts": "2023-04-08T22:26:30Z"
}
Do I need to configure this somewhere else as well?
The existing pytorch plugin in flyteplugins just needs an additional if/else to decide whether to configure an ElasticPolicy.
Ketan (kumare3)
Standalone / single node pytorch operator, right?
If num replicas is 1, then the plugin type is torch-elastic-standalone, else it is torch-elastic, and the backend config for torch-elastic is set to use the pytorch-operator?
Fabio Grätz
04/09/2023, 8:58 PM
pip install flytekitplugins-kfpytorch
from flytekitplugins.kfpytorch import PyTorch

@task(
    task_config=PyTorch(...)
)
But people could do pip install flytekitplugins-kfpytorch[elastic] (for the torch dependency) and then:
from flytekitplugins.kfpytorch import ElasticPytorch

@task(
    task_config=ElasticPytorch(nnodes=1)  # single pod, no operator
)
@task(
    task_config=ElasticPytorch(nnodes=2)  # pytorch operator
)
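One possible way to declare that optional torch dependency is a setuptools extra in the plugin's setup.py (just a sketch, not the plugin's actual packaging):
# setup.py (sketch): torch is only installed via the [elastic] extra
from setuptools import setup

setup(
    name="flytekitplugins-kfpytorch",
    # ... existing install_requires stay as they are ...
    extras_require={"elastic": ["torch"]},  # pip install flytekitplugins-kfpytorch[elastic]
)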
And in flyteplugins all the pytorch code can be reused as well, just an if to decide whether we need to set the elastic config in the PyTorchJob.
class PytorchElasticFunctionTask(PythonFunctionTask[Elastic]):
    _ELASTIC_TASK_TYPE = "pytorch"
    _ELASTIC_TASK_TYPE_STANDALONE = "container"

    def __init__(self, task_config: Elastic, task_function: Callable, **kwargs):
        task_type = self._ELASTIC_TASK_TYPE_STANDALONE if task_config.nnodes == 1 else self._ELASTIC_TASK_TYPE
        super(PytorchElasticFunctionTask, self).__init__(
            task_config=task_config,
            task_type=task_type,
            ...

    def get_custom(self, settings: SerializationSettings) -> Optional[Dict[str, Any]]:
        if self.task_config.nnodes == 1:
            """
            Torch elastic distributed training is executed in a normal k8s pod so that this
            works without the kubeflow train operator.
            """
            return super().get_custom(settings)
        else:
            from flytekitplugins.kfpytorch.models import PyTorchJob

            job = PyTorchJob(
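Roughly, the multi-node branch would then fill in the elastic settings and serialize the job, something like the sketch below (the PyTorchJob fields and elastic config keys are assumptions for illustration, not the plugin's actual model):
            # Sketch only: these field names are assumptions, not the real flytekitplugins model.
            job = PyTorchJob(
                workers=self.task_config.nnodes,                  # assumed field
                elastic_config={                                  # assumed field
                    "min_replicas": self.task_config.nnodes,
                    "max_replicas": self.task_config.nnodes,
                    "nproc_per_node": self.task_config.nproc_per_node,
                    "rdzv_backend": "c10d",
                },
            )
            # Task plugins typically return the dict form of the corresponding flyteidl message.
            from google.protobuf.json_format import MessageToDict
            return MessageToDict(job.to_flyte_idl())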
Every 2.0s: kubectl get pods -n flytesnacks-development          Fabios-MacBook-Pro.local: Sun Apr 9 23:04:03 2023

NAME                                 READY   STATUS    RESTARTS   AGE
f91014ed8990b4c79b32-n0-0-master-0   1/1     Running   0          23s
f91014ed8990b4c79b32-n0-0-worker-0   1/1     Running   0          22s
f91014ed8990b4c79b32-n0-0-worker-1   1/1     Running   0          16s
f7e922a78842044aba46-n0-0            1/1     Running   0          7s
… nnodes being 1 or not.
Ketan (kumare3)
pip install flytekitplugins-kfpytorch[elastic]
Maybe we can simply add an import gate: if the module is not found, raise an error that torch should be installed.
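For example, an import gate at the top of the elastic task module could look roughly like this (a sketch of the idea, not the plugin's actual code):
try:
    import torch  # noqa: F401  # only needed for elastic tasks
except ImportError as e:
    raise ImportError(
        "torch is required for elastic tasks; "
        "install it with `pip install flytekitplugins-kfpytorch[elastic]`."
    ) from e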
Fabio Grätz
04/11/2023, 7:27 AM
rdzv_endpoint=os.environ.get("PET_RDZV_ENDPOINT", f"localhost:0"),
Here, localhost:0 means torchrun picks a free port (see docs).
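To illustrate what that default does, here is a rough sketch against torch.distributed's Python launcher API (not the plugin code): with a c10d rendezvous bound to port 0, the OS picks any free port on the host.
import os
from torch.distributed.launcher.api import LaunchConfig, elastic_launch

def train_fn() -> int:
    # Each worker process reads the rank torchrun assigned to it.
    return int(os.environ["RANK"])

config = LaunchConfig(
    min_nodes=1,
    max_nodes=1,
    nproc_per_node=2,
    rdzv_backend="c10d",
    # Port 0 -> the rendezvous binds to a free port on this host.
    rdzv_endpoint=os.environ.get("PET_RDZV_ENDPOINT", "localhost:0"),
    start_method="fork",
)

# elastic_launch returns a dict mapping rank -> the entrypoint's return value.
print(elastic_launch(config, train_fn)())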
I’m currently working on making this minimal elastic example from the kubeflow training operator repo work:
import os
import logging

from flytekit import task, workflow
from flytekitplugins.kfpytorch import PyTorch, Elastic

logging.basicConfig(level=logging.INFO)  # To see torchrun trying to establish the rendezvous


@task(
    task_config=Elastic(
        nnodes=2,
        nproc_per_node=2,
        start_method="fork",
    )
    # task_config=PyTorch(num_workers=2)
)
def train() -> str:
    import io
    import os
    import pprint
    import sys
    import time

    import torch.distributed as dist

    env_dict = {
        k: os.environ[k]
        for k in (
            "LOCAL_RANK",
            "RANK",
            "GROUP_RANK",
            "WORLD_SIZE",
            "MASTER_ADDR",
            "MASTER_PORT",
            "TORCHELASTIC_RESTART_COUNT",
            "TORCHELASTIC_MAX_RESTARTS",
        )
    }
    with io.StringIO() as buff:
        print("======================================================", file=buff)
        print(
            f"Environment variables set by the agent on PID {os.getpid()}:", file=buff
        )
        pprint.pprint(env_dict, stream=buff)
        print("======================================================", file=buff)
        print(buff.getvalue())
    sys.stdout.flush()

    dist.init_process_group(backend="gloo")
    dist.barrier()
    rank = dist.get_rank()
    print(
        (
            f"On PID {os.getpid()}, after init process group, "
            f"rank={dist.get_rank()}, world_size = {dist.get_world_size()}\n"
        )
    )
    dist.destroy_process_group()
    return f"foo-{rank}"


@workflow
def wf():
    train()


if __name__ == "__main__":
    print(f"Parent {os.getpid()}")
    print(wf())
Rendezvous sometimes fails, sometimes works; currently debugging why. Just as an FYI on where I'm at…