creamy-shampoo-53278
01/07/2025, 4:18 PM
ShellTask, but there's still room for improvement!
The following describes how it works:
• To mimic NFS, we place a simple Python script on the Slurm cluster in advance
• To run the user-defined batch script on the Slurm cluster, we write the script content to a tmp file and transfer it to the cluster through SFTP
• Within the create method, we construct an sbatch command based on the user-defined sbatch options and run this command on the cluster
creamy-shampoo-53278
01/07/2025, 4:20 PM
import os

from flytekit import task, workflow
from flytekitplugins.slurm import Slurm, SlurmTask

echo = SlurmTask(
    name="echo",
    script="""#!/bin/bash
# We can define sbatch options here
echo "Demo Slurm agent with ShellTask...\n"
# Run a demo python script on Slurm
echo ">>> Run a Demo Python Script <<<"
python3 demo.py
""",
    task_config=Slurm(
        slurm_host="slurm",
        sbatch_conf={
            "partition": "debug",
            "job-name": "tiny-slurm",
        },
    ),
)


@workflow
def wf() -> None:
    echo()


if __name__ == "__main__":
    from click.testing import CliRunner
    from flytekit.clis.sdk_in_container import pyflyte

    runner = CliRunner()
    path = os.path.realpath(__file__)
    # Local run
    print(">>> LOCAL EXEC <<<")
    result = runner.invoke(pyflyte.main, ["run", "--raw-output-data-prefix", "s3://flyte-slurm-agent", path, "wf"])
    print(result.output)
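For readers following along, the tmp-file and sbatch steps described at 4:18 PM could be sketched roughly as below. This is a minimal illustration, not the agent's actual code: `write_script_to_tmp`, `build_sbatch_command`, and the remote path `/tmp/echo.slurm` are hypothetical names, and the SFTP upload is left out because it needs a live connection.

```python
import shlex
import tempfile
from pathlib import Path
from typing import Mapping


def write_script_to_tmp(script: str) -> Path:
    """Write the batch script content to a local temporary file.

    This is the file that would later be uploaded to the cluster
    (the upload itself is omitted in this sketch).
    """
    with tempfile.NamedTemporaryFile("w", suffix=".slurm", delete=False) as f:
        f.write(script)
    return Path(f.name)


def build_sbatch_command(sbatch_conf: Mapping[str, str], remote_script_path: str) -> str:
    """Turn a mapping of sbatch options into one shell command string."""
    parts = ["sbatch"]
    for option, value in sbatch_conf.items():
        # Long sbatch options take the form --name=value; quote the value
        # so spaces or shell metacharacters cannot break the command.
        parts.append(f"--{option}={shlex.quote(str(value))}")
    parts.append(shlex.quote(remote_script_path))
    return " ".join(parts)


script = "#!/bin/bash\necho hello\n"
local_path = write_script_to_tmp(script)
cmd = build_sbatch_command(
    {"partition": "debug", "job-name": "tiny-slurm"},
    "/tmp/echo.slurm",  # hypothetical remote path, assumed for illustration
)
print(cmd)  # sbatch --partition=debug --job-name=tiny-slurm /tmp/echo.slurm
```

Quoting each value with `shlex.quote` is a design choice of this sketch; whether the real agent does the same is not stated in the thread.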
creamy-shampoo-53278
01/07/2025, 4:23 PM
creamy-shampoo-53278
01/07/2025, 4:31 PM
/root to align with the task pod behavior
2. Better handle issues related to _config_task_instance inside ShellTask (many tmp workarounds now)
eager-processor-63090
01/07/2025, 4:53 PM
eager-processor-63090
01/07/2025, 4:53 PM
freezing-airport-6809
freezing-airport-6809
creamy-shampoo-53278
01/08/2025, 10:20 AM
creamy-shampoo-53278
01/08/2025, 10:25 AM
eager-processor-63090
01/09/2025, 3:24 AM
echo_job = SlurmTask(
    name="echo-job-name",
    script="""#!/bin/bash
echo "Working!" >> /home/ubuntu/slurm/remote_touch.txt
""",
    task_config=Slurm(
        slurm_host="slurm",
        sbatch_conf={
            "partition": "debug",
            "job-name": "tiny-slurm",
        },
    ),
)
results in
>>> LOCAL EXEC <<<
Running Execution on local.
╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ /Users/pryceturner/Desktop/slurm-flyte/shelltask.py:24 in wf │
│ │
│ 21 │
│ 22 @workflow │
│ 23 def wf() -> None: │
│ ❱ 24 │ echo_job() │
│ 25 │
│ 26 │
│ 27 if __name__ == "__main__": │
│ │
│ /Users/pryceturner/.pyenv/versions/3.12.8/lib/python3.12/asyncio/runners.py:194 in run │
│ │
│ 191 │ │ │ "asyncio.run() cannot be called from a running event loop") │
│ 192 │ │
│ 193 │ with Runner(debug=debug, loop_factory=loop_factory) as runner: │
│ ❱ 194 │ │ return runner.run(main) │
│ 195 │
│ 196 │
│ 197 def _cancel_all_tasks(loop): │
│ │
│ /Users/pryceturner/.pyenv/versions/3.12.8/lib/python3.12/asyncio/runners.py:118 in run │
│ │
│ 115 │ │ │
│ 116 │ │ self._interrupt_count = 0 │
│ 117 │ │ try: │
│ ❱ 118 │ │ │ return self._loop.run_until_complete(task) │
│ 119 │ │ except exceptions.CancelledError: │
│ 120 │ │ │ if self._interrupt_count > 0: │
│ 121 │ │ │ │ uncancel = getattr(task, "uncancel", None) │
│ │
│ /Users/pryceturner/.pyenv/versions/3.12.8/lib/python3.12/asyncio/base_events.py:686 in │
│ run_until_complete │
│ │
│ 683 │ │ if not future.done(): │
│ 684 │ │ │ raise RuntimeError('Event loop stopped before Future completed.') │
│ 685 │ │ │
│ ❱ 686 │ │ return future.result() │
│ 687 │ │
│ 688 │ def stop(self): │
│ 689 │ │ """Stop running the event loop. │
│ │
│ /Users/pryceturner/Desktop/slurm-flyte/.slu_env/lib/python3.12/site-packages/asyncssh/connection │
│ .py:4603 in run │
│ │
│ 4600 │ │ │
│ 4601 │ │ process = await self.create_process(*args, **kwargs) # type: ignore │
│ 4602 │ │ │
│ ❱ 4603 │ │ return await process.wait(check, timeout) │
│ 4604 │ │
│ 4605 │ async def create_connection( │
│ 4606 │ │ │ self, session_factory: SSHTCPSessionFactory[AnyStr], │
│ │
│ /Users/pryceturner/Desktop/slurm-flyte/.slu_env/lib/python3.12/site-packages/asyncssh/process.py │
│ :1572 in wait │
│ │
│ 1569 │ │ │ │ │ │ │ stderr_data) from None │
│ 1570 │ │ │
│ 1571 │ │ if check and self.exit_status: │
│ ❱ 1572 │ │ │ raise ProcessError(self.env, self.command, self.subsystem, │
│ 1573 │ │ │ │ │ │ │ self.exit_status, self.exit_signal, │
│ 1574 │ │ │ │ │ │ │ self.returncode, stdout_data, stderr_data) │
│ 1575 │ │ else: │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
ProcessError: Error encountered while executing 'echo-job-name':
Process exited with non-zero exit status 1
I haven't had a chance to dig into it much, but I can see from the auth logs that an SSH session is being created and closed.
eager-processor-63090
01/09/2025, 3:29 AM
freezing-airport-6809
creamy-shampoo-53278
01/09/2025, 3:12 PM
SlurmTask.
freezing-airport-6809
freezing-airport-6809
creamy-shampoo-53278
01/09/2025, 3:41 PM
creamy-shampoo-53278
01/09/2025, 3:42 PM
task_config and remove SFTP things during create.
echo_job = SlurmTask(
    name="echo-job-name",
    task_config=Slurm(
        slurm_host="slurm",
        # Specify a remote Slurm batch script
        batch_script_path="/home/abaowei/echo.slurm",
        sbatch_conf={
            "partition": "debug",
            "job-name": "tiny-slurm",
        },
    ),
)
freezing-airport-6809
creamy-shampoo-53278
01/09/2025, 3:46 PM
freezing-airport-6809
freezing-airport-6809
my-script --test1 ... --blah asdasd
eager-processor-63090
01/09/2025, 10:59 PM
creamy-shampoo-53278
01/09/2025, 11:36 PM
eager-processor-63090
01/09/2025, 11:38 PM
creamy-shampoo-53278
01/09/2025, 11:38 PM
eager-processor-63090
01/09/2025, 11:39 PM
creamy-shampoo-53278
01/09/2025, 11:39 PM
eager-processor-63090
01/10/2025, 12:12 AM
creamy-shampoo-53278
01/10/2025, 12:15 PM
eager-processor-63090
01/10/2025, 6:00 PM
creamy-shampoo-53278
01/11/2025, 2:15 AM
PythonTask instead of ShellTask?
• Would it be good to write a doc demonstrating how to test the Slurm agent locally in its simplest form, the way we do here?
I'm open to discussing further progress. If you have any plans, please feel free to let me know. Thanks so much 🙏
freezing-airport-6809
creamy-shampoo-53278
01/11/2025, 3:23 AM
creamy-shampoo-53278
01/11/2025, 3:36 PM
damp-lion-88352
01/13/2025, 1:27 PM
creamy-shampoo-53278
01/13/2025, 2:25 PM
freezing-airport-6809
tk = Slurm(
    batch_task_script_command="/blah/blah {inputs.x} {inputs.y}",
    inputs=kwargs(x=int, y=FlyteFile),
    sbatch_conf={
        "some-conf": "{inputs.x}",
        ...
    },
)
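One way the `{inputs.x}` placeholders in the proposal above could be filled in is plain `str.format` with attribute lookup. The sketch below is only an illustration of that idea: `render` is a hypothetical helper, the input values are made up, and how the agent would actually resolve inputs is not shown in this thread.

```python
from types import SimpleNamespace


def render(template: str, **inputs) -> str:
    """Fill `{inputs.<name>}` placeholders via str.format attribute lookup."""
    return template.format(inputs=SimpleNamespace(**inputs))


# Hypothetical input values, assumed for illustration only
values = {"x": 3, "y": "/data/in.txt"}

command = render("/blah/blah {inputs.x} {inputs.y}", **values)
sbatch_conf = {
    key: render(value, **values)
    for key, value in {"some-conf": "{inputs.x}"}.items()
}

print(command)      # /blah/blah 3 /data/in.txt
print(sbatch_conf)  # {'some-conf': '3'}
```

A placeholder that names an input nobody supplied raises `AttributeError` here, which may or may not be the failure mode you want at task-construction time.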
freezing-airport-6809
creamy-shampoo-53278
01/17/2025, 3:04 PM