From edde50830f85906330efa871965c2ff48532e8be Mon Sep 17 00:00:00 2001 From: bruAristimunha Date: Fri, 21 Apr 2023 20:54:50 +0200 Subject: [PATCH 01/15] Changing the cross-session to include parallel --- moabb/evaluations/evaluations.py | 175 ++++++++++++++++--------------- 1 file changed, 92 insertions(+), 83 deletions(-) diff --git a/moabb/evaluations/evaluations.py b/moabb/evaluations/evaluations.py index e8ee23a6c..1131f6ae3 100644 --- a/moabb/evaluations/evaluations.py +++ b/moabb/evaluations/evaluations.py @@ -6,6 +6,7 @@ import joblib import numpy as np +from joblib import Parallel, delayed from mne.epochs import BaseEpochs from sklearn.base import clone from sklearn.metrics import get_scorer @@ -476,102 +477,110 @@ def _grid_search(self, param_grid, name_grid, name, grid_clf, X, y, cv, groups): return grid_clf # flake8: noqa: C901 - def evaluate(self, dataset, pipelines, param_grid): + def evaluate(self, dataset, pipelines, param_grid, n_jobs=-1): if not self.is_valid(dataset): raise AssertionError("Dataset is not appropriate for evaluation") # Progressbar at subject level - for subject in tqdm(dataset.subject_list, desc=f"{dataset.code}-CrossSession"): - # check if we already have result for this subject/pipeline - # we might need a better granularity, if we query the DB - run_pipes = self.results.not_yet_computed(pipelines, dataset, subject) - if len(run_pipes) == 0: - continue + results = Parallel(n_jobs=n_jobs)( + delayed(self.process_subject)(subject, param_grid, pipelines, dataset) + for subject in tqdm(dataset.subject_list, desc=f"{dataset.code}-CrossSession") + ) - # get the data - X, y, metadata = self.paradigm.get_data( - dataset=dataset, - subjects=[subject], - return_epochs=self.return_epochs, - return_raws=self.return_raws, - ) - le = LabelEncoder() - y = y if self.mne_labels else le.fit_transform(y) - groups = metadata.session.values - scorer = get_scorer(self.paradigm.scoring) + for result in results: + yield result - for name, clf in run_pipes.items(): - if _carbonfootprint: - # Initialise CodeCarbon - tracker = EmissionsTracker(save_to_file=False, log_level="error") - tracker.start() + def process_subject(self, subject, param_grid, pipelines, dataset): + # check if we already have result for this subject/pipeline + # we might need a better granularity, if we query the DB + run_pipes = self.results.not_yet_computed(pipelines, dataset, subject) + if len(run_pipes) == 0: + return - # we want to store a results per session - cv = LeaveOneGroupOut() + # get the data + X, y, metadata = self.paradigm.get_data( + dataset=dataset, + subjects=[subject], + return_epochs=self.return_epochs, + return_raws=self.return_raws, + ) + le = LabelEncoder() + y = y if self.mne_labels else le.fit_transform(y) + groups = metadata.session.values + scorer = get_scorer(self.paradigm.scoring) - grid_clf = clone(clf) + for name, clf in run_pipes.items(): + if _carbonfootprint: + # Initialise CodeCarbon + tracker = EmissionsTracker(save_to_file=False, log_level="error") + tracker.start() - # Load result if the folder exist - name_grid = os.path.join( - str(self.hdf5_path), - "GridSearch_CrossSession", - dataset.code, - str(subject), - name, - ) + # we want to store a results per session + cv = LeaveOneGroupOut() - # Implement Grid Search - grid_clf = self._grid_search( - param_grid, name_grid, name, grid_clf, X, y, cv, groups - ) + grid_clf = clone(clf) - if _carbonfootprint: - emissions_grid = tracker.stop() - if emissions_grid is None: - emissions_grid = 0 + # Load result if the folder exist + name_grid = os.path.join( + str(self.hdf5_path), + "GridSearch_CrossSession", + dataset.code, + str(subject), + name, + ) - for train, test in cv.split(X, y, groups): - if _carbonfootprint: - tracker.start() - t_start = time() - if isinstance(X, BaseEpochs): - cvclf = clone(grid_clf) - cvclf.fit(X[train], y[train]) - score = scorer(cvclf, X[test], y[test]) - else: - result = _fit_and_score( - clone(grid_clf), - X, - y, - scorer, - train, - test, - verbose=False, - parameters=None, - fit_params=None, - error_score=self.error_score, - ) - score = result["test_scores"] - if _carbonfootprint: - emissions = tracker.stop() - if emissions is None: - emissions = 0 + # Implement Grid Search + grid_clf = self._grid_search( + param_grid, name_grid, name, grid_clf, X, y, cv, groups + ) - duration = time() - t_start - nchan = X.info["nchan"] if isinstance(X, BaseEpochs) else X.shape[1] - res = { - "time": duration, - "dataset": dataset, - "subject": subject, - "session": groups[test][0], - "score": score, - "n_samples": len(train), - "n_channels": nchan, - "pipeline": name, - } - if _carbonfootprint: - res["carbon_emission"] = (1000 * (emissions + emissions_grid),) + if _carbonfootprint: + emissions_grid = tracker.stop() + if emissions_grid is None: + emissions_grid = 0 - yield res + for train, test in cv.split(X, y, groups): + if _carbonfootprint: + tracker.start() + t_start = time() + if isinstance(X, BaseEpochs): + cvclf = clone(grid_clf) + cvclf.fit(X[train], y[train]) + score = scorer(cvclf, X[test], y[test]) + else: + result = _fit_and_score( + clone(grid_clf), + X, + y, + scorer, + train, + test, + verbose=False, + parameters=None, + fit_params=None, + error_score=self.error_score, + ) + score = result["test_scores"] + if _carbonfootprint: + emissions = tracker.stop() + if emissions is None: + emissions = 0 + + duration = time() - t_start + nchan = X.info["nchan"] if isinstance(X, BaseEpochs) else X.shape[1] + res = { + "time": duration, + "dataset": dataset, + "subject": subject, + "session": groups[test][0], + "score": score, + "n_samples": len(train), + "n_channels": nchan, + "pipeline": name, + } + if _carbonfootprint: + res["carbon_emission"] = (1000 * (emissions + emissions_grid),) + + return res def is_valid(self, dataset): return dataset.n_sessions > 1 From 0abe9a54ac1a96dd37620e022a5c5f072e123e60 Mon Sep 17 00:00:00 2001 From: bruAristimunha Date: Fri, 21 Apr 2023 21:08:28 +0200 Subject: [PATCH 02/15] Fixing pass with return --- moabb/evaluations/evaluations.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/moabb/evaluations/evaluations.py b/moabb/evaluations/evaluations.py index 1131f6ae3..9ca9e864a 100644 --- a/moabb/evaluations/evaluations.py +++ b/moabb/evaluations/evaluations.py @@ -494,7 +494,8 @@ def process_subject(self, subject, param_grid, pipelines, dataset): # we might need a better granularity, if we query the DB run_pipes = self.results.not_yet_computed(pipelines, dataset, subject) if len(run_pipes) == 0: - return + log.info(f"Subject {subject} already processed") + pass # get the data X, y, metadata = self.paradigm.get_data( From bde6d2cafb12fa6b11fb795acf99fd3e456fd980 Mon Sep 17 00:00:00 2001 From: bruAristimunha Date: Fri, 21 Apr 2023 21:31:58 +0200 Subject: [PATCH 03/15] Changing the evaluations.py --- moabb/evaluations/evaluations.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/moabb/evaluations/evaluations.py b/moabb/evaluations/evaluations.py index 9ca9e864a..8c0dc4f42 100644 --- a/moabb/evaluations/evaluations.py +++ b/moabb/evaluations/evaluations.py @@ -477,25 +477,26 @@ def _grid_search(self, param_grid, name_grid, name, grid_clf, X, y, cv, groups): return grid_clf # flake8: noqa: C901 - def evaluate(self, dataset, pipelines, param_grid, n_jobs=-1): + def evaluate(self, dataset, pipelines, param_grid): if not self.is_valid(dataset): raise AssertionError("Dataset is not appropriate for evaluation") # Progressbar at subject level - results = Parallel(n_jobs=n_jobs)( + results = [] + for result in Parallel(n_jobs=self.n_jobs)( delayed(self.process_subject)(subject, param_grid, pipelines, dataset) for subject in tqdm(dataset.subject_list, desc=f"{dataset.code}-CrossSession") - ) + ): + results.extend(result) - for result in results: - yield result + return results def process_subject(self, subject, param_grid, pipelines, dataset): # check if we already have result for this subject/pipeline # we might need a better granularity, if we query the DB run_pipes = self.results.not_yet_computed(pipelines, dataset, subject) if len(run_pipes) == 0: - log.info(f"Subject {subject} already processed") - pass + print(f"Subject {subject} already processed") + return [] # get the data X, y, metadata = self.paradigm.get_data( @@ -509,6 +510,7 @@ def process_subject(self, subject, param_grid, pipelines, dataset): groups = metadata.session.values scorer = get_scorer(self.paradigm.scoring) + results = [] for name, clf in run_pipes.items(): if _carbonfootprint: # Initialise CodeCarbon @@ -581,7 +583,8 @@ def process_subject(self, subject, param_grid, pipelines, dataset): if _carbonfootprint: res["carbon_emission"] = (1000 * (emissions + emissions_grid),) - return res + results.append(res) + return results def is_valid(self, dataset): return dataset.n_sessions > 1 From 7bf1726529d2a07d713266e5af347ec636dd7be7 Mon Sep 17 00:00:00 2001 From: bruAristimunha Date: Thu, 27 Apr 2023 18:34:00 +0200 Subject: [PATCH 04/15] Changing the Within evaluations.py --- moabb/evaluations/evaluations.py | 166 ++++++++++++++++--------------- 1 file changed, 88 insertions(+), 78 deletions(-) diff --git a/moabb/evaluations/evaluations.py b/moabb/evaluations/evaluations.py index 8c0dc4f42..f093761d5 100644 --- a/moabb/evaluations/evaluations.py +++ b/moabb/evaluations/evaluations.py @@ -175,92 +175,102 @@ def _grid_search(self, param_grid, name_grid, name, grid_clf, X_, y_, cv): # flake8: noqa: C901 def _evaluate(self, dataset, pipelines, param_grid): # Progress Bar at subject level - for subject in tqdm(dataset.subject_list, desc=f"{dataset.code}-WithinSession"): - # check if we already have result for this subject/pipeline - # we might need a better granularity, if we query the DB - run_pipes = self.results.not_yet_computed(pipelines, dataset, subject) - if len(run_pipes) == 0: - continue - - # get the data - X, y, metadata = self.paradigm.get_data( - dataset, [subject], self.return_epochs, self.return_raws + results = [] + for result in Parallel(n_jobs=self.n_jobs)( + delayed(self._evaluate_subject)(subject, dataset, pipelines, param_grid) + for subject in tqdm( + dataset.subject_list, desc=f"{dataset.code}-WithinSession" ) + ): + results.extend(result) - # iterate over sessions - for session in np.unique(metadata.session): - ix = metadata.session == session + return results - for name, clf in run_pipes.items(): - if _carbonfootprint: - # Initialize CodeCarbon - tracker = EmissionsTracker(save_to_file=False, log_level="error") - tracker.start() - t_start = time() - cv = StratifiedKFold(5, shuffle=True, random_state=self.random_state) - scorer = get_scorer(self.paradigm.scoring) - le = LabelEncoder() - y_cv = le.fit_transform(y[ix]) - X_ = X[ix] - y_ = y[ix] if self.mne_labels else y_cv + def _evaluate_subject(self, subject, dataset, pipelines, param_grid): + # check if we already have result for this subject/pipeline + # we might need a better granularity, if we query the DB + run_pipes = self.results.not_yet_computed(pipelines, dataset, subject) + if len(run_pipes) == 0: + return + # get the data + X, y, metadata = self.paradigm.get_data( + dataset, [subject], self.return_epochs, self.return_raws + ) - grid_clf = clone(clf) + # iterate over sessions + for session in np.unique(metadata.session): + ix = metadata.session == session - name_grid = os.path.join( - str(self.hdf5_path), - "GridSearch_WithinSession", - dataset.code, - "subject" + str(subject), - str(session), - str(name), - ) + for name, clf in run_pipes.items(): + if _carbonfootprint: + # Initialize CodeCarbon + tracker = EmissionsTracker(save_to_file=False, log_level="error") + tracker.start() + t_start = time() + cv = StratifiedKFold(5, shuffle=True, random_state=self.random_state) + scorer = get_scorer(self.paradigm.scoring) + le = LabelEncoder() + y_cv = le.fit_transform(y[ix]) + X_ = X[ix] + y_ = y[ix] if self.mne_labels else y_cv + + grid_clf = clone(clf) + + name_grid = os.path.join( + str(self.hdf5_path), + "GridSearch_WithinSession", + dataset.code, + "subject" + str(subject), + str(session), + str(name), + ) - # Implement Grid Search - grid_clf = self._grid_search( - param_grid, name_grid, name, grid_clf, X_, y_, cv - ) + # Implement Grid Search + grid_clf = self._grid_search( + param_grid, name_grid, name, grid_clf, X_, y_, cv + ) - if isinstance(X, BaseEpochs): - scorer = get_scorer(self.paradigm.scoring) - acc = list() - X_ = X[ix] - y_ = y[ix] if self.mne_labels else y_cv - for train, test in cv.split(X_, y_): - cvclf = clone(grid_clf) - cvclf.fit(X_[train], y_[train]) - acc.append(scorer(cvclf, X_[test], y_[test])) - acc = np.array(acc) - else: - acc = cross_val_score( - grid_clf, - X[ix], - y_cv, - cv=cv, - scoring=self.paradigm.scoring, - n_jobs=self.n_jobs, - error_score=self.error_score, - ) - score = acc.mean() - if _carbonfootprint: - emissions = tracker.stop() - if emissions is None: - emissions = np.NaN - duration = time() - t_start - nchan = X.info["nchan"] if isinstance(X, BaseEpochs) else X.shape[1] - res = { - "time": duration / 5.0, # 5 fold CV - "dataset": dataset, - "subject": subject, - "session": session, - "score": score, - "n_samples": len(y_cv), # not training sample - "n_channels": nchan, - "pipeline": name, - } - if _carbonfootprint: - res["carbon_emission"] = (1000 * emissions,) + if isinstance(X, BaseEpochs): + scorer = get_scorer(self.paradigm.scoring) + acc = list() + X_ = X[ix] + y_ = y[ix] if self.mne_labels else y_cv + for train, test in cv.split(X_, y_): + cvclf = clone(grid_clf) + cvclf.fit(X_[train], y_[train]) + acc.append(scorer(cvclf, X_[test], y_[test])) + acc = np.array(acc) + else: + acc = cross_val_score( + grid_clf, + X[ix], + y_cv, + cv=cv, + scoring=self.paradigm.scoring, + n_jobs=self.n_jobs, + error_score=self.error_score, + ) + score = acc.mean() + if _carbonfootprint: + emissions = tracker.stop() + if emissions is None: + emissions = np.NaN + duration = time() - t_start + nchan = X.info["nchan"] if isinstance(X, BaseEpochs) else X.shape[1] + res = { + "time": duration / 5.0, # 5 fold CV + "dataset": dataset, + "subject": subject, + "session": session, + "score": score, + "n_samples": len(y_cv), # not training sample + "n_channels": nchan, + "pipeline": name, + } + if _carbonfootprint: + res["carbon_emission"] = (1000 * emissions,) - yield res + yield res def get_data_size_subsets(self, y): if self.data_size is None: From d8da7f6970a0c2493739fc7d84053c9b996f3dca Mon Sep 17 00:00:00 2001 From: bruAristimunha Date: Thu, 27 Apr 2023 19:20:30 +0200 Subject: [PATCH 05/15] Reverting --- moabb/evaluations/evaluations.py | 166 ++++++++++++++++--------------- 1 file changed, 88 insertions(+), 78 deletions(-) diff --git a/moabb/evaluations/evaluations.py b/moabb/evaluations/evaluations.py index 8c0dc4f42..f093761d5 100644 --- a/moabb/evaluations/evaluations.py +++ b/moabb/evaluations/evaluations.py @@ -175,92 +175,102 @@ def _grid_search(self, param_grid, name_grid, name, grid_clf, X_, y_, cv): # flake8: noqa: C901 def _evaluate(self, dataset, pipelines, param_grid): # Progress Bar at subject level - for subject in tqdm(dataset.subject_list, desc=f"{dataset.code}-WithinSession"): - # check if we already have result for this subject/pipeline - # we might need a better granularity, if we query the DB - run_pipes = self.results.not_yet_computed(pipelines, dataset, subject) - if len(run_pipes) == 0: - continue - - # get the data - X, y, metadata = self.paradigm.get_data( - dataset, [subject], self.return_epochs, self.return_raws + results = [] + for result in Parallel(n_jobs=self.n_jobs)( + delayed(self._evaluate_subject)(subject, dataset, pipelines, param_grid) + for subject in tqdm( + dataset.subject_list, desc=f"{dataset.code}-WithinSession" ) + ): + results.extend(result) - # iterate over sessions - for session in np.unique(metadata.session): - ix = metadata.session == session + return results - for name, clf in run_pipes.items(): - if _carbonfootprint: - # Initialize CodeCarbon - tracker = EmissionsTracker(save_to_file=False, log_level="error") - tracker.start() - t_start = time() - cv = StratifiedKFold(5, shuffle=True, random_state=self.random_state) - scorer = get_scorer(self.paradigm.scoring) - le = LabelEncoder() - y_cv = le.fit_transform(y[ix]) - X_ = X[ix] - y_ = y[ix] if self.mne_labels else y_cv + def _evaluate_subject(self, subject, dataset, pipelines, param_grid): + # check if we already have result for this subject/pipeline + # we might need a better granularity, if we query the DB + run_pipes = self.results.not_yet_computed(pipelines, dataset, subject) + if len(run_pipes) == 0: + return + # get the data + X, y, metadata = self.paradigm.get_data( + dataset, [subject], self.return_epochs, self.return_raws + ) - grid_clf = clone(clf) + # iterate over sessions + for session in np.unique(metadata.session): + ix = metadata.session == session - name_grid = os.path.join( - str(self.hdf5_path), - "GridSearch_WithinSession", - dataset.code, - "subject" + str(subject), - str(session), - str(name), - ) + for name, clf in run_pipes.items(): + if _carbonfootprint: + # Initialize CodeCarbon + tracker = EmissionsTracker(save_to_file=False, log_level="error") + tracker.start() + t_start = time() + cv = StratifiedKFold(5, shuffle=True, random_state=self.random_state) + scorer = get_scorer(self.paradigm.scoring) + le = LabelEncoder() + y_cv = le.fit_transform(y[ix]) + X_ = X[ix] + y_ = y[ix] if self.mne_labels else y_cv + + grid_clf = clone(clf) + + name_grid = os.path.join( + str(self.hdf5_path), + "GridSearch_WithinSession", + dataset.code, + "subject" + str(subject), + str(session), + str(name), + ) - # Implement Grid Search - grid_clf = self._grid_search( - param_grid, name_grid, name, grid_clf, X_, y_, cv - ) + # Implement Grid Search + grid_clf = self._grid_search( + param_grid, name_grid, name, grid_clf, X_, y_, cv + ) - if isinstance(X, BaseEpochs): - scorer = get_scorer(self.paradigm.scoring) - acc = list() - X_ = X[ix] - y_ = y[ix] if self.mne_labels else y_cv - for train, test in cv.split(X_, y_): - cvclf = clone(grid_clf) - cvclf.fit(X_[train], y_[train]) - acc.append(scorer(cvclf, X_[test], y_[test])) - acc = np.array(acc) - else: - acc = cross_val_score( - grid_clf, - X[ix], - y_cv, - cv=cv, - scoring=self.paradigm.scoring, - n_jobs=self.n_jobs, - error_score=self.error_score, - ) - score = acc.mean() - if _carbonfootprint: - emissions = tracker.stop() - if emissions is None: - emissions = np.NaN - duration = time() - t_start - nchan = X.info["nchan"] if isinstance(X, BaseEpochs) else X.shape[1] - res = { - "time": duration / 5.0, # 5 fold CV - "dataset": dataset, - "subject": subject, - "session": session, - "score": score, - "n_samples": len(y_cv), # not training sample - "n_channels": nchan, - "pipeline": name, - } - if _carbonfootprint: - res["carbon_emission"] = (1000 * emissions,) + if isinstance(X, BaseEpochs): + scorer = get_scorer(self.paradigm.scoring) + acc = list() + X_ = X[ix] + y_ = y[ix] if self.mne_labels else y_cv + for train, test in cv.split(X_, y_): + cvclf = clone(grid_clf) + cvclf.fit(X_[train], y_[train]) + acc.append(scorer(cvclf, X_[test], y_[test])) + acc = np.array(acc) + else: + acc = cross_val_score( + grid_clf, + X[ix], + y_cv, + cv=cv, + scoring=self.paradigm.scoring, + n_jobs=self.n_jobs, + error_score=self.error_score, + ) + score = acc.mean() + if _carbonfootprint: + emissions = tracker.stop() + if emissions is None: + emissions = np.NaN + duration = time() - t_start + nchan = X.info["nchan"] if isinstance(X, BaseEpochs) else X.shape[1] + res = { + "time": duration / 5.0, # 5 fold CV + "dataset": dataset, + "subject": subject, + "session": session, + "score": score, + "n_samples": len(y_cv), # not training sample + "n_channels": nchan, + "pipeline": name, + } + if _carbonfootprint: + res["carbon_emission"] = (1000 * emissions,) - yield res + yield res def get_data_size_subsets(self, y): if self.data_size is None: From 0eba45eefd8d22b17265ded5f0fc6ce665275246 Mon Sep 17 00:00:00 2001 From: bruAristimunha Date: Thu, 27 Apr 2023 19:25:03 +0200 Subject: [PATCH 06/15] Reverting again --- moabb/evaluations/evaluations.py | 166 +++++++++++++++---------------- 1 file changed, 78 insertions(+), 88 deletions(-) diff --git a/moabb/evaluations/evaluations.py b/moabb/evaluations/evaluations.py index f093761d5..8c0dc4f42 100644 --- a/moabb/evaluations/evaluations.py +++ b/moabb/evaluations/evaluations.py @@ -175,102 +175,92 @@ def _grid_search(self, param_grid, name_grid, name, grid_clf, X_, y_, cv): # flake8: noqa: C901 def _evaluate(self, dataset, pipelines, param_grid): # Progress Bar at subject level - results = [] - for result in Parallel(n_jobs=self.n_jobs)( - delayed(self._evaluate_subject)(subject, dataset, pipelines, param_grid) - for subject in tqdm( - dataset.subject_list, desc=f"{dataset.code}-WithinSession" - ) - ): - results.extend(result) - - return results - - def _evaluate_subject(self, subject, dataset, pipelines, param_grid): - # check if we already have result for this subject/pipeline - # we might need a better granularity, if we query the DB - run_pipes = self.results.not_yet_computed(pipelines, dataset, subject) - if len(run_pipes) == 0: - return - # get the data - X, y, metadata = self.paradigm.get_data( - dataset, [subject], self.return_epochs, self.return_raws - ) - - # iterate over sessions - for session in np.unique(metadata.session): - ix = metadata.session == session + for subject in tqdm(dataset.subject_list, desc=f"{dataset.code}-WithinSession"): + # check if we already have result for this subject/pipeline + # we might need a better granularity, if we query the DB + run_pipes = self.results.not_yet_computed(pipelines, dataset, subject) + if len(run_pipes) == 0: + continue - for name, clf in run_pipes.items(): - if _carbonfootprint: - # Initialize CodeCarbon - tracker = EmissionsTracker(save_to_file=False, log_level="error") - tracker.start() - t_start = time() - cv = StratifiedKFold(5, shuffle=True, random_state=self.random_state) - scorer = get_scorer(self.paradigm.scoring) - le = LabelEncoder() - y_cv = le.fit_transform(y[ix]) - X_ = X[ix] - y_ = y[ix] if self.mne_labels else y_cv - - grid_clf = clone(clf) - - name_grid = os.path.join( - str(self.hdf5_path), - "GridSearch_WithinSession", - dataset.code, - "subject" + str(subject), - str(session), - str(name), - ) + # get the data + X, y, metadata = self.paradigm.get_data( + dataset, [subject], self.return_epochs, self.return_raws + ) - # Implement Grid Search - grid_clf = self._grid_search( - param_grid, name_grid, name, grid_clf, X_, y_, cv - ) + # iterate over sessions + for session in np.unique(metadata.session): + ix = metadata.session == session - if isinstance(X, BaseEpochs): + for name, clf in run_pipes.items(): + if _carbonfootprint: + # Initialize CodeCarbon + tracker = EmissionsTracker(save_to_file=False, log_level="error") + tracker.start() + t_start = time() + cv = StratifiedKFold(5, shuffle=True, random_state=self.random_state) scorer = get_scorer(self.paradigm.scoring) - acc = list() + le = LabelEncoder() + y_cv = le.fit_transform(y[ix]) X_ = X[ix] y_ = y[ix] if self.mne_labels else y_cv - for train, test in cv.split(X_, y_): - cvclf = clone(grid_clf) - cvclf.fit(X_[train], y_[train]) - acc.append(scorer(cvclf, X_[test], y_[test])) - acc = np.array(acc) - else: - acc = cross_val_score( - grid_clf, - X[ix], - y_cv, - cv=cv, - scoring=self.paradigm.scoring, - n_jobs=self.n_jobs, - error_score=self.error_score, + + grid_clf = clone(clf) + + name_grid = os.path.join( + str(self.hdf5_path), + "GridSearch_WithinSession", + dataset.code, + "subject" + str(subject), + str(session), + str(name), ) - score = acc.mean() - if _carbonfootprint: - emissions = tracker.stop() - if emissions is None: - emissions = np.NaN - duration = time() - t_start - nchan = X.info["nchan"] if isinstance(X, BaseEpochs) else X.shape[1] - res = { - "time": duration / 5.0, # 5 fold CV - "dataset": dataset, - "subject": subject, - "session": session, - "score": score, - "n_samples": len(y_cv), # not training sample - "n_channels": nchan, - "pipeline": name, - } - if _carbonfootprint: - res["carbon_emission"] = (1000 * emissions,) - yield res + # Implement Grid Search + grid_clf = self._grid_search( + param_grid, name_grid, name, grid_clf, X_, y_, cv + ) + + if isinstance(X, BaseEpochs): + scorer = get_scorer(self.paradigm.scoring) + acc = list() + X_ = X[ix] + y_ = y[ix] if self.mne_labels else y_cv + for train, test in cv.split(X_, y_): + cvclf = clone(grid_clf) + cvclf.fit(X_[train], y_[train]) + acc.append(scorer(cvclf, X_[test], y_[test])) + acc = np.array(acc) + else: + acc = cross_val_score( + grid_clf, + X[ix], + y_cv, + cv=cv, + scoring=self.paradigm.scoring, + n_jobs=self.n_jobs, + error_score=self.error_score, + ) + score = acc.mean() + if _carbonfootprint: + emissions = tracker.stop() + if emissions is None: + emissions = np.NaN + duration = time() - t_start + nchan = X.info["nchan"] if isinstance(X, BaseEpochs) else X.shape[1] + res = { + "time": duration / 5.0, # 5 fold CV + "dataset": dataset, + "subject": subject, + "session": session, + "score": score, + "n_samples": len(y_cv), # not training sample + "n_channels": nchan, + "pipeline": name, + } + if _carbonfootprint: + res["carbon_emission"] = (1000 * emissions,) + + yield res def get_data_size_subsets(self, y): if self.data_size is None: From 4a15e28ab4ba3ebbde166671a7cec27649d36baf Mon Sep 17 00:00:00 2001 From: bruAristimunha Date: Wed, 3 May 2023 10:42:25 +0200 Subject: [PATCH 07/15] Parallel WithinSession --- moabb/evaluations/evaluations.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/moabb/evaluations/evaluations.py b/moabb/evaluations/evaluations.py index f093761d5..988f1822d 100644 --- a/moabb/evaluations/evaluations.py +++ b/moabb/evaluations/evaluations.py @@ -177,21 +177,21 @@ def _evaluate(self, dataset, pipelines, param_grid): # Progress Bar at subject level results = [] for result in Parallel(n_jobs=self.n_jobs)( - delayed(self._evaluate_subject)(subject, dataset, pipelines, param_grid) + delayed(self.process_subject)(subject, param_grid, pipelines, dataset) for subject in tqdm( dataset.subject_list, desc=f"{dataset.code}-WithinSession" ) ): results.extend(result) - return results - def _evaluate_subject(self, subject, dataset, pipelines, param_grid): + def process_subject(self, subject, param_grid, pipelines, dataset): # check if we already have result for this subject/pipeline # we might need a better granularity, if we query the DB run_pipes = self.results.not_yet_computed(pipelines, dataset, subject) if len(run_pipes) == 0: - return + return [] + # get the data X, y, metadata = self.paradigm.get_data( dataset, [subject], self.return_epochs, self.return_raws From 14c1eb36a6af516ab6d1e129c59c2c6ac67bd905 Mon Sep 17 00:00:00 2001 From: bruAristimunha Date: Tue, 16 May 2023 14:35:09 +0200 Subject: [PATCH 08/15] Updating the evaluation, removing the yield --- moabb/evaluations/evaluations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/moabb/evaluations/evaluations.py b/moabb/evaluations/evaluations.py index 988f1822d..2c580d137 100644 --- a/moabb/evaluations/evaluations.py +++ b/moabb/evaluations/evaluations.py @@ -408,7 +408,7 @@ def evaluate(self, dataset, pipelines, param_grid): if self.calculate_learning_curve: yield from self._evaluate_learning_curve(dataset, pipelines) else: - yield from self._evaluate(dataset, pipelines, param_grid) + return self._evaluate(dataset, pipelines, param_grid) def is_valid(self, dataset): return True From 6c828b5edf91350e7e90e7fc662beb7bd8289179 Mon Sep 17 00:00:00 2001 From: bruAristimunha Date: Tue, 16 May 2023 14:56:20 +0200 Subject: [PATCH 09/15] Updating the evaluation, removing the yield --- moabb/evaluations/evaluations.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/moabb/evaluations/evaluations.py b/moabb/evaluations/evaluations.py index 2c580d137..86ccd47a3 100644 --- a/moabb/evaluations/evaluations.py +++ b/moabb/evaluations/evaluations.py @@ -270,7 +270,7 @@ def process_subject(self, subject, param_grid, pipelines, dataset): if _carbonfootprint: res["carbon_emission"] = (1000 * emissions,) - yield res + return res def get_data_size_subsets(self, y): if self.data_size is None: @@ -408,7 +408,7 @@ def evaluate(self, dataset, pipelines, param_grid): if self.calculate_learning_curve: yield from self._evaluate_learning_curve(dataset, pipelines) else: - return self._evaluate(dataset, pipelines, param_grid) + yield from self._evaluate(dataset, pipelines, param_grid) def is_valid(self, dataset): return True From 4c965981c6aaeaad579793994f114e9e9347daaa Mon Sep 17 00:00:00 2001 From: bruAristimunha Date: Tue, 16 May 2023 15:04:41 +0200 Subject: [PATCH 10/15] Changing the parameter to base evaluation --- moabb/evaluations/base.py | 6 +++++- moabb/evaluations/evaluations.py | 13 +++++++++++-- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/moabb/evaluations/base.py b/moabb/evaluations/base.py index d548503b6..38ea2bcd6 100644 --- a/moabb/evaluations/base.py +++ b/moabb/evaluations/base.py @@ -27,6 +27,9 @@ class BaseEvaluation(ABC): If not None, can guarantee same seed for shuffling examples. n_jobs: int, default=1 Number of jobs for fitting of pipeline. + n_jobs_evaluation: int, default=1 + Number of jobs for evaluation, processing in parallel the within session, + cross-session or cross-subject. overwrite: bool, default=False If true, overwrite the results. error_score: "raise" or numeric, default="raise" @@ -52,6 +55,7 @@ def __init__( datasets=None, random_state=None, n_jobs=1, + n_jobs_evaluation=1, overwrite=False, error_score="raise", suffix="", @@ -63,12 +67,12 @@ def __init__( ): self.random_state = random_state self.n_jobs = n_jobs + self.n_jobs_evaluation = n_jobs_evaluation self.error_score = error_score self.hdf5_path = hdf5_path self.return_epochs = return_epochs self.return_raws = return_raws self.mne_labels = mne_labels - # check paradigm if not isinstance(paradigm, BaseParadigm): raise (ValueError("paradigm must be an Paradigm instance")) diff --git a/moabb/evaluations/evaluations.py b/moabb/evaluations/evaluations.py index 86ccd47a3..76d4d4aa6 100644 --- a/moabb/evaluations/evaluations.py +++ b/moabb/evaluations/evaluations.py @@ -72,6 +72,9 @@ class WithinSessionEvaluation(BaseEvaluation): If not None, can guarantee same seed for shuffling examples. n_jobs: int, default=1 Number of jobs for fitting of pipeline. + n_jobs_evaluation: int, default=1 + Number of jobs for evaluation, processing in parallel the within session, + cross-session or cross-subject. overwrite: bool, default=False If true, overwrite the results. error_score: "raise" or numeric, default="raise" @@ -176,7 +179,7 @@ def _grid_search(self, param_grid, name_grid, name, grid_clf, X_, y_, cv): def _evaluate(self, dataset, pipelines, param_grid): # Progress Bar at subject level results = [] - for result in Parallel(n_jobs=self.n_jobs)( + for result in Parallel(n_jobs=self.n_jobs_evaluation)( delayed(self.process_subject)(subject, param_grid, pipelines, dataset) for subject in tqdm( dataset.subject_list, desc=f"{dataset.code}-WithinSession" @@ -432,6 +435,9 @@ class CrossSessionEvaluation(BaseEvaluation): If not None, can guarantee same seed for shuffling examples. n_jobs: int, default=1 Number of jobs for fitting of pipeline. + n_jobs_evaluation: int, default=1 + Number of jobs for evaluation, processing in parallel the within session, + cross-session or cross-subject. overwrite: bool, default=False If true, overwrite the results. error_score: "raise" or numeric, default="raise" @@ -492,7 +498,7 @@ def evaluate(self, dataset, pipelines, param_grid): raise AssertionError("Dataset is not appropriate for evaluation") # Progressbar at subject level results = [] - for result in Parallel(n_jobs=self.n_jobs)( + for result in Parallel(n_jobs=self.n_jobs_evaluation)( delayed(self.process_subject)(subject, param_grid, pipelines, dataset) for subject in tqdm(dataset.subject_list, desc=f"{dataset.code}-CrossSession") ): @@ -617,6 +623,9 @@ class CrossSubjectEvaluation(BaseEvaluation): If not None, can guarantee same seed for shuffling examples. n_jobs: int, default=1 Number of jobs for fitting of pipeline. + n_jobs_evaluation: int, default=1 + Number of jobs for evaluation, processing in parallel the within session, + cross-session or cross-subject. overwrite: bool, default=False If true, overwrite the results. error_score: "raise" or numeric, default="raise" From 275a16ed81f6554f635a983574568a8248e3f66b Mon Sep 17 00:00:00 2001 From: bruAristimunha Date: Tue, 16 May 2023 16:06:26 +0200 Subject: [PATCH 11/15] Adding verbose as true --- moabb/evaluations/evaluations.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/moabb/evaluations/evaluations.py b/moabb/evaluations/evaluations.py index 76d4d4aa6..36051eaee 100644 --- a/moabb/evaluations/evaluations.py +++ b/moabb/evaluations/evaluations.py @@ -179,7 +179,7 @@ def _grid_search(self, param_grid, name_grid, name, grid_clf, X_, y_, cv): def _evaluate(self, dataset, pipelines, param_grid): # Progress Bar at subject level results = [] - for result in Parallel(n_jobs=self.n_jobs_evaluation)( + for result in Parallel(n_jobs=self.n_jobs_evaluation, verbose=1)( delayed(self.process_subject)(subject, param_grid, pipelines, dataset) for subject in tqdm( dataset.subject_list, desc=f"{dataset.code}-WithinSession" @@ -193,7 +193,7 @@ def process_subject(self, subject, param_grid, pipelines, dataset): # we might need a better granularity, if we query the DB run_pipes = self.results.not_yet_computed(pipelines, dataset, subject) if len(run_pipes) == 0: - return [] + return # get the data X, y, metadata = self.paradigm.get_data( @@ -498,7 +498,7 @@ def evaluate(self, dataset, pipelines, param_grid): raise AssertionError("Dataset is not appropriate for evaluation") # Progressbar at subject level results = [] - for result in Parallel(n_jobs=self.n_jobs_evaluation)( + for result in Parallel(n_jobs=self.n_jobs_evaluation, verbose=1)( delayed(self.process_subject)(subject, param_grid, pipelines, dataset) for subject in tqdm(dataset.subject_list, desc=f"{dataset.code}-CrossSession") ): From 563446b5d4b28f4fdc14c9e2481df6003af7a213 Mon Sep 17 00:00:00 2001 From: bruAristimunha Date: Tue, 16 May 2023 16:38:47 +0200 Subject: [PATCH 12/15] Fixing the issue =) --- moabb/evaluations/evaluations.py | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/moabb/evaluations/evaluations.py b/moabb/evaluations/evaluations.py index 36051eaee..4f7230fa7 100644 --- a/moabb/evaluations/evaluations.py +++ b/moabb/evaluations/evaluations.py @@ -176,30 +176,31 @@ def _grid_search(self, param_grid, name_grid, name, grid_clf, X_, y_, cv): return grid_clf # flake8: noqa: C901 + def _evaluate(self, dataset, pipelines, param_grid): - # Progress Bar at subject level - results = [] - for result in Parallel(n_jobs=self.n_jobs_evaluation, verbose=1)( - delayed(self.process_subject)(subject, param_grid, pipelines, dataset) + results = Parallel(n_jobs=self.n_jobs_evaluation, verbose=1)( + delayed(self._evaluate_subject)(dataset, pipelines, param_grid, subject) for subject in tqdm( dataset.subject_list, desc=f"{dataset.code}-WithinSession" ) - ): - results.extend(result) - return results + ) - def process_subject(self, subject, param_grid, pipelines, dataset): + # Concatenate the results from all subjects + yield from [res for subject_results in results for res in subject_results] + + def _evaluate_subject(self, dataset, pipelines, param_grid, subject): + # Progress Bar at subject level # check if we already have result for this subject/pipeline # we might need a better granularity, if we query the DB run_pipes = self.results.not_yet_computed(pipelines, dataset, subject) if len(run_pipes) == 0: - return + return [] # get the data X, y, metadata = self.paradigm.get_data( dataset, [subject], self.return_epochs, self.return_raws ) - + subject_results = [] # iterate over sessions for session in np.unique(metadata.session): ix = metadata.session == session @@ -272,8 +273,9 @@ def process_subject(self, subject, param_grid, pipelines, dataset): } if _carbonfootprint: res["carbon_emission"] = (1000 * emissions,) + subject_results.append(res) - return res + return subject_results def get_data_size_subsets(self, y): if self.data_size is None: From 654dad247c8629597adae2d133d177bea03b8ff4 Mon Sep 17 00:00:00 2001 From: bruAristimunha Date: Tue, 16 May 2023 19:39:38 +0200 Subject: [PATCH 13/15] Saving the models --- moabb/evaluations/evaluations.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/moabb/evaluations/evaluations.py b/moabb/evaluations/evaluations.py index 4f7230fa7..94b653e67 100644 --- a/moabb/evaluations/evaluations.py +++ b/moabb/evaluations/evaluations.py @@ -237,12 +237,15 @@ def _evaluate_subject(self, dataset, pipelines, param_grid, subject): if isinstance(X, BaseEpochs): scorer = get_scorer(self.paradigm.scoring) acc = list() + model_list = list() X_ = X[ix] y_ = y[ix] if self.mne_labels else y_cv for train, test in cv.split(X_, y_): cvclf = clone(grid_clf) cvclf.fit(X_[train], y_[train]) acc.append(scorer(cvclf, X_[test], y_[test])) + model_list.append(deepcopy(cvclf)) + acc = np.array(acc) else: acc = cross_val_score( @@ -261,6 +264,23 @@ def _evaluate_subject(self, dataset, pipelines, param_grid, subject): emissions = np.NaN duration = time() - t_start nchan = X.info["nchan"] if isinstance(X, BaseEpochs) else X.shape[1] + + name_save = os.path.join( + str(self.hdf5_path), + "TrainedModels_WithinSession", + dataset.code, + "subject" + str(subject), + str(session), + str(name), + ) + + for id_cv, model in enumerate(model_list): + os.makedirs(name_save, exist_ok=True) + joblib.dump( + model, + os.path.join(name_save, f"cv{id_cv}.pkl"), + ) + res = { "time": duration / 5.0, # 5 fold CV "dataset": dataset, From b39239714a5bc268a0124a21106692b510310ed4 Mon Sep 17 00:00:00 2001 From: bruAristimunha Date: Wed, 17 May 2023 14:51:40 +0200 Subject: [PATCH 14/15] Changing the save folder --- moabb/evaluations/evaluations.py | 33 +++++++++++++++----------------- 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/moabb/evaluations/evaluations.py b/moabb/evaluations/evaluations.py index 94b653e67..860305e76 100644 --- a/moabb/evaluations/evaluations.py +++ b/moabb/evaluations/evaluations.py @@ -234,17 +234,30 @@ def _evaluate_subject(self, dataset, pipelines, param_grid, subject): param_grid, name_grid, name, grid_clf, X_, y_, cv ) + name_save = os.path.join( + str(self.hdf5_path), + "TrainedModels_WithinSession", + dataset.code, + "subject" + str(subject), + str(session), + str(name), + ) + if isinstance(X, BaseEpochs): scorer = get_scorer(self.paradigm.scoring) acc = list() - model_list = list() X_ = X[ix] y_ = y[ix] if self.mne_labels else y_cv for train, test in cv.split(X_, y_): cvclf = clone(grid_clf) cvclf.fit(X_[train], y_[train]) acc.append(scorer(cvclf, X_[test], y_[test])) - model_list.append(deepcopy(cvclf)) + + os.makedirs(name_save, exist_ok=True) + joblib.dump( + cvclf, + os.path.join(name_save, f"cv{id_cv}.pkl"), + ) acc = np.array(acc) else: @@ -265,22 +278,6 @@ def _evaluate_subject(self, dataset, pipelines, param_grid, subject): duration = time() - t_start nchan = X.info["nchan"] if isinstance(X, BaseEpochs) else X.shape[1] - name_save = os.path.join( - str(self.hdf5_path), - "TrainedModels_WithinSession", - dataset.code, - "subject" + str(subject), - str(session), - str(name), - ) - - for id_cv, model in enumerate(model_list): - os.makedirs(name_save, exist_ok=True) - joblib.dump( - model, - os.path.join(name_save, f"cv{id_cv}.pkl"), - ) - res = { "time": duration / 5.0, # 5 fold CV "dataset": dataset, From 20a968ac64cf88c7a5d693eccc20dea27507080b Mon Sep 17 00:00:00 2001 From: bruAristimunha Date: Wed, 17 May 2023 14:59:42 +0200 Subject: [PATCH 15/15] Adding id_cv --- moabb/evaluations/evaluations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/moabb/evaluations/evaluations.py b/moabb/evaluations/evaluations.py index 860305e76..d83f17492 100644 --- a/moabb/evaluations/evaluations.py +++ b/moabb/evaluations/evaluations.py @@ -248,7 +248,7 @@ def _evaluate_subject(self, dataset, pipelines, param_grid, subject): acc = list() X_ = X[ix] y_ = y[ix] if self.mne_labels else y_cv - for train, test in cv.split(X_, y_): + for id_cv, (train, test) in enumerate(cv.split(X_, y_)): cvclf = clone(grid_clf) cvclf.fit(X_[train], y_[train]) acc.append(scorer(cvclf, X_[test], y_[test]))