Adding: Pytests for evaluation splitters, and examples for meta splitters
brunaafl committed Aug 15, 2024
1 parent 177bf65 commit a6b5772
Showing 3 changed files with 209 additions and 12 deletions.
132 changes: 121 additions & 11 deletions moabb/evaluations/metasplitters.py
@@ -22,6 +22,8 @@
class OfflineSplit(BaseCrossValidator):
"""Offline split for evaluation test data.
It can be used for further splitting of the test data by session or run as needed.
It assumes that, per session, all test trials are available for inference, so it is
suitable when no filtering or data alignment is needed.
@@ -31,27 +33,59 @@ class OfflineSplit(BaseCrossValidator):
Not used in this case; included only so the splitter can be initialized in the
same way as TimeSeriesSplit.
Examples
--------
>>> import numpy as np
>>> import pandas as pd
>>> from moabb.evaluations.splitters import CrossSubjectSplitter
>>> from moabb.evaluations.metasplitters import OfflineSplit
>>> X = np.array([[1, 2], [3, 4], [5, 6], [7, 8], [8, 9], [5, 4], [2, 5], [1, 7]])
>>> y = np.array([1, 2, 1, 2, 1, 2, 1, 2])
>>> subjects = np.array([1, 1, 1, 1, 2, 2, 2, 2])
>>> sessions = np.array([0, 0, 1, 1, 0, 0, 1, 1])
>>> metadata = pd.DataFrame(data={'subject': subjects, 'session': sessions})
>>> csubj = CrossSubjectSplitter()
>>> off = OfflineSplit()
>>> csubj.get_n_splits(metadata)
2
>>> for i, (train_index, test_index) in enumerate(csubj.split(X, y, metadata)):
...     print(f"Fold {i}:")
...     print(f"  Train: index={train_index}, group={subjects[train_index]}, sessions={sessions[train_index]}")
...     print(f"  Test: index={test_index}, group={subjects[test_index]}, sessions={sessions[test_index]}")
...     X_test, y_test, meta_test = X[test_index], y[test_index], metadata.loc[test_index]
...     for j, test_session in enumerate(off.split(X_test, y_test, meta_test)):
...         print(f"    By session - Test: index={test_session}, group={subjects[test_session]}, sessions={sessions[test_session]}")
Fold 0:
  Train: index=[4 5 6 7], group=[2 2 2 2], sessions=[0 0 1 1]
  Test: index=[0 1 2 3], group=[1 1 1 1], sessions=[0 0 1 1]
    By session - Test: index=[0, 1], group=[1 1], sessions=[0 0]
    By session - Test: index=[2, 3], group=[1 1], sessions=[1 1]
Fold 1:
  Train: index=[0 1 2 3], group=[1 1 1 1], sessions=[0 0 1 1]
  Test: index=[4 5 6 7], group=[2 2 2 2], sessions=[0 0 1 1]
    By session - Test: index=[4, 5], group=[2 2], sessions=[0 0]
    By session - Test: index=[6, 7], group=[2 2], sessions=[1 1]
"""

def __init__(self, n_folds: int):
def __init__(self, n_folds=None):
self.n_folds = n_folds

def get_n_splits(self, metadata):
return metadata.groupby(["subject", "session"]).ngroups

def split(self, X, y, metadata):

subjects = metadata.subject.unique()
subjects = metadata["subject"]

for subject in subjects:
for subject in subjects.unique():
mask = subjects == subject
X_, y_, meta_ = X[mask], y[mask], metadata[mask]
sessions = meta_.session.unique()

for session in sessions:
ix_test = meta_[meta_["session"] == session].index

yield ix_test
yield list(ix_test)
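
For context (not part of this commit), a minimal sketch of how the per-session index lists yielded by OfflineSplit.split might be consumed. The yielded values are the original metadata index labels, so they are used here to index the full arrays; DummyClassifier is only a stand-in for any fitted sklearn-style estimator, and the splitters are assumed to behave as in the docstring examples above.

# Hypothetical usage sketch: score a classifier separately on each test
# session produced by OfflineSplit.
import numpy as np
import pandas as pd
from sklearn.dummy import DummyClassifier

from moabb.evaluations.metasplitters import OfflineSplit
from moabb.evaluations.splitters import CrossSubjectSplitter

X = np.array([[1, 2], [3, 4], [5, 6], [7, 8], [8, 9], [5, 4], [2, 5], [1, 7]])
y = np.array([1, 2, 1, 2, 1, 2, 1, 2])
metadata = pd.DataFrame({"subject": [1, 1, 1, 1, 2, 2, 2, 2],
                         "session": [0, 0, 1, 1, 0, 0, 1, 1]})

csubj = CrossSubjectSplitter()
off = OfflineSplit()
for train_index, test_index in csubj.split(X, y, metadata):
    clf = DummyClassifier(strategy="most_frequent").fit(X[train_index], y[train_index])
    X_test, y_test, meta_test = X[test_index], y[test_index], metadata.loc[test_index]
    for ix_test in off.split(X_test, y_test, meta_test):
        # ix_test holds the original metadata index labels for one session
        session = meta_test.loc[ix_test, "session"].iloc[0]
        print(f"session {session}: accuracy {clf.score(X[ix_test], y[ix_test])}")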


class TimeSeriesSplit(BaseCrossValidator):
@@ -71,13 +105,53 @@ class TimeSeriesSplit(BaseCrossValidator):
calib_size: int
Size of the calibration set, used if there is just one run.
Examples
--------
>>> import numpy as np
>>> import pandas as pd
>>> from moabb.evaluations.splitters import CrossSubjectSplitter
>>> from moabb.evaluations.metasplitters import TimeSeriesSplit
>>> X = np.array([[1, 2], [3, 4], [5, 6], [7, 8], [8, 9], [5, 4], [2, 5], [1, 7]])
>>> y = np.array([1, 2, 1, 2, 1, 2, 1, 2])
>>> subjects = np.array([1, 1, 1, 1, 2, 2, 2, 2])
>>> sessions = np.array([0, 0, 1, 1, 0, 0, 1, 1])
>>> runs = np.array(['0', '1', '0', '1', '0', '1', '0', '1'])
>>> metadata = pd.DataFrame(data={'subject': subjects, 'session': sessions, 'run':runs})
>>> csubj = CrossSubjectSplitter()
>>> tssplit = TimeSeriesSplit()
>>> tssplit.get_n_splits(metadata)
4
>>> for i, (train_index, test_index) in enumerate(csubj.split(X, y, metadata)):
...     print(f"Fold {i}:")
...     print(f" Train: index={train_index}, group={subjects[train_index]}, sessions={sessions[train_index]}, runs={runs[train_index]}")
...     print(f" Test: index={test_index}, group={subjects[test_index]}, sessions={sessions[test_index]}, runs={runs[test_index]}")
...     X_test, y_test, meta_test = X[test_index], y[test_index], metadata.loc[test_index]
...     for j, (test_ix, calib_ix) in enumerate(tssplit.split(X_test, y_test, meta_test)):
...         print(f" Evaluation: index={test_ix}, group={subjects[test_ix]}, sessions={sessions[test_ix]}, runs={runs[test_ix]}")
...         print(f" Calibration: index={calib_ix}, group={subjects[calib_ix]}, sessions={sessions[calib_ix]}, runs={runs[calib_ix]}")
Fold 0:
Train: index=[4 5 6 7], group=[2 2 2 2], sessions=[0 0 1 1], runs=['0' '1' '0' '1']
Test: index=[0 1 2 3], group=[1 1 1 1], sessions=[0 0 1 1], runs=['0' '1' '0' '1']
Evaluation: index=[1], group=[1], sessions=[0], runs=['1']
Calibration: index=[0], group=[1], sessions=[0], runs=['0']
Evaluation: index=[3], group=[1], sessions=[1], runs=['1']
Calibration: index=[2], group=[1], sessions=[1], runs=['0']
Fold 1:
Train: index=[0 1 2 3], group=[1 1 1 1], sessions=[0 0 1 1], runs=['0' '1' '0' '1']
Test: index=[4 5 6 7], group=[2 2 2 2], sessions=[0 0 1 1], runs=['0' '1' '0' '1']
Evaluation: index=[5], group=[2], sessions=[0], runs=['1']
Calibration: index=[4], group=[2], sessions=[0], runs=['0']
Evaluation: index=[7], group=[2], sessions=[1], runs=['1']
Calibration: index=[6], group=[2], sessions=[1], runs=['0']
"""

def __init__(self, calib_size: int = None):
self.calib_size = calib_size

def get_n_splits(self, metadata):
return metadata.groupby(["subject", "session"])
return len(metadata.groupby(["subject", "session"]))

def split(self, X, y, metadata):

@@ -93,10 +167,10 @@ def split(self, X, y, metadata):
break  # Take the first run as calibration
else:
calib_size = self.calib_size
calib_ix = group[:calib_size]
test_ix = group[calib_size:]
calib_ix = group[:calib_size].index
test_ix = group[calib_size:].index

yield test_ix, calib_ix # Take first #calib_size samples as calibration
yield list(test_ix), list(calib_ix) # Take first #calib_size samples as calibration
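
As a sketch of how the (test, calibration) index pairs yielded above might be used (not part of this commit): calibrate a model on the first run of each session and evaluate it on the remaining runs. DummyClassifier is only a stand-in for an adaptive or calibrated model, and TimeSeriesSplit is assumed to group by subject and session as in the docstring example.

# Hypothetical usage sketch: calibrate on the first run of each session,
# then evaluate on the remaining runs, using TimeSeriesSplit's index pairs.
import numpy as np
import pandas as pd
from sklearn.dummy import DummyClassifier

from moabb.evaluations.metasplitters import TimeSeriesSplit

X = np.array([[1, 2], [3, 4], [5, 6], [7, 8]])
y = np.array([1, 2, 1, 2])
metadata = pd.DataFrame({"subject": [1, 1, 1, 1],
                         "session": [0, 0, 1, 1],
                         "run": ["0", "1", "0", "1"]})

tssplit = TimeSeriesSplit()
for test_ix, calib_ix in tssplit.split(X, y, metadata):
    # Fit (or adapt) on the calibration trials, score on the held-out trials
    clf = DummyClassifier(strategy="most_frequent").fit(X[calib_ix], y[calib_ix])
    print(f"calibration {calib_ix} -> evaluation {test_ix}: {clf.score(X[test_ix], y[test_ix])}")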


class SamplerSplit(BaseCrossValidator):
@@ -119,6 +193,40 @@ class SamplerSplit(BaseCrossValidator):
'value' with the actual values as a numpy array. This array should be
sorted, such that values in data_size are strictly monotonically increasing.
Examples
--------
>>> import numpy as np
>>> import pandas as pd
>>> X = np.array([[1, 2], [3, 4], [5, 6], [7, 8], [8, 9], [5, 4], [2, 5], [1, 7], [8, 9], [5, 4], [2, 5], [1, 7]])
>>> y = np.array([1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2])
>>> subjects = np.array([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1])
>>> sessions = np.array([0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1])
>>> runs = np.array(['0', '0', '1', '2', '0', '0', '1', '2', '0', '0', '1', '2'])
>>> metadata = pd.DataFrame(data={'subject': subjects, 'session': sessions, 'run':runs})
>>> from moabb.evaluations.metasplitters import SamplerSplit
>>> from moabb.evaluations.splitters import CrossSessionSplitter
>>> data_size = dict(policy="per_class", value=np.array([2,3]))
>>> data_eval = CrossSessionSplitter()
>>> sampler = SamplerSplit(data_eval, data_size)
>>> for i, (train_index, test_index) in enumerate(sampler.split(X, y, metadata)):
...     print(f"Fold {i}:")
...     print(f" Train: index={train_index}, sessions={sessions[train_index]}")
...     print(f" Test: index={test_index}, sessions={sessions[test_index]}")
Fold 0:
Train: index=[6 8 7 9], sessions=[1 1 1 1]
Test: index=[0 1 2 3 4 5], sessions=[0 0 0 0 0 0]
Fold 1:
Train: index=[ 6 8 10 7 9 11], sessions=[1 1 1 1 1 1]
Test: index=[0 1 2 3 4 5], sessions=[0 0 0 0 0 0]
Fold 2:
Train: index=[0 2 1 3], sessions=[0 0 0 0]
Test: index=[ 6 7 8 9 10 11], sessions=[1 1 1 1 1 1]
Fold 3:
Train: index=[0 2 4 1 3 5], sessions=[0 0 0 0 0 0]
Test: index=[ 6 7 8 9 10 11], sessions=[1 1 1 1 1 1]
"""

def __init__(self, data_eval, data_size):
@@ -136,9 +244,11 @@ def split(self, X, y, metadata, **kwargs):
cv = self.data_eval
sampler = self.sampler

for ix_train, _ in cv.split(X, y, metadata, **kwargs):
X_train, y_train, meta_train = X[ix_train], y[ix_train], metadata[ix_train]
yield sampler.split(X_train, y_train, meta_train)
for ix_train, ix_test in cv.split(X, y, metadata, **kwargs):
X_train, y_train, meta_train = X[ix_train], y[ix_train], metadata.iloc[ix_train]
for ix_train_sample in sampler.split(X_train, y_train, meta_train):
ix_train_sample = ix_train[ix_train_sample]
yield ix_train_sample, ix_test
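
The key step in the rewritten loop is mapping the sampler's subset-relative positions back to indices of the full dataset via NumPy fancy indexing (assuming, as the code suggests, that the sampler yields positions relative to the training subset). A tiny standalone illustration with hypothetical values echoing the docstring example:

import numpy as np

# ix_train: training rows in the full dataset (from the outer evaluation split)
ix_train = np.array([6, 7, 8, 9, 10, 11])
# ix_train_sample: rows the sampler selected, relative to that training subset
ix_train_sample = np.array([0, 2, 1, 3])
# Fancy indexing maps the subset-relative positions back to original indices
print(ix_train[ix_train_sample])  # [6 8 7 9]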


class IndividualSamplerSplit(BaseCrossValidator):
2 changes: 1 addition & 1 deletion moabb/evaluations/splitters.py
@@ -287,7 +287,7 @@ class CrossSubjectSplitter(BaseCrossValidator):
def __init__(self, n_groups=None):
self.n_groups = n_groups

def get_n_splits(self, X, y, metadata):
def get_n_splits(self, metadata):
return len(metadata.subject.unique())

def split(self, X, y, metadata):
87 changes: 87 additions & 0 deletions moabb/tests/splits.py
@@ -0,0 +1,87 @@
import numpy as np
from sklearn.model_selection import LeaveOneGroupOut, StratifiedKFold

from moabb.datasets.fake import FakeDataset
from moabb.evaluations.splitters import (
    CrossSessionSplitter,
    CrossSubjectSplitter,
    WithinSessionSplitter,
)
from moabb.paradigms.motor_imagery import FakeImageryParadigm

dataset = FakeDataset(["left_hand", "right_hand"], n_subjects=3, seed=12)
paradigm = FakeImageryParadigm()


# Split done for the Within Session evaluation
def eval_split_within_session():
for subject in dataset.subject_list:
X, y, metadata = paradigm.get_data(dataset=dataset, subjects=[subject])
sessions = metadata.session
for session in np.unique(sessions):
ix = sessions == session
cv = StratifiedKFold(5, shuffle=True, random_state=42)
X_, y_ = X[ix], y[ix]
for train, test in cv.split(X_, y_):
yield X_[train], X_[test]

# Split done for the Cross Session evaluation
def eval_split_cross_session():
for subject in dataset.subject_list:
X, y, metadata = paradigm.get_data(dataset=dataset, subjects=[subject])
groups = metadata.session.values
cv = LeaveOneGroupOut()
for train, test in cv.split(X, y, groups):
yield X[train], X[test]

# Split done for the Cross Subject evaluation
def eval_split_cross_subject():
X, y, metadata = paradigm.get_data(dataset=dataset)
groups = metadata.subject.values
cv = LeaveOneGroupOut()
for train, test in cv.split(X, y, groups):
yield X[train], X[test]


def test_within_session():
X, y, metadata = paradigm.get_data(dataset=dataset)

split = WithinSessionSplitter(n_folds=5)

for ix, ((X_train_t, X_test_t), (train, test)) in enumerate(
zip(eval_split_within_session(), split.split(X, y, metadata, random_state=42))):
X_train, X_test = X[train], X[test]

# Check if the output is the same as the input
assert np.array_equal(X_train, X_train_t)
assert np.array_equal(X_test, X_test_t)


def test_cross_session():
X, y, metadata = paradigm.get_data(dataset=dataset)

split = CrossSessionSplitter()

for ix, ((X_train_t, X_test_t), (train, test)) in enumerate(
zip(eval_split_cross_session(), split.split(X, y, metadata))):
X_train, X_test = X[train], X[test]

# Check if the output is the same as the input
assert np.array_equal(X_train, X_train_t)
assert np.array_equal(X_test, X_test_t)


def test_cross_subject():
X, y, metadata = paradigm.get_data(dataset=dataset)

split = CrossSubjectSplitter()

for ix, ((X_train_t, X_test_t), (train, test)) in enumerate(
zip(eval_split_cross_subject(), split.split(X, y, metadata))):
X_train, X_test = X[train], X[test]

# Check if the output is the same as the input
assert np.array_equal(X_train, X_train_t)
assert np.array_equal(X_test, X_test_t)
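

A possible follow-up (not part of this commit): the three tests above could be consolidated into a single parametrized test. This is only a sketch; it reuses the module-level `dataset`, `paradigm`, and the `eval_split_*` helpers defined in this file.

import pytest


@pytest.mark.parametrize(
    "splitter, reference_split",
    [
        (WithinSessionSplitter(n_folds=5), eval_split_within_session),
        (CrossSessionSplitter(), eval_split_cross_session),
        (CrossSubjectSplitter(), eval_split_cross_subject),
    ],
)
def test_splitter_matches_reference(splitter, reference_split):
    X, y, metadata = paradigm.get_data(dataset=dataset)
    # WithinSessionSplitter shuffles, so pass the same seed used by the reference split
    kwargs = {"random_state": 42} if isinstance(splitter, WithinSessionSplitter) else {}
    for (X_train_t, X_test_t), (train, test) in zip(
        reference_split(), splitter.split(X, y, metadata, **kwargs)
    ):
        assert np.array_equal(X[train], X_train_t)
        assert np.array_equal(X[test], X_test_t)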
