Skip to content

Commit

Permalink
Merge pull request #119 from BEL-Public/feature-add-pns-set-parser
Browse files Browse the repository at this point in the history
feat(xml_files): add PNSSet parser
  • Loading branch information
damian5710 authored Mar 25, 2024
2 parents c9df350 + 38282c3 commit bdbaece
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 3 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/lint_and_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
# Name the job
name: Lint and Test
# Set the type of machine to run on
runs-on: ubuntu-18.04
runs-on: ubuntu-20.04
steps:
# Check out the latest commit from the current branch
- name: Checkout Current Branch
Expand Down
1 change: 1 addition & 0 deletions examples/example_2.json
Original file line number Diff line number Diff line change
Expand Up @@ -8112,6 +8112,7 @@
}
},
"dipoleSet": {},
"PNSSet": {},
"historyEntries": [
{
"name": "Noise_30Seconds",
Expand Down
47 changes: 47 additions & 0 deletions mffpy/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
ANY KIND, either express or implied.
"""

from typing import Any, Dict
from glob import glob
import os.path as op
from zipfile import ZipFile, ZIP_STORED
Expand All @@ -30,3 +31,49 @@ def ensure_mfz():
for content_filename in glob(op.join(fname[:-3] + 'mff', '*')):
arc_filename = op.basename(content_filename)
zf.write(content_filename, arcname=arc_filename)


@pytest.fixture
def sensors() -> Dict[int, Any]:
return {
0: {
'name': 'ECG',
'number': 0,
'unit': 'uV',
'psgType': 0,
'mapping': 1,
'samplingRate': 0,
'sensorType': 'ECG',
'highpass': 0.3000000119,
'lowpass': 70,
'notch': 60,
'groupNumber': 1,
'gain': 1,
'defaultDisplayAmplitude': 7.5,
'highpassDisplay': 0.3000000119,
'lowpassDisplay': 70,
'notchDisplay': 60,
'color': [0.0000, 0.0000, 0.0000, 1.0000],
'positiveUp': 'false',
},
1: {
'name': 'EMG',
'number': 1,
'unit': 'uV',
'psgType': 0,
'mapping': 2,
'samplingRate': 0,
'sensorType': 'EMG',
'highpass': 10,
'lowpass': 100,
'notch': 60,
'groupNumber': 1,
'gain': 1,
'defaultDisplayAmplitude': 7.5,
'highpassDisplay': 10,
'lowpassDisplay': 100,
'notchDisplay': 60,
'color': [0.0000, 0.0000, 0.0000, 1.0000],
'positiveUp': 'false',
}
}
18 changes: 16 additions & 2 deletions mffpy/tests/test_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ def test_overwrite_mfz(tmpdir):
assert R2.startdatetime == time2


def test_writer_writes_multple_bins(tmpdir):
def test_writer_writes_multple_bins(tmpdir, sensors):
"""test that `mffpy.Writer` can write multiple binary files"""
dirname = join(str(tmpdir), 'multiple_bins.mff')
device = 'HydroCel GSN 256 1.0'
Expand All @@ -174,7 +174,7 @@ def test_writer_writes_multple_bins(tmpdir):
sampling_rate = 128
num_channels_dict = {
'EEG': 256,
'PNSData': 16
'PNSData': 2
}
data = {
dtype: np.random.randn(
Expand All @@ -192,6 +192,12 @@ def test_writer_writes_multple_bins(tmpdir):
startdatetime = datetime.strptime(
'1984-02-18T14:00:10.000000+0100', XML._time_format)
W.addxml('fileInfo', recordTime=startdatetime)
W.addxml(
'PNSSet',
name='Physio 16 set 60hz 1.0',
amp_series='400',
sensors=sensors,
)
W.add_coordinates_and_sensor_layout(device)
for b in bin_writers.values():
W.addbin(b)
Expand All @@ -212,6 +218,14 @@ def test_writer_writes_multple_bins(tmpdir):
layout = XML.from_file(layout)
assert layout.name == device

pns_set = R.directory.filepointer('pnsSet')
pns_set = XML.from_file(pns_set)
assert pns_set.name == 'Physio 16 set 60hz 1.0'
assert pns_set.amp_series == '400'
for key, val in pns_set.sensors.items():
for k, v in val.items():
assert v == sensors[key][k]


def test_write_multiple_blocks():
"""check that BinWriter correctly handles adding multiple blocks"""
Expand Down
13 changes: 13 additions & 0 deletions mffpy/tests/test_xml_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

examples_path = join(dirname(__file__), '..', '..', 'examples')
mff_path = join(examples_path, 'example_1.mff')
mffpath_3 = join(examples_path, 'example_3.mff')

"""
Here are several fixtures that parse example xml files
Expand Down Expand Up @@ -516,6 +517,18 @@ def test_dipoleSet_w_different_order(dipoleSet):
], dtype=np.float32))


def test_pnsSet(sensors):
"""test parsing of `pnsSet.xml`"""
filepath = join(mffpath_3, 'pnsSet.xml')
assert exists(filepath), f"Not found: '{filepath}'"
pns_set = XML.from_file(filepath)
assert pns_set.name == 'Physio 16 set 60hz 1.0'
assert pns_set.amp_series == '400'
for key, val in pns_set.sensors.items():
for k, v in val.items():
assert v == sensors[key][k]


