Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Spike2 files #410

Merged
merged 14 commits into from
Apr 27, 2023
85 changes: 85 additions & 0 deletions phys2bids/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,3 +538,88 @@ def load_gep(filename):
timeseries = [t_ch, trigger]
timeseries.extend(data)
return BlueprintInput(timeseries, freq, names, units, 1)


def load_smr(filename, chtrig=0):
"""Load Spike2 smr file and populate object phys_input.

Parameters
----------
filename: str
Path to the spike smr or smrx file.

chtrig : int
Index of trigger channel.

Returns
-------
BlueprintInput

Note
----
Index of chtrig is 1-index (i.e. spike2 channel number).

See Also
--------
physio_obj.BlueprintInput
"""
import sonpy

# taken from sonpy demo
read_data = {
sonpy.lib.DataType.Adc: sonpy.lib.SonFile.ReadInts,
sonpy.lib.DataType.EventFall: sonpy.lib.SonFile.ReadEvents,
sonpy.lib.DataType.EventRise: sonpy.lib.SonFile.ReadEvents,
sonpy.lib.DataType.EventBoth: sonpy.lib.SonFile.ReadEvents,
sonpy.lib.DataType.Marker: sonpy.lib.SonFile.ReadMarkers,
sonpy.lib.DataType.AdcMark: sonpy.lib.SonFile.ReadWaveMarks,
sonpy.lib.DataType.RealMark: sonpy.lib.SonFile.ReadRealMarks,
sonpy.lib.DataType.TextMark: sonpy.lib.SonFile.ReadTextMarks,
sonpy.lib.DataType.RealWave: sonpy.lib.SonFile.ReadFloats,
}

smrfile = sonpy.lib.SonFile(filename, True)
time_base = smrfile.GetTimeBase()
n_channels = smrfile.MaxChannels()
freq, names, units, timeseries = [], [], [], []
for i in range(n_channels):
current_channel = smrfile.ChannelType(i)
max_n_tick = smrfile.ChannelMaxTime(i)
if current_channel != sonpy.lib.DataType.Off and max_n_tick > 0:
max_n_tick = smrfile.ChannelMaxTime(i)
sample_rate = smrfile.GetIdealRate(i)
if current_channel == sonpy.lib.DataType.Adc:
divide = smrfile.ChannelDivide(i)
else: # marker channels
divide = 1 / (time_base * sample_rate)
# conversion factor from CED spike2 doc
# http://ced.co.uk/img/Spike9.pdf
gain = smrfile.GetChannelScale(i) / 6553.6
offset = smrfile.GetChannelOffset(i)
name = smrfile.GetChannelTitle(chan=i)
unit = smrfile.GetChannelUnits(chan=i)

n_samples = int(np.floor((max_n_tick) / divide))
raw_signal = read_data[current_channel](
smrfile, chan=i, nMax=n_samples, tFrom=0, tUpto=max_n_tick
)

signal = np.array(raw_signal) * gain + offset

# save the data
freq.append(sample_rate)
names.append(name)
units.append(unit)
timeseries.append(signal)

# use the channel with highest sample rate to create time stamps
idx_max = np.argmax(freq)
n_timepoints = len(timeseries[idx_max]) # end point included
time = np.arange(n_timepoints) * freq[idx_max]

# prepend to the existing list
freq = [freq[idx_max]] + freq
timeseries = [time] + timeseries
units = ["s"] + units
names = ["time"] + names
return BlueprintInput(timeseries, freq, names, units, chtrig)
10 changes: 10 additions & 0 deletions phys2bids/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,13 @@ def ge_badfiles(testpath):
tmp = fetch_file("tdmyn", testpath, "PPGData_epiRT_columnscsv_00_00_000")
tmp = fetch_file("b6skq", testpath, "PPGData_epiRT_columnstsv_00_00_000")
return fetch_file("8235b", testpath, "PPGData_epiRT_string0000_00_00_000")


@pytest.fixture
def spike2_smrx_file(testpath):
return fetch_file("7x5qw", testpath, "Test_ppg_pulse_spike2.smrx")


@pytest.fixture
def spike2_smr_file(testpath):
return fetch_file("zdpfr", testpath, "Test_ppg_pulse_spike2.smr")
35 changes: 35 additions & 0 deletions phys2bids/tests/test_io.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import math
import os
import sys

import numpy as np
import pytest
Expand Down Expand Up @@ -214,3 +215,37 @@ def test_load_gep_two_files_resp(ge_two_gep_files_resp, testpath):
gep_data2 = np.loadtxt(os.path.join(testpath, "PPGData_epiRT_0000000000_00_00_000.gep"))
assert np.array_equal(gep_data1, phys_obj.timeseries[2])
assert np.array_equal(gep_data2, phys_obj.timeseries[3])


@pytest.mark.skipif(
sys.version_info < (3, 7) or sys.version_info > (3, 9),
reason="Requires python between 3.7 and 3.9",
)
@pytest.mark.xfail(reason="We need to fix sonpy install")
def test_load_smr(spike2_smr_file, spike2_smrx_file):
chtrig = 5

# 32-bit file
phys_obj = io.load_smr(spike2_smr_file, chtrig)
assert phys_obj.ch_name[0] == "time"
assert phys_obj.freq[0] == 1000.0
assert phys_obj.units[0] == "s"

# checks that the scanner strigger is in the right channel
# the marker channels are stored as binary
assert phys_obj.ch_name[chtrig] == "Scan Vol"
assert phys_obj.freq[chtrig] == 200.0
assert phys_obj.units[chtrig] == ""
assert len(phys_obj.timeseries[chtrig]) == 60

# 64-bit file should have the same
phys_obj = io.load_smr(spike2_smrx_file, chtrig)
assert phys_obj.ch_name[0] == "time"
assert phys_obj.freq[0] == 1000.0
assert phys_obj.units[0] == "s"
# checks that the scanner trigger is in the right channel
# the marker channels are stored as binary
assert phys_obj.ch_name[chtrig] == "Scan Vol"
assert phys_obj.freq[chtrig] == 200.0
assert phys_obj.units[chtrig] == ""
assert len(phys_obj.timeseries[chtrig]) == 60
3 changes: 3 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ packages = find:
include_package_data = True

[options.extras_require]
spike2 =
sonpy >=1.7.5;python_version=='3.7.*,3.8.*,3.9.*'
acq =
bioread >=1.0.5
mat=
Expand All @@ -55,6 +57,7 @@ style =
interfaces =
%(acq)s
%(mat)s
%(spike2)s
test =
pytest >=5.3
pytest-cov
Expand Down