damp-lion-88352
01/15/2025, 3:10 PM
creamy-shampoo-53278
01/15/2025, 5:01 PM
`PythonFunctionTask` tomorrow!
eager-processor-63090
01/15/2025, 5:23 PM
creamy-shampoo-53278
01/16/2025, 1:14 PM
`PythonFunctionTask` is now added! We'll briefly describe how tasks/agents are categorized. We first split them into `script/` and `function/` under the Slurm plugin root. For the former, we have:
1. `SlurmTask`: Executes a batch/shell script on the Slurm host
2. `SlurmShellTask`: Executes a user-defined batch/shell script from localhost (e.g., our local dev PC)
As for the latter, we wrote an additional `SlurmFunctionTask`, which inherits from `PythonFunctionTask`. It supports running a user-defined function in the task body by executing flytekit entrypoints on the Slurm host.
Because the structure has changed slightly and the tasks have been renamed, we need to modify the interface a little bit!
creamy-shampoo-53278
01/16/2025, 1:23 PM
creamy-shampoo-53278
01/16/2025, 2:07 PM
`SlurmTask`
import os

from flytekit import workflow
from flytekitplugins.slurm import SlurmRemoteScript, SlurmTask

echo_job = SlurmTask(
    name="<task-name>",
    task_config=SlurmRemoteScript(
        slurm_host="<slurm-host>",
        # Specify a remote Slurm batch script
        batch_script_path="<remote-batch-script-path>",
        sbatch_conf={
            "partition": "debug",
            "job-name": "tiny-slurm",
        },
    ),
)


@workflow
def wf() -> None:
    echo_job()


if __name__ == "__main__":
    from click.testing import CliRunner
    from flytekit.clis.sdk_in_container import pyflyte

    runner = CliRunner()
    path = os.path.realpath(__file__)

    # Local run
    print(">>> LOCAL EXEC <<<")
    result = runner.invoke(pyflyte.main, ["run", path, "wf"])
    print(result.output)
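For anyone wondering what `sbatch_conf` amounts to, here is a minimal sketch, assuming each key/value pair is forwarded to `sbatch` as a long option (`--partition` and `--job-name` are real `sbatch` flags). The helper `conf_to_flags` is hypothetical and only for illustration; the plugin may well build the command differently.

```python
import shlex
from typing import Dict, List


def conf_to_flags(conf: Dict[str, str]) -> List[str]:
    """Hypothetical helper: turn {"partition": "debug"} into ["--partition=debug"]."""
    return [f"--{key}={value}" for key, value in conf.items()]


sbatch_conf = {"partition": "debug", "job-name": "tiny-slurm"}

# "<remote-batch-script-path>" is the same placeholder used in the example above.
cmd = ["sbatch", *conf_to_flags(sbatch_conf), "<remote-batch-script-path>"]

# shlex.join quotes any argument that is not shell-safe.
print(shlex.join(cmd))
```

Keeping the options in a dict means the same config can be reused for several jobs without editing the batch script itself.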
creamy-shampoo-53278
01/16/2025, 2:08 PM
`SlurmShellTask`
import os

from flytekit import workflow
from flytekitplugins.slurm import Slurm, SlurmShellTask

# The script body is defined locally and passed to the task as a string
echo = SlurmShellTask(
    name="echo-slurm-shell-task",
    script="""#!/bin/bash
# We can define sbatch options here
echo "Demo Slurm agent with SlurmShellTask...\n"
# Run a demo python script on Slurm
echo "Working!"
""",
    task_config=Slurm(
        slurm_host="<slurm-host>",
        sbatch_conf={
            "partition": "debug",
            "job-name": "tiny-slurm",
        },
    ),
)


@workflow
def wf() -> None:
    echo()


if __name__ == "__main__":
    from click.testing import CliRunner
    from flytekit.clis.sdk_in_container import pyflyte

    runner = CliRunner()
    path = os.path.realpath(__file__)

    # Local run
    print(">>> LOCAL EXEC <<<")
    result = runner.invoke(pyflyte.main, ["run", path, "wf"])
    print(result.output)
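The "define sbatch options here" comment in the script refers to Slurm's `#SBATCH` directive lines, which `sbatch` reads from the top of a batch script. A rough sketch of that idea, assuming the script starts with a shebang and reusing the two options from the example above; `with_sbatch_header` is a hypothetical helper, not something the plugin provides:

```python
from typing import Dict


def with_sbatch_header(script: str, conf: Dict[str, str]) -> str:
    """Hypothetical helper: add one '#SBATCH --key=value' line per option
    right after the first line, which is assumed to be the shebang."""
    shebang, _, body = script.partition("\n")
    directives = [f"#SBATCH --{key}={value}" for key, value in conf.items()]
    return "\n".join([shebang, *directives, body])


script = with_sbatch_header(
    '#!/bin/bash\necho "Working!"\n',
    {"partition": "debug", "job-name": "tiny-slurm"},
)
print(script)
```

Options written as directives travel with the script, whereas `sbatch_conf` keeps them in the task config; which one is more convenient depends on who owns the script.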
creamy-shampoo-53278
01/16/2025, 2:09 PM
`SlurmFunctionTask`
import os
from typing import Any, Dict

from flytekit import kwtypes, task, workflow, ImageSpec
from flytekitplugins.slurm import SlurmFunction


# This task runs on the Slurm host
@task(
    task_config=SlurmFunction(
        slurm_host="<slurm-host>",
        srun_conf={
            "partition": "debug",
            "job-name": "demo-slurm-function-task",
            "chdir": "<working-dir>",
        },
    )
)
def plus_one(x: int) -> int:
    return x + 1


# Plain task without a Slurm config
@task
def greet(year: int) -> str:
    return f"Hello {year}!!!"


@workflow
def wf(x: int) -> str:
    x = plus_one(x=x)
    msg = greet(year=x)
    return msg


if __name__ == "__main__":
    from click.testing import CliRunner
    from flytekit.clis.sdk_in_container import pyflyte

    runner = CliRunner()
    path = os.path.realpath(__file__)

    # Local run (CLI arguments are passed as strings)
    print(">>> LOCAL EXEC <<<")
    result = runner.invoke(
        pyflyte.main,
        ["run", "--raw-output-data-prefix", "<s3-url>", path, "wf", "--x", "2024"],
    )
    print(result.output)