Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add qprocess job launching #328

Merged
merged 15 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion MDANSE/Src/MDANSE/Framework/Jobs/IJob.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,24 @@ def combine(self):
self._status.update()

def _run_monoprocessor(self):
print(f"Monoprocessor run: expects {self.numberOfSteps} steps")
for index in range(self.numberOfSteps):
if self._status is not None:
if hasattr(self._status, "_pause_event"):
self._status._pause_event.wait()
idx, result = self.run_step(index)
if self._status is not None:
self._status.update()
self.combine(idx, result)

def _run_threadpool(self):
def helper(self, index):
if self._status is not None:
if hasattr(self._status, "_pause_event"):
self._status._pause_event.wait()
idx, result = self.run_step(index)
if self._status is not None:
self._status.update()
self.combine(idx, result)

pool = PoolExecutor(max_workers=self.configuration["running_mode"]["slots"])
Expand Down Expand Up @@ -302,7 +313,7 @@ def run(self, parameters, status=False):
self.initialize()

if self._status is not None:
self._status.start(self.numberOfSteps, rate=0.1)
self._status.start(self.numberOfSteps)
self._status.state["info"] = str(self)

if getattr(self, "numberOfSteps", 0) <= 0:
Expand Down
9 changes: 3 additions & 6 deletions MDANSE/Src/MDANSE/Framework/Status.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,7 @@ def update(self, force=False):

self._deltas.append(lastUpdate)

if (
force
or (not self._currentStep % self._updateStep)
or (total_seconds(lastUpdate - self._lastRefresh) > 10)
):
if force or (total_seconds(lastUpdate - self._lastRefresh) > 5):
self._lastRefresh = lastUpdate

if self._nSteps is not None:
Expand All @@ -153,4 +149,5 @@ def update(self, force=False):
duration = datetime.timedelta(seconds=round(duration))
duration = convert_duration(total_seconds(duration))
self._eta = "%02dd:%02dh:%02dm:%02ds" % duration
self.update_status()

self.update_status()
227 changes: 227 additions & 0 deletions MDANSE_GUI/Src/MDANSE_GUI/Subprocess/JobState.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
from abc import ABC, abstractmethod


class JobState(ABC):

_label = "JobState"
_allowed_actions = []

def __init__(self, parent):
self._parent = parent

@abstractmethod
def pause(self):
"""Pauses the process"""

@abstractmethod
def unpause(self):
"""Resumes the process execution"""

@abstractmethod
def start(self):
"""Starts the process"""

@abstractmethod
def terminate(self):
"""Instructs the process to stop"""

@abstractmethod
def kill(self):
"""Stops the process the hard way"""

@abstractmethod
def finish(self):
"""Reach the end of the run successfully"""

@abstractmethod
def fail(self):
"""Break down on before finishing"""


class Running(JobState):

_label = "Running"
_allowed_actions = [
"Pause",
"Terminate",
"Kill",
]

def pause(self):
"""Pauses the process"""
self._parent._pause_event.clear()
self._parent._current_state = self._parent._Paused

def unpause(self):
"""Resumes the process execution"""

def start(self):
"""Starts the process"""

def terminate(self):
"""Instructs the process to stop"""
self._parent._current_state = self._parent._Aborted

def kill(self):
"""Stops the process the hard way"""
self._parent._current_state = self._parent._Aborted

def finish(self):
"""Reach the end of the run successfully"""
self._parent.percent_complete = 100
self._parent._current_state = self._parent._Finished

def fail(self):
"""Break down on before finishing"""
self._parent._current_state = self._parent._Failed


class Aborted(JobState):

_label = "Aborted"
_allowed_actions = ["Delete"]

def pause(self):
"""Pauses the process"""

def unpause(self):
"""Resumes the process execution"""

def start(self):
"""Starts the process"""

def terminate(self):
"""Instructs the process to stop"""

def kill(self):
"""Stops the process the hard way"""

def finish(self):
"""Reach the end of the run successfully"""

def fail(self):
"""Break down on before finishing"""


class Failed(JobState):

_label = "Failed"
_allowed_actions = ["Delete"]

def pause(self):
"""Pauses the process"""

def unpause(self):
"""Resumes the process execution"""

def start(self):
"""Starts the process"""

def terminate(self):
"""Instructs the process to stop"""

