Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Camera trigger #771

Open
wants to merge 8 commits into
base: iblrigv8dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 119 additions & 46 deletions iblrig/base_choice_world.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
"""Extends the base_tasks modules by providing task logic around the Choice World protocol."""

import abc
import enum
import logging
import math
import random
import subprocess
import time
from pathlib import Path
from string import ascii_letters
from typing import Annotated, Any
from typing import Annotated, Any, final

import numpy as np
import pandas as pd
Expand All @@ -29,6 +30,7 @@
NTRIALS_INIT = 2000
NBLOCKS_INIT = 100


# TODO: task parameters should be verified through a pydantic model
#
# Probability = Annotated[float, Field(ge=0.0, le=1.0)]
Expand Down Expand Up @@ -179,37 +181,125 @@ def start_hardware(self):
self.start_mixin_bonsai_visual_stimulus()
self.bpod.register_softcodes(self.softcode_dictionary())

@final
def _wait_for_camera_and_initial_delay(self):
initial_delay = self.task_params.get('SESSION_DELAY_START', 0)

# temporary IntEnum for storing softcodes
# SOFTCODE.TRIGGER_CAMERA is being reused, we add three more unique values
class TemporarySoftcodes(enum.IntEnum):
START_CAMERA_RECORDING = SOFTCODE.TRIGGER_CAMERA.value
WAIT_FOR_CAMERA_TRIGGER = enum.auto()
CAMERA_TRIGGER_RECEIVED = enum.auto()
STARTING_INITIAL_DELAY = enum.auto()

# store the original softcode handler
original_softcode_handler = self.bpod.softcode_handler_function

# define temporary softcode handler
def temporary_softcode_handler(softcode: int):
match softcode:
case TemporarySoftcodes.START_CAMERA_RECORDING:
original_softcode_handler(softcode) # pass to original handler
case TemporarySoftcodes.WAIT_FOR_CAMERA_TRIGGER:
log.info('Waiting to receive first camera trigger ...')
case TemporarySoftcodes.CAMERA_TRIGGER_RECEIVED:
log.info('Camera trigger received')
case TemporarySoftcodes.STARTING_INITIAL_DELAY:
if initial_delay > 0:
log.info(f'Waiting for {initial_delay} s')
else:
log.info('No initial delay defined')

Copy link
Contributor Author

@bimac bimac Feb 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The above looks pretty complex for what it is doing. The goal was to make things as robust and clear as possible whilst keeping things backwards compatible.

# overwrite softcode handler
self.bpod.softcode_handler_function = temporary_softcode_handler

# define and run state machine
sma = StateMachine(self.bpod)
sma.add_state(
state_name='start_camera_workflow',
output_actions=[('SoftCode', TemporarySoftcodes.START_CAMERA_RECORDING)],
state_change_conditions={'Tup': 'wait_for_camera_trigger'},
)
sma.add_state(
state_name='wait_for_camera_trigger',
output_actions=[('SoftCode', TemporarySoftcodes.WAIT_FOR_CAMERA_TRIGGER)],
state_change_conditions={'Port1In': 'camera_trigger_received'},
)
sma.add_state(
state_name='camera_trigger_received',
output_actions=[('SoftCode', TemporarySoftcodes.CAMERA_TRIGGER_RECEIVED), ('BNC1', 255)],
state_change_conditions={'Tup': 'delay_initiation'},
)
sma.add_state(
state_name='delay_initiation',
state_timer=self.task_params.get('SESSION_DELAY_START', 0),
output_actions=[('SoftCode', TemporarySoftcodes.STARTING_INITIAL_DELAY)],
state_change_conditions={'Tup': 'exit'},
)
self.bpod.send_state_machine(sma)
self.bpod.run_state_machine(sma) # blocking until state-machine is finished
if initial_delay > 0:
log.info('Initial delay has passed')

# restore original softcode handler
self.bpod.softcode_handler_function = original_softcode_handler

