From 1dadad4fd4f22173809ded0f860cfc32f322dc16 Mon Sep 17 00:00:00 2001 From: rhfogh Date: Tue, 5 Nov 2024 15:35:21 +0000 Subject: [PATCH 1/7] WIP Working on MXExperiment generation --- .../HardwareObjects/Gphl/GphlQueueEntry.py | 7 +++ .../HardwareObjects/Gphl/GphlWorkflow.py | 9 +++ .../Gphl/GphlWorkflowConnection.py | 11 ++++ .../mockup/gphl/gphl-workflow.yml | 3 + mxcubecore/model/queue_model_objects.py | 3 +- mxcubecore/queue_entry/base_queue_entry.py | 24 ++++++++ mxcubecore/queue_entry/data_collection.py | 10 ++++ mxcubecore/utils/mxlims.py | 59 +++++++++++++++++++ 8 files changed, 125 insertions(+), 1 deletion(-) create mode 100644 mxcubecore/utils/mxlims.py diff --git a/mxcubecore/HardwareObjects/Gphl/GphlQueueEntry.py b/mxcubecore/HardwareObjects/Gphl/GphlQueueEntry.py index 3e168e082e..e3ef7bd880 100644 --- a/mxcubecore/HardwareObjects/Gphl/GphlQueueEntry.py +++ b/mxcubecore/HardwareObjects/Gphl/GphlQueueEntry.py @@ -63,3 +63,10 @@ def stop(self): BaseQueueEntry.stop(self) logging.getLogger("HWR").info("MXCuBE aborting current GΦL workflow") self.get_view().setText(1, "Stopped") + + def init_mxlims(self): + """Initialise MXLIMS MXExperiment if it is not already set""" + + mxexperiment: mxmodel.MXExperiment = self.get_mxlims_record() + if mxexperiment is None: + self._mxlims_record = mxutils.create_mxexperiment(self.get_data_model()) diff --git a/mxcubecore/HardwareObjects/Gphl/GphlWorkflow.py b/mxcubecore/HardwareObjects/Gphl/GphlWorkflow.py index e573ff8f86..67d826873b 100644 --- a/mxcubecore/HardwareObjects/Gphl/GphlWorkflow.py +++ b/mxcubecore/HardwareObjects/Gphl/GphlWorkflow.py @@ -58,6 +58,8 @@ QueueAbortedException, ) +from mxlims import crystallography as mxmodel + @enum.unique class GphlWorkflowStates(enum.Enum): @@ -851,6 +853,11 @@ def pre_execute(self, queue_entry): self._workflow_queue = gevent.queue.Queue() + def start_enactment(self, enactment_id:str): + """Set enactment_id and initialise MXLIMS MXExperiment""" + self._queue_entry.get_data_model().enactment_id = enactment_id + self._queue_entry.start_enactment() + def execute(self): if self._workflow_queue is None: @@ -901,6 +908,8 @@ def execute(self): elif message_type == "String": if not self.settings.get("suppress_external_log_output"): func(payload, correlation_id) + elif message_type == "StartEnactment": + self.start_enactment(payload) else: logging.getLogger("HWR").info( "GΦL queue processing %s", message_type diff --git a/mxcubecore/HardwareObjects/Gphl/GphlWorkflowConnection.py b/mxcubecore/HardwareObjects/Gphl/GphlWorkflowConnection.py index f6fdb77a4b..b378ae9fb7 100644 --- a/mxcubecore/HardwareObjects/Gphl/GphlWorkflowConnection.py +++ b/mxcubecore/HardwareObjects/Gphl/GphlWorkflowConnection.py @@ -619,6 +619,17 @@ def processMessage(self, py4j_message): "GΦL - response=%s jobId=%s messageId=%s" % (result.__class__.__name__, enactment_id, correlation_id) ) + if message_type == "ObtainPriorInformation": + # At this point we have the enactment_id and can set the workflow_id + self.workflow_queue.put_nowait( + ( + "StartEnactment", + self._enactment_id, + None, + None, + ) + ) + return self._response_to_server(result, correlation_id) elif message_type in ("WorkflowAborted", "WorkflowCompleted", "WorkflowFailed"): diff --git a/mxcubecore/configuration/mockup/gphl/gphl-workflow.yml b/mxcubecore/configuration/mockup/gphl/gphl-workflow.yml index a50f33e0a0..ed7c523927 100644 --- a/mxcubecore/configuration/mockup/gphl/gphl-workflow.yml +++ b/mxcubecore/configuration/mockup/gphl/gphl-workflow.yml @@ -240,6 
+240,7 @@ workflows: - title: Phasing (SAD) strategy_type: phasing + short_name: SAD wf_selection: mxexpt variants: - full @@ -261,6 +262,7 @@ workflows: - title: Two-wavelength MAD strategy_type: phasing + short_name: "2wvlMAD" wf_selection: mxexpt variants: - quick @@ -291,6 +293,7 @@ workflows: - title: Three-wavelength MAD strategy_type: phasing + short_name: "3wvlMAD" wf_selection: mxexpt variants: - quick diff --git a/mxcubecore/model/queue_model_objects.py b/mxcubecore/model/queue_model_objects.py index e825c667c6..5397c591ba 100644 --- a/mxcubecore/model/queue_model_objects.py +++ b/mxcubecore/model/queue_model_objects.py @@ -1973,6 +1973,7 @@ def __init__(self): self.maximum_dose_budget = 20.0 self.decay_limit = 25 self.characterisation_budget_fraction = 0.05 + self.enactment_id = None # string. Only active mode currently is 'MASSIF1' self.automation_mode = None @@ -2132,7 +2133,7 @@ def set_pre_strategy_params( # noqa: C901 else: space_group = self.space_group if space_group == "None": - # Temporray fix - this should not happen + # Temporary fix - this should not happen # 20240926 Rasmus Fogh and Olof Svensson space_group = None if crystal_classes: diff --git a/mxcubecore/queue_entry/base_queue_entry.py b/mxcubecore/queue_entry/base_queue_entry.py index 82685fa583..a40dcdfee4 100644 --- a/mxcubecore/queue_entry/base_queue_entry.py +++ b/mxcubecore/queue_entry/base_queue_entry.py @@ -28,6 +28,7 @@ import time import traceback from collections import namedtuple +from typing import Optional import gevent @@ -39,10 +40,14 @@ EXPERIMENT_TYPE, ) +from mxlims.pydantic import crystallography as mxmodel +from utils import mxlims as mxutils + __credits__ = ["MXCuBE collaboration"] __license__ = "LGPLv3+" __category__ = "General" +from numpy.lib.shape_base import apply_along_axis status_list = ["SUCCESS", "WARNING", "FAILED", "SKIPPED", "RUNNING", "NOT_EXECUTED"] QueueEntryStatusType = namedtuple("QueueEntryStatusType", status_list) @@ -214,6 +219,9 @@ def __init__(self, view=None, data_model=None, view_set_queue_entry=True): self.type_str = "" self._data_model.lims_session_id = HWR.beamline.session.session_id + # MXLIMS record for currently running experiment + self._mxlims_record: Optional[mxmodel.MXExperiment] = None + def is_failed(self): """Returns True if failed""" return self.status == QUEUE_ENTRY_STATUS.FAILED @@ -286,6 +294,17 @@ def set_enabled(self, state): """ self._checked_for_exec = state + def get_mxlims_record(self) -> mxmodel.MXExperiment: + """Get MXExperiment MXLIMS record if the entry is currently running""" + obj = self + result = None + while obj is not None: + result = obj._mxlims_record + if result is None: + obj = obj.get_container() + return result + + def execute(self): """ Execute method, should be overriden my subclasses, defines @@ -323,6 +342,11 @@ def post_execute(self): self.get_data_model().set_enabled(False) self.set_enabled(False) + mxlims_record = self._mxlims_record + if mxlims_record is not None: + self._mxlims_record = None + mxutils.export_mxexperiment(mxlims_record, self.get_data_model()) + # self._set_background_color() def _set_background_color(self): diff --git a/mxcubecore/queue_entry/data_collection.py b/mxcubecore/queue_entry/data_collection.py index 66ba691fcf..db9e550e8f 100644 --- a/mxcubecore/queue_entry/data_collection.py +++ b/mxcubecore/queue_entry/data_collection.py @@ -35,6 +35,9 @@ center_before_collect, ) +from mxlims.pydantic import crystallography as mxmodel +from utils import mxlims as mxutils + __credits__ = ["MXCuBE 
collaboration"] __license__ = "LGPLv3+" __category__ = "General" @@ -130,6 +133,13 @@ def pre_execute(self): data_model = self.get_data_model() + mxexperiment: mxmodel.MXExperiment = self.get_mxlims_record() + if mxexperiment is None: + mxexperiment = mxutils.create_mxexperiment(data_model) + self._mxlims_record = mxexperiment + mxutils.add_sweep(mxexperiment, data_model) + + if data_model.get_parent(): gid = data_model.get_parent().lims_group_id data_model.lims_group_id = gid diff --git a/mxcubecore/utils/mxlims.py b/mxcubecore/utils/mxlims.py new file mode 100644 index 0000000000..eeb3446619 --- /dev/null +++ b/mxcubecore/utils/mxlims.py @@ -0,0 +1,59 @@ +#! /usr/bin/env python +# encoding: utf-8 +# +""" + +License: + +This file is part of the MXLIMS collaboration. + +MXLIMS models and code are free software: you can redistribute it and/or modify +it under the terms of the GNU Lesser General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +MXLIMS is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Lesser General Public License for more details. + +You should have received a copy of the GNU Lesser General Public License +along with MXLIMS. If not, see . +""" + +__copyright__ = """ Copyright © 2024 - 2024 MXLIMS collaboration.""" +__author__ = "rhfogh" +__date__ = "05/11/2024" + +from mxlims.pydantic import crystallography as mxmodel +from mxcubecore.model import queue_model_objects as qmo +from mxlims.pydantic.crystallography import MXExperiment + + +def create_mxexperiment(datamodel: qmo.TaskNode) -> mxmodel.MXExperiment: + """Create MXExperiment mxlims record from datamodel""" + + sample = datamodel.get_sample_node() + + if isinstance(datamodel, qmo.GphlWorkflow): + # Initialise MXExperiment from GPhL workflow + prefix = "GPhL." 
+ settings = datamodel.strategy_settings + short_name = settings.get("short_name", settings.get("strategy_type")) + result = MXExperiment(experiment_strategy = prefix+short_name) + elif isinstance(datamodel, qmo.DataCollection): + # Initialise MXExperimnent from single Acquisition + result = MXExperiment(experiment_strategy = datamodel.experiemnt_type) + else: + raise ValueError("Unsupported queue_model_object: %s" % self) + # + return result + +def add_sweep(mxexperiment: mxmodel.MXExperiment, acquisition: qmo.Acquisition): + """Add CollectionSweep record to MXExperiment""" + pass + +def export_mxexperiment(mxexperiment: mxmodel.MXExperiment, + datamodel: qmo.TaskNode): + """Export MXExperiment mxlims record to JSON file""" + pass \ No newline at end of file From 8e6f82854dc1c2c4b000024bc3791686f8db8b52 Mon Sep 17 00:00:00 2001 From: rhfogh Date: Wed, 6 Nov 2024 15:28:25 +0000 Subject: [PATCH 2/7] Finished create_mxexperiment function --- mxcubecore/utils/mxlims.py | 97 +++++++++++++++++++++++++++++++++++--- 1 file changed, 91 insertions(+), 6 deletions(-) diff --git a/mxcubecore/utils/mxlims.py b/mxcubecore/utils/mxlims.py index eeb3446619..11f5b0d94a 100644 --- a/mxcubecore/utils/mxlims.py +++ b/mxcubecore/utils/mxlims.py @@ -33,27 +33,112 @@ def create_mxexperiment(datamodel: qmo.TaskNode) -> mxmodel.MXExperiment: """Create MXExperiment mxlims record from datamodel""" + # Add MXSample and LogisticalSample sample = datamodel.get_sample_node() + crystal = sample.crystals[0] if sample.crystals else None + diffraction_plan = sample.diffraction_plan + # LogisticalSample, not really modeled yet, so not much to put in + crystal_uuid = crystal.uuid if crystal else None + if crystal_uuid: + logistical_sample = mxmodel.LogisticalSample(uuid=crystal_uuid) + else: + logistical_sample = mxmodel.LogisticalSample() + result.logistical_sample = logistical_sample + + # MXSample + samplepars = {} + samplepars["name"] = ( + sample.name or sample.get_name() or (crystal and crystal.acronym) + ) + if crystal: + space_group_name = crystal.space_group + if space_group_name: + samplepars["space_group_name"] = space_group_name + dd1 = { + "a": crystal.cell_a, + "b": crystal.cell_b, + "c": crystal.cell_c, + "alpha": crystal.cell_alpha, + "beta": crystal.cell_beta, + "gamma": crystal.cell_gamma, + } + if all(dd1.values()): + samplepars["unit_cell"] = mxmodel.UnitCell(**dd1) + + # Set parameters from diffraction plan + if diffraction_plan: + # It is not clear if diffraction_plan is a dict or an object, + # and if so which kind + if hasattr(diffraction_plan, "radiationSensitivity"): + radiation_sensitivity = diffraction_plan.radiationSensitivity + else: + radiation_sensitivity = diffraction_plan.get("radiationSensitivity") + if radiation_sensitivity: + samplepars["radiation_sensitivity"] = radiation_sensitivity + + if hasattr(diffraction_plan, "aimedResolution"): + resolution = diffraction_plan.aimedResolution + else: + resolution = diffraction_plan.get("aimedResolution") + if resolution: + samplepars["expected_resolution"] = resolution + + if hasattr(diffraction_plan, "requiredCompleteness"): + completeness = diffraction_plan.requiredCompleteness + else: + completeness = diffraction_plan.get("requiredCompleteness") + if completeness: + samplepars["target_completeness"] = completeness + + if hasattr(diffraction_plan, "requiredMultiplicity"): + multiplicity = diffraction_plan.requiredMultiplicity + else: + multiplicity = diffraction_plan.get("requiredMultiplicity") + if multiplicity: + 
samplepars["target_multiplicity"] = multiplicity + sample = mxmodel.MXSample(**samplepars) + + # Create MXExperiment if isinstance(datamodel, qmo.GphlWorkflow): # Initialise MXExperiment from GPhL workflow prefix = "GPhL." settings = datamodel.strategy_settings short_name = settings.get("short_name", settings.get("strategy_type")) - result = MXExperiment(experiment_strategy = prefix+short_name) + result = MXExperiment( + uuid=datamodel.enactment_id, + experiment_strategy=prefix + short_name, + sample=sample, + logistical_sample=logistical_sample, + ) + elif isinstance(datamodel, qmo.DataCollection): - # Initialise MXExperimnent from single Acquisition - result = MXExperiment(experiment_strategy = datamodel.experiemnt_type) + # Initialise MXExperiment from single Acquisition + if diffraction_plan: + if hasattr(diffraction_plan, "experimentType"): + experiment_strategy = diffraction_plan.experimentType + else: + experiment_strategy = diffraction_plan.get("experimentType") + experiment_strategy = experiment_strategy or datamodel.experiment_type + + result = MXExperiment( + experiment_strategy=experiment_strategy, + sample=sample, + logistical_sample=logistical_sample, + ) + else: raise ValueError("Unsupported queue_model_object: %s" % self) + # return result + def add_sweep(mxexperiment: mxmodel.MXExperiment, acquisition: qmo.Acquisition): """Add CollectionSweep record to MXExperiment""" pass -def export_mxexperiment(mxexperiment: mxmodel.MXExperiment, - datamodel: qmo.TaskNode): + +def export_mxexperiment(mxexperiment: mxmodel.MXExperiment, datamodel: qmo.TaskNode): """Export MXExperiment mxlims record to JSON file""" - pass \ No newline at end of file + pass From 11fa3327fc8fda901072719cdc6a7e5b5036ef3a Mon Sep 17 00:00:00 2001 From: rhfogh Date: Mon, 11 Nov 2024 17:34:37 +0000 Subject: [PATCH 3/7] Bug fixes and WIP improving MXLIMS handling --- mxcubecore/HardwareObjects/Beamline.py | 2 +- .../HardwareObjects/Gphl/GphlQueueEntry.py | 11 +- .../HardwareObjects/Gphl/GphlWorkflow.py | 105 +++++---- mxcubecore/model/queue_model_objects.py | 71 ++++++ mxcubecore/queue_entry/base_queue_entry.py | 30 ++- mxcubecore/queue_entry/data_collection.py | 52 +++- mxcubecore/utils/mxlims.py | 222 +++++++++++++----- 7 files changed, 367 insertions(+), 126 deletions(-) diff --git a/mxcubecore/HardwareObjects/Beamline.py b/mxcubecore/HardwareObjects/Beamline.py index 686a1f3f09..d4de0e04c2 100644 --- a/mxcubecore/HardwareObjects/Beamline.py +++ b/mxcubecore/HardwareObjects/Beamline.py @@ -882,7 +882,7 @@ def get_default_acquisition_parameters(self, acquisition_type="default"): acq_parameters.detector_binning_mode = "" try: - acq_parameters.detector_roi_mode = self.detector.get_roi_mode() + acq_parameters.detector_roi_mode = self.detector.get_roi_mode_name() except Exception: logging.getLogger("HWR").warning( "get_default_acquisition_parameters: " diff --git a/mxcubecore/HardwareObjects/Gphl/GphlQueueEntry.py b/mxcubecore/HardwareObjects/Gphl/GphlQueueEntry.py index e3ef7bd880..12521ec32c 100644 --- a/mxcubecore/HardwareObjects/Gphl/GphlQueueEntry.py +++ b/mxcubecore/HardwareObjects/Gphl/GphlQueueEntry.py @@ -23,10 +23,14 @@ import logging +from datetime import datetime from mxcubecore import HardwareRepository as HWR from mxcubecore.queue_entry.base_queue_entry import BaseQueueEntry +from mxlims.pydantic import crystallography as mxmodel +from mxcubecore.utils import mxlims as mxutils + __credits__ = ["MXCuBE collaboration"] __license__ = "LGPLv3+" __category__ = "queue" @@ -69,4 +73,9 @@ def 
init_mxlims(self):
 
         mxexperiment: mxmodel.MXExperiment = self.get_mxlims_record()
         if mxexperiment is None:
-            self._mxlims_record = mxutils.create_mxexperiment(self.get_data_model())
+            data_model = self.get_data_model()
+            self._mxlims_record = mxutils.create_mxexperiment(
+                data_model,
+                start_time = datetime.now(),
+                measured_flux = HWR.beamline.flux.get_value()
+            )
diff --git a/mxcubecore/HardwareObjects/Gphl/GphlWorkflow.py b/mxcubecore/HardwareObjects/Gphl/GphlWorkflow.py
index 67d826873b..17982acf38 100644
--- a/mxcubecore/HardwareObjects/Gphl/GphlWorkflow.py
+++ b/mxcubecore/HardwareObjects/Gphl/GphlWorkflow.py
@@ -37,7 +37,6 @@
 import socket
 import subprocess
 import time
-import uuid
 from collections import OrderedDict
 
 import f90nml
@@ -58,8 +57,6 @@
     QueueAbortedException,
 )
 
-from mxlims import crystallography as mxmodel
-
 
 @enum.unique
 class GphlWorkflowStates(enum.Enum):
@@ -79,7 +76,6 @@ class GphlWorkflowStates(enum.Enum):
     COMPLETED = 4
     UNKNOWN = 5
 
-
 __copyright__ = """ Copyright © 2016 - 2019 by Global Phasing Ltd. """
 __license__ = "LGPLv3+"
 __author__ = "Rasmus H Fogh"
@@ -240,6 +236,9 @@ def __init__(self, name):
 
         self.recentring_file = None
 
+        # Scan number for MXLIMS Scan ordering
+        self.next_scan_number = 0
+
         # # TEST mxcubeweb UI
         # self.gevent_event = gevent.event.Event()
         # self.params_dict = {}
@@ -262,6 +261,7 @@ def init(self):
             "WorkflowAborted": self.workflow_aborted,
             "WorkflowCompleted": self.workflow_completed,
             "WorkflowFailed": self.workflow_failed,
+            "StartEnactment": self.start_enactment,
         }
 
         # Set standard configurable file paths
@@ -731,7 +731,6 @@ def query_pre_strategy_params(self, choose_lattice=None):
         dispatcher.connect(
             self.receive_pre_strategy_data,
             self.PARAMETER_RETURN_SIGNAL,
-            dispatcher.Any,
         )
         responses = dispatcher.send(
             self.PARAMETERS_NEEDED,
@@ -754,7 +753,6 @@ def query_pre_strategy_params(self, choose_lattice=None):
         dispatcher.disconnect(
             self.receive_pre_strategy_data,
             self.PARAMETER_RETURN_SIGNAL,
-            dispatcher.Any,
         )
         self._return_parameters = None
@@ -853,10 +851,30 @@ def pre_execute(self, queue_entry):
 
         self._workflow_queue = gevent.queue.Queue()
 
-    def start_enactment(self, enactment_id:str):
-        """Set enactment_id and initialise MXLIMS MXExperiment"""
-        self._queue_entry.get_data_model().enactment_id = enactment_id
-        self._queue_entry.start_enactment()
+    def start_enactment(self, enactment_id:str, correlation_id:str):
+        """Set enactment_id and initialise MXLIMS MXExperiment"""
+        data_model = self._queue_entry.get_data_model()
+        tracking_data = data_model.tracking_data
+        workflow_parameters = data_model.workflow_parameters
+        tracking_data.uuid = enactment_id
+        tracking_data.workflow_uid = (
+            workflow_parameters.get("workflow_uid") or enactment_id
+        )
+        # NB if it is not set it will be overwritten later
+        tracking_data.workflow_name = workflow_parameters.get("workflow_name")
+        tracking_data.workflow_type = (
+            workflow_parameters.get("workflow_type")
+            or data_model.strategy_type
+        )
+        tracking_data.location_id = workflow_parameters.get("workflow_position_id")
+        # NB first orientation only:
+        tracking_data.orientation_id = workflow_parameters.get(
+            "workflow_kappa_settings_id"
+        )
+        tracking_data.characterisation_id = workflow_parameters.get(
+            "characterisation_id"
+        )
+        self._queue_entry.init_mxlims()
 
     def execute(self):
 
@@ -908,8 +926,6 @@ def execute(self):
                 elif message_type == "String":
                     if not self.settings.get("suppress_external_log_output"):
                         func(payload, correlation_id)
-                elif message_type == "StartEnactment":
-                    self.start_enactment(payload)
                 else:
                     logging.getLogger("HWR").info(
                         "GΦL queue processing 
%s", message_type @@ -1270,10 +1286,9 @@ def query_collection_strategy(self, geometric_strategy): raise ValueError( "invalid default recentring mode '%s' " % default_recentring_mode ) - use_modes = ["sweep"] + use_modes = ["sweep", "none"] if len(orientations) > 1: use_modes.append("start") - use_modes.append("none") if is_interleaved: use_modes.append("scan") for indx in range(len(modes) - 1, -1, -1): @@ -1387,7 +1402,6 @@ def query_collection_strategy(self, geometric_strategy): dispatcher.connect( self.receive_pre_collection_data, self.PARAMETER_RETURN_SIGNAL, - dispatcher.Any, ) responses = dispatcher.send( self.PARAMETERS_NEEDED, @@ -1407,7 +1421,6 @@ def query_collection_strategy(self, geometric_strategy): dispatcher.disconnect( self.receive_pre_collection_data, self.PARAMETER_RETURN_SIGNAL, - dispatcher.Any, ) self._return_parameters = None @@ -1958,7 +1971,6 @@ def collect_data(self, payload, correlation_id): last_orientation = () maxdev = -1 snapshotted_rotation_ids = set() - scan_numbers = {} for scan in scans: sweep = scan.sweep acq = queue_model_objects.Acquisition() @@ -2052,31 +2064,45 @@ def collect_data(self, payload, correlation_id): # Handle orientations and (re) centring goniostatRotation = sweep.goniostatSweepSetting - rotation_id = orientation_id = goniostatRotation.id_ - - model_workflow_parameters = gphl_workflow_model.workflow_parameters - if not model_workflow_parameters.get("workflow_name"): - model_workflow_parameters["workflow_name"] = gphl_workflow_model.wfname - if not model_workflow_parameters.get("workflow_type"): - model_workflow_parameters["workflow_type"] = gphl_workflow_model.wftype - if not model_workflow_parameters.get("workflow_uid"): - model_workflow_parameters["workflow_uid"] = str( - HWR.beamline.gphl_connection._enactment_id - ) - if not model_workflow_parameters.get("workflow_position_id"): - # As of 20240911 all workflows use a single position, - model_workflow_parameters["workflow_position_id"] = str(uuid.uuid1()) + rotation_id = goniostatRotation.id_ + + + # handle mxlims + # handle workflow parameters + new_workflow_parameters = gphl_workflow_model.workflow_parameters.copy() + wf_tracking_data = gphl_workflow_model.tracking_data + data_collection = queue_model_objects.DataCollection([acq], crystal) + # Workflow parameters for ICAT / external workflow + # The 'if' statement is to allow this to work in multiple versions + data_collection.workflow_parameters = new_workflow_parameters + tracking_data = data_collection.tracking_data + tracking_data.uuid = scan.id_ + tracking_data.workflow_name = wf_tracking_data.experiment_strategy + tracking_data.workflow_type = wf_tracking_data.workflow_type + tracking_data.workflow_uid = wf_tracking_data.uuid + tracking_data.location_id = wf_tracking_data.location_id + tracking_data.orientation_id = rotation_id + tracking_data.sweep_id = sweep.id_ if ( gphl_workflow_model.wftype == "acquisition" and not gphl_workflow_model.characterisation_done - and not model_workflow_parameters.get("workflow_characterisation_id") ): - model_workflow_parameters["workflow_characterisation_id"] = str( - sweep.id_ - ) - model_workflow_parameters["workflow_kappa_settings_id"] = str( - orientation_id - ) + characterisation_id = sweep.id_ + tracking_data.characterisation_id = characterisation_id# + wf_tracking_data.characterisation_id = characterisation_id + tracking_data.role = "Characterisation" + else: + tracking_data.characterisation_id = wf_tracking_data.characterisation_id# + tracking_data.role = "Result" + 
tracking_data.scan_number = self.next_scan_number + self.next_scan_number += 1 + + new_workflow_parameters["workflow_name"] = tracking_data.workflow_name + new_workflow_parameters["workflow_type"] = tracking_data.workflow_type + new_workflow_parameters["workflow_uid"] = tracking_data.workflow_uid + new_workflow_parameters["workflow_position_id"] = tracking_data.location_id + new_workflow_parameters["characterisation_id"] = tracking_data.characterisation_id + new_workflow_parameters["workflow_kappa_settings_id"] = tracking_data.orientation_id initial_settings = sweep.get_initial_settings() orientation = ( @@ -2138,11 +2164,6 @@ def collect_data(self, payload, correlation_id): acq_parameters.num_images_per_trigger * acq_parameters.osc_range - sweep_offset ) - data_collection = queue_model_objects.DataCollection([acq], crystal) - # Workflow parameters for ICAT / external workflow - # The 'if' statement is to allow this to work in multiple versions - if hasattr(data_collection, "workflow_parameters"): - data_collection.workflow_parameters.update(model_workflow_parameters) data_collections.append(data_collection) data_collection.set_enabled(True) data_collection.ispyb_group_data_collections = True diff --git a/mxcubecore/model/queue_model_objects.py b/mxcubecore/model/queue_model_objects.py index 5397c591ba..38b8cf082c 100644 --- a/mxcubecore/model/queue_model_objects.py +++ b/mxcubecore/model/queue_model_objects.py @@ -26,6 +26,9 @@ import copy import logging import os +from typing import Optional + +from pydantic import Field, BaseModel from mxcubecore.model import queue_model_enumerables @@ -51,6 +54,66 @@ __license__ = "LGPLv3+" +class TrackingData(BaseModel): + """Data to connect different tasks into workflows, LIMS input, MXLIMS, etc. + + NB Should be harmonised and merged with workflow_parameters""" + + uuid: Optional[str] = Field( + default=None, + description="Unique identifier string for this queue_model_object", + ) + workflow_name: Optional[str] = Field( + default=None, + description="Name of workflow that this queue_model_object belongs to", + ) + workflow_type: Optional[str] = Field( + default=None, + description="Type of workflow that this queue_model_object belongs to", + ) + workflow_uid: Optional[str] = Field( + default=None, + description="Unique identifier string for the workflow this queue_model_object belongs to", + ) + location_id: Optional[str] = Field( + default=None, + description="Unique identifier string for the location / LogisticalSample " + "of this queue_model_object", + ) + orientation_id: Optional[str] = Field( + default=None, + description="Unique identifier string for the orientation (kappa/phi/chi settings) " + "for this queue_model_object", + ) + characterisation_id: Optional[str] = Field( + default=None, + description="Unique identifier string for characterisation data acquisition " + "that is relevant for this queue_model_object", + ) + sweep_id: Optional[str] = Field( + default=None, + description="Unique identifier string for the sweep that this queue_model_object " + "is part of. Used to combine multiple Acquisitions as scans of a single sweep." + ) + scan_number: Optional[int] = Field( + default=None, + description="Ordinal number (starting at 0), for this queue_model_object " + "in the experiment. 
Defines the time ordering of acquisitions and scans.",
+    )
+    role: Optional[str] = Field(
+        default=None,
+        description="Role of this Task result within the experiment.",
+        json_schema_extra={
+            "examples": [
+                "Result",
+                "Intermediate",
+                "Characterisation",
+                "Centring",
+            ],
+        },
+    )
+
+
 class TaskNode(object):
     """
     Objects that inherit TaskNode can be added to and handled by
@@ -70,6 +133,8 @@ def __init__(self, task_data=None):
         self._requires_centring = True
         self._origin = None
         self._task_data = task_data
+        # tracking data for connecting jobs into workflows, mxlims output, etc.
+        self.tracking_data: TrackingData = TrackingData()
 
     @property
     def task_data(self):
@@ -2339,6 +2404,7 @@ def init_from_task_data(self, sample_model, params):
             raise ValueError(
                 "No GΦL workflow strategy named %s found" % params["strategy_name"]
             )
+        self.tracking_data.workflow_type = self.strategy_type
         self.shape = params.get("shape", "")
 
         for tag in (
@@ -2493,6 +2559,11 @@ def strategy_name(self):
         """ "Strategy full name, e.g. "Two-wavelength MAD" """
         return self.strategy_settings["title"]
 
+    @property
+    def strategy_short_name(self):
+        """Strategy short name, e.g. "2wvlMAD" """
+        return self.strategy_settings["short_name"]
+
     # Run name equal to base_prefix
     def get_name(self):
         # Required to conform to TaskNode
diff --git a/mxcubecore/queue_entry/base_queue_entry.py b/mxcubecore/queue_entry/base_queue_entry.py
index a40dcdfee4..156ed5d4dc 100644
--- a/mxcubecore/queue_entry/base_queue_entry.py
+++ b/mxcubecore/queue_entry/base_queue_entry.py
@@ -29,6 +29,7 @@
 import traceback
 from collections import namedtuple
 from typing import Optional
+from datetime import datetime
 
 import gevent
 
@@ -41,14 +42,15 @@
     EXPERIMENT_TYPE,
 )
 
 from mxlims.pydantic import crystallography as mxmodel
-from utils import mxlims as mxutils
+from mxcubecore.utils import mxlims as mxutils
+
+
 
 __credits__ = ["MXCuBE collaboration"]
 __license__ = "LGPLv3+"
 __category__ = "General"
 
-from numpy.lib.shape_base import apply_along_axis
 status_list = ["SUCCESS", "WARNING", "FAILED", "SKIPPED", "RUNNING", "NOT_EXECUTED"]
 QueueEntryStatusType = namedtuple("QueueEntryStatusType", status_list)
@@ -137,11 +139,11 @@ def swap(self, queue_entry_a, queue_entry_b):
         Throws a ValueError if one of the entries does not exist in the queue. 
- :param queue_entry: Queue entry to swap - :type queue_entry: QueueEntry + :param queue_entry_a: Queue entry to swap + :type queue_entry_a: QueueEntry - :param queue_entry: Queue entry to swap - :type queue_entry: QueueEntry + :param queue_entry_b: Queue entry to swap + :type queue_entry_b: QueueEntry """ index_a = None index_b = None @@ -181,7 +183,6 @@ def set_queue_controller(self, queue_controller): def get_queue_controller(self): """ :returns: The queue controller - :type queue_controller: QueueController """ return self._queue_controller @@ -298,13 +299,13 @@ def get_mxlims_record(self) -> mxmodel.MXExperiment: """Get MXExperiment MXLIMS record if the entry is currently running""" obj = self result = None - while obj is not None: + container = obj.get_container() + while result is None and container is not None: result = obj._mxlims_record - if result is None: - obj = obj.get_container() + obj = container + container = obj.get_container() return result - def execute(self): """ Execute method, should be overriden my subclasses, defines @@ -345,7 +346,10 @@ def post_execute(self): mxlims_record = self._mxlims_record if mxlims_record is not None: self._mxlims_record = None - mxutils.export_mxexperiment(mxlims_record, self.get_data_model()) + mxlims_record.end_time = datetime.now() + mxutils.export_mxexperiment( + mxlims_record, None, + ) # self._set_background_color() diff --git a/mxcubecore/queue_entry/data_collection.py b/mxcubecore/queue_entry/data_collection.py index db9e550e8f..e3c71d71d7 100644 --- a/mxcubecore/queue_entry/data_collection.py +++ b/mxcubecore/queue_entry/data_collection.py @@ -17,8 +17,10 @@ # along with MXCuBE. If not, see . import logging +from datetime import datetime import gevent +import uuid from mxcubecore import HardwareRepository as HWR from mxcubecore.dispatcher import dispatcher @@ -36,7 +38,7 @@ ) from mxlims.pydantic import crystallography as mxmodel -from utils import mxlims as mxutils +from mxcubecore.utils import mxlims as mxutils __credits__ = ["MXCuBE collaboration"] __license__ = "LGPLv3+" @@ -135,16 +137,58 @@ def pre_execute(self): mxexperiment: mxmodel.MXExperiment = self.get_mxlims_record() if mxexperiment is None: - mxexperiment = mxutils.create_mxexperiment(data_model) + tracking_data = data_model.tracking_data + workflow_parameters = data_model.workflow_parameters + tracking_data.workflow_uid = workflow_parameters.get("workflow_uid") + tracking_data.uuid = tracking_data.workflow_uid or uuid.uuid1() + tracking_data.workflow_name = workflow_parameters.get("workflow_name") + tracking_data.workflow_type = ( + workflow_parameters.get("workflow_type") or data_model.experiment_type + ) + tracking_data.location_id = workflow_parameters.get("workflow_position_id") + # NB first orientation only: + tracking_data.orientation_id = workflow_parameters.get( + "workflow_kappa_settings_id" + ) + tracking_data.characterisation_id = workflow_parameters.get( + "characterisation_id" + ) + mxexperiment = mxutils.create_mxexperiment( + data_model, + start_time=datetime.now(), + measured_flux=HWR.beamline.flux.get_value(), + ) self._mxlims_record = mxexperiment - mxutils.add_sweep(mxexperiment, data_model) - if data_model.get_parent(): gid = data_model.get_parent().lims_group_id data_model.lims_group_id = gid def post_execute(self): + # Done in post_execute and *before* calling BaseQueueEntry + # so that beamline values are set and can be read off + # NBNB TODO look at pre-existing sweep UUIDs + detector = HWR.beamline.detector + # NB Detector distance is taken 
here rather than from parameters as a more + # reliable source and in preference to the definition-dependent resolution + beam_position = detector.get_beam_position() + if None in beam_position: + beam_position = None + beam = HWR.beamline.beam + scan_position_end = HWR.beamline.diffractometer.omega.get_value() + data_model = self.get_data_model() + # This would be a good place to check that scan_pos_end matches input parameters + # There have been tricky bugs found where this was not the case + mxutils.add_sweep( + self.get_mxlims_record(), + data_model, + beam_position=beam_position, + beam_size=beam.get_beam_size(), + beam_shape=beam.get_beam_shape().value, + detector_distance=detector.distance.get_value(), + scan_position_end=scan_position_end, + ) + BaseQueueEntry.post_execute(self) qc = self.get_queue_controller() diff --git a/mxcubecore/utils/mxlims.py b/mxcubecore/utils/mxlims.py index 11f5b0d94a..f2a770a28b 100644 --- a/mxcubecore/utils/mxlims.py +++ b/mxcubecore/utils/mxlims.py @@ -25,26 +25,68 @@ __author__ = "rhfogh" __date__ = "05/11/2024" +import os +import json + +from typing import Optional from mxlims.pydantic import crystallography as mxmodel from mxcubecore.model import queue_model_objects as qmo -from mxlims.pydantic.crystallography import MXExperiment +from mxlims.pydantic import core -def create_mxexperiment(datamodel: qmo.TaskNode) -> mxmodel.MXExperiment: - """Create MXExperiment mxlims record from datamodel""" +def create_mxexperiment( + datamodel: qmo.TaskNode, **parameters +) -> mxmodel.MXExperiment: + """Create MXExperiment mxlims record from datamodel - # Add MXSample and LogisticalSample + Args: + datamodel: QueueModelObject representing experiment + uuid: String containing globally unique identifier + **parameters: dict of parameters overriding/supplementing datamodel + + Returns: + + """ sample = datamodel.get_sample_node() + tracking_data = datamodel.tracking_data crystal = sample.crystals[0] if sample.crystals else None diffraction_plan = sample.diffraction_plan + initpars = {"uuid": tracking_data.uuid} + workflow_name = tracking_data.workflow_name + if not workflow_name: + if diffraction_plan: + if hasattr(diffraction_plan, "experimentType"): + workflow_name = diffraction_plan.experimentType + else: + workflow_name = diffraction_plan.get("experimentType") + workflow_name = workflow_name or datamodel.experiment_type - # LogisticalSample, not really modeled yet, so not much to put in - crystal_uuid = crystal.uuid if crystal else None - if crystal_uuid: - logistical_sample = mxmodel.LogisticalSample(uuid=crystal_uuid) - else: - logistical_sample = mxmodel.LogisticalSample() - result.logistical_sample = logistical_sample + if diffraction_plan: + # It is not clear if diffraction_plan is a dict or an object, + # and if so which kind + + if hasattr(diffraction_plan, "aimedResolution"): + resolution = diffraction_plan.aimedResolution + else: + resolution = diffraction_plan.get("aimedResolution") + if resolution: + initpars["expected_resolution"] = resolution + + if hasattr(diffraction_plan, "requiredCompleteness"): + completeness = diffraction_plan.requiredCompleteness + else: + completeness = diffraction_plan.get("requiredCompleteness") + if completeness: + initpars["target_completeness"] = completeness + + if hasattr(diffraction_plan, "requiredMultiplicity"): + multiplicity = diffraction_plan.requiredMultiplicity + else: + multiplicity = diffraction_plan.get("requiredMultiplicity") + if multiplicity: + initpars["target_multiplicity"] = multiplicity + + # Add 
MXSample and LogisticalSample # MXSample samplepars = {} @@ -77,68 +119,118 @@ def create_mxexperiment(datamodel: qmo.TaskNode) -> mxmodel.MXExperiment: if radiation_sensitivity: samplepars["radiation_sensitivity"] = radiation_sensitivity - if hasattr(diffraction_plan, "aimedResolution"): - resolution = diffraction_plan.aimedResolution - else: - resolution = diffraction_plan.get("aimedResolution") - if resolution: - samplepars["expected_resolution"] = resolution - - if hasattr(diffraction_plan, "requiredCompleteness"): - completeness = diffraction_plan.requiredCompleteness - else: - completeness = diffraction_plan.get("requiredCompleteness") - if completeness: - samplepars["target_completeness"] = completeness - - if hasattr(diffraction_plan, "requiredMultiplicity"): - multiplicity = diffraction_plan.requiredMultiplicity - else: - multiplicity = diffraction_plan.get("requiredMultiplicity") - if multiplicity: - samplepars["target_multiplicity"] = multiplicity sample = mxmodel.MXSample(**samplepars) + initpars["sample"] = sample - # Create MXExperiment - if isinstance(datamodel, qmo.GphlWorkflow): - # Initialise MXExperiment from GPhL workflow - prefix = "GPhL." - settings = datamodel.strategy_settings - short_name = settings.get("short_name", settings.get("strategy_type")) - result = MXExperiment( - uuid=datamodel.enactment_id, - experiment_strategy=prefix + short_name, - sample=sample, - logistical_sample=logistical_sample, - ) - - elif isinstance(datamodel, qmo.DataCollection): - # Initialise MXExperiment from single Acquisition - if diffraction_plan: - if hasattr(diffraction_plan, "experimentType"): - experiment_strategy = diffraction_plan.experimentType - else: - experiment_strategy = diffraction_plan.get("experimentType") - experiment_strategy = experiment_strategy or datamodel.experiment_type - - result = MXExperiment( - experiment_strategy=experiment_strategy, - sample=sample, - logistical_sample=logistical_sample, - ) - + # LogisticalSample, not really modeled yet, so not much to put in + crystal_uuid = crystal.crystal_uuid if crystal else None + if crystal_uuid: + logistical_sample = core.LogisticalSample(uuid=crystal_uuid) else: - raise ValueError("Unsupported queue_model_object: %s" % self) + logistical_sample = core.LogisticalSample() + logistical_sample.sample_ref = core.LogisticalSampleRef(target_uuid=sample.uuid) + initpars["logistical_sample"] = logistical_sample + sample.logistical_sample_refs.append( + core.LogisticalSampleRef(target_uuid=logistical_sample.uuid) + ) + initpars["logistical_sample_ref"] = core.LogisticalSampleRef( + target_uuid=logistical_sample.uuid + ) - # + initpars.update(parameters) + result = mxmodel.MXExperiment(**initpars) return result -def add_sweep(mxexperiment: mxmodel.MXExperiment, acquisition: qmo.Acquisition): +def add_sweep( + mxexperiment: mxmodel.MXExperiment, + sweep: qmo.DataCollection, + **parameters: dict, +) -> None: + """ + + Args: + mxexperiment: container MXExperiment + sweep: DataCollection queue_model_object to add + uuid: String containing globally unique identifier + **parameters: dict of parameters overriding/supplementing datamodel + + Returns: + + """ """Add CollectionSweep record to MXExperiment""" - pass + # ALwsy true in MXCuBE + SCAN_AXIS = "omega" + + acquisition = sweep.acquisitions[0] + path_template = acquisition.path_template + acqparams = acquisition.acquisition_parameters + + sweep_params = { + "source_ref": mxmodel.MXExperimentRef(target_uuid=mxexperiment.uuid), + "scan_axis": SCAN_AXIS, + "exposure_time": 
acqparams.exp_time,
+        "image_width": acqparams.osc_range,
+        "energy": acqparams.energy,
+        "transmission": acqparams.transmission,
+        "resolution": acqparams.resolution,
+        "detector_binning_mode": acqparams.detector_binning_mode,
+        "detector_roi_mode": acqparams.detector_roi_mode,
+        "overlap": acqparams.overlap,
+        "number_triggers": acqparams.num_triggers,
+        "number_images_per_trigger": acqparams.num_images_per_trigger,
+        "prefix": path_template.get_prefix(),
+        "file_type": path_template.suffix,
+        "filename_template": path_template.get_image_file_name(),
+        "path": path_template.directory,
+    }
+
+    sweep_params["axis_positions_start"] = startpos = dict(
+        tpl
+        for tpl in acqparams.centred_position.as_dict().items()
+        if tpl[1] is not None
+    )
+    startpos[SCAN_AXIS] = acqparams.osc_start
+    startpos["detector_distance"] = acqparams.detector_distance
+
+    detector_distance = parameters.pop("detector_distance", None)
+    if detector_distance is not None:
+        startpos["detector_distance"] = detector_distance
+    scan = mxmodel.Scan(
+        scan_position_start=startpos[SCAN_AXIS],
+        first_image_number=acqparams.first_image,
+        number_images=acqparams.num_images,
+        ordinal=1,
+    )
+    sweep_params["scans"] = [scan]
+    scan_pos_end = parameters.pop("scan_position_end", None)
+    sweep_params["axis_positions_end"] = {SCAN_AXIS: scan_pos_end}
+
+    # NBNB interleaving, split sweeps, split characterisation
+    # NBNB cxheck final omega value against start
+    # NBNB how do we get the detector type?
+    # NBNB do we use MXCuBE axis names or standardised names?
+    # detector_type, ,, ,
+    # , axis_positions_end,
+    # NBNB change from QMO to dict input
+
+    sweep_params.update(parameters)
+    mxexperiment.results.append(mxmodel.CollectionSweep(**sweep_params))
 
 
-def export_mxexperiment(mxexperiment: mxmodel.MXExperiment, datamodel: qmo.TaskNode):
+def export_mxexperiment(
+    mxexperiment: mxmodel.MXExperiment, path_template: Optional[qmo.PathTemplate]=None
+):
     """Export MXExperiment mxlims record to JSON file"""
-    pass
+    if path_template is None:
+        path = mxexperiment.results[-1].path
+        file_name = "MXExperiment.json"
+    else:
+        template = "MXExperiment_%s_%s.json"
+        file_name = template % (path_template.get_prefix(), path_template.run_number)
+        path = path_template.directory
+    path = os.path.join(path, file_name)
+    print("@~@~ WRITING TO", path)
+    with open(path, "w") as fp:
+        json.dump(mxexperiment.model_dump(), fp)

From a24472c934d88d56b4f5d9d059b3a5223d5033ff Mon Sep 17 00:00:00 2001
From: rhfogh
Date: Mon, 11 Nov 2024 20:32:07 +0000
Subject: [PATCH 4/7] Model bug fixes -> 0.2.4. Should now support phasing experiments (?) 
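
Note on the diffraction_plan handling in create_mxexperiment: the
object-or-dict probing (hasattr(...) followed by .get(...)) is repeated
four times. A sketch of a helper that could collapse those branches; the
name _plan_attr is illustrative only, nothing in this series defines it:

    from typing import Any

    def _plan_attr(diffraction_plan: Any, name: str) -> Any:
        # diffraction_plan may be an object with attributes or a plain
        # dict; return None when the value is missing either way
        if hasattr(diffraction_plan, name):
            return getattr(diffraction_plan, name)
        if isinstance(diffraction_plan, dict):
            return diffraction_plan.get(name)
        return None

Usage would then be, for example:

    resolution = _plan_attr(diffraction_plan, "aimedResolution")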
--- .../HardwareObjects/Gphl/GphlWorkflow.py | 8 +- .../configuration/mockup/gphl/gphl-setup.yml | 4 +- mxcubecore/queue_entry/data_collection.py | 2 +- mxcubecore/utils/mxlims.py | 118 +++++++++++------- 4 files changed, 80 insertions(+), 52 deletions(-) diff --git a/mxcubecore/HardwareObjects/Gphl/GphlWorkflow.py b/mxcubecore/HardwareObjects/Gphl/GphlWorkflow.py index 17982acf38..c35f90db5c 100644 --- a/mxcubecore/HardwareObjects/Gphl/GphlWorkflow.py +++ b/mxcubecore/HardwareObjects/Gphl/GphlWorkflow.py @@ -38,6 +38,7 @@ import subprocess import time from collections import OrderedDict +from uuid import uuid1 import f90nml import gevent @@ -866,7 +867,10 @@ def start_enactment(self, enactment_id:str, correlation_id:str): workflow_parameters.get("workflow_type") or data_model.strategy_type ) - tracking_data.location_id = workflow_parameters.get("workflow_position_id") + tracking_data.location_id = ( + workflow_parameters.get("workflow_position_id") + or uuid1().hex + ) # NB first orientation only: tracking_data.orientation_id = workflow_parameters.get( "workflow_kappa_settings_id" @@ -2077,7 +2081,7 @@ def collect_data(self, payload, correlation_id): data_collection.workflow_parameters = new_workflow_parameters tracking_data = data_collection.tracking_data tracking_data.uuid = scan.id_ - tracking_data.workflow_name = wf_tracking_data.experiment_strategy + tracking_data.workflow_name = wf_tracking_data.workflow_name tracking_data.workflow_type = wf_tracking_data.workflow_type tracking_data.workflow_uid = wf_tracking_data.uuid tracking_data.location_id = wf_tracking_data.location_id diff --git a/mxcubecore/configuration/mockup/gphl/gphl-setup.yml b/mxcubecore/configuration/mockup/gphl/gphl-setup.yml index 967bacfc29..5d2c0bc170 100644 --- a/mxcubecore/configuration/mockup/gphl/gphl-setup.yml +++ b/mxcubecore/configuration/mockup/gphl/gphl-setup.yml @@ -89,6 +89,6 @@ software_properties: # OPTIONAL. simcal *binary* For Mock collection emulation only. 
Not used by workflow co.gphl.wf.simcal.bin: - /alt/rhfogh/Software/GPhL/nightly_20240611/Files_workflow_TRUNK_alpha-bdg/autoPROC/bin/linux64/simcal + /alt/rhfogh/Software/GPhL/nightly_20241108/Files_workflow_TRUNK_alpha-bdg/autoPROC/bin/linux64/simcal co.gphl.wf.simcal.bdg_licence_dir: - /alt/rhfogh/Software/GPhL/nightly_20240611/Files_workflow_TRUNK_alpha-bdg + /alt/rhfogh/Software/GPhL/nightly_20241108/Files_workflow_TRUNK_alpha-bdg diff --git a/mxcubecore/queue_entry/data_collection.py b/mxcubecore/queue_entry/data_collection.py index e3c71d71d7..6d06b20421 100644 --- a/mxcubecore/queue_entry/data_collection.py +++ b/mxcubecore/queue_entry/data_collection.py @@ -179,7 +179,7 @@ def post_execute(self): data_model = self.get_data_model() # This would be a good place to check that scan_pos_end matches input parameters # There have been tricky bugs found where this was not the case - mxutils.add_sweep( + mxutils.add_data_collection( self.get_mxlims_record(), data_model, beam_position=beam_position, diff --git a/mxcubecore/utils/mxlims.py b/mxcubecore/utils/mxlims.py index f2a770a28b..76034deb0b 100644 --- a/mxcubecore/utils/mxlims.py +++ b/mxcubecore/utils/mxlims.py @@ -34,9 +34,7 @@ from mxlims.pydantic import core -def create_mxexperiment( - datamodel: qmo.TaskNode, **parameters -) -> mxmodel.MXExperiment: +def create_mxexperiment(datamodel: qmo.TaskNode, **parameters) -> mxmodel.MXExperiment: """Create MXExperiment mxlims record from datamodel Args: @@ -59,7 +57,12 @@ def create_mxexperiment( workflow_name = diffraction_plan.experimentType else: workflow_name = diffraction_plan.get("experimentType") - workflow_name = workflow_name or datamodel.experiment_type + if not workflow_name: + try: + workflow_name = datamodel.experiment_type + except AttributeError: + workflow_name = None + initpars["experiment_strategy"] = workflow_name if diffraction_plan: # It is not clear if diffraction_plan is a dict or an object, @@ -142,17 +145,16 @@ def create_mxexperiment( return result -def add_sweep( +def add_data_collection( mxexperiment: mxmodel.MXExperiment, - sweep: qmo.DataCollection, + data_collection: qmo.DataCollection, **parameters: dict, ) -> None: """ Args: mxexperiment: container MXExperiment - sweep: DataCollection queue_model_object to add - uuid: String containing globally unique identifier + data_collection: DataCollection queue_model_object to add **parameters: dict of parameters overriding/supplementing datamodel Returns: @@ -163,64 +165,86 @@ def add_sweep( # ALwsy true in MXCuBE SCAN_AXIS = "omega" - acquisition = sweep.acquisitions[0] + acquisition = data_collection.acquisitions[0] path_template = acquisition.path_template acqparams = acquisition.acquisition_parameters - - sweep_params = { - "source_ref": mxmodel.MXExperimentRef(target_uuid=mxexperiment.uuid), - "scan_axis": SCAN_AXIS, - "exposure_time": acqparams.exp_time, - "image_width": acqparams.osc_range, - "energy": acqparams.energy, - "transmission": acqparams.transmission, - "resolution": acqparams.resolution, - "detector_binning_mode": acqparams.detector_binning_mode, - "detector_roi_mode": acqparams.detector_roi_mode, - "overlap": acqparams.overlap, - "number_triggers": acqparams.num_triggers, - "number_images_per_trigger": acqparams.num_images_per_trigger, - "prefix": path_template.get_prefix(), - "file_type": path_template.suffix, - "filename_template": path_template.get_image_file_name(), - "path": path_template.directory, - } - - sweep_params["axis_positions_start"] = startpos = dict( + tracking_data = 
data_collection.tracking_data
+    startpos = dict(
         tpl
         for tpl in acqparams.centred_position.as_dict().items()
         if tpl[1] is not None
     )
-    startpos[SCAN_AXIS] = acqparams.osc_start
+    axis_pos_start = acqparams.osc_start
+    axis_pos_end = axis_pos_start + acqparams.num_images * acqparams.osc_range
+    startpos[SCAN_AXIS] = axis_pos_start
     startpos["detector_distance"] = acqparams.detector_distance
 
     detector_distance = parameters.pop("detector_distance", None)
     if detector_distance is not None:
         startpos["detector_distance"] = detector_distance
     scan = mxmodel.Scan(
-        scan_position_start=startpos[SCAN_AXIS],
+        scan_position_start=axis_pos_start,
         first_image_number=acqparams.first_image,
         number_images=acqparams.num_images,
-        ordinal=1,
+        ordinal=tracking_data.scan_number or 0,
     )
-    sweep_params["scans"] = [scan]
-    scan_pos_end = parameters.pop("scan_position_end", None)
-    sweep_params["axis_positions_end"] = {SCAN_AXIS: scan_pos_end}
+
+    sweep_id = tracking_data.sweep_id
+    sweep = None
+    for dataset in mxexperiment.results:
+        if dataset.uuid == sweep_id:
+            sweep = dataset
+            break
+    if sweep:
+        # This is a scan for an existing sweep. Add and update
+        sweep.scans.append(scan)
+        sweep.axis_positions_start[SCAN_AXIS] = min(
+            sweep.axis_positions_start[SCAN_AXIS], axis_pos_start
+        )
+        sweep.axis_positions_end[SCAN_AXIS] = max(
+            sweep.axis_positions_end[SCAN_AXIS], axis_pos_end
+        )
+
+    else:
+        sweep_params = {
+            "source_ref": mxmodel.MXExperimentRef(
+                target_uuid=tracking_data.workflow_uid
+            ),
+            "role": tracking_data.role,
+            "logistical_sample_ref": core.LogisticalSampleRef(
+                target_uuid=tracking_data.location_id
+            ),
+            "scan_axis": SCAN_AXIS,
+            "exposure_time": acqparams.exp_time,
+            "image_width": acqparams.osc_range,
+            "energy": acqparams.energy,
+            "transmission": acqparams.transmission,
+            "resolution": acqparams.resolution,
+            "detector_binning_mode": acqparams.detector_binning_mode,
+            "detector_roi_mode": acqparams.detector_roi_mode,
+            "overlap": acqparams.overlap,
+            "number_triggers": acqparams.num_triggers,
+            "number_images_per_trigger": acqparams.num_images_per_trigger,
+            "prefix": path_template.get_prefix(),
+            "file_type": path_template.suffix,
+            "filename_template": path_template.get_image_file_name(),
+            "path": path_template.directory,
+            "axis_positions_start": startpos,
+            "scans": [scan],
+        }
+
+        scan_pos_end = parameters.pop("scan_position_end", None)
+        sweep_params["axis_positions_end"] = {SCAN_AXIS: scan_pos_end}
+
+        # NBNB cxheck final omega value against start
+        # NBNB how do we get the detector type?
+        # NBNB do we use MXCuBE axis names or standardised names? 
-    sweep_params.update(parameters)
-    mxexperiment.results.append(mxmodel.CollectionSweep(**sweep_params))
+        sweep_params.update(parameters)
+        mxexperiment.results.append(mxmodel.CollectionSweep(**sweep_params))
 
 
 def export_mxexperiment(
-    mxexperiment: mxmodel.MXExperiment, path_template: Optional[qmo.PathTemplate]=None
+    mxexperiment: mxmodel.MXExperiment, path_template: Optional[qmo.PathTemplate] = None
 ):
     """Export MXExperiment mxlims record to JSON file"""
     if path_template is None:

From 62fc9ce977483de28b58bf3fe51a0d0b9a65a624 Mon Sep 17 00:00:00 2001
From: rhfogh
Date: Tue, 12 Nov 2024 20:13:00 +0000
Subject: [PATCH 5/7] Minor model change -> 0.2.5. Now support GPhL characterisation and interleaving experiments

---
 .../HardwareObjects/Gphl/GphlWorkflow.py      | 21 ++++++++++---------
 .../mockup/gphl/gphl-workflow.yml             |  2 +-
 mxcubecore/model/queue_model_objects.py       |  3 +++
 mxcubecore/queue_entry/data_collection.py     |  2 --
 mxcubecore/utils/mxlims.py                    |  4 ++--
 5 files changed, 17 insertions(+), 15 deletions(-)

diff --git a/mxcubecore/HardwareObjects/Gphl/GphlWorkflow.py b/mxcubecore/HardwareObjects/Gphl/GphlWorkflow.py
index c35f90db5c..ff60de4af6 100644
--- a/mxcubecore/HardwareObjects/Gphl/GphlWorkflow.py
+++ b/mxcubecore/HardwareObjects/Gphl/GphlWorkflow.py
@@ -237,9 +237,6 @@ def __init__(self, name):
 
         self.recentring_file = None
 
-        # Scan number for MXLIMS Scan ordering
-        self.next_scan_number = 0
-
         # # TEST mxcubeweb UI
         # self.gevent_event = gevent.event.Event()
         # self.params_dict = {}
@@ -869,7 +866,7 @@ def start_enactment(self, enactment_id:str, correlation_id:str):
         )
         tracking_data.location_id = (
             workflow_parameters.get("workflow_position_id")
-            or uuid1().hex
+            or str(uuid1())
         )
         # NB first orientation only:
         tracking_data.orientation_id = workflow_parameters.get(
@@ -1975,6 +1972,7 @@ def collect_data(self, payload, correlation_id):
         last_orientation = ()
         maxdev = -1
         snapshotted_rotation_ids = set()
+        characterisation_id = None
         for scan in scans:
             sweep = scan.sweep
             acq = queue_model_objects.Acquisition()
@@ -2080,26 +2078,29 @@ def collect_data(self, payload, correlation_id):
             # The 'if' statement is to allow this to work in multiple versions
             data_collection.workflow_parameters = new_workflow_parameters
             tracking_data = data_collection.tracking_data
-            tracking_data.uuid = scan.id_
+            tracking_data.uuid = str(scan.id_)
             tracking_data.workflow_name = wf_tracking_data.workflow_name
             tracking_data.workflow_type = wf_tracking_data.workflow_type
             tracking_data.workflow_uid = wf_tracking_data.uuid
             tracking_data.location_id = wf_tracking_data.location_id
             tracking_data.orientation_id = rotation_id
-            tracking_data.sweep_id = sweep.id_
             if (
                 gphl_workflow_model.wftype == "acquisition"
                 and not gphl_workflow_model.characterisation_done
             ):
-                characterisation_id = sweep.id_
-                tracking_data.characterisation_id = characterisation_id#
+                if characterisation_id is None:
+                    # NB this is a hack - forces characterisation to be a single sweep
+                    characterisation_id = str(sweep.id_)
+                tracking_data.characterisation_id = characterisation_id
                 wf_tracking_data.characterisation_id = characterisation_id
                 tracking_data.role = "Characterisation"
+                tracking_data.sweep_id = characterisation_id
             else:
                 tracking_data.characterisation_id = wf_tracking_data.characterisation_id#
                 tracking_data.role = "Result"
-            tracking_data.scan_number = self.next_scan_number
-            self.next_scan_number += 1
+                tracking_data.sweep_id = str(sweep.id_)
+            tracking_data.scan_number = gphl_workflow_model.next_scan_number
+            gphl_workflow_model.next_scan_number += 1 
new_workflow_parameters["workflow_name"] = tracking_data.workflow_name new_workflow_parameters["workflow_type"] = tracking_data.workflow_type diff --git a/mxcubecore/configuration/mockup/gphl/gphl-workflow.yml b/mxcubecore/configuration/mockup/gphl/gphl-workflow.yml index ed7c523927..0fb5541bb5 100644 --- a/mxcubecore/configuration/mockup/gphl/gphl-workflow.yml +++ b/mxcubecore/configuration/mockup/gphl/gphl-workflow.yml @@ -42,7 +42,7 @@ settings: default_beam_energy_tag: Main # NB Temporary developer option. Defaults to 1 - allow_duplicate_orientations: 0 +# allow_duplicate_orientations: 0 # Suppress log output of programs called from GPhL workflow # Default to False diff --git a/mxcubecore/model/queue_model_objects.py b/mxcubecore/model/queue_model_objects.py index 38b8cf082c..156d4d1cb5 100644 --- a/mxcubecore/model/queue_model_objects.py +++ b/mxcubecore/model/queue_model_objects.py @@ -2093,6 +2093,9 @@ def __init__(self): self.acquisition_dose = 0.0 self.strategy_length = 0.0 + # Scan number for MXLIMS Scan ordering + self.next_scan_number = 0 + # Workflow attributes - for passing to LIMS (conf Olof Svensson) self.workflow_parameters = {} diff --git a/mxcubecore/queue_entry/data_collection.py b/mxcubecore/queue_entry/data_collection.py index 6d06b20421..2c996fc54b 100644 --- a/mxcubecore/queue_entry/data_collection.py +++ b/mxcubecore/queue_entry/data_collection.py @@ -175,7 +175,6 @@ def post_execute(self): if None in beam_position: beam_position = None beam = HWR.beamline.beam - scan_position_end = HWR.beamline.diffractometer.omega.get_value() data_model = self.get_data_model() # This would be a good place to check that scan_pos_end matches input parameters # There have been tricky bugs found where this was not the case @@ -186,7 +185,6 @@ def post_execute(self): beam_size=beam.get_beam_size(), beam_shape=beam.get_beam_shape().value, detector_distance=detector.distance.get_value(), - scan_position_end=scan_position_end, ) BaseQueueEntry.post_execute(self) diff --git a/mxcubecore/utils/mxlims.py b/mxcubecore/utils/mxlims.py index 76034deb0b..228bac7f07 100644 --- a/mxcubecore/utils/mxlims.py +++ b/mxcubecore/utils/mxlims.py @@ -206,6 +206,7 @@ def add_data_collection( else: sweep_params = { + "uuid": sweep_id or tracking_data.uuid, "source_ref": mxmodel.MXExperimentRef( target_uuid=tracking_data.workflow_uid ), @@ -232,8 +233,7 @@ def add_data_collection( "scans": [scan], } - scan_pos_end = parameters.pop("scan_position_end", None) - sweep_params["axis_positions_end"] = {SCAN_AXIS: scan_pos_end} + sweep_params["axis_positions_end"] = {SCAN_AXIS: axis_pos_end} # NBNB cxheck final omega value against start # NBNB how do we get the detector type? 
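
Note on the sweep handling that PATCH 4 and PATCH 5 converge on:
add_data_collection records each MXCuBE DataCollection as a Scan, and
when tracking_data.sweep_id matches an existing CollectionSweep it
merges into it, widening the stored omega range. A schematic
stand-alone illustration of that merge rule, using plain dicts in
place of the mxlims pydantic classes (all values invented):

    sweeps = {}  # sweep_id -> accumulated CollectionSweep-like record

    def add_scan(sweep_id, omega_start, num_images, image_width, first_image, ordinal):
        omega_end = omega_start + num_images * image_width
        scan = {
            "scan_position_start": omega_start,
            "first_image_number": first_image,
            "number_images": num_images,
            "ordinal": ordinal,
        }
        sweep = sweeps.get(sweep_id)
        if sweep is None:
            # First scan of this sweep: record the full omega range
            sweeps[sweep_id] = {
                "scan_axis": "omega",
                "axis_positions_start": {"omega": omega_start},
                "axis_positions_end": {"omega": omega_end},
                "scans": [scan],
            }
        else:
            # Later scan of an existing sweep: append and widen the range
            sweep["scans"].append(scan)
            starts = sweep["axis_positions_start"]
            ends = sweep["axis_positions_end"]
            starts["omega"] = min(starts["omega"], omega_start)
            ends["omega"] = max(ends["omega"], omega_end)

    # Two interleaved wedges of one sweep:
    add_scan("sweep-1", 0.0, 10, 0.1, 1, 0)
    add_scan("sweep-1", 2.0, 10, 0.1, 11, 1)
    assert sweeps["sweep-1"]["axis_positions_end"]["omega"] == 3.0
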
From 7f75486435dbaa02ab1d90e3aacb10fdeb225e73 Mon Sep 17 00:00:00 2001 From: rhfogh Date: Tue, 12 Nov 2024 20:54:00 +0000 Subject: [PATCH 6/7] Added line indent to final MXLIMS JSON output --- mxcubecore/utils/mxlims.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mxcubecore/utils/mxlims.py b/mxcubecore/utils/mxlims.py index 228bac7f07..fbcf45d348 100644 --- a/mxcubecore/utils/mxlims.py +++ b/mxcubecore/utils/mxlims.py @@ -257,4 +257,4 @@ def export_mxexperiment( path = os.path.join(path, file_name) print("@~@~ WRITING TO", path) with open(path, "w") as fp: - json.dump(mxexperiment.model_dump(), fp) + json.dump(mxexperiment.model_dump(), fp, indent=4) From 1743afec827d89027693079f4eea08bfff026ee7 Mon Sep 17 00:00:00 2001 From: rhfogh Date: Thu, 30 Jan 2025 14:05:44 +0000 Subject: [PATCH 7/7] Added updated mxlims record export --- .../HardwareObjects/Gphl/GphlQueueEntry.py | 15 +- .../HardwareObjects/Gphl/GphlWorkflow.py | 3 +- mxcubecore/queue_entry/base_queue_entry.py | 12 +- mxcubecore/queue_entry/data_collection.py | 16 +- mxcubecore/utils/mxlims.py | 164 ++++++++++-------- 5 files changed, 114 insertions(+), 96 deletions(-) diff --git a/mxcubecore/HardwareObjects/Gphl/GphlQueueEntry.py b/mxcubecore/HardwareObjects/Gphl/GphlQueueEntry.py index 12521ec32c..0bcaa4771b 100644 --- a/mxcubecore/HardwareObjects/Gphl/GphlQueueEntry.py +++ b/mxcubecore/HardwareObjects/Gphl/GphlQueueEntry.py @@ -23,12 +23,11 @@ import logging -from datetime import datetime from mxcubecore import HardwareRepository as HWR from mxcubecore.queue_entry.base_queue_entry import BaseQueueEntry -from mxlims.pydantic import crystallography as mxmodel +from mxlims.pydantic import mxmodel from mxcubecore.utils import mxlims as mxutils __credits__ = ["MXCuBE collaboration"] @@ -58,7 +57,7 @@ def pre_execute(self): def post_execute(self): BaseQueueEntry.post_execute(self) - msg = "Finishing GΦL workflow (%s)" % (self.get_data_model().strategy_name) + msg = "Finishing GΦL workflow (%s)" % self.get_data_model().strategy_name logging.getLogger("user_level_log").info(msg) HWR.beamline.gphl_workflow.post_execute() @@ -69,13 +68,13 @@ def stop(self): self.get_view().setText(1, "Stopped") def init_mxlims(self): - """Initialise MXLIMS MXExperiment if it is not already set""" + """Initialise MXLIMS MxExperimentMessage if it is not already set""" - mxexperiment: mxmodel.MXExperiment = self.get_mxlims_record() + mxexperiment: mxmodel.MxExperimentMessage = self.get_mxlims_record() if mxexperiment is None: data_model = self.get_data_model() - self._mxlims_record = mxutils.create_mxexperiment( - data_model, - start_time = datetime.now(), + self._mxlims_record = mxutils.create_mxrecord( + sample=data_model.get_sample_node(), + tracking_data=data_model.tracking_data, measured_flux = HWR.beamline.flux.get_value() ) diff --git a/mxcubecore/HardwareObjects/Gphl/GphlWorkflow.py b/mxcubecore/HardwareObjects/Gphl/GphlWorkflow.py index ff60de4af6..4c10838b3e 100644 --- a/mxcubecore/HardwareObjects/Gphl/GphlWorkflow.py +++ b/mxcubecore/HardwareObjects/Gphl/GphlWorkflow.py @@ -850,7 +850,7 @@ def pre_execute(self, queue_entry): self._workflow_queue = gevent.queue.Queue() def start_enactment(self, enactment_id:str, correlation_id:str): - """Set enactment_id and initialise MXLIMS MXExperiment""" + """Set enactment_id and initialise MXLIMS MxExperimentMessage""" data_model = self._queue_entry.get_data_model() tracking_data = data_model.tracking_data workflow_parameters = data_model.workflow_parameters @@ -1973,6 +1973,7 @@ def 
diff --git a/mxcubecore/HardwareObjects/Gphl/GphlWorkflow.py b/mxcubecore/HardwareObjects/Gphl/GphlWorkflow.py
index ff60de4af6..4c10838b3e 100644
--- a/mxcubecore/HardwareObjects/Gphl/GphlWorkflow.py
+++ b/mxcubecore/HardwareObjects/Gphl/GphlWorkflow.py
@@ -850,7 +850,7 @@ def pre_execute(self, queue_entry):
 
         self._workflow_queue = gevent.queue.Queue()
 
     def start_enactment(self, enactment_id:str, correlation_id:str):
-        """Set enactment_id and initialise MXLIMS MXExperiment"""
+        """Set enactment_id and initialise MXLIMS MxExperimentMessage"""
         data_model = self._queue_entry.get_data_model()
         tracking_data = data_model.tracking_data
         workflow_parameters = data_model.workflow_parameters
@@ -1973,6 +1973,7 @@ def collect_data(self, payload, correlation_id):
         maxdev = -1
         snapshotted_rotation_ids = set()
         characterisation_id = None
+        scan_numbers = {}
         for scan in scans:
             sweep = scan.sweep
             acq = queue_model_objects.Acquisition()
diff --git a/mxcubecore/queue_entry/base_queue_entry.py b/mxcubecore/queue_entry/base_queue_entry.py
index 156ed5d4dc..0c92a8ea02 100644
--- a/mxcubecore/queue_entry/base_queue_entry.py
+++ b/mxcubecore/queue_entry/base_queue_entry.py
@@ -41,7 +41,7 @@
     EXPERIMENT_TYPE,
 )
 
-from mxlims.pydantic import crystallography as mxmodel
+from mxlims.pydantic import mxmodel
 from mxcubecore.utils import mxlims as mxutils
 
@@ -221,7 +221,7 @@ def __init__(self, view=None, data_model=None, view_set_queue_entry=True):
         self._data_model.lims_session_id = HWR.beamline.session.session_id
 
         # MXLIMS record for currently running experiment
-        self._mxlims_record: Optional[mxmodel.MXExperiment] = None
+        self._mxlims_record: Optional[mxmodel.MxExperimentMessage] = None
 
     def is_failed(self):
         """Returns True if failed"""
@@ -295,8 +295,8 @@ def set_enabled(self, state):
         """
         self._checked_for_exec = state
 
-    def get_mxlims_record(self) -> mxmodel.MXExperiment:
-        """Get MXExperiment MXLIMS record if the entry is currently running"""
+    def get_mxlims_record(self) -> mxmodel.MxExperimentMessage:
+        """Get MxExperiment MXLIMS record if the entry is currently running"""
         obj = self
         result = None
         container = obj.get_container()
@@ -346,8 +346,8 @@ def post_execute(self):
         mxlims_record = self._mxlims_record
         if mxlims_record is not None:
             self._mxlims_record = None
-            mxlims_record.end_time = datetime.now()
-            mxutils.export_mxexperiment(
+            mxlims_record.job.end_time = datetime.now()
+            mxutils.export_mxrecord(
                 mxlims_record,
                 None,
             )
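post_execute above hands the record off exactly once: the attribute is cleared before the end time is stamped and export_mxrecord is called, so a repeated call cannot export the same record twice. A minimal sketch of that take-and-clear pattern, with a plain dict standing in for the record and a list standing in for the JSON export:

    from datetime import datetime

    class EntryStub:
        def __init__(self, record):
            self._mxlims_record = record
            self.exported = []

        def post_execute(self):
            record = self._mxlims_record
            if record is not None:
                self._mxlims_record = None  # clear before exporting
                record["end_time"] = datetime.now().isoformat()
                self.exported.append(record)  # stands in for export_mxrecord

    entry = EntryStub({"uuid": "job-1"})
    entry.post_execute()
    entry.post_execute()  # the second call finds nothing to export
    assert len(entry.exported) == 1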
diff --git a/mxcubecore/queue_entry/data_collection.py b/mxcubecore/queue_entry/data_collection.py
index 2c996fc54b..885701f857 100644
--- a/mxcubecore/queue_entry/data_collection.py
+++ b/mxcubecore/queue_entry/data_collection.py
@@ -17,7 +17,6 @@
 # along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.
 
 import logging
-from datetime import datetime
 
 import gevent
 import uuid
@@ -37,7 +36,7 @@
     center_before_collect,
 )
 
-from mxlims.pydantic import crystallography as mxmodel
+from mxlims.pydantic import mxmodel
 from mxcubecore.utils import mxlims as mxutils
 
 __credits__ = ["MXCuBE collaboration"]
@@ -135,7 +134,7 @@ def pre_execute(self):
 
         data_model = self.get_data_model()
 
-        mxexperiment: mxmodel.MXExperiment = self.get_mxlims_record()
+        mxexperiment: mxmodel.MxExperimentMessage = self.get_mxlims_record()
         if mxexperiment is None:
             tracking_data = data_model.tracking_data
             workflow_parameters = data_model.workflow_parameters
@@ -153,12 +152,11 @@
             tracking_data.characterisation_id = workflow_parameters.get(
                 "characterisation_id"
             )
-            mxexperiment = mxutils.create_mxexperiment(
-                data_model,
-                start_time=datetime.now(),
-                measured_flux=HWR.beamline.flux.get_value(),
+            self._mxlims_record = mxutils.create_mxrecord(
+                sample=data_model.get_sample_node(),
+                tracking_data=tracking_data,
+                measured_flux=HWR.beamline.flux.get_value(),
             )
-            self._mxlims_record = mxexperiment
 
         if data_model.get_parent():
             gid = data_model.get_parent().lims_group_id
@@ -176,8 +174,6 @@ def post_execute(self):
             beam_position = None
         beam = HWR.beamline.beam
         data_model = self.get_data_model()
-        # This would be a good place to check that scan_pos_end matches input parameters
-        # There have been tricky bugs found where this was not the case
         mxutils.add_data_collection(
             self.get_mxlims_record(),
             data_model,
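The mxlims.py rewrite below splits each record into a wrapper with identity fields and a nested data object, and ties job, sample and logistical sample together by uuid instead of by nested refs. A schematic of the linking, with hypothetical dataclass stand-ins for the real pydantic models:

    from dataclasses import dataclass, field
    from uuid import UUID, uuid1

    @dataclass
    class Sample:
        uuid: UUID = field(default_factory=uuid1)

    @dataclass
    class Crystal:  # the logistical sample
        sample_id: UUID
        uuid: UUID = field(default_factory=uuid1)

    @dataclass
    class Job:  # the MxExperiment
        uuid: UUID
        sample_id: UUID
        logistical_sample_id: UUID

    sample = Sample()
    crystal = Crystal(sample_id=sample.uuid)
    job = Job(uuid=uuid1(), sample_id=sample.uuid,
              logistical_sample_id=crystal.uuid)
    # The MxExperimentMessage carries all three records together, so a
    # consumer can resolve the id links without any external lookup.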
diff --git a/mxcubecore/utils/mxlims.py b/mxcubecore/utils/mxlims.py
index fbcf45d348..985d990091 100644
--- a/mxcubecore/utils/mxlims.py
+++ b/mxcubecore/utils/mxlims.py
@@ -27,29 +27,41 @@
 
 import os
 import json
+from datetime import datetime
 from typing import Optional
-from mxlims.pydantic import crystallography as mxmodel
+from uuid import uuid1
+
+from mxlims.pydantic import mxmodel
 from mxcubecore.model import queue_model_objects as qmo
-from mxlims.pydantic import core
 
 
-def create_mxexperiment(datamodel: qmo.TaskNode, **parameters) -> mxmodel.MXExperiment:
-    """Create MXExperiment mxlims record from datamodel
+def create_mxrecord(
+    sample: qmo.Sample,
+    tracking_data,
+    start_time: Optional[datetime] = None,
+    end_time: Optional[datetime] = None,
+    job_status: Optional[str] = None,
+    **parameters
+) -> mxmodel.MxExperimentMessage:
+    """Create MxExperimentMessage mxlims record from datamodel
 
     Args:
-        datamodel: QueueModelObject representing experiment
-        uuid: String containing globally unique identifier
-        **parameters: dict of parameters overriding/supplementing datamodel
+        sample: QueueModelObject representing sample
+        tracking_data: Tracking data object with uuid etc. connecting sweeps and workflows
+        start_time: Experiment start time
+        end_time: Experiment end time
+        job_status: Job status (enumerated string)
+        **parameters: dict of parameters overriding/supplementing MxExperimentData
 
     Returns:
+        The new MxExperimentMessage holding job, sample and logistical sample
 
     """
+    start_time = start_time or datetime.now()
     crystal = sample.crystals[0] if sample.crystals else None
     diffraction_plan = sample.diffraction_plan
-    initpars = {"uuid": tracking_data.uuid}
+    jobuuid = tracking_data.uuid
     workflow_name = tracking_data.workflow_name
     if not workflow_name:
         if diffraction_plan:
@@ -58,11 +70,8 @@ def create_mxexperiment(datamodel: qmo.TaskNode, **parameters) -> mxmodel.MXExpe
         else:
             workflow_name = diffraction_plan.get("experimentType")
     if not workflow_name:
-        try:
-            workflow_name = datamodel.experiment_type
-        except AttributeError:
-            workflow_name = None
-    initpars["experiment_strategy"] = workflow_name
+        workflow_name = parameters.pop("experiment_type", None)
+    jobpars = {"experiment_strategy": workflow_name}
 
     if diffraction_plan:
         # It is not clear if diffraction_plan is a dict or an object,
@@ -73,33 +82,32 @@
         else:
             resolution = diffraction_plan.get("aimedResolution")
         if resolution:
-            initpars["expected_resolution"] = resolution
+            jobpars["expected_resolution"] = resolution
         if hasattr(diffraction_plan, "requiredCompleteness"):
             completeness = diffraction_plan.requiredCompleteness
         else:
             completeness = diffraction_plan.get("requiredCompleteness")
         if completeness:
-            initpars["target_completeness"] = completeness
+            jobpars["target_completeness"] = completeness
         if hasattr(diffraction_plan, "requiredMultiplicity"):
             multiplicity = diffraction_plan.requiredMultiplicity
         else:
            multiplicity = diffraction_plan.get("requiredMultiplicity")
         if multiplicity:
-            initpars["target_multiplicity"] = multiplicity
-
-    # Add MXSample and LogisticalSample
+            jobpars["target_multiplicity"] = multiplicity
+    jobpars.update(parameters)
+    jobdata = mxmodel.MxExperimentData(**jobpars)
 
-    # MXSample
+    # CrystallographicSample
     samplepars = {}
     samplepars["name"] = (
         sample.name or sample.get_name() or (crystal and crystal.acronym)
     )
     if crystal:
         space_group_name = crystal.space_group
-        if space_group_name:
-            samplepars["space_group_name"] = space_group_name
         dd1 = {
             "a": crystal.cell_a,
             "b": crystal.cell_b,
@@ -108,9 +116,16 @@
             "beta": crystal.cell_beta,
             "gamma": crystal.cell_gamma,
         }
-        if all(dd1.values()):
-            samplepars["unit_cell"] = mxmodel.UnitCell(**dd1)
-
+        unit_cell = mxmodel.UnitCell(**dd1) if all(dd1.values()) else None
+        if space_group_name or unit_cell:
+            samplepars["crystal_form"] = mxmodel.CrystalForm(
+                space_group_name=space_group_name, unit_cell=unit_cell
+            )
+
+        # LogisticalSample, not really modeled yet, so not much to put in
+        crystal_uuid = crystal.crystal_uuid
+    else:
+        crystal_uuid = None
 
     # Set parameters from diffraction plan
     if diffraction_plan:
         # It is not clear if diffraction_plan is a dict or an object,
@@ -121,49 +136,51 @@
         radiation_sensitivity = diffraction_plan.get("radiationSensitivity")
         if radiation_sensitivity:
             samplepars["radiation_sensitivity"] = radiation_sensitivity
+    sampledata = mxmodel.CrystallographicSampleData(**samplepars)
+    mxsample = mxmodel.CrystallographicSample(uuid=uuid1(), data=sampledata)
 
-    sample = mxmodel.MXSample(**samplepars)
-    initpars["sample"] = sample
-
-    # LogisticalSample, not really modeled yet, so not much to put in
-    crystal_uuid = crystal.crystal_uuid if crystal else None
+    crystaldata = mxmodel.CrystalData()
     if crystal_uuid:
-        logistical_sample = core.LogisticalSample(uuid=crystal_uuid)
+        logistical_sample = mxmodel.Crystal(
+            uuid=crystal_uuid, sample_id=mxsample.uuid, data=crystaldata
+        )
     else:
-        logistical_sample = core.LogisticalSample()
-    logistical_sample.sample_ref = core.LogisticalSampleRef(target_uuid=sample.uuid)
-    initpars["logistical_sample"] = logistical_sample
-    sample.logistical_sample_refs.append(
-        core.LogisticalSampleRef(target_uuid=logistical_sample.uuid)
+        logistical_sample = mxmodel.Crystal(
+            uuid=uuid1(), sample_id=mxsample.uuid, data=crystaldata
+        )
+
+    experiment = mxmodel.MxExperiment(
+        uuid=jobuuid,
+        data=jobdata,
+        start_time=start_time,
+        end_time=end_time,
+        job_status=job_status,
+        sample_id=mxsample.uuid,
+        logistical_sample_id=logistical_sample.uuid
     )
-    initpars["logistical_sample_ref"] = core.LogisticalSampleRef(
-        target_uuid=logistical_sample.uuid
+    return mxmodel.MxExperimentMessage(
+        job=experiment, sample=mxsample, logistical_sample=logistical_sample
     )
-    initpars.update(parameters)
-    result = mxmodel.MXExperiment(**initpars)
-    return result
 
 
 def add_data_collection(
-    mxexperiment: mxmodel.MXExperiment,
+    mxrecord: mxmodel.MxExperimentMessage,
     data_collection: qmo.DataCollection,
     **parameters: dict,
 ) -> None:
-    """
+    """Add CollectionSweep record to MxExperiment in mxrecord
 
     Args:
-        mxexperiment: container MXExperiment
+        mxrecord: container MxExperimentMessage
         data_collection: DataCollection queue_model_object to add
-        **parameters: dict of parameters overriding/supplementing datamodel
+        **parameters: dict of parameters overriding/supplementing MxlimsData
 
     Returns:
 
     """
-    """Add CollectionSweep record to MXExperiment"""
 
     # Always true in MXCuBE
     SCAN_AXIS = "omega"
+    mxexperiment = mxrecord.job
 
     acquisition = data_collection.acquisitions[0]
     path_template = acquisition.path_template
@@ -190,30 +207,30 @@
     sweep_id = tracking_data.sweep_id
     sweep = None
+    if not mxexperiment.results:
+        mxexperiment.results = []
     for dataset in mxexperiment.results:
-        if dataset.uuid == sweep_id:
+        if str(dataset.uuid) == sweep_id:
             sweep = dataset
             break
 
     if sweep:
         # This is a scan for an existing sweep. Add and update
-        sweep.scans.append(scan)
-        sweep.axis_positions_start[SCAN_AXIS] = min(
-            sweep.axis_positions_start[SCAN_AXIS], axis_pos_start
-        )
-        sweep.axis_positions_end[SCAN_AXIS] = max(
-            sweep.axis_positions_end[SCAN_AXIS], axis_pos_end
-        )
+        sweep.data.scans.append(scan)
+        sweep.data.axis_positions_start[SCAN_AXIS] = min(
+            sweep.data.axis_positions_start[SCAN_AXIS], axis_pos_start
+        )
+        sweep.data.axis_positions_end[SCAN_AXIS] = max(
+            sweep.data.axis_positions_end[SCAN_AXIS], axis_pos_end
+        )
     else:
         sweep_params = {
             "uuid": sweep_id or tracking_data.uuid,
-            "source_ref": mxmodel.MXExperimentRef(
-                target_uuid=tracking_data.workflow_uid
-            ),
+            "source_id": mxexperiment.uuid,
+            "logistical_sample_id": mxrecord.logistical_sample.uuid,
             "role": tracking_data.role,
-            "logistical_sample_ref": core.LogisticalSampleRef(
-                target_uuid=tracking_data.location_id
-            ),
+        }
+        sweepdata = {
             "scan_axis": SCAN_AXIS,
             "exposure_time": acqparams.exp_time,
             "image_width": acqparams.osc_range,
@@ -232,21 +249,26 @@
             "scans": [scan],
         }
 
-        sweep_params["axis_positions_end"] = {SCAN_AXIS: axis_pos_end}
+        sweepdata["axis_positions_end"] = {SCAN_AXIS: axis_pos_end}
 
-        # NBNB cxheck final omega value against start
         # NBNB how do we get the detector type?
         # NBNB do we use MXCuBE axis names or standardised names?
-        sweep_params.update(parameters)
-        mxexperiment.results.append(mxmodel.CollectionSweep(**sweep_params))
+        sweepdata.update(parameters)
+        dataset = mxmodel.CollectionSweep(
+            data=mxmodel.CollectionSweepData(**sweepdata),
+            **sweep_params
+        )
+        mxexperiment.results.append(dataset)
 
 
-def export_mxexperiment(
-    mxexperiment: mxmodel.MXExperiment, path_template: Optional[qmo.PathTemplate] = None
+def export_mxrecord(
+    mxrecord: mxmodel.MxExperimentMessage,
+    path_template: Optional[qmo.PathTemplate] = None
 ):
-    """Export MXExperiment mxlims record to JSON file"""
+    """Export MxExperiment mxlims record to JSON file"""
     if path_template is None:
-        path = mxexperiment.results[-1].path
-        file_name = "MXExperiment.json"
+        path = mxrecord.job.results[-1].data.path
+        file_name = "MxExperiment.json"
     else:
         template = "MXExperiment_%s_%s.json"
         file_name = template % (path_template.get_prefix(), path_template.run_number)
-        path = os.path.join(path_template.directory, file_name)
+        path = path_template.directory
     path = os.path.join(path, file_name)
-    print("@~@~ WRITING TO", path)
+    print("@~@~ WRITING JSON TO", path)
     with open(path, "w") as fp:
-        json.dump(mxexperiment.model_dump(), fp, indent=4)
+        fp.write(mxrecord.model_dump_json(indent=4, exclude_none=True))
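Since export_mxrecord writes the message with model_dump_json(indent=4, exclude_none=True), a consumer should be able to read it back with the matching pydantic v2 validator. A sketch, assuming the mxlims package used in the patch is importable, and with a hypothetical file name:

    from mxlims.pydantic import mxmodel

    def load_mxrecord(path):
        # model_validate_json is the pydantic v2 counterpart of the
        # model_dump_json call used in export_mxrecord above.
        with open(path) as fp:
            return mxmodel.MxExperimentMessage.model_validate_json(fp.read())

    record = load_mxrecord("MxExperiment.json")  # hypothetical path
    print(record.job.uuid, len(record.job.results or []))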