From af2b24d3c78c82d0a41ba10534e9be867dd947c7 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 14 Jun 2024 13:36:15 +0000 Subject: [PATCH] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- eye2bids/edf2bids.py | 161 ++++++++++++++++++++++++++--------------- tests/test_edf2bids.py | 44 +++++++---- 2 files changed, 132 insertions(+), 73 deletions(-) diff --git a/eye2bids/edf2bids.py b/eye2bids/edf2bids.py index c66be9f..c781043 100644 --- a/eye2bids/edf2bids.py +++ b/eye2bids/edf2bids.py @@ -4,6 +4,7 @@ import gzip import json +import re import subprocess from pathlib import Path @@ -15,7 +16,6 @@ from eye2bids._parser import global_parser from eye2bids.logger import eye2bids_logger -import re e2b_log = eye2bids_logger() @@ -323,77 +323,109 @@ def _load_asc_file_as_reduced_df(events_asc_file: str | Path) -> pd.DataFrame: df_ms = _load_asc_file_as_df(events_asc_file) return pd.DataFrame(df_ms.iloc[0:, 2:]) + def _df_events_after_start(events: list) -> pd.DataFrame: - start_index = next(i for i, line in enumerate(events) if re.match(r'START\s+.*', line)) - end_index = next(i for i in range(len(events) - 1, -1, -1) if re.match(r'END\s+.*', events[i])) + start_index = next( + i for i, line in enumerate(events) if re.match(r"START\s+.*", line) + ) + end_index = next( + i for i in range(len(events) - 1, -1, -1) if re.match(r"END\s+.*", events[i]) + ) if end_index > start_index: - data_lines = events[start_index + 1:end_index] - return pd.DataFrame([line.strip().split('\t') for line in data_lines]) + data_lines = events[start_index + 1 : end_index] + return pd.DataFrame([line.strip().split("\t") for line in data_lines]) else: return print("No 'END' found after the selected 'START'.") -def _df_physioevents(events_after_start: pd.DataFrame) -> pd.DataFrame: - events_after_start['Event_Letters'] = events_after_start[0].str.extractall(r"([A-Za-z]+)").groupby(level=0).agg(''.join) - events_after_start['Event_Numbers'] = events_after_start[0].str.extract(r'(\d+)') - events_after_start[['msg_timestamp', 'message']] = events_after_start[1].str.split(n=1, expand=True) - events_after_start['message'] = events_after_start['message'].astype(str) - - msg_mask = events_after_start['Event_Letters'] == 'MSG' - events_after_start.loc[msg_mask, 'Event_Numbers'] = events_after_start.loc[msg_mask, 'msg_timestamp'] - physioevents_reordered = (pd.concat([ - events_after_start['Event_Numbers'], - events_after_start[2], events_after_start['Event_Letters'], - events_after_start['message'] - ], axis=1, ignore_index=True) - .replace({None: np.nan, 'None': np.nan}) - .rename(columns={0: 'timestamp',1: 'duration', 2: 'trial_type', 3: 'message'}) + +def _df_physioevents(events_after_start: pd.DataFrame) -> pd.DataFrame: + events_after_start["Event_Letters"] = ( + events_after_start[0].str.extractall(r"([A-Za-z]+)").groupby(level=0).agg("".join) + ) + events_after_start["Event_Numbers"] = events_after_start[0].str.extract(r"(\d+)") + events_after_start[["msg_timestamp", "message"]] = events_after_start[1].str.split( + n=1, expand=True + ) + events_after_start["message"] = events_after_start["message"].astype(str) + + msg_mask = events_after_start["Event_Letters"] == "MSG" + events_after_start.loc[msg_mask, "Event_Numbers"] = events_after_start.loc[ + msg_mask, "msg_timestamp" + ] + physioevents_reordered = ( + pd.concat( + [ + events_after_start["Event_Numbers"], + events_after_start[2], + events_after_start["Event_Letters"], + events_after_start["message"], + ], + axis=1, + ignore_index=True, + ) + .replace({None: np.nan, "None": np.nan}) + .rename(columns={0: "timestamp", 1: "duration", 2: "trial_type", 3: "message"}) ) return physioevents_reordered -def _physioevents_eye1 (physioevents_reordered: pd.DataFrame) -> pd.DataFrame: + +def _physioevents_eye1(physioevents_reordered: pd.DataFrame) -> pd.DataFrame: physioevents_eye1_list = ["MSG", "EFIXL", "ESACCL", "EBLINKL"] - physioevents_eye1 = physioevents_reordered[physioevents_reordered['trial_type'].isin(physioevents_eye1_list)] - physioevents_eye1 = physioevents_eye1.replace({"EFIXL": "fixation", "ESACCL": "saccade", "MSG": np.nan, None: np.nan}) - - physioevents_eye1['blink'] = 0 + physioevents_eye1 = physioevents_reordered[ + physioevents_reordered["trial_type"].isin(physioevents_eye1_list) + ] + physioevents_eye1 = physioevents_eye1.replace( + {"EFIXL": "fixation", "ESACCL": "saccade", "MSG": np.nan, None: np.nan} + ) + + physioevents_eye1["blink"] = 0 last_non_na_trial_type = None for i in range(len(physioevents_eye1)): - current_trial_type = physioevents_eye1.iloc[i]['trial_type'] - if pd.notna(current_trial_type): - if current_trial_type == 'saccade' and last_non_na_trial_type == 'EBLINKL': - physioevents_eye1.iloc[i, physioevents_eye1.columns.get_loc('blink')] = 1 + current_trial_type = physioevents_eye1.iloc[i]["trial_type"] + if pd.notna(current_trial_type): + if current_trial_type == "saccade" and last_non_na_trial_type == "EBLINKL": + physioevents_eye1.iloc[i, physioevents_eye1.columns.get_loc("blink")] = 1 last_non_na_trial_type = current_trial_type - physioevents_eye1.loc[physioevents_eye1['trial_type'].isna(), 'blink'] = np.nan - physioevents_eye1['blink'] = physioevents_eye1['blink'].astype('Int64') - physioevents_eye1 = physioevents_eye1[physioevents_eye1.trial_type != 'EBLINKL'] + physioevents_eye1.loc[physioevents_eye1["trial_type"].isna(), "blink"] = np.nan + physioevents_eye1["blink"] = physioevents_eye1["blink"].astype("Int64") + physioevents_eye1 = physioevents_eye1[physioevents_eye1.trial_type != "EBLINKL"] - physioevents_eye1 = physioevents_eye1[['timestamp', 'duration', 'trial_type','blink', 'message']] + physioevents_eye1 = physioevents_eye1[ + ["timestamp", "duration", "trial_type", "blink", "message"] + ] return physioevents_eye1 -def _physioevents_eye2 (physioevents_reordered: pd.DataFrame) -> pd.DataFrame: + +def _physioevents_eye2(physioevents_reordered: pd.DataFrame) -> pd.DataFrame: physioevents_eye2_list = ["MSG", "EFIXR", "ESACCR", "EBLINKR"] - physioevents_eye2 = physioevents_reordered[physioevents_reordered['trial_type'].isin(physioevents_eye2_list)] - physioevents_eye2 = physioevents_eye2.replace({"EFIXR": "fixation", "ESACCR": "saccade", "MSG": np.nan, None: np.nan}) + physioevents_eye2 = physioevents_reordered[ + physioevents_reordered["trial_type"].isin(physioevents_eye2_list) + ] + physioevents_eye2 = physioevents_eye2.replace( + {"EFIXR": "fixation", "ESACCR": "saccade", "MSG": np.nan, None: np.nan} + ) - physioevents_eye2['blink'] = 0 + physioevents_eye2["blink"] = 0 last_non_na_trial_type = None for i in range(len(physioevents_eye2)): - current_trial_type = physioevents_eye2.iloc[i]['trial_type'] - if pd.notna(current_trial_type): - if current_trial_type == 'saccade' and last_non_na_trial_type == 'EBLINKR': - physioevents_eye2.iloc[i, physioevents_eye2.columns.get_loc('blink')] = 1 + current_trial_type = physioevents_eye2.iloc[i]["trial_type"] + if pd.notna(current_trial_type): + if current_trial_type == "saccade" and last_non_na_trial_type == "EBLINKR": + physioevents_eye2.iloc[i, physioevents_eye2.columns.get_loc("blink")] = 1 last_non_na_trial_type = current_trial_type - physioevents_eye2.loc[physioevents_eye2['trial_type'].isna(), 'blink'] = np.nan - physioevents_eye2['blink'] = physioevents_eye2['blink'].astype('Int64') - physioevents_eye2 = physioevents_eye2[physioevents_eye2.trial_type != 'EBLINKR'] + physioevents_eye2.loc[physioevents_eye2["trial_type"].isna(), "blink"] = np.nan + physioevents_eye2["blink"] = physioevents_eye2["blink"].astype("Int64") + physioevents_eye2 = physioevents_eye2[physioevents_eye2.trial_type != "EBLINKR"] - physioevents_eye2 = physioevents_eye2[['timestamp', 'duration', 'trial_type','blink', 'message']] + physioevents_eye2 = physioevents_eye2[ + ["timestamp", "duration", "trial_type", "blink", "message"] + ] return physioevents_eye2 @@ -487,12 +519,12 @@ def edf2bids( } else: metadata_eye1 = { - "AverageCalibrationError": (_extract_AverageCalibrationError(df_ms)[0::2]), - "MaximalCalibrationError": (_extract_MaximalCalibrationError(df_ms)[0::2]), - "RecordedEye": (_extract_RecordedEye(df_ms_reduced)) - } - - json_eye1 = dict(base_json, **metadata_eye1) + "AverageCalibrationError": (_extract_AverageCalibrationError(df_ms)[0::2]), + "MaximalCalibrationError": (_extract_MaximalCalibrationError(df_ms)[0::2]), + "RecordedEye": (_extract_RecordedEye(df_ms_reduced)), + } + + json_eye1 = dict(base_json, **metadata_eye1) if _2eyesmode(df_ms_reduced) == True: json_eye2 = dict(base_json, **metadata_eye2) @@ -547,8 +579,8 @@ def edf2bids( output_dir=output_dir, input_file=input_file, suffix="_recording-eye1_physioevents", - extension="json" - ) + extension="json", + ) with open(output_filename_eye1, "w") as outfile: json.dump(events_json, outfile, indent=4) @@ -630,15 +662,19 @@ def edf2bids( suffix="_recording-eye1_physioevents", extension="tsv.gz", ) - if _extract_RecordedEye(df_ms_reduced) == 'Left': - content = physioevents_eye1.to_csv(sep="\t", index=False, na_rep="n/a", header=None) - elif _extract_RecordedEye(df_ms_reduced) == 'Right': - content = physioevents_eye2.to_csv(sep="\t", index=False, na_rep="n/a", header=None) + if _extract_RecordedEye(df_ms_reduced) == "Left": + content = physioevents_eye1.to_csv( + sep="\t", index=False, na_rep="n/a", header=None + ) + elif _extract_RecordedEye(df_ms_reduced) == "Right": + content = physioevents_eye2.to_csv( + sep="\t", index=False, na_rep="n/a", header=None + ) with gzip.open(output_eventsfilename_eye1, "wb") as f: f.write(content.encode()) e2b_log.info(f"file generated: {output_eventsfilename_eye1}") - + else: output_eventsfilename_eye1 = generate_output_filename( output_dir=output_dir, @@ -646,7 +682,9 @@ def edf2bids( suffix="_recording-eye1_physioevents", extension="tsv.gz", ) - content = physioevents_eye1.to_csv(sep="\t", index=False, na_rep="n/a", header=None) + content = physioevents_eye1.to_csv( + sep="\t", index=False, na_rep="n/a", header=None + ) with gzip.open(output_eventsfilename_eye1, "wb") as f: f.write(content.encode()) @@ -658,12 +696,15 @@ def edf2bids( suffix="_recording-eye2_physioevents", extension="tsv.gz", ) - content = physioevents_eye2.to_csv(sep="\t", index=False, na_rep="n/a", header=None) + content = physioevents_eye2.to_csv( + sep="\t", index=False, na_rep="n/a", header=None + ) with gzip.open(output_eventsfilename_eye2, "wb") as f: f.write(content.encode()) e2b_log.info(f"file generated: {output_eventsfilename_eye2}") + def generate_output_filename( output_dir: Path, input_file: Path, suffix: str, extension: str ) -> Path: diff --git a/tests/test_edf2bids.py b/tests/test_edf2bids.py index 81e6660..0b9f50e 100644 --- a/tests/test_edf2bids.py +++ b/tests/test_edf2bids.py @@ -3,13 +3,15 @@ import json from pathlib import Path -import pandas as pd import numpy as np +import pandas as pd import pytest from eye2bids.edf2bids import ( _check_edf2asc_present, _convert_edf_to_asc_events, + _df_events_after_start, + _df_physioevents, _extract_AverageCalibrationError, _extract_CalibrationPosition, _extract_CalibrationType, @@ -25,8 +27,6 @@ _load_asc_file, _load_asc_file_as_df, _load_asc_file_as_reduced_df, - _df_events_after_start, - _df_physioevents, _physioevents_eye1, _physioevents_eye2, edf2bids, @@ -75,9 +75,12 @@ def test_edf_end_to_end(metadata_file, eyelink_test_data_dir): expected_data_tsv = output_dir / f"{input_file.stem}_recording-eye1_physio.tsv.gz" assert expected_data_tsv.exists() - expected_events_tsv = output_dir / f"{input_file.stem}_recording-eye1_physioevents.tsv.gz" + expected_events_tsv = ( + output_dir / f"{input_file.stem}_recording-eye1_physioevents.tsv.gz" + ) assert expected_events_tsv.exists() + @pytest.mark.skipif(not _check_edf2asc_present(), reason="edf2asc missing") @pytest.mark.parametrize("metadata_file", [data_dir() / "metadata.yml", None]) def test_edf_end_to_end_2eyes(metadata_file, eyelink_test_data_dir): @@ -101,17 +104,23 @@ def test_edf_end_to_end_2eyes(metadata_file, eyelink_test_data_dir): events = json.load(f) assert events["StimulusPresentation"]["ScreenResolution"] == [1919, 1079] - expected_data_sidecar_eye1 = output_dir / f"{input_file.stem}_recording-eye1_physio.json" + expected_data_sidecar_eye1 = ( + output_dir / f"{input_file.stem}_recording-eye1_physio.json" + ) assert expected_data_sidecar_eye1.exists() with open(expected_data_sidecar_eye1) as f: eyetrack = json.load(f) assert eyetrack["SamplingFrequency"] == 1000 assert eyetrack["RecordedEye"] == "Left" - expected_data_tsv_eye1 = output_dir / f"{input_file.stem}_recording-eye1_physio.tsv.gz" + expected_data_tsv_eye1 = ( + output_dir / f"{input_file.stem}_recording-eye1_physio.tsv.gz" + ) assert expected_data_tsv_eye1.exists() - expected_events_tsv_eye1 = output_dir / f"{input_file.stem}_recording-eye1_physioevents.tsv.gz" + expected_events_tsv_eye1 = ( + output_dir / f"{input_file.stem}_recording-eye1_physioevents.tsv.gz" + ) assert expected_events_tsv_eye1.exists() expected_events_sidecar_eye2 = ( @@ -119,16 +128,22 @@ def test_edf_end_to_end_2eyes(metadata_file, eyelink_test_data_dir): ) assert expected_events_sidecar_eye2.exists() - expected_data_sidecar_eye2 = output_dir / f"{input_file.stem}_recording-eye2_physio.json" + expected_data_sidecar_eye2 = ( + output_dir / f"{input_file.stem}_recording-eye2_physio.json" + ) assert expected_data_sidecar_eye2.exists() with open(expected_data_sidecar_eye2) as f: eyetrack = json.load(f) assert eyetrack["RecordedEye"] == "Right" - expected_data_tsv_eye2 = output_dir / f"{input_file.stem}_recording-eye2_physio.tsv.gz" + expected_data_tsv_eye2 = ( + output_dir / f"{input_file.stem}_recording-eye2_physio.tsv.gz" + ) assert expected_data_tsv_eye2.exists() - expected_events_tsv_eye2 = output_dir / f"{input_file.stem}_recording-eye2_physioevents.tsv.gz" + expected_events_tsv_eye2 = ( + output_dir / f"{input_file.stem}_recording-eye2_physioevents.tsv.gz" + ) assert expected_events_tsv_eye2.exists() @@ -625,11 +640,14 @@ def test_number_columns_physioevents_tsv(eyelink_test_data_dir): output_dir=output_dir, ) - expected_physioevents_tsv = output_dir / f"{input_file.stem}_recording-eye2_physioevents.tsv.gz" + expected_physioevents_tsv = ( + output_dir / f"{input_file.stem}_recording-eye2_physioevents.tsv.gz" + ) df = pd.read_csv(expected_physioevents_tsv, sep="\t") number_columns = len(df.columns) assert number_columns == 5 + @pytest.mark.parametrize( "folder, expected", [ @@ -639,7 +657,7 @@ def test_number_columns_physioevents_tsv(eyelink_test_data_dir): ("rest", [9199380, np.nan, np.nan, np.nan, "RECORD_START"]), ("satf", [3098683, 20, "saccade", 1, np.nan]), ("vergence", [819655, np.nan, np.nan, np.nan, "!MODE RECORD CR 1000 2 1 LR"]), - ("2eyes",[767985, 169, "fixation", 0, np.nan]) + ("2eyes", [767985, 169, "fixation", 0, np.nan]), ], ) def test_physioevents_value(folder, expected, eyelink_test_data_dir): @@ -649,4 +667,4 @@ def test_physioevents_value(folder, expected, eyelink_test_data_dir): events_after_start = _df_events_after_start(events) physioevents_reordered = _df_physioevents(events_after_start) physioevents_eye1 = _physioevents_eye1(physioevents_reordered) - assert physioevents_eye1.iloc[0].tolist() == expected \ No newline at end of file + assert physioevents_eye1.iloc[0].tolist() == expected