diff --git a/siriuspy/siriuspy/pwrsupply/siggen.py b/siriuspy/siriuspy/pwrsupply/siggen.py new file mode 100644 index 000000000..0de9eee00 --- /dev/null +++ b/siriuspy/siriuspy/pwrsupply/siggen.py @@ -0,0 +1,159 @@ +"""Power Supply Signal Generator.""" + + +import numpy as _np + + +class SigGenConfig: + """Signal Generator config class.""" + + TYPES = ('Sine', 'DampedSine', 'Trapezoidal') + + def __init__(self, data=None, + type=None, # Sine, DampedSine, Trapezoidal + nr_cycles=None, # Sine, DampedSine, Trapezoidal + freq=None, # [Hz] Sine, DampedSine + amplitude=None, # [A] Sine, DampedSine, Trapezoidal + offset=None, # [A] Sine, DampedSine, Trapezoidal + aux_param=None, # Sine, DampedSine, Trapezoidal + rampup_time=None, # [s] Trapezoidal + rampdown_time=None, # [s] Trapezoidal + plateau_time=None, # [s] Trapezoidal + theta_begin=None, # Sine, DampedSine + theta_end=None, # Sine, DampedSine + decay_time=None): # DampedSine + """Init method.""" + # set default values + self._set_default_config() + # process input arguments + if data is not None: + self.type = str(data[0]) + self.nr_cycles = int(data[1]) + self.freq = float(data[2]) + self.amplitude = float(data[3]) + self.offset = float(data[4]) + self.aux_param = [float(d) for d in data[5:9]] + self.type = str(type) if type is not None else self.type + self.nr_cycles = int(nr_cycles) if nr_cycles is not None \ + else self.nr_cycles + self.freq = float(freq) if freq is not None else self.freq + self.amplitude = float(amplitude) if amplitude is not None \ + else self.amplitude + self.offset = float(offset) if offset is not None \ + else self.offset + if aux_param is not None: + self.aux_param = [float(d) for d in aux_param[0:4]] + self.aux_param[0] = float(rampup_time) if rampup_time is not None \ + else self.aux_param[0] + self.aux_param[1] = float(rampdown_time) if rampdown_time is not None \ + else self.aux_param[1] + self.aux_param[2] = float(plateau_time) if plateau_time is not None \ + else self.aux_param[2] + self.aux_param[0] = float(theta_begin) if theta_begin is not None \ + else self.aux_param[0] + self.aux_param[1] = float(theta_end) if theta_end is not None \ + else self.aux_param[1] + self.aux_param[2] = float(decay_time) if decay_time is not None \ + else self.aux_param[2] + + # --- public methods --- + + @property + def duration(self): + """Duration of signal [s].""" + return self.nr_cycles / self.freq + + @property + def rampup_time(self): + """Rampup time for Trapezoidal signals.""" + return self.aux_param[0] + + @rampup_time.setter + def rampup_time(self, value): + """Set Rampup time for Trapezoidal signals.""" + self.aux_param[0] = value + return value + + @property + def theta_begin(self): + """Initial phase for Sine or DampedSine signals.""" + return self.aux_param[0] + + @theta_begin.setter + def theta_begin(self, value): + """Set Initial phase for Sine or DampedSine signals.""" + self.aux_param[0] = value + return value + + @property + def rampdown_time(self): + """Rampdown time for Trapezoidal signals.""" + return self.aux_param[1] + + @rampdown_time.setter + def rampdown_time(self, value): + """Set Rampdown time for Trapezoidal signals.""" + self.aux_param[1] = value + return value + + @property + def theta_end(self): + """Final phase for Sine or DampedSine signals.""" + return self.aux_param[1] + + @theta_end.setter + def theta_end(self, value): + """Set Final phase for Sine or DampedSine signals.""" + self.aux_param[1] = value + return value + + @property + def plateau_time(self): + """Plateau time for Trapezoidal signals.""" + return self.aux_param[2] + + @plateau_time.setter + def plateau_time(self, value): + """Set Plateau time for Trapezoidal signals.""" + self.aux_param[2] = value + return value + + @property + def decay_time(self): + """Decay time constant for DampedSine signals.""" + return self.aux_param[2] + + @decay_time.setter + def decay_time(self, value): + """Set Decay time constant for DampedSine signals.""" + self.aux_param[2] = value + return value + + def get_waveform(self, nr_points=100): + """Return list with signal waveform.""" + t = _np.linspace(0.0, self.duration, nr_points) + if self.type in ('Sine', 'DampedSine'): + # TODO: confirm! + if self.type == 'Sine': + amp = self.amplitude * _np.ones(t.shape) + else: + amp = self.amplitude * _np.exp(-t/self.decay_time) + wfm = self.offset * _np.ones(t.shape) + phase = self.freq*t % (2*_np.pi) + sel_in = (phase >= self.theta_begin) and (phase <= self.theta_end) + wfm_delta = amp * _np.sin(phase) + wfm[sel_in] += wfm_delta[sel_in] + else: + # TODO: implement get_waveform for 'Trapezoidal' type. + wfm = _np.zeros(t.shape) + self.offset + return wfm + + # --- private methods --- + + def _set_default_config(self): + self.type = 'Sine' + self.nr_cycles = 1 + self.freq = 100.0 # [A] + self.amplitude = 0.0 + self.offset = 0.0 # [A] + self.aux_param = [0.0, 0.0, 0.0, 0.0] diff --git a/siriuspy/siriuspy/search.py b/siriuspy/siriuspy/search.py index 60e05169e..20713c134 100644 --- a/siriuspy/siriuspy/search.py +++ b/siriuspy/siriuspy/search.py @@ -5,6 +5,7 @@ from siriuspy.namesys import Filter as _Filter from siriuspy.namesys import SiriusPVName as _SiriusPVName from siriuspy import servweb as _web +from siriuspy.pwrsupply.siggen import SigGenConfig as _SigGenConfig from siriuspy.magnet.excdata import ExcitationData as _ExcitationData @@ -22,6 +23,7 @@ class PSSearch: _pstype_2_excdat_dict = dict() _psname_2_psmodel_dict = None _psname_2_bbbname_dict = None + _psname_2_siggen_dict = None _bbbname_2_psnames_dict = None @staticmethod @@ -153,6 +155,13 @@ def conv_psname_2_psmodel(psname): PSSearch._reload_psname_2_psmodel_dict() return PSSearch._psname_2_psmodel_dict[psname] + @staticmethod + def conv_psname_2_siggenconf(psname): + """Convert psname to corresponding SigGenConf object.""" + if PSSearch._psname_2_siggen_dict is None: + PSSearch._reload_psname_2_siggen_dict() + return PSSearch._psname_2_siggen_dict[psname] + @staticmethod def check_pstype_ispulsed(pstype): """Return True if pstype is of a pulsed pwrsupply type, False o.w.""" @@ -295,6 +304,20 @@ def _reload_psname_2_psmodel_dict(): else: raise Exception('could not read psmodels from web server') + @staticmethod + def _reload_psname_2_siggen_dict(): + """Load siggen config by psname to a dict.""" + if _web.server_online(): + text = _web.ps_siggen_configuration_read() + data, _ = _util.read_text_data(text) + PSSearch._psname_2_siggen_dict = dict() + for datum in data: + psname, *siggen_data = datum + siggen_config = _SigGenConfig(data=siggen_data) + PSSearch._psname_2_siggen_dict[psname] = siggen_config + else: + raise Exception('could not read siggen config from web server') + @staticmethod def _reload_bbb_2_psname_dict(): """Load psnames mapped to BBB names and vice versa.""" diff --git a/siriuspy/tests/test_search.py b/siriuspy/tests/test_search.py index 900a6a4a5..d723735ce 100755 --- a/siriuspy/tests/test_search.py +++ b/siriuspy/tests/test_search.py @@ -9,6 +9,7 @@ from siriuspy import search from siriuspy.search import PSSearch from siriuspy.search import MASearch +from siriuspy.pwrsupply.siggen import SigGenConfig from siriuspy.magnet.excdata import ExcitationData mock_flag = True @@ -64,6 +65,7 @@ class TestPSSearch(unittest.TestCase): 'conv_psname_2_excdata', 'check_psname_ispulsed', 'conv_psname_2_psmodel', + 'conv_psname_2_siggenconf', 'check_pstype_ispulsed', 'conv_psname_2_bbbname', 'conv_bbbname_2_psnames', @@ -161,6 +163,8 @@ def setUp(self): read_test_ps_pstypes self.mock_web.ps_pstype_setpoint_limits.return_value = \ read_test_file('pwrsupply/pstypes-setpoint-limits.txt') + self.mock_web.ps_siggen_configuration_read.return_value = \ + read_test_file('pwrsupply/siggen-configuration.txt') self.mock_web.pu_pstype_setpoint_limits.return_value = \ read_test_file('pwrsupply/putypes-setpoint-limits.txt') self.mock_web.ps_psmodels_read.return_value = \ @@ -333,6 +337,12 @@ def test_conv_psname_2_psmodel(self): model = PSSearch.conv_psname_2_psmodel(psname=ps) self.assertIsInstance(model, str) + def test_conv_psname_2_siggenconf(self): + """Test conv_psname_2_siggenconf.""" + for ps, pstype in TestPSSearch.sample.items(): + siggenconf = PSSearch.conv_psname_2_siggenconf(psname=ps) + self.assertIsInstance(siggenconf, SigGenConfig) + def test_check_pstype_ispulsed(self): """Test check_pstype_isplused.""" pstypes = PSSearch.get_pstype_names()