Skip to content

Commit

Permalink
Merge pull request #328 from ISISNeutronMuon/add-qprocess-job-launching
Browse files Browse the repository at this point in the history
Add qprocess job launching
  • Loading branch information
ChiCheng45 authored Mar 6, 2024
2 parents 59e3129 + 3bd69ae commit f29beef
Show file tree
Hide file tree
Showing 11 changed files with 600 additions and 98 deletions.
13 changes: 12 additions & 1 deletion MDANSE/Src/MDANSE/Framework/Jobs/IJob.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,24 @@ def combine(self):
self._status.update()

def _run_monoprocessor(self):
print(f"Monoprocessor run: expects {self.numberOfSteps} steps")
for index in range(self.numberOfSteps):
if self._status is not None:
if hasattr(self._status, "_pause_event"):
self._status._pause_event.wait()
idx, result = self.run_step(index)
if self._status is not None:
self._status.update()
self.combine(idx, result)

def _run_threadpool(self):
def helper(self, index):
if self._status is not None:
if hasattr(self._status, "_pause_event"):
self._status._pause_event.wait()
idx, result = self.run_step(index)
if self._status is not None:
self._status.update()
self.combine(idx, result)

pool = PoolExecutor(max_workers=self.configuration["running_mode"]["slots"])
Expand Down Expand Up @@ -302,7 +313,7 @@ def run(self, parameters, status=False):
self.initialize()

if self._status is not None:
self._status.start(self.numberOfSteps, rate=0.1)
self._status.start(self.numberOfSteps)
self._status.state["info"] = str(self)

if getattr(self, "numberOfSteps", 0) <= 0:
Expand Down
9 changes: 3 additions & 6 deletions MDANSE/Src/MDANSE/Framework/Status.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,7 @@ def update(self, force=False):

self._deltas.append(lastUpdate)

if (
force
or (not self._currentStep % self._updateStep)
or (total_seconds(lastUpdate - self._lastRefresh) > 10)
):
if force or (total_seconds(lastUpdate - self._lastRefresh) > 5):
self._lastRefresh = lastUpdate

if self._nSteps is not None:
Expand All @@ -153,4 +149,5 @@ def update(self, force=False):
duration = datetime.timedelta(seconds=round(duration))
duration = convert_duration(total_seconds(duration))
self._eta = "%02dd:%02dh:%02dm:%02ds" % duration
self.update_status()

self.update_status()
227 changes: 227 additions & 0 deletions MDANSE_GUI/Src/MDANSE_GUI/Subprocess/JobState.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
from abc import ABC, abstractmethod


class JobState(ABC):

_label = "JobState"
_allowed_actions = []

def __init__(self, parent):
self._parent = parent

@abstractmethod
def pause(self):
"""Pauses the process"""

@abstractmethod
def unpause(self):
"""Resumes the process execution"""

@abstractmethod
def start(self):
"""Starts the process"""

@abstractmethod
def terminate(self):
"""Instructs the process to stop"""

@abstractmethod
def kill(self):
"""Stops the process the hard way"""

@abstractmethod
def finish(self):
"""Reach the end of the run successfully"""

@abstractmethod
def fail(self):
"""Break down on before finishing"""


class Running(JobState):

_label = "Running"
_allowed_actions = [
"Pause",
"Terminate",
"Kill",
]

def pause(self):
"""Pauses the process"""
self._parent._pause_event.clear()
self._parent._current_state = self._parent._Paused

def unpause(self):
"""Resumes the process execution"""

def start(self):
"""Starts the process"""

def terminate(self):
"""Instructs the process to stop"""
self._parent._current_state = self._parent._Aborted

def kill(self):
"""Stops the process the hard way"""
self._parent._current_state = self._parent._Aborted

def finish(self):
"""Reach the end of the run successfully"""
self._parent.percent_complete = 100
self._parent._current_state = self._parent._Finished

def fail(self):
"""Break down on before finishing"""
self._parent._current_state = self._parent._Failed


class Aborted(JobState):

_label = "Aborted"
_allowed_actions = ["Delete"]

def pause(self):
"""Pauses the process"""

def unpause(self):
"""Resumes the process execution"""

def start(self):
"""Starts the process"""

def terminate(self):
"""Instructs the process to stop"""

def kill(self):
"""Stops the process the hard way"""

def finish(self):
"""Reach the end of the run successfully"""

def fail(self):
"""Break down on before finishing"""


class Failed(JobState):

_label = "Failed"
_allowed_actions = ["Delete"]

def pause(self):
"""Pauses the process"""

