From f922263ee2fd2ed73ac1ad42980c277cf583ee1d Mon Sep 17 00:00:00 2001 From: Ivan Raikov Date: Thu, 20 Jul 2023 10:14:01 -0700 Subject: [PATCH 01/10] WIP: incorporation of spike encoder classes --- src/miv_simulator/clamps/network.py | 2 +- src/miv_simulator/plotting.py | 21 +- src/miv_simulator/spike_encoder.py | 101 +++++ src/miv_simulator/stimulus.py | 567 ++-------------------------- 4 files changed, 152 insertions(+), 539 deletions(-) create mode 100644 src/miv_simulator/spike_encoder.py diff --git a/src/miv_simulator/clamps/network.py b/src/miv_simulator/clamps/network.py index 75184d9..7597dfd 100644 --- a/src/miv_simulator/clamps/network.py +++ b/src/miv_simulator/clamps/network.py @@ -1427,7 +1427,7 @@ def init_rate_dist_objfun( np.isclose(target_rate_vector, 0.0, atol=1e-3, rtol=1e-3) ] = 0.0 - trj_x, trj_y, trj_d, trj_t = stimulus.read_stimulus( + trj_d, trj_t = stimulus.read_stimulus( input_features_path if input_features_path is not None else spike_events_path, diff --git a/src/miv_simulator/plotting.py b/src/miv_simulator/plotting.py index cb6d7a2..6cbda83 100644 --- a/src/miv_simulator/plotting.py +++ b/src/miv_simulator/plotting.py @@ -881,16 +881,17 @@ def plot_spike_raster( sct = None if len(pop_spkinds) > 0: - sct = axes[i].scatter( - pop_spkts, - pop_spkinds, - s=1, - linewidths=fig_options.lw, - marker=marker, - c=pop_colors[pop_name], - alpha=0.5, - label=pop_name, - ) + for this_pop_spkts, this_pop_spkinds in zip(pop_spkts, pop_spkinds): + sct = axes[i].scatter( + this_pop_spkts, + this_pop_spkinds, + s=1, + linewidths=fig_options.lw, + marker=marker, + c=pop_colors[pop_name], + alpha=0.5, + label=pop_name, + ) axes[i].spines["top"].set_visible(False) axes[i].spines["bottom"].set_visible(False) diff --git a/src/miv_simulator/spike_encoder.py b/src/miv_simulator/spike_encoder.py new file mode 100644 index 0000000..e312d8e --- /dev/null +++ b/src/miv_simulator/spike_encoder.py @@ -0,0 +1,101 @@ +import numpy as np +from numpy import ndarray 
+from typing import Tuple + + +class RateEncoder: + def __init__( + self, + time_window: int = 100, + input_range: Tuple[int, int] = (0, 255), + output_freq_range: Tuple[int, int] = (0, 200), + ) -> None: + assert input_range[1] - input_range[0] > 0 + assert output_freq_range[1] - output_freq_range[0] > 0 + assert time_window > 0 + self.min_input, self.max_input = input_range[0], input_range[1] + self.min_output, self.max_output = ( + output_freq_range[0], + output_freq_range[1], + ) + self.time_window = time_window + + def encode(self, signal: ndarray) -> ndarray: + assert ( + len(signal.shape) == 2 + ), "encode requires input signal of shape number_samples x input_dimensions" + + nsamples = signal.shape[0] + ndim = signal.shape[1] + + total_spikes = [] + for r in range(ndim): + s = signal[:, r] + freq = np.interp( + s, + [self.min_input, self.max_input], + [self.min_output, self.max_output], + ) + nz = np.argwhere(freq > 0) + period = np.zeros(nsamples) + period[nz] = (1 / freq[nz]) * 1000 # ms + spikes = np.zeros((nsamples, self.time_window)) + for i in range(nsamples): + if period[i] > 0: + stride = int(period[i]) + spikes[i, 0 : self.time_window : stride] = 1 + total_spikes.append(spikes) + + return np.stack(total_spikes, axis=1) + + +class PoissonRateEncoder: + def __init__( + self, + time_window: int = 100, + input_range: Tuple[int, int] = (0, 255), + output_freq_range: Tuple[int, int] = (0, 200), + generator: None = None, + ) -> None: + assert input_range[1] - input_range[0] > 0 + assert output_freq_range[1] - output_freq_range[0] > 0 + assert time_window > 0 + self.min_input, self.max_input = input_range[0], input_range[1] + self.min_output, self.max_output = ( + output_freq_range[0], + output_freq_range[1], + ) + self.time_window = time_window + if generator is None: + generator = np.random + self.generator = generator + + def encode(self, signal: ndarray) -> ndarray: + assert ( + len(signal.shape) == 2 + ), "encode requires input signal of shape 
number_samples x input_dimensions" + + nsamples = signal.shape[0] + ndim = signal.shape[1] + + total_spikes = [] + for r in range(ndim): + spike_train = [] + s = signal[:, r] + freq = np.interp( + s, + [self.min_input, self.max_input], + [self.min_output, self.max_output], + ) + + spikes = np.random.uniform( + 0, 1, nsamples * self.time_window + ).reshape((nsamples, self.time_window)) + dt = 0.001 # second + for i in range(nsamples): + spikes[i, np.where(spikes < freq[i] * dt)] = 1 + spikes[i, np.where(spikes[i] != 1)] = 0 + + total_spikes.append(spikes) + + return np.stack(total_spikes, axis=1) diff --git a/src/miv_simulator/stimulus.py b/src/miv_simulator/stimulus.py index c8831a0..601b53b 100644 --- a/src/miv_simulator/stimulus.py +++ b/src/miv_simulator/stimulus.py @@ -3,8 +3,13 @@ from collections import namedtuple import numpy as np -from miv_simulator.stgen import get_inhom_poisson_spike_times_by_thinning from miv_simulator.utils import AbstractEnv, Struct, get_module_logger +from miv_simulator.spike_encoder import ( + RateEncoder, + PoissonRateEncoder, + RankOrderEncoder, +) +from miv_simulator.stgen import get_inhom_poisson_spike_times_by_thinning from mpi4py import MPI from mpi4py.MPI import Intracomm from neuroh5.io import ( @@ -24,11 +29,14 @@ ) -class ConstantInputCellConfig: +class InputSource: + pass + + +class ConstantInputSource(InputSource): def __init__( self, selectivity_type: None = None, - arena: None = None, peak_rate: None = None, local_random: None = None, selectivity_attr_dict: Optional[Dict[str, ndarray]] = None, @@ -36,7 +44,6 @@ def __init__( ) -> None: """ :param selectivity_type: int - :param arena: namedtuple :param peak_rate: float :param local_random: :class:'np.random.RandomState' :param selectivity_attr_dict: dict @@ -63,9 +70,9 @@ def __init__( if selectivity_attr_dict is not None: self.init_from_attr_dict(selectivity_attr_dict) - elif any([arg is None for arg in [selectivity_type, peak_rate, arena]]): + elif any([arg is None 
for arg in [selectivity_type, peak_rate]]): raise RuntimeError( - "ConstantInputCellConfig: missing argument(s) required for object construction" + "ConstantInputSource: missing argument(s) required for object construction" ) else: if local_random is None: @@ -87,11 +94,9 @@ def get_selectivity_attr_dict(self): "Peak Rate": np.array([self.peak_rate], dtype=np.float32), } - def get_rate_map( + def call( self, - x: ndarray, - y: ndarray, - velocity: None = None, + signal: ndarray, initial_phase: float = 0.0, ) -> ndarray: """ @@ -101,20 +106,10 @@ def get_rate_map( :return: array """ - if (velocity is None) and (self.phase_mod_function is not None): - raise RuntimeError( - "ConstantInputCellConfig.get_rate_map: when phase config is provided, get_rate_map must be provided with velocity" - ) - - rate_array = np.ones_like(x, dtype=np.float32) * self.peak_rate + rate_array = np.ones_like(signal, dtype=np.float32) * self.peak_rate mean_rate = np.mean(rate_array) - if (velocity is not None) and (self.phase_mod_function is not None): - d = np.insert( - np.cumsum(np.sqrt(np.diff(x) ** 2.0 + np.diff(y) ** 2.0)), - 0, - 0.0, - ) - t = d / velocity + if self.phase_mod_function is not None: + # t = TODO rate_array *= self.phase_mod_function( t, initial_phase=initial_phase ) @@ -125,53 +120,51 @@ def get_rate_map( return rate_array -def get_input_cell_config( +def make_input_source( selectivity_type: uint8, selectivity_type_names: Dict[int, str], population: None = None, stimulus_config: None = None, - arena: None = None, distance: None = None, local_random: None = None, selectivity_attr_dict: Optional[Dict[str, ndarray]] = None, phase_mod_config: None = None, noise_gen_dict: None = None, comm: None = None, -) -> ConstantInputCellConfig: +) -> InputSource: """ :param selectivity_type: int :param selectivity_type_names: dict: {int: str} :param population: str :param stimulus_config: dict - :param arena: namedtuple :param distance: float; u arc distance normalized to reference 
layer :param local_random: :class:'np.random.RandomState' :param selectivity_attr_dict: dict :param phase_mod_config: dict; oscillatory phase modulation configuration - :return: instance of one of various InputCell classes + :return: instance of one of various InputSource classes """ if selectivity_type not in selectivity_type_names: raise RuntimeError( - "get_input_cell_config: enumerated selectivity type: %i not recognized" + "make_input_source: enumerated selectivity type: %i not recognized" % selectivity_type ) selectivity_type_name = selectivity_type_names[selectivity_type] if selectivity_attr_dict is not None: if selectivity_type_name == "constant": - input_cell_config = ConstantInputCellConfig( + input_source = ConstantInputSource( selectivity_attr_dict=selectivity_attr_dict, phase_mod_config=phase_mod_config, ) else: RuntimeError( - "get_input_cell_config: selectivity type %s is not supported" + "make_input_source: selectivity type %s is not supported" % selectivity_type_name ) - elif any([arg is None for arg in [population, stimulus_config, arena]]): + elif any([arg is None for arg in [population, stimulus_config]]): raise RuntimeError( - f"get_input_cell_config: missing argument(s) required to construct {selectivity_type_name} cell config object: population: {population} arena: {arena} stimulus_config: {stimulus_config}" + f"make_input_source: missing argument(s) required to construct {selectivity_type_name} cell config object: population: {population} stimulus_config: {stimulus_config}" ) else: if ( @@ -179,24 +172,23 @@ def get_input_cell_config( or selectivity_type not in stimulus_config["Peak Rate"][population] ): raise RuntimeError( - "get_input_cell_config: peak rate not specified for population: %s, selectivity type: " + "make_input_source: peak rate not specified for population: %s, selectivity type: " "%s" % (population, selectivity_type_name) ) peak_rate = stimulus_config["Peak Rate"][population][selectivity_type] if selectivity_type_name == 
"constant": - input_cell_config = ConstantInputCellConfig( + input_source = ConstantInputSource( selectivity_type=selectivity_type, - arena=arena, peak_rate=peak_rate, phase_mod_config=phase_mod_config, ) else: RuntimeError( - f"get_input_cell_config: selectivity type: {selectivity_type_name} not implemented" + f"make_input_source: selectivity type: {selectivity_type_name} not implemented" ) - return input_cell_config + return input_source def get_equilibration(env: AbstractEnv) -> Tuple[ndarray, int]: @@ -218,149 +210,11 @@ def get_equilibration(env: AbstractEnv) -> Tuple[ndarray, int]: return equilibrate -def get_2D_arena_bounds(arena, margin=0.0, margin_fraction=None): - """ - - :param arena: namedtuple - :return: tuple of (tuple of float) - """ - - vertices_x = np.asarray( - [v[0] for v in arena.domain.vertices], dtype=np.float32 - ) - vertices_y = np.asarray( - [v[1] for v in arena.domain.vertices], dtype=np.float32 - ) - if margin_fraction is not None: - extent_x = np.abs(np.max(vertices_x) - np.min(vertices_x)) - extent_y = np.abs(np.max(vertices_y) - np.min(vertices_y)) - margin = max(margin_fraction * extent_x, margin_fraction * extent_y) - arena_x_bounds = (np.min(vertices_x) - margin, np.max(vertices_x) + margin) - arena_y_bounds = (np.min(vertices_y) - margin, np.max(vertices_y) + margin) - - return arena_x_bounds, arena_y_bounds - - -def get_2D_arena_extents(arena): - """ - - :param arena: namedtuple - :return: tuple of (tuple of float) - """ - - vertices_x = np.asarray( - [v[0] for v in arena.domain.vertices], dtype=np.float32 - ) - vertices_y = np.asarray( - [v[1] for v in arena.domain.vertices], dtype=np.float32 - ) - extent_x = np.abs(np.max(vertices_x) - np.min(vertices_x)) - extent_y = np.abs(np.max(vertices_y) - np.min(vertices_y)) - - return extent_x, extent_y - - -def get_2D_arena_spatial_mesh( - arena, spatial_resolution=5.0, margin=0.0, indexing="ij" -): - """ - - :param arena: namedtuple - :param spatial_resolution: float (cm) - :param 
margin: float - :return: tuple of array - """ - arena_x_bounds, arena_y_bounds = get_2D_arena_bounds( - arena=arena, margin=margin - ) - arena_x = np.arange( - arena_x_bounds[0], - arena_x_bounds[1] + spatial_resolution / 2.0, - spatial_resolution, - ) - arena_y = np.arange( - arena_y_bounds[0], - arena_y_bounds[1] + spatial_resolution / 2.0, - spatial_resolution, - ) - - return np.meshgrid(arena_x, arena_y, indexing=indexing) - - -def get_2D_arena_grid(arena, spatial_resolution=5.0, margin=0.0, indexing="ij"): - """ - - :param arena: namedtuple - :param spatial_resolution: float (cm) - :param margin: float - :return: tuple of array - """ - arena_x_bounds, arena_y_bounds = get_2D_arena_bounds( - arena=arena, margin=margin - ) - arena_x = np.arange( - arena_x_bounds[0], - arena_x_bounds[1] + spatial_resolution / 2.0, - spatial_resolution, - ) - arena_y = np.arange( - arena_y_bounds[0], - arena_y_bounds[1] + spatial_resolution / 2.0, - spatial_resolution, - ) - - return arena_x, arena_y - - -def generate_linear_trajectory( - trajectory, temporal_resolution=1.0, equilibration_duration=None -): - """ - Construct coordinate arrays for a spatial trajectory, considering run velocity to interpolate at the specified - temporal resolution. Optionally, the trajectory can be prepended with extra distance traveled for a specified - network equilibration time, with the intention that the user discards spikes generated during this period before - analysis. 
- :param trajectory: namedtuple - :param temporal_resolution: float (ms) - :param equilibration_duration: float (ms) - :return: tuple of array - """ - velocity = trajectory.velocity # (cm / s) - spatial_resolution = velocity / 1000.0 * temporal_resolution - x = trajectory.path[:, 0] - y = trajectory.path[:, 1] - - if equilibration_duration is not None: - equilibration_distance = velocity / 1000.0 * equilibration_duration - x = np.insert(x, 0, x[0] - equilibration_distance) - y = np.insert(y, 0, y[0]) - else: - equilibration_duration = 0.0 - equilibration_distance = 0.0 - - segment_lengths = np.sqrt(np.diff(x) ** 2.0 + np.diff(y) ** 2.0) - distance = np.insert(np.cumsum(segment_lengths), 0, 0.0) - - interp_distance = np.arange( - distance.min(), - distance.max() + spatial_resolution / 2.0, - spatial_resolution, - ) - interp_x = np.interp(interp_distance, distance, x) - interp_y = np.interp(interp_distance, distance, y) - t = interp_distance / (velocity / 1000.0) # ms - - t = np.subtract(t, equilibration_duration) - interp_distance -= equilibration_distance - - return t, interp_x, interp_y, interp_distance - - def generate_input_spike_trains( env: AbstractEnv, population: str, selectivity_type_names: Dict[int, str], - trajectory: Tuple[ndarray, ndarray, ndarray, ndarray], + signal: ndarray, gid: int, selectivity_attr_dict: Dict[str, ndarray], spike_train_attr_name: str = "Spike Train", @@ -368,7 +222,7 @@ def generate_input_spike_trains( spike_hist_resolution: int = 1000, equilibrate: Optional[Tuple[ndarray, int]] = None, phase_mod_config: None = None, - initial_phases: None = None, + initial_phases: ndarray = None, spike_hist_sum: None = None, return_selectivity_features: bool = True, n_trials: int = 1, @@ -394,10 +248,8 @@ def generate_input_spike_trains( if time_range[0] is None: time_range[0] = 0.0 - t, x, y, d = trajectory - abs_d = d - d[0] + t, d = trajectory abs_t = (t - t[0]) / 1000.0 - velocity = np.insert(abs_d[1:] / abs_t[1:], 0, abs_d[1] / abs_t[1]) 
equilibration_duration = float( env.stimulus_config["Equilibration Duration"] @@ -423,7 +275,7 @@ def generate_input_spike_trains( if selectivity_type_name is None: selectivity_type_name = this_selectivity_type_name - input_cell_config = get_input_cell_config( + input_source = make_input_source( selectivity_type=this_selectivity_type, selectivity_type_names=selectivity_type_names, selectivity_attr_dict=selectivity_attr_dict, @@ -443,10 +295,8 @@ def generate_input_spike_trains( for i in range(n_trials): if initial_phases is not None: initial_phase = initial_phases[i] - rate_map = input_cell_config.get_rate_map( - x=x, - y=y, - velocity=velocity if phase_mod_config is not None else None, + rate_map = input_source.get_rate_map( + d=d, initial_phase=initial_phase, ) if (selectivity_type_name != "constant") and (equilibrate is not None): @@ -525,15 +375,11 @@ def choose_input_selectivity_type(p, local_random): def generate_input_features( env, population, - arena, - arena_x, - arena_y, gid, norm_distances, selectivity_type_names, selectivity_type_namespaces, noise_gen_dict=None, - rate_map_sum=None, debug=False, ): """ @@ -542,11 +388,10 @@ def generate_input_features( argument selectivity_type_namespaces. The set of selectivity attributes is determined by procedure get_selectivity_attr_dict in the respective input cell configuration class - (e.g. ConstantInputCellConfig). + (e.g. ConstantInputSource). 
:param env :param population: str - :param arena: str :param gid: int :param distances: (float, float) :param selectivity_type_names: @@ -571,12 +416,11 @@ def generate_input_features( local_random=local_random, ) - input_cell_config = get_input_cell_config( + input_source = make_input_source( population=population, selectivity_type=this_selectivity_type, selectivity_type_names=selectivity_type_names, stimulus_config=env.stimulus_config, - arena=arena, distance=norm_u_arc_distance, local_random=local_random, noise_gen_dict=noise_gen_dict, @@ -584,8 +428,7 @@ def generate_input_features( ) this_selectivity_type_name = selectivity_type_names[this_selectivity_type] - selectivity_attr_dict = input_cell_config.get_selectivity_attr_dict() - rate_map = input_cell_config.get_rate_map(x=arena_x, y=arena_y) + selectivity_attr_dict = input_source.get_selectivity_attr_dict() if debug and rank == 0: callback, context = debug @@ -593,9 +436,6 @@ def generate_input_features( this_context.update(dict(locals())) callback(this_context) - if rate_map_sum is not None: - rate_map_sum[this_selectivity_type_name] += rate_map - return this_selectivity_type_name, selectivity_attr_dict @@ -628,258 +468,6 @@ def read_stimulus(stimulus_path, stimulus_namespace, population, module=None): return ratemap_lst -def read_feature(feature_path, feature_namespace, population): - feature_lst = [] - - attr_gen = read_cell_attributes( - feature_path, population, namespace=feature_namespace - ) - for gid, feature_dict in attr_gen: - if "Module ID" in feature_dict: - gid_module = feature_dict["Module ID"][0] - else: - gid_module = None - rate = feature_dict["Arena Rate Map"] - feature_lst.append((gid, rate, gid_module)) - - return feature_lst - - -def bin_stimulus_features(features, t, bin_size, time_range): - """ - Continuous stimulus feature binning. 
- - Parameters - ---------- - features: matrix of size "number of times each feature was recorded" x "number of features" - t: a vector of size "number of times each feature was recorded" - bin_size: size of time bins - time_range: the start and end times for binning the stimulus - - - Returns - ------- - matrix of size "number of time bins" x "number of features in the output" - the average value of each output feature in every time bin - """ - - t_start, t_end = time_range - - edges = np.arange(t_start, t_end, bin_size) - nbins = edges.shape[0] - 1 - nfeatures = features.shape[1] - binned_features = np.empty([nbins, nfeatures]) - for i in range(nbins): - for j in range(nfeatures): - delta = edges[i + 1] - edges[i] - bin_range = np.arange(edges[i], edges[i + 1], delta / 5.0) - ip_vals = np.interp(bin_range, t, features[:, j]) - binned_features[i, j] = np.mean(ip_vals) - - return binned_features - - -def rate_maps_from_features( - env, - population, - cell_index_set, - input_features_path=None, - input_features_namespace=None, - input_features_dict=None, - arena_id=None, - trajectory_id=None, - time_range=None, - include_time=False, - phase_mod_config=None, -): - """Initializes presynaptic spike sources from a file with input selectivity features represented as firing rates.""" - - if input_features_dict is not None: - if (input_features_path is not None) or ( - input_features_namespace is not None - ): - raise RuntimeError( - "rate_maps_from_features: when input_features_dict is provided, input_features_path must be None" - ) - else: - if (input_features_path is None) or (input_features_namespace is None): - raise RuntimeError( - "rate_maps_from_features: either input_features_dict has to be provided, or input_features_path and input_features_namespace" - ) - - if time_range is not None: - if time_range[0] is None: - time_range[0] = 0.0 - - if arena_id is None: - arena_id = env.arena_id - if trajectory_id is None: - trajectory_id = env.trajectory_id - - 
spatial_resolution = float(env.stimulus_config["Spatial Resolution"]) - temporal_resolution = float(env.stimulus_config["Temporal Resolution"]) - - input_features_attr_names = [ - "Selectivity Type", - "Num Fields", - "Field Width", - "Peak Rate", - "Module ID", - "Grid Spacing", - "Grid Orientation", - "Field Width Concentration Factor", - "X Offset", - "Y Offset", - ] - - selectivity_type_names = {i: n for n, i in env.selectivity_types.items()} - - arena = env.stimulus_config["Arena"][arena_id] - - trajectory = arena.trajectories[trajectory_id] - equilibration_duration = float( - env.stimulus_config.get("Equilibration Duration", 0.0) - ) - - t, x, y, d = generate_linear_trajectory( - trajectory, - temporal_resolution=temporal_resolution, - equilibration_duration=equilibration_duration, - ) - if time_range is not None: - t_range_inds = np.where((t < time_range[1]) & (t >= time_range[0]))[0] - t = t[t_range_inds] - x = x[t_range_inds] - y = y[t_range_inds] - d = d[t_range_inds] - - input_rate_map_dict = {} - - if len(d) == 0: - return input_rate_map_dict - - abs_d = d - d[0] - abs_t = (t - t[0]) / 1000.0 - velocity = np.insert(abs_d[1:] / abs_t[1:], 0, abs_d[1] / abs_t[1]) - - pop_index = int(env.Populations[population]) - - if input_features_path is not None: - this_input_features_namespace = f"{input_features_namespace} {arena_id}" - input_features_iter = scatter_read_cell_attribute_selection( - input_features_path, - population, - selection=cell_index_set, - namespace=this_input_features_namespace, - mask=set(input_features_attr_names), - comm=env.comm, - io_size=env.io_size, - ) - else: - input_features_iter = input_features_dict.items() - - for gid, selectivity_attr_dict in input_features_iter: - this_selectivity_type = selectivity_attr_dict["Selectivity Type"][0] - this_selectivity_type_name = selectivity_type_names[ - this_selectivity_type - ] - - input_cell_config = get_input_cell_config( - selectivity_type=this_selectivity_type, - 
selectivity_type_names=selectivity_type_names, - selectivity_attr_dict=selectivity_attr_dict, - phase_mod_config=phase_mod_config, - ) - rate_map = input_cell_config.get_rate_map( - x=x, - y=y, - velocity=velocity if phase_mod_config is not None else None, - ) - rate_map[np.isclose(rate_map, 0.0, atol=1e-3, rtol=1e-3)] = 0.0 - - if include_time: - input_rate_map_dict[gid] = (t, rate_map) - else: - input_rate_map_dict[gid] = rate_map - - return input_rate_map_dict - - -def arena_rate_maps_from_features( - env, - population, - input_features_path, - input_features_namespace, - cell_index_set, - arena_id=None, - time_range=None, - n_trials=1, -): - """Initializes presynaptic spike sources from a file with input selectivity features represented as firing rates.""" - - if time_range is not None: - if time_range[0] is None: - time_range[0] = 0.0 - - if arena_id is None: - arena_id = env.arena_id - - spatial_resolution = float(env.stimulus_config["Spatial Resolution"]) - temporal_resolution = float(env.stimulus_config["Temporal Resolution"]) - - this_input_features_namespace = f"{input_features_namespace} {arena_id}" - - input_features_attr_names = [ - "Selectivity Type", - "Num Fields", - "Field Width", - "Peak Rate", - "Module ID", - "Grid Spacing", - "Grid Orientation", - "Field Width Concentration Factor", - "X Offset", - "Y Offset", - ] - - selectivity_type_names = {i: n for n, i in env.selectivity_types.items()} - - arena = env.stimulus_config["Arena"][arena_id] - arena_x, arena_y = get_2D_arena_spatial_mesh( - arena=arena, spatial_resolution=spatial_resolution - ) - - input_rate_map_dict = {} - pop_index = int(env.Populations[population]) - - input_features_iter = scatter_read_cell_attribute_selection( - input_features_path, - population, - selection=cell_index_set, - namespace=this_input_features_namespace, - mask=set(input_features_attr_names), - comm=env.comm, - io_size=env.io_size, - ) - for gid, selectivity_attr_dict in input_features_iter: - 
this_selectivity_type = selectivity_attr_dict["Selectivity Type"][0] - this_selectivity_type_name = selectivity_type_names[ - this_selectivity_type - ] - input_cell_config = get_input_cell_config( - population=population, - selectivity_type=this_selectivity_type, - selectivity_type_names=selectivity_type_names, - selectivity_attr_dict=selectivity_attr_dict, - ) - if input_cell_config.num_fields > 0: - rate_map = input_cell_config.get_rate_map(x=arena_x, y=arena_y) - rate_map[np.isclose(rate_map, 0.0, atol=1e-3, rtol=1e-3)] = 0.0 - input_rate_map_dict[gid] = rate_map - - return input_rate_map_dict - - def oscillation_phase_mod_config( env, population, soma_positions, local_random=None ): @@ -1001,80 +589,3 @@ def stationary_phase_mod( mod = s * mod_depth / 2.0 + (1.0 - mod_depth) return mod - - -def spatial_phase_mod( - x, - velocity, - field_width, - phase_range, - phase_entry, - phase_offset, - mod_depth, - freq, -): - """Non-stationary phase modulation for spatial receptive fields. - Calculates modulation according to the equation: - - s = cos(r*x/field_width + 2*pi*freq*x/velocity - phase_entry - phase_offset + r/2.) + 1 - mod = s*mod_depth/2. + (1. 
- mod_depth) - - - position: spatial position - - velocity: movement velocity - - field_width: receptive field - - phase_range: range of preferred phase - - phase_entry: receptive field entry phase - - mod_depth: modulation depth - - freq: frequency of global oscillation - """ - r = np.deg2rad(phase_range[1] - phase_range[0]) - s = ( - np.cos( - r * x / field_width - + 2 * np.pi * freq * x / velocity - - np.deg2rad(phase_entry + phase_offset) - + r / 2.0 - ) - + 1 - ) - d = np.clip(mod_depth, 0.0, 1.0) - m = s * mod_depth / 2.0 + (1.0 - mod_depth) - - return m - - -def spatial2d_phase_mod( - x, - y, - velocity, - field_width, - phase_range, - phase_entry, - phase_offset, - mod_depth, - freq, -): - x_mod = spatial_phase_mod( - x, - velocity, - field_width, - phase_range, - phase_entry, - phase_offset, - mod_depth, - freq, - ) - y_mod = spatial_phase_mod( - y, - velocity, - field_width, - phase_range, - phase_entry, - phase_offset, - mod_depth, - freq, - ) - - m = (x_mod + y_mod) / 2.0 - - return m From a20e09054996bee6c039c15d2d88b8f1be0595bd Mon Sep 17 00:00:00 2001 From: Ivan Raikov Date: Thu, 20 Jul 2023 11:10:41 -0700 Subject: [PATCH 02/10] refactoring encoder for 1-D input signal for now --- src/miv_simulator/spike_encoder.py | 75 ++++++++++++++---------------- 1 file changed, 35 insertions(+), 40 deletions(-) diff --git a/src/miv_simulator/spike_encoder.py b/src/miv_simulator/spike_encoder.py index e312d8e..62054a2 100644 --- a/src/miv_simulator/spike_encoder.py +++ b/src/miv_simulator/spike_encoder.py @@ -1,12 +1,12 @@ import numpy as np from numpy import ndarray -from typing import Tuple +from typing import Tuple, Optional class RateEncoder: def __init__( self, - time_window: int = 100, + dt: float = 0.02, input_range: Tuple[int, int] = (0, 255), output_freq_range: Tuple[int, int] = (0, 200), ) -> None: @@ -19,7 +19,8 @@ def __init__( output_freq_range[1], ) self.time_window = time_window - + self.ndim = 1 + def encode(self, signal: ndarray) -> ndarray: 
assert ( len(signal.shape) == 2 @@ -27,26 +28,23 @@ def encode(self, signal: ndarray) -> ndarray: nsamples = signal.shape[0] ndim = signal.shape[1] - - total_spikes = [] - for r in range(ndim): - s = signal[:, r] - freq = np.interp( - s, + assert ndim == self.ndim, f"input signal has dimension {ndim} but encoder has input dimension {self.ndim}" + + freq = np.interp( + signal, [self.min_input, self.max_input], [self.min_output, self.max_output], ) - nz = np.argwhere(freq > 0) - period = np.zeros(nsamples) - period[nz] = (1 / freq[nz]) * 1000 # ms - spikes = np.zeros((nsamples, self.time_window)) - for i in range(nsamples): - if period[i] > 0: - stride = int(period[i]) - spikes[i, 0 : self.time_window : stride] = 1 - total_spikes.append(spikes) + nz = np.argwhere(freq > 0) + period = np.zeros(nsamples) + period[nz] = (1 / freq[nz]) * 1000 # ms + spikes = np.zeros((nsamples, self.time_window)) + for i in range(nsamples): + if period[i] > 0: + stride = int(period[i]) + spikes[i, 0 : self.time_window : stride] = 1 - return np.stack(total_spikes, axis=1) + return spikes class PoissonRateEncoder: @@ -55,7 +53,7 @@ def __init__( time_window: int = 100, input_range: Tuple[int, int] = (0, 255), output_freq_range: Tuple[int, int] = (0, 200), - generator: None = None, + generator: Optional[np.random.RandomState] = None, ) -> None: assert input_range[1] - input_range[0] > 0 assert output_freq_range[1] - output_freq_range[0] > 0 @@ -69,7 +67,8 @@ def __init__( if generator is None: generator = np.random self.generator = generator - + self.ndim = 1 + def encode(self, signal: ndarray) -> ndarray: assert ( len(signal.shape) == 2 @@ -77,25 +76,21 @@ def encode(self, signal: ndarray) -> ndarray: nsamples = signal.shape[0] ndim = signal.shape[1] + assert ndim == self.ndim, f"input signal has dimension {ndim} but encoder has input dimension {self.ndim}" - total_spikes = [] - for r in range(ndim): - spike_train = [] - s = signal[:, r] - freq = np.interp( - s, - [self.min_input, 
self.max_input], - [self.min_output, self.max_output], - ) - - spikes = np.random.uniform( - 0, 1, nsamples * self.time_window - ).reshape((nsamples, self.time_window)) - dt = 0.001 # second - for i in range(nsamples): - spikes[i, np.where(spikes < freq[i] * dt)] = 1 - spikes[i, np.where(spikes[i] != 1)] = 0 + spike_train = [] + freq = np.interp( + signal, + [self.min_input, self.max_input], + [self.min_output, self.max_output], + ) - total_spikes.append(spikes) + spikes = self.generator.uniform( + 0, 1, nsamples * self.time_window + ).reshape((nsamples, self.time_window)) + dt = 0.001 # second + for i in range(nsamples): + spikes[i, np.where(spikes < freq[i] * dt)] = 1 + spikes[i, np.where(spikes[i] != 1)] = 0 - return np.stack(total_spikes, axis=1) + return spikes From e64b2800533ba420224933b74f83545407cae458 Mon Sep 17 00:00:00 2001 From: Frithjof Gressmann Date: Sat, 16 Sep 2023 14:12:07 -0500 Subject: [PATCH 03/10] Add spike train types --- src/miv_simulator/coding.py | 52 ++++++++++++++++++++++++++++++++ src/miv_simulator/typing.py | 60 +++++++++++++++++++++++++++++++++++++ tests/test_coding.py | 40 +++++++++++++++++++++++++ tests/test_typing.py | 10 +++++++ 4 files changed, 162 insertions(+) create mode 100644 src/miv_simulator/coding.py create mode 100644 src/miv_simulator/typing.py create mode 100644 tests/test_coding.py create mode 100644 tests/test_typing.py diff --git a/src/miv_simulator/coding.py b/src/miv_simulator/coding.py new file mode 100644 index 0000000..3ebbc01 --- /dev/null +++ b/src/miv_simulator/coding.py @@ -0,0 +1,52 @@ +from miv_simulator import typing as st +import numpy as np + + +def spike_times_2_binary_sparse_spike_train( + array: st.SpikeTimesLike, temporal_resolution: float +) -> st.BinarySparseSpikeTrain: + a = st.cast_spike_times(array) + bins = np.floor(a / temporal_resolution).astype(int) + # since a is sorted, maximum is last value + spike_train = np.zeros(bins[-1] + 1, dtype=np.int8) + spike_train[bins] = 1 + return 
spike_train + + +def binary_sparse_spike_train_2_spike_times( + array: st.BinarySparseSpikeTrainLike, temporal_resolution: float +) -> st.SpikeTimes: + a = st.cast_binary_sparse_spike_train(array) + spike_indices = np.where(a == 1)[0] + spike_times = spike_indices * temporal_resolution + return spike_times + + +def adjust_temporal_resolution( + array: st.BinarySparseSpikeTrainLike, + original_resolution: float, + target_resolution: float, +) -> st.BinarySparseSpikeTrain: + a = st.cast_binary_sparse_spike_train(array) + + ratio = target_resolution / original_resolution + if ratio == 1: + return a + + new_length = int(a.shape[0] * ratio) + new_spike_train = np.zeros(new_length, dtype=np.int8) + + # up + if ratio > 1: + for idx, val in enumerate(a): + start = int(idx * ratio) + end = int((idx + 1) * ratio) + new_spike_train[start:end] = val + + # down + elif ratio < 1: + for idx in range(0, len(a), int(1 / ratio)): + if np.any(a[idx : idx + int(1 / ratio)]): + new_spike_train[idx // int(1 / ratio)] = 1 + + return new_spike_train diff --git a/src/miv_simulator/typing.py b/src/miv_simulator/typing.py new file mode 100644 index 0000000..bf78c17 --- /dev/null +++ b/src/miv_simulator/typing.py @@ -0,0 +1,60 @@ +from numpy.typing import NDArray +import numpy as np +from typing import Annotated as EventArray, Dict + +"""Potentially unsorted or scalar data that can be transformed into `SpikeTimes`""" +SpikeTimesLike = EventArray[NDArray[np.float_], "SpikeTimesLike ..."] + +"""Sorted array of absolute spike times""" +SpikeTimes = EventArray[NDArray[np.float_], "SpikeTimes T ..."] + +# spike train encodings (RLE, delta encoding, variable time binning etc.) + +"""Binary data that can be cast to the `BinarySparseSpikeTrain` format""" +BinarySparseSpikeTrainLike = EventArray[ + NDArray, "BinarySparseSpikeTrainLike ..." 
+] + +"""Binary spike train representation for a given temporal resolution""" +BinarySparseSpikeTrain = EventArray[ + NDArray[np.int8], "BinarySparseSpikeTrain t_bin ..." +] + + +def _inspect(type_) -> Dict: + annotation = type_.__metadata__[0] + name, *dims = annotation.split(" ") + + return { + "annotation": annotation, + "name": name, + "dims": dims, + "dtype": type_.__origin__.__args__[1].__args__[0], + } + + +def _cast(a, a_type, r_type): # -> r_type + a_t, r_t = _inspect(a_type), _inspect(r_type) + if a_t["name"].replace("Like", "") != r_t["name"]: + raise ValueError( + f"Expected miv_simulator.typing.{r_t['name']}Like but found {a_t['name']}" + ) + v = np.array(a, dtype=r_t["dtype"]) + if len(v.shape) == 0: + return np.reshape( + v, + [ + 1, + ], + ) + return v + + +def cast_spike_times(a: SpikeTimesLike) -> SpikeTimes: + return np.sort(_cast(a, SpikeTimesLike, SpikeTimes), axis=0) + + +def cast_binary_sparse_spike_train( + a: BinarySparseSpikeTrainLike, +) -> BinarySparseSpikeTrain: + return _cast(a, BinarySparseSpikeTrainLike, BinarySparseSpikeTrain) diff --git a/tests/test_coding.py b/tests/test_coding.py new file mode 100644 index 0000000..044e51d --- /dev/null +++ b/tests/test_coding.py @@ -0,0 +1,40 @@ +from miv_simulator import coding as t +import numpy as np +import miv_simulator.typing as st + + +def test_coding_spike_times_vs_binary_sparse_spike_train(): + for a, b in [ + ([0.1, 0.3, 0.4, 0.85], [1, 1]), + ([0.8], [0, 1]), + ]: + result = t.spike_times_2_binary_sparse_spike_train(a, 0.5) + expected = np.array(b, dtype=np.int8) + assert np.array_equal(result, expected) + + for a, b in [ + ([1, 0, 1], [0.0, 1.0]), + ([0, 1], [0.5]), + ]: + spike_train = np.array(a, dtype=np.int8) + result = t.binary_sparse_spike_train_2_spike_times(spike_train, 0.5) + expected = np.array(b) + assert np.array_equal(result, expected) + + +def test_coding_adjust_temporal_resolution(): + spike_train = np.array([0, 1, 0, 1, 0], dtype=np.int8) + + # identity + adjusted = 
t.adjust_temporal_resolution(spike_train, 1, 1) + assert np.array_equal(adjusted, spike_train) + + # up + adjusted = t.adjust_temporal_resolution(spike_train, 0.5, 1) + expected = np.array([0, 0, 1, 1, 0, 0, 1, 1, 0, 0], dtype=np.int8) + assert np.array_equal(adjusted, expected) + + # down + adjusted = t.adjust_temporal_resolution(spike_train, 2, 1) + expected = np.array([1, 1], dtype=np.int8) + assert np.array_equal(adjusted, expected) diff --git a/tests/test_typing.py b/tests/test_typing.py new file mode 100644 index 0000000..694205d --- /dev/null +++ b/tests/test_typing.py @@ -0,0 +1,10 @@ +from miv_simulator import typing as t +import numpy as np + + +def test_typing_cast(): + assert t.cast_spike_times(0.5).shape == (1,) + assert t.cast_spike_times([0.5, 0.1])[1] == 0.5 + assert t.cast_spike_times(int(1))[0] == float(1.0) + + assert t.cast_binary_sparse_spike_train(0.1)[0] == 0 From bf968b193b1340e7bbe8563d73501da4657cd2ef Mon Sep 17 00:00:00 2001 From: Frithjof Gressmann Date: Mon, 18 Sep 2023 13:25:52 -0500 Subject: [PATCH 04/10] Fix doc string annottions --- src/miv_simulator/config.py | 16 ++++++++-------- src/miv_simulator/spike_encoder.py | 22 +++++++++++++--------- src/miv_simulator/typing.py | 12 +++++++----- src/miv_simulator/utils/utils.py | 12 ------------ 4 files changed, 28 insertions(+), 34 deletions(-) diff --git a/src/miv_simulator/config.py b/src/miv_simulator/config.py index ea48363..d1e9bbc 100644 --- a/src/miv_simulator/config.py +++ b/src/miv_simulator/config.py @@ -141,16 +141,20 @@ def _cast(v) -> int: ParametricCoordinate = Tuple[U_Coordinate, V_Coordinate, L_Coordinate] Rotation = Tuple[float, float, float] -"""One of the layers defined in the LayersDef enum.""" + LayerName = str -"""For a given neuron kind, this defines the distribution (i.e. 
numbers) of neurons accross the different layers.""" +"""One of the layers defined in the LayersDef enum.""" + CellDistribution = Dict[LayerName, int] -"""Describes a volume extent""" +"""For a given neuron kind, this defines the distribution (i.e. numbers) of neurons accross the different layers.""" + LayerExtents = Dict[LayerName, List[ParametricCoordinate]] -"""Describes constraints on the distribution of neurons in a given layer.""" +"""Describes a volume extent""" + CellConstraints = Optional[ Dict[PopulationName, Dict[LayerName, Tuple[float, float]]] ] +"""Describes constraints on the distribution of neurons in a given layer.""" # Pydantic data models @@ -262,7 +266,3 @@ def probabilities_sum_to_one(x): CellDistributions = Dict[PopulationName, CellDistribution] - - -def path(*append) -> str: - return os.path.join(os.path.dirname(__file__), *append) diff --git a/src/miv_simulator/spike_encoder.py b/src/miv_simulator/spike_encoder.py index 62054a2..736b9f0 100644 --- a/src/miv_simulator/spike_encoder.py +++ b/src/miv_simulator/spike_encoder.py @@ -20,7 +20,7 @@ def __init__( ) self.time_window = time_window self.ndim = 1 - + def encode(self, signal: ndarray) -> ndarray: assert ( len(signal.shape) == 2 @@ -28,13 +28,15 @@ def encode(self, signal: ndarray) -> ndarray: nsamples = signal.shape[0] ndim = signal.shape[1] - assert ndim == self.ndim, f"input signal has dimension {ndim} but encoder has input dimension {self.ndim}" - + assert ( + ndim == self.ndim + ), f"input signal has dimension {ndim} but encoder has input dimension {self.ndim}" + freq = np.interp( - signal, - [self.min_input, self.max_input], - [self.min_output, self.max_output], - ) + signal, + [self.min_input, self.max_input], + [self.min_output, self.max_output], + ) nz = np.argwhere(freq > 0) period = np.zeros(nsamples) period[nz] = (1 / freq[nz]) * 1000 # ms @@ -68,7 +70,7 @@ def __init__( generator = np.random self.generator = generator self.ndim = 1 - + def encode(self, signal: ndarray) -> 
ndarray: assert ( len(signal.shape) == 2 @@ -76,7 +78,9 @@ def encode(self, signal: ndarray) -> ndarray: nsamples = signal.shape[0] ndim = signal.shape[1] - assert ndim == self.ndim, f"input signal has dimension {ndim} but encoder has input dimension {self.ndim}" + assert ( + ndim == self.ndim + ), f"input signal has dimension {ndim} but encoder has input dimension {self.ndim}" spike_train = [] freq = np.interp( diff --git a/src/miv_simulator/typing.py b/src/miv_simulator/typing.py index bf78c17..a989186 100644 --- a/src/miv_simulator/typing.py +++ b/src/miv_simulator/typing.py @@ -2,23 +2,25 @@ import numpy as np from typing import Annotated as EventArray, Dict -"""Potentially unsorted or scalar data that can be transformed into `SpikeTimes`""" SpikeTimesLike = EventArray[NDArray[np.float_], "SpikeTimesLike ..."] +"""Potentially unsorted or scalar data that can be transformed into `SpikeTimes`""" -"""Sorted array of absolute spike times""" SpikeTimes = EventArray[NDArray[np.float_], "SpikeTimes T ..."] +"""Sorted array of absolute spike times""" -# spike train encodings (RLE, delta encoding, variable time binning etc.) -"""Binary data that can be cast to the `BinarySparseSpikeTrain` format""" +# Spike train encodings (RLE, delta encoding, variable time binning etc.) + BinarySparseSpikeTrainLike = EventArray[ NDArray, "BinarySparseSpikeTrainLike ..." ] +"""Binary data that can be cast to the `BinarySparseSpikeTrain` format""" + -"""Binary spike train representation for a given temporal resolution""" BinarySparseSpikeTrain = EventArray[ NDArray[np.int8], "BinarySparseSpikeTrain t_bin ..." 
] +"""Binary spike train representation for a given temporal resolution""" def _inspect(type_) -> Dict: diff --git a/src/miv_simulator/utils/utils.py b/src/miv_simulator/utils/utils.py index 5084ef8..11e881a 100644 --- a/src/miv_simulator/utils/utils.py +++ b/src/miv_simulator/utils/utils.py @@ -27,7 +27,6 @@ import click import numpy as np import yaml -from miv_simulator.config import path as config_path from mpi4py import MPI from numpy import float64, uint32 from scipy import signal, sparse @@ -316,19 +315,8 @@ def include(self, node: ScalarNode) -> Dict[str, Any]: with open(filename) as f: return yaml.load(f, IncludeLoader) - def include_default(self, node: ScalarNode) -> Dict[str, Any]: - """ - - :param node: - :return: - """ - filename = os.path.join(config_path(), self.construct_scalar(node)) - with open(filename) as f: - return yaml.load(f, IncludeLoader) - IncludeLoader.add_constructor("!include", IncludeLoader.include) -IncludeLoader.add_constructor("!include_default", IncludeLoader.include_default) class ExplicitDumper(yaml.SafeDumper): From c076b47aa4864662b6ae5913d3835fb78b616067 Mon Sep 17 00:00:00 2001 From: Frithjof Gressmann Date: Tue, 19 Sep 2023 13:57:20 -0500 Subject: [PATCH 05/10] Merge coding and typing module --- src/miv_simulator/coding.py | 80 ++++++++++++++++++++++++++++++++----- src/miv_simulator/typing.py | 62 ---------------------------- tests/test_coding.py | 9 ++++- tests/test_typing.py | 10 ----- 4 files changed, 78 insertions(+), 83 deletions(-) delete mode 100644 tests/test_typing.py diff --git a/src/miv_simulator/coding.py b/src/miv_simulator/coding.py index 3ebbc01..7e53062 100644 --- a/src/miv_simulator/coding.py +++ b/src/miv_simulator/coding.py @@ -1,11 +1,71 @@ -from miv_simulator import typing as st import numpy as np +from numpy.typing import NDArray +from typing import Annotated as EventArray, Dict + +SpikeTimesLike = EventArray[NDArray[np.float_], "SpikeTimesLike ..."] +"""Potentially unsorted or scalar data that can be 
transformed into `SpikeTimes`""" + +SpikeTimes = EventArray[NDArray[np.float_], "SpikeTimes T ..."] +"""Sorted array of absolute spike times""" + + +# Spike train encodings (RLE, delta encoding, variable time binning etc.) + +BinarySparseSpikeTrainLike = EventArray[ + NDArray, "BinarySparseSpikeTrainLike ..." +] +"""Binary data that can be cast to the `BinarySparseSpikeTrain` format""" + + +BinarySparseSpikeTrain = EventArray[ + NDArray[np.int8], "BinarySparseSpikeTrain t_bin ..." +] +"""Binary spike train representation for a given temporal resolution""" + + +def _inspect(type_) -> Dict: + annotation = type_.__metadata__[0] + name, *dims = annotation.split(" ") + + return { + "annotation": annotation, + "name": name, + "dims": dims, + "dtype": type_.__origin__.__args__[1].__args__[0], + } + + +def _cast(a, a_type, r_type): # -> r_type + a_t, r_t = _inspect(a_type), _inspect(r_type) + if a_t["name"].replace("Like", "") != r_t["name"]: + raise ValueError( + f"Expected miv_simulator.typing.{r_t['name']}Like but found {a_t['name']}" + ) + v = np.array(a, dtype=r_t["dtype"]) + if len(v.shape) == 0: + return np.reshape( + v, + [ + 1, + ], + ) + return v + + +def cast_spike_times(a: SpikeTimesLike) -> SpikeTimes: + return np.sort(_cast(a, SpikeTimesLike, SpikeTimes), axis=0) + + +def cast_binary_sparse_spike_train( + a: BinarySparseSpikeTrainLike, +) -> BinarySparseSpikeTrain: + return _cast(a, BinarySparseSpikeTrainLike, BinarySparseSpikeTrain) def spike_times_2_binary_sparse_spike_train( - array: st.SpikeTimesLike, temporal_resolution: float -) -> st.BinarySparseSpikeTrain: - a = st.cast_spike_times(array) + array: SpikeTimesLike, temporal_resolution: float +) -> BinarySparseSpikeTrain: + a = cast_spike_times(array) bins = np.floor(a / temporal_resolution).astype(int) # since a is sorted, maximum is last value spike_train = np.zeros(bins[-1] + 1, dtype=np.int8) @@ -14,20 +74,20 @@ def spike_times_2_binary_sparse_spike_train( def binary_sparse_spike_train_2_spike_times( 
- array: st.BinarySparseSpikeTrainLike, temporal_resolution: float -) -> st.SpikeTimes: - a = st.cast_binary_sparse_spike_train(array) + array: BinarySparseSpikeTrainLike, temporal_resolution: float +) -> SpikeTimes: + a = cast_binary_sparse_spike_train(array) spike_indices = np.where(a == 1)[0] spike_times = spike_indices * temporal_resolution return spike_times def adjust_temporal_resolution( - array: st.BinarySparseSpikeTrainLike, + array: BinarySparseSpikeTrainLike, original_resolution: float, target_resolution: float, -) -> st.BinarySparseSpikeTrain: - a = st.cast_binary_sparse_spike_train(array) +) -> BinarySparseSpikeTrain: + a = cast_binary_sparse_spike_train(array) ratio = target_resolution / original_resolution if ratio == 1: diff --git a/src/miv_simulator/typing.py b/src/miv_simulator/typing.py index a989186..e69de29 100644 --- a/src/miv_simulator/typing.py +++ b/src/miv_simulator/typing.py @@ -1,62 +0,0 @@ -from numpy.typing import NDArray -import numpy as np -from typing import Annotated as EventArray, Dict - -SpikeTimesLike = EventArray[NDArray[np.float_], "SpikeTimesLike ..."] -"""Potentially unsorted or scalar data that can be transformed into `SpikeTimes`""" - -SpikeTimes = EventArray[NDArray[np.float_], "SpikeTimes T ..."] -"""Sorted array of absolute spike times""" - - -# Spike train encodings (RLE, delta encoding, variable time binning etc.) - -BinarySparseSpikeTrainLike = EventArray[ - NDArray, "BinarySparseSpikeTrainLike ..." -] -"""Binary data that can be cast to the `BinarySparseSpikeTrain` format""" - - -BinarySparseSpikeTrain = EventArray[ - NDArray[np.int8], "BinarySparseSpikeTrain t_bin ..." 
-] -"""Binary spike train representation for a given temporal resolution""" - - -def _inspect(type_) -> Dict: - annotation = type_.__metadata__[0] - name, *dims = annotation.split(" ") - - return { - "annotation": annotation, - "name": name, - "dims": dims, - "dtype": type_.__origin__.__args__[1].__args__[0], - } - - -def _cast(a, a_type, r_type): # -> r_type - a_t, r_t = _inspect(a_type), _inspect(r_type) - if a_t["name"].replace("Like", "") != r_t["name"]: - raise ValueError( - f"Expected miv_simulator.typing.{r_t['name']}Like but found {a_t['name']}" - ) - v = np.array(a, dtype=r_t["dtype"]) - if len(v.shape) == 0: - return np.reshape( - v, - [ - 1, - ], - ) - return v - - -def cast_spike_times(a: SpikeTimesLike) -> SpikeTimes: - return np.sort(_cast(a, SpikeTimesLike, SpikeTimes), axis=0) - - -def cast_binary_sparse_spike_train( - a: BinarySparseSpikeTrainLike, -) -> BinarySparseSpikeTrain: - return _cast(a, BinarySparseSpikeTrainLike, BinarySparseSpikeTrain) diff --git a/tests/test_coding.py b/tests/test_coding.py index 044e51d..a08ef62 100644 --- a/tests/test_coding.py +++ b/tests/test_coding.py @@ -1,6 +1,5 @@ from miv_simulator import coding as t import numpy as np -import miv_simulator.typing as st def test_coding_spike_times_vs_binary_sparse_spike_train(): @@ -38,3 +37,11 @@ def test_coding_adjust_temporal_resolution(): adjusted = t.adjust_temporal_resolution(spike_train, 2, 1) expected = np.array([1, 1], dtype=np.int8) assert np.array_equal(adjusted, expected) + + +def test_coding_typing_cast(): + assert t.cast_spike_times(0.5).shape == (1,) + assert t.cast_spike_times([0.5, 0.1])[1] == 0.5 + assert t.cast_spike_times(int(1))[0] == float(1.0) + + assert t.cast_binary_sparse_spike_train(0.1)[0] == 0 diff --git a/tests/test_typing.py b/tests/test_typing.py deleted file mode 100644 index 694205d..0000000 --- a/tests/test_typing.py +++ /dev/null @@ -1,10 +0,0 @@ -from miv_simulator import typing as t -import numpy as np - - -def test_typing_cast(): - assert 
t.cast_spike_times(0.5).shape == (1,) - assert t.cast_spike_times([0.5, 0.1])[1] == 0.5 - assert t.cast_spike_times(int(1))[0] == float(1.0) - - assert t.cast_binary_sparse_spike_train(0.1)[0] == 0 From c5e99bb7159a8fc4d342ffcc6557857345c3105e Mon Sep 17 00:00:00 2001 From: Frithjof Gressmann Date: Thu, 21 Sep 2023 22:44:26 -0500 Subject: [PATCH 06/10] Preserve legacy stimulus code path --- src/miv_simulator/stimulus.py | 567 +++++++++++++++++++++++++++++++--- src/miv_simulator/typing.py | 0 2 files changed, 528 insertions(+), 39 deletions(-) delete mode 100644 src/miv_simulator/typing.py diff --git a/src/miv_simulator/stimulus.py b/src/miv_simulator/stimulus.py index 601b53b..c8831a0 100644 --- a/src/miv_simulator/stimulus.py +++ b/src/miv_simulator/stimulus.py @@ -3,13 +3,8 @@ from collections import namedtuple import numpy as np -from miv_simulator.utils import AbstractEnv, Struct, get_module_logger -from miv_simulator.spike_encoder import ( - RateEncoder, - PoissonRateEncoder, - RankOrderEncoder, -) from miv_simulator.stgen import get_inhom_poisson_spike_times_by_thinning +from miv_simulator.utils import AbstractEnv, Struct, get_module_logger from mpi4py import MPI from mpi4py.MPI import Intracomm from neuroh5.io import ( @@ -29,14 +24,11 @@ ) -class InputSource: - pass - - -class ConstantInputSource(InputSource): +class ConstantInputCellConfig: def __init__( self, selectivity_type: None = None, + arena: None = None, peak_rate: None = None, local_random: None = None, selectivity_attr_dict: Optional[Dict[str, ndarray]] = None, @@ -44,6 +36,7 @@ def __init__( ) -> None: """ :param selectivity_type: int + :param arena: namedtuple :param peak_rate: float :param local_random: :class:'np.random.RandomState' :param selectivity_attr_dict: dict @@ -70,9 +63,9 @@ def __init__( if selectivity_attr_dict is not None: self.init_from_attr_dict(selectivity_attr_dict) - elif any([arg is None for arg in [selectivity_type, peak_rate]]): + elif any([arg is None for arg in 
[selectivity_type, peak_rate, arena]]): raise RuntimeError( - "ConstantInputSource: missing argument(s) required for object construction" + "ConstantInputCellConfig: missing argument(s) required for object construction" ) else: if local_random is None: @@ -94,9 +87,11 @@ def get_selectivity_attr_dict(self): "Peak Rate": np.array([self.peak_rate], dtype=np.float32), } - def call( + def get_rate_map( self, - signal: ndarray, + x: ndarray, + y: ndarray, + velocity: None = None, initial_phase: float = 0.0, ) -> ndarray: """ @@ -106,10 +101,20 @@ def call( :return: array """ - rate_array = np.ones_like(signal, dtype=np.float32) * self.peak_rate + if (velocity is None) and (self.phase_mod_function is not None): + raise RuntimeError( + "ConstantInputCellConfig.get_rate_map: when phase config is provided, get_rate_map must be provided with velocity" + ) + + rate_array = np.ones_like(x, dtype=np.float32) * self.peak_rate mean_rate = np.mean(rate_array) - if self.phase_mod_function is not None: - # t = TODO + if (velocity is not None) and (self.phase_mod_function is not None): + d = np.insert( + np.cumsum(np.sqrt(np.diff(x) ** 2.0 + np.diff(y) ** 2.0)), + 0, + 0.0, + ) + t = d / velocity rate_array *= self.phase_mod_function( t, initial_phase=initial_phase ) @@ -120,51 +125,53 @@ def call( return rate_array -def make_input_source( +def get_input_cell_config( selectivity_type: uint8, selectivity_type_names: Dict[int, str], population: None = None, stimulus_config: None = None, + arena: None = None, distance: None = None, local_random: None = None, selectivity_attr_dict: Optional[Dict[str, ndarray]] = None, phase_mod_config: None = None, noise_gen_dict: None = None, comm: None = None, -) -> InputSource: +) -> ConstantInputCellConfig: """ :param selectivity_type: int :param selectivity_type_names: dict: {int: str} :param population: str :param stimulus_config: dict + :param arena: namedtuple :param distance: float; u arc distance normalized to reference layer :param 
local_random: :class:'np.random.RandomState' :param selectivity_attr_dict: dict :param phase_mod_config: dict; oscillatory phase modulation configuration - :return: instance of one of various InputSource classes + :return: instance of one of various InputCell classes """ if selectivity_type not in selectivity_type_names: raise RuntimeError( - "make_input_source: enumerated selectivity type: %i not recognized" + "get_input_cell_config: enumerated selectivity type: %i not recognized" % selectivity_type ) selectivity_type_name = selectivity_type_names[selectivity_type] if selectivity_attr_dict is not None: if selectivity_type_name == "constant": - input_source = ConstantInputSource( + input_cell_config = ConstantInputCellConfig( selectivity_attr_dict=selectivity_attr_dict, phase_mod_config=phase_mod_config, ) else: RuntimeError( - "make_input_source: selectivity type %s is not supported" + "get_input_cell_config: selectivity type %s is not supported" % selectivity_type_name ) - elif any([arg is None for arg in [population, stimulus_config]]): + elif any([arg is None for arg in [population, stimulus_config, arena]]): raise RuntimeError( - f"make_input_source: missing argument(s) required to construct {selectivity_type_name} cell config object: population: {population} stimulus_config: {stimulus_config}" + f"get_input_cell_config: missing argument(s) required to construct {selectivity_type_name} cell config object: population: {population} arena: {arena} stimulus_config: {stimulus_config}" ) else: if ( @@ -172,23 +179,24 @@ def make_input_source( or selectivity_type not in stimulus_config["Peak Rate"][population] ): raise RuntimeError( - "make_input_source: peak rate not specified for population: %s, selectivity type: " + "get_input_cell_config: peak rate not specified for population: %s, selectivity type: " "%s" % (population, selectivity_type_name) ) peak_rate = stimulus_config["Peak Rate"][population][selectivity_type] if selectivity_type_name == "constant": - 
input_source = ConstantInputSource( + input_cell_config = ConstantInputCellConfig( selectivity_type=selectivity_type, + arena=arena, peak_rate=peak_rate, phase_mod_config=phase_mod_config, ) else: RuntimeError( - f"make_input_source: selectivity type: {selectivity_type_name} not implemented" + f"get_input_cell_config: selectivity type: {selectivity_type_name} not implemented" ) - return input_source + return input_cell_config def get_equilibration(env: AbstractEnv) -> Tuple[ndarray, int]: @@ -210,11 +218,149 @@ def get_equilibration(env: AbstractEnv) -> Tuple[ndarray, int]: return equilibrate +def get_2D_arena_bounds(arena, margin=0.0, margin_fraction=None): + """ + + :param arena: namedtuple + :return: tuple of (tuple of float) + """ + + vertices_x = np.asarray( + [v[0] for v in arena.domain.vertices], dtype=np.float32 + ) + vertices_y = np.asarray( + [v[1] for v in arena.domain.vertices], dtype=np.float32 + ) + if margin_fraction is not None: + extent_x = np.abs(np.max(vertices_x) - np.min(vertices_x)) + extent_y = np.abs(np.max(vertices_y) - np.min(vertices_y)) + margin = max(margin_fraction * extent_x, margin_fraction * extent_y) + arena_x_bounds = (np.min(vertices_x) - margin, np.max(vertices_x) + margin) + arena_y_bounds = (np.min(vertices_y) - margin, np.max(vertices_y) + margin) + + return arena_x_bounds, arena_y_bounds + + +def get_2D_arena_extents(arena): + """ + + :param arena: namedtuple + :return: tuple of (tuple of float) + """ + + vertices_x = np.asarray( + [v[0] for v in arena.domain.vertices], dtype=np.float32 + ) + vertices_y = np.asarray( + [v[1] for v in arena.domain.vertices], dtype=np.float32 + ) + extent_x = np.abs(np.max(vertices_x) - np.min(vertices_x)) + extent_y = np.abs(np.max(vertices_y) - np.min(vertices_y)) + + return extent_x, extent_y + + +def get_2D_arena_spatial_mesh( + arena, spatial_resolution=5.0, margin=0.0, indexing="ij" +): + """ + + :param arena: namedtuple + :param spatial_resolution: float (cm) + :param margin: float + 
:return: tuple of array + """ + arena_x_bounds, arena_y_bounds = get_2D_arena_bounds( + arena=arena, margin=margin + ) + arena_x = np.arange( + arena_x_bounds[0], + arena_x_bounds[1] + spatial_resolution / 2.0, + spatial_resolution, + ) + arena_y = np.arange( + arena_y_bounds[0], + arena_y_bounds[1] + spatial_resolution / 2.0, + spatial_resolution, + ) + + return np.meshgrid(arena_x, arena_y, indexing=indexing) + + +def get_2D_arena_grid(arena, spatial_resolution=5.0, margin=0.0, indexing="ij"): + """ + + :param arena: namedtuple + :param spatial_resolution: float (cm) + :param margin: float + :return: tuple of array + """ + arena_x_bounds, arena_y_bounds = get_2D_arena_bounds( + arena=arena, margin=margin + ) + arena_x = np.arange( + arena_x_bounds[0], + arena_x_bounds[1] + spatial_resolution / 2.0, + spatial_resolution, + ) + arena_y = np.arange( + arena_y_bounds[0], + arena_y_bounds[1] + spatial_resolution / 2.0, + spatial_resolution, + ) + + return arena_x, arena_y + + +def generate_linear_trajectory( + trajectory, temporal_resolution=1.0, equilibration_duration=None +): + """ + Construct coordinate arrays for a spatial trajectory, considering run velocity to interpolate at the specified + temporal resolution. Optionally, the trajectory can be prepended with extra distance traveled for a specified + network equilibration time, with the intention that the user discards spikes generated during this period before + analysis. 
+ :param trajectory: namedtuple + :param temporal_resolution: float (ms) + :param equilibration_duration: float (ms) + :return: tuple of array + """ + velocity = trajectory.velocity # (cm / s) + spatial_resolution = velocity / 1000.0 * temporal_resolution + x = trajectory.path[:, 0] + y = trajectory.path[:, 1] + + if equilibration_duration is not None: + equilibration_distance = velocity / 1000.0 * equilibration_duration + x = np.insert(x, 0, x[0] - equilibration_distance) + y = np.insert(y, 0, y[0]) + else: + equilibration_duration = 0.0 + equilibration_distance = 0.0 + + segment_lengths = np.sqrt(np.diff(x) ** 2.0 + np.diff(y) ** 2.0) + distance = np.insert(np.cumsum(segment_lengths), 0, 0.0) + + interp_distance = np.arange( + distance.min(), + distance.max() + spatial_resolution / 2.0, + spatial_resolution, + ) + interp_x = np.interp(interp_distance, distance, x) + interp_y = np.interp(interp_distance, distance, y) + t = interp_distance / (velocity / 1000.0) # ms + + t = np.subtract(t, equilibration_duration) + interp_distance -= equilibration_distance + + return t, interp_x, interp_y, interp_distance + + def generate_input_spike_trains( env: AbstractEnv, population: str, selectivity_type_names: Dict[int, str], - signal: ndarray, + trajectory: Tuple[ndarray, ndarray, ndarray, ndarray], gid: int, selectivity_attr_dict: Dict[str, ndarray], spike_train_attr_name: str = "Spike Train", @@ -222,7 +368,7 @@ def generate_input_spike_trains( spike_hist_resolution: int = 1000, equilibrate: Optional[Tuple[ndarray, int]] = None, phase_mod_config: None = None, - initial_phases: ndarray = None, + initial_phases: None = None, spike_hist_sum: None = None, return_selectivity_features: bool = True, n_trials: int = 1, @@ -248,8 +394,10 @@ def generate_input_spike_trains( if time_range[0] is None: time_range[0] = 0.0 - t, d = trajectory + t, x, y, d = trajectory + abs_d = d - d[0] abs_t = (t - t[0]) / 1000.0 + velocity = np.insert(abs_d[1:] / abs_t[1:], 0, abs_d[1] / abs_t[1]) 
equilibration_duration = float( env.stimulus_config["Equilibration Duration"] @@ -275,7 +423,7 @@ def generate_input_spike_trains( if selectivity_type_name is None: selectivity_type_name = this_selectivity_type_name - input_source = make_input_source( + input_cell_config = get_input_cell_config( selectivity_type=this_selectivity_type, selectivity_type_names=selectivity_type_names, selectivity_attr_dict=selectivity_attr_dict, @@ -295,8 +443,10 @@ def generate_input_spike_trains( for i in range(n_trials): if initial_phases is not None: initial_phase = initial_phases[i] - rate_map = input_source.get_rate_map( - d=d, + rate_map = input_cell_config.get_rate_map( + x=x, + y=y, + velocity=velocity if phase_mod_config is not None else None, initial_phase=initial_phase, ) if (selectivity_type_name != "constant") and (equilibrate is not None): @@ -375,11 +525,15 @@ def choose_input_selectivity_type(p, local_random): def generate_input_features( env, population, + arena, + arena_x, + arena_y, gid, norm_distances, selectivity_type_names, selectivity_type_namespaces, noise_gen_dict=None, + rate_map_sum=None, debug=False, ): """ @@ -388,10 +542,11 @@ def generate_input_features( argument selectivity_type_namespaces. The set of selectivity attributes is determined by procedure get_selectivity_attr_dict in the respective input cell configuration class - (e.g. ConstantInputSource). + (e.g. ConstantInputCellConfig). 
:param env :param population: str + :param arena: str :param gid: int :param distances: (float, float) :param selectivity_type_names: @@ -416,11 +571,12 @@ def generate_input_features( local_random=local_random, ) - input_source = make_input_source( + input_cell_config = get_input_cell_config( population=population, selectivity_type=this_selectivity_type, selectivity_type_names=selectivity_type_names, stimulus_config=env.stimulus_config, + arena=arena, distance=norm_u_arc_distance, local_random=local_random, noise_gen_dict=noise_gen_dict, @@ -428,7 +584,8 @@ def generate_input_features( ) this_selectivity_type_name = selectivity_type_names[this_selectivity_type] - selectivity_attr_dict = input_source.get_selectivity_attr_dict() + selectivity_attr_dict = input_cell_config.get_selectivity_attr_dict() + rate_map = input_cell_config.get_rate_map(x=arena_x, y=arena_y) if debug and rank == 0: callback, context = debug @@ -436,6 +593,9 @@ def generate_input_features( this_context.update(dict(locals())) callback(this_context) + if rate_map_sum is not None: + rate_map_sum[this_selectivity_type_name] += rate_map + return this_selectivity_type_name, selectivity_attr_dict @@ -468,6 +628,258 @@ def read_stimulus(stimulus_path, stimulus_namespace, population, module=None): return ratemap_lst +def read_feature(feature_path, feature_namespace, population): + feature_lst = [] + + attr_gen = read_cell_attributes( + feature_path, population, namespace=feature_namespace + ) + for gid, feature_dict in attr_gen: + if "Module ID" in feature_dict: + gid_module = feature_dict["Module ID"][0] + else: + gid_module = None + rate = feature_dict["Arena Rate Map"] + feature_lst.append((gid, rate, gid_module)) + + return feature_lst + + +def bin_stimulus_features(features, t, bin_size, time_range): + """ + Continuous stimulus feature binning. 
+ + Parameters + ---------- + features: matrix of size "number of times each feature was recorded" x "number of features" + t: a vector of size "number of times each feature was recorded" + bin_size: size of time bins + time_range: the start and end times for binning the stimulus + + + Returns + ------- + matrix of size "number of time bins" x "number of features in the output" + the average value of each output feature in every time bin + """ + + t_start, t_end = time_range + + edges = np.arange(t_start, t_end, bin_size) + nbins = edges.shape[0] - 1 + nfeatures = features.shape[1] + binned_features = np.empty([nbins, nfeatures]) + for i in range(nbins): + for j in range(nfeatures): + delta = edges[i + 1] - edges[i] + bin_range = np.arange(edges[i], edges[i + 1], delta / 5.0) + ip_vals = np.interp(bin_range, t, features[:, j]) + binned_features[i, j] = np.mean(ip_vals) + + return binned_features + + +def rate_maps_from_features( + env, + population, + cell_index_set, + input_features_path=None, + input_features_namespace=None, + input_features_dict=None, + arena_id=None, + trajectory_id=None, + time_range=None, + include_time=False, + phase_mod_config=None, +): + """Initializes presynaptic spike sources from a file with input selectivity features represented as firing rates.""" + + if input_features_dict is not None: + if (input_features_path is not None) or ( + input_features_namespace is not None + ): + raise RuntimeError( + "rate_maps_from_features: when input_features_dict is provided, input_features_path must be None" + ) + else: + if (input_features_path is None) or (input_features_namespace is None): + raise RuntimeError( + "rate_maps_from_features: either input_features_dict has to be provided, or input_features_path and input_features_namespace" + ) + + if time_range is not None: + if time_range[0] is None: + time_range[0] = 0.0 + + if arena_id is None: + arena_id = env.arena_id + if trajectory_id is None: + trajectory_id = env.trajectory_id + + 
spatial_resolution = float(env.stimulus_config["Spatial Resolution"]) + temporal_resolution = float(env.stimulus_config["Temporal Resolution"]) + + input_features_attr_names = [ + "Selectivity Type", + "Num Fields", + "Field Width", + "Peak Rate", + "Module ID", + "Grid Spacing", + "Grid Orientation", + "Field Width Concentration Factor", + "X Offset", + "Y Offset", + ] + + selectivity_type_names = {i: n for n, i in env.selectivity_types.items()} + + arena = env.stimulus_config["Arena"][arena_id] + + trajectory = arena.trajectories[trajectory_id] + equilibration_duration = float( + env.stimulus_config.get("Equilibration Duration", 0.0) + ) + + t, x, y, d = generate_linear_trajectory( + trajectory, + temporal_resolution=temporal_resolution, + equilibration_duration=equilibration_duration, + ) + if time_range is not None: + t_range_inds = np.where((t < time_range[1]) & (t >= time_range[0]))[0] + t = t[t_range_inds] + x = x[t_range_inds] + y = y[t_range_inds] + d = d[t_range_inds] + + input_rate_map_dict = {} + + if len(d) == 0: + return input_rate_map_dict + + abs_d = d - d[0] + abs_t = (t - t[0]) / 1000.0 + velocity = np.insert(abs_d[1:] / abs_t[1:], 0, abs_d[1] / abs_t[1]) + + pop_index = int(env.Populations[population]) + + if input_features_path is not None: + this_input_features_namespace = f"{input_features_namespace} {arena_id}" + input_features_iter = scatter_read_cell_attribute_selection( + input_features_path, + population, + selection=cell_index_set, + namespace=this_input_features_namespace, + mask=set(input_features_attr_names), + comm=env.comm, + io_size=env.io_size, + ) + else: + input_features_iter = input_features_dict.items() + + for gid, selectivity_attr_dict in input_features_iter: + this_selectivity_type = selectivity_attr_dict["Selectivity Type"][0] + this_selectivity_type_name = selectivity_type_names[ + this_selectivity_type + ] + + input_cell_config = get_input_cell_config( + selectivity_type=this_selectivity_type, + 
selectivity_type_names=selectivity_type_names, + selectivity_attr_dict=selectivity_attr_dict, + phase_mod_config=phase_mod_config, + ) + rate_map = input_cell_config.get_rate_map( + x=x, + y=y, + velocity=velocity if phase_mod_config is not None else None, + ) + rate_map[np.isclose(rate_map, 0.0, atol=1e-3, rtol=1e-3)] = 0.0 + + if include_time: + input_rate_map_dict[gid] = (t, rate_map) + else: + input_rate_map_dict[gid] = rate_map + + return input_rate_map_dict + + +def arena_rate_maps_from_features( + env, + population, + input_features_path, + input_features_namespace, + cell_index_set, + arena_id=None, + time_range=None, + n_trials=1, +): + """Initializes presynaptic spike sources from a file with input selectivity features represented as firing rates.""" + + if time_range is not None: + if time_range[0] is None: + time_range[0] = 0.0 + + if arena_id is None: + arena_id = env.arena_id + + spatial_resolution = float(env.stimulus_config["Spatial Resolution"]) + temporal_resolution = float(env.stimulus_config["Temporal Resolution"]) + + this_input_features_namespace = f"{input_features_namespace} {arena_id}" + + input_features_attr_names = [ + "Selectivity Type", + "Num Fields", + "Field Width", + "Peak Rate", + "Module ID", + "Grid Spacing", + "Grid Orientation", + "Field Width Concentration Factor", + "X Offset", + "Y Offset", + ] + + selectivity_type_names = {i: n for n, i in env.selectivity_types.items()} + + arena = env.stimulus_config["Arena"][arena_id] + arena_x, arena_y = get_2D_arena_spatial_mesh( + arena=arena, spatial_resolution=spatial_resolution + ) + + input_rate_map_dict = {} + pop_index = int(env.Populations[population]) + + input_features_iter = scatter_read_cell_attribute_selection( + input_features_path, + population, + selection=cell_index_set, + namespace=this_input_features_namespace, + mask=set(input_features_attr_names), + comm=env.comm, + io_size=env.io_size, + ) + for gid, selectivity_attr_dict in input_features_iter: + 
this_selectivity_type = selectivity_attr_dict["Selectivity Type"][0] + this_selectivity_type_name = selectivity_type_names[ + this_selectivity_type + ] + input_cell_config = get_input_cell_config( + population=population, + selectivity_type=this_selectivity_type, + selectivity_type_names=selectivity_type_names, + selectivity_attr_dict=selectivity_attr_dict, + ) + if input_cell_config.num_fields > 0: + rate_map = input_cell_config.get_rate_map(x=arena_x, y=arena_y) + rate_map[np.isclose(rate_map, 0.0, atol=1e-3, rtol=1e-3)] = 0.0 + input_rate_map_dict[gid] = rate_map + + return input_rate_map_dict + + def oscillation_phase_mod_config( env, population, soma_positions, local_random=None ): @@ -589,3 +1001,80 @@ def stationary_phase_mod( mod = s * mod_depth / 2.0 + (1.0 - mod_depth) return mod + + +def spatial_phase_mod( + x, + velocity, + field_width, + phase_range, + phase_entry, + phase_offset, + mod_depth, + freq, +): + """Non-stationary phase modulation for spatial receptive fields. + Calculates modulation according to the equation: + + s = cos(r*x/field_width + 2*pi*freq*x/velocity - phase_entry - phase_offset + r/2.) + 1 + mod = s*mod_depth/2. + (1. 
- mod_depth)
+
+    - position: spatial position
+    - velocity: movement velocity
+    - field_width: receptive field
+    - phase_range: range of preferred phase
+    - phase_entry: receptive field entry phase
+    - mod_depth: modulation depth
+    - freq: frequency of global oscillation
+    """
+    r = np.deg2rad(phase_range[1] - phase_range[0])
+    s = (
+        np.cos(
+            r * x / field_width
+            + 2 * np.pi * freq * x / velocity
+            - np.deg2rad(phase_entry + phase_offset)
+            + r / 2.0
+        )
+        + 1
+    )
+    d = np.clip(mod_depth, 0.0, 1.0)
+    m = s * d / 2.0 + (1.0 - d)
+
+    return m
+
+
+def spatial2d_phase_mod(
+    x,
+    y,
+    velocity,
+    field_width,
+    phase_range,
+    phase_entry,
+    phase_offset,
+    mod_depth,
+    freq,
+):
+    x_mod = spatial_phase_mod(
+        x,
+        velocity,
+        field_width,
+        phase_range,
+        phase_entry,
+        phase_offset,
+        mod_depth,
+        freq,
+    )
+    y_mod = spatial_phase_mod(
+        y,
+        velocity,
+        field_width,
+        phase_range,
+        phase_entry,
+        phase_offset,
+        mod_depth,
+        freq,
+    )
+
+    m = (x_mod + y_mod) / 2.0
+
+    return m
diff --git a/src/miv_simulator/typing.py b/src/miv_simulator/typing.py
deleted file mode 100644
index e69de29..0000000

From 37bc20807bb0478412d43a7c3c2c64d367fec9b2 Mon Sep 17 00:00:00 2001
From: Ivan Raikov
Date: Mon, 25 Sep 2023 17:13:15 -0700
Subject: [PATCH 07/10] WIP: generator interface for spike encoders

---
 src/miv_simulator/spike_encoder.py | 43 +++++++++++++++++++++++++++++-
 1 file changed, 42 insertions(+), 1 deletion(-)

diff --git a/src/miv_simulator/spike_encoder.py b/src/miv_simulator/spike_encoder.py
index 736b9f0..2a3ecbf 100644
--- a/src/miv_simulator/spike_encoder.py
+++ b/src/miv_simulator/spike_encoder.py
@@ -1,6 +1,47 @@
 import numpy as np
 from numpy import ndarray
-from typing import Tuple, Optional
+from typing import Tuple, Optional, Iterable, Iterator
+
+def rate_generator(
+    signal: Union[ndarray, Iterable[ndarray]],
+    time_window: int = 100,
+    dt: float = 0.02,
+    **kwargs,
+) -> Iterator[ndarray]:
+    """
+    Lazily invokes ``RateEncoder`` to 
iteratively encode a sequence of + data. + + :param data: NDarray of shape ``[n_samples, n_1, ..., n_k]``. + :param time_window: Length of Poisson spike train per input variable. + :param dt: Spike generator time step. + :return: NDarray of shape ``[time, n_1, ..., n_k]`` of rate-encoded spikes. + """ + encoder = RateEncoder(time_window=time_window, dt=dt) + for chunk in signal: + yield encoder.encode(chunk) + + +def poisson_rate_generator( + signal: Union[ndarray, Iterable[ndarray]], + time_window: int = 100, + dt: float = 0.02, + **kwargs, +) -> Iterator[ndarray]: + """ + Lazily invokes ``PoissonEncoder`` to iteratively encode a sequence of + data. + + :param data: NDarray of shape ``[n_samples, n_1, ..., n_k]``. + :param time_window: Length of Poisson spike train per input variable. + :param dt: Spike generator time step. + :return: NDarray of shape ``[time, n_1, ..., n_k]`` of Poisson-distributed spikes. + """ + encoder = PoissonRateEncoder(time_window=time_window, dt=dt) + for chunk in signal: + yield encoder.encode(chunk) + + class RateEncoder: From 7de2731185f4ddef4f1c271263b5752d304ab0d4 Mon Sep 17 00:00:00 2001 From: Ivan Raikov Date: Fri, 29 Sep 2023 10:16:08 -0700 Subject: [PATCH 08/10] formatting fixes --- src/miv_simulator/spike_encoder.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/miv_simulator/spike_encoder.py b/src/miv_simulator/spike_encoder.py index 2a3ecbf..0293787 100644 --- a/src/miv_simulator/spike_encoder.py +++ b/src/miv_simulator/spike_encoder.py @@ -2,6 +2,7 @@ from numpy import ndarray from typing import Tuple, Optional, Iterable, Iterator + def rate_generator( signal: Union[ndarray, Iterable[ndarray]], time_window: int = 100, @@ -42,8 +43,6 @@ def poisson_rate_generator( yield encoder.encode(chunk) - - class RateEncoder: def __init__( self, From 6f9a3a08065cb4bded33b9837b4fda44142a0f90 Mon Sep 17 00:00:00 2001 From: Ivan Raikov Date: Fri, 6 Oct 2023 16:17:39 -0700 Subject: [PATCH 09/10] further updates to 
spike encoder --- src/miv_simulator/spike_encoder.py | 108 ++++++++++++++++++++++++----- 1 file changed, 91 insertions(+), 17 deletions(-) diff --git a/src/miv_simulator/spike_encoder.py b/src/miv_simulator/spike_encoder.py index 2a3ecbf..b97a10a 100644 --- a/src/miv_simulator/spike_encoder.py +++ b/src/miv_simulator/spike_encoder.py @@ -2,8 +2,10 @@ from numpy import ndarray from typing import Tuple, Optional, Iterable, Iterator + def rate_generator( signal: Union[ndarray, Iterable[ndarray]], + t_start: float = 0.0, time_window: int = 100, dt: float = 0.02, **kwargs, @@ -17,13 +19,17 @@ def rate_generator( :param dt: Spike generator time step. :return: NDarray of shape ``[time, n_1, ..., n_k]`` of rate-encoded spikes. """ - encoder = RateEncoder(time_window=time_window, dt=dt) + t_start_ = t_start + encoder = RateEncoder(time_window=time_window, dt=dt, **kwargs) for chunk in signal: - yield encoder.encode(chunk) + output, t_next = encoder.encode(chunk, t_start=t_start_) + yield output + t_start_ = t_next def poisson_rate_generator( signal: Union[ndarray, Iterable[ndarray]], + t_start: float = 0.0, time_window: int = 100, dt: float = 0.02, **kwargs, @@ -37,16 +43,18 @@ def poisson_rate_generator( :param dt: Spike generator time step. :return: NDarray of shape ``[time, n_1, ..., n_k]`` of Poisson-distributed spikes. 
""" - encoder = PoissonRateEncoder(time_window=time_window, dt=dt) + t_start_ = t_start + encoder = PoissonRateEncoder(time_window=time_window, dt=dt, **kwargs) for chunk in signal: - yield encoder.encode(chunk) - - + output, t_next = encoder.encode(chunk, t_start=t_start_) + yield output + t_start_ = t_next class RateEncoder: def __init__( self, + time_window: int = 100, dt: float = 0.02, input_range: Tuple[int, int] = (0, 255), output_freq_range: Tuple[int, int] = (0, 200), @@ -61,8 +69,14 @@ def __init__( ) self.time_window = time_window self.ndim = 1 + self.spike_array = None - def encode(self, signal: ndarray) -> ndarray: + def encode( + self, + signal: ndarray, + return_spike_array: bool = False, + t_start: Optional[float] = None, + ) -> ndarray: assert ( len(signal.shape) == 2 ), "encode requires input signal of shape number_samples x input_dimensions" @@ -81,13 +95,45 @@ def encode(self, signal: ndarray) -> ndarray: nz = np.argwhere(freq > 0) period = np.zeros(nsamples) period[nz] = (1 / freq[nz]) * 1000 # ms - spikes = np.zeros((nsamples, self.time_window)) + if ( + (self.spike_array is None) + or (self.spike_array.shape[0] != nsamples) + or (self.spike_array.shape[1] != ndim) + ): + self.spike_array = np.zeros( + (nsamples, self.time_window), dtype=bool + ) + else: + self.spike_array.fill(0) for i in range(nsamples): if period[i] > 0: stride = int(period[i]) - spikes[i, 0 : self.time_window : stride] = 1 - - return spikes + self.spike_array[i, 0 : self.time_window : stride] = 1 + + t_next = None + if t_start is not None: + t_next = t_start + self.time_window * nsamples * self.dt + + if return_spike_array: + return np.copy(self.spike_array), t_next + else: + if t_start is None: + t_start = 0.0 + spike_times = [] + spike_inds = np.argwhere[spike_array[i] == 1] + for i in range(nsamples): + this_spike_inds = spike_inds[ + np.argwhere(spike_inds[:, 0] == i).flat + ] + this_spike_times = [] + if len(this_spike_inds) > 0: + this_spike_times = ( + t_start + + 
np.asarray(this_spike_inds[:, 1], dtype=np.float32) + * self.dt + ) + spike_times.append(this_spike_times) + return spike_times, t_next class PoissonRateEncoder: @@ -112,7 +158,12 @@ def __init__( self.generator = generator self.ndim = 1 - def encode(self, signal: ndarray) -> ndarray: + def encode( + self, + signal: ndarray, + return_spike_array: bool = False, + t_start: Optional[float] = None, + ) -> ndarray: assert ( len(signal.shape) == 2 ), "encode requires input signal of shape number_samples x input_dimensions" @@ -130,12 +181,35 @@ def encode(self, signal: ndarray) -> ndarray: [self.min_output, self.max_output], ) - spikes = self.generator.uniform( + spike_array = self.generator.uniform( 0, 1, nsamples * self.time_window ).reshape((nsamples, self.time_window)) dt = 0.001 # second for i in range(nsamples): - spikes[i, np.where(spikes < freq[i] * dt)] = 1 - spikes[i, np.where(spikes[i] != 1)] = 0 - - return spikes + spike_array[i, np.where(spike_array < freq[i] * dt)] = 1 + spike_array[i, np.where(spike_array[i] != 1)] = 0 + + t_next = None + if t_start is not None: + t_next = t_start + self.time_window * nsamples * self.dt + + if return_spike_array: + return np.copy(self.spike_array), t_next + else: + if t_start is None: + t_start = 0.0 + spike_times = [] + spike_inds = np.argwhere[spike_array[i] == 1] + for i in range(nsamples): + this_spike_inds = spike_inds[ + np.argwhere(spike_inds[:, 0] == i).flat + ] + this_spike_times = [] + if len(this_spike_inds) > 0: + this_spike_times = ( + t_start + + np.asarray(this_spike_inds[:, 1], dtype=np.float32) + * self.dt + ) + spike_times.append(this_spike_times) + return spike_times, t_next From bb38a27389032e126343fb0508ad1064bc3d3c8e Mon Sep 17 00:00:00 2001 From: Frithjof Gressmann Date: Thu, 19 Oct 2023 15:05:40 -0500 Subject: [PATCH 10/10] Reject erroneous extra configuration --- src/miv_simulator/interface/connections.py | 4 +++- src/miv_simulator/interface/distances.py | 4 +++- 
src/miv_simulator/interface/h5_types.py | 4 +++- src/miv_simulator/interface/network_architecture.py | 4 +++- src/miv_simulator/interface/synapse_forest.py | 4 +++- src/miv_simulator/interface/synapses.py | 4 +++- 6 files changed, 18 insertions(+), 6 deletions(-) diff --git a/src/miv_simulator/interface/connections.py b/src/miv_simulator/interface/connections.py index 0048c47..17eb47f 100644 --- a/src/miv_simulator/interface/connections.py +++ b/src/miv_simulator/interface/connections.py @@ -3,7 +3,7 @@ import logging from machinable import Component -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, ConfigDict from miv_simulator import config, simulator from typing import Optional, Dict from miv_simulator.utils import from_yaml @@ -12,6 +12,8 @@ class Connections(Component): class Config(BaseModel): + model_config = ConfigDict(extra="forbid") + filepath: str = Field("???") forest_filepath: str = Field("???") axon_extents: config.AxonExtents = Field("???") diff --git a/src/miv_simulator/interface/distances.py b/src/miv_simulator/interface/distances.py index 7fda203..9e491fe 100644 --- a/src/miv_simulator/interface/distances.py +++ b/src/miv_simulator/interface/distances.py @@ -1,7 +1,7 @@ from typing import Optional, Tuple import logging -from pydantic import BaseModel +from pydantic import BaseModel, ConfigDict from machinable import Component from machinable.config import Field @@ -11,6 +11,8 @@ class MeasureDistances(Component): class Config(BaseModel): + model_config = ConfigDict(extra="forbid") + filepath: str = Field("???") cell_distributions: config.CellDistributions = Field("???") layer_extents: config.LayerExtents = Field("???") diff --git a/src/miv_simulator/interface/h5_types.py b/src/miv_simulator/interface/h5_types.py index 345f894..711e5ab 100644 --- a/src/miv_simulator/interface/h5_types.py +++ b/src/miv_simulator/interface/h5_types.py @@ -1,5 +1,5 @@ from machinable import Component -from pydantic import BaseModel, Field 
+from pydantic import BaseModel, Field, ConfigDict from miv_simulator import config from mpi4py import MPI from miv_simulator.utils import io as io_utils, from_yaml @@ -8,6 +8,8 @@ class H5Types(Component): class Config(BaseModel): + model_config = ConfigDict(extra="forbid") + cell_distributions: config.CellDistributions = Field("???") synapses: config.Synapses = Field("???") diff --git a/src/miv_simulator/interface/network_architecture.py b/src/miv_simulator/interface/network_architecture.py index 64980b5..5394fc1 100644 --- a/src/miv_simulator/interface/network_architecture.py +++ b/src/miv_simulator/interface/network_architecture.py @@ -8,13 +8,15 @@ from miv_simulator import config, simulator from miv_simulator.utils import io as io_utils, from_yaml from mpi4py import MPI -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, ConfigDict class NetworkArchitecture(Component): """Creates the network architecture by generating the soma coordinates within specified layer geometry.""" class Config(BaseModel): + model_config = ConfigDict(extra="forbid") + filepath: str = Field("???") cell_distributions: config.CellDistributions = Field("???") synapses: config.Synapses = Field("???") diff --git a/src/miv_simulator/interface/synapse_forest.py b/src/miv_simulator/interface/synapse_forest.py index a540d49..623f810 100644 --- a/src/miv_simulator/interface/synapse_forest.py +++ b/src/miv_simulator/interface/synapse_forest.py @@ -1,10 +1,12 @@ from machinable import Component from miv_simulator import config, simulator -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, ConfigDict class GenerateSynapseForest(Component): class Config(BaseModel): + model_config = ConfigDict(extra="forbid") + filepath: str = Field("???") population: config.PopulationName = Field("???") morphology: config.SWCFilePath = Field("???") diff --git a/src/miv_simulator/interface/synapses.py b/src/miv_simulator/interface/synapses.py index 
cb11f1d..c11c7fa 100644 --- a/src/miv_simulator/interface/synapses.py +++ b/src/miv_simulator/interface/synapses.py @@ -3,7 +3,7 @@ from machinable import Component from miv_simulator import config from miv_simulator import simulator -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, ConfigDict from typing import Optional, Dict from miv_simulator.utils import from_yaml from mpi4py import MPI @@ -11,6 +11,8 @@ class Synapses(Component): class Config(BaseModel): + model_config = ConfigDict(extra="forbid") + forest_filepath: str = Field("???") cell_types: config.CellTypes = Field("???") population: str = Field("???")