From 2f16e0ffa4f3cf55c867104c376e9c4502164607 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Mon, 24 Jan 2022 18:10:33 +0100 Subject: [PATCH] Cluster progress (#195) * fix custom metric example * make Index projection consistently compile-time enabled * rework progress handling * tram toc update * doc fixes and tests * notebooks update --- CMakeLists.txt | 2 + deeptime/clustering/_kmeans.py | 45 ++++++++++-- deeptime/markov/msm/tram/_tram.py | 21 +++--- deeptime/markov/msm/tram/_tram_dataset.py | 11 +-- deeptime/src/include/deeptime/common.h | 15 ++-- deeptime/util/__init__.py | 8 +++ deeptime/util/callbacks.py | 68 +++++++++++++------ deeptime/util/platform.py | 10 ++- docs/requirements.txt | 1 + docs/source/index_msm.rst | 12 +++- docs/source/notebooks | 2 +- .../clustering_custom_metric/CMakeLists.txt | 6 ++ .../clustering_custom_metric/bindings.cpp | 2 +- examples/methods/plot_tram.py | 9 ++- tests/clustering/test_kmeans.py | 44 +++++++++++- tests/markov/msm/test_tram.py | 32 ++++----- tests/markov/msm/test_tram_datatset.py | 8 ++- tests/requirements.txt | 1 + tests/testing_utilities.py | 11 +++ 19 files changed, 229 insertions(+), 79 deletions(-) create mode 100644 examples/clustering_custom_metric/CMakeLists.txt diff --git a/CMakeLists.txt b/CMakeLists.txt index 78524e7a6..26cecc9b4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -32,6 +32,8 @@ add_subdirectory(deeptime/markov/msm/tram/_bindings) add_subdirectory(deeptime/markov/tools/estimation/dense/_bindings) add_subdirectory(deeptime/markov/tools/estimation/sparse/_bindings) +add_subdirectory(examples/clustering_custom_metric) + if(DEEPTIME_BUILD_CPP_TESTS) add_subdirectory(tests) endif() diff --git a/deeptime/clustering/_kmeans.py b/deeptime/clustering/_kmeans.py index 6d20cf923..c481d3ed9 100644 --- a/deeptime/clustering/_kmeans.py +++ b/deeptime/clustering/_kmeans.py @@ -1,5 +1,6 @@ import random import warnings +from contextlib import nullcontext from typing import Optional import numpy as np @@ -7,6 +8,7 @@ from ..base import EstimatorTransformer from ._cluster_model import ClusterModel from . import metrics +from ..util.callbacks import ProgressCallback from ..util.parallel import handle_n_jobs @@ -173,6 +175,10 @@ class KMeans(EstimatorTransformer): initial_centers: None or np.ndarray[k, dim], default=None This is used to resume the kmeans iteration. Note, that if this is set, the init_strategy is ignored and the centers are directly passed to the kmeans iteration algorithm. + progress : object + Progress bar object that `KMeans` will call to indicate progress to the user. Tested for a tqdm progress bar. + The interface is checked + via :meth:`supports_progress_interface `. 
References ---------- @@ -186,7 +192,7 @@ class KMeans(EstimatorTransformer): def __init__(self, n_clusters: int, max_iter: int = 500, metric='euclidean', tolerance=1e-5, init_strategy: str = 'kmeans++', fixed_seed=False, - n_jobs=None, initial_centers=None): + n_jobs=None, initial_centers=None, progress=None): super(KMeans, self).__init__() self.n_clusters = n_clusters @@ -198,6 +204,7 @@ def __init__(self, n_clusters: int, max_iter: int = 500, metric='euclidean', self.random_state = np.random.RandomState(self.fixed_seed) self.n_jobs = handle_n_jobs(n_jobs) self.initial_centers = initial_centers + self.progress = progress @property def initial_centers(self) -> Optional[np.ndarray]: @@ -421,14 +428,30 @@ def fit(self, data, initial_centers=None, callback_init_centers=None, callback_l if initial_centers is not None: self.initial_centers = initial_centers if self.initial_centers is None: - self.initial_centers = self._pick_initial_centers(data, self.init_strategy, n_jobs, callback_init_centers) + if self.progress is not None: + callback = KMeansCallback(self.progress, "KMeans++ initialization", self.n_clusters, + callback_init_centers) + context = callback + else: + callback = callback_init_centers + context = nullcontext() + with context: + self.initial_centers = self._pick_initial_centers(data, self.init_strategy, n_jobs, callback) # run k-means with all the data converged = False impl = metrics[self.metric] - cluster_centers, code, iterations, cost = impl.kmeans.cluster_loop( - data, self.initial_centers.copy(), n_jobs, self.max_iter, - self.tolerance, callback_loop) + + if self.progress is not None: + callback = KMeansCallback(self.progress, "KMeans iterations", self.max_iter, callback_loop) + context = callback + else: + callback = callback_loop + context = nullcontext() + with context: + cluster_centers, code, iterations, cost = impl.kmeans.cluster_loop( + data, self.initial_centers.copy(), n_jobs, self.max_iter, + self.tolerance, callback) if code == 0: converged = True else: @@ -526,3 +549,15 @@ def partial_fit(self, data, n_jobs=None): self._model._converged = True return self + + +class KMeansCallback(ProgressCallback): + + def __init__(self, progress, description, total, parent_callback=None): + super().__init__(progress, description=description, total=total) + self._parent_callback = parent_callback + + def __call__(self, *args, **kw): + super().__call__(*args, **kw) + if self._parent_callback is not None: + self._parent_callback(*args, **kw) diff --git a/deeptime/markov/msm/tram/_tram.py b/deeptime/markov/msm/tram/_tram.py index a2392d7c7..6989f7109 100644 --- a/deeptime/markov/msm/tram/_tram.py +++ b/deeptime/markov/msm/tram/_tram.py @@ -86,9 +86,9 @@ class TRAM(_MSMBaseEstimator): `track_log_likelihoods=true`, the log-likelihood are also stored. If `callback_interval=0`, no call to the callback function is done. progress : object - Progress bar object that `TRAM` will call to indicate progress to the user. - Tested for a tqdm progress bar. Should implement `update()` and `close()` and have `total` and `desc` - properties. + Progress bar object that `TRAM` will call to indicate progress to the user. Tested for a tqdm progress bar. + The interface is checked + via :meth:`supports_progress_interface `. See also -------- @@ -192,8 +192,7 @@ def fit(self, data, model=None, *args, **kw): return self def _run_estimation(self, tram_input): - """ Estimate the free energies using self-consistent iteration as described in the TRAM paper. 
- """ + """ Estimate the free energies using self-consistent iteration as described in the TRAM paper. """ with TRAMCallback(self.progress, self.maxiter, self.log_likelihoods, self.increments, self.callback_interval > 0) as callback: self._tram_estimator.estimate(tram_input, self.maxiter, self.maxerr, @@ -202,11 +201,11 @@ def _run_estimation(self, tram_input): if callback.last_increment > self.maxerr: warnings.warn( - f"TRAM did not converge after {self.maxiter} iteration. Last increment: {callback.last_increment}", - ConvergenceWarning) + f"TRAM did not converge after {self.maxiter} iteration(s). " + f"Last increment: {callback.last_increment}", ConvergenceWarning) -class TRAMCallback(callbacks.Callback): +class TRAMCallback(callbacks.ProgressCallback): """Callback for the TRAM estimate process. Increments a progress bar and optionally saves iteration increments and log likelihoods to a list. @@ -214,14 +213,16 @@ class TRAMCallback(callbacks.Callback): ---------- log_likelihoods_list : list, optional A list to append the log-likelihoods to that are passed to the callback.__call__() method. + total : int + Maximum number of callbacks. increments : list, optional A list to append the increments to that are passed to the callback.__call__() method. store_convergence_info : bool, default=False If True, log_likelihoods and increments are appended to their respective lists each time callback.__call__() is called. If false, no values are appended, only the last increment is stored. """ - def __init__(self, progress, n_iter, log_likelihoods_list=None, increments=None, store_convergence_info=False): - super().__init__(progress, n_iter=n_iter, display_text="Running TRAM estimate") + def __init__(self, progress, total, log_likelihoods_list=None, increments=None, store_convergence_info=False): + super().__init__(progress, total=total, description="Running TRAM estimate") self.log_likelihoods = log_likelihoods_list self.increments = increments self.store_convergence_info = store_convergence_info diff --git a/deeptime/markov/msm/tram/_tram_dataset.py b/deeptime/markov/msm/tram/_tram_dataset.py index a8c1746b2..420c37501 100644 --- a/deeptime/markov/msm/tram/_tram_dataset.py +++ b/deeptime/markov/msm/tram/_tram_dataset.py @@ -288,9 +288,10 @@ def restrict_to_largest_connected_set(self, connectivity='post_hoc_RE', connecti Only needed if connectivity="post_hoc_RE" or "BAR_variance". Values greater than 1.0 weaken the connectivity conditions. For 'post_hoc_RE' this multiplies the number of hypothetically observed transitions. For 'BAR_variance' this scales the threshold for the minimal allowed variance of free energy differences. - progress : object, default=None - Progress bar that TRAMDataset will call to indicate progress to the user. - Tested for a tqdm progress bar. Should implement `update()` and `close()`. + progress : object + Progress bar object that `TRAMDataset` will call to indicate progress to the user. + Tested for a tqdm progress bar. The interface is checked + via :meth:`supports_progress_interface `. 
Raises ------ @@ -416,8 +417,8 @@ def _find_largest_connected_set(self, connectivity, connectivity_factor, progres else: connectivity_fn = tram.find_state_transitions_BAR_variance - with callbacks.Callback(progress, self.n_therm_states * self.n_markov_states, - "Finding connected sets") as callback: + with callbacks.ProgressCallback(progress, "Finding connected sets", + self.n_therm_states * self.n_markov_states) as callback: (i_s, j_s) = connectivity_fn(self.ttrajs, self.dtrajs, self.bias_matrices, all_state_counts, self.n_therm_states, self.n_markov_states, connectivity_factor, callback) diff --git a/deeptime/src/include/deeptime/common.h b/deeptime/src/include/deeptime/common.h index 5bb5c7077..a17dc7112 100644 --- a/deeptime/src/include/deeptime/common.h +++ b/deeptime/src/include/deeptime/common.h @@ -46,6 +46,11 @@ struct ComputeIndex { static constexpr auto compute(const Arr &strides, const std::tuple &tup, std::index_sequence) { return (0 + ... + (strides[I] * std::get(tup))); } + + template + static constexpr auto computeContainer(const Arr &strides, const Arr2 &tup, std::index_sequence) { + return (0 + ... + (strides[I] * std::get(tup))); + } }; } @@ -152,13 +157,9 @@ class Index { * @param indices the Dims-dimensional index * @return the 1D index */ - template - value_type index(const Arr &indices) const { - std::size_t result{0}; - for (std::size_t i = 0; i < Dims; ++i) { - result += _cum_size[i] * indices[i]; - } - return result; + template> + constexpr value_type index(const Arr &indices) const { + return detail::ComputeIndex<>::computeContainer(_cum_size, indices, Indices{}); } /** diff --git a/deeptime/util/__init__.py b/deeptime/util/__init__.py index ec2f9f7f8..2c55813c1 100644 --- a/deeptime/util/__init__.py +++ b/deeptime/util/__init__.py @@ -51,6 +51,12 @@ parallel.handle_n_jobs decorators.cached_property decorators.plotting_function + + callbacks.supports_progress_interface + callbacks.ProgressCallback + + platform.module_available + platform.handle_progress_bar """ from .stats import QuantityStatistics, confidence_interval @@ -58,3 +64,5 @@ from . import data from . import types +from . import callbacks +from . import platform diff --git a/deeptime/util/callbacks.py b/deeptime/util/callbacks.py index c4c25a8b6..6615c30be 100644 --- a/deeptime/util/callbacks.py +++ b/deeptime/util/callbacks.py @@ -1,28 +1,58 @@ -import copy from .platform import handle_progress_bar -class Callback: - """Base callback function for the c++ bindings to indicate progress by incrementing a progress bar. +def supports_progress_interface(bar): + r""" Method to check if a progress bar supports the deeptime interface, meaning that it + has `update`, `close`, and `set_description` methods as well as a `total` attribute. - Parameters - ---------- - progress_bar : object - Tested for a tqdm progress bar. Should implement update() and close() and have .total and .desc properties. - n_iter : int - Number of iterations to completion. - display_text : string - text to display in front of the progress bar. - """ + Parameters + ---------- + bar : object, optional + The progress bar implementation to check, can be None. - def __init__(self, progress, n_iter=None, display_text=None): - self.progress_bar = handle_progress_bar(progress)() - if display_text is not None: - self.progress_bar.desc = display_text - if n_iter is not None: - self.progress_bar.total = n_iter + Returns + ------- + supports : bool + Whether the progress bar is supported. 
- def __call__(self): + See Also + -------- + ProgressCallback + """ + has_methods = all(callable(getattr(bar, method, None)) for method in supports_progress_interface.required_methods) + return has_methods + + +supports_progress_interface.required_methods = ['update', 'close', 'set_description'] + + +class ProgressCallback: + r"""Base callback function for the c++ bindings to indicate progress by incrementing a progress bar. + + Parameters + ---------- + progress : object + Tested for a tqdm progress bar. Should implement `update()`, `set_description()`, and `close()`. Should + also possess a `total` constructor keyword argument. + total : int + Number of iterations to completion. + description : string + text to display in front of the progress bar. + + See Also + -------- + supports_progress_interface + """ + + def __init__(self, progress, description=None, total=None): + self.progress_bar = handle_progress_bar(progress)(total=total) + assert supports_progress_interface(self.progress_bar), \ + f"Progress bar did not satisfy interface! It should at least have " \ + f"the method(s) {supports_progress_interface.required_methods}." + if description is not None: + self.progress_bar.set_description(description) + + def __call__(self, *args, **kw): self.progress_bar.update() def __enter__(self): diff --git a/deeptime/util/platform.py b/deeptime/util/platform.py index 999188369..8e2697e68 100644 --- a/deeptime/util/platform.py +++ b/deeptime/util/platform.py @@ -36,19 +36,17 @@ def handle_progress_bar(progress): class progress: def __init__(self, x=None, **_): self._x = x + self.total = None - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - return False + def __enter__(self): return self + def __exit__(self, exc_type, exc_val, exc_tb): return False def __iter__(self): for x in self._x: yield x def update(self): pass - def close(self): pass + def set_description(self, *_): pass return progress diff --git a/docs/requirements.txt b/docs/requirements.txt index bdb67510e..623a8382b 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -14,3 +14,4 @@ memory_profiler mdshare nbconvert jupyter +tqdm diff --git a/docs/source/index_msm.rst b/docs/source/index_msm.rst index 5a494cab8..719d683f6 100644 --- a/docs/source/index_msm.rst +++ b/docs/source/index_msm.rst @@ -54,7 +54,6 @@ over the encountered state transitions. 
This is covered in `transition counting notebooks/mlmsm notebooks/pcca notebooks/tpt - notebooks/tram Furthermore, deeptime implements :class:`Augmented Markov models ` :footcite:`olsson2017combining` which can be used when experimental data is available, as well as @@ -62,6 +61,17 @@ Furthermore, deeptime implements :class:`Augmented Markov models struct MaximumMetric { diff --git a/examples/methods/plot_tram.py b/examples/methods/plot_tram.py index 456d049c4..03a990175 100644 --- a/examples/methods/plot_tram.py +++ b/examples/methods/plot_tram.py @@ -14,7 +14,7 @@ from deeptime.data import tmatrix_metropolis1d from deeptime.markov.msm import MarkovStateModel, TRAM -from deeptime.clustering import KMeansModel +from deeptime.clustering import ClusterModel xs = np.linspace(-1.5, 1.5, num=100) n_samples = 10000 @@ -71,13 +71,12 @@ def sample_trajectories(bias_functions): bias_matrices[i, :, j] = bias_function(traj) # discretize the trajectories into two Markov states (centered around the two wells) - clustering = KMeansModel(cluster_centers=np.asarray([-0.75, 0.75]), metric='euclidean') + clustering = ClusterModel(cluster_centers=np.asarray([-0.75, 0.75]), metric='euclidean') - dtrajs = clustering.transform(trajectories.flatten()).reshape( - (len(bias_matrices), n_samples)) + dtrajs = clustering.transform(trajectories.flatten()).reshape((len(bias_matrices), n_samples)) from tqdm import tqdm - tram = TRAM(lagtime=1, maxiter=100, progress=tqdm) + tram = TRAM(lagtime=1, maxiter=100, maxerr=1e-2, progress=tqdm) # For every simulation frame seen in trajectory i and time step t, btrajs[i][t,k] is the # bias energy of that frame evaluated in the k'th thermodynamic state (i.e. at the k'th diff --git a/tests/clustering/test_kmeans.py b/tests/clustering/test_kmeans.py index 62312265d..506d95d1e 100644 --- a/tests/clustering/test_kmeans.py +++ b/tests/clustering/test_kmeans.py @@ -4,16 +4,18 @@ import numpy as np import pytest -from numpy.testing import assert_equal +from numpy.testing import assert_equal, assert_ from sklearn.datasets import make_blobs from sklearn.utils.extmath import row_norms import deeptime as dt import deeptime.clustering +from tests.testing_utilities import ProgressMock def cluster_kmeans(data, k, max_iter=5, init_strategy='kmeans++', fixed_seed=False, n_jobs=None, cluster_centers=None, - callback_init_centers=None, callback_loop=None) -> (dt.clustering.KMeans, dt.clustering.ClusterModel): + callback_init_centers=None, callback_loop=None) -> ( +dt.clustering.KMeans, dt.clustering.ClusterModel): est = dt.clustering.KMeans(n_clusters=k, max_iter=max_iter, init_strategy=init_strategy, fixed_seed=fixed_seed, n_jobs=n_jobs, initial_centers=np.array(cluster_centers) if cluster_centers is not None else None) @@ -372,3 +374,41 @@ def test_synthetic_trivial(self): found[i] = True assert np.all(found) + + +@pytest.mark.parametrize("with_progress_bar", [True, False]) +def test_kmeans_progress(with_progress_bar): + progress_instances = [] + + class ProgressFactory: + def __new__(cls, *args, **kwargs): + progress = ProgressMock() + progress_instances.append(progress) + return progress + + n_init_callbacks = 0 + n_loop_callbacks = 0 + + def init_callback(*_): + nonlocal n_init_callbacks + n_init_callbacks += 1 + + def loop_callback(*_): + nonlocal n_loop_callbacks + n_loop_callbacks += 1 + + data = np.random.uniform(-10, 10, size=(100, 5)) + kmeans = dt.clustering.KMeans(2, max_iter=5) + if with_progress_bar: + kmeans.progress = ProgressFactory + kmeans.fit(data, 
callback_init_centers=init_callback, callback_loop=loop_callback) + + assert_equal(n_init_callbacks, 2) + assert_(0 < n_loop_callbacks <= 10) + + if with_progress_bar: + assert_equal(len(progress_instances), 2) + assert_equal(progress_instances[0].n_update_calls, 2) + assert_equal(progress_instances[0].n_close_calls, 1) + assert_equal(progress_instances[1].n_update_calls, n_loop_callbacks) + assert_equal(progress_instances[1].n_close_calls, 1) diff --git a/tests/markov/msm/test_tram.py b/tests/markov/msm/test_tram.py index 48036b954..151328d9a 100644 --- a/tests/markov/msm/test_tram.py +++ b/tests/markov/msm/test_tram.py @@ -1,8 +1,12 @@ import numpy as np import pytest +from tqdm import tqdm +from flaky import flaky + from deeptime.markov.msm import TRAM +from tests.testing_utilities import ProgressMock + from .test_tram_model import make_random_model -from flaky import flaky def make_random_input_data(n_therm_states, n_markov_states, n_samples=10, make_ttrajs=True): @@ -240,24 +244,20 @@ def test_callback_called(track_log_likelihoods): def test_progress_bar_update_called(): - class ProgressMock: - def __init__(self): - self.total = 1 - self.desc = 0 - - def update(self): - self.tracking_ints[0] += 1 + progress = ProgressMock() - def close(self): - self.tracking_ints[1] += 1 + class ProgressFactory: + def __new__(cls, *args, **kwargs): return progress - # workaround to track function calls because the progress bar is copied internally - ProgressMock.tracking_ints = [0, 0] - - tram = TRAM(callback_interval=2, maxiter=10, progress=ProgressMock) + tram = TRAM(callback_interval=2, maxiter=10, progress=ProgressFactory) tram.fit(make_random_input_data(5, 5)) # update() should be called 5 times - np.testing.assert_equal(ProgressMock.tracking_ints[0], 5) + np.testing.assert_equal(progress.n_update_calls, 5) # and close() one time - np.testing.assert_equal(ProgressMock.tracking_ints[1], 1) + np.testing.assert_equal(progress.n_close_calls, 1) + + +def test_tqdm_progress_bar(): + tram = TRAM(callback_interval=2, maxiter=10, progress=tqdm) + tram.fit(make_random_input_data(5, 5)) diff --git a/tests/markov/msm/test_tram_datatset.py b/tests/markov/msm/test_tram_datatset.py index f966f7c0e..c96641418 100644 --- a/tests/markov/msm/test_tram_datatset.py +++ b/tests/markov/msm/test_tram_datatset.py @@ -1,6 +1,7 @@ import numpy as np import pytest -from deeptime.markov import TransitionCountEstimator, TransitionCountModel +from tqdm import tqdm +from deeptime.markov import TransitionCountEstimator from deeptime.markov.msm import TRAMDataset from deeptime.markov.msm.tram._tram_bindings import tram as tram_bindings from .test_tram_model import make_random_model @@ -291,3 +292,8 @@ def test_check_against_model_is_valid(make_ttrajs): model = make_random_model(3, 5) dataset = make_random_dataset(3, 5, make_ttrajs=make_ttrajs) dataset.check_against_model(model) + + +def test_progres_bar(): + dataset = make_random_dataset(3, 5) + dataset.restrict_to_largest_connected_set(progress=tqdm) diff --git a/tests/requirements.txt b/tests/requirements.txt index df415e929..040532a72 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -13,3 +13,4 @@ pytest-cov pytest-faulthandler==2.0.1 pytest-sugar==0.9.4 flaky +tqdm diff --git a/tests/testing_utilities.py b/tests/testing_utilities.py index 72804469d..eb43b031c 100644 --- a/tests/testing_utilities.py +++ b/tests/testing_utilities.py @@ -4,6 +4,17 @@ import numpy as np +class ProgressMock: + def __init__(self): + self.total = 1 + self.n_update_calls = 0 + 
self.n_close_calls = 0 + + def set_description(self, *_): ... + def update(self): self.n_update_calls += 1 + def close(self): self.n_close_calls += 1 + + class timing(object): """A timing context manager
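
The patch above threads a `progress` argument through `KMeans`, `TRAM`, and `TRAMDataset` and validates it with `deeptime.util.callbacks.supports_progress_interface`, which only requires callable `update`, `close`, and `set_description` members plus a `total` constructor keyword. Below is a minimal usage sketch, not part of the patch itself, assuming a deeptime build containing this change and tqdm installed; the `MinimalProgress` class is hypothetical and implements only what the interface check looks for.

    import numpy as np
    from tqdm import tqdm

    from deeptime.clustering import KMeans
    from deeptime.util.callbacks import supports_progress_interface


    class MinimalProgress:
        # Hypothetical progress bar: just enough to pass supports_progress_interface.
        def __init__(self, total=None, **_):
            self.total = total  # expected number of update() calls
            self.n = 0

        def update(self):
            self.n += 1  # called once per k-means++ step / iteration

        def close(self):
            print(f"done after {self.n} steps")

        def set_description(self, text):
            print(text)


    assert supports_progress_interface(MinimalProgress(total=10))

    data = np.random.uniform(-10, 10, size=(500, 2))

    # tqdm satisfies the interface out of the box; two bars appear,
    # one for the k-means++ initialization and one for the iteration loop.
    KMeans(n_clusters=2, max_iter=50, progress=tqdm).fit(data)

    # A custom bar is used the same way, as long as the interface check passes.
    KMeans(n_clusters=2, max_iter=50, progress=MinimalProgress).fit(data)

The same `progress=tqdm` argument can be passed to `TRAM` and to `TRAMDataset.restrict_to_largest_connected_set`, as exercised by the tests in this patch.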