def unpause(self):
"""Resumes the process execution"""

def start(self):
"""Starts the process"""

def terminate(self):
"""Instructs the process to stop"""

def kill(self):
"""Stops the process the hard way"""

def finish(self):
"""Reach the end of the run successfully"""

def fail(self):
"""Break down on before finishing"""


class Finished(JobState):

_label = "Finished"
_allowed_actions = ["Delete"]

def pause(self):
"""Pauses the process"""

def unpause(self):
"""Resumes the process execution"""

def start(self):
"""Starts the process"""

def terminate(self):
"""Instructs the process to stop"""

def kill(self):
"""Stops the process the hard way"""

def finish(self):
"""Reach the end of the run successfully"""

def fail(self):
"""Break down on before finishing"""


class Starting(JobState):

_label = "Starting"
_allowed_actions = [
"Pause",
"Terminate",
"Kill",
]

def pause(self):
"""Pauses the process"""

def unpause(self):
"""Resumes the process execution"""

def start(self):
"""Starts the process"""
self._parent._current_state = self._parent._Running

def terminate(self):
"""Instructs the process to stop"""

def kill(self):
"""Stops the process the hard way"""

def finish(self):
"""Reach the end of the run successfully"""

def fail(self):
"""Break down on before finishing"""
self._parent._current_state = self._parent._Failed


class Paused(JobState):

_label = "Paused"
_allowed_actions = [
"Resume",
"Terminate",
"Kill",
]

def pause(self):
"""Pauses the process"""

def unpause(self):
"""Resumes the process execution"""
self._parent._pause_event.set()
self._parent._current_state = self._parent._Running

def start(self):
"""Starts the process"""

def terminate(self):
"""Instructs the process to stop"""
self._parent._current_state = self._parent._Aborted

def kill(self):
"""Stops the process the hard way"""
self._parent._current_state = self._parent._Aborted

def finish(self):
"""Reach the end of the run successfully"""
self._parent.percent_complete = 100
self._parent._current_state = self._parent._Finished

def fail(self):
"""Break down on before finishing"""
85 changes: 85 additions & 0 deletions MDANSE_GUI/Src/MDANSE_GUI/Subprocess/JobStatusProcess.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# **************************************************************************
#
# MDANSE: Molecular Dynamics Analysis for Neutron Scattering Experiments
#
# @file Src/PyQtGUI/DataViewModel/JobHolder.py
# @brief Subclass of QStandardItemModel for MDANSE jobs
#
# @homepage https://mdanse.org
# @license GNU General Public License v3 or higher (see LICENSE)
# @copyright Institut Laue Langevin 2013-now
# @copyright ISIS Neutron and Muon Source, STFC, UKRI 2021-now
# @authors Research Software Group at ISIS (see AUTHORS)
#
# **************************************************************************

from typing import Tuple
from multiprocessing import Pipe, Queue, Process, Event
from multiprocessing.connection import Connection

from icecream import ic
from qtpy.QtCore import QObject, Slot, Signal, QProcess, QThread, QMutex

from MDANSE.Framework.Status import Status


class JobCommunicator(QObject):
target = Signal(int)
progress = Signal(int)
finished = Signal(bool)
oscillate = Signal()

def status_update(self, input: Tuple):
key, value = input
if key == "FINISHED":
self.finished.emit(value)
elif key == "STEP":
self.progress.emit(value)
elif key == "STARTED":
if value is not None:
self.target.emit(value)
else:
self.oscillate.emit()
elif key == "COMMUNICATION":
print(f"Communication with the subprocess is now {value}")
self.finished.emit(value)


class JobStatusProcess(Status):
def __init__(
self, pipe: "Connection", queue: Queue, pause_event: "Event", **kwargs
):
super().__init__()
self._pipe = pipe
self._queue = queue
self._state = {} # for compatibility with JobStatus
self._progress_meter = 0
self._pause_event = pause_event
self._pause_event.set()

@property
def state(self):
return self._state

def finish_status(self):
ic()
self._pipe.send(("FINISHED", True))

def start_status(self):
ic()
try:
temp = int(self._nSteps)
except:
self._pipe.send(("STARTED", None))
else:
self._pipe.send(("STARTED", temp))
# self._updateStep = 1

def stop_status(self):
ic()
self._pipe.send(("FINISHED", False))

def update_status(self):
self._progress_meter += 1
temp = int(self._progress_meter) * self._updateStep
self._pipe.send(("STEP", temp))
Loading

0 comments on commit f29beef

Please sign in to comment.