diff --git a/phys2bids/io.py b/phys2bids/io.py index 37c62355f..d4a5e3903 100644 --- a/phys2bids/io.py +++ b/phys2bids/io.py @@ -538,3 +538,88 @@ def load_gep(filename): timeseries = [t_ch, trigger] timeseries.extend(data) return BlueprintInput(timeseries, freq, names, units, 1) + + +def load_smr(filename, chtrig=0): + """Load Spike2 smr file and populate object phys_input. + + Parameters + ---------- + filename: str + Path to the spike smr or smrx file. + + chtrig : int + Index of trigger channel. + + Returns + ------- + BlueprintInput + + Note + ---- + Index of chtrig is 1-index (i.e. spike2 channel number). + + See Also + -------- + physio_obj.BlueprintInput + """ + import sonpy + + # taken from sonpy demo + read_data = { + sonpy.lib.DataType.Adc: sonpy.lib.SonFile.ReadInts, + sonpy.lib.DataType.EventFall: sonpy.lib.SonFile.ReadEvents, + sonpy.lib.DataType.EventRise: sonpy.lib.SonFile.ReadEvents, + sonpy.lib.DataType.EventBoth: sonpy.lib.SonFile.ReadEvents, + sonpy.lib.DataType.Marker: sonpy.lib.SonFile.ReadMarkers, + sonpy.lib.DataType.AdcMark: sonpy.lib.SonFile.ReadWaveMarks, + sonpy.lib.DataType.RealMark: sonpy.lib.SonFile.ReadRealMarks, + sonpy.lib.DataType.TextMark: sonpy.lib.SonFile.ReadTextMarks, + sonpy.lib.DataType.RealWave: sonpy.lib.SonFile.ReadFloats, + } + + smrfile = sonpy.lib.SonFile(filename, True) + time_base = smrfile.GetTimeBase() + n_channels = smrfile.MaxChannels() + freq, names, units, timeseries = [], [], [], [] + for i in range(n_channels): + current_channel = smrfile.ChannelType(i) + max_n_tick = smrfile.ChannelMaxTime(i) + if current_channel != sonpy.lib.DataType.Off and max_n_tick > 0: + max_n_tick = smrfile.ChannelMaxTime(i) + sample_rate = smrfile.GetIdealRate(i) + if current_channel == sonpy.lib.DataType.Adc: + divide = smrfile.ChannelDivide(i) + else: # marker channels + divide = 1 / (time_base * sample_rate) + # conversion factor from CED spike2 doc + # http://ced.co.uk/img/Spike9.pdf + gain = smrfile.GetChannelScale(i) / 6553.6 + offset = smrfile.GetChannelOffset(i) + name = smrfile.GetChannelTitle(chan=i) + unit = smrfile.GetChannelUnits(chan=i) + + n_samples = int(np.floor((max_n_tick) / divide)) + raw_signal = read_data[current_channel]( + smrfile, chan=i, nMax=n_samples, tFrom=0, tUpto=max_n_tick + ) + + signal = np.array(raw_signal) * gain + offset + + # save the data + freq.append(sample_rate) + names.append(name) + units.append(unit) + timeseries.append(signal) + + # use the channel with highest sample rate to create time stamps + idx_max = np.argmax(freq) + n_timepoints = len(timeseries[idx_max]) # end point included + time = np.arange(n_timepoints) * freq[idx_max] + + # prepend to the existing list + freq = [freq[idx_max]] + freq + timeseries = [time] + timeseries + units = ["s"] + units + names = ["time"] + names + return BlueprintInput(timeseries, freq, names, units, chtrig) diff --git a/phys2bids/tests/conftest.py b/phys2bids/tests/conftest.py index dcf4d564d..69160fde6 100644 --- a/phys2bids/tests/conftest.py +++ b/phys2bids/tests/conftest.py @@ -138,3 +138,13 @@ def ge_badfiles(testpath): tmp = fetch_file("tdmyn", testpath, "PPGData_epiRT_columnscsv_00_00_000") tmp = fetch_file("b6skq", testpath, "PPGData_epiRT_columnstsv_00_00_000") return fetch_file("8235b", testpath, "PPGData_epiRT_string0000_00_00_000") + + +@pytest.fixture +def spike2_smrx_file(testpath): + return fetch_file("7x5qw", testpath, "Test_ppg_pulse_spike2.smrx") + + +@pytest.fixture +def spike2_smr_file(testpath): + return fetch_file("zdpfr", testpath, "Test_ppg_pulse_spike2.smr") diff --git a/phys2bids/tests/test_io.py b/phys2bids/tests/test_io.py index 3302f1c35..ffed64546 100644 --- a/phys2bids/tests/test_io.py +++ b/phys2bids/tests/test_io.py @@ -1,5 +1,6 @@ import math import os +import sys import numpy as np import pytest @@ -214,3 +215,37 @@ def test_load_gep_two_files_resp(ge_two_gep_files_resp, testpath): gep_data2 = np.loadtxt(os.path.join(testpath, "PPGData_epiRT_0000000000_00_00_000.gep")) assert np.array_equal(gep_data1, phys_obj.timeseries[2]) assert np.array_equal(gep_data2, phys_obj.timeseries[3]) + + +@pytest.mark.skipif( + sys.version_info < (3, 7) or sys.version_info > (3, 9), + reason="Requires python between 3.7 and 3.9", +) +@pytest.mark.xfail(reason="We need to fix sonpy install") +def test_load_smr(spike2_smr_file, spike2_smrx_file): + chtrig = 5 + + # 32-bit file + phys_obj = io.load_smr(spike2_smr_file, chtrig) + assert phys_obj.ch_name[0] == "time" + assert phys_obj.freq[0] == 1000.0 + assert phys_obj.units[0] == "s" + + # checks that the scanner strigger is in the right channel + # the marker channels are stored as binary + assert phys_obj.ch_name[chtrig] == "Scan Vol" + assert phys_obj.freq[chtrig] == 200.0 + assert phys_obj.units[chtrig] == "" + assert len(phys_obj.timeseries[chtrig]) == 60 + + # 64-bit file should have the same + phys_obj = io.load_smr(spike2_smrx_file, chtrig) + assert phys_obj.ch_name[0] == "time" + assert phys_obj.freq[0] == 1000.0 + assert phys_obj.units[0] == "s" + # checks that the scanner trigger is in the right channel + # the marker channels are stored as binary + assert phys_obj.ch_name[chtrig] == "Scan Vol" + assert phys_obj.freq[chtrig] == 200.0 + assert phys_obj.units[chtrig] == "" + assert len(phys_obj.timeseries[chtrig]) == 60 diff --git a/setup.cfg b/setup.cfg index 2bec2730a..f9208504e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -36,6 +36,8 @@ packages = find: include_package_data = True [options.extras_require] +spike2 = + sonpy >=1.7.5;python_version=='3.7.*,3.8.*,3.9.*' acq = bioread >=1.0.5 mat= @@ -55,6 +57,7 @@ style = interfaces = %(acq)s %(mat)s + %(spike2)s test = pytest >=5.3 pytest-cov