Skip to content

Commit

Permalink
Merge branch 'add-trims-idff' into remove-psioc-mode-sofb
Browse files Browse the repository at this point in the history
  • Loading branch information
xresende committed Dec 4, 2024
2 parents c0105e6 + d029379 commit ac5dd0c
Show file tree
Hide file tree
Showing 5 changed files with 562 additions and 255 deletions.
118 changes: 101 additions & 17 deletions siriuspy/siriuspy/devices/idff.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ class _ParamPVs:
CORRQS1CURRENT_MON = None
CORRQS2CURRENT_MON = None
CORRLCHCURRENT_MON = None
CORRQA1CURRENT_MON = None
CORRQB1CURRENT_MON = None
CORRQC1CURRENT_MON = None
CORRQC2CURRENT_MON = None
CORRQB2CURRENT_MON = None
CORRQA2CURRENT_MON = None

def __str__(self):
"""Print parameters."""
Expand Down Expand Up @@ -75,47 +81,83 @@ def correctors_status(self):
return self[curr_sts] if curr_sts else None

@property
def calculated_ch1_current(self):
def calc_corr_current_ch1(self):
"""Return calculated CH1 power supply current [A]."""
curr_name = self.PARAM_PVS.CORRCH1CURRENT_MON
return self[curr_name] if curr_name else None

@property
def calculated_ch2_current(self):
def calc_corr_current_ch2(self):
"""Return calculated CH2 power supply current [A]."""
curr_name = self.PARAM_PVS.CORRCH2CURRENT_MON
return self[curr_name] if curr_name else None

@property
def calculated_ps_current_cv1(self):
def calc_corr_current_cv1(self):
"""Return calculated CV1 power supply current [A]."""
curr_name = self.PARAM_PVS.CORRCV1CURRENT_MON
return self[curr_name] if curr_name else None

@property
def calculated_ps_current_cv2(self):
def calc_corr_current_cv2(self):
"""Return calculated CV2 power supply current [A]."""
curr_name = self.PARAM_PVS.CORRCV2CURRENT_MON
return self[curr_name] if curr_name else None

@property
def calculated_ps_current_qs1(self):
def calc_corr_current_qs1(self):
"""Return calculated QS1 power supply current [A]."""
curr_name = self.PARAM_PVS.CORRQS1CURRENT_MON
return self[curr_name] if curr_name else None

@property
def calculated_ps_current_qs2(self):
def calc_corr_current_qs2(self):
"""Return calculated Q2 power supply current [A]."""
curr_name = self.PARAM_PVS.CORRQS2CURRENT_MON
return self[curr_name] if curr_name else None

@property
def calculated_ps_current_lch(self):
def calc_corr_current_lch(self):
"""Return calculated LCH power supply current [A]."""
curr_name = self.PARAM_PVS.CORRLCHCURRENT_MON
return self[curr_name] if curr_name else None

@property
def calc_corr_current_qa1(self):
"""Return calculated QA1 power supply current [A]."""
curr_name = self.PARAM_PVS.CORRQA1CURRENT_MON
return self[curr_name] if curr_name else None

@property
def calc_corr_current_qb1(self):
"""Return calculated QB1 power supply current [A]."""
curr_name = self.PARAM_PVS.CORRQB1CURRENT_MON
return self[curr_name] if curr_name else None

@property
def calc_corr_current_qc1(self):
"""Return calculated QC1 power supply current [A]."""
curr_name = self.PARAM_PVS.CORRQC1CURRENT_MON
return self[curr_name] if curr_name else None

@property
def calc_corr_current_qc2(self):
"""Return calculated QC2 power supply current [A]."""
curr_name = self.PARAM_PVS.CORRQC2CURRENT_MON
return self[curr_name] if curr_name else None

@property
def calc_corr_current_qb2(self):
"""Return calculated QB2 power supply current [A]."""
curr_name = self.PARAM_PVS.CORRQB2CURRENT_MON
return self[curr_name] if curr_name else None

@property
def calc_corr_current_qa2(self):
"""Return calculated QA2 power supply current [A]."""
curr_name = self.PARAM_PVS.CORRQA2CURRENT_MON
return self[curr_name] if curr_name else None

@property
def loopfreq(self):
"""Return FF loop frequency [Hz]."""
Expand Down Expand Up @@ -173,7 +215,6 @@ class DEVICES:
CORRCV2CURRENT_MON = 'CorrCV2Current-Mon'
CORRQS1CURRENT_MON = 'CorrQS1Current-Mon'
CORRQS2CURRENT_MON = 'CorrQS2Current-Mon'
CORRLCHCURRENT_MON = 'CorrLCHCurrent-Mon'

