# slurm-flyte-wg
c
Hi everyone, we have successfully implemented the Slurm agent with `ShellTask`, but there's still room for improvement! Here's how it works:
• To mimic NFS, we put a simple Python script on the Slurm cluster in advance.
• To run the user-defined batch script on the Slurm cluster, we write the script content to a tmp file and transfer it to the cluster through SFTP.
• Within the `create` method, we construct an `sbatch` command based on the user-defined `sbatch` options and run this command on the cluster.
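A minimal sketch of the command construction in the last bullet, assuming each `sbatch_conf` key is an sbatch long-option name and the uploaded script sits at a hypothetical path. `build_sbatch_cmd` is an illustrative helper, not the plugin's actual code:

```python
import shlex


def build_sbatch_cmd(sbatch_conf: dict[str, str], script_path: str) -> str:
    """Turn a dict of sbatch options into a single sbatch command line.

    Each key becomes a long option (e.g. "job-name" -> --job-name=...), and
    every value is shell-quoted so it survives the trip over SSH.
    """
    opts = [f"--{key}={shlex.quote(str(value))}" for key, value in sbatch_conf.items()]
    return " ".join(["sbatch", *opts, shlex.quote(script_path)])


# Hypothetical location of the script after the SFTP upload.
cmd = build_sbatch_cmd({"partition": "debug", "job-name": "tiny-slurm"}, "/tmp/flyte_slurm_task.sh")
print(cmd)  # sbatch --partition=debug --job-name=tiny-slurm /tmp/flyte_slurm_task.sh
```

Quoting each value with `shlex.quote` keeps options that contain spaces intact when the remote shell parses the command string.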
Here's a naive use case, which echoes a message and runs the prepared Python script on the Slurm cluster!
```python
import os

from flytekit import workflow
from flytekitplugins.slurm import Slurm, SlurmTask


echo = SlurmTask(
    name="echo",
    script="""#!/bin/bash
# We can define sbatch options here
echo "Demo Slurm agent with ShellTask...\n"

# Run a demo python script on Slurm (demo.py was placed on the cluster in advance)
echo ">>> Run a Demo Python Script <<<"
python3 demo.py
""",
    task_config=Slurm(
        slurm_host="slurm",
        sbatch_conf={
            "partition": "debug",
            "job-name": "tiny-slurm",
        }
    )
)


@workflow
def wf() -> None:
    echo()


if __name__ == "__main__":
    from flytekit.clis.sdk_in_container import pyflyte
    from click.testing import CliRunner

    runner = CliRunner()
    path = os.path.realpath(__file__)

    # Local run
    print(">>> LOCAL EXEC <<<")
    result = runner.invoke(pyflyte.main, ["run", "--raw-output-data-prefix", "s3://flyte-slurm-agent", path, "wf"])
    print(result.output)
```
The attachment shows the Python file content and the output of the completed Slurm job.
Next, we will:
1. Set the default working directory to `/root` to align with the task pod behavior.
2. Better handle issues related to `_config_task_instance` inside `ShellTask` (there are many temporary workarounds right now).
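One possible way to implement item 1, assuming it is done through sbatch's `--chdir` option, which sets the batch script's working directory. `with_default_workdir` is a hypothetical helper; a user-supplied `chdir` still takes precedence:

```python
def with_default_workdir(sbatch_conf: dict[str, str], workdir: str = "/root") -> dict[str, str]:
    """Return a copy of sbatch_conf whose "chdir" option defaults to `workdir`.

    Later keys in the dict literal win, so a user-provided "chdir" overrides the default.
    """
    return {"chdir": workdir, **sbatch_conf}


print(with_default_workdir({"partition": "debug"}))
# {'chdir': '/root', 'partition': 'debug'}
print(with_default_workdir({"chdir": "/scratch", "partition": "debug"}))
# {'chdir': '/scratch', 'partition': 'debug'}
```

Fed into a `--key=value` command builder, the default entry becomes `--chdir=/root`.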
e
This is awesome! I think this will be the more common use case since it avoids any Flyte dependencies on the Slurm cluster (at least I think so).
Will test today, thanks for all your work, Abao!
f
This is amazing, @creamy-shampoo-53278, thank you for working through all of this!
I do think we should also integrate a log retrieval endpoint.
c
My pleasure, so glad it helps! I'll keep developing it to make it better!
For the log retrieval endpoint, do you mean streaming stdout/stderr from the Slurm cluster to the agent side? Thanks!
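If the agent polls rather than streams, a minimal sketch could parse the job ID from sbatch's standard `Submitted batch job <id>` message and then read Slurm's default output file, `slurm-<jobid>.out`, which holds both stdout and stderr when no `--output`/`--error` option is set. Both helper names are hypothetical, and the sketch assumes the job's working directory is known (e.g. `/root`):

```python
import re
import shlex

# sbatch prints this line on a successful submission (unless --parsable is used).
_SUBMITTED = re.compile(r"Submitted batch job (\d+)")


def parse_job_id(sbatch_stdout: str) -> str:
    """Extract the job ID from sbatch's 'Submitted batch job <id>' message."""
    match = _SUBMITTED.search(sbatch_stdout)
    if match is None:
        raise ValueError(f"unexpected sbatch output: {sbatch_stdout!r}")
    return match.group(1)


def tail_log_cmd(job_id: str, workdir: str, lines: int = 50) -> str:
    """Build a command printing the last `lines` lines of the job's default log.

    Without --output/--error, Slurm writes stdout and stderr together to
    slurm-<jobid>.out in the job's working directory.
    """
    path = f"{workdir.rstrip('/')}/slurm-{job_id}.out"
    return f"tail -n {lines} {shlex.quote(path)}"


job_id = parse_job_id("Submitted batch job 4242\n")
print(tail_log_cmd(job_id, "/root"))  # tail -n 50 /root/slurm-4242.out
```

Running such a command periodically over the existing SSH connection would give near-live output; true streaming (e.g. `tail -f`) would need a long-lived SSH channel.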
e
Had a chance to try this out with the following task (everything else is the same) and got the error below:
```python
echo_job = SlurmTask(
    name="echo-job-name",
    script="""#!/bin/bash
    echo "Working!" >> /home/ubuntu/slurm/remote_touch.txt
    """,
    task_config=Slurm(
        slurm_host="slurm",
        sbatch_conf={
            "partition": "debug",
            "job-name": "tiny-slurm",
        }
    )
)
```
which results in:
```
>>> LOCAL EXEC <<<
Running Execution on local.
╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ /Users/pryceturner/Desktop/slurm-flyte/shelltask.py:24 in wf                                     │
│                                                                                                  │
│   21                                                                                             │
│   22 @workflow                                                                                   │
│   23 def wf() -> None:                                                                           │
│ ❱ 24 │   echo_job()                                                                              │
│   25                                                                                             │
│   26                                                                                             │
│   27 if __name__ == "__main__":                                                                  │
│                                                                                                  │
│ /Users/pryceturner/.pyenv/versions/3.12.8/lib/python3.12/asyncio/runners.py:194 in run           │
│                                                                                                  │
│   191 │   │   │   "asyncio.run() cannot be called from a running event loop")                    │
│   192 │                                                                                          │
│   193 │   with Runner(debug=debug, loop_factory=loop_factory) as runner:                         │
│ ❱ 194 │   │   return runner.run(main)                                                            │
│   195                                                                                            │
│   196                                                                                            │
│   197 def _cancel_all_tasks(loop):                                                               │
│                                                                                                  │
│ /Users/pryceturner/.pyenv/versions/3.12.8/lib/python3.12/asyncio/runners.py:118 in run           │
│                                                                                                  │
│   115 │   │                                                                                      │
│   116 │   │   self._interrupt_count = 0                                                          │
│   117 │   │   try:                                                                               │
│ ❱ 118 │   │   │   return self._loop.run_until_complete(task)                                     │
│   119 │   │   except exceptions.CancelledError:                                                  │
│   120 │   │   │   if self._interrupt_count > 0:                                                  │
│   121 │   │   │   │   uncancel = getattr(task, "uncancel", None)                                 │
│                                                                                                  │
│ /Users/pryceturner/.pyenv/versions/3.12.8/lib/python3.12/asyncio/base_events.py:686 in           │
│ run_until_complete                                                                               │
│                                                                                                  │
│    683 │   │   if not future.done():                                                             │
│    684 │   │   │   raise RuntimeError('Event loop stopped before Future completed.')             │
│    685 │   │                                                                                     │
│ ❱  686 │   │   return future.result()                                                            │
│    687 │                                                                                         │
│    688 │   def stop(self):                                                                       │
│    689 │   │   """Stop running the event loop.                                                   │
│                                                                                                  │
│ /Users/pryceturner/Desktop/slurm-flyte/.slu_env/lib/python3.12/site-packages/asyncssh/connection │
│ .py:4603 in run                                                                                  │
│                                                                                                  │
│   4600 │   │                                                                                     │
│   4601 │   │   process = await self.create_process(*args, **kwargs) # type: ignore               │
│   4602 │   │                                                                                     │
│ ❱ 4603 │   │   return await process.wait(check, timeout)                                         │
│   4604 │                                                                                         │
│   4605 │   async def create_connection(                                                          │
│   4606 │   │   │   self, session_factory: SSHTCPSessionFactory[AnyStr],                          │
│                                                                                                  │
│ /Users/pryceturner/Desktop/slurm-flyte/.slu_env/lib/python3.12/site-packages/asyncssh/process.py │
│ :1572 in wait                                                                                    │
│                                                                                                  │
│   1569 │   │   │   │   │   │   │      stderr_data) from None                                     │
│   1570 │   │                                                                                     │
│   1571 │   │   if check and self.exit_status:                                                    │
│ ❱ 1572 │   │   │   raise ProcessError(self.env, self.command, self.subsystem,                    │
│   1573 │   │   │   │   │   │   │      self.exit_status, self.exit_signal,                        │
│   1574 │   │   │   │   │   │   │      self.returncode, stdout_data, stderr_data)                 │
│   1575 │   │   else:                                                                             │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
ProcessError: Error encountered while executing 'echo-job-name':
  Process exited with non-zero exit status 1
```
I haven't had a chance to dig into it much, but I can see from the auth logs that an SSH session is being created and then closed.
I think in general we should assume that most people coming over from Slurm will be doing things the old way, so we can assume scripts, inputs, and dependencies are already present. IMO we shouldn't worry about NFS/SFTP right now and should focus on executing things that are already present. I know it sounds overly simple, but we need the most basic use case to be absolutely bulletproof and easy to use; otherwise, people will give up when they hit the first error and never try again. Curious to hear your opinion, and very grateful for the work so far! I'll be able to contribute more meaningfully going forward as I understand your implementation better.
f
I agree
c
I completely agree! To simplify things further, perhaps we could assume that the Slurm batch script is already present on the cluster. In that case, users could simply specify a remote batch script path, and we could disable the script definition within `SlurmTask`.
f
Yes, like a command to run.
I thought I had given this requirement.
c
Hi Ketan, I just simplified the implementation and ran a naive task, shown in the figure. You now just specify a remote Slurm batch script path in `task_config`, and the SFTP steps have been removed from `create`.
```python
echo_job = SlurmTask(
    name="echo-job-name",
    task_config=Slurm(
        slurm_host="slurm",
        # Specify a remote Slurm batch script
        batch_script_path="/home/abaowei/echo.slurm",
        sbatch_conf={
            "partition": "debug",
            "job-name": "tiny-slurm",
        }
    )
)
```
f
The Slurm script may need arguments.
c
Do you mean the sbatch options?
f
No, the script itself:
```
my-script --test1 ... --blah asdasd
```
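sbatch forwards everything after the script path to the batch script as positional arguments (`sbatch [options] script [args...]`), so one way to support this is to append the quoted arguments to the generated command. A minimal sketch, with `sbatch_with_args` and its `script_args` parameter as hypothetical names:

```python
import shlex


def sbatch_with_args(batch_script_path: str, sbatch_conf: dict[str, str], script_args: list[str]) -> str:
    """Build `sbatch [options] <script> [args...]`.

    Everything after the script path is passed to the batch script as
    positional arguments ($1, $2, ...), so sbatch options must come first.
    """
    opts = [f"--{key}={shlex.quote(str(value))}" for key, value in sbatch_conf.items()]
    args = [shlex.quote(arg) for arg in script_args]
    return " ".join(["sbatch", *opts, shlex.quote(batch_script_path), *args])


print(sbatch_with_args("/home/abaowei/echo.slurm", {"partition": "debug"}, ["--test1", "value with spaces"]))
# sbatch --partition=debug /home/abaowei/echo.slurm --test1 'value with spaces'
```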
e
Wooo I got it working!
c
Got it! Let me think about how to handle that case. Thanks, Ketan 🙏
e
I'm actually working on that now, may I push to your fork?
c
Morning, Pryce! Great to hear that!
e
Good morning!
c
No problem, feel free to update it!
e
Pushed, lmk what you think
c
I just tried it as shown in the figure (i.e., running a Python script that takes additional args), and that's cool!! I hadn't thought of such use cases. Thanks for being patient with me 🙏
e
I think a lot of the time people will just be calling binaries with args.
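For binaries that have no batch script at all, sbatch's `--wrap` option submits a command string directly (Slurm wraps it in a generated `sh` script). A sketch of building such a command, with `sbatch_wrap_cmd` and `my-binary` as hypothetical names:

```python
import shlex


def sbatch_wrap_cmd(command: list[str], sbatch_conf: dict[str, str]) -> str:
    """Build an sbatch call that runs `command` without a batch script file.

    --wrap takes a command string; Slurm wraps it in a generated sh script.
    """
    opts = [f"--{key}={shlex.quote(str(value))}" for key, value in sbatch_conf.items()]
    inner = shlex.join(command)  # quoting for the sh script Slurm generates
    return " ".join(["sbatch", *opts, f"--wrap={shlex.quote(inner)}"])  # quoting for the SSH shell


print(sbatch_wrap_cmd(["my-binary", "--input", "data.csv"], {"partition": "debug"}))
# sbatch --partition=debug --wrap='my-binary --input data.csv'
```

The two quoting layers matter: one protects the argv of the binary inside Slurm's script, the other keeps the whole `--wrap` value as a single argument for the remote login shell.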
c
Got it! I have two questions:
• For the common cases users care about right now, does it make sense to just fall back to `PythonTask` instead of `ShellTask`?
• Would it be good to write a doc demonstrating how to test the Slurm agent locally in the simplest form, just like we're doing here?
I'm open to discussing further progress. If you have any plans, please feel free to let me know. Thanks so much 🙏
f
Yes, the docs should show how to run from local, with both simple use cases and complex ones.
c
No problem. I'll complete it today!
Hi Ketan and Pryce, Here is the doc for setting up a local test env and running a demo (simple use case first!). Please let me know if there’s anything that could be improved. Thanks!
d
Is it possible to install a Slurm cluster on macOS?
c
It can be hard (or even impossible) to do that because Slurm is primarily designed for Linux-based systems and depends on some Linux-specific libraries. But I think a Linux VM or a Docker container can also fit our needs for local dev and testing!
Here's a quick 2-min demo of the Slurm agent!!

https://youtu.be/ZQtZWcPEMm8

f
@creamy-shampoo-53278 I saw the video. Can you do something like this:
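```python
from flytekit import kwtypes
from flytekit.types.file import FlyteFile
from flytekitplugins.slurm import Slurm

tk = Slurm(
    batch_task_script_command="/blah/blah {inputs.x} {inputs.y}",
    inputs=kwtypes(x=int, y=FlyteFile),
    sbatch_conf={
        "some-conf": "{inputs.x}",
        # ...
    },
)
```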
@damp-lion-88352 I would like all of this to be passable
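Python's `str.format` already understands attribute lookups, so the `{inputs.x}` placeholders in this proposal could be rendered by passing the task inputs as an object. A minimal sketch, assuming inputs arrive as plain Python values (a `FlyteFile` would first be resolved to a remote path, faked here with a string); `render` and `render_all` are hypothetical helpers:

```python
from types import SimpleNamespace


def render(template: str, **inputs) -> str:
    """Fill "{inputs.<name>}" placeholders from keyword arguments.

    str.format supports attribute access, so "{inputs.x}" resolves to the
    `x` attribute of the SimpleNamespace passed as `inputs`.
    """
    return template.format(inputs=SimpleNamespace(**inputs))


def render_all(command: str, sbatch_conf: dict[str, str], **inputs) -> tuple[str, dict[str, str]]:
    """Apply the same substitution to the command and to every sbatch option value."""
    conf = {key: render(str(value), **inputs) for key, value in sbatch_conf.items()}
    return render(command, **inputs), conf


cmd, conf = render_all(
    "/blah/blah {inputs.x} {inputs.y}",
    {"some-conf": "{inputs.x}"},
    x=4,
    y="/data/input.csv",  # stand-in for a FlyteFile's remote path
)
print(cmd)   # /blah/blah 4 /data/input.csv
print(conf)  # {'some-conf': '4'}
```

Under this scheme, literal braces in a command would need doubling (`{{`/`}}`), and substituted values should still be shell-quoted before the command is run over SSH.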
c
Hi Ketan, we now support 3 different task types and introduce them in the new thread!