diff --git a/benchmarks/light_benchmark.py b/benchmarks/light_benchmark.py
index 6db1fad5..66759903 100644
--- a/benchmarks/light_benchmark.py
+++ b/benchmarks/light_benchmark.py
@@ -27,6 +27,7 @@
     QuantumClassifierWithDefaultRiemannianPipeline,
     QuantumMDMWithRiemannianPipeline,
 )
+from pyriemann_qiskit.classification import QuanticNCH
 import warnings
 import sys
@@ -102,6 +103,22 @@
     LDA(solver="lsqr", shrinkage="auto"),
 )
 
+pipelines["NCH+MIN_HULL"] = make_pipeline(
+    XdawnCovariances(
+        nfilter=3,
+        # classes=[labels_dict["Target"]],
+        estimator="lwf",
+        xdawn_estimator="scm",
+    ),
+    QuanticNCH(
+        n_hulls_per_class=1,
+        n_samples_per_hull=3,
+        n_jobs=12,
+        subsampling="min",
+        quantum=False,
+    ),
+)
+
 ##############################################################################
 # Compute score
 # --------------
diff --git a/doc/api.rst b/doc/api.rst
index abe88dd0..7dfb88ec 100644
--- a/doc/api.rst
+++ b/doc/api.rst
@@ -53,7 +53,7 @@ Utils functions are low level functions for the `classification` and `pipelines` module.
 Utils
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
-.. _hyper_params_factory_api:
+.. _utils_api:
 .. currentmodule:: pyriemann_qiskit.utils.utils
 
 .. autosummary::
diff --git a/examples/ERP/classify_P300_nch.py b/examples/ERP/classify_P300_nch.py
index 2876edc1..20f4957e 100644
--- a/examples/ERP/classify_P300_nch.py
+++ b/examples/ERP/classify_P300_nch.py
@@ -19,17 +19,19 @@
 # Modified from plot_classify_EEG_tangentspace.py of pyRiemann
 # License: BSD (3-clause)
 
-from pyriemann.estimation import XdawnCovariances
-from sklearn.pipeline import make_pipeline
-from matplotlib import pyplot as plt
 import warnings
-import seaborn as sns
+
+from matplotlib import pyplot as plt
 from moabb import set_log_level
 from moabb.datasets import bi2013a
 from moabb.evaluations import WithinSessionEvaluation
 from moabb.paradigms import P300
-from pyriemann_qiskit.classification import QuanticNCH
 from pyriemann.classification import MDM
+from pyriemann.estimation import XdawnCovariances
+import seaborn as sns
+from sklearn.pipeline import make_pipeline
+
+from pyriemann_qiskit.classification import QuanticNCH
 
 print(__doc__)
@@ -80,7 +82,7 @@
         n_hulls_per_class=1,
         n_samples_per_hull=3,
         n_jobs=12,
-        hull_type="random-hull",
+        subsampling="random",
         quantum=False,
     ),
 )
@@ -97,7 +99,7 @@
         n_hulls_per_class=1,
         n_samples_per_hull=3,
         n_jobs=12,
-        hull_type="min-hull",
+        subsampling="min",
         quantum=False,
     ),
 )
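For context, the pipelines defined in the example above reduce to the following sketch once the old `hull_type` strings ("min-hull", "random-hull") are replaced by the new `subsampling` values ("min", "random"). The arguments are taken verbatim from this diff; dataset loading and the MOABB evaluation are omitted:

    from pyriemann.estimation import XdawnCovariances
    from sklearn.pipeline import make_pipeline

    from pyriemann_qiskit.classification import QuanticNCH

    pipeline = make_pipeline(
        XdawnCovariances(nfilter=3, estimator="lwf", xdawn_estimator="scm"),
        QuanticNCH(
            n_hulls_per_class=1,
            n_samples_per_hull=3,
            n_jobs=12,
            subsampling="min",  # was hull_type="min-hull"; "random" is also accepted
            quantum=False,      # classical execution, no quantum backend
        ),
    )
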
diff --git a/pyriemann_qiskit/classification.py b/pyriemann_qiskit/classification.py
index 1cc94ecf..f55a5cc9 100644
--- a/pyriemann_qiskit/classification.py
+++ b/pyriemann_qiskit/classification.py
@@ -11,6 +11,7 @@
 from warnings import warn
 
 from sklearn.base import BaseEstimator, ClassifierMixin, TransformerMixin
+from pyriemann.utils.distance import distance
 from pyriemann.classification import MDM
 from pyriemann_qiskit.datasets import get_feature_dimension
 from pyriemann_qiskit.utils import (
@@ -34,7 +35,7 @@
 from .utils.hyper_params_factory import gen_zz_feature_map, gen_two_local, get_spsa
 from .utils import get_provider, get_devices, get_simulator
-from .utils.distance import qdistance_logeuclid_to_convex_hull, distance_logeuclid
+from .utils.distance import qdistance_logeuclid_to_convex_hull
 from joblib import Parallel, delayed
 import random
@@ -58,24 +59,24 @@ class QuanticClassifierBase(BaseEstimator, ClassifierMixin):
 
     Parameters
     ----------
-    quantum : bool (default: True)
+    quantum : bool, default=True
         - If true will run on local or remote quantum backend
           (depending on q_account_token value),
         - If false, will perform classical computing instead.
-    q_account_token : string (default:None)
+    q_account_token : string | None, default=None
         If `quantum` is True and `q_account_token` provided, the
         classification task will be running on a IBM quantum backend.
         If `load_account` is provided, the classifier will use the previous
         token saved with `IBMProvider.save_account()`.
-    verbose : bool (default:True)
+    verbose : bool, default=True
         If true, will output all intermediate results and logs.
-    shots : int (default:1024)
+    shots : int, default=1024
         Number of repetitions of each circuit, for sampling.
-    gen_feature_map : Callable[int, QuantumCircuit | FeatureMap] \
-        (default : Callable[int, ZZFeatureMap])
+    gen_feature_map : Callable[int, QuantumCircuit | FeatureMap], \
+        default=Callable[int, ZZFeatureMap]
         Function generating a feature map to encode data into a quantum state.
-    seed: int | None (default: None)
-        Random seed for the simulation
+    seed : int | None, default=None
+        Random seed for the simulation.
 
     Notes
     -----
@@ -232,7 +233,7 @@ def score(self, X, y):
         X : ndarray, shape (n_samples, n_features)
             Input vector, where `n_samples` is the number of samples and
             `n_features` is the number of features.
-        y: ndarray, shape (n_samples,)
+        y : ndarray, shape (n_samples,)
             Predicted target vector relative to X.
 
         Returns
@@ -317,38 +318,38 @@ class QuanticSVM(QuanticClassifierBase):
 
     Parameters
     ----------
-    gamma : float | None (default: None)
+    gamma : float | None, default=None
         Used as input for sklearn rbf_kernel which is used internally.
         See [3]_ for more information about gamma.
-    C : float (default: 1.0)
+    C : float, default=1.0
         Regularization parameter. The strength of the regularization is
         inversely proportional to C. Must be strictly positive.
         Note, if pegasos is enabled you may want to consider larger values of C.
-    max_iter: int | None (default: None)
+    max_iter : int | None, default=None
         Number of steps in Pegasos or (Q)SVC.
         If None, respective default values for Pegasos and SVC are used.
         The default value for Pegasos is 1000.
         For (Q)SVC it is -1 (that is not limit).
-    pegasos : boolean (default: False)
+    pegasos : boolean, default=False
         If true, uses Qiskit's PegasosQSVC instead of QSVC.
-    quantum : bool (default: True)
+    quantum : bool, default=True
         - If true will run on local or remote backend
           (depending on q_account_token value),
         - If false, will perform classical computing instead.
-    q_account_token : string (default:None)
+    q_account_token : string | None, default=None
         If `quantum` is True and `q_account_token` provided, the
         classification task will be running on a IBM quantum backend.
         If `load_account` is provided, the classifier will use the previous
         token saved with `IBMProvider.save_account()`.
-    verbose : bool (default:True)
+    verbose : bool, default=True
         If true, will output all intermediate results and logs.
-    shots : int (default:1024)
+    shots : int, default=1024
         Number of repetitions of each circuit, for sampling.
-    gen_feature_map : Callable[int, QuantumCircuit | FeatureMap] \
-        (default : Callable[int, ZZFeatureMap])
+    gen_feature_map : Callable[int, QuantumCircuit | FeatureMap], \
+        default=Callable[int, ZZFeatureMap]
        Function generating a feature map to encode data into a quantum state.
-    seed: int | None (default: None)
+    seed : int | None, default=None
         Random seed for the simulation
 
     See Also
@@ -460,30 +461,29 @@ class QuanticVQC(QuanticClassifierBase):
 
     Parameters
     ----------
-    optimizer : Optimizer (default:SPSA)
-        The classical optimizer to use.
-        See [3]_ for details.
-    gen_var_form : Callable[int, QuantumCircuit | VariationalForm] \
-        (default: Callable[int, TwoLocal])
+    optimizer : Optimizer, default=SPSA
+        The classical optimizer to use. See [3]_ for details.
+    gen_var_form : Callable[int, QuantumCircuit | VariationalForm], \
+        default=Callable[int, TwoLocal]
         Function generating a variational form instance.
-    quantum : bool (default: True)
+    quantum : bool, default=True
         - If true will run on local or remote backend
           (depending on q_account_token value).
         - If false, will perform classical computing instead.
-    q_account_token : string (default:None)
+    q_account_token : string | None, default=None
         If `quantum` is True and `q_account_token` provided, the
         classification task will be running on a IBM quantum backend.
         If `load_account` is provided, the classifier will use the previous
         token saved with `IBMProvider.save_account()`.
-    verbose : bool (default:True)
+    verbose : bool, default=True
         If true, will output all intermediate results and logs
-    shots : int (default:1024)
+    shots : int, default=1024
         Number of repetitions of each circuit, for sampling
-    gen_feature_map : Callable[int, QuantumCircuit | FeatureMap] \
-        (default : Callable[int, ZZFeatureMap])
+    gen_feature_map : Callable[int, QuantumCircuit | FeatureMap], \
+        default=Callable[int, ZZFeatureMap]
         Function generating a feature map to encode data into a quantum state.
-    seed: int | None (default: None)
-        Random seed for the simulation
+    seed : int | None, default=None
+        Random seed for the simulation.
 
     Notes
     -----
@@ -623,7 +623,7 @@ class QuanticMDM(QuanticClassifierBase):
         - If true will run on local or remote backend
           (depending on q_account_token value),
         - If false, will perform classical computing instead.
-    q_account_token : string, default=None
+    q_account_token : string | None, default=None
         If `quantum` is True and `q_account_token` provided, the
         classification task will be running on a IBM quantum backend.
         If `load_account` is provided, the classifier will use the previous
@@ -633,7 +633,7 @@ class QuanticMDM(QuanticClassifierBase):
     shots : int, default=1024
         Number of repetitions of each circuit, for sampling.
     seed : int | None, default=None
-        Random seed for the simulation
+        Random seed for the simulation.
     upper_bound : int, default=7
         The maximum integer value for matrix normalization.
     regularization : MixinTransformer, default=None
@@ -768,16 +768,18 @@ class NearestConvexHull(BaseEstimator, ClassifierMixin, TransformerMixin):
 
     Parameters
     ----------
-    n_jobs : int, (default=6)
+    n_jobs : int, default=6
         The number of jobs to use for the computation. This works by
         computing each of the hulls in parallel.
-    n_hulls_per_class: int, (default 3)
-        The number of hulls used per class.
-    n_samples_per_hull: int, (default 15)
-        Defines how many samples are used to build a hull.
-    hull_type: string, (default "min-hull")
-        Selects how the hull is constructed. Possible values are
-        "min-hull" and "random-hull"
+    n_hulls_per_class : int, default=3
+        The number of hulls used per class, when subsampling is "random".
+    n_samples_per_hull : int, default=10
+        Defines how many samples are used to build a hull. -1 will include
+        all samples per class.
+    subsampling : {"min", "random"}, default="min"
+        Subsampling strategy of the training set to estimate distances to hulls.
+        "min" estimates a hull using the n_samples_per_hull closest matrices.
+        "random" estimates a hull using n_samples_per_hull random matrices.
 
     References
     ----------
@@ -788,7 +790,11 @@
     """

     def __init__(
-        self, n_jobs=6, n_hulls_per_class=3, n_samples_per_hull=10, hull_type="min-hull"
+        self,
+        n_jobs=6,
+        n_hulls_per_class=3,
+        n_samples_per_hull=10,
+        subsampling="min",
     ):
         """Init."""
         self.n_jobs = n_jobs
@@ -796,10 +802,10 @@ def __init__(
         self.n_hulls_per_class = n_hulls_per_class
         self.matrices_per_class_ = {}
         self.debug = False
-        self.hull_type = hull_type
+        self.subsampling = subsampling
 
-        if hull_type not in ["min-hull", "random-hull"]:
-            raise Exception("Error: Unknown hull type.")
+        if subsampling not in ["min", "random"]:
+            raise ValueError(f"Unknown subsampling type {subsampling}.")
 
     def fit(self, X, y):
         """Fit (store the training data).
@@ -833,45 +839,39 @@ def fit(self, X, y):
 
         print("End NCH Train")
 
-    def _process_sample_min_hull(self, test_sample):
+    def _process_sample_min_hull(self, x):
         """Finds the closes N covmats and uses them to build a single hull per class"""
-        distances = []
+        dists = []
 
         for c in self.classes_:
-            distances_to_covs = [
-                distance_logeuclid(test_sample, cov)
-                for cov in self.matrices_per_class_[c]
-            ]
-
-            # take the first N min distances
-            indexes = np.argsort(np.array(distances_to_covs))[
-                0 : self.n_samples_per_hull
-            ]
+            dist = distance(self.matrices_per_class_[c], x, metric="logeuclid")[:, 0]
+            # take the closest matrices
+            indexes = np.argsort(dist)[0 : self.n_samples_per_hull]
 
             if self.debug:
-                print("Distances to test sample: ", distances_to_covs)
+                print("Distances to test sample: ", dist)
                 print("Smallest N distances indexes:", indexes)
                 print("Smallest N distances: ")
                 for pp in indexes:
-                    print(distances_to_covs[pp])
+                    print(dist[pp])
 
             d = qdistance_logeuclid_to_convex_hull(
-                self.matrices_per_class_[c][indexes], test_sample
+                self.matrices_per_class_[c][indexes], x
             )
 
             if self.debug:
                 print("Final hull distance:", d)
 
-            distances.append(d)
+            dists.append(d)
 
-        return distances
+        return dists
 
-    def _process_sample_random_hull(self, test_sample):
+    def _process_sample_random_hull(self, x):
         """Uses random samples to build a hull, can be several hulls per class"""
-        distances = []
+        dists = []
 
         for c in self.classes_:
-            total_distance = 0
+            dist_total = 0
 
             # using multiple hulls
             for i in range(0, self.n_hulls_per_class):
@@ -882,28 +882,28 @@ def _process_sample_random_hull(self, test_sample):
                     range(self.matrices_per_class_[c].shape[0]),
                     k=self.n_samples_per_hull,
                 )
-                hull_data = self.matrices_per_class_[c][random_samples, :, :]
+                hull_data = self.matrices_per_class_[c][random_samples]
 
-                distance = qdistance_logeuclid_to_convex_hull(hull_data, test_sample)
-                total_distance = total_distance + distance
+                dist = qdistance_logeuclid_to_convex_hull(hull_data, x)
+                dist_total = dist_total + dist
 
-            distances.append(total_distance)
+            dists.append(dist_total)
 
-        return distances
+        return dists
 
     def _predict_distances(self, X):
         """Helper to predict the distance. Equivalent to transform."""
 
-        dist = []
+        dists = []
 
         if self.debug:
             print("Total test samples:", X.shape[0])
 
-        if self.hull_type == "min-hull":
+        if self.subsampling == "min":
             self._process_sample = self._process_sample_min_hull
-        elif self.hull_type == "random-hull":
+        elif self.subsampling == "random":
             self._process_sample = self._process_sample_random_hull
         else:
-            raise Exception("Error: Unknown hull type.")
+            raise ValueError(f"Unknown subsampling type {self.subsampling}.")
 
         parallel = self.n_jobs > 1
@@ -914,16 +914,18 @@ def _predict_distances(self, X):
             print("Not running in parallel")
 
         if parallel:
-            dist = Parallel(n_jobs=self.n_jobs)(
-                delayed(self._process_sample)(test_sample) for test_sample in X
+            dists = Parallel(n_jobs=self.n_jobs)(
+                delayed(self._process_sample)(x) for x in X
             )
         else:
-            for test_sample in X:
-                dist_sample = self._process_sample(test_sample)
-                dist.append(dist_sample)
+            for x in X:
+                dist = self._process_sample(x)
+                dists.append(dist)
 
-        return dist
+        dists = np.asarray(dists)
+
+        return dists
 
     def predict(self, X):
         """Get the predictions.
@@ -940,10 +942,7 @@ def predict(self, X):
             print("Start NCH Predict")
 
         dist = self._predict_distances(X)
-        predictions = [
-            self.classes_[min(range(len(values)), key=values.__getitem__)]
-            for values in dist
-        ]
+        predictions = self.classes_[dist.argmin(axis=1)]
 
         if self.debug:
             print("End NCH Predict")
@@ -971,8 +970,9 @@ def transform(self, X):
 
 
 class QuanticNCH(QuanticClassifierBase):
-    """A Quantum wrapper around the NCH algorithm. It allows both classical
-    and Quantum versions to be executed.
+    """A Quantum wrapper around the NCH algorithm.
+
+    It allows both classical and Quantum versions to be executed.
 
     Notes
     -----
@@ -980,39 +980,41 @@ class QuanticNCH(QuanticClassifierBase):
 
     Parameters
     ----------
-    quantum : bool (default: True)
+    quantum : bool, default=True
         Only applies if `metric` contains a cpm distance or mean.
         - If true will run on local or remote backend
           (depending on q_account_token value),
         - If false, will perform classical computing instead.
-    q_account_token : string (default:None)
+    q_account_token : string | None, default=None
         If `quantum` is True and `q_account_token` provided, the
         classification task will be running on a IBM quantum backend.
         If `load_account` is provided, the classifier will use the previous
         token saved with `IBMProvider.save_account()`.
-    verbose : bool (default:True)
+    verbose : bool, default=True
         If true, will output all intermediate results and logs.
-    shots : int (default:1024)
+    shots : int, default=1024
         Number of repetitions of each circuit, for sampling.
-    seed: int | None (default: None)
+    seed : int | None, default=None
         Random seed for the simulation
-    upper_bound : int (default: 7)
+    upper_bound : int, default=7
         The maximum integer value for matrix normalization.
-    regularization: MixinTransformer (defulat: None)
+    regularization : MixinTransformer | None, default=None
         Additional post-processing to regularize means.
-    classical_optimizer : OptimizationAlgorithm
-        An instance of OptimizationAlgorithm [3]_
+    classical_optimizer : OptimizationAlgorithm, default=SlsqpOptimizer()
+        An instance of OptimizationAlgorithm [3]_.
-    n_jobs : int, (default=6)
+    n_jobs : int, default=6
         The number of jobs to use for the computation. This works by
         computing each of the hulls in parallel.
-    n_hulls_per_class: int, (default 3)
+    n_hulls_per_class : int, default=3
         The number of hulls used per class.
-    n_samples_per_hull: int, (default 15)
-        Defines how many samples are used to build a hull.
-    hull_type: string, (default "min-hull")
-        Selects how the hull is constructed. Possible values are
-        "min-hull" and "random-hull"
+    n_samples_per_hull : int, default=10
+        Defines how many samples are used to build a hull. -1 will include
+        all samples per class.
+    subsampling : {"min", "random"}, default="min"
+        Subsampling strategy of the training set to estimate distances to hulls.
+        "min" estimates a hull using the n_samples_per_hull closest matrices.
+        "random" estimates a hull using n_samples_per_hull random matrices.
     """
@@ -1029,7 +1031,7 @@ def __init__(
         classical_optimizer=SlsqpOptimizer(),  # set here new default optimizer
         n_hulls_per_class=3,
         n_samples_per_hull=10,
-        hull_type="min-hull",
+        subsampling="min",
     ):
         QuanticClassifierBase.__init__(
             self, quantum, q_account_token, verbose, shots, None, seed
@@ -1040,7 +1042,7 @@ def __init__(
         self.n_hulls_per_class = n_hulls_per_class
         self.n_samples_per_hull = n_samples_per_hull
         self.n_jobs = n_jobs
-        self.hull_type = hull_type
+        self.subsampling = subsampling
 
     def _init_algo(self, n_features):
         self._log("Nearest Convex Hull Classifier initiating algorithm")
@@ -1049,7 +1051,7 @@ def _init_algo(self, n_features):
             n_hulls_per_class=self.n_hulls_per_class,
             n_samples_per_hull=self.n_samples_per_hull,
             n_jobs=self.n_jobs,
-            hull_type=self.hull_type,
+            subsampling=self.subsampling,
         )
 
         if self.quantum:
diff --git a/tests/test_classification.py b/tests/test_classification.py
index 3fe8dcc3..c58930e5 100644
--- a/tests/test_classification.py
+++ b/tests/test_classification.py
@@ -7,6 +7,7 @@
     QuanticSVM,
     QuanticVQC,
     QuanticMDM,
+    QuanticNCH,
 )
 from pyriemann_qiskit.datasets import get_mne_sample
 from pyriemann_qiskit.utils.filtering import NaiveDimRed
@@ -211,7 +212,7 @@ def get_params(self):
             metric={"mean": "logdet", "distance": "logdet"},
         )
         return {
-            "n_samples": 100,
+            "n_samples": 50,
             "n_features": 9,
             "quantum_instance": quantum_instance,
             "type": "bin_cov",
@@ -234,3 +235,28 @@ def get_params(self):
 
     def check(self):
         TestClassicalMDM.check(self)
+
+
+class TestClassicalNCH(BinaryFVT):
+    """Test the classical version of NCH using the QuanticNCH wrapper."""
+
+    def get_params(self):
+        quantum_instance = QuanticNCH(
+            n_hulls_per_class=1,
+            n_samples_per_hull=3,
+            subsampling="random",
+            quantum=False,
+        )
+        return {
+            "n_samples": 50,
+            "n_features": 7,
+            "quantum_instance": quantum_instance,
+            "type": "bin_cov",
+        }
+
+    def check(self):
+        assert len(self.prediction) == len(self.labels)
+        # Check if the number of classes is consistent
+        assert len(np.unique(self.prediction)) == len(np.unique(self.labels))
+        # Check if the probabilities for each class are returned
+        assert self.predict_proab.shape[1] == len(np.unique(self.labels))
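Aside on the `_process_sample_min_hull` refactor in classification.py above: pyriemann's `distance` accepts a stack of matrices as its first argument and a single matrix as its second, returning a column vector of shape (n_matrices, 1), hence the `[:, 0]` indexing in the new code. A minimal standalone sketch of that call pattern, using synthetically generated SPD matrices (not part of this patch):

    import numpy as np
    from pyriemann.utils.distance import distance

    rng = np.random.default_rng(42)
    A = rng.standard_normal((5, 3, 3))
    covmats = A @ A.transpose(0, 2, 1) + 3 * np.eye(3)  # synthetic SPD matrices

    # One vectorized call instead of a Python loop over the matrices.
    dists = distance(covmats, covmats[0], metric="logeuclid")[:, 0]
    assert dists.shape == (5,)
    assert np.isclose(dists[0], 0)  # a matrix is at distance 0 from itself
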
diff --git a/tests/test_utils_distance.py b/tests/test_utils_distance.py
index 9c08e622..90fd7dd3 100644
--- a/tests/test_utils_distance.py
+++ b/tests/test_utils_distance.py
@@ -4,10 +4,14 @@
     ClassicalOptimizer,
     NaiveQAOAOptimizer,
 )
-from pyriemann_qiskit.utils.distance import weights_logeuclid_to_convex_hull
+from pyriemann_qiskit.utils.distance import (
+    qdistance_logeuclid_to_convex_hull,
+    weights_logeuclid_to_convex_hull,
+)
 from pyriemann_qiskit.classification import QuanticMDM
 from pyriemann_qiskit.datasets import get_mne_sample
 from pyriemann.estimation import XdawnCovariances
+from pyriemann.utils.mean import mean_logeuclid
 from sklearn.pipeline import make_pipeline
 from sklearn.model_selection import StratifiedKFold, cross_val_score
@@ -29,11 +33,24 @@ def test_performance(metric):
 
 
 @pytest.mark.parametrize("optimizer", [ClassicalOptimizer(), NaiveQAOAOptimizer()])
-def test_distance_logeuclid_to_convex_hull_cpm(optimizer):
+def test_qdistance_logeuclid_to_convex_hull(optimizer, get_covmats):
+    n_trials, n_channels = 5, 3
+    covmats = get_covmats(n_trials, n_channels)
+
+    dist = qdistance_logeuclid_to_convex_hull(covmats, covmats[0], optimizer=optimizer)
+    assert dist == pytest.approx(0, rel=1e-5, abs=1e-5)
+
+    covmean = mean_logeuclid(covmats)
+    dist = qdistance_logeuclid_to_convex_hull(covmats, covmean, optimizer=optimizer)
+    assert dist == pytest.approx(0, rel=1e-5, abs=1e-5)
+
+
+@pytest.mark.parametrize("optimizer", [ClassicalOptimizer(), NaiveQAOAOptimizer()])
+def test_weight_logeuclid_to_convex_hull(optimizer):
     X_0 = np.array([[0.9, 1.1], [0.9, 1.1]])
     X_1 = X_0 + 1
-    X = np.stack((X_0, X_1))
-    y = (X_0 + X_1) / 3
-    weights = weights_logeuclid_to_convex_hull(X, y, optimizer=optimizer)
+    X_train = np.stack((X_0, X_1))
+    X_test = (X_0 + X_1) / 3
+    weights = weights_logeuclid_to_convex_hull(X_train, X_test, optimizer=optimizer)
     distances = 1 - weights
     assert distances.argmin() == 0
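The new `test_qdistance_logeuclid_to_convex_hull` above relies on a simple property: a matrix that already lies in the convex hull of a set, such as a member of the set or its log-Euclidean mean, is at distance approximately zero from that hull. A standalone sketch of the same property, assuming synthetic SPD matrices and the function's default optimizer (the test itself uses the `get_covmats` fixture and both parametrized optimizers):

    import numpy as np
    from pyriemann.utils.mean import mean_logeuclid

    from pyriemann_qiskit.utils.distance import qdistance_logeuclid_to_convex_hull

    rng = np.random.default_rng(7)
    A = rng.standard_normal((5, 3, 3))
    covmats = A @ A.transpose(0, 2, 1) + 3 * np.eye(3)  # synthetic SPD matrices

    # Both points lie inside the hull, so both distances should be ~0.
    d_member = qdistance_logeuclid_to_convex_hull(covmats, covmats[0])
    d_mean = qdistance_logeuclid_to_convex_hull(covmats, mean_logeuclid(covmats))
    print(d_member, d_mean)  # both ~0, up to optimizer tolerance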