eager-processor-63090
01/15/2025, 9:18 PM
RuntimeError: Error encountered while executing 'job2':
Task <Task pending name='Task-57' coro=<AsyncAgentExecutorMixin._create() running at /Users/pryceturner/Desktop/slurm-flyte/flytekit/flytekit/extend/backend/base_agent.py:376> cb=[_run_until_complete_cb() at
/Users/pryceturner/.pyenv/versions/3.12.8/lib/python3.12/asyncio/base_events.py:181]> got Future <Future pending> attached to a different loop
eager-processor-63090
01/15/2025, 9:21 PM
import os

from flytekit import task, workflow
from flytekitplugins.slurm import Slurm, SlurmTask

job1 = SlurmTask(
    name="job1",
    task_config=Slurm(
        slurm_host="slurm",
        sbatch_conf={
            "partition": "LocalQ",
            "job-name": "tiny-slurm",
        },
        # batch_script_path="/home/ubuntu/slurm/scripts/parallel.sh",
        batch_script_path="/home/ubuntu/slurm/scripts/check_args.sh",
        batch_script_args=["1"],
    )
)

job2 = SlurmTask(
    name="job2",
    task_config=Slurm(
        slurm_host="slurm",
        sbatch_conf={
            "partition": "LocalQ",
            "job-name": "tiny-slurm",
        },
        # batch_script_path="/home/ubuntu/slurm/scripts/parallel.sh",
        batch_script_path="/home/ubuntu/slurm/scripts/check_args.sh",
        batch_script_args=["2"],
    )
)


@workflow
def wf() -> None:
    job1()
    job2()


if __name__ == "__main__":
    from flytekit.clis.sdk_in_container import pyflyte
    from click.testing import CliRunner

    runner = CliRunner()
    path = os.path.realpath(__file__)

    # Local run
    print(">>> LOCAL EXEC <<<")
    result = runner.invoke(pyflyte.main, ["run", "--raw-output-data-prefix", "s3://slurm-agent-test", path, "wf"])
    print(result.output)
eager-processor-63090
01/15/2025, 9:22 PM
>>> LOCAL EXEC <<<
Running Execution on local.
Environment: {}
Command executed: sbatch --partition LocalQ --job-name tiny-slurm /home/ubuntu/slurm/scripts/check_args.sh 1
Subsystem: None
Exit status: 0
Exit signal: None
Return code: 0
Standard output: Submitted batch job 15
Standard error:
Environment: {}
Command executed: scontrol show job 15
Subsystem: None
Exit status: 0
Exit signal: None
Return code: 0
Standard output: JobId=15 JobName=tiny-slurm
UserId=ubuntu(1000) GroupId=ubuntu(1000) MCS_label=N/A
Priority=1 Nice=0 Account=(null) QOS=(null)
JobState=PENDING Reason=InvalidAccount Dependency=(null)
Requeue=1 Restarts=0 BatchFlag=1 Reboot=0 ExitCode=0:0
RunTime=00:00:00 TimeLimit=01:00:00 TimeMin=N/A
SubmitTime=2025-01-15T21:01:25 EligibleTime=2025-01-15T21:01:25
AccrueTime=2025-01-15T21:01:25
StartTime=Unknown EndTime=Unknown Deadline=N/A
SuspendTime=None SecsPreSuspend=0 LastSchedEval=2025-01-15T21:01:25 Scheduler=Main
Partition=LocalQ AllocNode:Sid=ip-172-31-7-156:1979
ReqNodeList=(null) ExcNodeList=(null)
NodeList=
NumNodes=1 NumCPUs=1 NumTasks=1 CPUs/Task=1 ReqB:S:C:T=0:0:*:*
ReqTRES=cpu=1,mem=10M,node=1,billing=1
AllocTRES=(null)
Socks/Node=* NtasksPerN:B:S:C=0:0:*:* CoreSpec=*
MinCPUsNode=1 MinMemoryNode=10M MinTmpDiskNode=0
Features=(null) DelayBoot=00:00:00
OverSubscribe=OK Contiguous=0 Licenses=(null) Network=(null)
Command=/home/ubuntu/slurm/scripts/check_args.sh
WorkDir=/home/ubuntu
StdErr=/home/ubuntu/job_15.err
StdIn=/dev/null
StdOut=/home/ubuntu/job_15.out
Power=
TresPerTask=cpu:1
(x4)
Standard error:
╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ /Users/pryceturner/.pyenv/versions/3.12.8/lib/python3.12/asyncio/runners.py:194 in run │
│ │
│ 191 │ │ │ "asyncio.run() cannot be called from a running event loop") │
│ 192 │ │
│ 193 │ with Runner(debug=debug, loop_factory=loop_factory) as runner: │
│ ❱ 194 │ │ return runner.run(main) │
│ 195 │
│ 196 │
│ 197 def _cancel_all_tasks(loop): │
│ │
│ /Users/pryceturner/.pyenv/versions/3.12.8/lib/python3.12/asyncio/runners.py:118 in run │
│ │
│ 115 │ │ │
│ 116 │ │ self._interrupt_count = 0 │
│ 117 │ │ try: │
│ ❱ 118 │ │ │ return self._loop.run_until_complete(task) │
│ 119 │ │ except exceptions.CancelledError: │
│ 120 │ │ │ if self._interrupt_count > 0: │
│ 121 │ │ │ │ uncancel = getattr(task, "uncancel", None) │
│ │
│ /Users/pryceturner/.pyenv/versions/3.12.8/lib/python3.12/asyncio/base_events.py:686 in │
│ run_until_complete │
│ │
│ 683 │ │ if not future.done(): │
│ 684 │ │ │ raise RuntimeError('Event loop stopped before Future completed.') │
│ 685 │ │ │
│ ❱ 686 │ │ return future.result() │
│ 687 │ │
│ 688 │ def stop(self): │
│ 689 │ │ """Stop running the event loop. │
│ │
│ /Users/pryceturner/Desktop/slurm-flyte/.slu_env/lib/python3.12/site-packages/asyncssh/connection │
│ .py:4601 in run │
│ │
│ 4598 │ │ │
│ 4599 │ │ """ │
│ 4600 │ │ │
│ ❱ 4601 │ │ process = await self.create_process(*args, **kwargs) # type: ignore │
│ 4602 │ │ │
│ 4603 │ │ return await process.wait(check, timeout) │
│ 4604 │
│ │
│ /Users/pryceturner/Desktop/slurm-flyte/.slu_env/lib/python3.12/site-packages/asyncssh/connection │
│ .py:4479 in create_process │
│ │
│ 4476 │ │ │
│ 4477 │ │ """ │
│ 4478 │ │ │
│ ❱ 4479 │ │ chan, process = await self.create_session( │
│ 4480 │ │ │ SSHClientProcess, *args, **kwargs) # type: ignore │
│ 4481 │ │ │
│ 4482 │ │ new_stdin: Optional[ProcessSource] = stdin │
│ │
│ /Users/pryceturner/Desktop/slurm-flyte/.slu_env/lib/python3.12/site-packages/asyncssh/connection │
│ .py:4372 in create_session │
│ │
│ 4369 │ │ chan = SSHClientChannel(self, self._loop, encoding, errors, │
│ 4370 │ │ │ │ │ │ │ │ window, max_pktsize) │
│ 4371 │ │ │
│ ❱ 4372 │ │ session = await chan.create(session_factory, command, subsystem, │
│ 4373 │ │ │ │ │ │ │ │ │ new_env, request_pty, term_type, term_size, │
│ 4374 │ │ │ │ │ │ │ │ │ term_modes or {}, x11_forwarding, │
│ 4375 │ │ │ │ │ │ │ │ │ x11_display, x11_auth_path, │
│ │
│ /Users/pryceturner/Desktop/slurm-flyte/.slu_env/lib/python3.12/site-packages/asyncssh/channel.py │
│ :1140 in create │
│ │
│ 1137 │ │ │
│ 1138 │ │ self.logger.info('Requesting new SSH session') │
│ 1139 │ │ │
│ ❱ 1140 │ │ packet = await self._open(b'session') │
│ 1141 │ │ │
│ 1142 │ │ # Client sessions should have no extra data in the open confirmation │
│ 1143 │ │ packet.check_end() │
│ │
│ /Users/pryceturner/Desktop/slurm-flyte/.slu_env/lib/python3.12/site-packages/asyncssh/channel.py │
│ :710 in _open │
│ │
│ 707 │ │ │ │ │ │ │ UInt32(self._recv_window), │
│ 708 │ │ │ │ │ │ │ UInt32(self._recv_pktsize), *args, handler=self) │
│ 709 │ │ │
│ ❱ 710 │ │ return await self._open_waiter │
│ 711 │ │
│ 712 │ def send_packet(self, pkttype: int, *args: bytes) -> None: │
│ 713 │ │ """Send a packet on the channel""" │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
RuntimeError: Error encountered while executing 'job2':
Task <Task pending name='Task-57' coro=<AsyncAgentExecutorMixin._create() running at /Users/pryceturner/Desktop/slurm-flyte/flytekit/flytekit/extend/backend/base_agent.py:376> cb=[_run_until_complete_cb() at
/Users/pryceturner/.pyenv/versions/3.12.8/lib/python3.12/asyncio/base_events.py:181]> got Future <Future pending> attached to a different loop
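The "attached to a different loop" error above can be reproduced without Slurm or asyncssh. The following is a minimal sketch, not the agent's actual code: `CachedResource`, `make_resource`, `use_resource`, and `reproduce` are hypothetical names, and the resource simply stands in for any object (such as an SSH connection) that creates futures on the loop it was built on. It relies only on the documented behaviour that each `asyncio.run()` call creates a fresh event loop and closes it on exit.

```python
import asyncio


class CachedResource:
    """Hypothetical stand-in for an object that captures the loop it was created on."""

    def __init__(self) -> None:
        # Bound to whichever loop is running at construction time.
        self.loop = asyncio.get_running_loop()
        self.waiter = self.loop.create_future()


async def make_resource() -> CachedResource:
    return CachedResource()


async def use_resource(res: CachedResource) -> None:
    # The task runs on the current loop, but the future belongs to an older one.
    await res.waiter


def reproduce() -> str:
    # First asyncio.run(): loop A is created, used, and closed.
    res = asyncio.run(make_resource())
    try:
        # Second asyncio.run(): loop B awaits a future that lives on loop A.
        asyncio.run(use_resource(res))
    except RuntimeError as exc:
        return str(exc)
    return "no error"
```

Calling `reproduce()` returns the RuntimeError text, which ends in "attached to a different loop", the same wording as in the traceback above.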
eager-processor-63090
01/15/2025, 9:22 PM
eager-processor-63090
01/15/2025, 9:31 PM
creamy-shampoo-53278
01/16/2025, 1:28 PM
creamy-shampoo-53278
01/16/2025, 2:01 PM
The connection used by the _get method for job1 gets accessed by _create of job2, because of the condition we wrote in the agent's create method (as shown in the fig). After commenting out if self._conn is None and always reconnecting to the Slurm host, this workflow works!
After some digging, I found that asyncio.run() always creates a new event loop, which is why the issue above occurs. But I still need to digest the underlying mechanism (I don't know why the connection can't be shared between two event loops 😓).
eager-processor-63090
01/16/2025, 6:56 PM
eager-processor-63090
01/17/2025, 12:53 AM
damp-lion-88352
01/17/2025, 2:22 PM
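One possible refinement of the always-reconnect workaround discussed in this thread is to keep the cached connection, but only reuse it on the event loop that created it. The sketch below is an assumption, not the Slurm agent's implementation: `LoopAwareCache`, `fake_connect`, `two_calls_same_loop`, and `demo` are hypothetical names, and `fake_connect` stands in for whatever coroutine actually opens the SSH connection.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


class LoopAwareCache:
    """Hypothetical cache: one connection, valid only for the loop that made it."""

    def __init__(self, connect: Callable[[], Awaitable[Any]]) -> None:
        self._connect = connect
        self._conn: Optional[Any] = None
        self._loop: Optional[asyncio.AbstractEventLoop] = None

    async def get(self) -> Any:
        loop = asyncio.get_running_loop()
        # Reconnect if nothing is cached, or if the cached connection
        # was created on a different event loop than the current one.
        if self._conn is None or self._loop is not loop:
            self._conn = await self._connect()
            self._loop = loop
        return self._conn


connect_calls = 0


async def fake_connect() -> object:
    """Stand-in for the real connect coroutine; counts how often it runs."""
    global connect_calls
    connect_calls += 1
    return object()


cache = LoopAwareCache(fake_connect)


async def two_calls_same_loop() -> bool:
    first = await cache.get()
    second = await cache.get()
    return first is second


def demo() -> tuple[bool, int]:
    # Same loop: the second get() reuses the first connection.
    reused = asyncio.run(two_calls_same_loop())
    # New loop from a second asyncio.run(): the cache reconnects.
    asyncio.run(cache.get())
    return reused, connect_calls
```

In this sketch the stale connection is simply dropped. A real agent would probably also want to close it, which is left out here because closing a connection that belongs to an already-closed loop needs its own handling.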