diff --git a/src/haven/devices/ion_chamber.py b/src/haven/devices/ion_chamber.py index 7b28c8ba..c4c64653 100644 --- a/src/haven/devices/ion_chamber.py +++ b/src/haven/devices/ion_chamber.py @@ -49,6 +49,7 @@ class IonChamber(StandardReadable, Triggerable): _ophyd_labels_ = {"ion_chambers", "detectors"} _trigger_statuses = {} + _clock_register_width = 32 # bits in the register def __init__( self, @@ -270,6 +271,43 @@ async def connect( # Update the labjack's input's .DESC field to match the scaler channel await self.voltmeter_channel.description.set(desc) + async def default_timeout(self) -> float: + """Calculate the expected timeout for triggering this ion chamber. + + If ``self.mcs.scaler.preset_time`` is set and the first + (clock) channel is serving as a gate + (``self.mcs.scaler.channels[0].is_gate.get_value() == True``) + then the timeout is a little longer than the preset + time. + + Otherwise the timeout is calculated based on the longest time + the scaler can count for based on when the channel 0 clock + register would fill up. For example, a 32-bit scaler with a + 9.6 MHz external clock could count for up to: + + 2**32 / 9600000 = 447.4 seconds + + Returns + ======= + timeout + The optimal timeout for trigger this ion chamber's scaler. + + """ + aws = [ + self.mcs.scaler.channels[0].is_gate.get_value(), + self.mcs.scaler.preset_time.get_value(), + self.mcs.scaler.clock_frequency.get_value(), + ] + is_time_limited, count_time, clock_freq = await asyncio.gather(*aws) + if is_time_limited: + # We're using the preset time to decide when to stop + return count_time + DEFAULT_TIMEOUT + else: + # Use the maximum time the scaler could possibly count + max_counts = 2**self._clock_register_width + max_count_time = max_counts / clock_freq + return max_count_time + DEFAULT_TIMEOUT + @AsyncStatus.wrap async def trigger(self, record_dark_current=False): """Instruct the ion chamber's scaler to capture one data point. @@ -287,6 +325,8 @@ async def trigger(self, record_dark_current=False): if record_dark_current: await self.record_dark_current() return + # Calculate expected timeout value + timeout = await self.default_timeout() # Check if we've seen this signal before signal = self.mcs.scaler.count last_status = self._trigger_statuses.get(signal.source) @@ -295,7 +335,7 @@ async def trigger(self, record_dark_current=False): await last_status return # Nothing to wait on yet, so trigger the scaler and stash the result - st = signal.set(True) + st = signal.set(True, timeout=timeout) self._trigger_statuses[signal.source] = st await st diff --git a/src/haven/tests/test_ion_chamber.py b/src/haven/tests/test_ion_chamber.py index 6fcd9d8f..bfa9f79e 100644 --- a/src/haven/tests/test_ion_chamber.py +++ b/src/haven/tests/test_ion_chamber.py @@ -123,6 +123,7 @@ async def test_readables(ion_chamber): @pytest.mark.asyncio async def test_trigger(ion_chamber): await ion_chamber.connect(mock=True) + set_mock_value(ion_chamber.mcs.scaler.clock_frequency, 50e6) assert ion_chamber._trigger_statuses == {} # Does the same trigger twice return the same status1 = ion_chamber.trigger() @@ -146,6 +147,31 @@ async def test_trigger_dark_current(ion_chamber, monkeypatch): assert ion_chamber.mcs.scaler.record_dark_current.trigger.called +async def test_default_timeout_with_time(ion_chamber): + await ion_chamber.connect(mock=True) + await ion_chamber.mcs.scaler.channels[0].is_gate.set(True) + await ion_chamber.mcs.scaler.preset_time.set(17) + timeout = await ion_chamber.default_timeout() + assert timeout == pytest.approx(27) + + +async def test_default_timeout_with_gate(ion_chamber): + """If another channel is gated, then use the maximum timeout. + + Assume the internal register is 32-bit, then we can count for at most: + + 2**32 / clock_freq + + """ + clock_freq = 50e6 + await ion_chamber.connect(mock=True) + await ion_chamber.mcs.scaler.channels[0].is_gate.set(False) + await ion_chamber.mcs.scaler.clock_frequency.set(clock_freq) + timeout = await ion_chamber.default_timeout() + buff_size = 2**32 + assert timeout == pytest.approx(buff_size / clock_freq + 10) + + @pytest.mark.asyncio async def test_net_current_signal(ion_chamber): """Test that scaler tick counts get properly converted to ion chamber current."""