Skip to content

Commit

Permalink
Merge pull request #313 from spc-group/ion_chamber_timeout
Browse files Browse the repository at this point in the history
Proper Ion chamber timeout calculation
  • Loading branch information
canismarko authored Nov 26, 2024
2 parents 35bd217 + e5a78c1 commit c2697e9
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 1 deletion.
42 changes: 41 additions & 1 deletion src/haven/devices/ion_chamber.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class IonChamber(StandardReadable, Triggerable):

_ophyd_labels_ = {"ion_chambers", "detectors"}
_trigger_statuses = {}
_clock_register_width = 32 # bits in the register

def __init__(
self,
Expand Down Expand Up @@ -270,6 +271,43 @@ async def connect(
# Update the labjack's input's .DESC field to match the scaler channel
await self.voltmeter_channel.description.set(desc)

async def default_timeout(self) -> float:
"""Calculate the expected timeout for triggering this ion chamber.
If ``self.mcs.scaler.preset_time`` is set and the first
(clock) channel is serving as a gate
(``self.mcs.scaler.channels[0].is_gate.get_value() == True``)
then the timeout is a little longer than the preset
time.
Otherwise the timeout is calculated based on the longest time
the scaler can count for based on when the channel 0 clock
register would fill up. For example, a 32-bit scaler with a
9.6 MHz external clock could count for up to:
2**32 / 9600000 = 447.4 seconds
Returns
=======
timeout
The optimal timeout for trigger this ion chamber's scaler.
"""
aws = [
self.mcs.scaler.channels[0].is_gate.get_value(),
self.mcs.scaler.preset_time.get_value(),
self.mcs.scaler.clock_frequency.get_value(),
]
is_time_limited, count_time, clock_freq = await asyncio.gather(*aws)
if is_time_limited:
# We're using the preset time to decide when to stop
return count_time + DEFAULT_TIMEOUT
else:
# Use the maximum time the scaler could possibly count
max_counts = 2**self._clock_register_width
max_count_time = max_counts / clock_freq
return max_count_time + DEFAULT_TIMEOUT

@AsyncStatus.wrap
async def trigger(self, record_dark_current=False):
"""Instruct the ion chamber's scaler to capture one data point.
Expand All @@ -287,6 +325,8 @@ async def trigger(self, record_dark_current=False):
if record_dark_current:
await self.record_dark_current()
return
# Calculate expected timeout value
timeout = await self.default_timeout()
# Check if we've seen this signal before
signal = self.mcs.scaler.count
last_status = self._trigger_statuses.get(signal.source)
Expand All @@ -295,7 +335,7 @@ async def trigger(self, record_dark_current=False):
await last_status
return
# Nothing to wait on yet, so trigger the scaler and stash the result
st = signal.set(True)
st = signal.set(True, timeout=timeout)
self._trigger_statuses[signal.source] = st
await st

Expand Down
26 changes: 26 additions & 0 deletions src/haven/tests/test_ion_chamber.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ async def test_readables(ion_chamber):
@pytest.mark.asyncio
async def test_trigger(ion_chamber):
await ion_chamber.connect(mock=True)
set_mock_value(ion_chamber.mcs.scaler.clock_frequency, 50e6)
assert ion_chamber._trigger_statuses == {}
# Does the same trigger twice return the same
status1 = ion_chamber.trigger()
Expand All @@ -146,6 +147,31 @@ async def test_trigger_dark_current(ion_chamber, monkeypatch):
assert ion_chamber.mcs.scaler.record_dark_current.trigger.called


async def test_default_timeout_with_time(ion_chamber):
await ion_chamber.connect(mock=True)
await ion_chamber.mcs.scaler.channels[0].is_gate.set(True)
await ion_chamber.mcs.scaler.preset_time.set(17)
timeout = await ion_chamber.default_timeout()
assert timeout == pytest.approx(27)


async def test_default_timeout_with_gate(ion_chamber):
"""If another channel is gated, then use the maximum timeout.
Assume the internal register is 32-bit, then we can count for at most:
2**32 / clock_freq
"""
clock_freq = 50e6
await ion_chamber.connect(mock=True)
await ion_chamber.mcs.scaler.channels[0].is_gate.set(False)
await ion_chamber.mcs.scaler.clock_frequency.set(clock_freq)
timeout = await ion_chamber.default_timeout()
buff_size = 2**32
assert timeout == pytest.approx(buff_size / clock_freq + 10)


@pytest.mark.asyncio
async def test_net_current_signal(ion_chamber):
"""Test that scaler tick counts get properly converted to ion chamber current."""
Expand Down

0 comments on commit c2697e9

Please sign in to comment.