@pytest.mark.parametrize("idx,expected", [
('name', 'Noise_30Seconds'),
('method', 'Segmentation'),
Expand Down
165 changes: 165 additions & 0 deletions mffpy/xml_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -1266,6 +1266,171 @@ def get_serializable_content(self):
return content


class PNSSet(XML):
"""Parser for 'pnsSet.xml' file
These files have the following structure:
```
<?xml version="1.0" encoding="UTF-8" standalone="yes" ?>
<PNSSet xmlns="http://www.egi.com/pnsSet_mff"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<name>Physio 16 set 60hz 1.0</name>
<ampSeries>400</ampSeries>
<sensors>
<sensor>
<name>ECG</name>
<number>0</number>
<unit>uV</unit>
<psgType>0</psgType>
<mapping>1</mapping>
<samplingRate>0</samplingRate>
<sensorType>ECG</sensorType>
<highpass>0.3000000119</highpass>
<lowpass>70</lowpass>
<notch>60</notch>
<groupNumber>1</groupNumber>
<gain>1</gain>
<defaultDisplayAmplitude>7.5</defaultDisplayAmplitude>
<highpassDisplay>0.3000000119</highpassDisplay>
<lowpassDisplay>70</lowpassDisplay>
<notchDisplay>60</notchDisplay>
<color>0.0000,0.0000,0.0000,1.0000</color>
<positiveUp>false</positiveUp>
</sensor>
<sensor>
<name>EMG</name>
<number>1</number>
<unit>uV</unit>
<psgType>0</psgType>
<mapping>2</mapping>
<samplingRate>0</samplingRate>
<sensorType>EMG</sensorType>
<highpass>10</highpass>
<lowpass>100</lowpass>
<notch>60</notch>
<groupNumber>1</groupNumber>
<gain>1</gain>
<defaultDisplayAmplitude>7.5</defaultDisplayAmplitude>
<highpassDisplay>10</highpassDisplay>
<lowpassDisplay>100</lowpassDisplay>
<notchDisplay>60</notchDisplay>
<color>0.0000,0.0000,0.0000,1.0000</color>
<positiveUp>false</positiveUp>
...
```
"""

_xmlns = r'{http://www.egi.com/pnsSet_mff}'
_xmlroottag = r'PNSSet'
_default_filename = 'pnsSet.xml'
_sensor_type_reverter = {
'name': str,
'number': str,
'unit': str,
'psgType': str,
'mapping': str,
'samplingRate': str,
'sensorType': str,
'highpass': str,
'lowpass': str,
'notch': str,
'groupNumber': str,
'gain': str,
'defaultDisplayAmplitude': str,
'highpassDisplay': str,
'lowpassDisplay': str,
'notchDisplay': str,
'color': lambda color: ','.join(
["{:.4f}".format(c) for c in color]
),
'positiveUp': str
}

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._sensor_type_converter = {
'name': str,
'number': int,
'unit': str,
'psgType': int,
'mapping': int,
'samplingRate': int,
'sensorType': str,
'highpass': float,
'lowpass': float,
'notch': int,
'groupNumber': int,
'gain': int,
'defaultDisplayAmplitude': float,
'highpassDisplay': float,
'lowpassDisplay': float,
'notchDisplay': int,
'color': lambda s: list(map(float, s.split(","))),
'positiveUp': str,
}

@cached_property
def sensors(self) -> Dict[int, Any]:
return dict([
self._parse_sensor(sensor)
for sensor in self.find('sensors')
])

def _parse_sensor(self, el) -> Tuple[int, Any]:
assert self.nsstrip(el.tag) == 'sensor', f"""
Unknown sensor with tag '{self.nsstrip(el.tag)}'"""
ans = {}
for e in el:
tag = self.nsstrip(e.tag)
ans[tag] = self._sensor_type_converter[tag](e.text)
return ans['number'], ans

@cached_property
def name(self) -> str:
"""return value of the name tag"""
return self.find('name').text

@cached_property
def amp_series(self) -> str:
"""return value of the ampSeries tag"""
return self.find('ampSeries').text

def get_content(self) -> Dict[str, Any]:
"""return properties of the sensor
set read from the .xml"""
return {
'name': self.name,
'ampSeries': self.amp_series,
'sensors': self.sensors
}

def get_serializable_content(self) -> Dict[str, Any]:
"""return a serializable object containing the
properties of the sensor set read from the .xml"""
return copy.deepcopy(self.get_content())

@classmethod
def content(cls, name: str, amp_series: str, # type: ignore
sensors: Dict[int, Any]) -> Dict[str, Any]:
"""return content in xml-convertible json format"""
formatted_sensors = []
for sensor in sensors.values():
formatted = {}
for k, v in sensor.items():
assert k in cls._sensor_type_reverter, "sensor property "
f"'{k}' not serializable. Needs to be on of "
"{list(cls._sensor_type_reverter.keys())}"
formatted[k] = {
TEXT: cls._sensor_type_reverter[k](v) # type: ignore
}
formatted_sensors.append({TEXT: formatted})
return {
'name': {TEXT: name},
'ampSeries': {TEXT: amp_series},
'sensors': {TEXT: {'sensor': formatted_sensors}},
}


class History(XML):
"""Parser for 'history.xml' files
Expand Down

0 comments on commit bdbaece

Please sign in to comment.