Skip to content

Commit

Permalink
Merge pull request #410 from htwangtw/read-spike
Browse files Browse the repository at this point in the history
Add support for Spike2 files
  • Loading branch information
smoia authored Apr 27, 2023
2 parents d2a54ff + fea0e0d commit 6e96e9f
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 0 deletions.
85 changes: 85 additions & 0 deletions phys2bids/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,3 +538,88 @@ def load_gep(filename):
timeseries = [t_ch, trigger]
timeseries.extend(data)
return BlueprintInput(timeseries, freq, names, units, 1)


def load_smr(filename, chtrig=0):
"""Load Spike2 smr file and populate object phys_input.
Parameters
----------
filename: str
Path to the spike smr or smrx file.
chtrig : int
Index of trigger channel.
Returns
-------
BlueprintInput
Note
----
Index of chtrig is 1-index (i.e. spike2 channel number).
See Also
--------
physio_obj.BlueprintInput
"""
import sonpy

# taken from sonpy demo
read_data = {
sonpy.lib.DataType.Adc: sonpy.lib.SonFile.ReadInts,
sonpy.lib.DataType.EventFall: sonpy.lib.SonFile.ReadEvents,
sonpy.lib.DataType.EventRise: sonpy.lib.SonFile.ReadEvents,
sonpy.lib.DataType.EventBoth: sonpy.lib.SonFile.ReadEvents,
sonpy.lib.DataType.Marker: sonpy.lib.SonFile.ReadMarkers,
sonpy.lib.DataType.AdcMark: sonpy.lib.SonFile.ReadWaveMarks,
sonpy.lib.DataType.RealMark: sonpy.lib.SonFile.ReadRealMarks,
sonpy.lib.DataType.TextMark: sonpy.lib.SonFile.ReadTextMarks,
sonpy.lib.DataType.RealWave: sonpy.lib.SonFile.ReadFloats,
}

smrfile = sonpy.lib.SonFile(filename, True)
time_base = smrfile.GetTimeBase()
n_channels = smrfile.MaxChannels()
freq, names, units, timeseries = [], [], [], []
for i in range(n_channels):
current_channel = smrfile.ChannelType(i)
max_n_tick = smrfile.ChannelMaxTime(i)
if current_channel != sonpy.lib.DataType.Off and max_n_tick > 0:
max_n_tick = smrfile.ChannelMaxTime(i)
sample_rate = smrfile.GetIdealRate(i)
if current_channel == sonpy.lib.DataType.Adc:
divide = smrfile.ChannelDivide(i)
else: # marker channels
divide = 1 / (time_base * sample_rate)
# conversion factor from CED spike2 doc
# http://ced.co.uk/img/Spike9.pdf
gain = smrfile.GetChannelScale(i) / 6553.6
offset = smrfile.GetChannelOffset(i)
name = smrfile.GetChannelTitle(chan=i)
unit = smrfile.GetChannelUnits(chan=i)

n_samples = int(np.floor((max_n_tick) / divide))
raw_signal = read_data[current_channel](
smrfile, chan=i, nMax=n_samples, tFrom=0, tUpto=max_n_tick
)

signal = np.array(raw_signal) * gain + offset

# save the data
freq.append(sample_rate)
names.append(name)
units.append(unit)
timeseries.append(signal)

# use the channel with highest sample rate to create time stamps
idx_max = np.argmax(freq)
n_timepoints = len(timeseries[idx_max]) # end point included
time = np.arange(n_timepoints) * freq[idx_max]

# prepend to the existing list
freq = [freq[idx_max]] + freq
timeseries = [time] + timeseries
units = ["s"] + units
names = ["time"] + names
return BlueprintInput(timeseries, freq, names, units, chtrig)
10 changes: 10 additions & 0 deletions phys2bids/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,13 @@ def ge_badfiles(testpath):
tmp = fetch_file("tdmyn", testpath, "PPGData_epiRT_columnscsv_00_00_000")
tmp = fetch_file("b6skq", testpath, "PPGData_epiRT_columnstsv_00_00_000")
return fetch_file("8235b", testpath, "PPGData_epiRT_string0000_00_00_000")


@pytest.fixture
def spike2_smrx_file(testpath):
return fetch_file("7x5qw", testpath, "Test_ppg_pulse_spike2.smrx")


@pytest.fixture
def spike2_smr_file(testpath):
return fetch_file("zdpfr", testpath, "Test_ppg_pulse_spike2.smr")
35 changes: 35 additions & 0 deletions phys2bids/tests/test_io.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import math
import os
import sys

import numpy as np
import pytest
Expand Down Expand Up @@ -214,3 +215,37 @@ def test_load_gep_two_files_resp(ge_two_gep_files_resp, testpath):
gep_data2 = np.loadtxt(os.path.join(testpath, "PPGData_epiRT_0000000000_00_00_000.gep"))
assert np.array_equal(gep_data1, phys_obj.timeseries[2])
assert np.array_equal(gep_data2, phys_obj.timeseries[3])


@pytest.mark.skipif(
sys.version_info < (3, 7) or sys.version_info > (3, 9),
reason="Requires python between 3.7 and 3.9",
)
@pytest.mark.xfail(reason="We need to fix sonpy install")
def test_load_smr(spike2_smr_file, spike2_smrx_file):
chtrig = 5

# 32-bit file
phys_obj = io.load_smr(spike2_smr_file, chtrig)
assert phys_obj.ch_name[0] == "time"
assert phys_obj.freq[0] == 1000.0
assert phys_obj.units[0] == "s"

# checks that the scanner strigger is in the right channel
# the marker channels are stored as binary
assert phys_obj.ch_name[chtrig] == "Scan Vol"
assert phys_obj.freq[chtrig] == 200.0
assert phys_obj.units[chtrig] == ""
assert len(phys_obj.timeseries[chtrig]) == 60

# 64-bit file should have the same
phys_obj = io.load_smr(spike2_smrx_file, chtrig)
assert phys_obj.ch_name[0] == "time"
assert phys_obj.freq[0] == 1000.0
assert phys_obj.units[0] == "s"
# checks that the scanner trigger is in the right channel
# the marker channels are stored as binary
assert phys_obj.ch_name[chtrig] == "Scan Vol"
assert phys_obj.freq[chtrig] == 200.0
assert phys_obj.units[chtrig] == ""
assert len(phys_obj.timeseries[chtrig]) == 60
3 changes: 3 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ packages = find:
include_package_data = True

[options.extras_require]
spike2 =
sonpy >=1.7.5;python_version=='3.7.*,3.8.*,3.9.*'
acq =
bioread >=1.0.5
mat=
Expand All @@ -55,6 +57,7 @@ style =
interfaces =
%(acq)s
%(mat)s
%(spike2)s
test =
pytest >=5.3
pytest-cov
Expand Down

0 comments on commit 6e96e9f

Please sign in to comment.