def _run(self):
"""Run the task with the actual state machine."""
time_last_trial_end = time.time()
for i in range(self.task_params.NTRIALS): # Main loop
# t_overhead = time.time()
for trial_number in range(self.task_params.NTRIALS): # Main loop
# obtain state machine definition
self.next_trial()
log.info(f'Starting trial: {i}')
# =============================================================================
# Start state machine definition
# =============================================================================
sma = self.get_state_machine_trial(i)
sma = self.get_state_machine_trial(trial_number)

# Waiting for camera / initial delay will be handled just prior to the first trial
# This is done here to allow for backward compatibility with unadapted tasks
if trial_number == 0:
# warn if state machine uses deprecated way of waiting for camera / initial delay
if (5, SOFTCODE.TRIGGER_CAMERA) in sma.output_matrix[0] and sma.state_names[1] == 'delay_initiation':
log.warning('')
log.warning('**********************************************')
log.warning('ATTENTION: YOUR TASK DEFINITION NEEDS UPDATING')
log.warning('**********************************************')
log.warning('Camera and initial delay should not be handled')
log.warning('within the `get_state_machine_trial()` method.')
log.warning("Please see IBLRIG's documentation for details.")
log.warning('**********************************************')
log.warning('')
log.info('Waiting for 10s so you actually read this message ;-)')
time.sleep(10)
else:
self._wait_for_camera_and_initial_delay()

# send state machine description to Bpod device
log.debug('Sending state machine to bpod')
# Send state machine description to Bpod device
self.bpod.send_state_machine(sma)
# t_overhead = time.time() - t_overhead
# The ITI_DELAY_SECS defines the grey screen period within the state machine, where the
# Bpod TTL is HIGH. The DEAD_TIME param defines the time between last trial and the next
dead_time = self.task_params.get('DEAD_TIME', 0.5)
dt = self.task_params.ITI_DELAY_SECS - dead_time - (time.time() - time_last_trial_end)
# wait to achieve the desired ITI duration
if dt > 0:
time.sleep(dt)
# Run state machine

# handle ITI durations
if trial_number > 0:
# The ITI_DELAY_SECS defines the grey screen period within the state machine, where the
# Bpod TTL is HIGH. The DEAD_TIME param defines the time between last trial and the next
dead_time = self.task_params.get('DEAD_TIME', 0.5)
dt = self.task_params.ITI_DELAY_SECS - dead_time - (time.time() - time_last_trial_end)

# wait to achieve the desired ITI duration
if dt > 0:
log.debug(f'Waiting {dt} s to achieve an ITI duration of {self.task_params.ITI_DELAY_SECS} s')
time.sleep(dt)

Copy link
Contributor Author

@bimac bimac Feb 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The above won't be needed in the first trial - hence the conditional. We want to start the state machine asap after finishing the initial delay

# run state machine
log.info('-----------------------')
log.info(f'Starting trial: {trial_number}')
log.debug('running state machine')
self.bpod.run_state_machine(sma) # Locks until state machine 'exit' is reached
time_last_trial_end = time.time()

# handle pause event
flag_pause = self.paths.SESSION_FOLDER.joinpath('.pause')
flag_stop = self.paths.SESSION_FOLDER.joinpath('.stop')
if flag_pause.exists() and i < (self.task_params.NTRIALS - 1):
log.info(f'Pausing session inbetween trials {i} and {i + 1}')
if flag_pause.exists() and trial_number < (self.task_params.NTRIALS - 1):
log.info(f'Pausing session inbetween trials {trial_number} and {trial_number + 1}')
while flag_pause.exists() and not flag_stop.exists():
time.sleep(1)
self.trials_table.at[self.trial_num, 'pause_duration'] = time.time() - time_last_trial_end
Expand All @@ -218,12 +308,12 @@ def _run(self):

