# torch-elastic
f
Got it, first single node multi GPU 👍 I can implement this on Friday or Saturday and then ping you for review. Or do you need it before that?
k
Hmm that should be ok. What I want to do is train alpaca on Flyte and have that as a demo
Actually if you open a PR directly on flytekit I can hack too
Else I can copy paste hack and open PR
Let me do it for single machine and you can make it work for distributed
f
Can you give me permissions to open a PR in flytekit please?
Then I’ll push there
Or feel free to just copy, whichever is easier for you 🙂
k
Ohh you don’t have perms
Can I give you
I can send it
Wait you should get it in 2 minutes
f
Ok, branch is ready to push 🙂
thx
k
Ok you should have it
f
Cool thanks 🙂 I closed the other PR from the fork
Feel free to also hack/commit on this branch
k
Perfect
f
This is going to be awesome
We currently have to beat ignite into launching the local process group instead of using torchrun.
Really looking forward to throwing that logic out
@Ketan (kumare3) I pushed a few commits to the wip branch. Cleanup + docstrings. Also working on making it work in a distributed way now.
k
Yup, I pushed some commits too
In case you hadn't seen
This is looking great
f
Saw them 👍
k
I will try to get alpaca working on it too
Then we can test
On a side note, I also got tasks working from a Jupyter notebook.
That way you can train large models directly from an interactive environment
f
You mean write task in notebook and then just run task from there?
k
Yup
No need to have it in a Python script
Finally you will have to copy
f
I’m not much of a notebook user ^^ But I guess for many data scientists this is a killer feature
k
Ya that’s my hope
Me too
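(For context, a minimal sketch of what running a notebook-defined task on a Flyte cluster could look like via FlyteRemote. The project/domain names and the exact registration path for notebook-defined tasks are assumptions, not the flow described in this thread.)

```python
from flytekit import task
from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

@task
def train(epochs: int) -> str:
    # Placeholder body; a real task would kick off model training here.
    return f"trained for {epochs} epochs"

# Connect to the cluster using the local Flyte config / environment variables.
remote = FlyteRemote(
    Config.auto(),
    default_project="flytesnacks",   # assumed project
    default_domain="development",    # assumed domain
)

# Register the task defined in this notebook session and run it remotely.
execution = remote.execute(train, inputs={"epochs": 3}, wait=True)
print(execution.outputs)
```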
f
I have a question about how to select the plugin for the task type. I have this:
```python
class MultiNodePytorchElasticFunctionTask(PythonFunctionTask[Elastic]):
    _ELASTIC_TASK_TYPE = "torch-elastic"

    def __init__(self, task_config: Elastic, task_function: Callable, **kwargs):
        super(MultiNodePytorchElasticFunctionTask, self).__init__(
            task_type=self._ELASTIC_TASK_TYPE,
            **kwargs,
        )

    def get_custom(...): ...
```
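(As a side note on how the task class is selected for a given task_config on the flytekit side: flytekit resolves it through the TaskPlugins factory. A minimal sketch of the registration, assuming the Elastic config class from above; this line is not part of the snippet posted here.)

```python
from flytekit.extend import TaskPlugins

# Tell flytekit that @task(task_config=Elastic(...)) should instantiate
# MultiNodePytorchElasticFunctionTask rather than a plain PythonFunctionTask.
TaskPlugins.register_pythontask_plugin(Elastic, MultiNodePytorchElasticFunctionTask)
```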
I also added this to helm values:
```yaml
enabled_plugins:
    tasks:
      task-plugins:
        enabled-plugins:
          - ...
          - pytorch
        default-for-task-types:
          - ...
          pytorch: pytorch
          torch-elastic: pytorch
```
Propeller says:
```json
{
  "json": {
    "exec_id": "f1c678faf0fd74fad828",
    "node": "n0",
    "ns": "flytesnacks-development",
    "res_ver": "23058",
    "routine": "worker-2",
    "tasktype": "torch-elastic",
    "wf": "flytesnacks:development:wf.wf"
  },
  "level": "warning",
  "msg": "No plugin found for Handler-type [torch-elastic], defaulting to [container]",
  "ts": "2023-04-08T22:26:30Z"
}
```
Do I need to configure this somewhere else as well? The existing pytorch plugin in flyteplugins just needs an additional if/else to decide whether to configure an ElasticPolicy.
k
AFK
i think your config looks right
@Fabio Grätz when you get a chance, check the first few log lines when you start flytepropeller
i think this config looks ok
@Fabio Grätz quick question, we should not need a standalone / single node pytorch operator, right?
we should automatically adapt?
what if we add a check in the TorchElastic constructor and change the task-type: if num replicas is 1, then the plugin type is torch-elastic-standalone, else it is torch-elastic, and the backend config for torch-elastic is set to use pytorch-operator?
f
I was thinking exactly the same. I feel like this should go into the existing pytorch plugin, not new ones, since for the kubeflow training operator, vanilla torch distributed training and torch elastic training also only differ by the elastic config in the PyTorchJob manifest. Same k8s kind though. This stays the same for backwards compatibility of course:
pip install flytekitplugins-kfpytorch
```python
from flytekitplugins.kfpytorch import PyTorch

@task(
    task_config=PyTorch(...)
)
```
But people could do
pip install flytekitplugins-kfpytorch[elastic]
(for the torch dependency) and then:
```python
from flytekitplugins.kfpytorch import ElasticPytorch

@task(
    task_config=ElasticPytorch(nnodes=1)  # single pod, no operator
)

@task(
    task_config=ElasticPytorch(nnodes=2)  # pytorch operator
)
```
And in flyteplugins all the pytorch code can be reused as well, just an if to decide whether we need to set the elastic config in the PyTorchJob.
Already works:
```python
class PytorchElasticFunctionTask(PythonFunctionTask[Elastic]):
    _ELASTIC_TASK_TYPE = "pytorch"
    _ELASTIC_TASK_TYPE_STANDALONE = "container"

    def __init__(self, task_config: Elastic, task_function: Callable, **kwargs):
        task_type = self._ELASTIC_TASK_TYPE_STANDALONE if task_config.nnodes == 1 else self._ELASTIC_TASK_TYPE

        super(PytorchElasticFunctionTask, self).__init__(
            task_config=task_config,
            task_type=task_type,
            **kwargs,
        )

    ...

    def get_custom(self, settings: SerializationSettings) -> Optional[Dict[str, Any]]:
        if self.task_config.nnodes == 1:
            """
            Torch elastic distributed training is executed in a normal k8s pod so that this
            works without the kubeflow train operator.
            """
            return super().get_custom(settings)
        else:
            from flytekitplugins.kfpytorch.models import PyTorchJob
            job = PyTorchJob(
```
```
Every 2.0s: kubectl get pods -n flytesnacks-development          Fabios-MacBook-Pro.local: Sun Apr  9 23:04:03 2023

NAME                                 READY   STATUS    RESTARTS   AGE
f91014ed8990b4c79b32-n0-0-master-0   1/1     Running   0          23s
f91014ed8990b4c79b32-n0-0-worker-0   1/1     Running   0          22s
f91014ed8990b4c79b32-n0-0-worker-1   1/1     Running   0          16s
f7e922a78842044aba46-n0-0            1/1     Running   0          7s
```
Only diff between the two is nnodes being 1 or not.
Can you pls give me perms to make a PR in idl and plugins next week? Or shall I do it from a fork there?
I will work on the changes in plugins tomorrow.
Free day in Germany
🙂
k
I can give you perms
i like the idea
idl and plugins permissions added
also we can simply add the same plugin for different config types
@Fabio Grätz also thought some more about
```
pip install flytekitplugins-kfpytorch[elastic]
```
Maybe we can simply add an import gate: if module not found, raise an error that torch should be installed
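(A minimal sketch of what such an import gate could look like; the placement inside the flytekitplugins-kfpytorch module and the exact error wording are assumptions, not taken from the thread.)

```python
try:
    import torch  # noqa: F401  # only used to verify the optional dependency is present
except ModuleNotFoundError as e:
    raise ModuleNotFoundError(
        "The Elastic task config requires torch. "
        "Install it with: pip install flytekitplugins-kfpytorch[elastic]"
    ) from e
```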
Also @Fabio Grätz I have this repo created = https://github.com/unionai-oss/stanford_alpaca/pull/1
check it out
f
I saw you simplified the models, didn’t know this was possible, nice 👍 Update from my side: I opened a draft PR in idl and in plugins. Built a propeller image, and creating a distributed PyTorchJob with elastic config works. What is not working reliably yet is the rendezvous when initializing the process group. We definitely need something similar to what I added here:
```python
rdzv_endpoint=os.environ.get("PET_RDZV_ENDPOINT", f"localhost:0"),
```
Here, localhost:0 means torchrun picks a free port (see docs).
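(For context, roughly how such a fallback could plug into torch's elastic launcher. The surrounding LaunchConfig values and the train entrypoint are illustrative assumptions, not the actual flytekit plugin code.)

```python
import os

from torch.distributed.launcher.api import LaunchConfig, elastic_launch

def train():
    # Placeholder training entrypoint, launched once per local worker process.
    ...

config = LaunchConfig(
    min_nodes=2,
    max_nodes=2,
    nproc_per_node=2,
    rdzv_backend="c10d",
    # On the kubeflow training operator, PET_RDZV_ENDPOINT points at the master pod;
    # locally, "localhost:0" lets torchrun pick a free port.
    rdzv_endpoint=os.environ.get("PET_RDZV_ENDPOINT", "localhost:0"),
)

results = elastic_launch(config, train)()
```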
I’m currently working on making this minimal elastic example from the kubeflow training operator repo work:
```python
import os
import logging

from flytekit import task, workflow
from flytekitplugins.kfpytorch import PyTorch, Elastic

logging.basicConfig(level=logging.INFO)  # To see torchrun trying to establish the rendezvous


@task(
    task_config=Elastic(
        nnodes=2,
        nproc_per_node=2,
        start_method="fork",
    )
    #task_config=PyTorch(num_workers=2)
)
def train() -> str:
    import io
    import os
    import pprint
    import sys
    import time

    import torch.distributed as dist

    env_dict = {
        k: os.environ[k]
        for k in (
            "LOCAL_RANK",
            "RANK",
            "GROUP_RANK",
            "WORLD_SIZE",
            "MASTER_ADDR",
            "MASTER_PORT",
            "TORCHELASTIC_RESTART_COUNT",
            "TORCHELASTIC_MAX_RESTARTS",
        )
    }

    with io.StringIO() as buff:
        print("======================================================", file=buff)
        print(
            f"Environment variables set by the agent on PID {os.getpid()}:", file=buff
        )
        pprint.pprint(env_dict, stream=buff)
        print("======================================================", file=buff)
        print(buff.getvalue())
        sys.stdout.flush()

    dist.init_process_group(backend="gloo")
    dist.barrier()
    rank = dist.get_rank()
    print(
        (
            f"On PID {os.getpid()}, after init process group, "
            f"rank={dist.get_rank()}, world_size = {dist.get_world_size()}\n"
        )
    )
    dist.destroy_process_group()
    return f"foo-{rank}"


@workflow
def wf():
    train()


if __name__ == "__main__":
    print(f"Parent {os.getpid()}")
    print(wf())
```
Rendezvous sometimes fails, sometimes works, currently debugging why. Just as an fyi where I’m at…