diff --git a/mxcubecore/HardwareObjects/Beamline.py b/mxcubecore/HardwareObjects/Beamline.py index 686a1f3f09..d4de0e04c2 100644 --- a/mxcubecore/HardwareObjects/Beamline.py +++ b/mxcubecore/HardwareObjects/Beamline.py @@ -882,7 +882,7 @@ def get_default_acquisition_parameters(self, acquisition_type="default"): acq_parameters.detector_binning_mode = "" try: - acq_parameters.detector_roi_mode = self.detector.get_roi_mode() + acq_parameters.detector_roi_mode = self.detector.get_roi_mode_name() except Exception: logging.getLogger("HWR").warning( "get_default_acquisition_parameters: " diff --git a/mxcubecore/HardwareObjects/Gphl/GphlQueueEntry.py b/mxcubecore/HardwareObjects/Gphl/GphlQueueEntry.py index 3e168e082e..0bcaa4771b 100644 --- a/mxcubecore/HardwareObjects/Gphl/GphlQueueEntry.py +++ b/mxcubecore/HardwareObjects/Gphl/GphlQueueEntry.py @@ -27,6 +27,9 @@ from mxcubecore import HardwareRepository as HWR from mxcubecore.queue_entry.base_queue_entry import BaseQueueEntry +from mxlims.pydantic import mxmodel +from mxcubecore.utils import mxlims as mxutils + __credits__ = ["MXCuBE collaboration"] __license__ = "LGPLv3+" __category__ = "queue" @@ -54,7 +57,7 @@ def pre_execute(self): def post_execute(self): BaseQueueEntry.post_execute(self) - msg = "Finishing GΦL workflow (%s)" % (self.get_data_model().strategy_name) + msg = "Finishing GΦL workflow (%s)" % self.get_data_model().strategy_name logging.getLogger("user_level_log").info(msg) HWR.beamline.gphl_workflow.post_execute() @@ -63,3 +66,15 @@ def stop(self): BaseQueueEntry.stop(self) logging.getLogger("HWR").info("MXCuBE aborting current GΦL workflow") self.get_view().setText(1, "Stopped") + + def init_mxlims(self): + """Initialise MXLIMS MxExperimentMessage if it is not already set""" + + mxexperiment: mxmodel.MxExperimentMessage = self.get_mxlims_record() + if mxexperiment is None: + data_model = self.get_data_model() + self._mxlims_record = mxutils.create_mxrecord( + sample=data_model.get_sample_node(), + tracking_data=data_model.tracking_data, + measured_flux = HWR.beamline.flux.get_value() + ) diff --git a/mxcubecore/HardwareObjects/Gphl/GphlWorkflow.py b/mxcubecore/HardwareObjects/Gphl/GphlWorkflow.py index e573ff8f86..4c10838b3e 100644 --- a/mxcubecore/HardwareObjects/Gphl/GphlWorkflow.py +++ b/mxcubecore/HardwareObjects/Gphl/GphlWorkflow.py @@ -37,8 +37,8 @@ import socket import subprocess import time -import uuid from collections import OrderedDict +from uuid import uuid1 import f90nml import gevent @@ -77,7 +77,6 @@ class GphlWorkflowStates(enum.Enum): COMPLETED = 4 UNKNOWN = 5 - __copyright__ = """ Copyright © 2016 - 2019 by Global Phasing Ltd. """ __license__ = "LGPLv3+" __author__ = "Rasmus H Fogh" @@ -260,6 +259,7 @@ def init(self): "WorkflowAborted": self.workflow_aborted, "WorkflowCompleted": self.workflow_completed, "WorkflowFailed": self.workflow_failed, + "StartEnactment": self.start_enactment, } # Set standard configurable file paths @@ -729,7 +729,6 @@ def query_pre_strategy_params(self, choose_lattice=None): dispatcher.connect( self.receive_pre_strategy_data, self.PARAMETER_RETURN_SIGNAL, - dispatcher.Any, ) responses = dispatcher.send( self.PARAMETERS_NEEDED, @@ -752,7 +751,6 @@ def query_pre_strategy_params(self, choose_lattice=None): dispatcher.disconnect( self.receive_pre_strategy_data, self.PARAMETER_RETURN_SIGNAL, - dispatcher.Any, ) self._return_parameters = None @@ -851,6 +849,34 @@ def pre_execute(self, queue_entry): self._workflow_queue = gevent.queue.Queue() + def start_enactment(self, enactment_id:str, correlation_id:str): + """Set enactment_id and initialise MXLIMS MxExperimentMessage""" + data_model = self._queue_entry.get_data_model() + tracking_data = data_model.tracking_data + workflow_parameters = data_model.workflow_parameters + tracking_data.uuid = enactment_id + tracking_data.workflow_uid = ( + workflow_parameters.get("workflow_uid") or enactment_id + ) + # NB it is not set it will be overwritten later + tracking_data.workflow_name = workflow_parameters.get("workflow_name") + tracking_data.workflow_type = ( + workflow_parameters.get("workflow_type") + or data_model.strategy_type + ) + tracking_data.location_id = ( + workflow_parameters.get("workflow_position_id") + or str(uuid1()) + ) + # NB first orientation only: + tracking_data.orientation_id = workflow_parameters.get( + "workflow_kappa_settings_id" + ) + tracking_data.characterisation_id = workflow_parameters.get( + "characterisation_id" + ) + self._queue_entry.init_mxlims() + def execute(self): if self._workflow_queue is None: @@ -1261,10 +1287,9 @@ def query_collection_strategy(self, geometric_strategy): raise ValueError( "invalid default recentring mode '%s' " % default_recentring_mode ) - use_modes = ["sweep"] + use_modes = ["sweep", "none"] if len(orientations) > 1: use_modes.append("start") - use_modes.append("none") if is_interleaved: use_modes.append("scan") for indx in range(len(modes) - 1, -1, -1): @@ -1378,7 +1403,6 @@ def query_collection_strategy(self, geometric_strategy): dispatcher.connect( self.receive_pre_collection_data, self.PARAMETER_RETURN_SIGNAL, - dispatcher.Any, ) responses = dispatcher.send( self.PARAMETERS_NEEDED, @@ -1398,7 +1422,6 @@ def query_collection_strategy(self, geometric_strategy): dispatcher.disconnect( self.receive_pre_collection_data, self.PARAMETER_RETURN_SIGNAL, - dispatcher.Any, ) self._return_parameters = None @@ -1949,6 +1972,7 @@ def collect_data(self, payload, correlation_id): last_orientation = () maxdev = -1 snapshotted_rotation_ids = set() + characterisation_id = None scan_numbers = {} for scan in scans: sweep = scan.sweep @@ -2043,31 +2067,48 @@ def collect_data(self, payload, correlation_id): # Handle orientations and (re) centring goniostatRotation = sweep.goniostatSweepSetting - rotation_id = orientation_id = goniostatRotation.id_ - - model_workflow_parameters = gphl_workflow_model.workflow_parameters - if not model_workflow_parameters.get("workflow_name"): - model_workflow_parameters["workflow_name"] = gphl_workflow_model.wfname - if not model_workflow_parameters.get("workflow_type"): - model_workflow_parameters["workflow_type"] = gphl_workflow_model.wftype - if not model_workflow_parameters.get("workflow_uid"): - model_workflow_parameters["workflow_uid"] = str( - HWR.beamline.gphl_connection._enactment_id - ) - if not model_workflow_parameters.get("workflow_position_id"): - # As of 20240911 all workflows use a single position, - model_workflow_parameters["workflow_position_id"] = str(uuid.uuid1()) + rotation_id = goniostatRotation.id_ + + + # handle mxlims + # handle workflow parameters + new_workflow_parameters = gphl_workflow_model.workflow_parameters.copy() + wf_tracking_data = gphl_workflow_model.tracking_data + data_collection = queue_model_objects.DataCollection([acq], crystal) + # Workflow parameters for ICAT / external workflow + # The 'if' statement is to allow this to work in multiple versions + data_collection.workflow_parameters = new_workflow_parameters + tracking_data = data_collection.tracking_data + tracking_data.uuid = str(scan.id_) + tracking_data.workflow_name = wf_tracking_data.workflow_name + tracking_data.workflow_type = wf_tracking_data.workflow_type + tracking_data.workflow_uid = wf_tracking_data.uuid + tracking_data.location_id = wf_tracking_data.location_id + tracking_data.orientation_id = rotation_id if ( gphl_workflow_model.wftype == "acquisition" and not gphl_workflow_model.characterisation_done - and not model_workflow_parameters.get("workflow_characterisation_id") ): - model_workflow_parameters["workflow_characterisation_id"] = str( - sweep.id_ - ) - model_workflow_parameters["workflow_kappa_settings_id"] = str( - orientation_id - ) + if characterisation_id is None: + # NB this is a hack - forces tharacterisation to be a single sweep + characterisation_id = str(sweep.id_) + tracking_data.characterisation_id = characterisation_id + wf_tracking_data.characterisation_id = characterisation_id + tracking_data.role = "Characterisation" + tracking_data.sweep_id = characterisation_id + else: + tracking_data.characterisation_id = wf_tracking_data.characterisation_id# + tracking_data.role = "Result" + tracking_data.sweep_id = str(sweep.id_) + tracking_data.scan_number = gphl_workflow_model.next_scan_number + gphl_workflow_model.next_scan_number += 1 + + new_workflow_parameters["workflow_name"] = tracking_data.workflow_name + new_workflow_parameters["workflow_type"] = tracking_data.workflow_type + new_workflow_parameters["workflow_uid"] = tracking_data.workflow_uid + new_workflow_parameters["workflow_position_id"] = tracking_data.location_id + new_workflow_parameters["characterisation_id"] = tracking_data.characterisation_id + new_workflow_parameters["workflow_kappa_settings_id"] = tracking_data.orientation_id initial_settings = sweep.get_initial_settings() orientation = ( @@ -2129,11 +2170,6 @@ def collect_data(self, payload, correlation_id): acq_parameters.num_images_per_trigger * acq_parameters.osc_range - sweep_offset ) - data_collection = queue_model_objects.DataCollection([acq], crystal) - # Workflow parameters for ICAT / external workflow - # The 'if' statement is to allow this to work in multiple versions - if hasattr(data_collection, "workflow_parameters"): - data_collection.workflow_parameters.update(model_workflow_parameters) data_collections.append(data_collection) data_collection.set_enabled(True) data_collection.ispyb_group_data_collections = True diff --git a/mxcubecore/HardwareObjects/Gphl/GphlWorkflowConnection.py b/mxcubecore/HardwareObjects/Gphl/GphlWorkflowConnection.py index f6fdb77a4b..b378ae9fb7 100644 --- a/mxcubecore/HardwareObjects/Gphl/GphlWorkflowConnection.py +++ b/mxcubecore/HardwareObjects/Gphl/GphlWorkflowConnection.py @@ -619,6 +619,17 @@ def processMessage(self, py4j_message): "GΦL - response=%s jobId=%s messageId=%s" % (result.__class__.__name__, enactment_id, correlation_id) ) + if message_type == "ObtainPriorInformation": + # At this point we have the enactment_id and can set the workflow_id + self.workflow_queue.put_nowait( + ( + "StartEnactment", + self._enactment_id, + None, + None, + ) + ) + return self._response_to_server(result, correlation_id) elif message_type in ("WorkflowAborted", "WorkflowCompleted", "WorkflowFailed"): diff --git a/mxcubecore/configuration/mockup/gphl/gphl-setup.yml b/mxcubecore/configuration/mockup/gphl/gphl-setup.yml index 967bacfc29..5d2c0bc170 100644 --- a/mxcubecore/configuration/mockup/gphl/gphl-setup.yml +++ b/mxcubecore/configuration/mockup/gphl/gphl-setup.yml @@ -89,6 +89,6 @@ software_properties: # OPTIONAL. simcal *binary* For Mock collection emulation only. Not used by workflow co.gphl.wf.simcal.bin: - /alt/rhfogh/Software/GPhL/nightly_20240611/Files_workflow_TRUNK_alpha-bdg/autoPROC/bin/linux64/simcal + /alt/rhfogh/Software/GPhL/nightly_20241108/Files_workflow_TRUNK_alpha-bdg/autoPROC/bin/linux64/simcal co.gphl.wf.simcal.bdg_licence_dir: - /alt/rhfogh/Software/GPhL/nightly_20240611/Files_workflow_TRUNK_alpha-bdg + /alt/rhfogh/Software/GPhL/nightly_20241108/Files_workflow_TRUNK_alpha-bdg diff --git a/mxcubecore/configuration/mockup/gphl/gphl-workflow.yml b/mxcubecore/configuration/mockup/gphl/gphl-workflow.yml index a50f33e0a0..0fb5541bb5 100644 --- a/mxcubecore/configuration/mockup/gphl/gphl-workflow.yml +++ b/mxcubecore/configuration/mockup/gphl/gphl-workflow.yml @@ -42,7 +42,7 @@ settings: default_beam_energy_tag: Main # NB Temporary developer option. Defaults to 1 - allow_duplicate_orientations: 0 +# allow_duplicate_orientations: 0 # Suppress log output of programs called from GPhL workflow # Default to False @@ -240,6 +240,7 @@ workflows: - title: Phasing (SAD) strategy_type: phasing + short_name: SAD wf_selection: mxexpt variants: - full @@ -261,6 +262,7 @@ workflows: - title: Two-wavelength MAD strategy_type: phasing + short_name: "2wvlMAD" wf_selection: mxexpt variants: - quick @@ -291,6 +293,7 @@ workflows: - title: Three-wavelength MAD strategy_type: phasing + short_name: "3wvlMAD" wf_selection: mxexpt variants: - quick diff --git a/mxcubecore/model/queue_model_objects.py b/mxcubecore/model/queue_model_objects.py index e825c667c6..156d4d1cb5 100644 --- a/mxcubecore/model/queue_model_objects.py +++ b/mxcubecore/model/queue_model_objects.py @@ -26,6 +26,9 @@ import copy import logging import os +from typing import Optional + +from pydantic import Field, BaseModel from mxcubecore.model import queue_model_enumerables @@ -51,6 +54,66 @@ __license__ = "LGPLv3+" +class TrackingData(BaseModel): + """Data to connect different tasks into workflows, LIMS input, MXLIMS, etc. + + NB Should be harmonised and merged with workflow_parameters""" + + uuid: Optional[str] = Field( + default=None, + description="Unique identifier string for this queue_model_object", + ) + workflow_name: Optional[str] = Field( + default=None, + description="Name of workflow that this queue_model_object belongs to", + ) + workflow_type: Optional[str] = Field( + default=None, + description="Type of workflow that this queue_model_object belongs to", + ) + workflow_uid: Optional[str] = Field( + default=None, + description="Unique identifier string for the workflow this queue_model_object belongs to", + ) + location_id: Optional[str] = Field( + default=None, + description="Unique identifier string for the location / LogisticalSample " + "of this queue_model_object", + ) + orientation_id: Optional[str] = Field( + default=None, + description="Unique identifier string for the orientation (kappa/phi/chi settings) " + "for this queue_model_object", + ) + characterisation_id: Optional[str] = Field( + default=None, + description="Unique identifier string for characterisation data acquisition " + "that is relevant for this queue_model_object", + ) + sweep_id: Optional[str] = Field( + default=None, + description="Unique identifier string for the sweep that this queue_model_object " + "is part of. Used to combine multiple Acquisitions as scans of a single sweep." + ) + scan_number: Optional[int] = Field( + default=None, + description="Ordinal number (starting at 0), for this queue_model_object " + "in the experiment. Defines the time ordering of acquisitions and scans.", + ) + role: Optional[str] = Field( + default=None, + description="Role of this Task result within the experiment.", + json_schema_extra={ + "examples": [ + "Result", + "Intermediate", + "Characterisation", + "Centring", + ], + }, + ) + + class TaskNode(object): """ Objects that inherit TaskNode can be added to and handled by @@ -70,6 +133,8 @@ def __init__(self, task_data=None): self._requires_centring = True self._origin = None self._task_data = task_data + # tracking data for connecting jobs into workflows, mxlims output, etrc. + self.tracking_data: TrackingData = TrackingData() @property def task_data(self): @@ -1973,6 +2038,7 @@ def __init__(self): self.maximum_dose_budget = 20.0 self.decay_limit = 25 self.characterisation_budget_fraction = 0.05 + self.enactment_id = None # string. Only active mode currently is 'MASSIF1' self.automation_mode = None @@ -2027,6 +2093,9 @@ def __init__(self): self.acquisition_dose = 0.0 self.strategy_length = 0.0 + # Scan number for MXLIMS Scan ordering + self.next_scan_number = 0 + # Workflow attributes - for passing to LIMS (conf Olof Svensson) self.workflow_parameters = {} @@ -2132,7 +2201,7 @@ def set_pre_strategy_params( # noqa: C901 else: space_group = self.space_group if space_group == "None": - # Temporray fix - this should not happen + # Temporary fix - this should not happen # 20240926 Rasmus Fogh and Olof Svensson space_group = None if crystal_classes: @@ -2338,6 +2407,7 @@ def init_from_task_data(self, sample_model, params): raise ValueError( "No GΦL workflow strategy named %s found" % params["strategy_name"] ) + self.tracking_data.workflow_type = self.strategy_type self.shape = params.get("shape", "") for tag in ( @@ -2492,6 +2562,11 @@ def strategy_name(self): """ "Strategy full name, e.g. "Two-wavelength MAD" """ return self.strategy_settings["title"] + @property + def strategy_short_name(self): + """ "Strategy full name, e.g. "Two-wavelength MAD" """ + return self.strategy_settings["short_name"] + # Run name equal to base_prefix def get_name(self): # Required to conform to TaskNode diff --git a/mxcubecore/queue_entry/base_queue_entry.py b/mxcubecore/queue_entry/base_queue_entry.py index 82685fa583..0c92a8ea02 100644 --- a/mxcubecore/queue_entry/base_queue_entry.py +++ b/mxcubecore/queue_entry/base_queue_entry.py @@ -28,6 +28,8 @@ import time import traceback from collections import namedtuple +from typing import Optional +from datetime import datetime import gevent @@ -39,11 +41,16 @@ EXPERIMENT_TYPE, ) +from mxlims.pydantic import mxmodel +from mxcubecore.utils import mxlims as mxutils + + + + __credits__ = ["MXCuBE collaboration"] __license__ = "LGPLv3+" __category__ = "General" - status_list = ["SUCCESS", "WARNING", "FAILED", "SKIPPED", "RUNNING", "NOT_EXECUTED"] QueueEntryStatusType = namedtuple("QueueEntryStatusType", status_list) QUEUE_ENTRY_STATUS = QueueEntryStatusType(0, 1, 2, 3, 4, 5) @@ -132,11 +139,11 @@ def swap(self, queue_entry_a, queue_entry_b): Throws a ValueError if one of the entries does not exist in the queue. - :param queue_entry: Queue entry to swap - :type queue_entry: QueueEntry + :param queue_entry_a: Queue entry to swap + :type queue_entry_a: QueueEntry - :param queue_entry: Queue entry to swap - :type queue_entry: QueueEntry + :param queue_entry_b: Queue entry to swap + :type queue_entry_b: QueueEntry """ index_a = None index_b = None @@ -176,7 +183,6 @@ def set_queue_controller(self, queue_controller): def get_queue_controller(self): """ :returns: The queue controller - :type queue_controller: QueueController """ return self._queue_controller @@ -214,6 +220,9 @@ def __init__(self, view=None, data_model=None, view_set_queue_entry=True): self.type_str = "" self._data_model.lims_session_id = HWR.beamline.session.session_id + # MXLIMS record for currently running experiment + self._mxlims_record: Optional[mxmodel.MxExperimentMessage] = None + def is_failed(self): """Returns True if failed""" return self.status == QUEUE_ENTRY_STATUS.FAILED @@ -286,6 +295,17 @@ def set_enabled(self, state): """ self._checked_for_exec = state + def get_mxlims_record(self) -> mxmodel.MxExperimentMessage: + """Get MxExperiment MXLIMS record if the entry is currently running""" + obj = self + result = None + container = obj.get_container() + while result is None and container is not None: + result = obj._mxlims_record + obj = container + container = obj.get_container() + return result + def execute(self): """ Execute method, should be overriden my subclasses, defines @@ -323,6 +343,14 @@ def post_execute(self): self.get_data_model().set_enabled(False) self.set_enabled(False) + mxlims_record = self._mxlims_record + if mxlims_record is not None: + self._mxlims_record = None + mxlims_record.job.end_time = datetime.now() + mxutils.export_mxrecord( + mxlims_record, None, + ) + # self._set_background_color() def _set_background_color(self): diff --git a/mxcubecore/queue_entry/data_collection.py b/mxcubecore/queue_entry/data_collection.py index 66ba691fcf..885701f857 100644 --- a/mxcubecore/queue_entry/data_collection.py +++ b/mxcubecore/queue_entry/data_collection.py @@ -19,6 +19,7 @@ import logging import gevent +import uuid from mxcubecore import HardwareRepository as HWR from mxcubecore.dispatcher import dispatcher @@ -35,6 +36,9 @@ center_before_collect, ) +from mxlims.pydantic import mxmodel +from mxcubecore.utils import mxlims as mxutils + __credits__ = ["MXCuBE collaboration"] __license__ = "LGPLv3+" __category__ = "General" @@ -130,11 +134,55 @@ def pre_execute(self): data_model = self.get_data_model() + mxexperiment: mxmodel.MxExperimentMessage = self.get_mxlims_record() + if mxexperiment is None: + tracking_data = data_model.tracking_data + workflow_parameters = data_model.workflow_parameters + tracking_data.workflow_uid = workflow_parameters.get("workflow_uid") + tracking_data.uuid = tracking_data.workflow_uid or uuid.uuid1() + tracking_data.workflow_name = workflow_parameters.get("workflow_name") + tracking_data.workflow_type = ( + workflow_parameters.get("workflow_type") or data_model.experiment_type + ) + tracking_data.location_id = workflow_parameters.get("workflow_position_id") + # NB first orientation only: + tracking_data.orientation_id = workflow_parameters.get( + "workflow_kappa_settings_id" + ) + tracking_data.characterisation_id = workflow_parameters.get( + "characterisation_id" + ) + self._mxlims_record = mxutils.create_mxrecord( + sample=data_model.get_sample_node(), + tracking_data=tracking_data, + measured_flux = HWR.beamline.flux.get_value() + ) + if data_model.get_parent(): gid = data_model.get_parent().lims_group_id data_model.lims_group_id = gid def post_execute(self): + # Done in post_execute and *before* calling BaseQueueEntry + # so that beamline values are set and can be read off + # NBNB TODO look at pre-existing sweep UUIDs + detector = HWR.beamline.detector + # NB Detector distance is taken here rather than from parameters as a more + # reliable source and in preference to the definition-dependent resolution + beam_position = detector.get_beam_position() + if None in beam_position: + beam_position = None + beam = HWR.beamline.beam + data_model = self.get_data_model() + mxutils.add_data_collection( + self.get_mxlims_record(), + data_model, + beam_position=beam_position, + beam_size=beam.get_beam_size(), + beam_shape=beam.get_beam_shape().value, + detector_distance=detector.distance.get_value(), + ) + BaseQueueEntry.post_execute(self) qc = self.get_queue_controller() diff --git a/mxcubecore/utils/mxlims.py b/mxcubecore/utils/mxlims.py new file mode 100644 index 0000000000..985d990091 --- /dev/null +++ b/mxcubecore/utils/mxlims.py @@ -0,0 +1,282 @@ +#! /usr/bin/env python +# encoding: utf-8 +# +""" + +License: + +This file is part of the MXLIMS collaboration. + +MXLIMS models and code are free software: you can redistribute it and/or modify +it under the terms of the GNU Lesser General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +MXLIMS is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Lesser General Public License for more details. + +You should have received a copy of the GNU Lesser General Public License +along with MXLIMS. If not, see . +""" + +__copyright__ = """ Copyright © 2024 - 2024 MXLIMS collaboration.""" +__author__ = "rhfogh" +__date__ = "05/11/2024" + +import os +import json +from datetime import datetime + +from typing import Optional +from uuid import uuid1 + +from mxcubecore.HardwareObjects.Native import xmlrpc_prefix +from mxlims.pydantic import mxmodel +from mxcubecore.model import queue_model_objects as qmo + + +def create_mxrecord( + sample: qmo.Sample, + tracking_data: dict, + start_time: datetime = None, + end_time: datetime = None, + job_status: str = None, + **parameters +) -> mxmodel.MxExperimentMessage: + """Create MxExperimentMessage mxlims record from datamodel + + Args: + sample: QueueModelObject representing sample + tracking_data: Dictionary with uuid etc. connecting sweeps and workflows + start_time: Experiment start time + end_time: Experiment end time + job_status: Job status (enumerated string) + **parameters: dict of parameters overriding/supplementing MxExperimentData + + Returns: + + """ + start_time = start_time or datetime.now() + crystal = sample.crystals[0] if sample.crystals else None + diffraction_plan = sample.diffraction_plan + jobuuid = tracking_data.uuid + workflow_name = tracking_data.workflow_name + if not workflow_name: + if diffraction_plan: + if hasattr(diffraction_plan, "experimentType"): + workflow_name = diffraction_plan.experimentType + else: + workflow_name = diffraction_plan.get("experimentType") + if not workflow_name: + workflow_name = parameters.pop("experiment_type", None) + jobpars = {"experiment_strategy": workflow_name} + + if diffraction_plan: + # It is not clear if diffraction_plan is a dict or an object, + # and if so which kind + + if hasattr(diffraction_plan, "aimedResolution"): + resolution = diffraction_plan.aimedResolution + else: + resolution = diffraction_plan.get("aimedResolution") + if resolution: + jobpars["expected_resolution"] = resolution + + if hasattr(diffraction_plan, "requiredCompleteness"): + completeness = diffraction_plan.requiredCompleteness + else: + completeness = diffraction_plan.get("requiredCompleteness") + if completeness: + jobpars["target_completeness"] = completeness + + if hasattr(diffraction_plan, "requiredMultiplicity"): + multiplicity = diffraction_plan.requiredMultiplicity + else: + multiplicity = diffraction_plan.get("requiredMultiplicity") + if multiplicity: + jobpars["target_multiplicity"] = multiplicity + jobpars.update(parameters) + jobdata = mxmodel.MxExperimentData(**jobpars) + + # CrystallographicSample + crystal_form = None + samplepars = {} + samplepars["name"] = ( + sample.name or sample.get_name() or (crystal and crystal.acronym) + ) + if crystal: + space_group_name = crystal.space_group + dd1 = { + "a": crystal.cell_a, + "b": crystal.cell_b, + "c": crystal.cell_c, + "alpha": crystal.cell_alpha, + "beta": crystal.cell_beta, + "gamma": crystal.cell_gamma, + } + unit_cell = mxmodel.UnitCell(**dd1) if all(dd1.values()) else None + if space_group_name or unit_cell: + samplepars["crystal_form"] = mxmodel.CrystalForm( + space_group_name=space_group_name, crystal_form=crystal_form + ) + + # LogisticalSample, not really modeled yet, so not much to put in + crystal_uuid = crystal.crystal_uuid + else: + crystal_uuid = None + # Set parameters from diffraction plan + if diffraction_plan: + # It is not clear if diffraction_plan is a dict or an object, + # and if so which kind + if hasattr(diffraction_plan, "radiationSensitivity"): + radiation_sensitivity = diffraction_plan.radiationSensitivity + else: + radiation_sensitivity = diffraction_plan.get("radiationSensitivity") + if radiation_sensitivity: + samplepars["radiation_sensitivity"] = radiation_sensitivity + sampledata = mxmodel.CrystallographicSampleData(**samplepars) + sample = mxmodel.CrystallographicSample(uuid=uuid1(), data=sampledata) + + crystaldata = mxmodel.CrystalData() + if crystal_uuid: + logistical_sample = mxmodel.Crystal( + uuid=crystal_uuid, sample_id=sample.uuid, data=crystaldata + ) + else: + logistical_sample = mxmodel.Crystal(uuid=uuid1(), sample_id=sample.uuid, data=crystaldata) + + experiment = mxmodel.MxExperiment( + uuid=jobuuid, + data=jobdata, + start_time=start_time, + end_time=end_time, + job_status=job_status, + sample_id=sample.uuid, + logistical_sample_id=logistical_sample.uuid + ) + # + return mxmodel.MxExperimentMessage( + job=experiment, sample=sample, logistical_sample=logistical_sample + ) + + +def add_data_collection( + mxrecord: mxmodel.MxExperimentMessage, + data_collection: qmo.DataCollection, + **parameters: dict, +) -> None: + """Add CollectionSweep record to MxExperiment in mxrecord + + Args: + mxrecord: container MxExperimentMessage + data_collection: DataCollection queue_model_object to add + **parameters: dict of parameters overriding/supplementing MxlimsData + + Returns: + + """ + + # ALwsy true in MXCuBE + SCAN_AXIS = "omega" + mxexperiment = mxrecord.job + + acquisition = data_collection.acquisitions[0] + path_template = acquisition.path_template + acqparams = acquisition.acquisition_parameters + tracking_data = data_collection.tracking_data + startpos = dict( + tpl + for tpl in acqparams.centred_position.as_dict().items() + if tpl[1] is not None + ) + axis_pos_start = acqparams.osc_start + axis_pos_end = axis_pos_start + acqparams.num_images * acqparams.osc_range + startpos[SCAN_AXIS] = axis_pos_start + startpos["detector_distance"] = acqparams.detector_distance + detector_distance = parameters.pop("detector_distance", None) + if detector_distance is not None: + startpos["detector_distance"] = detector_distance + scan = mxmodel.Scan( + scan_position_start=axis_pos_start, + first_image_number=acqparams.first_image, + number_images=acqparams.num_images, + ordinal=tracking_data.scan_number or 0, + ) + + sweep_id = tracking_data.sweep_id + sweep = None + if not mxexperiment.results: + mxexperiment.results = [] + for dataset in mxexperiment.results: + if str(dataset.uuid) == sweep_id: + sweep = dataset + break + if sweep: + # This is a scan for an existing sweep. Add ane update + sweep.data.scans.append(scan) + sweep.data.axis_positions_start[SCAN_AXIS] = min( + sweep.data.axis_positions_start[SCAN_AXIS], axis_pos_start + ) + sweep.data.axis_positions_end[SCAN_AXIS] = max( + sweep.data.axis_positions_end[SCAN_AXIS], axis_pos_end + ) + + else: + sweep_params = { + "uuid": sweep_id or tracking_data.uuid, + "source_id": mxexperiment.uuid, + "logistical_sample_id": mxrecord.logistical_sample.uuid, + "role": tracking_data.role, + } + sweepdata = { + "scan_axis": SCAN_AXIS, + "exposure_time": acqparams.exp_time, + "image_width": acqparams.osc_range, + "energy": acqparams.energy, + "transmission": acqparams.transmission, + "resolution": acqparams.resolution, + "detector_binning_mode": acqparams.detector_binning_mode, + "detector_roi_mode": acqparams.detector_roi_mode, + "overlap": acqparams.overlap, + "number_triggers": acqparams.num_triggers, + "number_images_per_trigger": acqparams.num_images_per_trigger, + "prefix": path_template.get_prefix(), + "file_type": path_template.suffix, + "filename_template": path_template.get_image_file_name(), + "path": path_template.directory, + "axis_positions_start": startpos, + "scans": [scan], + } + + sweepdata["axis_positions_end"] = {SCAN_AXIS: axis_pos_end} + + # NBNB how do we get the detector type? + # NBNB do we use MXCuBE axis names or standardised names? + + sweepdata.update(parameters) + dataset = mxmodel.CollectionSweep( + data=mxmodel.CollectionSweepData(**sweepdata), + **sweep_params + ) + mxexperiment.results.append(dataset) + + + +def export_mxrecord( + mxrecord: mxmodel.MxExperimentMessage, + path_template: Optional[qmo.PathTemplate] = None +): + """Export MxExperiment mxlims record to JSON file""" + if path_template is None: + path = mxrecord.job.results[-1].data.path + file_name = "MxExperiment.json" + else: + template = "MXExperiment_%s_%s.json" + file_name = template % (path_template.get_prefix(), path_template.run_number) + path = os.path.join(path_template.directory, file_name) + path = os.path.join(path, file_name) + print("@~@~ WRITING JSON TO", path) + with open(path, "w") as fp: + fp.write(mxrecord.model_dump_json(indent=4, exclude_none=True))