PROPERTIES_DEFAULT = \
tuple(set(
Expand Down Expand Up @@ -254,9 +295,10 @@ class IDFF(_DeviceSet):
class DEVICES(_ID.DEVICES):
"""."""

def __init__(self, devname):
def __init__(self, devname, with_devctrl=True):
"""."""
devname = _SiriusPVName(devname)
self._with_devctrl = with_devctrl

# check if device exists
if devname not in IDFF.DEVICES.ALL:
Expand All @@ -270,13 +312,22 @@ def __init__(self, devname):
self._kparametername = \
_IDSearch.conv_idname_2_kparameter_propty(devname)

alldevs = self._create_devices(devname)
(self._devctrl, self._devid, self._devsch, self._devscv,
self._devsqs, self._devslc) = self._create_devices(devname)
self._devsqs, self._devslc, self._devsqn) = alldevs

devices = [self._devctrl, self._devid, ]
self._lab2corrdevs = self._create_labels_2_corrdevs_dict()

devices = list()
if self._with_devctrl:
devices += [self._devctrl, self._devid]
else:
devices += [self._devid, ]
devices += self._devsch
devices += self._devscv
devices += self._devsqs
devices += self._devslc
devices += self._devsqn
super().__init__(devices, devname=devname)

@property
Expand All @@ -299,6 +350,11 @@ def lcnames(self):
"""Return LC corrector power supply names."""
return _IDSearch.conv_idname_2_idff_lcnames(self.devname)

@property
def qnnames(self):
"""Return QD corrector power supply names."""
return _IDSearch.conv_idname_2_idff_qnnames(self.devname)

@property
def ctrldev(self):
"""Return IDFFCtrl device."""
Expand Down Expand Up @@ -329,6 +385,11 @@ def lcdevs(self):
"""Return LC corrector power supply names."""
return self._devslc

@property
def qndevs(self):
"""Return QD corrector power supply names."""
return self._devsqn

@property
def pparametername(self):
"""Return corresponding to ID pparameter."""
Expand Down Expand Up @@ -367,6 +428,13 @@ def idffconfig(self):
"""."""
return self._idffconfig

def read_corr_offset_values(self):
"""Read current corrector values."""
offsets = dict()
for label, corrdev in self._lab2corrdevs.items():
offsets[label] = corrdev.current
return offsets

def find_configs(self):
"""Find si_idff configurations in configdb."""
return self._idffconfig.configdbclient.find_configs()
Expand All @@ -386,14 +454,14 @@ def calculate_setpoints(
polarization - a string defining the required polarization for
setpoint calculation.
"""
if not self._idffconfig:
ValueError('IDFFConfig is not loaded!')

polarization, pparameter_value, kparameter_value = \
self.get_polarization_state(
pparameter_value=pparameter_value,
kparameter_value=kparameter_value)

if not self._idffconfig:
ValueError('IDFFConfig is not loaded!')

if polarization not in self.idffconfig.polarizations:
raise ValueError('Polarization is not compatible with ID.')
if pparameter_value is None:
Expand All @@ -416,7 +484,8 @@ def implement_setpoints(
polarization, pparameter_value, kparameter_value = [None, ] * 3
if corrdevs is None:
corrdevs = \
self._devsch + self._devscv + self._devsqs + self._devslc
self._devsch + self._devscv + \
self._devsqs + self._devslc + self._devsqn
for pvname, value in setpoints.items():
# find corrdev corresponding to pvname
for dev in corrdevs:
Expand Down Expand Up @@ -547,7 +616,7 @@ def rampup_corr_currents(
_time.sleep(time_interval / (nrpts - 1))

def _create_devices(self, devname):
devctrl = IDFFCtrl(devname=devname)
devctrl = IDFFCtrl(devname=devname) if self._with_devctrl else None
pol_mon = _ID.get_idclass(devname).PARAM_PVS.POL_MON
params = (
self._pparametername, self._kparametername, pol_mon)
Expand All @@ -559,4 +628,19 @@ def _create_devices(self, devname):
devscv = [_PowerSupplyFBP(devname=dev) for dev in self.cvnames]
devsqs = [_PowerSupplyFBP(devname=dev) for dev in self.qsnames]
devslc = [_PowerSupplyFBP(devname=dev) for dev in self.lcnames]
return devctrl, devid, devsch, devscv, devsqs, devslc
devsqn = [_PowerSupplyFBP(devname=dev) for dev in self.qnnames]
return devctrl, devid, devsch, devscv, devsqs, devslc, devsqn

def _create_labels_2_corrdevs_dict(self):
ch_labels = _IDSearch.IDFF_CH_LABELS
cv_labels = _IDSearch.IDFF_CV_LABELS
qs_labels = _IDSearch.IDFF_QS_LABELS
lc_labels = _IDSearch.IDFF_LC_LABELS
qn_labels = _IDSearch.IDFF_QN_LABELS
devs = dict()
devs.update({lab: dev for lab, dev in zip(ch_labels, self._devsch)})
devs.update({lab: dev for lab, dev in zip(cv_labels, self._devscv)})
devs.update({lab: dev for lab, dev in zip(qs_labels, self._devsqs)})
devs.update({lab: dev for lab, dev in zip(lc_labels, self._devslc)})
devs.update({lab: dev for lab, dev in zip(qn_labels, self._devsqn)})
return devs
36 changes: 30 additions & 6 deletions siriuspy/siriuspy/idff/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@


class IDFFConfig(_ConfigDBDocument):
"""Insertion Device Feedforward Configuration."""
"""ID Feedforward ConfigDB configuration."""

CONFIGDB_TYPE = 'si_idff'

Expand Down Expand Up @@ -73,6 +73,11 @@ def lc_pvnames(self):
"""Return LC corrector power supply pvnames."""
return self._get_corr_pvnames('lch', '')

@property
def qn_pvnames(self):
"""Return QD corrector power supply pvnames."""
return self._get_corr_pvnames('qa1', 'qa2', 'qb1', 'qb2', 'qc1', 'qc2')

@property
def polarizations(self):
"""Return list of light polarizations in the IDFF config."""
Expand All @@ -91,6 +96,12 @@ def value(self, value):
"""Set configuration."""
self._set_value(value)

@property
def offsets(self):
"""Correctors offsets."""
val = self._value
return val['offsets'] if 'offsets' in val else dict()

def calculate_setpoints(
self, polarization, pparameter_value, kparameter_value):
"""Return correctors setpoints for a particular ID config.
Expand All @@ -108,10 +119,13 @@ def calculate_setpoints(
params = idff['kparameter']
param_value = kparameter_value
setpoints = dict()
offsets = self.offsets
for corrlabel, table in idff.items():
if corrlabel not in ('pparameter', 'kparameter'):
# linear interpolation
setpoint = _np.interp(param_value, params, table)
curr = _np.interp(param_value, params, table)
offset = offsets.get(corrlabel, 0)
setpoint = curr + offset
corr_pvname = self._value['pvnames'][corrlabel]
setpoints[corr_pvname] = setpoint
return setpoints
Expand Down Expand Up @@ -162,6 +176,7 @@ def __str__(self):
"""."""
stg = ''
stg += f'name: {self.name}'
stg += f'\nidname: {self.idname}'
value = self.value
if value is None:
return stg
Expand Down Expand Up @@ -212,11 +227,13 @@ def check_valid_value(self, value):
getcv = _IDSearch.conv_idname_2_idff_cvnames
getqs = _IDSearch.conv_idname_2_idff_qsnames
getlc = _IDSearch.conv_idname_2_idff_lcnames
getqn = _IDSearch.conv_idname_2_idff_qnnames
chnames = [corr + ':Current-SP' for corr in getch(self.idname)]
cvnames = [corr + ':Current-SP' for corr in getcv(self.idname)]
qsnames = [corr + ':Current-SP' for corr in getqs(self.idname)]
lcnames = [corr + ':Current-SP' for corr in getlc(self.idname)]
pvsidsearch = set(chnames + cvnames + qsnames + lcnames)
qnnames = [corr + ':Current-SP' for corr in getqn(self.idname)]
pvsidsearch = set(chnames + cvnames + qsnames + lcnames + qnnames)
symm_diff = pvsconfig ^ pvsidsearch

if symm_diff:
Expand Down Expand Up @@ -266,12 +283,19 @@ def check_valid_value(self, value):
'are not consistent')
return True

def _get_corr_pvnames(self, cname1, cname2):
def _get_corr_pvnames(
self,
cname_a1=None, cname_a2=None,
cname_b1=None, cname_b2=None,
cname_c1=None, cname_c2=None):
"""Return corrector power supply pvnames."""
if self._value:
pvnames = self._value['pvnames']
corr1, corr2 = pvnames.get(cname1), pvnames.get(cname2)
return corr1, corr2
corr_a1, corr_a2 = pvnames.get(cname_a1), pvnames.get(cname_a2)
corr_b1, corr_b2 = pvnames.get(cname_b1), pvnames.get(cname_b2)
corr_c1, corr_c2 = pvnames.get(cname_c1), pvnames.get(cname_c2)
allc = corr_a1, corr_a2, corr_b1, corr_b2, corr_c1, corr_c2
return [corr for corr in allc if corr is not None]
else:
raise ValueError('Configuration not defined!')

Expand Down
Loading

0 comments on commit ac5dd0c

Please sign in to comment.