From a2781d7bd15ed258c26eab6be85e012c22a62911 Mon Sep 17 00:00:00 2001 From: Shahar Bar Date: Tue, 31 Dec 2024 12:22:57 +0200 Subject: [PATCH] Adaptive Windowing for Multi-Armed Bandits ### Changes: * Added adaptive windowing mechanism to detect and handle concept drift in MAB models. * Introduced ActionsManager class to handle action memory and updates with configurable window sizes. * Refactored Model class hierarchy to support model resetting and memory management. * Added support for infinite and fixed-size windows with change detection via delta parameter. * Enhanced test coverage for adaptive windowing functionality across MAB variants. --- .gitignore | 1 + pybandits/actions_manager.py | 626 +++++++++++++++ pybandits/base.py | 15 +- pybandits/cmab.py | 48 +- pybandits/mab.py | 266 +++---- pybandits/model.py | 191 ++++- pybandits/pydantic_version_compatibility.py | 2 + pybandits/smab.py | 41 +- pybandits/strategy.py | 10 +- pyproject.toml | 4 +- tests/test_actions_manager.py | 91 +++ tests/test_cmab.py | 546 ++++++++++--- tests/test_model.py | 42 +- tests/test_smab.py | 828 ++++++++++++++++---- tests/test_strategy.py | 16 +- 15 files changed, 2185 insertions(+), 542 deletions(-) create mode 100644 pybandits/actions_manager.py create mode 100644 tests/test_actions_manager.py diff --git a/.gitignore b/.gitignore index c206dc6..b09f433 100644 --- a/.gitignore +++ b/.gitignore @@ -65,3 +65,4 @@ MANIFEST # poetry poetry.lock +.qodo diff --git a/pybandits/actions_manager.py b/pybandits/actions_manager.py new file mode 100644 index 0000000..b1e86bf --- /dev/null +++ b/pybandits/actions_manager.py @@ -0,0 +1,626 @@ +import warnings +from abc import ABC, abstractmethod +from collections import defaultdict, deque +from inspect import isclass +from typing import Any, Callable, Deque, Dict, Generic, List, Literal, Optional, Set, Union + +import numpy as np +from numpy.typing import ArrayLike + +from pybandits.base import ACTION_IDS_PREFIX, ACTIONS, ActionId, BinaryReward, Probability, PyBanditsBaseModel +from pybandits.model import ( + BaseModel, + CmabModelType, + Model, + ModelMO, + SmabModelType, +) +from pybandits.pydantic_version_compatibility import ( + PYDANTIC_VERSION_1, + PYDANTIC_VERSION_2, + GenericModel, + NonNegativeInt, + PositiveInt, + field_validator, + model_validator, + pydantic_version, + validate_call, +) +from pybandits.utils import extract_argument_names_from_function + +_NO_CHANGE_POINT = -1 + + +class ActionsManager(PyBanditsBaseModel, ABC): + """ + Base class for managing actions and their associated models. + The class allows to account for non-stationarity by providing an adaptive window scheme for action update. + + Parameters + ---------- + actions : Dict[ActionId, Model] + The list of possible actions, and their associated Model. + adaptive_window_size : Optional[Union[PositiveInt, Literal["inf"]]] + The size of the adaptive window for action update. If None, no adaptive window is used. + delta : Optional[Probability], 0.1 if not specified. + The confidence level for the adaptive window. 
+ """ + + actions: Dict[ActionId, BaseModel] + adaptive_window_size: Optional[Union[PositiveInt, Literal["inf"]]] = None + delta: Optional[Probability] = None + + actions_memory: Optional[Deque] = None + rewards_memory: Optional[Deque] = None + + if pydantic_version == PYDANTIC_VERSION_1: + + class Config: + arbitrary_types_allowed = True + json_encoders = {deque: list} + + elif pydantic_version == PYDANTIC_VERSION_2: + model_config = {"arbitrary_types_allowed": True, "json_encoders": {deque: list}} + else: + raise ValueError(f"Unsupported pydantic version: {pydantic_version}") + + @field_validator("actions", mode="before") + @classmethod + def at_least_one_action_is_defined(cls, v): + # validate number of actions + if len(v) == 0: + raise AttributeError("At least one action should be defined.") + elif len(v) == 1: + warnings.warn("Only a single action was supplied. This MAB will be deterministic.") + # validate that all actions are of the same configuration + action_models = list(v.values()) + action_type = cls._get_field_type("actions") + if any(not isinstance(action, action_type) for action in action_models): + raise TypeError(f"All actions should follow {action_type} type.") + return v + + if pydantic_version == PYDANTIC_VERSION_1: + + @model_validator(mode="before") + @classmethod + def check_delta(cls, values): + delta = cls._get_value_with_default("delta", values) + adaptive_window_size = cls._get_value_with_default("adaptive_window_size", values) + if delta is not None and not adaptive_window_size: + raise AttributeError("Delta should only be defined when adaptive_window_size is defined.") + if adaptive_window_size and delta is None: + values["delta"] = 0.1 + return values + + @model_validator(mode="before") + @classmethod + def maybe_initialize_memory(cls, values): + reference_memory_len = None + expected_memory_length_for_inf = cls._get_expected_memory_length(actions=values["actions"]) + for memory_name in ["actions_memory", "rewards_memory"]: + if values["adaptive_window_size"] is None and values.get(memory_name, None) is not None: + raise AttributeError(f"{memory_name} should only be defined when adaptive_window_size is defined.") + if values["adaptive_window_size"] is not None: + if memory_name not in values or values[memory_name] is None: + values[memory_name] = ( + deque() + if values["adaptive_window_size"] == "inf" + else deque(maxlen=values["adaptive_window_size"]) + ) + else: + memory_len = len(values[memory_name]) + if reference_memory_len is not None and memory_len != reference_memory_len: + raise AttributeError(f"{memory_name} should have the same length as the other memory.") + else: + reference_memory_len = memory_len + if values["adaptive_window_size"] is int: + if memory_len > values["adaptive_window_size"]: + raise AttributeError( + f"{memory_name} should have a length less than or equal to adaptive_window_size." + ) + else: # adaptive_window_size == "inf" + if memory_len > expected_memory_length_for_inf: + raise AttributeError( + f"{memory_name} should have a length less than or equal to the expected memory length." 
+ ) + if isinstance(values[memory_name], list): # serialization from json + maxlen = values["adaptive_window_size"] if values["adaptive_window_size"] != "inf" else None + values[memory_name] = deque(values[memory_name], maxlen=maxlen) + + return values + + elif pydantic_version == PYDANTIC_VERSION_2: + + @model_validator(mode="after") + def check_delta(self): + if self.delta is not None and not self.adaptive_window_size: + raise AttributeError("Delta should only be defined when adaptive_window_size is defined.") + if self.adaptive_window_size and self.delta is None: + self.delta = 0.1 + return self + + @model_validator(mode="after") + def maybe_initialize_memory(self): + reference_memory_len = None + expected_memory_length_for_inf = self._get_expected_memory_length(actions=self.actions) + for memory_name in ["actions_memory", "rewards_memory"]: + if self.adaptive_window_size is None and getattr(self, memory_name, None) is not None: + raise AttributeError(f"{memory_name} should only be defined when adaptive_window_size is defined.") + if self.adaptive_window_size is not None: + if not hasattr(self, memory_name) or getattr(self, memory_name) is None: + setattr( + self, + memory_name, + deque() if self.adaptive_window_size == "inf" else deque(maxlen=self.adaptive_window_size), + ) + else: + if reference_memory_len is not None and len(getattr(self, memory_name)) != reference_memory_len: + raise AttributeError(f"{memory_name} should have the same length as the other memory.") + else: + reference_memory_len = len(getattr(self, memory_name)) + if self.adaptive_window_size is int: + if len(getattr(self, memory_name)) > self.adaptive_window_size: + raise AttributeError( + f"{memory_name} should have a length less than or equal to adaptive_window_size." + ) + else: # adaptive_window_size == "inf" + if len(getattr(self, memory_name)) > expected_memory_length_for_inf: + raise AttributeError( + f"{memory_name} should have a length less than or equal to the expected memory length." + ) + return self + + else: + raise ValueError(f"Unsupported pydantic version: {pydantic_version}") + + @classmethod + def _get_expected_memory_length(cls, actions: Dict[ActionId, BaseModel]) -> NonNegativeInt: + """ + Get the expected memory length for the adaptive window. + + Parameters + ---------- + actions : Dict[ActionId, BaseModel] + The list of possible actions, and their associated Model. + + Returns + ------- + NonNegativeInt + The expected memory length. 
+ """ + if not actions: + raise AttributeError("At least one action should be defined.") + reference_model = list(actions.values())[0] + if isinstance(reference_model, Model): + expected_memory_length_for_inf = sum( + [action_model.n_successes + action_model.n_failures - 2 for action_model in actions.values()] + ) + elif isinstance(reference_model, ModelMO): + expected_memory_length_for_inf = sum( + [ + action_model.models[0].n_successes + action_model.models[0].n_failures - 2 + for action_model in actions.values() + ] + ) + else: + raise ValueError(f"Model type {type(reference_model)} not supported.") + return expected_memory_length_for_inf + + def __init__( + self, + adaptive_window_size: Optional[Union[PositiveInt, Literal["inf"]]] = None, + delta: Optional[Probability] = None, + actions: Optional[Dict[ActionId, Model]] = None, + action_ids: Optional[Set[ActionId]] = None, + actions_memory: Optional[Deque] = None, + rewards_memory: Optional[Deque] = None, + kwargs: Optional[Dict[str, Any]] = None, + ): + actions = self._instantiate_actions(actions=actions, action_ids=action_ids, kwargs=kwargs) + super().__init__( + actions=actions, + adaptive_window_size=adaptive_window_size, + delta=delta, + actions_memory=actions_memory, + rewards_memory=rewards_memory, + ) + + def _validate_update_params( + self, actions: List[ActionId], rewards: Union[List[BinaryReward], List[List[BinaryReward]]], **kwargs + ): + """ + Verify that the given list of action IDs is a subset of the currently defined actions and that + the rewards type matches the strategy type. + + Parameters + ---------- + actions : List[ActionId] + The selected action for each sample. + rewards: Union[List[BinaryReward], List[List[BinaryReward]]] + The reward for each sample. + """ + invalid = set(actions) - set(self.actions.keys()) + if invalid: + raise AttributeError(f"The following invalid action(s) were specified: {invalid}.") + if len(actions) != len(rewards): + raise AttributeError(f"Shape mismatch: actions and rewards should have the same length {len(actions)}.") + + @validate_call + def update(self, actions: List[ActionId], rewards: Union[List[BinaryReward], List[List[BinaryReward]]], **kwargs): + """ + Update the models associated with the given actions using the provided rewards. + For adaptive window size, the update by resetting the action models and retraining them on the new data. + + Parameters + ---------- + actions : List[ActionId] + The selected action for each sample. + rewards: Union[List[BinaryReward], List[List[BinaryReward]]] + The reward for each sample. + """ + # Discuss with team: + # We do comparison withing memory, but neglect the full view of the data. + # What if beyond last change point we don't have any data for some action? (Extremely low probability) + # It is just cold started, but is it good for us? + # We need to think about it. 
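+        # When an adaptive window is configured, the update below appends the new samples to the
+        # action/reward memories, looks for the most recent change point across all actions and,
+        # if one is found, keeps only the samples after that point before resetting every action
+        # model and retraining it on the retained memory; otherwise the models are updated directly.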
+ + self._validate_update_params(actions, rewards, **kwargs) + + if self.adaptive_window_size is not None: + memory_len = len(self.actions_memory) + self.actions_memory.extend(actions) + self.rewards_memory.extend(rewards) + if memory_len and (last_change_point := self._get_last_change_point()) != _NO_CHANGE_POINT: + self.actions_memory = type(self.actions_memory)( + self.actions_memory[i] for i in range(last_change_point, len(self.actions_memory)) + ) + self.rewards_memory = type(self.rewards_memory)( + self.rewards_memory[i] for i in range(last_change_point, len(self.rewards_memory)) + ) + + for action_model in self.actions.values(): + action_model.reset() + self._update_actions(self.actions_memory, self.rewards_memory, **kwargs) + else: + self._update_actions(actions, rewards, **kwargs) + else: + self._update_actions(actions, rewards, **kwargs) + + @abstractmethod + def _update_actions( + self, actions: List[ActionId], rewards: Union[List[BinaryReward], List[List[BinaryReward]]], *args, **kwargs + ): + """ + Update the models associated with the given actions using the provided rewards. + + Parameters + ---------- + actions : List[ActionId] + The selected action for each sample. + rewards: Union[List[BinaryReward], List[List[BinaryReward]]] + The reward for each sample. + """ + pass + + def _get_last_change_point(self) -> NonNegativeInt: + """ + Get the last change point among all actions. + + Returns + ------- + NonNegativeInt + The last change point. 0 if no change point is found. + """ + change_points = [ + self._get_last_change_point_for_action(action_id=action_id) for action_id in self.actions.keys() + ] + return max(change_points) + + def _get_threshold(self, past_trials: PositiveInt, present_trials: PositiveInt) -> float: + """ + Get the threshold for the given past window and present window. + + Parameters + ---------- + past_trials : PositiveInt + The number of trials in the past window. + present_trials : PositiveInt + The number of trials in the present window. + + Returns + ------- + threshold : float + The threshold value. + """ + full_trials = past_trials + present_trials + threshold = past_trials * present_trials * np.sqrt(1 / (2 * full_trials) * np.log(4 * full_trials / self.delta)) + return threshold + + def _get_last_change_point_for_action(self, action_id: ActionId) -> int: + """ + Get the last change point for the given action. + + Parameters + ---------- + action_id : ActionId + The action ID. + + Returns + ------- + NonNegativeInt + The last change point for the given action. -1 if no change point is found. 
+ """ + action_index = np.where([a == action_id for a in self.actions_memory])[0].tolist() + rewards_window = [self.rewards_memory[i] for i in action_index] + window_length = len(rewards_window) + cumulative_reward = np.cumsum(np.array(rewards_window), axis=0) + if cumulative_reward.ndim == 1: + cumulative_reward = cumulative_reward[:, np.newaxis] + reference_model = self.actions[action_id] + expected_memory_length_for_inf = self._get_expected_memory_length(actions={action_id: reference_model}) + if self.adaptive_window_size == "inf" and expected_memory_length_for_inf == len(self.actions_memory): + current_sum = 0 + current_trials = 0 + start_index = 1 + else: + action_model = self.actions[action_id] + current_sum = action_model.n_successes - 1 + current_trials = action_model.n_successes + action_model.n_failures - 2 + start_index = 0 + + for i in reversed(range(start_index, window_length)): + if start_index == 0: + past_sum = current_sum + else: + past_sum = current_sum + cumulative_reward[i - 1] + present_sum = cumulative_reward[-1] - cumulative_reward[i - 1] + if abs(past_sum * (window_length - i) - present_sum * (i + current_trials)) > self._get_threshold( + past_trials=i + current_trials, present_trials=window_length - i + ): + return action_index[i] + + return _NO_CHANGE_POINT + + @classmethod + def _instantiate_actions( + cls, actions: Optional[Dict[ActionId, Model]], action_ids: Optional[Set[ActionId]], kwargs + ): + """ + Utility function to instantiate the action models based on the provided kwargs. + + Parameters + ---------- + actions : Optional[Dict[ActionId, Model]] + The list of possible actions and their associated models. + action_ids : Optional[Set[ActionId]] + The list of possible actions. + kwargs : Dict[str, Any] + Additional parameters for the mab and for the action model. + + Returns + ------- + actions : Dict[ActionId, Model] + Dictionary of actions and the parameters of their associated model. + """ + if actions is None: + action_specific_kwargs = cls._extract_action_specific_kwargs(kwargs) + + # Extract inner_action_ids + inner_action_ids = action_ids or set(action_specific_kwargs.keys()) + if not inner_action_ids: + raise ValueError( + "inner_action_ids should be provided either directly or via keyword argument in the form of " + "action_id_{model argument name} = {action_id: value}." + ) + action_model_start = cls._get_action_model_start_method(True) + action_general_kwargs = cls._extract_action_model_class_and_attributes(kwargs, action_model_start) + actions = {} + for a in inner_action_ids: + actions[a] = action_model_start(**action_general_kwargs, **action_specific_kwargs.get(a, {})) + + if all(isinstance(potential_model, Dict) for potential_model in actions.values()): + action_model_start = cls._get_action_model_start_method(False) + state_actions = actions.copy() + actions = {} + for action_id, action_state in state_actions.items(): + actions[action_id] = action_model_start(**action_state) + + return actions + + @staticmethod + def _extract_action_specific_kwargs(kwargs) -> Dict[ActionId, Dict]: + """ + Utility function to extract kwargs that are specific for each action when constructing the action model. + + Parameters + ---------- + kwargs : Dict[str, Any] + Additional parameters for the mab and for the action model. + + Returns + ------- + action_specific_kwargs : Dict[str, Dict] + Dictionary of actions and the parameters of their associated model. + kwargs : Dict[str, Any] + Dictionary of parameters and their values, without the action_specific_kwargs. 
+ """ + action_specific_kwargs = defaultdict(dict) + for keyword in list(kwargs.keys()): + argument = kwargs[keyword] + if keyword.startswith(ACTION_IDS_PREFIX) and type(argument) is dict: + kwargs.pop(keyword) + inner_keyword = keyword.split(ACTION_IDS_PREFIX)[1] + for action_id, value in argument.items(): + action_specific_kwargs[action_id][inner_keyword] = value + if keyword == ACTIONS and type(argument) is dict: + kwargs.pop(keyword) + action_specific_kwargs.update(argument) + return dict(action_specific_kwargs) + + @classmethod + def _extract_action_model_class_and_attributes( + cls, kwargs: Dict[str, Any], action_model_start: Callable + ) -> Dict[str, Dict]: + """ + Utility function to extract kwargs that are specific for each action when constructing the action model. + + Parameters + ---------- + kwargs : Dict[str, Any] + Additional parameters for the mab and for the action model. + action_model_start : Callable + Function handle for the action model start: either cold start or init. + + Returns + ------- + action_model_cold_start : Callable + Function handle for factoring the required action model. + action_general_kwargs : Dict[str, any] + Dictionary of parameters and their values for the action model. + """ + if isclass(action_model_start): + action_model_attributes = list(action_model_start.model_fields.keys()) + else: + action_model_attributes = extract_argument_names_from_function(action_model_start, True) + + action_general_kwargs = {k: kwargs.pop(k) for k in action_model_attributes if k in kwargs.keys()} + return action_general_kwargs + + @classmethod + def _get_action_model_start_method(cls, cold_start_mode: bool) -> Callable: + action_model_class = cls._get_field_type("actions") + if cold_start_mode and hasattr(action_model_class, "cold_start"): + action_model_start = action_model_class.cold_start + else: + action_model_start = action_model_class + return action_model_start + + +class SmabActionsManager(ActionsManager, GenericModel, Generic[SmabModelType]): + """ + Manages actions and their associated models for sMAB models. + The class allows to account for non-stationarity by providing an adaptive window scheme for action update. + + Parameters + ---------- + actions : Dict[ActionId, BaseBeta] + The list of possible actions, and their associated Model. + adaptive_window_size : Optional[Union[PositiveInt, Literal["inf"]]] + The size of the adaptive window for action update. If None, no adaptive window is used. + delta : Optional[Probability], 0.1 if not specified. + The confidence level for the adaptive window. + """ + + actions: Dict[ActionId, SmabModelType] + + @field_validator("actions", mode="after") + @classmethod + def all_actions_have_same_number_of_objectives(cls, actions: Dict[ActionId, SmabModelType]): + n_objs_per_action = [len(beta.models) if hasattr(beta, "models") else None for beta in actions.values()] + if len(set(n_objs_per_action)) != 1: + raise ValueError("All actions should have the same number of objectives") + return actions + + def _update_actions(self, actions: List[ActionId], rewards: Union[List[BinaryReward], List[List[BinaryReward]]]): + """ + Update the models associated with the given actions using the provided rewards. + + Parameters + ---------- + actions : List[ActionId] + The selected action for each sample. + rewards: Union[List[BinaryReward], List[List[BinaryReward]]] + The reward for each sample. 
+ """ + + rewards_dict = defaultdict(list) + + for a, r in zip(actions, rewards): + rewards_dict[a].append(r) + + for a in set(actions): + self.actions[a].update(rewards=rewards_dict[a]) + + +class CmabActionsManager(ActionsManager, GenericModel, Generic[CmabModelType]): + """ + Manages actions and their associated models for cMAB models. + The class allows to account for non-stationarity by providing an adaptive window scheme for action update. + + Parameters + ---------- + actions : Dict[ActionId, BayesianLogisticRegression] + The list of possible actions, and their associated Model. + adaptive_window_size : Optional[Union[PositiveInt, Literal["inf"]]] + The size of the adaptive window for action update. If None, no adaptive window is used. + delta : Optional[Probability], 0.1 if not specified. + The confidence level for the adaptive window. + """ + + actions: Dict[ActionId, CmabModelType] + + @field_validator("actions", mode="after") + @classmethod + def check_bayesian_logistic_regression_models(cls, v): + action_models = list(v.values()) + first_action = action_models[0] + first_action_type = type(first_action) + for action in action_models[1:]: + if not isinstance(action, first_action_type): + raise AttributeError("All actions should follow the same type.") + if not len(action.betas) == len(first_action.betas): + raise AttributeError("All actions should have the same number of betas.") + if not action.update_method == first_action.update_method: + raise AttributeError("All actions should have the same update method.") + if not action.update_kwargs == first_action.update_kwargs: + raise AttributeError("All actions should have the same update kwargs.") + return v + + def _validate_update_params( + self, actions: List[ActionId], rewards: Union[List[BinaryReward], List[List[BinaryReward]]], context: ArrayLike + ): + """ + Verify that the given list of action IDs is a subset of the currently defined actions and that + the rewards type matches the strategy type. + + Parameters + ---------- + actions : List[ActionId] + The selected action for each sample. + rewards: Union[List[BinaryReward], List[List[BinaryReward]]] + The reward for each sample. + context: ArrayLike of shape (n_samples, n_features) + Matrix of contextual features. + """ + super()._validate_update_params(actions, rewards) + if len(context) != len(actions): + raise AttributeError(f"Shape mismatch: actions and context should have the same length {len(actions)}.") + + def _update_actions( + self, + actions: List[ActionId], + rewards: Union[List[BinaryReward], List[List[BinaryReward]]], + context: ArrayLike, + ): + """ + Update the models associated with the given actions using the provided rewards. + + Parameters + ---------- + actions : List[ActionId] + The selected action for each sample. + rewards: Union[List[BinaryReward], List[List[BinaryReward]]] + The reward for each sample. + context: ArrayLike of shape (n_samples, n_features) + Matrix of contextual features. 
+ """ + # cast inputs to numpy arrays to facilitate their manipulation + context, actions, rewards = np.array(context), np.array(actions), np.array(rewards) + + for a in set(actions): + # get context and rewards of the samples associated to action a + context_of_a = context[actions == a] + rewards_of_a = rewards[actions == a].tolist() + + # update model associated to action a + self.actions[a].update(context=context_of_a, rewards=rewards_of_a) diff --git a/pybandits/base.py b/pybandits/base.py index 4cae4ad..e087b1d 100644 --- a/pybandits/base.py +++ b/pybandits/base.py @@ -21,7 +21,7 @@ # SOFTWARE. -from typing import Any, Dict, List, NewType, Tuple, Union +from typing import Any, Dict, List, NewType, Tuple, Union, _GenericAlias, get_args, get_origin from pybandits.pydantic_version_compatibility import ( PYDANTIC_VERSION_1, @@ -52,6 +52,7 @@ Union[Dict[ActionId, float], Dict[ActionId, Probability], Dict[ActionId, List[Probability]]], ) ACTION_IDS_PREFIX = "action_ids_" +ACTIONS = "actions" class _classproperty(property): @@ -96,6 +97,18 @@ def _apply_version_adjusted_method(self, v2_method_name: str, v1_method_name: st def _get_value_with_default(cls, key: str, values: Dict[str, Any]) -> Any: return values.get(key, cls.model_fields[key].default) + @classmethod + def _get_field_type(cls, key: str) -> Any: + if pydantic_version == PYDANTIC_VERSION_1: + annotation = cls.model_fields[key].type_ + elif pydantic_version == PYDANTIC_VERSION_2: + annotation = cls.model_fields[key].annotation + if isinstance(annotation, _GenericAlias) and get_origin(annotation) is dict: + annotation = get_args(annotation)[1] # refer to the type of the Dict values + else: + raise ValueError(f"Unsupported pydantic version: {pydantic_version}") + return annotation + if pydantic_version == PYDANTIC_VERSION_1: @_classproperty diff --git a/pybandits/cmab.py b/pybandits/cmab.py index 9b405a1..e4a928a 100644 --- a/pybandits/cmab.py +++ b/pybandits/cmab.py @@ -20,16 +20,18 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. -from typing import Dict, List, Optional, Set, Union +from abc import ABC +from typing import List, Optional, Set, Union from numpy import array from numpy.random import choice from numpy.typing import ArrayLike +from pybandits.actions_manager import CmabActionsManager from pybandits.base import ActionId, BinaryReward, CmabPredictions from pybandits.mab import BaseMab from pybandits.model import BayesianLogisticRegression, BayesianLogisticRegressionCC -from pybandits.pydantic_version_compatibility import field_validator, validate_call +from pybandits.pydantic_version_compatibility import validate_call from pybandits.strategy import ( BestActionIdentificationBandit, ClassicBandit, @@ -37,7 +39,7 @@ ) -class BaseCmabBernoulli(BaseMab): +class BaseCmabBernoulli(BaseMab, ABC): """ Base model for a Contextual Multi-Armed Bandit for Bernoulli bandits with Thompson Sampling. @@ -54,27 +56,10 @@ class BaseCmabBernoulli(BaseMab): bandit strategy. 
""" - actions: Dict[ActionId, BayesianLogisticRegression] + actions_manager: CmabActionsManager[BayesianLogisticRegression] predict_with_proba: bool predict_actions_randomly: bool - @field_validator("actions", mode="after") - @classmethod - def check_bayesian_logistic_regression_models(cls, v): - action_models = list(v.values()) - first_action = action_models[0] - first_action_type = type(first_action) - for action in action_models[1:]: - if not isinstance(action, first_action_type): - raise AttributeError("All actions should follow the same type.") - if not len(action.betas) == len(first_action.betas): - raise AttributeError("All actions should have the same number of betas.") - if not action.update_method == first_action.update_method: - raise AttributeError("All actions should have the same update method.") - if not action.update_kwargs == first_action.update_kwargs: - raise AttributeError("All actions should have the same update kwargs.") - return v - @validate_call(config=dict(arbitrary_types_allowed=True)) def predict( self, @@ -169,20 +154,7 @@ def update( If strategy is MultiObjectiveBandit, rewards should be a list of list, e.g. (with n_objectives=2): rewards = [[1, 1], [1, 0], [1, 1], [1, 0], [1, 1], ...] """ - self._validate_update_params(actions=actions, rewards=rewards) - if len(context) != len(rewards): - raise AttributeError(f"Shape mismatch: actions and rewards should have the same length {len(actions)}.") - - # cast inputs to numpy arrays to facilitate their manipulation - context, actions, rewards = array(context), array(actions), array(rewards) - - for a in set(actions): - # get context and rewards of the samples associated to action a - context_of_a = context[actions == a] - rewards_of_a = rewards[actions == a].tolist() - - # update model associated to action a - self.actions[a].update(context=context_of_a, rewards=rewards_of_a) + super().update(actions=actions, rewards=rewards, context=context) # always set predict_actions_randomly after update self.predict_actions_randomly = False @@ -208,7 +180,7 @@ class CmabBernoulli(BaseCmabBernoulli): bandit strategy. """ - actions: Dict[ActionId, BayesianLogisticRegression] + actions_manager: CmabActionsManager[BayesianLogisticRegression] strategy: ClassicBandit predict_with_proba: bool = False predict_actions_randomly: bool = False @@ -234,7 +206,7 @@ class CmabBernoulliBAI(BaseCmabBernoulli): bandit strategy. """ - actions: Dict[ActionId, BayesianLogisticRegression] + actions_manager: CmabActionsManager[BayesianLogisticRegression] strategy: BestActionIdentificationBandit predict_with_proba: bool = False predict_actions_randomly: bool = False @@ -268,7 +240,7 @@ class CmabBernoulliCC(BaseCmabBernoulli): bandit strategy. """ - actions: Dict[ActionId, BayesianLogisticRegressionCC] + actions_manager: CmabActionsManager[BayesianLogisticRegressionCC] strategy: CostControlBandit predict_with_proba: bool = True predict_actions_randomly: bool = False diff --git a/pybandits/mab.py b/pybandits/mab.py index 38c83b5..1626a0c 100644 --- a/pybandits/mab.py +++ b/pybandits/mab.py @@ -19,16 +19,23 @@ # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. 
- -import warnings +import json from abc import ABC, abstractmethod -from collections import defaultdict -from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union, get_args +from inspect import isclass +from typing import ( + Any, + Dict, + List, + Optional, + Set, + Union, + get_origin, +) import numpy as np +from pybandits.actions_manager import ActionsManager from pybandits.base import ( - ACTION_IDS_PREFIX, ActionId, ActionRewardLikelihood, BinaryReward, @@ -36,11 +43,10 @@ Predictions, PyBanditsBaseModel, ) -from pybandits.model import Model +from pybandits.model import BaseModel, Model from pybandits.pydantic_version_compatibility import ( PYDANTIC_VERSION_1, PYDANTIC_VERSION_2, - field_validator, model_validator, pydantic_version, validate_call, @@ -69,46 +75,46 @@ class BaseMab(PyBanditsBaseModel, ABC): which in turn will be used to instantiate the strategy. """ - actions: Dict[ActionId, Model] + actions_manager: ActionsManager strategy: Strategy epsilon: Optional[Float01] = None default_action: Optional[ActionId] = None def __init__( self, - actions: Dict[ActionId, Model], epsilon: Optional[Float01] = None, default_action: Optional[ActionId] = None, - **strategy_kwargs, + **kwargs, ): - if "strategy" in strategy_kwargs: - strategy = strategy_kwargs["strategy"] - if len(strategy_kwargs) > 1: - raise ValueError("strategy should be the only keyword argument.") + class_attributes = { + attribute_name: self._get_instantiated_class_attribute(attribute_name, kwargs) + for attribute_name in self._get_class_type_attributes() + } + if kwargs: + raise ValueError(f"Unknown arguments: {kwargs.keys()}") + super().__init__(**class_attributes, epsilon=epsilon, default_action=default_action) + + @classmethod + def _get_instantiated_class_attribute(cls, attribute_name: str, kwargs: Dict[str, Any]) -> PyBanditsBaseModel: + if attribute_name in kwargs: + attribute = kwargs[attribute_name] else: - strategy_class = self.model_fields["strategy"].annotation - strategy = strategy_class(**strategy_kwargs) + attribute_class = cls._get_attribute_type(attribute_name) + required_sub_attributes = extract_argument_names_from_function(attribute_class.__init__, True) + if not required_sub_attributes: # case of no native __init__ method, just pydantic generic __init__ + required_sub_attributes = list(attribute_class.model_fields.keys()) + sub_attributes = {k: kwargs.pop(k) for k in required_sub_attributes if k in kwargs} + else: + sub_attributes = {k: kwargs.pop(k) for k in required_sub_attributes if k in kwargs} + if "kwargs" in required_sub_attributes: + sub_attributes["kwargs"] = kwargs - super().__init__(actions=actions, strategy=strategy, epsilon=epsilon, default_action=default_action) + attribute = attribute_class(**sub_attributes) + kwargs.pop(attribute_name, None) + return attribute ############################################ Instance Input Validators ############################################# - @field_validator("actions", mode="before") - @classmethod - def at_least_one_action_is_defined(cls, v): - # validate number of actions - if len(v) == 0: - raise AttributeError("At least one action should be defined.") - elif len(v) == 1: - warnings.warn("Only a single action was supplied. 
This MAB will be deterministic.") - # validate that all actions are of the same configuration - action_models = list(v.values()) - first_action = action_models[0] - first_action_type = type(first_action) - if any(not isinstance(action, first_action_type) for action in action_models[1:]): - raise AttributeError("All actions should follow the same type.") - return v - if pydantic_version == PYDANTIC_VERSION_1: @model_validator(mode="before") @@ -118,7 +124,7 @@ def check_default_action(cls, values): default_action = cls._get_value_with_default("default_action", values) if not epsilon and default_action: raise AttributeError("A default action should only be defined when epsilon is defined.") - if default_action and default_action not in values["actions"]: + if default_action and default_action not in values["actions_manager"].actions: raise AttributeError("The default action must be valid action defined in the actions set.") return values @@ -153,10 +159,10 @@ def _get_valid_actions(self, forbidden_actions: Optional[Set[ActionId]]) -> Set[ """ if forbidden_actions is None: forbidden_actions = set() - - if not all(a in self.actions.keys() for a in forbidden_actions): + action_ids = set(self.actions.keys()) + if not all(a in action_ids for a in forbidden_actions): raise ValueError("forbidden_actions contains invalid action IDs.") - valid_actions = set(self.actions.keys()) - forbidden_actions + valid_actions = action_ids - forbidden_actions if len(valid_actions) == 0: raise ValueError("All actions are forbidden. You must allow at least 1 action.") if self.default_action and self.default_action not in valid_actions: @@ -164,41 +170,29 @@ def _get_valid_actions(self, forbidden_actions: Optional[Set[ActionId]]) -> Set[ return valid_actions - def _validate_update_params( - self, actions: List[ActionId], rewards: Union[List[BinaryReward], List[List[BinaryReward]]] - ): - """ - Verify that the given list of action IDs is a subset of the currently defined actions and that - the rewards type matches the strategy type. - - Parameters - ---------- - actions : List[ActionId] - The selected action for each sample. - rewards: List[Union[BinaryReward, List[BinaryReward]]] - The reward for each sample. - """ - invalid = set(actions) - set(self.actions.keys()) - if invalid: - raise AttributeError(f"The following invalid action(s) were specified: {invalid}.") - if len(actions) != len(rewards): - raise AttributeError(f"Shape mismatch: actions and rewards should have the same length {len(actions)}.") - #################################################################################################################### - @abstractmethod + @property + def actions(self) -> Dict[ActionId, Model]: + return self.actions_manager.actions + @validate_call - def update( - self, actions: List[ActionId], rewards: Union[List[BinaryReward], List[List[BinaryReward]]], *args, **kwargs - ): + def update(self, actions: List[ActionId], rewards: Union[List[BinaryReward], List[List[BinaryReward]]], **kwargs): """ Update the multi-armed bandit model. + Parameters + ---------- actions: List[ActionId] The selected action for each sample. - rewards: List[Union[BinaryReward, List[BinaryReward]]] - The reward for each sample. + rewards : List[Union[BinaryReward, List[BinaryReward]]] of shape (n_samples, n_objectives) + The binary reward for each sample. + If strategy is not MultiObjectiveBandit, rewards should be a list, e.g. + rewards = [1, 0, 1, 1, 1, ...] + If strategy is MultiObjectiveBandit, rewards should be a list of list, e.g. 
(with n_objectives=2): + rewards = [[1, 1], [1, 0], [1, 1], [1, 0], [1, 1], ...] """ + self.actions_manager.update(actions=actions, rewards=rewards, **kwargs) @abstractmethod @validate_call @@ -234,14 +228,15 @@ def get_state(self) -> (str, dict): The internal state of the model (actions, scores, etc.). """ model_name = self.__class__.__name__ - state: dict = self._apply_version_adjusted_method("model_dump", "dict") + json_state = self._apply_version_adjusted_method("model_dump_json", "json") + state = json.loads(json_state) return model_name, state @validate_call def _select_epsilon_greedy_action( self, p: ActionRewardLikelihood, - actions: Optional[Dict[ActionId, Model]] = None, + actions: Optional[Dict[ActionId, BaseModel]] = None, ) -> ActionId: """ Wraps self.strategy.select_action function with epsilon-greedy strategy, @@ -300,15 +295,67 @@ def from_state(cls, state: dict) -> "BaseMab": """ model_attributes = extract_argument_names_from_function(cls.__init__, True) - strategy_attributes = list(state["strategy"].keys()) - attributes_mapping = {k: state[k] for k in model_attributes if k not in strategy_attributes and k in state} - attributes_mapping.update({k: state["strategy"][k] for k in strategy_attributes}) - return cls(**attributes_mapping) + class_attributes = { + attribute_name: list(state[attribute_name].keys()) for attribute_name in cls._get_class_type_attributes() + } + flattened_class_attributes = [item for sublist in class_attributes.values() for item in sublist] + class_attributes_mapping = { + k: state[k] for k in model_attributes if k not in flattened_class_attributes and k in state + } + class_attributes_mapping.update( + { + k: state[attribute_name][k] + for attribute_name, sub_class_attributes in class_attributes.items() + for k in sub_class_attributes + } + ) + return cls(**class_attributes_mapping) + + @classmethod + def from_old_state(cls, state: dict) -> "BaseMab": + """ + Create a new instance of the class from a given model state. + The state can be obtained by applying get_state() to a model. + + Parameters + ---------- + state: dict + The internal state of a model (actions, strategy, etc.) of the same type. + The state is expected to be in the old format of PyBandits < 2.0.0. + + Returns + ------- + model: BaseMab + The new model instance. + + """ + if "actions" not in state: + raise ValueError("The state is expected to be in the old format of PyBandits < 2.0.0.") + state["actions_manager"] = {} + state["actions_manager"]["actions"] = state.pop("actions") + + return cls.from_state(state) + + @classmethod + def _get_class_type_attributes(cls) -> List[str]: + return [ + attribute_name + for attribute_name in cls.model_fields.keys() + if isclass(class_ := cls._get_attribute_type(attribute_name)) + and issubclass( + class_, + PyBanditsBaseModel, + ) + ] + + @classmethod + def _get_attribute_type(cls, attribute_name: str) -> PyBanditsBaseModel: + attribute_type = cls._get_field_type(attribute_name) + return get_origin(attribute_type) or attribute_type @classmethod def cold_start( cls, - action_ids: Optional[Set[ActionId]] = None, epsilon: Optional[Float01] = None, default_action: Optional[ActionId] = None, **kwargs, @@ -319,8 +366,6 @@ def cold_start( Parameters ---------- - action_ids: Optional[Set[ActionId]] - The list of possible actions. epsilon: Optional[Float01] epsilon for epsilon-greedy approach. If None, epsilon-greedy is not used. 
default_action: Optional[ActionId] @@ -334,85 +379,12 @@ def cold_start( mab: BaseMab Multi-Armed Bandit """ - action_specific_kwargs, kwargs = cls._extract_action_specific_kwargs(**kwargs) - - # Extract inner_action_ids - inner_action_ids = action_ids or set(action_specific_kwargs.keys()) - if not inner_action_ids: - raise ValueError( - "inner_action_ids should be provided either directly or via keyword argument in the form of " - "action_id_{model argument name} = {action_id: value}." - ) - - # Assign model for each action - action_model_cold_start, action_general_kwargs = cls._extract_action_model_class_and_attributes(**kwargs) - actions = {} - for a in inner_action_ids: - actions[a] = action_model_cold_start(**action_general_kwargs, **action_specific_kwargs.get(a, {})) # Instantiate the MAB - strategy_kwargs = {k: kwargs[k] for k in kwargs.keys() if k not in action_general_kwargs.keys()} - strategy_class = cls.model_fields["strategy"].annotation - strategy = strategy_class(**strategy_kwargs) - mab = cls(actions=actions, strategy=strategy, epsilon=epsilon, default_action=default_action) + mab = cls(epsilon=epsilon, default_action=default_action, **kwargs) + # For contextual multi-armed bandit, until the very first update the model will predict actions randomly, # where each action has equal probability to be selected. if hasattr(mab, "predict_actions_randomly"): mab.predict_actions_randomly = True return mab - - @staticmethod - def _extract_action_specific_kwargs(**kwargs) -> Tuple[Dict[str, Dict], Dict[str, Any]]: - """ - Utility function to extract kwargs that are specific for each action when constructing the action model. - - Parameters - ---------- - kwargs : Dict[str, Any] - Additional parameters for the mab and for the action model. - - Returns - ------- - action_specific_kwargs : Dict[str, Dict] - Dictionary of actions and the parameters of their associated model. - kwargs : Dict[str, Any] - Dictionary of parameters and their values, without the action_specific_kwargs. - """ - action_specific_kwargs = defaultdict(dict) - for keyword in list(kwargs): - argument = kwargs[keyword] - if keyword.startswith(ACTION_IDS_PREFIX) and type(argument) is dict: - kwargs.pop(keyword) - inner_keyword = keyword.split(ACTION_IDS_PREFIX)[1] - for action_id, value in argument.items(): - action_specific_kwargs[action_id][inner_keyword] = value - return dict(action_specific_kwargs), kwargs - - @classmethod - def _extract_action_model_class_and_attributes(cls, **kwargs) -> Tuple[Callable, Dict[str, Dict]]: - """ - Utility function to extract kwargs that are specific for each action when constructing the action model. - - Parameters - ---------- - kwargs : Dict[str, Any] - Additional parameters for the mab and for the action model. - - Returns - ------- - action_model_cold_start : Callable - Function handle for factoring the required action model. - action_general_kwargs : Dict[str, any] - Dictionary of parameters and their values for the action model. 
- """ - action_model_class = get_args(cls.model_fields["actions"].annotation)[1] - if hasattr(action_model_class, "cold_start"): - action_model_cold_start_init = action_model_cold_start = action_model_class.cold_start - else: - action_model_cold_start_init = action_model_class.__init__ - action_model_cold_start = action_model_class - - action_model_attributes = extract_argument_names_from_function(action_model_cold_start_init, True) - - action_general_kwargs = {k: kwargs[k] for k in action_model_attributes if k in kwargs.keys()} - return action_model_cold_start, action_general_kwargs diff --git a/pybandits/model.py b/pybandits/model.py index 2993645..102361e 100644 --- a/pybandits/model.py +++ b/pybandits/model.py @@ -22,7 +22,7 @@ import warnings from abc import ABC, abstractmethod from random import betavariate -from typing import Any, List, Literal, Optional, Tuple, Union +from typing import List, Literal, Optional, Tuple, TypeVar, Union import numpy as np import pymc.math as pmath @@ -50,27 +50,35 @@ UpdateMethods = Literal["MCMC", "VI"] -class Model(PyBanditsBaseModel, ABC): - """ - Class to model the prior distributions. - """ - +class BaseModel(PyBanditsBaseModel, ABC): @abstractmethod def sample_proba(self) -> Probability: """ Sample the probability of getting a positive reward. """ + @validate_call @abstractmethod - def update(self, rewards: List[Any]): + def update(self, rewards: Union[List[BinaryReward], List[List[BinaryReward]]], **kwargs): """ - Update the model parameters. + Update the model. + + Parameters + ---------- + rewards: Union[List[BinaryReward], List[List[BinaryReward]]] + A list of binary rewards. """ + @abstractmethod + def reset(self): + """ + Reset the model. + """ -class BaseBeta(Model): + +class Model(BaseModel, ABC): """ - Beta Distribution model for Bernoulli multi-armed bandits. + Class to model the prior distributions. Parameters ---------- @@ -83,6 +91,105 @@ class BaseBeta(Model): n_successes: PositiveInt = 1 n_failures: PositiveInt = 1 + @validate_call + def update(self, rewards: List[BinaryReward], **kwargs): + """ + Update n_successes and n_failures. + + Parameters + ---------- + rewards: List[BinaryReward] + A list of binary rewards. + """ + self.n_successes += sum(rewards) + self.n_failures += len(rewards) - sum(rewards) + self._update(rewards=rewards, **kwargs) + + @abstractmethod + def _update(self, rewards: List[BinaryReward], **kwargs): + """ + Update the model. + + Parameters + ---------- + rewards: List[BinaryReward] + A list of binary rewards. + """ + + def reset(self): + """ + Reset the model. + """ + self.n_successes = 1 + self.n_failures = 1 + self._reset() + + @abstractmethod + def _reset(self): + """ + Reset the model. + """ + + +class ModelMO(BaseModel, ABC): + """ + Multi-objective extension of Model. + Parameters + ---------- + models : List[Model] + List of models. + """ + + if pydantic_version == PYDANTIC_VERSION_1: + models: List[Model] = Field(..., min_items=1) + elif pydantic_version == PYDANTIC_VERSION_2: + models: List[Model] = Field(..., min_length=1) + else: + raise ValueError("Invalid version.") + + @validate_call + def sample_proba(self, **kwargs) -> List[Probability]: + """ + Sample the probability of getting a positive reward. + Returns + ------- + prob: List[Probability] + Probabilities of getting a positive reward for each objective. 
+ """ + return [x.sample_proba(**kwargs) for x in self.models] + + @validate_call + def update(self, rewards: List[List[BinaryReward]], **kwargs): + """ + Update the Beta model using the provided rewards. + Parameters + ---------- + rewards: List[List[BinaryReward]] + A list of rewards, where each reward is in turn a list containing the reward of the Beta model + associated to each objective. + For example, `[[1, 1], [1, 0], [1, 1], [1, 0], [1, 1]]`. + kwargs: Dict[str, Any] + Additional arguments for the Bayesian Logistic Regression MO child model. + """ + if any(len(x) != len(self.models) for x in rewards): + raise AttributeError("The shape of rewards is incorrect") + + for i, model in enumerate(self.models): + model.update(rewards=[r[i] for r in rewards], **kwargs) + + def reset(self): + """ + Reset the model. + """ + for model in self.models: + model.reset() + + +class BaseBeta(Model): + """ + Beta Distribution model for Bernoulli multi-armed bandits. + """ + @model_validator(mode="before") @classmethod def both_or_neither_counters_are_defined(cls, values): @@ -111,19 +218,6 @@ def std(self) -> float: """ return sqrt((self.n_successes * self.n_failures) / (self.count * (self.count - 1))) - @validate_call - def update(self, rewards: List[BinaryReward]): - """ - Update n_successes and and n_failures. - - Parameters - ---------- - rewards: List[BinaryReward] - A list of binary rewards. - """ - self.n_successes += sum(rewards) - self.n_failures += len(rewards) - sum(rewards) - def sample_proba(self) -> Probability: """ Sample the probability of getting a positive reward. @@ -135,6 +229,20 @@ def sample_proba(self) -> Probability: """ return betavariate(self.n_successes, self.n_failures) # type: ignore + def _update(self, rewards: List[BinaryReward], **kwargs): + """ + Update the model. + + Parameters + ---------- + rewards: List[BinaryReward] + A list of binary rewards. + """ + pass + + def _reset(self): + pass + class Beta(BaseBeta): """ @@ -155,17 +263,17 @@ class BetaCC(BaseBeta): cost: NonNegativeFloat -class BetaMO(Model): +class BetaMO(ModelMO): """ Beta Distribution model for Bernoulli multi-armed bandits with multi-objectives. Parameters ---------- - counters: List[Beta] of shape (n_objectives,) + models: List[Beta] of shape (n_objectives,) List of Beta distributions. """ - counters: List[Beta] + models: List[Beta] @validate_call def sample_proba(self) -> List[Probability]: @@ -177,10 +285,10 @@ def sample_proba(self) -> List[Probability]: prob: List[Probability] Probabilities of getting a positive reward for each objective. """ - return [x.sample_proba() for x in self.counters] + return [x.sample_proba() for x in self.models] @validate_call - def update(self, rewards: List[List[BinaryReward]]): + def _update(self, rewards: List[List[BinaryReward]]): """ Update the Beta model using the provided rewards. @@ -191,10 +299,10 @@ def update(self, rewards: List[List[BinaryReward]]): associated to each objective. For example, `[[1, 1], [1, 0], [1, 1], [1, 0], [1, 1]]`. """ - if any(len(x) != len(self.counters) for x in rewards): + if any(len(x) != len(self.models) for x in rewards): raise AttributeError("The shape of rewards is incorrect") - for i, counter in enumerate(self.counters): + for i, counter in enumerate(self.models): counter.update([r[i] for r in rewards]) @classmethod @@ -222,10 +330,14 @@ def cold_start(cls, n_objectives: PositiveInt, **kwargs) -> "BetaMO": blr: BayesianLogisticRegrssion The Bayesian Logistic Regression model. 
""" - counters = n_objectives * [Beta()] - blr = cls(counters=counters, **kwargs) + models = n_objectives * [Beta()] + blr = cls(models=models, **kwargs) return blr + def _reset(self): + for model in self.models: + model._reset() + class BetaMOCC(BetaMO): """ @@ -233,7 +345,7 @@ class BetaMOCC(BetaMO): Parameters ---------- - counters: List[BetaCC] of shape (n_objectives,) + models: List[BetaCC] of shape (n_objectives,) List of Beta distributions. cost: NonNegativeFloat Cost associated to the Beta distribution. @@ -242,6 +354,9 @@ class BetaMOCC(BetaMO): cost: NonNegativeFloat +SmabModelType = TypeVar("SmabModelType", bound=Union[BaseBeta, BetaMO]) + + class StudentT(PyBanditsBaseModel): """ Student's t-distribution. @@ -432,7 +547,7 @@ def sample_proba(self, context: ArrayLike) -> Tuple[Probability, float]: return prob, weighted_sum @validate_call(config=dict(arbitrary_types_allowed=True)) - def update(self, context: ArrayLike, rewards: List[BinaryReward]): + def _update(self, context: ArrayLike, rewards: List[BinaryReward]): """ Update the model parameters. @@ -443,7 +558,6 @@ def update(self, context: ArrayLike, rewards: List[BinaryReward]): rewards: List[BinaryReward] A list of binary rewards. """ - # check input args self.check_context_matrix(context=context) if len(context) != len(rewards): @@ -535,6 +649,10 @@ def cold_start( **kwargs, ) + def _reset(self): + self.alpha = StudentT() + self.betas = [StudentT() for _ in range(len(self.betas))] + class BayesianLogisticRegressionCC(BayesianLogisticRegression): """ @@ -563,3 +681,6 @@ class BayesianLogisticRegressionCC(BayesianLogisticRegression): """ cost: NonNegativeFloat + + +CmabModelType = TypeVar("CmabModelType", bound=BayesianLogisticRegression) diff --git a/pybandits/pydantic_version_compatibility.py b/pybandits/pydantic_version_compatibility.py index b032ecd..c40f28d 100644 --- a/pybandits/pydantic_version_compatibility.py +++ b/pybandits/pydantic_version_compatibility.py @@ -39,6 +39,7 @@ conint, constr, ) +from pydantic.generics import GenericModel from pydantic.version import VERSION as _VERSION # Define the pydantic versions @@ -278,4 +279,5 @@ def _convert_config_param(config: Dict[str, Any], v2_name: str, v1_name: str) -> "constr", "Field", "PrivateAttr", + "GenericModel", ] diff --git a/pybandits/smab.py b/pybandits/smab.py index 614434c..bb32ba2 100644 --- a/pybandits/smab.py +++ b/pybandits/smab.py @@ -20,10 +20,10 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. - -from collections import defaultdict +from abc import ABC from typing import Dict, List, Optional, Set, Union +from pybandits.actions_manager import SmabActionsManager from pybandits.base import ( ActionId, BinaryReward, @@ -32,7 +32,7 @@ ) from pybandits.mab import BaseMab from pybandits.model import BaseBeta, Beta, BetaCC, BetaMO, BetaMOCC -from pybandits.pydantic_version_compatibility import PositiveInt, field_validator, validate_call +from pybandits.pydantic_version_compatibility import PositiveInt, validate_call from pybandits.strategy import ( BestActionIdentificationBandit, ClassicBandit, @@ -43,7 +43,7 @@ ) -class BaseSmabBernoulli(BaseMab): +class BaseSmabBernoulli(BaseMab, ABC): """ Base model for a Stochastic Bernoulli Multi-Armed Bandit with Thompson Sampling. @@ -55,7 +55,7 @@ class BaseSmabBernoulli(BaseMab): The strategy used to select actions. 
""" - actions: Dict[ActionId, BaseBeta] + actions_manager: SmabActionsManager[BaseBeta] @validate_call def predict( @@ -111,16 +111,7 @@ def update(self, actions: List[ActionId], rewards: Union[List[BinaryReward], Lis If strategy is MultiObjectiveBandit, rewards should be a list of list, e.g. (with n_objectives=2): rewards = [[1, 1], [1, 0], [1, 1], [1, 0], [1, 1], ...] """ - - self._validate_update_params(actions=actions, rewards=rewards) - - rewards_dict = defaultdict(list) - - for a, r in zip(actions, rewards): - rewards_dict[a].append(r) - - for a in set(actions): - self.actions[a].update(rewards=rewards_dict[a]) + super().update(actions=actions, rewards=rewards) class SmabBernoulli(BaseSmabBernoulli): @@ -138,7 +129,7 @@ class SmabBernoulli(BaseSmabBernoulli): The strategy used to select actions. """ - actions: Dict[ActionId, Beta] + actions_manager: SmabActionsManager[Beta] strategy: ClassicBandit @@ -157,7 +148,7 @@ class SmabBernoulliBAI(BaseSmabBernoulli): The strategy used to select actions. """ - actions: Dict[ActionId, Beta] + actions_manager: SmabActionsManager[Beta] strategy: BestActionIdentificationBandit @@ -184,7 +175,7 @@ class SmabBernoulliCC(BaseSmabBernoulli): The strategy used to select actions. """ - actions: Dict[ActionId, BetaCC] + actions_manager: SmabActionsManager[BetaCC] strategy: CostControlBandit @@ -201,17 +192,9 @@ class BaseSmabBernoulliMO(BaseSmabBernoulli): The strategy used to select actions. """ - actions: Dict[ActionId, BetaMO] + actions_manager: SmabActionsManager[BetaMO] strategy: Strategy - @field_validator("actions", mode="after") - @classmethod - def all_actions_have_same_number_of_objectives(cls, actions: Dict[ActionId, BetaMO]): - n_objs_per_action = [len(beta.counters) for beta in actions.values()] - if len(set(n_objs_per_action)) != 1: - raise ValueError("All actions should have the same number of objectives") - return actions - class SmabBernoulliMO(BaseSmabBernoulliMO): """ @@ -233,7 +216,7 @@ class SmabBernoulliMO(BaseSmabBernoulliMO): The strategy used to select actions. """ - actions: Dict[ActionId, BetaMO] + actions_manager: SmabActionsManager[BetaMO] strategy: MultiObjectiveBandit @@ -253,5 +236,5 @@ class SmabBernoulliMOCC(BaseSmabBernoulliMO): The strategy used to select actions. 
""" - actions: Dict[ActionId, BetaMOCC] + actions_manager: SmabActionsManager[BetaMOCC] strategy: MultiObjectiveCostControlBandit diff --git a/pybandits/strategy.py b/pybandits/strategy.py index a67be09..5fba36e 100644 --- a/pybandits/strategy.py +++ b/pybandits/strategy.py @@ -22,16 +22,18 @@ from abc import ABC, abstractmethod from random import random -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, List, Optional, TypeVar, Union import numpy as np from scipy.stats import ttest_ind_from_stats from typing_extensions import Self from pybandits.base import ActionId, Float01, Probability, PyBanditsBaseModel -from pybandits.model import Beta, BetaMOCC, Model +from pybandits.model import BaseModel, Beta, BetaMOCC, Model from pybandits.pydantic_version_compatibility import field_validator, validate_call +StrategyType = TypeVar("StrategyType", bound="Strategy") + class Strategy(PyBanditsBaseModel, ABC): """ @@ -60,7 +62,7 @@ def _with_argument(self, argument_name: str, argument_value: Any) -> Self: return mutated_strategy @abstractmethod - def select_action(self, p: Dict[ActionId, Probability], actions: Optional[Dict[ActionId, Model]]) -> ActionId: + def select_action(self, p: Dict[ActionId, Probability], actions: Optional[Dict[ActionId, BaseModel]]) -> ActionId: """ Select the action. """ @@ -245,7 +247,7 @@ def _average(cls, p_of_action: Union[Probability, List[Probability]]): def _evaluate_and_select( cls, p: Union[Dict[ActionId, Probability], Dict[ActionId, List[Probability]]], - actions: Dict[ActionId, Model], + actions: Dict[ActionId, BaseModel], feasible_actions: List[ActionId], ) -> ActionId: """ diff --git a/pyproject.toml b/pyproject.toml index fb3e0e8..ada11d2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "pybandits" -version = "1.1.0" +version = "2.0.0" description = "Python Multi-Armed Bandit Library" authors = [ "Dario d'Andrea ", @@ -16,7 +16,7 @@ readme = "README.md" python = ">=3.8.1,<3.12" loguru = "^0.6" numpy = "^1.23" -pydantic = "1.10.*" +pydantic = ">=1.10.*,<3" scipy = "^1.9" pymc = "^5.3" scikit-learn = "^1.1" diff --git a/tests/test_actions_manager.py b/tests/test_actions_manager.py new file mode 100644 index 0000000..c401d39 --- /dev/null +++ b/tests/test_actions_manager.py @@ -0,0 +1,91 @@ +from typing import List, Union + +import pytest +from hypothesis import given +from hypothesis import strategies as st + +from pybandits.actions_manager import ActionsManager, CmabActionsManager, SmabActionsManager +from pybandits.base import ActionId, BinaryReward +from pybandits.model import BayesianLogisticRegression, Beta + + +class DummyActionsManager(ActionsManager): + def _update_actions( + self, actions: List[ActionId], rewards: Union[List[BinaryReward], List[List[BinaryReward]]], *args, **kwargs + ): + pass + + +def test_init_with_valid_actions(): + actions = {"action1": Beta(), "action2": Beta()} + manager = DummyActionsManager(actions=actions) + assert len(manager.actions) == 2 + assert manager.adaptive_window_size is None + assert manager.delta is None + + +def test_update_with_valid_inputs(action_list=("action1", "action2", "action1"), rewards=(1, 0, 1)): + actions = {"action1": Beta(), "action2": Beta()} + manager = DummyActionsManager(actions=actions, adaptive_window_size="inf") + + manager.update(actions=list(action_list), rewards=list(rewards)) + assert list(manager.actions_memory) == list(action_list) + assert list(manager.rewards_memory) == list(rewards) + + +def 
test_empty_actions_raises_error(): + with pytest.raises(AttributeError) as exc_info: + DummyActionsManager(actions={}) + assert str(exc_info.value) == "At least one action should be defined." + + +def test_single_action_warning(): + with pytest.warns(UserWarning) as warning_info: + DummyActionsManager(actions={"action1": Beta()}) + assert str(warning_info[0].message) == "Only a single action was supplied. This MAB will be deterministic." + + +def test_mixed_action_types_error(n_features=1): + actions = {"action1": BayesianLogisticRegression.cold_start(n_features=n_features), "action2": Beta()} + with pytest.raises((AttributeError, TypeError)): + SmabActionsManager[Beta](actions=actions) + with pytest.raises((AttributeError, TypeError)): + CmabActionsManager[BayesianLogisticRegression](actions=actions) + + +def test_invalid_memory_initialization(n_actions=1, int_adaptive_window_size=5): + actions = {f"action{i}": Beta() for i in range(n_actions)} + with pytest.raises(AttributeError): + DummyActionsManager(actions=actions, adaptive_window_size="inf", actions_memory=["action1"], rewards_memory=[]) + with pytest.raises(AttributeError): + DummyActionsManager(actions=actions, adaptive_window_size="inf", actions_memory=[], rewards_memory=[0]) + + with pytest.raises(AttributeError): # memory length should be 0 as action models are cold started + DummyActionsManager(actions=actions, adaptive_window_size="inf", actions_memory=[0], rewards_memory=[0]) + + with pytest.raises(AttributeError): + DummyActionsManager( + actions=actions, + adaptive_window_size=int_adaptive_window_size, + actions_memory=[0] * (int_adaptive_window_size + 1), + rewards_memory=[0] * (int_adaptive_window_size + 1), + ) + + +@given( + n_successes=st.just(10), + n_failures=st.just(1), + adaptive_window_size=st.sampled_from([10]), + delta=st.just(0.0001), + reference=st.just(5), +) +def test_change_detection(n_successes, n_failures, adaptive_window_size, delta, reference): + actions = {"action1": Beta(), "action2": Beta()} + manager = SmabActionsManager[Beta](actions=actions, adaptive_window_size=adaptive_window_size, delta=delta) + manager.update(actions=["action1"] * (n_successes - 1), rewards=[1] * (n_successes - 1)) + assert manager.actions["action1"].n_successes == n_successes + assert manager.actions["action1"].n_failures == n_failures + manager.update(actions=["action1"] * 10, rewards=[0] * 10) + assert manager.actions["action1"].n_successes == 1 + assert manager.actions["action1"].n_failures == reference + assert list(manager.actions_memory) == ["action1"] * (reference - 1) diff --git a/tests/test_cmab.py b/tests/test_cmab.py index 208f381..16f3f10 100644 --- a/tests/test_cmab.py +++ b/tests/test_cmab.py @@ -27,6 +27,7 @@ import pytest from hypothesis import given, settings from hypothesis import strategies as st +from hypothesis.strategies import composite from pybandits.base import Float01 from pybandits.cmab import CmabBernoulli, CmabBernoulliBAI, CmabBernoulliCC @@ -46,8 +47,8 @@ def _apply_update_method_to_state(state, update_method): - for action in state["actions"]: - state["actions"][action]["update_method"] = update_method + for model_state in state["actions_manager"]["actions"].values(): + model_state["update_method"] = update_method ######################################################################################################################## @@ -78,13 +79,13 @@ def test_create_cmab_bernoulli_cold_start(a_int): @settings(deadline=500) @given(st.integers(min_value=1, max_value=10)) def 
test_cmab_can_instantiate(n_features): - with pytest.raises(TypeError): + with pytest.raises(ValueError): CmabBernoulli() with pytest.raises(AttributeError): CmabBernoulli(actions={}) with pytest.warns(UserWarning): CmabBernoulli(actions={"a1": BayesianLogisticRegression.cold_start(n_features=n_features)}) - with pytest.raises(ValidationError): # predict_with_proba is not an argument of init + with pytest.raises(ValueError): # predict_with_proba is not an argument of init CmabBernoulli( actions={ "a1": BayesianLogisticRegression.cold_start(n_features=n_features), @@ -92,7 +93,7 @@ def test_cmab_can_instantiate(n_features): }, predict_with_proba=True, ) - with pytest.raises(ValidationError): + with pytest.raises((ValidationError, TypeError)): CmabBernoulli( actions={ "a1": None, @@ -164,7 +165,7 @@ def test_cmab_init_with_wrong_blr_models(n_features, other_n_features, update_me ) -@settings(deadline=60000) +@settings(deadline=None) @given(st.just(100), st.just(3), st.sampled_from(literal_update_methods)) def test_cmab_update(n_samples, n_features, update_method): actions = np.random.choice(["a1", "a2"], size=n_samples).tolist() @@ -205,7 +206,7 @@ def run_update(context): run_update(context=context) -@settings(deadline=10000) +@settings(deadline=None) @given(st.just(100), st.just(3), st.sampled_from(literal_update_methods)) def test_cmab_update_not_all_actions(n_samples, n_feat, update_method): actions = np.random.choice(["a3", "a4"], size=n_samples).tolist() @@ -365,7 +366,13 @@ def test_cmab_get_state(mu, sigma, n_features): cmab = CmabBernoulli(actions=actions) expected_state = to_serializable_dict( { - "actions": actions, + "actions_manager": { + "actions": actions, + "adaptive_window_size": None, + "delta": None, + "actions_memory": None, + "rewards_memory": None, + }, "strategy": {}, "predict_with_proba": False, "predict_actions_randomly": False, @@ -381,48 +388,102 @@ def test_cmab_get_state(mu, sigma, n_features): assert is_serializable(cmab_state), "Internal state is not serializable" -@settings(deadline=500) -@given( - state=st.fixed_dictionaries( - { - "actions": st.dictionaries( - keys=st.text(min_size=1, max_size=10), - values=st.fixed_dictionaries( - { - "alpha": st.fixed_dictionaries( +@composite +def cmab_state(draw): + # Define individual components + actions = draw( + st.dictionaries( + keys=st.text(min_size=1, max_size=10), + values=st.fixed_dictionaries( + { + "n_successes": st.integers(min_value=1, max_value=100), + "n_failures": st.integers(min_value=1, max_value=100), + "alpha": st.fixed_dictionaries( + { + "mu": st.floats(min_value=-100, max_value=100), + "nu": st.floats(min_value=0, max_value=100), + "sigma": st.floats(min_value=0, max_value=100), + } + ), + "betas": st.lists( + st.fixed_dictionaries( { "mu": st.floats(min_value=-100, max_value=100), "nu": st.floats(min_value=0, max_value=100), "sigma": st.floats(min_value=0, max_value=100), } ), - "betas": st.lists( - st.fixed_dictionaries( - { - "mu": st.floats(min_value=-100, max_value=100), - "nu": st.floats(min_value=0, max_value=100), - "sigma": st.floats(min_value=0, max_value=100), - } - ), - min_size=3, - max_size=3, - ), - }, - ), - min_size=2, + min_size=3, + max_size=3, + ), + }, ), - "strategy": st.fixed_dictionaries({}), - } - ), - update_method=st.sampled_from(literal_update_methods), -) -def test_cmab_from_state(state, update_method): + min_size=2, + ) + ) + + actions_manager = {"actions": actions} + strategy = {} + + state = {"actions_manager": actions_manager, "strategy": strategy} + if 
draw(st.booleans()): + epsilon = draw(st.sampled_from([None, 0.1])) + + state["epsilon"] = epsilon + # Adjust default_action based on epsilon and actions + if draw(st.booleans()): + if epsilon is None: + default_action = None + elif default_action_index := draw(st.sampled_from([None, 1])) is not None: + default_action = list(actions.keys())[default_action_index] + else: + default_action = None + state["default_action"] = default_action + update_method = draw(st.sampled_from(literal_update_methods)) _apply_update_method_to_state(state, update_method) + if draw(st.booleans()): + actions_manager_state = state["actions_manager"] + actions_manager_state["adaptive_window_size"] = draw( + st.one_of(st.integers(min_value=1, max_value=100), st.none(), st.just("inf")) + ) + if actions_manager_state["adaptive_window_size"] is not None: + actions_manager_state["delta"] = draw(st.one_of(st.floats(min_value=0, max_value=1), st.none())) + if draw(st.booleans()): + memory_limit = sum([a["n_successes"] + a["n_failures"] - 2 for a in actions.values()]) + if actions_manager_state["adaptive_window_size"] != "inf": + max_size = min(actions_manager_state["adaptive_window_size"], memory_limit) + else: + max_size = memory_limit + actions_manager_state["actions_memory"] = draw( + st.lists( + st.sampled_from(list(actions.keys())), + min_size=0, + max_size=max_size, + ) + ) + size = len(actions_manager_state["actions_memory"]) + actions_manager_state["rewards_memory"] = draw( + st.lists(st.integers(min_value=0, max_value=1), min_size=size, max_size=size) + ) + return state + + +@settings(deadline=500) +@given(state=cmab_state()) +def test_cmab_from_state(state): cmab = CmabBernoulli.from_state(state) assert isinstance(cmab, CmabBernoulli) + if state["actions_manager"].get("adaptive_window_size", None) is None: + with pytest.raises(ValueError): + CmabBernoulli.from_old_state(state) + old_state = state.copy() + old_state["actions"] = old_state.pop("actions_manager")["actions"] + old_cmab = CmabBernoulli.from_old_state(old_state) + assert old_cmab == cmab + actual_actions = to_serializable_dict(cmab.actions) # Normalize the dict - expected_actions = {k: {**v, **state["actions"][k]} for k, v in actual_actions.items()} + expected_actions = {k: {**v, **state["actions_manager"]["actions"][k]} for k, v in actual_actions.items()} assert expected_actions == actual_actions # Ensure get_state and from_state compatibility @@ -471,13 +532,13 @@ def test_create_cmab_bernoulli_bai_cold_start(a_int): @settings(deadline=500) @given(st.integers(min_value=1, max_value=10)) def test_cmab_bai_can_instantiate(n_features): - with pytest.raises(TypeError): + with pytest.raises(ValueError): CmabBernoulliBAI() with pytest.raises(AttributeError): CmabBernoulliBAI(actions={}) with pytest.warns(UserWarning): CmabBernoulliBAI(actions={"a1": BayesianLogisticRegression.cold_start(n_features=2)}) - with pytest.raises(ValidationError): # predict_with_proba is not an argument of init + with pytest.raises(ValueError): # predict_with_proba is not an argument of init CmabBernoulliBAI( actions={ "a1": BayesianLogisticRegression.cold_start(n_features=n_features), @@ -485,7 +546,7 @@ def test_cmab_bai_can_instantiate(n_features): }, predict_with_proba=True, ) - with pytest.raises(ValidationError): + with pytest.raises((ValidationError, TypeError)): CmabBernoulliBAI( actions={ "a1": None, @@ -552,7 +613,7 @@ def test_cmab_bai_predict(n_samples, n_features): assert len(selected_actions) == len(probs) == len(weighted_sums) == n_samples 
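For reference while reading the from_state/from_old_state assertions above, here is a minimal sketch of the two serialization layouts being compared. The parameter values are illustrative only, and the final equality assumes, as the tests do, that unspecified actions-manager fields fall back to their defaults.

    from pybandits.cmab import CmabBernoulli

    gaussian = {"mu": 0.0, "nu": 1.0, "sigma": 1.0}
    blr_state = {"n_successes": 1, "n_failures": 1, "alpha": gaussian, "betas": [gaussian] * 3}

    # New (2.0) layout: models live under an "actions_manager" block.
    new_state = {"actions_manager": {"actions": {"a1": blr_state, "a2": blr_state}}, "strategy": {}}
    # Old (1.x) layout: a flat "actions" mapping at the top level.
    old_state = {"actions": {"a1": blr_state, "a2": blr_state}, "strategy": {}}

    assert CmabBernoulli.from_state(new_state) == CmabBernoulli.from_old_state(old_state)
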
-@settings(deadline=10000) +@settings(deadline=None) @given(st.just(100), st.just(3), st.sampled_from(literal_update_methods)) def test_cmab_bai_update(n_samples, n_features, update_method): actions = np.random.choice(["a1", "a2"], size=n_samples).tolist() @@ -592,7 +653,13 @@ def test_cmab_bai_get_state(mu, sigma, n_features, exploit_p: Float01): cmab = CmabBernoulliBAI(actions=actions, exploit_p=exploit_p) expected_state = to_serializable_dict( { - "actions": actions, + "actions_manager": { + "actions": actions, + "adaptive_window_size": None, + "delta": None, + "actions_memory": None, + "rewards_memory": None, + }, "strategy": {"exploit_p": exploit_p}, "predict_with_proba": False, "predict_actions_randomly": False, @@ -608,52 +675,107 @@ def test_cmab_bai_get_state(mu, sigma, n_features, exploit_p: Float01): assert is_serializable(cmab_state), "Internal state is not serializable" -@settings(deadline=500) -@given( - state=st.fixed_dictionaries( - { - "actions": st.dictionaries( - keys=st.text(min_size=1, max_size=10), - values=st.fixed_dictionaries( - { - "alpha": st.fixed_dictionaries( +@composite +def cmab_bai_state(draw): + # Define individual components + actions = draw( + st.dictionaries( + keys=st.text(min_size=1, max_size=10), + values=st.fixed_dictionaries( + { + "n_successes": st.integers(min_value=1, max_value=100), + "n_failures": st.integers(min_value=1, max_value=100), + "alpha": st.fixed_dictionaries( + { + "mu": st.floats(min_value=-100, max_value=100), + "nu": st.floats(min_value=0, max_value=100), + "sigma": st.floats(min_value=0, max_value=100), + } + ), + "betas": st.lists( + st.fixed_dictionaries( { "mu": st.floats(min_value=-100, max_value=100), "nu": st.floats(min_value=0, max_value=100), "sigma": st.floats(min_value=0, max_value=100), } ), - "betas": st.lists( - st.fixed_dictionaries( - { - "mu": st.floats(min_value=-100, max_value=100), - "nu": st.floats(min_value=0, max_value=100), - "sigma": st.floats(min_value=0, max_value=100), - } - ), - min_size=3, - max_size=3, - ), - }, - ), - min_size=2, + min_size=3, + max_size=3, + ), + }, ), - "strategy": st.one_of( - st.just({}), - st.just({"exploit_p": None}), - st.builds(lambda x: {"exploit_p": x}, st.floats(min_value=0, max_value=1)), - ), - } - ), - update_method=st.sampled_from(literal_update_methods), -) -def test_cmab_bai_from_state(state, update_method): + min_size=2, + ) + ) + + actions_manager = {"actions": actions} + strategy = draw( + st.one_of( + st.just({}), + st.just({"exploit_p": None}), + st.builds(lambda x: {"exploit_p": x}, st.floats(min_value=0, max_value=1)), + ) + ) + state = {"actions_manager": actions_manager, "strategy": strategy} + if draw(st.booleans()): + epsilon = draw(st.sampled_from([None, 0.1])) + + state["epsilon"] = epsilon + # Adjust default_action based on epsilon and actions + if draw(st.booleans()): + if epsilon is None: + default_action = None + elif default_action_index := draw(st.sampled_from([None, 1])) is not None: + default_action = list(actions.keys())[default_action_index] + else: + default_action = None + state["default_action"] = default_action + update_method = draw(st.sampled_from(literal_update_methods)) _apply_update_method_to_state(state, update_method) + if draw(st.booleans()): + actions_manager_state = state["actions_manager"] + actions_manager_state["adaptive_window_size"] = draw( + st.one_of(st.integers(min_value=1, max_value=100), st.none(), st.just("inf")) + ) + if actions_manager_state["adaptive_window_size"] is not None: + actions_manager_state["delta"] = 
draw(st.one_of(st.floats(min_value=0, max_value=1), st.none())) + if draw(st.booleans()): + memory_limit = sum([a["n_successes"] + a["n_failures"] - 2 for a in actions.values()]) + if actions_manager_state["adaptive_window_size"] != "inf": + max_size = min(actions_manager_state["adaptive_window_size"], memory_limit) + else: + max_size = memory_limit + actions_manager_state["actions_memory"] = draw( + st.lists( + st.sampled_from(list(actions.keys())), + min_size=0, + max_size=max_size, + ) + ) + size = len(actions_manager_state["actions_memory"]) + actions_manager_state["rewards_memory"] = draw( + st.lists(st.integers(min_value=0, max_value=1), min_size=size, max_size=size) + ) + return state + + +@settings(deadline=500) +@given(state=cmab_bai_state()) +def test_cmab_bai_from_state(state): cmab = CmabBernoulliBAI.from_state(state) assert isinstance(cmab, CmabBernoulliBAI) + if state["actions_manager"].get("adaptive_window_size", None) is None: + with pytest.raises(ValueError): + CmabBernoulliBAI.from_old_state(state) + old_state = state.copy() + old_state["actions"] = old_state.pop("actions_manager")["actions"] + old_cmab = CmabBernoulliBAI.from_old_state(old_state) + assert old_cmab == cmab + actual_actions = to_serializable_dict(cmab.actions) # Normalize the dict - expected_actions = {k: {**v, **state["actions"][k]} for k, v in actual_actions.items()} + expected_actions = {k: {**v, **state["actions_manager"]["actions"][k]} for k, v in actual_actions.items()} assert expected_actions == actual_actions expected_exploit_p = cmab.strategy.get_expected_value_from_state(state, "exploit_p") @@ -707,13 +829,13 @@ def test_create_cmab_bernoulli_cc_cold_start(a_int): @settings(deadline=500) @given(st.integers(min_value=1, max_value=10)) def test_cmab_cc_can_instantiate(n_features): - with pytest.raises(TypeError): + with pytest.raises(ValueError): CmabBernoulliCC() with pytest.raises(AttributeError): CmabBernoulliCC(actions={}) with pytest.warns(UserWarning): CmabBernoulliCC(actions={"a1": BayesianLogisticRegressionCC.cold_start(n_features=n_features, cost=10)}) - with pytest.raises(ValidationError): # predict_with_proba is not an argument of init + with pytest.raises(ValueError): # predict_with_proba is not an argument of init CmabBernoulliCC( actions={ "a1": BayesianLogisticRegressionCC.cold_start(n_features=n_features, cost=10), @@ -721,7 +843,7 @@ def test_cmab_cc_can_instantiate(n_features): }, predict_with_proba=True, ) - with pytest.raises(ValidationError): + with pytest.raises((ValidationError, TypeError)): CmabBernoulliCC( actions={ "a1": None, @@ -821,8 +943,8 @@ def test_cmab_cc_update(n_samples, n_features, update_method): st.integers(min_value=1), st.integers(min_value=1), st.integers(min_value=2, max_value=100), - st.floats(min_value=0), - st.floats(min_value=0), + st.floats(min_value=0, max_value=1), + st.floats(min_value=0, max_value=1), st.floats(min_value=0, max_value=1), ) def test_cmab_cc_get_state( @@ -838,7 +960,13 @@ def test_cmab_cc_get_state( cmab = CmabBernoulliCC(actions=actions, subsidy_factor=subsidy_factor) expected_state = to_serializable_dict( { - "actions": actions, + "actions_manager": { + "actions": actions, + "adaptive_window_size": None, + "delta": None, + "actions_memory": None, + "rewards_memory": None, + }, "strategy": {"subsidy_factor": subsidy_factor}, "predict_with_proba": True, "predict_actions_randomly": False, @@ -854,53 +982,108 @@ def test_cmab_cc_get_state( assert is_serializable(cmab_state), "Internal state is not serializable" 
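The expected_state blocks above show the four actions-manager fields every MAB now serializes (adaptive_window_size, delta, actions_memory, rewards_memory). As a rough usage sketch mirroring the adaptive-window update tests later in this file, a fixed-size window caps both memory deques; all numbers below are illustrative.

    import numpy as np
    from pybandits.cmab import CmabBernoulli

    mab = CmabBernoulli.cold_start(
        action_ids={"a1", "a2"},
        n_features=3,
        adaptive_window_size=50,  # keep at most the 50 most recent (action, reward) pairs
        delta=0.1,                # confidence level used by the change-detection check
    )

    context = np.random.uniform(low=-1.0, high=1.0, size=(100, 3))
    actions = np.random.choice(["a1", "a2"], size=100).tolist()
    rewards = np.random.choice([0, 1], size=100).tolist()
    mab.update(context=context, actions=actions, rewards=rewards)

    # Only the windowed tail of the history is retained.
    assert len(mab.actions_manager.actions_memory) <= 50
    assert len(mab.actions_manager.rewards_memory) <= 50
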
-@settings(deadline=500) -@given( - state=st.fixed_dictionaries( - { - "actions": st.dictionaries( - keys=st.text(min_size=1, max_size=10), - values=st.fixed_dictionaries( - { - "alpha": st.fixed_dictionaries( +@composite +def cmab_cc_state(draw): + # Define individual components + actions = draw( + st.dictionaries( + keys=st.text(min_size=1, max_size=10), + values=st.fixed_dictionaries( + { + "n_successes": st.integers(min_value=1, max_value=100), + "n_failures": st.integers(min_value=1, max_value=100), + "alpha": st.fixed_dictionaries( + { + "mu": st.floats(min_value=-100, max_value=100), + "nu": st.floats(min_value=0, max_value=100), + "sigma": st.floats(min_value=0, max_value=100), + } + ), + "betas": st.lists( + st.fixed_dictionaries( { "mu": st.floats(min_value=-100, max_value=100), "nu": st.floats(min_value=0, max_value=100), "sigma": st.floats(min_value=0, max_value=100), } ), - "betas": st.lists( - st.fixed_dictionaries( - { - "mu": st.floats(min_value=-100, max_value=100), - "nu": st.floats(min_value=0, max_value=100), - "sigma": st.floats(min_value=0, max_value=100), - } - ), - min_size=3, - max_size=3, - ), - "cost": st.floats(min_value=0), - }, - ), - min_size=2, + min_size=3, + max_size=3, + ), + "cost": st.floats(min_value=0, max_value=1), + }, ), - "strategy": st.one_of( - st.just({}), - st.just({"subsidy_factor": None}), - st.builds(lambda x: {"subsidy_factor": x}, st.floats(min_value=0, max_value=1)), - ), - } - ), - update_method=st.sampled_from(literal_update_methods), -) -def test_cmab_cc_from_state(state, update_method): + min_size=2, + ) + ) + + actions_manager = {"actions": actions} + strategy = draw( + st.one_of( + st.just({}), + st.just({"subsidy_factor": None}), + st.builds(lambda x: {"subsidy_factor": x}, st.floats(min_value=0, max_value=1)), + ) + ) + state = {"actions_manager": actions_manager, "strategy": strategy} + if draw(st.booleans()): + epsilon = draw(st.sampled_from([None, 0.1])) + + state["epsilon"] = epsilon + # Adjust default_action based on epsilon and actions + if draw(st.booleans()): + if epsilon is None: + default_action = None + elif default_action_index := draw(st.sampled_from([None, 1])) is not None: + default_action = list(actions.keys())[default_action_index] + else: + default_action = None + state["default_action"] = default_action + update_method = draw(st.sampled_from(literal_update_methods)) _apply_update_method_to_state(state, update_method) + if draw(st.booleans()): + actions_manager_state = state["actions_manager"] + actions_manager_state["adaptive_window_size"] = draw( + st.one_of(st.integers(min_value=1, max_value=100), st.none(), st.just("inf")) + ) + if actions_manager_state["adaptive_window_size"] is not None: + actions_manager_state["delta"] = draw(st.one_of(st.floats(min_value=0, max_value=1), st.none())) + if draw(st.booleans()): + memory_limit = sum([a["n_successes"] + a["n_failures"] - 2 for a in actions.values()]) + if actions_manager_state["adaptive_window_size"] != "inf": + max_size = min(actions_manager_state["adaptive_window_size"], memory_limit) + else: + max_size = memory_limit + actions_manager_state["actions_memory"] = draw( + st.lists( + st.sampled_from(list(actions.keys())), + min_size=0, + max_size=max_size, + ) + ) + size = len(actions_manager_state["actions_memory"]) + actions_manager_state["rewards_memory"] = draw( + st.lists(st.integers(min_value=0, max_value=1), min_size=size, max_size=size) + ) + return state + + +@settings(deadline=500) +@given(state=cmab_cc_state()) +def test_cmab_cc_from_state(state): 
cmab = CmabBernoulliCC.from_state(state) assert isinstance(cmab, CmabBernoulliCC) + if state["actions_manager"].get("adaptive_window_size", None) is None: + with pytest.raises(ValueError): + CmabBernoulliCC.from_old_state(state) + old_state = state.copy() + old_state["actions"] = old_state.pop("actions_manager")["actions"] + old_cmab = CmabBernoulliCC.from_old_state(old_state) + assert old_cmab == cmab + actual_actions = to_serializable_dict(cmab.actions) # Normalize the dict - expected_actions = {k: {**v, **state["actions"][k]} for k, v in actual_actions.items()} + expected_actions = {k: {**v, **state["actions_manager"]["actions"][k]} for k, v in actual_actions.items()} assert expected_actions == actual_actions expected_subsidy_factor = cmab.strategy.get_expected_value_from_state(state, "subsidy_factor") @@ -961,3 +1144,130 @@ def test_epsilon_greedy_cmab_cc_predict(n_samples, n_features): assert len(selected_actions) == n_samples assert probs == n_samples * [{"a1": 0.5, "a2": 0.5}] assert weighted_sums == n_samples * [{"a1": 0, "a2": 0}] + + +######################################################################################################################## + + +# Cmab with adaptive window size + + +@settings(deadline=500) +@given( + st.integers(min_value=1, max_value=1000), + st.integers(min_value=1, max_value=100), + st.one_of(st.integers(min_value=1, max_value=100), st.none(), st.just("inf")), + st.one_of(st.floats(min_value=0, max_value=1), st.none()), +) +def test_adaptive_window_cmab_predict_cold_start(n_samples, n_features, adaptive_window_size, delta): + if adaptive_window_size is None and delta is not None: + with pytest.raises(AttributeError): + CmabBernoulli.cold_start( + action_ids={"a1", "a2"}, n_features=n_features, adaptive_window_size=adaptive_window_size, delta=delta + ) + else: + context = np.random.uniform(low=-1.0, high=1.0, size=(n_samples, n_features)) + + mab = CmabBernoulli.cold_start( + action_ids={"a1", "a2"}, n_features=n_features, adaptive_window_size=adaptive_window_size, delta=delta + ) + selected_actions, probs, weighted_sums = mab.predict(context=context) + assert mab.predict_actions_randomly + assert all([a in ["a1", "a2"] for a in selected_actions]) + assert len(selected_actions) == n_samples + assert probs == n_samples * [{"a1": 0.5, "a2": 0.5}] + assert weighted_sums == n_samples * [{"a1": 0, "a2": 0}] + + +@settings(deadline=500) +@given( + st.integers(min_value=1, max_value=100), + st.integers(min_value=1, max_value=3), + st.one_of( + st.integers(min_value=1, max_value=100), + st.none(), + st.just("inf"), + ), + st.one_of(st.floats(min_value=0, max_value=1), st.none()), +) +def test_adaptive_window_cmab_bai_predict(n_samples, n_features, adaptive_window_size, delta): + if adaptive_window_size is None and delta is not None: + delta = None + context = np.random.uniform(low=-1.0, high=1.0, size=(n_samples, n_features)) + + mab = CmabBernoulliBAI.cold_start( + action_ids={"a1", "a2"}, n_features=n_features, adaptive_window_size=adaptive_window_size, delta=delta + ) + selected_actions, probs, weighted_sums = mab.predict(context=context) + assert mab.predict_actions_randomly + assert all([a in ["a1", "a2"] for a in selected_actions]) + assert len(selected_actions) == n_samples + assert probs == n_samples * [{"a1": 0.5, "a2": 0.5}] + assert weighted_sums == n_samples * [{"a1": 0, "a2": 0}] + + +@settings(deadline=500) +@given( + st.integers(min_value=1, max_value=100), + st.integers(min_value=1, max_value=3), + st.one_of( + st.integers(min_value=1, 
max_value=100), + st.none(), + st.just("inf"), + ), + st.one_of(st.floats(min_value=0, max_value=1), st.none()), +) +def test_adaptive_window_cmab_cc_predict(n_samples, n_features, adaptive_window_size, delta): + if adaptive_window_size is None and delta is not None: + delta = None + context = np.random.uniform(low=-1.0, high=1.0, size=(n_samples, n_features)) + + # cold start + mab = CmabBernoulliCC.cold_start( + action_ids_cost={"a1": 10, "a2": 20.5}, + n_features=n_features, + adaptive_window_size=adaptive_window_size, + delta=delta, + ) + selected_actions, probs, weighted_sums = mab.predict(context=context) + assert mab.predict_actions_randomly + assert all([a in ["a1", "a2"] for a in selected_actions]) + assert len(selected_actions) == n_samples + assert probs == n_samples * [{"a1": 0.5, "a2": 0.5}] + assert weighted_sums == n_samples * [{"a1": 0, "a2": 0}] + + +@settings(deadline=500) +@given( + st.integers(min_value=20, max_value=100), + st.integers(min_value=1, max_value=3), + st.one_of( + st.integers(min_value=1, max_value=100), + st.just("inf"), + ), + st.one_of(st.floats(min_value=0, max_value=1), st.none()), +) +def test_adaptive_window_cmab_update(n_samples, n_features, adaptive_window_size, delta): + if adaptive_window_size is None and delta is not None: + with pytest.raises(AttributeError): + CmabBernoulli.cold_start( + action_ids={"a1", "a2"}, n_features=n_features, adaptive_window_size=adaptive_window_size, delta=delta + ) + else: + actions = np.random.choice(["a1", "a2"], size=n_samples).tolist() + rewards = np.random.choice([0, 1], size=n_samples).tolist() + context = np.random.uniform(low=-1.0, high=1.0, size=(n_samples, n_features)) + + mab = CmabBernoulli.cold_start( + action_ids={"a1", "a2"}, n_features=n_features, adaptive_window_size=adaptive_window_size, delta=delta + ) + mab.update(context=context, actions=actions, rewards=rewards) + expected_length = adaptive_window_size if adaptive_window_size != "inf" else n_samples + assert list(mab.actions_manager.rewards_memory) == rewards[-expected_length:] + assert list(mab.actions_manager.actions_memory) == actions[-expected_length:] + + # Change reward statistic, expect to hold only part of the data in the memory + new_rewards = [1] * n_samples + mab.update(context=context, actions=actions, rewards=new_rewards) + assert len(mab.actions_manager.rewards_memory) < expected_length + assert len(mab.actions_manager.actions_memory) < expected_length diff --git a/tests/test_model.py b/tests/test_model.py index b5ade18..69848ab 100644 --- a/tests/test_model.py +++ b/tests/test_model.py @@ -55,7 +55,7 @@ def test_can_init_beta(success_counter, failure_counter): assert (b.n_successes, b.n_failures) == (1, 1) -def test_both_or_neither_counters_are_defined(): +def test_both_or_neither_models_are_defined(): with pytest.raises(ValidationError): Beta(n_successes=0) with pytest.raises(ValidationError): @@ -112,21 +112,21 @@ def test_can_init_betaCC(a_float): def test_can_init_base_beta_mo(): # init with default params - b = BetaMO(counters=[Beta(), Beta()]) - assert b.counters[0].n_successes == 1 and b.counters[0].n_failures == 1 - assert b.counters[1].n_successes == 1 and b.counters[1].n_failures == 1 + b = BetaMO(models=[Beta(), Beta()]) + assert b.models[0].n_successes == 1 and b.models[0].n_failures == 1 + assert b.models[1].n_successes == 1 and b.models[1].n_failures == 1 # init with empty dict - b = BetaMO(counters=[{}, {}]) - assert b.counters[0] == Beta() + b = BetaMO(models=[{}, {}]) + assert b.models[0] == Beta() # invalid 
init with BetaCC instead of Beta with pytest.raises(ValidationError): - BetaMO(counters=[BetaCC(cost=1), BetaCC(cost=1)]) + BetaMO(models=[BetaCC(cost=1), BetaCC(cost=1)]) def test_calculate_proba_beta_mo(): - b = BetaMO(counters=[Beta(), Beta()]) + b = BetaMO(models=[Beta(), Beta()]) b.sample_proba() @@ -139,12 +139,12 @@ def test_beta_update_mo(rewards1, rewards2): rewards1, rewards2 = rewards1[:min_len], rewards2[:min_len] rewards = [[a, b] for a, b in zip(rewards1, rewards2)] - b = BetaMO(counters=[Beta(n_successes=11, n_failures=22), Beta(n_successes=33, n_failures=44)]) + b = BetaMO(models=[Beta(n_successes=11, n_failures=22), Beta(n_successes=33, n_failures=44)]) b.update(rewards=rewards) assert b == BetaMO( - counters=[ + models=[ Beta(n_successes=11 + sum(rewards1), n_failures=22 + len(rewards1) - sum(rewards1)), Beta(n_successes=33 + sum(rewards2), n_failures=44 + len(rewards2) - sum(rewards2)), ] @@ -162,16 +162,16 @@ def test_beta_update_mo(rewards1, rewards2): def test_can_init_beta_mo(): # init with default params - b = BetaMO(counters=[Beta(), Beta()]) - assert b.counters == [Beta(), Beta()] + b = BetaMO(models=[Beta(), Beta()]) + assert b.models == [Beta(), Beta()] # init with empty dict - b = BetaMO(counters=[{}, {}]) - assert b.counters == [Beta(), Beta()] + b = BetaMO(models=[{}, {}]) + assert b.models == [Beta(), Beta()] # invalid init with BetaCC instead of Beta with pytest.raises(ValidationError): - BetaMO(counters=[BetaCC(cost=1), BetaCC(cost=1)]) + BetaMO(models=[BetaCC(cost=1), BetaCC(cost=1)]) ######################################################################################################################## @@ -184,21 +184,21 @@ def test_can_init_beta_mo(): def test_can_init_beta_mo_cc(a_float): if a_float < 0 or np.isnan(a_float): with pytest.raises(ValidationError): - BetaMOCC(counters=[Beta(), Beta()], cost=a_float) + BetaMOCC(models=[Beta(), Beta()], cost=a_float) else: # init with default params - b = BetaMOCC(counters=[Beta(), Beta()], cost=a_float) - assert b.counters == [Beta(), Beta()] + b = BetaMOCC(models=[Beta(), Beta()], cost=a_float) + assert b.models == [Beta(), Beta()] assert b.cost == a_float # init with empty dict - b = BetaMOCC(counters=[{}, {}], cost=a_float) - assert b.counters == [Beta(), Beta()] + b = BetaMOCC(models=[{}, {}], cost=a_float) + assert b.models == [Beta(), Beta()] assert b.cost == a_float # invalid init with BetaCC instead of Beta with pytest.raises(ValidationError): - BetaMOCC(counters=[BetaCC(cost=1), BetaCC(cost=1)], cost=a_float) + BetaMOCC(models=[BetaCC(cost=1), BetaCC(cost=1)], cost=a_float) ######################################################################################################################## diff --git a/tests/test_smab.py b/tests/test_smab.py index 2c8a34e..8b6f444 100644 --- a/tests/test_smab.py +++ b/tests/test_smab.py @@ -25,8 +25,9 @@ from typing import List import pytest -from hypothesis import given +from hypothesis import given, settings from hypothesis import strategies as st +from hypothesis.strategies import composite from pybandits.base import BinaryReward, Float01 from pybandits.model import Beta, BetaCC, BetaMO, BetaMOCC @@ -67,13 +68,13 @@ def test_base_smab_update_ok(r1, r2): def test_can_instantiate_smab(): - with pytest.raises(TypeError): + with pytest.raises(ValueError): SmabBernoulli() with pytest.raises(AttributeError): SmabBernoulli(actions={}) with pytest.warns(UserWarning): SmabBernoulli(actions={"action1": Beta()}) - with pytest.raises(ValidationError): + with 
pytest.raises((ValueError, TypeError)): SmabBernoulli( actions={ "action1": None, @@ -204,7 +205,13 @@ def test_smab_get_state(a, b, c, d): expected_state = to_serializable_dict( { - "actions": actions, + "actions_manager": { + "actions": actions, + "adaptive_window_size": None, + "delta": None, + "actions_memory": None, + "rewards_memory": None, + }, "strategy": {}, "epsilon": None, "default_action": None, @@ -216,28 +223,83 @@ def test_smab_get_state(a, b, c, d): assert smab_state == expected_state -@given( - state=st.fixed_dictionaries( - { - "actions": st.dictionaries( - keys=st.text(min_size=1, max_size=10), - values=st.fixed_dictionaries( - { - "n_successes": st.integers(min_value=1, max_value=100), - "n_failures": st.integers(min_value=1, max_value=100), - }, - ), - min_size=2, +@composite +def smab_state(draw): + # Define individual components + actions = draw( + st.dictionaries( + keys=st.text(min_size=1, max_size=10), + values=st.fixed_dictionaries( + { + "n_successes": st.integers(min_value=1, max_value=100), + "n_failures": st.integers(min_value=1, max_value=100), + }, ), - "strategy": st.fixed_dictionaries({}), - } + min_size=2, + ) ) -) + + actions_manager = {"actions": actions} + state = { + "actions_manager": actions_manager, + "strategy": {}, + } + if draw(st.booleans()): + epsilon = draw(st.sampled_from([None, 0.1])) + + state["epsilon"] = epsilon + # Adjust default_action based on epsilon and actions + if draw(st.booleans()): + if epsilon is None: + default_action = None + elif default_action_index := draw(st.sampled_from([None, 1])) is not None: + default_action = list(actions.keys())[default_action_index] + else: + default_action = None + state["default_action"] = default_action + + if draw(st.booleans()): + actions_manager_state = state["actions_manager"] + actions_manager_state["adaptive_window_size"] = draw( + st.one_of(st.integers(min_value=1, max_value=100), st.none(), st.just("inf")) + ) + if actions_manager_state["adaptive_window_size"] is not None: + actions_manager_state["delta"] = draw(st.one_of(st.floats(min_value=0, max_value=1), st.none())) + if draw(st.booleans()): + memory_limit = sum([a["n_successes"] + a["n_failures"] - 2 for a in actions.values()]) + if actions_manager_state["adaptive_window_size"] != "inf": + max_size = min(actions_manager_state["adaptive_window_size"], memory_limit) + else: + max_size = memory_limit + actions_manager_state["actions_memory"] = draw( + st.lists( + st.sampled_from(list(actions.keys())), + min_size=0, + max_size=max_size, + ) + ) + size = len(actions_manager_state["actions_memory"]) + actions_manager_state["rewards_memory"] = draw( + st.lists(st.integers(min_value=0, max_value=1), min_size=size, max_size=size) + ) + + return state + + +@given(state=smab_state()) def test_smab_from_state(state): smab = SmabBernoulli.from_state(state) assert isinstance(smab, SmabBernoulli) - expected_actions = state["actions"] + if state["actions_manager"].get("adaptive_window_size", None) is None: + with pytest.raises(ValueError): + SmabBernoulli.from_old_state(state) + old_state = state.copy() + old_state["actions"] = old_state.pop("actions_manager")["actions"] + old_smab = SmabBernoulli.from_old_state(old_state) + assert old_smab == smab + + expected_actions = state["actions_manager"]["actions"] actual_actions = to_serializable_dict(smab.actions) # Normalize the dict assert expected_actions == actual_actions @@ -302,7 +364,7 @@ def test_smabbai_update(): def test_smabbai_with_betacc(): # Fails because smab bernoulli with BAI shouldn't 
support BetaCC - with pytest.raises(ValidationError): + with pytest.raises((ValidationError, TypeError)): SmabBernoulliBAI( actions={ "a1": BetaCC(cost=10), @@ -323,7 +385,13 @@ def test_smab_bai_get_state(a, b, c, d, exploit_p: Float01): smab = SmabBernoulliBAI(actions=actions, exploit_p=exploit_p) expected_state = to_serializable_dict( { - "actions": actions, + "actions_manager": { + "actions": actions, + "adaptive_window_size": None, + "delta": None, + "actions_memory": None, + "rewards_memory": None, + }, "strategy": {"exploit_p": exploit_p}, "epsilon": None, "default_action": None, @@ -337,32 +405,95 @@ def test_smab_bai_get_state(a, b, c, d, exploit_p: Float01): assert is_serializable(smab_state), "Internal state is not serializable" -@given( - state=st.fixed_dictionaries( - { - "actions": st.dictionaries( - keys=st.text(min_size=1, max_size=10), - values=st.fixed_dictionaries( - { - "n_successes": st.integers(min_value=1, max_value=100), - "n_failures": st.integers(min_value=1, max_value=100), - }, - ), - min_size=2, - ), - "strategy": st.one_of( - st.just({}), - st.just({"exploit_p": None}), - st.builds(lambda x: {"exploit_p": x}, st.floats(min_value=0, max_value=1)), +@st.composite +def smab_bai_state(draw): + # Define individual components + actions = draw( + st.dictionaries( + keys=st.text(min_size=1, max_size=10), + values=st.fixed_dictionaries( + { + "n_successes": st.integers(min_value=1, max_value=100), + "n_failures": st.integers(min_value=1, max_value=100), + } ), - } + min_size=2, + ) ) -) + + actions_manager = {"actions": actions} + + # Draw the strategy separately + strategy = draw( + st.one_of( + st.just({}), + st.just({"exploit_p": None}), + st.builds(lambda x: {"exploit_p": x}, st.floats(min_value=0, max_value=1)), + ) + ) + + state = { + "actions_manager": actions_manager, + "strategy": strategy, + } + + if draw(st.booleans()): + epsilon = draw(st.sampled_from([None, 0.1])) + state["epsilon"] = epsilon + + # Adjust default_action based on epsilon and actions + if draw(st.booleans()): + if epsilon is None: + default_action = None + else: + default_action_index = draw(st.sampled_from([None, 1])) + default_action = ( + list(actions.keys())[default_action_index] if default_action_index is not None else None + ) + + state["default_action"] = default_action + if draw(st.booleans()): + actions_manager_state = state["actions_manager"] + actions_manager_state["adaptive_window_size"] = draw( + st.one_of(st.integers(min_value=1, max_value=100), st.none(), st.just("inf")) + ) + if actions_manager_state["adaptive_window_size"] is not None: + actions_manager_state["delta"] = draw(st.one_of(st.floats(min_value=0, max_value=1), st.none())) + if draw(st.booleans()): + memory_limit = sum([a["n_successes"] + a["n_failures"] - 2 for a in actions.values()]) + if actions_manager_state["adaptive_window_size"] != "inf": + max_size = min(actions_manager_state["adaptive_window_size"], memory_limit) + else: + max_size = memory_limit + actions_manager_state["actions_memory"] = draw( + st.lists( + st.sampled_from(list(actions.keys())), + min_size=0, + max_size=max_size, + ) + ) + size = len(actions_manager_state["actions_memory"]) + actions_manager_state["rewards_memory"] = draw( + st.lists(st.integers(min_value=0, max_value=1), min_size=size, max_size=size) + ) + + return state + + +@given(state=smab_bai_state()) def test_smab_bai_from_state(state): smab = SmabBernoulliBAI.from_state(state) assert isinstance(smab, SmabBernoulliBAI) - expected_actions = state["actions"] + if 
state["actions_manager"].get("adaptive_window_size", None) is None: + with pytest.raises(ValueError): + SmabBernoulliBAI.from_old_state(state) + old_state = state.copy() + old_state["actions"] = old_state.pop("actions_manager")["actions"] + old_smab = SmabBernoulliBAI.from_old_state(old_state) + assert old_smab == smab + + expected_actions = state["actions_manager"]["actions"] actual_actions = to_serializable_dict(smab.actions) # Normalize the dict assert expected_actions == actual_actions expected_exploit_p = smab.strategy.get_expected_value_from_state(state, "exploit_p") @@ -441,8 +572,8 @@ def test_smabcc_update(): st.integers(min_value=1), st.integers(min_value=1), st.integers(min_value=1), - st.floats(min_value=0), - st.floats(min_value=0), + st.floats(min_value=0, max_value=1), + st.floats(min_value=0, max_value=1), st.floats(min_value=0, max_value=1), ) def test_smab_cc_get_state(a, b, c, d, cost1: NonNegativeFloat, cost2: NonNegativeFloat, subsidy_factor: Float01): @@ -453,7 +584,13 @@ def test_smab_cc_get_state(a, b, c, d, cost1: NonNegativeFloat, cost2: NonNegati smab = SmabBernoulliCC(actions=actions, subsidy_factor=subsidy_factor) expected_state = to_serializable_dict( { - "actions": actions, + "actions_manager": { + "actions": actions, + "adaptive_window_size": None, + "delta": None, + "actions_memory": None, + "rewards_memory": None, + }, "strategy": { "subsidy_factor": subsidy_factor, }, @@ -469,33 +606,97 @@ def test_smab_cc_get_state(a, b, c, d, cost1: NonNegativeFloat, cost2: NonNegati assert is_serializable(smab_state), "Internal state is not serializable" -@given( - state=st.fixed_dictionaries( - { - "actions": st.dictionaries( - keys=st.text(min_size=1, max_size=10), - values=st.fixed_dictionaries( - { - "n_successes": st.integers(min_value=1, max_value=100), - "n_failures": st.integers(min_value=1, max_value=100), - "cost": st.floats(min_value=0), - }, - ), - min_size=2, - ), - "strategy": st.one_of( - st.just({}), - st.just({"subsidy_factor": None}), - st.builds(lambda x: {"subsidy_factor": x}, st.floats(min_value=0, max_value=1)), +@st.composite +def smab_cc_state(draw): + # Define individual components + actions = draw( + st.dictionaries( + keys=st.text(min_size=1, max_size=10), + values=st.fixed_dictionaries( + { + "n_successes": st.integers(min_value=1, max_value=100), + "n_failures": st.integers(min_value=1, max_value=100), + "cost": st.floats(min_value=0, max_value=1), + } ), - } + min_size=2, + ) ) -) + + actions_manager = {"actions": actions} + + # Draw the strategy separately + strategy = draw( + st.one_of( + st.just({}), + st.just({"subsidy_factor": None}), + st.builds(lambda x: {"subsidy_factor": x}, st.floats(min_value=0, max_value=1)), + ) + ) + + state = { + "actions_manager": actions_manager, + "strategy": strategy, + } + + if draw(st.booleans()): + epsilon = draw(st.sampled_from([None, 0.1])) + state["epsilon"] = epsilon + + # Adjust default_action based on epsilon and actions + if draw(st.booleans()): + if epsilon is None: + default_action = None + else: + default_action_index = draw(st.sampled_from([None, 1])) + default_action = ( + list(actions.keys())[default_action_index] if default_action_index is not None else None + ) + + state["default_action"] = default_action + + if draw(st.booleans()): + actions_manager_state = state["actions_manager"] + actions_manager_state["adaptive_window_size"] = draw( + st.one_of(st.integers(min_value=1, max_value=100), st.none(), st.just("inf")) + ) + if actions_manager_state["adaptive_window_size"] is not None: + 
actions_manager_state["delta"] = draw(st.one_of(st.floats(min_value=0, max_value=1), st.none())) + if draw(st.booleans()): + memory_limit = sum([a["n_successes"] + a["n_failures"] - 2 for a in actions.values()]) + if actions_manager_state["adaptive_window_size"] != "inf": + max_size = min(actions_manager_state["adaptive_window_size"], memory_limit) + else: + max_size = memory_limit + actions_manager_state["actions_memory"] = draw( + st.lists( + st.sampled_from(list(actions.keys())), + min_size=0, + max_size=max_size, + ) + ) + size = len(actions_manager_state["actions_memory"]) + actions_manager_state["rewards_memory"] = draw( + st.lists(st.integers(min_value=0, max_value=1), min_size=size, max_size=size) + ) + + return state + + +@given(state=smab_cc_state()) def test_smab_cc_from_state(state): smab = SmabBernoulliCC.from_state(state) assert isinstance(smab, SmabBernoulliCC) - expected_actions = state["actions"] + if state["actions_manager"].get("adaptive_window_size", None) is None: + with pytest.raises(ValueError): + SmabBernoulliCC.from_old_state(state) + old_state = state.copy() + old_state["actions"] = old_state.pop("actions_manager")["actions"] + old_smab = SmabBernoulliCC.from_old_state(old_state) + assert old_smab == smab + + expected_actions = state["actions_manager"]["actions"] actual_actions = json.loads(json.dumps(smab.actions, default=dict)) # Normalize the dict assert expected_actions == actual_actions expected_subsidy_factor = smab.strategy.get_expected_value_from_state(state, "subsidy_factor") @@ -520,14 +721,14 @@ def test_can_init_smab_mo(a_list): s = SmabBernoulliMO( actions={ "a1": BetaMO( - counters=[ + models=[ Beta(n_successes=a, n_failures=b), Beta(n_successes=c, n_failures=d), Beta(n_successes=e, n_failures=f), ] ), "a2": BetaMO( - counters=[ + models=[ Beta(n_successes=d, n_failures=a), Beta(n_successes=e, n_failures=b), Beta(n_successes=f, n_failures=c), @@ -536,14 +737,14 @@ def test_can_init_smab_mo(a_list): }, ) assert s.actions["a1"] == BetaMO( - counters=[ + models=[ Beta(n_successes=a, n_failures=b), Beta(n_successes=c, n_failures=d), Beta(n_successes=e, n_failures=f), ] ) assert s.actions["a2"] == BetaMO( - counters=[ + models=[ Beta(n_successes=d, n_failures=a), Beta(n_successes=e, n_failures=b), Beta(n_successes=f, n_failures=c), @@ -556,9 +757,9 @@ def test_all_actions_must_have_same_number_of_objectives_smab_mo(): with pytest.raises(ValueError): SmabBernoulliMO( actions={ - "a1": BetaMO(counters=[Beta(), Beta()]), - "a2": BetaMO(counters=[Beta(), Beta()]), - "a3": BetaMO(counters=[Beta(), Beta(), Beta()]), + "a1": BetaMO(models=[Beta(), Beta()]), + "a2": BetaMO(models=[Beta(), Beta()]), + "a3": BetaMO(models=[Beta(), Beta(), Beta()]), }, ) @@ -602,14 +803,14 @@ def test_smab_mo_get_state(a_list): actions = { "a1": BetaMO( - counters=[ + models=[ Beta(n_successes=a, n_failures=b), Beta(n_successes=c, n_failures=d), Beta(n_successes=e, n_failures=f), ] ), "a2": BetaMO( - counters=[ + models=[ Beta(n_successes=d, n_failures=a), Beta(n_successes=e, n_failures=b), Beta(n_successes=f, n_failures=c), @@ -619,7 +820,13 @@ def test_smab_mo_get_state(a_list): smab = SmabBernoulliMO(actions=actions) expected_state = to_serializable_dict( { - "actions": actions, + "actions_manager": { + "actions": actions, + "adaptive_window_size": None, + "delta": None, + "actions_memory": None, + "rewards_memory": None, + }, "strategy": {}, "epsilon": None, "default_action": None, @@ -633,36 +840,92 @@ def test_smab_mo_get_state(a_list): assert is_serializable(smab_state), 
"Internal state is not serializable" -@given( - state=st.fixed_dictionaries( - { - "actions": st.dictionaries( - keys=st.text(min_size=1, max_size=10), - values=st.fixed_dictionaries( - { - "counters": st.lists( - st.fixed_dictionaries( - { - "n_successes": st.integers(min_value=1, max_value=100), - "n_failures": st.integers(min_value=1, max_value=100), - }, - ), - min_size=3, - max_size=3, - ) - } - ), - min_size=2, +@composite +def smab_mo_state(draw): + # Define individual components + actions = draw( + st.dictionaries( + keys=st.text(min_size=1, max_size=10), + values=st.fixed_dictionaries( + { + "models": st.lists( + st.fixed_dictionaries( + { + "n_successes": st.integers(min_value=1, max_value=100), + "n_failures": st.integers(min_value=1, max_value=100), + }, + ), + min_size=3, + max_size=3, + ) + } ), - "strategy": st.fixed_dictionaries({}), - } + min_size=2, + ) ) -) + + actions_manager = {"actions": actions} + state = { + "actions_manager": actions_manager, + "strategy": {}, + } + if draw(st.booleans()): + epsilon = draw(st.sampled_from([None, 0.1])) + + state["epsilon"] = epsilon + # Adjust default_action based on epsilon and actions + if draw(st.booleans()): + if epsilon is None: + default_action = None + elif default_action_index := draw(st.sampled_from([None, 1])) is not None: + default_action = list(actions.keys())[default_action_index] + else: + default_action = None + state["default_action"] = default_action + if draw(st.booleans()): + actions_manager_state = state["actions_manager"] + actions_manager_state["adaptive_window_size"] = draw( + st.one_of(st.integers(min_value=1, max_value=100), st.none(), st.just("inf")) + ) + if actions_manager_state["adaptive_window_size"] is not None: + actions_manager_state["delta"] = draw(st.one_of(st.floats(min_value=0, max_value=1), st.none())) + if draw(st.booleans()): + memory_limit = sum( + [a["models"][0]["n_successes"] + a["models"][0]["n_failures"] - 2 for a in actions.values()] + ) + if actions_manager_state["adaptive_window_size"] != "inf": + max_size = min(actions_manager_state["adaptive_window_size"], memory_limit) + else: + max_size = memory_limit + actions_manager_state["actions_memory"] = draw( + st.lists( + st.sampled_from(list(actions.keys())), + min_size=0, + max_size=max_size, + ) + ) + size = len(actions_manager_state["actions_memory"]) + actions_manager_state["rewards_memory"] = draw( + st.lists(st.integers(min_value=0, max_value=1), min_size=size, max_size=size) + ) + + return state + + +@given(state=smab_mo_state()) def test_smab_mo_from_state(state): smab = SmabBernoulliMO.from_state(state) assert isinstance(smab, SmabBernoulliMO) - expected_actions = state["actions"] + if state["actions_manager"].get("adaptive_window_size", None) is None: + with pytest.raises(ValueError): + SmabBernoulliMO.from_old_state(state) + old_state = state.copy() + old_state["actions"] = old_state.pop("actions_manager")["actions"] + old_smab = SmabBernoulliMO.from_old_state(old_state) + assert old_smab == smab + + expected_actions = state["actions_manager"]["actions"] actual_actions = json.loads(json.dumps(smab.actions, default=dict)) # Normalize the dict assert expected_actions == actual_actions @@ -684,7 +947,7 @@ def test_can_init_smab_mo_cc(a_list): s = SmabBernoulliMOCC( actions={ "a1": BetaMOCC( - counters=[ + models=[ Beta(n_successes=a, n_failures=b), Beta(n_successes=c, n_failures=d), Beta(n_successes=e, n_failures=f), @@ -692,7 +955,7 @@ def test_can_init_smab_mo_cc(a_list): cost=g, ), "a2": BetaMOCC( - counters=[ + models=[ 
Beta(n_successes=d, n_failures=a), Beta(n_successes=e, n_failures=b), Beta(n_successes=f, n_failures=c), @@ -702,7 +965,7 @@ def test_can_init_smab_mo_cc(a_list): }, ) assert s.actions["a1"] == BetaMOCC( - counters=[ + models=[ Beta(n_successes=a, n_failures=b), Beta(n_successes=c, n_failures=d), Beta(n_successes=e, n_failures=f), @@ -710,7 +973,7 @@ def test_can_init_smab_mo_cc(a_list): cost=g, ) assert s.actions["a2"] == BetaMOCC( - counters=[ + models=[ Beta(n_successes=d, n_failures=a), Beta(n_successes=e, n_failures=b), Beta(n_successes=f, n_failures=c), @@ -724,9 +987,9 @@ def test_all_actions_must_have_same_number_of_objectives_smab_mo_cc(): with pytest.raises(ValueError): SmabBernoulliMOCC( actions={ - "action 1": BetaMOCC(counters=[Beta(), Beta()], cost=1), - "action 2": BetaMOCC(counters=[Beta(), Beta()], cost=1), - "action 3": BetaMOCC(counters=[Beta(), Beta(), Beta()], cost=1), + "action 1": BetaMOCC(models=[Beta(), Beta()], cost=1), + "action 2": BetaMOCC(models=[Beta(), Beta()], cost=1), + "action 3": BetaMOCC(models=[Beta(), Beta(), Beta()], cost=1), }, ) @@ -782,7 +1045,7 @@ def test_smab_mo_cc_get_state(a_list): actions = { "a1": BetaMOCC( - counters=[ + models=[ Beta(n_successes=a, n_failures=b), Beta(n_successes=c, n_failures=d), Beta(n_successes=e, n_failures=f), @@ -790,7 +1053,7 @@ def test_smab_mo_cc_get_state(a_list): cost=g, ), "a2": BetaMOCC( - counters=[ + models=[ Beta(n_successes=d, n_failures=a), Beta(n_successes=e, n_failures=b), Beta(n_successes=f, n_failures=c), @@ -801,7 +1064,13 @@ def test_smab_mo_cc_get_state(a_list): smab = SmabBernoulliMOCC(actions=actions) expected_state = to_serializable_dict( { - "actions": actions, + "actions_manager": { + "actions": actions, + "adaptive_window_size": None, + "delta": None, + "actions_memory": None, + "rewards_memory": None, + }, "strategy": {}, "epsilon": None, "default_action": None, @@ -815,37 +1084,92 @@ def test_smab_mo_cc_get_state(a_list): assert is_serializable(smab_state), "Internal state is not serializable" -@given( - state=st.fixed_dictionaries( - { - "actions": st.dictionaries( - keys=st.text(min_size=1, max_size=10), - values=st.fixed_dictionaries( - { - "counters": st.lists( - st.fixed_dictionaries( - { - "n_successes": st.integers(min_value=1, max_value=100), - "n_failures": st.integers(min_value=1, max_value=100), - }, - ), - min_size=3, - max_size=3, +@composite +def smab_mocc_state(draw): + # Define individual components + actions = draw( + st.dictionaries( + keys=st.text(min_size=1, max_size=10), + values=st.fixed_dictionaries( + { + "models": st.lists( + st.fixed_dictionaries( + { + "n_successes": st.integers(min_value=1, max_value=100), + "n_failures": st.integers(min_value=1, max_value=100), + }, ), - "cost": st.floats(min_value=0), - } - ), - min_size=2, + min_size=3, + max_size=3, + ), + "cost": st.floats(min_value=0, max_value=1), + } ), - "strategy": st.fixed_dictionaries({}), - } + min_size=2, + ) ) -) + + actions_manager = {"actions": actions} + state = { + "actions_manager": actions_manager, + "strategy": {}, + } + if draw(st.booleans()): + epsilon = draw(st.sampled_from([None, 0.1])) + + state["epsilon"] = epsilon + # Adjust default_action based on epsilon and actions + if draw(st.booleans()): + if epsilon is None: + default_action = None + elif default_action_index := draw(st.sampled_from([None, 1])) is not None: + default_action = list(actions.keys())[default_action_index] + else: + default_action = None + state["default_action"] = default_action + if draw(st.booleans()): + 
actions_manager_state = state["actions_manager"] + actions_manager_state["adaptive_window_size"] = draw( + st.one_of(st.integers(min_value=1, max_value=100), st.none(), st.just("inf")) + ) + if actions_manager_state["adaptive_window_size"] is not None: + actions_manager_state["delta"] = draw(st.one_of(st.floats(min_value=0, max_value=1), st.none())) + if draw(st.booleans()): + memory_limit = sum( + [a["models"][0]["n_successes"] + a["models"][0]["n_failures"] - 2 for a in actions.values()] + ) + if actions_manager_state["adaptive_window_size"] != "inf": + max_size = min(actions_manager_state["adaptive_window_size"], memory_limit) + else: + max_size = memory_limit + actions_manager_state["actions_memory"] = draw( + st.lists( + st.sampled_from(list(actions.keys())), + min_size=0, + max_size=max_size, + ) + ) + size = len(actions_manager_state["actions_memory"]) + actions_manager_state["rewards_memory"] = draw( + st.lists(st.integers(min_value=0, max_value=1), min_size=size, max_size=size) + ) + return state + + +@given(state=smab_mocc_state()) def test_smab_mo_cc_from_state(state): smab = SmabBernoulliMOCC.from_state(state) assert isinstance(smab, SmabBernoulliMOCC) - expected_actions = state["actions"] + if state["actions_manager"].get("adaptive_window_size", None) is None: + with pytest.raises(ValueError): + SmabBernoulliMOCC.from_old_state(state) + old_state = state.copy() + old_state["actions"] = old_state.pop("actions_manager")["actions"] + old_smab = SmabBernoulliMOCC.from_old_state(old_state) + assert old_smab == smab + + expected_actions = state["actions_manager"]["actions"] actual_actions = to_serializable_dict(smab.actions) # Normalize the dict assert expected_actions == actual_actions @@ -864,7 +1188,7 @@ def test_smab_mo_cc_from_state(state): st.integers(min_value=1), st.integers(min_value=1), ) -def test_can_instantiate_epsilon_greddy_smab_with_params(a, b): +def test_can_instantiate_epsilon_greedy_smab_with_params(a, b): s = SmabBernoulli( actions={ "action1": Beta(n_successes=a, n_failures=b), @@ -897,13 +1221,13 @@ def test_epsilon_greedy_smab_predict(n_samples: int): _, _ = s.predict(n_samples=n_samples, forbidden_actions=forbidden_actions) -def test_epsilon_greddy_smabbai_predict(n_samples: int): +def test_epsilon_greedy_smabbai_predict(n_samples: int): n_samples = 1000 s = SmabBernoulliBAI(actions={"a1": Beta(), "a2": Beta()}, epsilon=0.1, default_action="a1") _, _ = s.predict(n_samples=n_samples) -def test_epsilon_greddy_smabcc_predict(n_samples: int): +def test_epsilon_greedy_smabcc_predict(n_samples: int): n_samples = 1000 s = SmabBernoulliCC( actions={ @@ -917,7 +1241,7 @@ def test_epsilon_greddy_smabcc_predict(n_samples: int): _, _ = s.predict(n_samples=n_samples) -def test_epsilon_greddy_smab_mo_predict(n_samples: int): +def test_epsilon_greedy_smab_mo_predict(n_samples: int): n_samples = 1000 s = SmabBernoulliMO.cold_start(action_ids={"a1", "a2"}, n_objectives=3, epsilon=0.1, default_action="a1") @@ -926,7 +1250,7 @@ def test_epsilon_greddy_smab_mo_predict(n_samples: int): s.predict(n_samples=n_samples, forbidden_actions=forbidden) -def test_epsilon_greddy_smab_mo_cc_predict(n_samples: int): +def test_epsilon_greedy_smab_mo_cc_predict(n_samples: int): n_samples = 1000 s = SmabBernoulliMOCC.cold_start( @@ -935,3 +1259,229 @@ def test_epsilon_greddy_smab_mo_cc_predict(n_samples: int): forbidden = None s.predict(n_samples=n_samples, forbidden_actions=forbidden) + + +@given( + st.integers(min_value=1), + st.integers(min_value=1), + st.integers(min_value=1), + 
+    st.integers(min_value=1),
+    st.sampled_from([None, 0.1]),
+    st.sampled_from([None, "action1"]),
+)
+def test_epsilon_greedy_smab_get_state(a, b, c, d, epsilon, default_action):
+    if default_action is not None and epsilon is None:
+        with pytest.raises(AttributeError):
+            SmabBernoulli(
+                actions={
+                    "action1": Beta(n_successes=a, n_failures=b),
+                    "action2": Beta(n_successes=c, n_failures=d),
+                },
+                epsilon=epsilon,
+                default_action=default_action,
+            )
+    else:
+        actions = {"action1": Beta(n_successes=a, n_failures=b), "action2": Beta(n_successes=c, n_failures=d)}
+        smab = SmabBernoulli(actions=actions, epsilon=epsilon, default_action=default_action)
+
+        expected_state = to_serializable_dict(
+            {
+                "actions_manager": {
+                    "actions": actions,
+                    "adaptive_window_size": None,
+                    "delta": None,
+                    "actions_memory": None,
+                    "rewards_memory": None,
+                },
+                "strategy": {},
+                "epsilon": epsilon,
+                "default_action": default_action,
+            }
+        )
+
+        class_name, smab_state = smab.get_state()
+        assert class_name == "SmabBernoulli"
+        assert smab_state == expected_state
+
+
+########################################################################################################################
+
+
+# Smab with adaptive window size
+
+
+@given(
+    st.integers(min_value=1),
+    st.integers(min_value=1),
+    st.one_of(st.integers(min_value=1, max_value=100), st.none(), st.just("inf")),
+    st.one_of(st.floats(min_value=0, max_value=1), st.none()),
+)
+def test_can_instantiate_adaptive_window_smab_with_params(a, b, adaptive_window_size, delta):
+    if adaptive_window_size is None and delta is not None:
+        with pytest.raises(AttributeError):
+            SmabBernoulli(
+                actions={
+                    "action1": Beta(n_successes=a, n_failures=b),
+                    "action2": Beta(n_successes=a, n_failures=b),
+                },
+                adaptive_window_size=adaptive_window_size,
+                delta=delta,
+            )
+
+    else:
+        s = SmabBernoulli(
+            actions={
+                "action1": Beta(n_successes=a, n_failures=b),
+                "action2": Beta(n_successes=a, n_failures=b),
+            },
+            adaptive_window_size=adaptive_window_size,
+            delta=delta,
+        )
+        assert (s.actions["action1"].n_successes == a) and (s.actions["action1"].n_failures == b)
+        assert s.actions["action1"] == s.actions["action2"]
+
+
+@given(
+    st.just(100),
+    st.one_of(st.integers(min_value=1, max_value=100), st.none(), st.just("inf")),
+    st.one_of(st.floats(min_value=0, max_value=1), st.none()),
+)
+def test_adaptive_window_smab_predict(n_samples, adaptive_window_size, delta):
+    if adaptive_window_size is None and delta is not None:
+        delta = None
+    s = SmabBernoulli(
+        actions={
+            "a0": Beta(),
+            "a1": Beta(n_successes=5, n_failures=5),
+            "forb_1": Beta(n_successes=10, n_failures=1),
+            "best": Beta(n_successes=10, n_failures=5),
+            "forb_2": Beta(n_successes=100, n_failures=4),
+            "a5": Beta(),
+        },
+        adaptive_window_size=adaptive_window_size,
+        delta=delta,
+    )
+    forbidden_actions = set(["forb_1", "forb_2"])
+
+    _, _ = s.predict(n_samples=n_samples, forbidden_actions=forbidden_actions)
+
+
+@settings(deadline=500)
+@given(
+    st.just(100),
+    st.one_of(st.integers(min_value=1, max_value=100), st.none(), st.just("inf")),
+    st.one_of(st.floats(min_value=0, max_value=1), st.none()),
+)
+def test_adaptive_window_smabbai_predict(n_samples, adaptive_window_size, delta):
+    if adaptive_window_size is None and delta is not None:
+        delta = None
+    s = SmabBernoulliBAI(actions={"a1": Beta(), "a2": Beta()}, adaptive_window_size=adaptive_window_size, delta=delta)
+    _, _ = s.predict(n_samples=n_samples)
+
+
+@settings(deadline=1000)
+@given(
+    st.just(100),
+    st.one_of(st.integers(min_value=1, max_value=100), st.none(), st.just("inf")),
+    st.one_of(st.floats(min_value=0, max_value=1), st.none()),
+)
+def test_adaptive_window_smabcc_predict(n_samples, adaptive_window_size, delta):
+    if adaptive_window_size is None and delta is not None:
+        delta = None
+    s = SmabBernoulliCC(
+        actions={
+            "a1": BetaCC(n_successes=1, n_failures=2, cost=10),
+            "a2": BetaCC(n_successes=3, n_failures=4, cost=20),
+        },
+        subsidy_factor=0.7,
+        adaptive_window_size=adaptive_window_size,
+        delta=delta,
+    )
+    _, _ = s.predict(n_samples=n_samples)
+
+
+@settings(deadline=500)
+@given(
+    st.just(100),
+    st.one_of(st.integers(min_value=1, max_value=100), st.none(), st.just("inf")),
+    st.one_of(st.floats(min_value=0, max_value=1), st.none()),
+)
+def test_adaptive_window_smab_mo_predict(n_samples, adaptive_window_size, delta):
+    if adaptive_window_size is None and delta is not None:
+        delta = None
+
+    s = SmabBernoulliMO.cold_start(
+        action_ids={"a1", "a2"}, n_objectives=3, adaptive_window_size=adaptive_window_size, delta=delta
+    )
+
+    forbidden = None
+    s.predict(n_samples=n_samples, forbidden_actions=forbidden)
+
+
+@settings(deadline=1000)
+@given(
+    st.just(100),
+    st.one_of(st.integers(min_value=1, max_value=100), st.none(), st.just("inf")),
+    st.one_of(st.floats(min_value=0, max_value=1), st.none()),
+)
+def test_adaptive_window_smab_mo_cc_predict(n_samples, adaptive_window_size, delta):
+    if adaptive_window_size is None and delta is not None:
+        delta = None
+
+    s = SmabBernoulliMOCC.cold_start(
+        action_ids_cost={"a1": 1, "a2": 2}, n_objectives=2, adaptive_window_size=adaptive_window_size, delta=delta
+    )
+
+    forbidden = None
+    s.predict(n_samples=n_samples, forbidden_actions=forbidden)
+
+
+@given(
+    st.integers(min_value=1),
+    st.integers(min_value=1),
+    st.integers(min_value=1),
+    st.integers(min_value=1),
+    st.one_of(st.integers(min_value=1, max_value=100), st.none(), st.just("inf")),
+    st.one_of(st.floats(min_value=0, max_value=1), st.none()),
+)
+def test_adaptive_window_smab_get_state(a, b, c, d, adaptive_window_size, delta):
+    if adaptive_window_size is None and delta is not None:
+        delta = None
+    elif adaptive_window_size is not None and delta is None:
+        delta = 0.1
+    actions = {"action1": Beta(n_successes=a, n_failures=b), "action2": Beta(n_successes=c, n_failures=d)}
+    smab = SmabBernoulli(actions=actions, adaptive_window_size=adaptive_window_size, delta=delta)
+    if adaptive_window_size is not None:
+        expected_state = to_serializable_dict(
+            {
+                "actions_manager": {
+                    "actions": actions,
+                    "adaptive_window_size": adaptive_window_size,
+                    "delta": delta,
+                    "actions_memory": [],
+                    "rewards_memory": [],
+                },
+                "strategy": {},
+                "default_action": None,
+                "epsilon": None,
+            }
+        )
+    else:
+        expected_state = to_serializable_dict(
+            {
+                "actions_manager": {
+                    "actions": actions,
+                    "adaptive_window_size": adaptive_window_size,
+                    "delta": None,
+                    "actions_memory": None,
+                    "rewards_memory": None,
+                },
+                "strategy": {},
+                "default_action": None,
+                "epsilon": None,
+            }
+        )
+
+    class_name, smab_state = smab.get_state()
+    assert class_name == "SmabBernoulli"
+    assert smab_state == expected_state
diff --git a/tests/test_strategy.py b/tests/test_strategy.py
index 8773fe1..9fdfef7 100644
--- a/tests/test_strategy.py
+++ b/tests/test_strategy.py
@@ -351,11 +351,11 @@ def test_select_action_mo_cc():
     m = MultiObjectiveCostControlBandit()

     actions = {
-        "a1": BetaMOCC(counters=[Beta(), Beta(), Beta()], cost=8),
-        "a2": BetaMOCC(counters=[Beta(), Beta(), Beta()], cost=2),
-        "a3": BetaMOCC(counters=[Beta(), Beta(), Beta()], cost=5),
-        "a4": BetaMOCC(counters=[Beta(), Beta(), Beta()], cost=1),
-        "a5": BetaMOCC(counters=[Beta(), Beta(), Beta()], cost=7),
+        "a1": BetaMOCC(models=[Beta(), Beta(), Beta()], cost=8),
+        "a2": BetaMOCC(models=[Beta(), Beta(), Beta()], cost=2),
+        "a3": BetaMOCC(models=[Beta(), Beta(), Beta()], cost=5),
+        "a4": BetaMOCC(models=[Beta(), Beta(), Beta()], cost=1),
+        "a5": BetaMOCC(models=[Beta(), Beta(), Beta()], cost=7),
     }
     p = {
         "a1": [0.1, 0.3, 0.5],
@@ -369,9 +369,9 @@ def test_select_action_mo_cc():
     assert m.select_action(p=p, actions=actions) == "a4"

     actions = {
-        "a1": BetaMOCC(counters=[Beta(), Beta(), Beta()], cost=2),
-        "a2": BetaMOCC(counters=[Beta(), Beta(), Beta()], cost=2),
-        "a3": BetaMOCC(counters=[Beta(), Beta(), Beta()], cost=5),
+        "a1": BetaMOCC(models=[Beta(), Beta(), Beta()], cost=2),
+        "a2": BetaMOCC(models=[Beta(), Beta(), Beta()], cost=2),
+        "a3": BetaMOCC(models=[Beta(), Beta(), Beta()], cost=5),
     }
     p = {
         "a1": [0.6, 0.1, 0.1],