diff --git a/_modules/biomero/slurm_client.html b/_modules/biomero/slurm_client.html index 6dbc28d..2ff0a1c 100644 --- a/_modules/biomero/slurm_client.html +++ b/_modules/biomero/slurm_client.html @@ -113,6 +113,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict, List, Optional, Tuple, Any
+from uuid import UUID
from fabric import Connection, Result
from fabric.transfer import Result as TransferResult
from invoke.exceptions import UnexpectedExit
@@ -129,6 +130,12 @@ Source code for biomero.slurm_client
from importlib_resources import files
import io
import os
+from biomero.eventsourcing import WorkflowTracker, NoOpWorkflowTracker
+from biomero.views import JobAccounting, JobProgress, WorkflowAnalytics, WorkflowProgress
+from biomero.database import EngineManager, JobProgressView, JobView, TaskExecution, WorkflowProgressView
+from eventsourcing.system import System, SingleThreadedRunner
+from sqlalchemy.exc import IntegrityError
+from sqlalchemy.sql import text
logger = logging.getLogger(__name__)
@@ -145,19 +152,20 @@ Source code for biomero.slurm_client
submit_result (Result): The result of submitting the job.
ok (bool): Indicates whether the job submission was successful.
job_state (str): The current state of the Slurm job.
- error_message (str): The error message, if any.
-
- Args:
- submit_result (Result): The result of submitting the job.
- job_id (int): The Slurm job ID.
+ progress (str): The progress of the Slurm job.
+ error_message (str): The error message, if any, encountered during job submission.
+ wf_id (UUID): The workflow ID associated with the job.
+ task_id (UUID): The task ID within the workflow.
+ slurm_polling_interval (int): The polling interval (in seconds) for checking the job status.
Example:
# Submit some job with the SlurmClient
- submit_result, job_id = slurmClient.run_workflow(
- workflow_name, workflow_version, input_data, email, time, **kwargs)
+ submit_result, job_id, wf_id, task_id = slurmClient.run_workflow(
+ workflow_name, workflow_version, input_data, email, time, wf_id,
+ **kwargs)
# Create a SlurmJob instance
- slurmJob = SlurmJob(submit_result, job_id)
+ slurmJob = SlurmJob(submit_result, job_id, wf_id, task_id)
if not slurmJob.ok:
logger.warning(f"Error with job: {slurmJob.get_error()}")
@@ -173,21 +181,33 @@ Source code for biomero.slurm_client
raise e
"""
+ SLURM_POLLING_INTERVAL = 10 # seconds
def __init__(self,
submit_result: Result,
- job_id: int):
+ job_id: int,
+ wf_id: UUID,
+ task_id: UUID,
+ slurm_polling_interval: int = SLURM_POLLING_INTERVAL):
"""
Initialize a SlurmJob instance.
Args:
submit_result (Result): The result of submitting the job.
job_id (int): The Slurm job ID.
+ wf_id (UUID): The workflow ID associated with this job.
+ task_id (UUID): The task ID within the workflow.
+ slurm_polling_interval (int, optional): The interval in seconds for
+ polling the job status. Defaults to SLURM_POLLING_INTERVAL.
"""
self.job_id = job_id
+ self.wf_id = wf_id
+ self.task_id = task_id
+ self.slurm_polling_interval = slurm_polling_interval
self.submit_result = submit_result
self.ok = self.submit_result.ok
self.job_state = None
+ self.progress = None
self.error_message = self.submit_result.stderr if hasattr(self.submit_result, 'stderr') else ''
[docs] def wait_for_completion(self, slurmClient, omeroConn) -> str:
@@ -211,6 +231,7 @@ Source code for biomero.slurm_client
"TIMEOUT+"):
job_status_dict, poll_result = slurmClient.check_job_status(
[self.job_id])
+ self.progress = slurmClient.get_active_job_progress(self.job_id)
if not poll_result.ok:
logger.warning(
f"Error checking job status:{poll_result.stderr}")
@@ -219,7 +240,11 @@ Source code for biomero.slurm_client
self.job_state = job_status_dict[self.job_id]
# wait for 10 seconds before checking again
omeroConn.keepAlive() # keep the OMERO connection alive
- timesleep.sleep(10)
+ slurmClient.workflowTracker.update_task_status(self.task_id,
+ self.job_state)
+ slurmClient.workflowTracker.update_task_progress(
+ self.task_id, self.progress)
+ timesleep.sleep(self.slurm_polling_interval)
logger.info(f"Job {self.job_id} finished: {self.job_state}")
logger.info(
f"You can get the logfile using `Slurm Get Update` on job {self.job_id}")
@@ -301,6 +326,7 @@ Source code for biomero.slurm_client
containing the Slurm job submission scripts. Optional.
Example:
+
# Create a SlurmClient object as contextmanager
with SlurmClient.from_config() as client:
@@ -319,6 +345,7 @@ Source code for biomero.slurm_client
print(result.stdout)
Example 2:
+
# Create a SlurmClient and setup Slurm (download containers etc.)
with SlurmClient.from_config(init_slurm=True) as client:
@@ -376,82 +403,100 @@ Source code for biomero.slurm_client
slurm_script_path: str = _DEFAULT_SLURM_GIT_SCRIPT_PATH,
slurm_script_repo: str = None,
init_slurm: bool = False,
- ):
- """Initializes a new instance of the SlurmClient class.
+ track_workflows: bool = True,
+ enable_job_accounting: bool = True,
+ enable_job_progress: bool = True,
+ enable_workflow_analytics: bool = True,
+ sqlalchemy_url: str = None):
+ """
+ Initializes a new instance of the SlurmClient class.
- It is preferable to use #from_config(...) method to initialize
- parameters from a config file.
+ It is preferable to use the `#from_config(...)` method to initialize
+ parameters from a configuration file.
Args:
- host (str, optional): The hostname or IP address of the remote
- server. Defaults to _DEFAULT_HOST.
+ host (str, optional): The hostname or IP address of the remote
+ server. Defaults to `_DEFAULT_HOST`.
user (str, optional): The username to use when connecting to
- the remote server. Defaults to None, which defaults
- to config.user.
+ the remote server. Defaults to None, which falls back to
+ `config.user`.
port (int, optional): The SSH port to use when connecting.
- Defaults to None, which defaults to config.port.
+ Defaults to None, which falls back to `config.port`.
config (str, optional): Path to the SSH config file.
- Defaults to None, which defaults to your SSH config file.
+ Defaults to None, which falls back to your SSH config file.
gateway (Connection, optional): An optional gateway for connecting
through a jump host. Defaults to None.
forward_agent (bool, optional): Whether to forward the local SSH
agent to the remote server. Defaults to None, which
- defaults to config.forward_agent.
+ falls back to `config.forward_agent`.
connect_timeout (int, optional): Timeout for establishing the SSH
- connection. Defaults to None, which defaults
- to config.timeouts.connect.
+ connection. Defaults to None, which falls back to
+ `config.timeouts.connect`.
connect_kwargs (dict, optional): Additional keyword arguments for
- the underlying SSH connection. Handed verbatim to
+ the underlying SSH connection. These are passed verbatim to
`SSHClient.connect <paramiko.client.SSHClient.connect>`.
Defaults to None.
- inline_ssh_env (bool, optional): Whether to use inline SSH
- environment. This is necessary if the remote server has
- a restricted ``AcceptEnv`` setting (which is the common
- default). Defaults to _DEFAULT_INLINE_SSH_ENV.
- slurm_data_path (str, optional): The path to the directory
- containing the data files for Slurm jobs.
- Defaults to _DEFAULT_SLURM_DATA_PATH.
- slurm_images_path (str, optional): The path to the directory
- containing the Singularity images for Slurm jobs.
- Defaults to _DEFAULT_SLURM_IMAGES_PATH.
- slurm_converters_path (str, optional): The path to the directory
- containing the Singularity images for file converters.
- Defaults to _DEFAULT_SLURM_CONVERTERS_PATH.
+ inline_ssh_env (bool, optional): Whether to use inline SSH
+ environment variables. This is necessary if the remote server
+ has a restricted `AcceptEnv` setting (the common default).
+ Defaults to `_DEFAULT_INLINE_SSH_ENV`.
+ slurm_data_path (str, optional): The path to the directory
+ containing the data files for Slurm jobs.
+ Defaults to `_DEFAULT_SLURM_DATA_PATH`.
+ slurm_images_path (str, optional): The path to the directory
+ containing the Singularity images for Slurm jobs.
+ Defaults to `_DEFAULT_SLURM_IMAGES_PATH`.
+ slurm_converters_path (str, optional): The path to the directory
+ containing the Singularity images for file converters.
+ Defaults to `_DEFAULT_SLURM_CONVERTERS_PATH`.
slurm_model_paths (dict, optional): A dictionary containing the
- paths to the Singularity images for specific Slurm job models.
+ paths to the Singularity images for specific Slurm job models.
Defaults to None.
slurm_model_repos (dict, optional): A dictionary containing the
- git repositories of Singularity images for specific Slurm
- job models.
- Defaults to None.
+ Git repositories of Singularity images for specific Slurm
+ job models. Defaults to None.
slurm_model_images (dict, optional): A dictionary containing the
- dockerhub of the Singularity images for specific Slurm
- job models. Will fill automatically from the data in the git
- repository if you set init_slurm.
+ DockerHub images of the Singularity images for specific
+ Slurm job models. Will be filled automatically from the
+ data in the Git repository if `init_slurm` is set to True.
Defaults to None.
- converter_images (dict, optional): A dictionairy containing the
- dockerhub of the Singularity images for converters.
- Will default to building converter available in this package
- on Slurm instead if not configured.
+ converter_images (dict, optional): A dictionary containing the
+ DockerHub images of the Singularity images for file converters.
+ Will default to building the converter available in this package
+ on Slurm instead if not configured.
Defaults to None.
- slurm_model_jobs (dict, optional): A dictionary containing
- information about specific Slurm job models.
+ slurm_model_jobs (dict, optional): A dictionary containing
+ information about specific Slurm job models.
Defaults to None.
- slurm_model_jobs_params (dict, optional): A dictionary containing
- parameters for specific Slurm job models.
+ slurm_model_jobs_params (dict, optional): A dictionary containing
+ parameters for specific Slurm job models.
Defaults to None.
- slurm_script_path (str, optional): The path to the directory
- containing the Slurm job submission scripts on Slurm.
- Defaults to _DEFAULT_SLURM_GIT_SCRIPT_PATH.
- slurm_script_repo (str, optional): The git https URL for cloning
- the repo containing the Slurm job submission scripts.
+ slurm_script_path (str, optional): The path to the directory
+ containing the Slurm job submission scripts on Slurm.
+ Defaults to `_DEFAULT_SLURM_GIT_SCRIPT_PATH`.
+ slurm_script_repo (str, optional): The Git HTTPS URL for cloning
+ the repository containing the Slurm job submission scripts.
Defaults to None.
- init_slurm (bool): Whether to set up the required structures
+ init_slurm (bool, optional): Whether to set up the required structures
on Slurm after initiating this client. This includes creating
- missing folders, downloading container images, cloning git,etc.
- This will take a while at first but will validate your setup.
- Defaults to False to save time.
+ missing folders, downloading container images, cloning Git, etc.
+ This process will take some time initially but will validate
+ your setup. Defaults to False to save time.
+ track_workflows (bool, optional): Whether to track workflows.
+ Defaults to True.
+ enable_job_accounting (bool, optional): Whether to enable job
+ accounting. Defaults to True.
+ enable_job_progress (bool, optional): Whether to track job
+ progress. Defaults to True.
+ enable_workflow_analytics (bool, optional): Whether to enable
+ workflow analytics. Defaults to True.
+ sqlalchemy_url (str, optional): URL for eventsourcing database
+ connection. Defaults to None, which falls back to the
+ `SQLALCHEMY_URL` environment variable. Note that it will
+ always be overridden with the environment variable
+ `SQLALCHEMY_URL`, if that is set.
"""
+
super(SlurmClient, self).__init__(host,
user,
port,
@@ -480,6 +525,159 @@ Source code for biomero.slurm_client
self.init_workflows()
self.validate(validate_slurm_setup=init_slurm)
+
+ # Setup workflow tracking and accounting
+ # Initialize the analytics settings
+ self.track_workflows = track_workflows
+ self.enable_job_accounting = enable_job_accounting
+ self.enable_job_progress = enable_job_progress
+ self.enable_workflow_analytics = enable_workflow_analytics
+
+ # Initialize the analytics system
+ self.sqlalchemy_url = sqlalchemy_url
+ self.initialize_analytics_system(reset_tables=init_slurm)
+
+[docs] def initialize_analytics_system(self, reset_tables=False):
+ """
+ Initialize the analytics system based on the analytics configuration
+ passed to the constructor.
+
+ Args:
+ reset_tables (bool): If True, drops and recreates all views.
+ """
+ # Get persistence settings, prioritize environment variables
+ persistence_module = os.getenv("PERSISTENCE_MODULE", "eventsourcing_sqlalchemy")
+ if persistence_module != "eventsourcing_sqlalchemy":
+ raise NotImplementedError(f"Can't handle {persistence_module}. Currently only supports 'eventsourcing_sqlalchemy' as PERSISTENCE_MODULE")
+
+ sqlalchemy_url = os.getenv("SQLALCHEMY_URL", self.sqlalchemy_url)
+ if not sqlalchemy_url:
+ raise ValueError("SQLALCHEMY_URL must be set either in init, config ('sqlalchemy_url') or as an environment variable.")
+ if sqlalchemy_url != self.sqlalchemy_url:
+ logger.info("Overriding configured SQLALCHEMY_URL with env var SQLALCHEMY_URL.")
+
+ # Build the system based on the analytics configuration
+ pipes = []
+ runner = None
+ if self.track_workflows:
+ # Add JobAccounting to the pipeline if enabled
+ if self.enable_job_accounting:
+ pipes.append([WorkflowTracker, JobAccounting])
+
+ # Add JobProgress to the pipeline if enabled
+ if self.enable_job_progress:
+ pipes.append([WorkflowTracker, JobProgress])
+ pipes.append([WorkflowTracker, WorkflowProgress])
+
+ # Add WorkflowAnalytics to the pipeline if enabled
+ if self.enable_workflow_analytics:
+ pipes.append([WorkflowTracker, WorkflowAnalytics])
+
+ # Add onlys WorkflowTracker if no listeners are enabled
+ if not pipes:
+ pipes = [[WorkflowTracker]]
+
+ system = System(pipes=pipes)
+ scoped_session_topic = EngineManager.create_scoped_session(
+ sqlalchemy_url=sqlalchemy_url)
+ runner = SingleThreadedRunner(system, env={
+ 'SQLALCHEMY_SCOPED_SESSION_TOPIC': scoped_session_topic,
+ 'PERSISTENCE_MODULE': persistence_module})
+ runner.start()
+ self.workflowTracker = runner.get(WorkflowTracker)
+ else: # turn off persistence, override
+ logger.warning("Tracking workflows is disabled. No-op WorkflowTracker will be used.")
+ self.workflowTracker = NoOpWorkflowTracker()
+
+ self.setup_listeners(runner, reset_tables)
+
+[docs] def setup_listeners(self, runner, reset_tables):
+ # Only when people run init script, we just drop and rebuild.
+ self.get_listeners(runner)
+
+ # Optionally drop and recreate tables
+ if reset_tables:
+ logger.info("Resetting view tables.")
+ tables = []
+ # gather the listener tables
+ listeners = [self.jobAccounting,
+ self.jobProgress,
+ self.wfProgress,
+ self.workflowAnalytics]
+ for listener in listeners:
+ if not isinstance(listener, NoOpWorkflowTracker):
+ tables.append(listener.recorder.tracking_table_name)
+ tables.append(listener.recorder.events_table_name)
+ runner.stop()
+ # gather the view tables
+ tables.append(TaskExecution.__tablename__)
+ tables.append(JobProgressView.__tablename__)
+ tables.append(WorkflowProgressView.__tablename__)
+ tables.append(JobView.__tablename__)
+ with EngineManager.get_session() as session:
+ try:
+ # Begin a transaction
+ for table in tables:
+ # Drop the table if it exists
+ logger.info(f"Dropping table {table}")
+ drop_table_sql = text(f'DROP TABLE IF EXISTS {table}')
+ session.execute(drop_table_sql)
+ # Only when people run init script, we just drop and rebuild.
+ session.commit()
+ logger.info("Dropped view tables successfully")
+ except IntegrityError as e:
+ logger.error(e)
+ session.rollback()
+ raise Exception(f"Error trying to reset the view tables: {e}")
+
+ EngineManager.close_engine() # close current sql session
+ # restart runner, listeners and recreate views
+ self.initialize_analytics_system(reset_tables=False)
+ # Update the view tables again
+ listeners = [self.jobAccounting,
+ self.jobProgress,
+ self.wfProgress,
+ self.workflowAnalytics]
+ for listener in listeners:
+ if listener:
+ self.bring_listener_uptodate(listener)
+
+[docs] def get_listeners(self, runner):
+ if self.track_workflows and self.enable_job_accounting:
+ self.jobAccounting = runner.get(JobAccounting)
+ else:
+ self.jobAccounting = NoOpWorkflowTracker()
+
+ if self.track_workflows and self.enable_job_progress:
+ self.jobProgress = runner.get(JobProgress)
+ self.wfProgress = runner.get(WorkflowProgress)
+ else:
+ self.jobProgress = NoOpWorkflowTracker()
+ self.wfProgress = NoOpWorkflowTracker()
+
+ if self.track_workflows and self.enable_workflow_analytics:
+ self.workflowAnalytics = runner.get(WorkflowAnalytics)
+ else:
+ self.workflowAnalytics = NoOpWorkflowTracker()
+
+[docs] def bring_listener_uptodate(self, listener, start=1):
+ with EngineManager.get_session() as session:
+ try:
+ # Begin a transaction
+ listener.pull_and_process(leader_name=WorkflowTracker.__name__, start=start)
+ session.commit()
+ logger.info("Updated listener successfully")
+ except IntegrityError as e:
+ logger.error(e)
+ session.rollback()
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ # Ensure to call the parent class's __exit__
+ # to clean up Connection resources
+ super().__exit__(exc_type, exc_val, exc_tb)
+ # Cleanup resources specific to SlurmClient
+ EngineManager.close_engine()
+ # If we have any other resources to close or cleanup, do it here
[docs] def init_workflows(self, force_update: bool = False):
"""
@@ -768,17 +966,16 @@ Source code for biomero.slurm_client
- /etc/slurm-config.ini
- ~/slurm-config.ini
- Note that this is only for the SLURM specific values that we added.
+ Note that this is only for the SLURM-specific values that we added.
Most configuration values are set via configuration mechanisms from
- Fabric library,
- like SSH settings being loaded from SSH config, /etc/fabric.yml or
- environment variables.
+ Fabric library, like SSH settings being loaded from SSH config,
+ /etc/fabric.yml or environment variables.
See Fabric's documentation for more info on configuration if needed.
Args:
configfile (str): The path to your configuration file. Optional.
init_slurm (bool): Initiate / validate slurm setup. Optional
- Might take some time the first time with downloading etc.
+ Might take some time the first time with downloading, etc.
Returns:
SlurmClient: A new SlurmClient object.
@@ -789,6 +986,7 @@ Source code for biomero.slurm_client
configs.read([cls._DEFAULT_CONFIG_PATH_1,
cls._DEFAULT_CONFIG_PATH_2,
configfile])
+
# Read the required parameters from the configuration file,
# fallback to defaults
host = configs.get("SSH", "host", fallback=cls._DEFAULT_HOST)
@@ -820,12 +1018,11 @@ Source code for biomero.slurm_client
slurm_model_jobs[k[:-len(suffix_job)]] = v
slurm_model_jobs_params[k[:-len(suffix_job)]] = []
elif job_param_match:
- print(f"Match: {slurm_model_jobs_params}")
slurm_model_jobs_params[job_param_match.group(1)].append(
f" --{job_param_match.group(2)}={v}")
- print(f"Added: {slurm_model_jobs_params}")
else:
slurm_model_paths[k] = v
+ logger.info(f"Using job params: {slurm_model_jobs_params}")
slurm_script_path = configs.get(
"SLURM", "slurm_script_path",
@@ -836,7 +1033,6 @@ Source code for biomero.slurm_client
)
# Parse converters, if available
- # Should be key=value where key is a name and value a docker image
try:
converter_items = configs.items("CONVERTERS")
if converter_items:
@@ -844,7 +1040,22 @@ Source code for biomero.slurm_client
else:
converter_images = None # Section exists but is empty
except configparser.NoSectionError:
- converter_images = None # Section does not exist
+ converter_images = None # Section does not exist
+
+ # Read the analytics section, if available
+ try:
+ track_workflows = configs.getboolean('ANALYTICS', 'track_workflows', fallback=True)
+ enable_job_accounting = configs.getboolean('ANALYTICS', 'enable_job_accounting', fallback=True)
+ enable_job_progress = configs.getboolean('ANALYTICS', 'enable_job_progress', fallback=True)
+ enable_workflow_analytics = configs.getboolean('ANALYTICS', 'enable_workflow_analytics', fallback=True)
+ sqlalchemy_url = configs.get('ANALYTICS', 'sqlalchemy_url', fallback=None)
+ except configparser.NoSectionError:
+ # If the ANALYTICS section is missing, fallback to default values
+ track_workflows = True
+ enable_job_accounting = True
+ enable_job_progress = True
+ enable_workflow_analytics = True
+ sqlalchemy_url = None
# Create the SlurmClient object with the parameters read from
# the config file
@@ -861,7 +1072,13 @@ Source code for biomero.slurm_client
slurm_model_jobs_params=slurm_model_jobs_params,
slurm_script_path=slurm_script_path,
slurm_script_repo=slurm_script_repo,
- init_slurm=init_slurm)
+ init_slurm=init_slurm,
+ # Pass analytics settings to the constructor
+ track_workflows=track_workflows,
+ enable_job_accounting=enable_job_accounting,
+ enable_job_progress=enable_job_progress,
+ enable_workflow_analytics=enable_workflow_analytics,
+ sqlalchemy_url=sqlalchemy_url)
[docs] def cleanup_tmp_files(self,
slurm_job_id: str,
@@ -971,7 +1188,7 @@ Source code for biomero.slurm_client
[docs] def get_active_job_progress(self,
slurm_job_id: str,
pattern: str = r"\d+%",
- env: Optional[Dict[str, str]] = None) -> str:
+ env: Optional[Dict[str, str]] = None) -> Any:
"""
Get the progress of an active Slurm job from its logfiles.
@@ -984,7 +1201,7 @@ Source code for biomero.slurm_client
to set when running the command. Defaults to None.
Returns:
- str: The progress of the Slurm job.
+ Any: The progress of the Slurm job according to the pattern, or None.
"""
cmdlist = []
cmd = self.get_recent_log_command(
@@ -997,13 +1214,14 @@ Source code for biomero.slurm_client
except Exception as e:
logger.error(f"Issue with run command: {e}")
# Match the specified pattern in the result's stdout
+ latest_progress = None
try:
latest_progress = re.findall(
pattern, result.stdout)[-1]
except Exception as e:
- logger.error(f"Issue with extracting progress: {e}")
+ logger.warning(f"Issue with extracting progress: {e}")
- return f"Progress: {latest_progress}\n"
+ return latest_progress
[docs] def run_commands(self, cmdlist: List[str],
env: Optional[Dict[str, str]] = None,
@@ -1338,8 +1556,9 @@ Source code for biomero.slurm_client
input_data: str,
email: Optional[str] = None,
time: Optional[str] = None,
+ wf_id: Optional[UUID] = None,
**kwargs
- ) -> Tuple[Result, int]:
+ ) -> Tuple[Result, int, UUID, UUID]:
"""
Run a specified workflow on Slurm using the given parameters.
@@ -1352,24 +1571,48 @@ Source code for biomero.slurm_client
email (str, optional): Email address for Slurm job notifications.
time (str, optional): Time limit for the Slurm job in the
format HH:MM:SS.
+ wf_id (UUID, optional): Workflow ID for tracking purposes. If not provided, a new one is created.
**kwargs: Additional keyword arguments for the workflow.
Returns:
- Tuple[Result, int]:
- A tuple containing the result of starting the workflow job and
- the Slurm job ID, or -1 if the job ID could not be extracted.
+ Tuple[Result, int, UUID, UUID]:
+ A tuple containing the result of starting the workflow job,
+ the Slurm job ID, the workflow ID, and the task ID.
+ If the Slurm job ID could not be extracted, it returns -1 for the job ID.
Note:
- The Slurm job ID is extracted from the result of the
- `run_commands` method.
+ The Slurm job ID is extracted from the result of the `run_commands` method.
+ If `track_workflows` is enabled, workflow and task tracking is performed.
"""
+ if not wf_id:
+ wf_id = self.workflowTracker.initiate_workflow(
+ workflow_name,
+ workflow_version,
+ -1,
+ -1
+ )
+ task_id = self.workflowTracker.add_task_to_workflow(
+ wf_id,
+ workflow_name,
+ workflow_version,
+ input_data,
+ kwargs)
+ logger.debug(f"Added new task {task_id} to workflow {wf_id}")
+
sbatch_cmd, sbatch_env = self.get_workflow_command(
workflow_name, workflow_version, input_data, email, time, **kwargs)
print(f"Running {workflow_name} job on {input_data} on Slurm:\
{sbatch_cmd} w/ {sbatch_env}")
logger.info(f"Running {workflow_name} job on {input_data} on Slurm")
res = self.run_commands([sbatch_cmd], sbatch_env)
- return res, self.extract_job_id(res)
+ slurm_job_id = self.extract_job_id(res)
+
+ if task_id:
+ self.workflowTracker.start_task(task_id)
+ self.workflowTracker.add_job_id(task_id, slurm_job_id)
+ self.workflowTracker.add_result(task_id, res)
+
+ return res, slurm_job_id, wf_id, task_id
[docs] def run_workflow_job(self,
workflow_name: str,
@@ -1377,6 +1620,7 @@ Source code for biomero.slurm_client
input_data: str,
email: Optional[str] = None,
time: Optional[str] = None,
+ wf_id: Optional[UUID] = None,
**kwargs
) -> SlurmJob:
"""
@@ -1388,19 +1632,23 @@ Source code for biomero.slurm_client
input_data (str): Name of the input data folder containing input image files.
email (str, optional): Email address for Slurm job notifications.
time (str, optional): Time limit for the Slurm job in the format HH:MM:SS.
+ wf_id (UUID, optional): Workflow ID for tracking purposes. If not provided, a new one is created.
**kwargs: Additional keyword arguments for the workflow.
Returns:
SlurmJob: A SlurmJob instance representing the started workflow job.
"""
- result, job_id = self.run_workflow(
- workflow_name, workflow_version, input_data, email, time, **kwargs)
- return SlurmJob(result, job_id)
+ result, job_id, wf_id, task_id = self.run_workflow(
+ workflow_name, workflow_version, input_data, email, time, wf_id,
+ **kwargs)
+ return SlurmJob(result, job_id, wf_id, task_id)
-[docs] def run_conversion_workflow_job(self, folder_name: str,
+[docs] def run_conversion_workflow_job(self,
+ folder_name: str,
source_format: str = 'zarr',
- target_format: str = 'tiff'
- ) -> Tuple[Result, int]:
+ target_format: str = 'tiff',
+ wf_id: UUID = None
+ ) -> SlurmJob:
"""
Run the data conversion workflow on Slurm using the given data folder.
@@ -1410,9 +1658,8 @@ Source code for biomero.slurm_client
target_format (str): Target data format after conversion (default is 'tiff').
Returns:
- Tuple[Result, int]:
- A tuple containing the result of starting the conversion job and
- the Slurm job ID, or -1 if the job ID could not be extracted.
+ SlurmJob:
+ the conversion job
Warning:
The default implementation only supports conversion from 'zarr' to 'tiff'.
@@ -1424,7 +1671,7 @@ Source code for biomero.slurm_client
# Construct all commands to run consecutively
data_path = f"{self.slurm_data_path}/{folder_name}"
- conversion_cmd, sbatch_env = self.get_conversion_command(
+ conversion_cmd, sbatch_env, chosen_converter, version = self.get_conversion_command(
data_path, config_file, source_format, target_format)
commands = [
f"find \"{data_path}/data/in\" -name \"*.{source_format}\" | awk '{{print NR, $0}}' > \"{config_file}\"",
@@ -1432,11 +1679,34 @@ Source code for biomero.slurm_client
f"echo \"Number of .{source_format} files: $N\"",
conversion_cmd
]
+ logger.debug(f"wf_id: {wf_id}")
+ if not wf_id:
+ wf_id = self.workflowTracker.initiate_workflow(
+ "conversion",
+ -1,
+ -1,
+ -1
+ )
+ logger.debug(f"wf_id: {wf_id}")
+ task_id = self.workflowTracker.add_task_to_workflow(
+ wf_id,
+ f"convert_{source_format}_to_{target_format}".upper(),
+ version,
+ data_path,
+ sbatch_env
+ )
# Run all commands consecutively
res = self.run_commands(commands, sbatch_env)
-
- return SlurmJob(res, self.extract_job_id(res))
+
+ slurm_job_id = self.extract_job_id(res)
+
+ if task_id:
+ self.workflowTracker.start_task(task_id)
+ self.workflowTracker.add_job_id(task_id, slurm_job_id)
+ self.workflowTracker.add_result(task_id, res)
+
+ return SlurmJob(res, slurm_job_id, wf_id, task_id)
[docs] def extract_job_id(self, result: Result) -> int:
"""
@@ -1861,7 +2131,7 @@ Source code for biomero.slurm_client
[docs] def get_conversion_command(self, data_path: str,
config_file: str,
source_format: str = 'zarr',
- target_format: str = 'tiff') -> Tuple[str, Dict]:
+ target_format: str = 'tiff') -> Tuple[str, Dict, str, str]:
"""
Generate Slurm conversion command and environment variables for data conversion.
@@ -1872,9 +2142,9 @@ Source code for biomero.slurm_client
target_format (str): Target data format (default is 'tiff').
Returns:
- Tuple[str, Dict]:
+ Tuple[str, Dict, str, str]:
A tuple containing the Slurm conversion command and
- the environment variables.
+ the environment variables, followed by the converter image name and version.
Warning:
The default implementation only supports conversion from 'zarr' to 'tiff'.
@@ -1890,11 +2160,13 @@ Source code for biomero.slurm_client
f"Conversion from {source_format} to {target_format} is not supported by default!")
chosen_converter = f"convert_{source_format}_to_{target_format}_latest.sif"
+ version = None
if self.converter_images:
image = self.converter_images[f"{source_format}_to_{target_format}"]
version, image = self.parse_docker_image_version(image)
if version:
- chosen_converter = f"convert_{source_format}_to_{target_format}_{version}.sif"
+ chosen_converter = f"convert_{source_format}_to_{target_format}_{version}.sif"
+ version = version or "latest"
logger.info(f"Converting with {chosen_converter}")
sbatch_env = {
@@ -1908,7 +2180,7 @@ Source code for biomero.slurm_client
conversion_cmd = "sbatch --job-name=conversion --export=ALL,CONFIG_PATH=\"$PWD/$CONFIG_FILE\" --array=1-$N \"$SCRIPT_PATH/convert_job_array.sh\""
# conversion_cmd_waiting = "sbatch --job-name=conversion --export=ALL,CONFIG_PATH=\"$PWD/$CONFIG_FILE\" --array=1-$N --wait $SCRIPT_PATH/convert_job_array.sh"
- return conversion_cmd, sbatch_env
+ return conversion_cmd, sbatch_env, chosen_converter, version
[docs] def workflow_params_to_envvars(self, **kwargs) -> Dict:
"""
diff --git a/biomero.html b/biomero.html
index 0b5231c..ff6888a 100644
--- a/biomero.html
+++ b/biomero.html
@@ -111,7 +111,7 @@ biomero packagebiomero.slurm_client module
-
-class biomero.slurm_client.SlurmClient(host='slurm', user=None, port=None, config=None, gateway=None, forward_agent=None, connect_timeout=None, connect_kwargs=None, inline_ssh_env=True, slurm_data_path: str = 'my-scratch/data', slurm_images_path: str = 'my-scratch/singularity_images/workflows', slurm_converters_path: str = 'my-scratch/singularity_images/converters', slurm_model_paths: Optional[dict] = None, slurm_model_repos: Optional[dict] = None, slurm_model_images: Optional[dict] = None, converter_images: Optional[dict] = None, slurm_model_jobs: Optional[dict] = None, slurm_model_jobs_params: Optional[dict] = None, slurm_script_path: str = 'slurm-scripts', slurm_script_repo: Optional[str] = None, init_slurm: bool = False)[source]
+class biomero.slurm_client.SlurmClient(host='slurm', user=None, port=None, config=None, gateway=None, forward_agent=None, connect_timeout=None, connect_kwargs=None, inline_ssh_env=True, slurm_data_path: str = 'my-scratch/data', slurm_images_path: str = 'my-scratch/singularity_images/workflows', slurm_converters_path: str = 'my-scratch/singularity_images/converters', slurm_model_paths: Optional[dict] = None, slurm_model_repos: Optional[dict] = None, slurm_model_images: Optional[dict] = None, converter_images: Optional[dict] = None, slurm_model_jobs: Optional[dict] = None, slurm_model_jobs_params: Optional[dict] = None, slurm_script_path: str = 'slurm-scripts', slurm_script_repo: Optional[str] = None, init_slurm: bool = False, track_workflows: bool = True, enable_job_accounting: bool = True, enable_job_progress: bool = True, enable_workflow_analytics: bool = True, sqlalchemy_url: Optional[str] = None)[source]
Bases: fabric.connection.Connection
A client for connecting to and interacting with a Slurm cluster over
SSH.
@@ -128,7 +128,7 @@ biomero package
- Type
--
+
-
@@ -140,7 +140,7 @@ biomero package
Type
-
+
@@ -152,7 +152,7 @@ biomero package
Type
-
+
@@ -164,7 +164,7 @@ biomero package
Type
-
+
@@ -176,7 +176,7 @@ biomero package
Type
-
+
@@ -190,7 +190,7 @@ biomero package
Type
-
+
@@ -202,7 +202,7 @@ biomero package
Type
-
+
@@ -214,7 +214,7 @@ biomero package
Type
-
+
@@ -233,17 +233,22 @@ biomero package
+
+bring_listener_uptodate(listener, start=1)[source]
+
+
-
-check_job_status(slurm_job_ids: List[int], env: Optional[Dict[str, str]] = None) Tuple[Dict[int, str], fabric.runners.Result] [source]
+check_job_status(slurm_job_ids: List[int], env: Optional[Dict[str, str]] = None) Tuple[Dict[int, str], fabric.runners.Result] [source]
Check the status of Slurm jobs with the given job IDs.
Note: This doesn’t return job arrays individually.
It takes the last value returned for those sub ids
@@ -251,8 +256,8 @@
biomero package
- Parameters
-slurm_job_ids (List[int]) – The job IDs of the Slurm jobs to check.
-env (Dict[str, str], optional) – Optional environment variables to
+
slurm_job_ids (List[int]) – The job IDs of the Slurm jobs to check.
+env (Dict[str, str], optional) – Optional environment variables to
set when running the command. Defaults to None.
@@ -261,7 +266,7 @@ biomero packageReturn type
--
+
-
- Raises
SSHException – If the command execution fails or no response is
@@ -272,16 +277,16 @@
biomero package
-
-cleanup_tmp_files(slurm_job_id: str, filename: Optional[str] = None, data_location: Optional[str] = None, logfile: Optional[str] = None) fabric.runners.Result [source]
+cleanup_tmp_files(slurm_job_id: str, filename: Optional[str] = None, data_location: Optional[str] = None, logfile: Optional[str] = None) fabric.runners.Result [source]
Cleanup zip and unzipped files/folders associated with a Slurm job.
- Parameters
-slurm_job_id (str) – The job ID of the Slurm script.
-filename (str) – The zip filename on Slurm.
-data_location (str, optional) – The location of data files on Slurm.
+
slurm_job_id (str) – The job ID of the Slurm script.
+filename (str) – The zip filename on Slurm.
+data_location (str, optional) – The location of data files on Slurm.
If not provided, it will be extracted from the log file.
-logfile (str, optional) – The log file of the Slurm job.
+
logfile (str, optional) – The log file of the Slurm job.
If not provided, a default log file will be used.
@@ -304,7 +309,7 @@ biomero package
-
-convert_cytype_to_omtype(cytype: str, _default, *args, **kwargs) Any [source]
+convert_cytype_to_omtype(cytype: str, _default, *args, **kwargs) Any [source]
Convert a Cytomine type to an OMERO type and instantiates it
with args/kwargs.
Note that Cytomine has a Python Client, and some conversion methods
@@ -315,7 +320,7 @@
biomero package
- Parameters
-cytype (str) – The Cytomine type to convert.
+cytype (str) – The Cytomine type to convert.
_default – The default value. Required to distinguish between float
and int.
*args – Additional positional arguments.
@@ -334,28 +339,28 @@ biomero package
-
-convert_url(input_url: str) str [source]
+convert_url(input_url: str) str [source]
Convert the input GitHub URL to an output URL that retrieves
the ‘descriptor.json’ file in raw format.
- Parameters
-input_url (str) – The input GitHub URL.
+input_url (str) – The input GitHub URL.
- Returns
The output URL to the ‘descriptor.json’ file.
- Return type
--
+
-
- Raises
-ValueError – If the input URL is not a valid GitHub URL.
+ValueError – If the input URL is not a valid GitHub URL.
-
-copy_zip_locally(local_tmp_storage: str, filename: str) fabric.transfer.Result [source]
+copy_zip_locally(local_tmp_storage: str, filename: str) fabric.transfer.Result [source]
Copy a zip file from Slurm to the local server.
Note about (Transfer)Result:
Unlike similar classes such as invoke.runners.Result or
@@ -382,21 +387,21 @@
biomero package
-
-extract_data_location_from_log(slurm_job_id: Optional[str] = None, logfile: Optional[str] = None) str [source]
+extract_data_location_from_log(slurm_job_id: Optional[str] = None, logfile: Optional[str] = None) str [source]
Read SLURM job logfile to find location of the data.
One of the parameters is required, either id or file.
- Parameters
-
- Returns
Data location according to the log
- Return type
--
+
-
- Raises
SSHException – If there is an issue with the command execution.
@@ -406,7 +411,7 @@ biomero package
-
-extract_job_id(result: fabric.runners.Result) int [source]
+extract_job_id(result: fabric.runners.Result) int [source]
Extract the Slurm job ID from the result of a command.
- Parameters
@@ -417,35 +422,35 @@ biomero packageReturn type
--
+
-
-
-extract_parts_from_url(input_url: str) Tuple[List[str], str] [source]
+extract_parts_from_url(input_url: str) Tuple[List[str], str] [source]
Extract the repository and branch information from the input URL.
- Parameters
-input_url (str) – The input GitHub URL.
+input_url (str) – The input GitHub URL.
- Returns
The list of url parts and the branch/version.
If no branch is found, it will return “master”
- Return type
--
+
-
- Raises
-ValueError – If the input URL is not a valid GitHub URL.
+ValueError – If the input URL is not a valid GitHub URL.
-
-classmethod from_config(configfile: str = '', init_slurm: bool = False) biomero.slurm_client.SlurmClient [source]
+classmethod from_config(configfile: str = '', init_slurm: bool = False) biomero.slurm_client.SlurmClient [source]
Creates a new SlurmClient object using the parameters read from a
configuration file (.ini).
@@ -455,18 +460,17 @@ biomero package
- Parameters
-configfile (str) – The path to your configuration file. Optional.
-init_slurm (bool) – Initiate / validate slurm setup. Optional
-Might take some time the first time with downloading etc.
+configfile (str) – The path to your configuration file. Optional.
+init_slurm (bool) – Initiate / validate slurm setup. Optional
+Might take some time the first time with downloading, etc.
- Returns
@@ -480,15 +484,15 @@ biomero package
-
-generate_slurm_job_for_workflow(workflow: str, substitutes: Dict[str, str], template: str = 'job_template.sh') str [source]
+generate_slurm_job_for_workflow(workflow: str, substitutes: Dict[str, str], template: str = 'job_template.sh') str [source]
Generate a Slurm job script for a specific workflow.
- Parameters
-workflow (str) – The name of the workflow.
-substitutes (Dict[str, str]) – A dictionary containing key-value
+
workflow (str) – The name of the workflow.
+substitutes (Dict[str, str]) – A dictionary containing key-value
pairs for substituting placeholders in the job template.
-template (str, optional) – The filename of the job template.
+
template (str, optional) – The filename of the job template.
Defaults to “job_template.sh”.
@@ -496,37 +500,37 @@ biomero packageThe generated Slurm job script as a string.
- Return type
--
+
-
-
-get_active_job_progress(slurm_job_id: str, pattern: str = '\\d+%', env: Optional[Dict[str, str]] = None) str [source]
+get_active_job_progress(slurm_job_id: str, pattern: str = '\\d+%', env: Optional[Dict[str, str]] = None) Any [source]
Get the progress of an active Slurm job from its logfiles.
- Parameters
-slurm_job_id (str) – The ID of the Slurm job.
-pattern (str) – The pattern to match in the job log to extract
+
slurm_job_id (str) – The ID of the Slurm job.
+pattern (str) – The pattern to match in the job log to extract
the progress (default: r”d+%”).
-env (Dict[str, str], optional) – Optional environment variables
+
env (Dict[str, str], optional) – Optional environment variables
to set when running the command. Defaults to None.
- Returns
-The progress of the Slurm job.
+The progress of the Slurm job according to the pattern, or None.
- Return type
--
+
Any
-
-get_all_image_versions_and_data_files() Tuple[Dict[str, List[str]], List[str]] [source]
+get_all_image_versions_and_data_files() Tuple[Dict[str, List[str]], List[str]] [source]
Retrieve all available image versions and data files from
the Slurm cluster.
-
-get_cellpose_command(image_version: str, input_data: str, cp_model: str, nuc_channel: int, prob_threshold: float, cell_diameter: float, email: Optional[str] = None, time: Optional[str] = None, use_gpu: bool = True, model: str = 'cellpose') Tuple[str, Dict] [source]
+get_cellpose_command(image_version: str, input_data: str, cp_model: str, nuc_channel: int, prob_threshold: float, cell_diameter: float, email: Optional[str] = None, time: Optional[str] = None, use_gpu: bool = True, model: str = 'cellpose') Tuple[str, Dict] [source]
Return the command and environment dictionary to run a CellPose job
on the Slurm workload manager.
A specific example of using the generic ‘get_workflow_command’.
- Parameters
-image_version (str) – The version of the Singularity image to use.
-input_data (str) – The name of the input data folder on the shared
+
image_version (str) – The version of the Singularity image to use.
+input_data (str) – The name of the input data folder on the shared
file system.
-cp_model (str) – The name of the CellPose model to use.
-nuc_channel (int) – The index of the nuclear channel.
-prob_threshold (float) – The probability threshold for
+
cp_model (str) – The name of the CellPose model to use.
+nuc_channel (int) – The index of the nuclear channel.
+prob_threshold (float) – The probability threshold for
nuclei detection.
-cell_diameter (float) – The expected cell diameter in pixels.
-email (Optional[str]) – The email address to send notifications to.
+
cell_diameter (float) – The expected cell diameter in pixels.
+email (Optional[str]) – The email address to send notifications to.
Defaults to None.
-time (Optional[str]) – The maximum time for the job to run.
+
time (Optional[str]) – The maximum time for the job to run.
Defaults to None.
-use_gpu (bool) – Whether to use GPU for the CellPose job.
+
use_gpu (bool) – Whether to use GPU for the CellPose job.
Defaults to True.
-model (str) – The name of the folder of the Docker image to use.
+
model (str) – The name of the folder of the Docker image to use.
Defaults to “cellpose”.
@@ -573,30 +577,30 @@ biomero packageReturn type
--
+
-
-
-get_conversion_command(data_path: str, config_file: str, source_format: str = 'zarr', target_format: str = 'tiff') Tuple[str, Dict] [source]
+get_conversion_command(data_path: str, config_file: str, source_format: str = 'zarr', target_format: str = 'tiff') Tuple[str, Dict, str, str] [source]
Generate Slurm conversion command and environment variables for data conversion.
- Parameters
-data_path (str) – Path to the data folder.
-config_file (str) – Path to the configuration file.
-source_format (str) – Source data format (default is ‘zarr’).
-target_format (str) – Target data format (default is ‘tiff’).
+data_path (str) – Path to the data folder.
+config_file (str) – Path to the configuration file.
+source_format (str) – Source data format (default is ‘zarr’).
+target_format (str) – Target data format (default is ‘tiff’).
- Returns
A tuple containing the Slurm conversion command and
-the environment variables.
+the environment variables, followed by the converter image name and version.
- Return type
-Tuple[str, Dict]
+-
@@ -609,12 +613,12 @@ biomero package
-
-get_image_versions_and_data_files(model: str) Tuple[List[str], List[str]] [source]
+get_image_versions_and_data_files(model: str) Tuple[List[str], List[str]] [source]
Retrieve the available image versions and input data files for a
given model.
- Parameters
-model (str) – The name of the model to query for.
+model (str) – The name of the model to query for.
- Returns
A tuple containing two lists:
@@ -628,10 +632,10 @@
biomero packageReturn type
--
+
-
- Raises
-ValueError – If the provided model is not found in the
+
ValueError – If the provided model is not found in the
SlurmClient’s known model paths.
@@ -639,25 +643,25 @@ biomero package
-
-get_job_status_command(slurm_job_ids: List[int]) str [source]
+get_job_status_command(slurm_job_ids: List[int]) str [source]
Return the Slurm command to get the status of jobs with the given
job IDs.
-
-get_jobs_info_command(start_time: str = '2023-01-01', end_time: str = 'now', columns: str = 'JobId', states: str = 'r,cd,f,to,rs,dl,nf') str [source]
+get_jobs_info_command(start_time: str = '2023-01-01', end_time: str = 'now', columns: str = 'JobId', states: str = 'r,cd,f,to,rs,dl,nf') str [source]
Return the Slurm command to retrieve information about old jobs.
The command will be formatted with the specified start time, which is
expected to be in the ISO format “YYYY-MM-DD”.
@@ -668,13 +672,13 @@
biomero package
- Parameters
-start_time (str) – The start time from which to retrieve job
+
start_time (str) – The start time from which to retrieve job
information. Defaults to “2023-01-01”.
-end_time (str) – The end time until which to retrieve job
+
end_time (str) – The end time until which to retrieve job
information. Defaults to “now”.
-columns (str) – The columns to retrieve from the job information.
+
columns (str) – The columns to retrieve from the job information.
Defaults to “JobId”. It is comma separated, e.g. “JobId,State”.
-states (str) – The job states to include in the query.
+
states (str) – The job states to include in the query.
Defaults to “r,cd,f,to,rs,dl,nf”.
@@ -683,14 +687,19 @@ biomero packageReturn type
--
+
-
+
+
-
-get_logfile_from_slurm(slurm_job_id: str, local_tmp_storage: str = '/tmp/', logfile: Optional[str] = None) Tuple[str, str, fabric.transfer.Result] [source]
+get_logfile_from_slurm(slurm_job_id: str, local_tmp_storage: str = '/tmp/', logfile: Optional[str] = None) Tuple[str, str, fabric.transfer.Result] [source]
Copy the logfile of the given SLURM job to the local server.
Note about (Transfer)Result:
Unlike similar classes such as invoke.runners.Result
@@ -702,10 +711,10 @@
biomero package
- Parameters
-slurm_job_id (str) – The ID of the SLURM job.
-local_tmp_storage (str, optional) – Path to store the logfile
+
slurm_job_id (str) – The ID of the SLURM job.
+local_tmp_storage (str, optional) – Path to store the logfile
locally. Defaults to “/tmp/”.
-logfile (str, optional) – Path to the logfile on the SLURM server.
+
logfile (str, optional) – Path to the logfile on the SLURM server.
Defaults to None.
@@ -725,14 +734,14 @@ biomero package
-
-get_recent_log_command(log_file: str, n: int = 10) str [source]
+get_recent_log_command(log_file: str, n: int = 10) str [source]
Get the command to retrieve the recent log entries from a
specified log file.
- Parameters
-
@@ -740,22 +749,22 @@ biomero packageThe command to retrieve the recent log entries.
- Return type
--
+
-
-
-get_unzip_command(zipfile: str, filter_filetypes: str = '*.zarr *.tiff *.tif') str [source]
+get_unzip_command(zipfile: str, filter_filetypes: str = '*.zarr *.tiff *.tif') str [source]
Generate a command string for unzipping a data archive and creating
required directories for Slurm jobs.
- Parameters
-zipfile (str) – The name of the zip archive file to extract.
+
zipfile (str) – The name of the zip archive file to extract.
Without extension.
-filter_filetypes (str, optional) – A space-separated string
+
filter_filetypes (str, optional) – A space-separated string
containing the file extensions to extract from the zip file.
E.g. defaults to “*.zarr *.tiff *.tif”.
Setting this argument to None will omit the file
@@ -767,14 +776,14 @@
biomero packageReturn type
--
+
-
-
-get_update_slurm_scripts_command() str [source]
+get_update_slurm_scripts_command() str [source]
Generate the command to update the Git repository containing
the Slurm scripts, if necessary.
@@ -783,25 +792,25 @@ biomero packageReturn type
--
+
-
-
-get_workflow_command(workflow: str, workflow_version: str, input_data: str, email: Optional[str] = None, time: Optional[str] = None, **kwargs) Tuple[str, Dict] [source]
+get_workflow_command(workflow: str, workflow_version: str, input_data: str, email: Optional[str] = None, time: Optional[str] = None, **kwargs) Tuple[str, Dict] [source]
Generate the Slurm workflow command and environment variables.
- Parameters
-workflow (str) – The name of the workflow.
-workflow_version (str) – The version of the workflow.
-input_data (str) – The name of the input data folder containing
+
workflow (str) – The name of the workflow.
+workflow_version (str) – The version of the workflow.
+input_data (str) – The name of the input data folder containing
the input image files.
-email (str, optional) – The email address for job notifications.
+
email (str, optional) – The email address for job notifications.
Defaults to None, which defaults to what is in the job script.
-time (str, optional) – The time limit for the job in the
+
time (str, optional) – The time limit for the job in the
format HH:MM:SS. Defaults to None, which defaults to what
is in the job script.
**kwargs – Additional keyword arguments for the workflow.
@@ -812,27 +821,27 @@ biomero packageReturn type
-Tuple[str, Dict]
+Tuple[str, Dict]
-
-get_workflow_parameters(workflow: str) Dict[str, Dict[str, Any]] [source]
+get_workflow_parameters(workflow: str) Dict[str, Dict[str, Any]] [source]
Retrieve the parameters of a workflow.
- Parameters
-workflow (str) – The workflow for which to retrieve the parameters.
+workflow (str) – The workflow for which to retrieve the parameters.
- Returns
A dictionary containing the workflow parameters.
- Return type
--
+
-
- Raises
-ValueError – If an error occurs while retrieving the workflow
+
ValueError – If an error occurs while retrieving the workflow
parameters.
@@ -840,13 +849,13 @@ biomero package
-
-get_zip_command(data_location: str, filename: str) str [source]
+get_zip_command(data_location: str, filename: str) str [source]
Generate a command string for zipping the data on Slurm.
- Parameters
-
@@ -854,63 +863,75 @@ biomero packageThe command to create the zip file.
- Return type
--
+
-
-
-init_workflows(force_update: bool = False)[source]
+init_workflows(force_update: bool = False)[source]
Retrieves the required info for the configured workflows from github.
It will fill slurm_model_images with dockerhub links.
+
+-
+initialize_analytics_system(reset_tables=False)[source]
+Initialize the analytics system based on the analytics configuration
+passed to the constructor.
+
+- Parameters
+reset_tables (bool) – If True, drops and recreates all views.
+
+
+
+
-
-list_active_jobs(env: Optional[Dict[str, str]] = None) List[str] [source]
+list_active_jobs(env: Optional[Dict[str, str]] = None) List[str] [source]
Get a list of active jobs from SLURM.
-
-list_all_jobs(env: Optional[Dict[str, str]] = None) List[str] [source]
+list_all_jobs(env: Optional[Dict[str, str]] = None) List[str] [source]
Get a list of all jobs from SLURM.
-
-list_available_converter_versions() Dict [source]
+list_available_converter_versions() Dict [source]
Note, assumes you use versioned converters.
Will return a dict with a version of each converter on your Slurm.
However, doesn’t work properly with unversioned sif.
@@ -918,29 +939,29 @@ biomero package
-
-list_completed_jobs(env: Optional[Dict[str, str]] = None) List[str] [source]
+list_completed_jobs(env: Optional[Dict[str, str]] = None) List[str] [source]
Get a list of completed jobs from SLURM.
-
-parse_docker_image_version(image: str) Tuple[str, str] [source]
+parse_docker_image_version(image: str) Tuple[str, str] [source]
Parses the Docker image string to extract the image name and version tag.
- Parameters
-image (str) – The Docker image string in the format ‘image_name:version’.
+image (str) – The Docker image string in the format ‘image_name:version’.
- Returns
@@ -953,18 +974,18 @@ biomero packageReturn type
--
+
-
-
-pull_descriptor_from_github(workflow: str) Dict [source]
+pull_descriptor_from_github(workflow: str) Dict [source]
Pull the workflow descriptor from GitHub.
- Parameters
-workflow (str) – The workflow for which to pull the descriptor.
+workflow (str) – The workflow for which to pull the descriptor.
- Returns
The JSON descriptor.
@@ -973,14 +994,14 @@ biomero packageDict
- Raises
-ValueError – If an error occurs while pulling the descriptor file.
+ValueError – If an error occurs while pulling the descriptor file.
-
-run_commands(cmdlist: List[str], env: Optional[Dict[str, str]] = None, sep: str = ' && ', **kwargs) fabric.runners.Result [source]
+run_commands(cmdlist: List[str], env: Optional[Dict[str, str]] = None, sep: str = ' && ', **kwargs) fabric.runners.Result [source]
Run a list of shell commands consecutively on the Slurm cluster,
ensuring the success of each before proceeding to the next.
The environment variables can be set using the env argument.
@@ -989,10 +1010,10 @@
biomero package
- Parameters
-cmdlist (List[str]) – A list of shell commands to run on Slurm.
-env (Dict[str, str], optional) – Optional environment variables to
+
cmdlist (List[str]) – A list of shell commands to run on Slurm.
+env (Dict[str, str], optional) – Optional environment variables to
set when running the command. Defaults to None.
-sep (str, optional) – The separator used to concatenate the
+
sep (str, optional) – The separator used to concatenate the
commands. Defaults to ‘ && ‘.
**kwargs – Additional keyword arguments.
@@ -1008,7 +1029,7 @@ biomero package
-
-run_commands_split_out(cmdlist: List[str], env: Optional[Dict[str, str]] = None) List[str] [source]
+run_commands_split_out(cmdlist: List[str], env: Optional[Dict[str, str]] = None) List[str] [source]
Run a list of shell commands consecutively and split the output
of each command.
Each command in the list is executed with a separator in between
@@ -1019,8 +1040,8 @@
biomero package
- Parameters
-
@@ -1030,7 +1051,7 @@ biomero packageReturn type
-List[str]
+List[str]
- Raises
SSHException – If any of the commands fail to execute successfully.
@@ -1040,22 +1061,21 @@ biomero package
-
-run_conversion_workflow_job(folder_name: str, source_format: str = 'zarr', target_format: str = 'tiff') Tuple[fabric.runners.Result, int] [source]
+run_conversion_workflow_job(folder_name: str, source_format: str = 'zarr', target_format: str = 'tiff', wf_id: Optional[uuid.UUID] = None) biomero.slurm_client.SlurmJob [source]
Run the data conversion workflow on Slurm using the given data folder.
- Parameters
-folder_name (str) – The name of the data folder containing source format files.
-source_format (str) – Source data format for conversion (default is ‘zarr’).
-target_format (str) – Target data format after conversion (default is ‘tiff’).
+folder_name (str) – The name of the data folder containing source format files.
+source_format (str) – Source data format for conversion (default is ‘zarr’).
+target_format (str) – Target data format after conversion (default is ‘tiff’).
- Returns
-A tuple containing the result of starting the conversion job and
-the Slurm job ID, or -1 if the job ID could not be extracted.
+the conversion job
- Return type
-Tuple[Result, int]
+-
@@ -1068,49 +1088,52 @@ biomero package
-
-run_workflow(workflow_name: str, workflow_version: str, input_data: str, email: Optional[str] = None, time: Optional[str] = None, **kwargs) Tuple[fabric.runners.Result, int] [source]
+run_workflow(workflow_name: str, workflow_version: str, input_data: str, email: Optional[str] = None, time: Optional[str] = None, wf_id: Optional[uuid.UUID] = None, **kwargs) Tuple[fabric.runners.Result, int, uuid.UUID, uuid.UUID] [source]
Run a specified workflow on Slurm using the given parameters.
- Parameters
-workflow_name (str) – Name of the workflow to execute.
-workflow_version (str) – Version of the workflow (image version
+
workflow_name (str) – Name of the workflow to execute.
+workflow_version (str) – Version of the workflow (image version
on Slurm).
-input_data (str) – Name of the input data folder containing input
+
input_data (str) – Name of the input data folder containing input
image files.
-email (str, optional) – Email address for Slurm job notifications.
-time (str, optional) – Time limit for the Slurm job in the
+
email (str, optional) – Email address for Slurm job notifications.
+time (str, optional) – Time limit for the Slurm job in the
format HH:MM:SS.
+wf_id (UUID, optional) – Workflow ID for tracking purposes. If not provided, a new one is created.
**kwargs – Additional keyword arguments for the workflow.
- Returns
-A tuple containing the result of starting the workflow job and
-the Slurm job ID, or -1 if the job ID could not be extracted.
+A tuple containing the result of starting the workflow job,
+the Slurm job ID, the workflow ID, and the task ID.
+If the Slurm job ID could not be extracted, it returns -1 for the job ID.
- Return type
-Tuple[Result, int]
+Tuple[Result, int, UUID, UUID]
Note
-The Slurm job ID is extracted from the result of the
-run_commands method.
+The Slurm job ID is extracted from the result of the run_commands method.
+If track_workflows is enabled, workflow and task tracking is performed.
-
-run_workflow_job(workflow_name: str, workflow_version: str, input_data: str, email: Optional[str] = None, time: Optional[str] = None, **kwargs) biomero.slurm_client.SlurmJob [source]
+run_workflow_job(workflow_name: str, workflow_version: str, input_data: str, email: Optional[str] = None, time: Optional[str] = None, wf_id: Optional[uuid.UUID] = None, **kwargs) biomero.slurm_client.SlurmJob [source]
Run a specified workflow on Slurm using the given parameters and return a SlurmJob instance.
- Parameters
-workflow_name (str) – Name of the workflow to execute.
-workflow_version (str) – Version of the workflow (image version on Slurm).
-input_data (str) – Name of the input data folder containing input image files.
-email (str, optional) – Email address for Slurm job notifications.
-time (str, optional) – Time limit for the Slurm job in the format HH:MM:SS.
+workflow_name (str) – Name of the workflow to execute.
+workflow_version (str) – Version of the workflow (image version on Slurm).
+input_data (str) – Name of the input data folder containing input image files.
+email (str, optional) – Email address for Slurm job notifications.
+time (str, optional) – Time limit for the Slurm job in the format HH:MM:SS.
+wf_id (UUID, optional) – Workflow ID for tracking purposes. If not provided, a new one is created.
**kwargs – Additional keyword arguments for the workflow.
@@ -1178,6 +1201,11 @@ biomero package
+-
+setup_listeners(runner, reset_tables)[source]
+
+
-
setup_slurm()[source]
@@ -1191,13 +1219,13 @@ biomero package
-
-str_to_class(module_name: str, class_name: str, *args, **kwargs)[source]
+str_to_class(module_name: str, class_name: str, *args, **kwargs)[source]
Return a class instance from a string reference.
- Parameters
-module_name (str) – The name of the module.
-class_name (str) – The name of the class.
+module_name (str) – The name of the module.
+class_name (str) – The name of the class.
*args – Additional positional arguments for the class constructor.
**kwargs – Additional keyword arguments for the class constructor.
@@ -1210,19 +1238,19 @@ biomero packageReturn type
--
+
-
-
-transfer_data(local_path: str) fabric.runners.Result [source]
+transfer_data(local_path: str) fabric.runners.Result [source]
Transfers a file or directory from the local machine to the remote
Slurm cluster.
- Parameters
-local_path (str) – The local path to the file or directory to
+
local_path (str) – The local path to the file or directory to
transfer.
- Returns
@@ -1236,13 +1264,13 @@ biomero package
-
-unpack_data(zipfile: str, env: Optional[Dict[str, str]] = None) fabric.runners.Result [source]
+unpack_data(zipfile: str, env: Optional[Dict[str, str]] = None) fabric.runners.Result [source]
Unpacks a zipped file on the remote Slurm cluster.
- Parameters
-zipfile (str) – The name of the zipped file to be unpacked.
-env (Dict[str, str], optional) – Optional environment variables
+
zipfile (str) – The name of the zipped file to be unpacked.
+env (Dict[str, str], optional) – Optional environment variables
to set when running the command. Defaults to None.
@@ -1257,7 +1285,7 @@ biomero package
-
-update_slurm_scripts(generate_jobs: bool = False, env: Optional[Dict[str, str]] = None) fabric.runners.Result [source]
+update_slurm_scripts(generate_jobs: bool = False, env: Optional[Dict[str, str]] = None) fabric.runners.Result [source]
Updates the local copy of the Slurm job submission scripts.
This function pulls the latest version of the scripts from the Git
repository and copies them to the slurm_script_path directory.
@@ -1267,10 +1295,10 @@
biomero package
- Parameters
-generate_jobs (bool) – Whether to generate new slurm job scripts
+
generate_jobs (bool) – Whether to generate new slurm job scripts
INSTEAD (of pulling from git). Defaults to False, except
if no slurm_script_repo is configured.
-env (Dict[str, str], optional) – Optional environment variables
+
env (Dict[str, str], optional) – Optional environment variables
to set when running the command. Defaults to None.
@@ -1285,12 +1313,12 @@ biomero package
-
-validate(validate_slurm_setup: bool = False)[source]
+validate(validate_slurm_setup: bool = False)[source]
Validate the connection to the Slurm cluster by running
a simple command.
- Parameters
-validate_slurm_setup (bool) – Whether to also check
+
validate_slurm_setup (bool) – Whether to also check
and fix the Slurm setup (folders, images, etc.)
- Returns
@@ -1298,14 +1326,14 @@ biomero packageReturn type
--
+
-
-
-workflow_params_to_envvars(**kwargs) Dict [source]
+workflow_params_to_envvars(**kwargs) Dict [source]
Convert workflow parameters to environment variables.
- Parameters
@@ -1322,7 +1350,7 @@ biomero package
-
-workflow_params_to_subs(params) Dict[str, str] [source]
+workflow_params_to_subs(params) Dict[str, str] [source]
Convert workflow parameters to substitution dictionary for job script.
- Parameters
@@ -1336,14 +1364,14 @@ biomero packageReturn type
--
+
-
-
-zip_data_on_slurm_server(data_location: str, filename: str, env: Optional[Dict[str, str]] = None) fabric.runners.Result [source]
+zip_data_on_slurm_server(data_location: str, filename: str, env: Optional[Dict[str, str]] = None) fabric.runners.Result [source]
Zip the output folder of a job on Slurm
- Parameters
@@ -1351,7 +1379,7 @@ biomero packagestr, str], optional) – Optional environment variables to
+
env (Dict[str, str], optional) – Optional environment variables to
set when running the command. Defaults to None.
@@ -1368,8 +1396,8 @@ biomero package
-
-class biomero.slurm_client.SlurmJob(submit_result: fabric.runners.Result, job_id: int)[source]
-Bases: object
+class biomero.slurm_client.SlurmJob(submit_result: fabric.runners.Result, job_id: int, wf_id: uuid.UUID, task_id: uuid.UUID, slurm_polling_interval: int = 10)[source]
+Bases: object
Represents a job submitted to a Slurm cluster.
This class encapsulates information and methods related to managing a job
submitted to a Slurm cluster. It provides functionality to monitor the
@@ -1380,7 +1408,7 @@
biomero package
- Type
--
+
-
@@ -1402,7 +1430,7 @@ biomero package
- Type
--
+
-
@@ -1413,7 +1441,18 @@ biomero package
- Type
--
+
-
+
+
+
+
+
@@ -1421,30 +1460,56 @@ biomero package
-
error_message
-The error message, if any.
+The error message, if any, encountered during job submission.
+
+-
+wf_id
+The workflow ID associated with the job.
-- Parameters
-
-submit_result (Result) – The result of submitting the job.
-job_id (int) – The Slurm job ID.
-
+- Type
+UUID
+
+
+
+-
+task_id
+The task ID within the workflow.
+
+- Type
+UUID
+
+
+
+
+
+-
+slurm_polling_interval
+The polling interval (in seconds) for checking the job status.
+
+- Type
+-
+
+
+
+
Example
# Submit some job with the SlurmClient
-submit_result, job_id = slurmClient.run_workflow(
+submit_result, job_id, wf_id, task_id = slurmClient.run_workflow(
-workflow_name, workflow_version, input_data, email, time, **kwargs)
+workflow_name, workflow_version, input_data, email, time, wf_id,
+**kwargs)
# Create a SlurmJob instance
-slurmJob = SlurmJob(submit_result, job_id)
+slurmJob = SlurmJob(submit_result, job_id, wf_id, task_id)
- if not slurmJob.ok:
logger.warning(f”Error with job: {slurmJob.get_error()}”)
@@ -1465,6 +1530,11 @@ biomero package
+-
+SLURM_POLLING_INTERVAL = 10
+
+
-
cleanup(slurmClient) fabric.runners.Result [source]
@@ -1491,28 +1561,28 @@ biomero packageTrue if the job has completed; False otherwise.
- Return type
--
+
-
-
-get_error() str [source]
+get_error() str [source]
Get the error message associated with the Slurm job submission.
-
-wait_for_completion(slurmClient, omeroConn) str [source]
+wait_for_completion(slurmClient, omeroConn) str [source]
Wait for the Slurm job to reach completion, cancellation, failure, or timeout.
- Parameters
@@ -1525,7 +1595,7 @@ biomero packageThe final state of the Slurm job.
- Return type
--
+
-
diff --git a/genindex.html b/genindex.html
index df14fa7..cd3493f 100644
--- a/genindex.html
+++ b/genindex.html
@@ -131,6 +131,10 @@ B
+
C
@@ -197,10 +201,12 @@ G
- get_image_versions_and_data_files() (biomero.slurm_client.SlurmClient method)
- get_job_status_command() (biomero.slurm_client.SlurmClient method)
+
+ - get_jobs_info_command() (biomero.slurm_client.SlurmClient method)
- - get_jobs_info_command() (biomero.slurm_client.SlurmClient method)
+
- get_listeners() (biomero.slurm_client.SlurmClient method)
- get_logfile_from_slurm() (biomero.slurm_client.SlurmClient method)
@@ -225,6 +231,10 @@ I
+
@@ -285,6 +295,8 @@ P
@@ -318,6 +330,8 @@ S
- setup_directories() (biomero.slurm_client.SlurmClient method)
- setup_job_scripts() (biomero.slurm_client.SlurmClient method)
+
+ - setup_listeners() (biomero.slurm_client.SlurmClient method)
- setup_slurm() (biomero.slurm_client.SlurmClient method)
@@ -327,13 +341,17 @@ S
- slurm_images_path (biomero.slurm_client.SlurmClient attribute)
-
-
+
- slurm_model_paths (biomero.slurm_client.SlurmClient attribute)
- slurm_model_repos (biomero.slurm_client.SlurmClient attribute)
+
+ - SLURM_POLLING_INTERVAL (biomero.slurm_client.SlurmJob attribute)
+
+ - slurm_polling_interval (biomero.slurm_client.SlurmJob attribute)
- slurm_script_path (biomero.slurm_client.SlurmClient attribute)
@@ -352,6 +370,10 @@ S
T