# save trial and update log
self.trial_completed(self.bpod.session.current_trial.export())
self.ambient_sensor_table.loc[i] = self.bpod.get_ambient_sensor_reading()
self.ambient_sensor_table.loc[trial_number] = self.bpod.get_ambient_sensor_reading()
self.show_trial_log()

# handle stop event
if flag_stop.exists():
log.info('Stopping session after trial %d', i)
log.info('Stopping session after trial %d', trial_number)
flag_stop.unlink()
break

Expand Down Expand Up @@ -315,30 +405,13 @@ def get_state_machine_trial(self, i):
# we define the trial number here for subclasses that may need it
sma = self._instantiate_state_machine(trial_number=i)

if i == 0: # First trial exception start camera
session_delay_start = self.task_params.get('SESSION_DELAY_START', 0)
log.info('First trial initializing, will move to next trial only if:')
log.info('1. camera is detected')
log.info(f'2. {session_delay_start} sec have elapsed')
sma.add_state(
state_name='trial_start',
state_timer=0,
state_change_conditions={'Port1In': 'delay_initiation'},
output_actions=[('SoftCode', SOFTCODE.TRIGGER_CAMERA), ('BNC1', 255)],
) # start camera
sma.add_state(
state_name='delay_initiation',
state_timer=session_delay_start,
output_actions=[],
state_change_conditions={'Tup': 'reset_rotary_encoder'},
)
else:
sma.add_state(
state_name='trial_start',
state_timer=0, # ~100µs hardware irreducible delay
state_change_conditions={'Tup': 'reset_rotary_encoder'},
output_actions=[self.bpod.actions.stop_sound, ('BNC1', 255)],
) # stop all sounds
# Signal trial start and stop all sounds
sma.add_state(
state_name='trial_start',
state_timer=0, # ~100µs hardware irreducible delay
state_change_conditions={'Tup': 'reset_rotary_encoder'},
output_actions=[self.bpod.actions.stop_sound, ('BNC1', 255)],
)

# Reset the rotary encoder by sending the following opcodes via the modules serial interface
# - 'Z' (ASCII 90): Set current rotary encoder position to zero
Expand Down
31 changes: 6 additions & 25 deletions iblrig_tasks/_iblrig_tasks_neuroModulatorChoiceWorld/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import iblrig.misc
from iblrig.base_choice_world import BiasedChoiceWorldSession, BiasedChoiceWorldTrialData
from iblrig.hardware import SOFTCODE
from pybpodapi.protocol import StateMachine

REWARD_AMOUNTS_UL = (1, 3)
Expand Down Expand Up @@ -54,30 +53,12 @@ def choice_to_feedback_delay(self):
def get_state_machine_trial(self, i):
sma = StateMachine(self.bpod)

if i == 0: # First trial exception start camera
session_delay_start = self.task_params.get('SESSION_DELAY_START', 0)
log.info('First trial initializing, will move to next trial only if:')
log.info('1. camera is detected')
log.info(f'2. {session_delay_start} sec have elapsed')
sma.add_state(
state_name='trial_start',
state_timer=0,
state_change_conditions={'Port1In': 'delay_initiation'},
output_actions=[('SoftCode', SOFTCODE.TRIGGER_CAMERA), ('BNC1', 255)],
) # start camera
sma.add_state(
state_name='delay_initiation',
state_timer=session_delay_start,
output_actions=[],
state_change_conditions={'Tup': 'reset_rotary_encoder'},
)
else:
sma.add_state(
state_name='trial_start',
state_timer=0, # ~100µs hardware irreducible delay
state_change_conditions={'Tup': 'reset_rotary_encoder'},
output_actions=[self.bpod.actions.stop_sound, ('BNC1', 255)],
) # stop all sounds
sma.add_state(
state_name='trial_start',
state_timer=0, # ~100µs hardware irreducible delay
state_change_conditions={'Tup': 'reset_rotary_encoder'},
output_actions=[self.bpod.actions.stop_sound, ('BNC1', 255)],
)

sma.add_state(
state_name='reset_rotary_encoder',
Expand Down
Loading