Skip to content

Commit

Permalink
DirectScheduler: Ensure killing child processes (#6572)
Browse files Browse the repository at this point in the history
The current implementation only issues a kill command for the
parent process, but this can leave child processes orphaned. The
child processes are now retrieved and added explicitly to the
kill command.

Cherry-pick: fddffca
  • Loading branch information
agoscinski committed Nov 4, 2024
1 parent d215606 commit 2fe62e4
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 4 deletions.
25 changes: 21 additions & 4 deletions src/aiida/schedulers/plugins/direct.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
###########################################################################
"""Plugin for direct execution."""

from typing import Union

import aiida.schedulers
from aiida.common.escaping import escape_for_bash
from aiida.schedulers import SchedulerError
Expand Down Expand Up @@ -352,13 +354,28 @@ def _parse_submit_output(self, retval, stdout, stderr):

return stdout.strip()

def _get_kill_command(self, jobid):
"""Return the command to kill the job with specified jobid."""
submit_command = f'kill {jobid}'
def _get_kill_command(self, jobid: Union[int, str]) -> str:
"""Return the command to kill the process with specified id and all its descendants.
:param jobid: The job id is in the case of the
:py:class:`~aiida.schedulers.plugins.direct.DirectScheduler` the process id.
:return: A string containing the kill command.
"""
from psutil import Process

# get a list of the process id of all descendants
process = Process(int(jobid))
children = process.children(recursive=True)
jobids = [str(jobid)]
jobids.extend([str(child.pid) for child in children])
jobids_str = ' '.join(jobids)

kill_command = f'kill {jobids_str}'

self.logger.info(f'killing job {jobid}')

return submit_command
return kill_command

def _parse_kill_output(self, retval, stdout, stderr):
"""Parse the output of the kill command.
Expand Down
38 changes: 38 additions & 0 deletions tests/schedulers/test_direct.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,41 @@ def test_submit_script_with_num_cores_per_mpiproc(scheduler, template):
)
result = scheduler.get_submit_script(template)
assert f'export OMP_NUM_THREADS={num_cores_per_mpiproc}' in result


@pytest.mark.timeout(timeout=10)
def test_kill_job(scheduler, tmpdir):
"""Test if kill_job kill all descendant children from the process.
For that we spawn a new process that runs a sleep command, then we
kill it and check if the sleep process is still alive.
current process forked process run script.sh
python─────────────python───────────────────bash──────sleep
we kill this process we check if still running
"""
import multiprocessing
import time

from aiida.transports.plugins.local import LocalTransport
from psutil import Process

def run_sleep_100():
import subprocess

script = tmpdir / 'sleep.sh'
script.write('sleep 100')
# this is blocking for the process entering
subprocess.run(['bash', script.strpath], check=False)

forked_process = multiprocessing.Process(target=run_sleep_100)
forked_process.start()
while len(forked_process_children := Process(forked_process.pid).children(recursive=True)) != 2:
time.sleep(0.1)
bash_process = forked_process_children[0]
sleep_process = forked_process_children[1]
with LocalTransport() as transport:
scheduler.set_transport(transport)
scheduler.kill_job(forked_process.pid)
while bash_process.is_running() or sleep_process.is_running():
time.sleep(0.1)
forked_process.join()

0 comments on commit 2fe62e4

Please sign in to comment.