def kill(self):
"""Stops the process the hard way"""

def finish(self):
"""Reach the end of the run successfully"""

def fail(self):
"""Break down on before finishing"""


class Finished(JobState):

_label = "Finished"
_allowed_actions = ["Delete"]

def pause(self):
"""Pauses the process"""

def unpause(self):
"""Resumes the process execution"""

def start(self):
"""Starts the process"""

def terminate(self):
"""Instructs the process to stop"""

def kill(self):
"""Stops the process the hard way"""

def finish(self):
"""Reach the end of the run successfully"""

def fail(self):
"""Break down on before finishing"""


class Starting(JobState):

_label = "Starting"
_allowed_actions = [
"Pause",
"Terminate",
"Kill",
]

def pause(self):
"""Pauses the process"""

def unpause(self):
"""Resumes the process execution"""

def start(self):
"""Starts the process"""
self._parent._current_state = self._parent._Running

def terminate(self):
"""Instructs the process to stop"""

def kill(self):
"""Stops the process the hard way"""

def finish(self):
"""Reach the end of the run successfully"""

def fail(self):
"""Break down on before finishing"""
self._parent._current_state = self._parent._Failed


class Paused(JobState):

_label = "Paused"
_allowed_actions = [
"Resume",
"Terminate",
"Kill",
]

def pause(self):
"""Pauses the process"""

def unpause(self):
"""Resumes the process execution"""
self._parent._pause_event.set()
self._parent._current_state = self._parent._Running

def start(self):
"""Starts the process"""

def terminate(self):
"""Instructs the process to stop"""
self._parent._current_state = self._parent._Aborted

def kill(self):
"""Stops the process the hard way"""
self._parent._current_state = self._parent._Aborted

def finish(self):
"""Reach the end of the run successfully"""
self._parent.percent_complete = 100
self._parent._current_state = self._parent._Finished

def fail(self):
"""Break down on before finishing"""
85 changes: 85 additions & 0 deletions MDANSE_GUI/Src/MDANSE_GUI/Subprocess/JobStatusProcess.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# **************************************************************************
#
# MDANSE: Molecular Dynamics Analysis for Neutron Scattering Experiments
#
# @file Src/PyQtGUI/DataViewModel/JobHolder.py
# @brief Subclass of QStandardItemModel for MDANSE jobs
#
# @homepage https://mdanse.org
# @license GNU General Public License v3 or higher (see LICENSE)
# @copyright Institut Laue Langevin 2013-now
# @copyright ISIS Neutron and Muon Source, STFC, UKRI 2021-now
# @authors Research Software Group at ISIS (see AUTHORS)
#
# **************************************************************************

from typing import Tuple
from multiprocessing import Pipe, Queue, Process, Event
from multiprocessing.connection import Connection

from icecream import ic
from qtpy.QtCore import QObject, Slot, Signal, QProcess, QThread, QMutex

from MDANSE.Framework.Status import Status


class JobCommunicator(QObject):
target = Signal(int)
progress = Signal(int)
finished = Signal(bool)
oscillate = Signal()

def status_update(self, input: Tuple):
key, value = input
if key == "FINISHED":
self.finished.emit(value)
elif key == "STEP":
self.progress.emit(value)
elif key == "STARTED":
if value is not None:
self.target.emit(value)
else:
self.oscillate.emit()
elif key == "COMMUNICATION":
print(f"Communication with the subprocess is now {value}")
self.finished.emit(value)


class JobStatusProcess(Status):
def __init__(
self, pipe: "Connection", queue: Queue, pause_event: "Event", **kwargs
):
super().__init__()
self._pipe = pipe
self._queue = queue
self._state = {} # for compatibility with JobStatus
self._progress_meter = 0
self._pause_event = pause_event
self._pause_event.set()

@property
def state(self):
return self._state

def finish_status(self):
ic()
self._pipe.send(("FINISHED", True))

def start_status(self):
ic()
try:
temp = int(self._nSteps)
except:
self._pipe.send(("STARTED", None))
else:
self._pipe.send(("STARTED", temp))
# self._updateStep = 1

def stop_status(self):
ic()
self._pipe.send(("FINISHED", False))

def update_status(self):
self._progress_meter += 1
temp = int(self._progress_meter) * self._updateStep
self._pipe.send(("STEP", temp))
Loading
Loading