# slurm-flyte-wg
e
RuntimeError: Error encountered while executing 'job2':
  Task <Task pending name='Task-57' coro=<AsyncAgentExecutorMixin._create() running at /Users/pryceturner/Desktop/slurm-flyte/flytekit/flytekit/extend/backend/base_agent.py:376> cb=[_run_until_complete_cb() at
/Users/pryceturner/.pyenv/versions/3.12.8/lib/python3.12/asyncio/base_events.py:181]> got Future <Future pending> attached to a different loop
import os

from flytekit import workflow
from flytekitplugins.slurm import Slurm, SlurmTask


job1 = SlurmTask(
    name="job1",
    task_config=Slurm(
        slurm_host="slurm",
        sbatch_conf={
            "partition": "LocalQ",
            "job-name": "tiny-slurm",
        },
        # batch_script_path="/home/ubuntu/slurm/scripts/parallel.sh",
        batch_script_path="/home/ubuntu/slurm/scripts/check_args.sh",
        batch_script_args=["1"],
    )
)

job2 = SlurmTask(
    name="job2",
    task_config=Slurm(
        slurm_host="slurm",
        sbatch_conf={
            "partition": "LocalQ",
            "job-name": "tiny-slurm",
        },
        # batch_script_path="/home/ubuntu/slurm/scripts/parallel.sh",
        batch_script_path="/home/ubuntu/slurm/scripts/check_args.sh",
        batch_script_args=["2"],
    )
)


@workflow
def wf() -> None:
    job1()
    job2()


if __name__ == "__main__":
    from flytekit.clis.sdk_in_container import pyflyte
    from click.testing import CliRunner

    runner = CliRunner()
    path = os.path.realpath(__file__)

    # Local run
    print(">>> LOCAL EXEC <<<")
    result = runner.invoke(pyflyte.main, ["run", "--raw-output-data-prefix", "s3://slurm-agent-test", path, "wf"])
    print(result.output)
>>> LOCAL EXEC <<<
Running Execution on local.
Environment: {}
Command executed: sbatch --partition LocalQ --job-name tiny-slurm /home/ubuntu/slurm/scripts/check_args.sh 1
Subsystem: None
Exit status: 0
Exit signal: None
Return code: 0
Standard output: Submitted batch job 15

Standard error: 
Environment: {}
Command executed: scontrol show job 15
Subsystem: None
Exit status: 0
Exit signal: None
Return code: 0
Standard output: JobId=15 JobName=tiny-slurm
   UserId=ubuntu(1000) GroupId=ubuntu(1000) MCS_label=N/A
   Priority=1 Nice=0 Account=(null) QOS=(null)
   JobState=PENDING Reason=InvalidAccount Dependency=(null)
   Requeue=1 Restarts=0 BatchFlag=1 Reboot=0 ExitCode=0:0
   RunTime=00:00:00 TimeLimit=01:00:00 TimeMin=N/A
   SubmitTime=2025-01-15T21:01:25 EligibleTime=2025-01-15T21:01:25
   AccrueTime=2025-01-15T21:01:25
   StartTime=Unknown EndTime=Unknown Deadline=N/A
   SuspendTime=None SecsPreSuspend=0 LastSchedEval=2025-01-15T21:01:25 Scheduler=Main
   Partition=LocalQ AllocNode:Sid=ip-172-31-7-156:1979
   ReqNodeList=(null) ExcNodeList=(null)
   NodeList=
   NumNodes=1 NumCPUs=1 NumTasks=1 CPUs/Task=1 ReqB:S:C:T=0:0:*:*
   ReqTRES=cpu=1,mem=10M,node=1,billing=1
   AllocTRES=(null)
   Socks/Node=* NtasksPerN:B:S:C=0:0:*:* CoreSpec=*
   MinCPUsNode=1 MinMemoryNode=10M MinTmpDiskNode=0
   Features=(null) DelayBoot=00:00:00
   OverSubscribe=OK Contiguous=0 Licenses=(null) Network=(null)
   Command=/home/ubuntu/slurm/scripts/check_args.sh
   WorkDir=/home/ubuntu
   StdErr=/home/ubuntu/job_15.err
   StdIn=/dev/null
   StdOut=/home/ubuntu/job_15.out
   Power=
   TresPerTask=cpu:1
   


(x4)
   


Standard error: 

╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ /Users/pryceturner/.pyenv/versions/3.12.8/lib/python3.12/asyncio/runners.py:194 in run           │
│                                                                                                  │
│   191 │   │   │   "asyncio.run() cannot be called from a running event loop")                    │
│   192 │                                                                                          │
│   193 │   with Runner(debug=debug, loop_factory=loop_factory) as runner:                         │
│ ❱ 194 │   │   return runner.run(main)                                                            │
│   195                                                                                            │
│   196                                                                                            │
│   197 def _cancel_all_tasks(loop):                                                               │
│                                                                                                  │
│ /Users/pryceturner/.pyenv/versions/3.12.8/lib/python3.12/asyncio/runners.py:118 in run           │
│                                                                                                  │
│   115 │   │                                                                                      │
│   116 │   │   self._interrupt_count = 0                                                          │
│   117 │   │   try:                                                                               │
│ ❱ 118 │   │   │   return self._loop.run_until_complete(task)                                     │
│   119 │   │   except exceptions.CancelledError:                                                  │
│   120 │   │   │   if self._interrupt_count > 0:                                                  │
│   121 │   │   │   │   uncancel = getattr(task, "uncancel", None)                                 │
│                                                                                                  │
│ /Users/pryceturner/.pyenv/versions/3.12.8/lib/python3.12/asyncio/base_events.py:686 in           │
│ run_until_complete                                                                               │
│                                                                                                  │
│    683 │   │   if not future.done():                                                             │
│    684 │   │   │   raise RuntimeError('Event loop stopped before Future completed.')             │
│    685 │   │                                                                                     │
│ ❱  686 │   │   return future.result()                                                            │
│    687 │                                                                                         │
│    688 │   def stop(self):                                                                       │
│    689 │   │   """Stop running the event loop.                                                   │
│                                                                                                  │
│ /Users/pryceturner/Desktop/slurm-flyte/.slu_env/lib/python3.12/site-packages/asyncssh/connection │
│ .py:4601 in run                                                                                  │
│                                                                                                  │
│   4598 │   │                                                                                     │
│   4599 │   │   """                                                                               │
│   4600 │   │                                                                                     │
│ ❱ 4601 │   │   process = await self.create_process(*args, **kwargs) # type: ignore               │
│   4602 │   │                                                                                     │
│   4603 │   │   return await process.wait(check, timeout)                                         │
│   4604                                                                                           │
│                                                                                                  │
│ /Users/pryceturner/Desktop/slurm-flyte/.slu_env/lib/python3.12/site-packages/asyncssh/connection │
│ .py:4479 in create_process                                                                       │
│                                                                                                  │
│   4476 │   │                                                                                     │
│   4477 │   │   """                                                                               │
│   4478 │   │                                                                                     │
│ ❱ 4479 │   │   chan, process = await self.create_session(                                        │
│   4480 │   │   │   SSHClientProcess, *args, **kwargs) # type: ignore                             │
│   4481 │   │                                                                                     │
│   4482 │   │   new_stdin: Optional[ProcessSource] = stdin                                        │
│                                                                                                  │
│ /Users/pryceturner/Desktop/slurm-flyte/.slu_env/lib/python3.12/site-packages/asyncssh/connection │
│ .py:4372 in create_session                                                                       │
│                                                                                                  │
│   4369 │   │   chan = SSHClientChannel(self, self._loop, encoding, errors,                       │
│   4370 │   │   │   │   │   │   │   │   window, max_pktsize)                                      │
│   4371 │   │                                                                                     │
│ ❱ 4372 │   │   session = await chan.create(session_factory, command, subsystem,                  │
│   4373 │   │   │   │   │   │   │   │   │   new_env, request_pty, term_type, term_size,           │
│   4374 │   │   │   │   │   │   │   │   │   term_modes or {}, x11_forwarding,                     │
│   4375 │   │   │   │   │   │   │   │   │   x11_display, x11_auth_path,                           │
│                                                                                                  │
│ /Users/pryceturner/Desktop/slurm-flyte/.slu_env/lib/python3.12/site-packages/asyncssh/channel.py │
│ :1140 in create                                                                                  │
│                                                                                                  │
│   1137 │   │                                                                                     │
│   1138 │   │   self.logger.info('Requesting new SSH session')                                    │
│   1139 │   │                                                                                     │
│ ❱ 1140 │   │   packet = await self._open(b'session')                                             │
│   1141 │   │                                                                                     │
│   1142 │   │   # Client sessions should have no extra data in the open confirmation              │
│   1143 │   │   packet.check_end()                                                                │
│                                                                                                  │
│ /Users/pryceturner/Desktop/slurm-flyte/.slu_env/lib/python3.12/site-packages/asyncssh/channel.py │
│ :710 in _open                                                                                    │
│                                                                                                  │
│    707 │   │   │   │   │   │   │      UInt32(self._recv_window),                                 │
│    708 │   │   │   │   │   │   │      UInt32(self._recv_pktsize), *args, handler=self)           │
│    709 │   │                                                                                     │
│ ❱  710 │   │   return await self._open_waiter                                                    │
│    711 │                                                                                         │
│    712 │   def send_packet(self, pkttype: int, *args: bytes) -> None:                            │
│    713 │   │   """Send a packet on the channel"""                                                │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
RuntimeError: Error encountered while executing 'job2':
  Task <Task pending name='Task-57' coro=<AsyncAgentExecutorMixin._create() running at /Users/pryceturner/Desktop/slurm-flyte/flytekit/flytekit/extend/backend/base_agent.py:376> cb=[_run_until_complete_cb() at
/Users/pryceturner/.pyenv/versions/3.12.8/lib/python3.12/asyncio/base_events.py:181]> got Future <Future pending> attached to a different loop
Seems like a race in the event loop, maybe?
It runs job1 but then fails.
c
Interesting! Let me try it!
I think I found the issue! The SSH connection object created in the `_get` method for `job1` is reused by `_create` for `job2`, because we wrote a condition in the agent's `create` method (as shown in the figure). After commenting out `if self._conn is None` so that the agent always reconnects to the Slurm host, the workflow works! After some research, I learned that `asyncio.run()` always creates a new event loop, which is why the issue above occurs. But I still need to digest the underlying mechanism (I don't know why the connection can't be shared between two event loops 😓).
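The traceback hints at the mechanism: the cached asyncssh connection passes its own `self._loop` to `SSHClientChannel`, and the channel then awaits `self._open_waiter`, which appears to be a future bound to that original loop. When `job2` runs under the fresh loop from a second `asyncio.run()`, asyncio refuses to let a task on one loop await a future owned by another. The sketch below reproduces this with a hypothetical `FakeConnection` class (not asyncssh) that, like the real connection, creates its futures on the loop it was constructed in. `reuse=True` mimics the `if self._conn is None` guard; `reuse=False` mimics the "always reconnect" workaround.

```python
import asyncio
from typing import Optional


class FakeConnection:
    """Hypothetical stand-in for a cached SSH connection (not asyncssh).

    It remembers the loop it was created on and makes its internal futures
    on that loop, as the real connection appears to do in the traceback.
    """

    def __init__(self) -> None:
        self._loop = asyncio.get_running_loop()

    async def run(self, command: str) -> str:
        # The waiter belongs to the connection's loop, not the caller's loop.
        waiter = self._loop.create_future()
        # Resolve it from whichever loop is currently running.
        asyncio.get_running_loop().call_soon(waiter.set_result, f"ran {command}")
        return await waiter


_conn: Optional[FakeConnection] = None


async def submit(command: str, reuse: bool) -> str:
    global _conn
    # reuse=True mirrors the `if self._conn is None` guard in the agent.
    if _conn is None or not reuse:
        _conn = FakeConnection()
    return await _conn.run(command)


def run_two_jobs(reuse: bool) -> list[str]:
    """Run two 'jobs', each under its own asyncio.run(), i.e. its own new loop."""
    global _conn
    _conn = None
    results = []
    for name in ("job1", "job2"):
        try:
            results.append(asyncio.run(submit(name, reuse)))
        except RuntimeError as exc:
            results.append(f"error: {exc}")
    return results


print(run_two_jobs(reuse=True))   # job2's entry mentions "attached to a different loop"
print(run_two_jobs(reuse=False))  # fresh connection per job: both succeed
```

So it is not a timing race: the second `asyncio.run()` always starts a new loop, and any loop-bound object cached from the first one fails deterministically.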
e
Async can be hard 😅
Thanks for digging into it! From the auth logs, it looks like the session is terminated after job submission. Maybe that's why it can't be reused? For the time being, I'll comment that line out so a new connection is created every time.
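Commenting out the guard trades connection reuse for correctness. A hypothetical middle ground, not something tried in this thread, is to remember which loop the cached connection was made on and reconnect only when the running loop changes. This sketch uses a made-up `LoopAwareConnectionCache` and a stand-in `fake_connect()` coroutine in place of a real SSH connect call; it simply drops the stale connection rather than closing it. If the server really does end the session after submission, as the auth logs suggest, reconnecting on every call remains the safer option.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


class LoopAwareConnectionCache:
    """Hypothetical helper: reuse a connection only inside the loop that created it."""

    def __init__(self, connect: Callable[[], Awaitable[Any]]) -> None:
        self._connect = connect  # any zero-argument coroutine function
        self._conn: Any = None
        self._conn_loop: Optional[asyncio.AbstractEventLoop] = None

    async def get(self) -> Any:
        loop = asyncio.get_running_loop()
        # Reconnect if nothing is cached or if asyncio.run() has started a new loop.
        # The stale connection is dropped here, not closed.
        if self._conn is None or self._conn_loop is not loop:
            self._conn = await self._connect()
            self._conn_loop = loop
        return self._conn


async def fake_connect() -> object:
    # Stand-in for a real SSH connect call; each call returns a distinct object.
    await asyncio.sleep(0)
    return object()


cache = LoopAwareConnectionCache(fake_connect)


async def reused_within_loop() -> bool:
    # Two lookups inside one loop should return the same cached connection.
    return (await cache.get()) is (await cache.get())


async def fetch() -> Any:
    return await cache.get()


print(asyncio.run(reused_within_loop()))             # True: same loop, same connection
print(asyncio.run(fetch()) is asyncio.run(fetch()))  # False: each asyncio.run() reconnects
```

In an agent, `create` and `get` would call something like `await self._cache.get()` instead of checking `self._conn is None` directly (hypothetical wiring).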
d
I've told him about loglinks and message