
Commit

attempt at a refactor!
NishanthJKumar committed Dec 10, 2023
1 parent 94a237e commit acc6c25
Showing 5 changed files with 106 additions and 53 deletions.
87 changes: 56 additions & 31 deletions predicators/approaches/active_sampler_learning_approach.py
@@ -29,8 +29,9 @@
KNeighborsClassifier, MLPBinaryClassifier, MLPRegressor
from predicators.settings import CFG
from predicators.structs import NSRT, Array, GroundAtom, LowLevelTrajectory, \
Metrics, NSRTSampler, Object, ParameterizedOption, Predicate, Segment, \
State, Task, Type, _GroundNSRT, _GroundSTRIPSOperator, _Option
Metrics, NSRTSampler, NSRTSamplerWithEpsilonIndicator, Object, \
ParameterizedOption, Predicate, Segment, State, Task, Type, _GroundNSRT, \
_GroundSTRIPSOperator, _Option

# Dataset for sampler learning: includes (s, option, s', label) per param opt.
_OptionSamplerDataset = List[Tuple[State, _Option, State, Any]]
@@ -65,7 +66,8 @@ def __init__(self, initial_predicates: Set[Predicate],
# differ from those used for execution (they will differ precisely
# in their sampler). Thus, we will keep around a separate mapping from
# NSRTs to samplers to be used at exploration time.
self._nsrt_to_explorer_sampler: Dict[NSRT, NSRTSampler] = {}
self._nsrt_to_explorer_sampler: Dict[
NSRT, NSRTSamplerWithEpsilonIndicator] = {}

# Record what train tasks have been seen during exploration so far.
self._seen_train_task_idxs: Set[int] = set()
@@ -312,7 +314,10 @@ def _learn_wrapped_samplers(self,
for old_nsrt in self._nsrts:
if old_nsrt.option not in new_nsrt_options:
new_test_nsrts.add(old_nsrt)
self._nsrt_to_explorer_sampler[old_nsrt] = old_nsrt._sampler # pylint: disable=protected-access
self._nsrt_to_explorer_sampler[
old_nsrt] = _wrap_sampler_exploration(
old_nsrt._sampler,
lambda o, _, params: [1.0] * len(params), "greedy") # pylint: disable=protected-access
self._nsrts = new_test_nsrts
# Re-save the NSRTs now that we've updated them.
save_path = utils.get_approach_save_path_str()
@@ -331,26 +336,30 @@ def __init__(self, nsrts: Set[NSRT], predicates: Set[Predicate],
self._rng = np.random.default_rng(CFG.seed)
# We keep track of two samplers per NSRT: one to use at test time
# and another to use during exploration/play time.
self._learned_samplers: Optional[Dict[NSRT, Tuple[NSRTSampler,
NSRTSampler]]] = None
self._learned_samplers: Optional[Dict[NSRT, Tuple[
NSRTSampler, NSRTSamplerWithEpsilonIndicator]]] = None

def learn(self, data: _SamplerDataset) -> None:
"""Fit all of the samplers."""
new_samplers: Dict[NSRT, Tuple[NSRTSampler, NSRTSampler]] = {}
new_samplers: Dict[NSRT, Tuple[NSRTSampler,
NSRTSamplerWithEpsilonIndicator]] = {}
for param_opt, nsrt_data in data.items():
nsrt = utils.param_option_to_nsrt(param_opt, self._nsrts)
logging.info(f"Fitting wrapped sampler for {nsrt.name}...")
new_samplers[nsrt] = self._learn_nsrt_sampler(nsrt_data, nsrt)
self._learned_samplers = new_samplers

def get_samplers(self) -> Dict[NSRT, Tuple[NSRTSampler, NSRTSampler]]:
def get_samplers(
self
) -> Dict[NSRT, Tuple[NSRTSampler, NSRTSamplerWithEpsilonIndicator]]:
"""Expose the fitted samplers, organized by NSRTs."""
assert self._learned_samplers is not None
return self._learned_samplers

@abc.abstractmethod
def _learn_nsrt_sampler(self, nsrt_data: _OptionSamplerDataset,
nsrt: NSRT) -> Tuple[NSRTSampler, NSRTSampler]:
def _learn_nsrt_sampler(
self, nsrt_data: _OptionSamplerDataset,
nsrt: NSRT) -> Tuple[NSRTSampler, NSRTSamplerWithEpsilonIndicator]:
"""Learn the new test-time and exploration samplers for a single NSRT
and return them."""

@@ -359,8 +368,9 @@ class _ClassifierWrappedSamplerLearner(_WrappedSamplerLearner):
"""Using boolean class labels on transitions, learn a classifier, and then
use the probability of predicting True to select parameters."""

def _learn_nsrt_sampler(self, nsrt_data: _OptionSamplerDataset,
nsrt: NSRT) -> Tuple[NSRTSampler, NSRTSampler]:
def _learn_nsrt_sampler(
self, nsrt_data: _OptionSamplerDataset,
nsrt: NSRT) -> Tuple[NSRTSampler, NSRTSamplerWithEpsilonIndicator]:
X_classifier: List[Array] = []
y_classifier: List[int] = []
for state, option, _, label in nsrt_data:
@@ -411,10 +421,8 @@ def _learn_nsrt_sampler(self, nsrt_data: _OptionSamplerDataset,
# Easiest way to access the base sampler.
base_sampler = nsrt._sampler # pylint: disable=protected-access
score_fn = _classifier_to_score_fn(classifier, nsrt)
wrapped_sampler_test = _wrap_sampler(base_sampler,
score_fn,
strategy="greedy")
wrapped_sampler_exploration = _wrap_sampler(
wrapped_sampler_test = _wrap_sampler_test(base_sampler, score_fn)
wrapped_sampler_exploration = _wrap_sampler_exploration(
base_sampler,
score_fn,
strategy=CFG.active_sampler_learning_exploration_sample_strategy)
@@ -426,8 +434,9 @@ class _ClassifierEnsembleWrappedSamplerLearner(_WrappedSamplerLearner):
classifiers, and then use the entropy among the predictions, as well as the
probability of predicting True to select parameters."""

def _learn_nsrt_sampler(self, nsrt_data: _OptionSamplerDataset,
nsrt: NSRT) -> Tuple[NSRTSampler, NSRTSampler]:
def _learn_nsrt_sampler(
self, nsrt_data: _OptionSamplerDataset,
nsrt: NSRT) -> Tuple[NSRTSampler, NSRTSamplerWithEpsilonIndicator]:
X_classifier: List[Array] = []
y_classifier: List[int] = []
for state, option, _, label in nsrt_data:
@@ -470,13 +479,11 @@ def _learn_nsrt_sampler(self, nsrt_data: _OptionSamplerDataset,
test_score_fn = _classifier_ensemble_to_score_fn(classifier,
nsrt,
test_time=True)
wrapped_sampler_test = _wrap_sampler(base_sampler,
test_score_fn,
strategy="greedy")
wrapped_sampler_test = _wrap_sampler_test(base_sampler, test_score_fn)
explore_score_fn = _classifier_ensemble_to_score_fn(classifier,
nsrt,
test_time=False)
wrapped_sampler_exploration = _wrap_sampler(
wrapped_sampler_exploration = _wrap_sampler_exploration(
base_sampler,
explore_score_fn,
strategy=CFG.active_sampler_learning_exploration_sample_strategy)
@@ -502,8 +509,9 @@ def learn(self, data: _SamplerDataset) -> None:
# Update the score functions now that all children are processed.
self._nsrt_score_fns = self._next_nsrt_score_fns

def _learn_nsrt_sampler(self, nsrt_data: _OptionSamplerDataset,
nsrt: NSRT) -> Tuple[NSRTSampler, NSRTSampler]:
def _learn_nsrt_sampler(
self, nsrt_data: _OptionSamplerDataset,
nsrt: NSRT) -> Tuple[NSRTSampler, NSRTSamplerWithEpsilonIndicator]:
# Build targets.
gamma = CFG.active_sampler_learning_score_gamma
num_a_samp = CFG.active_sampler_learning_num_lookahead_samples
@@ -533,10 +541,8 @@ def _learn_nsrt_sampler(self, nsrt_data: _OptionSamplerDataset,
score_fn = _regressor_to_score_fn(regressor, nsrt)
# Save the score function for use in later target computation.
self._next_nsrt_score_fns[nsrt] = score_fn
wrapped_sampler_test = _wrap_sampler(base_sampler,
score_fn,
strategy="greedy")
wrapped_sampler_exploration = _wrap_sampler(
wrapped_sampler_test = _wrap_sampler_test(base_sampler, score_fn)
wrapped_sampler_exploration = _wrap_sampler_exploration(
base_sampler,
score_fn,
strategy=CFG.active_sampler_learning_exploration_sample_strategy)
Expand Down Expand Up @@ -609,8 +615,8 @@ def _fit_regressor(self, nsrt_data: _OptionSamplerDataset) -> MLPRegressor:


# Helper functions.
def _wrap_sampler(base_sampler: NSRTSampler, score_fn: _ScoreFn,
strategy: str) -> NSRTSampler:
def _wrap_sampler_test(base_sampler: NSRTSampler,
score_fn: _ScoreFn) -> NSRTSampler:
"""Create a wrapped sampler that uses a score function to select among
candidates from a base sampler."""

@@ -621,17 +627,36 @@ def _sample(state: State, goal: Set[GroundAtom], rng: np.random.Generator,
for _ in range(CFG.active_sampler_learning_num_samples)
]
scores = score_fn(state, objects, samples)
idx = int(np.argmax(scores))
return samples[idx]

return _sample


def _wrap_sampler_exploration(
base_sampler: NSRTSampler, score_fn: _ScoreFn,
strategy: str) -> NSRTSamplerWithEpsilonIndicator:

def _sample(state: State, goal: Set[GroundAtom], rng: np.random.Generator,
objects: Sequence[Object]) -> Tuple[Array, bool]:
samples = [
base_sampler(state, goal, rng, objects)
for _ in range(CFG.active_sampler_learning_num_samples)
]
scores = score_fn(state, objects, samples)
if strategy in ["greedy", "epsilon_greedy"]:
idx = int(np.argmax(scores))
epsilon_bool = False
if strategy == "epsilon_greedy" and rng.uniform(
) <= CFG.active_sampler_learning_exploration_epsilon:
# Randomly select a sample to pick, following the epsilon
# greedy strategy!
idx = rng.integers(0, len(scores))
epsilon_bool = True
else:
raise NotImplementedError('Exploration strategy ' +
f'{strategy} ' + 'is not implemented.')
return samples[idx]
return (samples[idx], epsilon_bool)

return _sample
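
To make the new split concrete: _wrap_sampler_test keeps the old greedy behavior and returns a plain NSRTSampler, while _wrap_sampler_exploration additionally reports whether the chosen parameters came from a random (epsilon) draw. Below is a hypothetical, self-contained sketch of just that selection step, using plain numpy stand-ins for the predicators types; choose_sample and epsilon are illustrative names, not part of the repository's API.

```python
import numpy as np

def choose_sample(samples, scores, rng, strategy="epsilon_greedy", epsilon=0.1):
    """Pick one candidate; also return True iff the pick was a random draw."""
    if strategy not in ("greedy", "epsilon_greedy"):
        raise NotImplementedError(f"Exploration strategy {strategy} is not implemented.")
    idx = int(np.argmax(scores))  # greedy choice by default
    epsilon_bool = False
    if strategy == "epsilon_greedy" and rng.uniform() <= epsilon:
        idx = int(rng.integers(0, len(scores)))  # random pick, flagged as exploration
        epsilon_bool = True
    return samples[idx], epsilon_bool

rng = np.random.default_rng(0)
samples = [np.array([0.1]), np.array([0.5]), np.array([0.9])]
scores = [0.2, 0.7, 0.4]
params, was_random = choose_sample(samples, scores, rng)
print(params, was_random)  # usually the argmax candidate, with was_random=False
```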

8 changes: 5 additions & 3 deletions predicators/explorers/__init__.py
@@ -12,8 +12,9 @@
from predicators.ml_models import MapleQFunction
from predicators.option_model import _OptionModelBase
from predicators.settings import CFG
from predicators.structs import NSRT, GroundAtom, NSRTSampler, \
ParameterizedOption, Predicate, State, Task, Type, _GroundSTRIPSOperator
from predicators.structs import NSRT, GroundAtom, \
NSRTSamplerWithEpsilonIndicator, ParameterizedOption, Predicate, State, \
Task, Type, _GroundSTRIPSOperator

__all__ = ["BaseExplorer"]

@@ -37,7 +38,8 @@ def create_explorer(
ground_op_hist: Optional[Dict[_GroundSTRIPSOperator, List[bool]]] = None,
competence_models: Optional[Dict[_GroundSTRIPSOperator,
SkillCompetenceModel]] = None,
nsrt_to_explorer_sampler: Optional[Dict[NSRT, NSRTSampler]] = None,
nsrt_to_explorer_sampler: Optional[Dict[
NSRT, NSRTSamplerWithEpsilonIndicator]] = None,
seen_train_task_idxs: Optional[Set[int]] = None,
pursue_task_goal_first: Optional[bool] = None,
maple_q_function: Optional[MapleQFunction] = None,
52 changes: 36 additions & 16 deletions predicators/explorers/active_sampler_explorer.py
@@ -2,7 +2,7 @@

import logging
from collections import deque
from typing import Callable, Dict, Iterator, List, Optional, Set, Tuple
from typing import Callable, Deque, Dict, Iterator, List, Optional, Set, Tuple

import numpy as np
from gym.spaces import Box
@@ -15,8 +15,9 @@
run_task_plan_once
from predicators.settings import CFG
from predicators.structs import NSRT, Action, ExplorationStrategy, \
GroundAtom, NSRTSampler, ParameterizedOption, Predicate, State, Task, \
Type, _GroundNSRT, _GroundSTRIPSOperator, _Option
GroundAtom, NSRTSamplerWithEpsilonIndicator, ParameterizedOption, \
Predicate, State, Task, Type, _GroundNSRT, _GroundSTRIPSOperator, \
_Option

# Helper type to distinguish training tasks from replanning tasks.
_TaskID = Tuple[str, int]
@@ -41,7 +42,8 @@ def __init__(self, predicates: Set[Predicate],
ground_op_hist: Dict[_GroundSTRIPSOperator, List[bool]],
competence_models: Dict[_GroundSTRIPSOperator,
SkillCompetenceModel],
nsrt_to_explorer_sampler: Dict[NSRT, NSRTSampler],
nsrt_to_explorer_sampler: Dict[
NSRT, NSRTSamplerWithEpsilonIndicator],
seen_train_task_idxs: Set[int],
pursue_task_goal_first: bool) -> None:

@@ -56,6 +58,11 @@ def __init__(self, predicates: Set[Predicate],
self._ground_op_hist = ground_op_hist
self._competence_models = competence_models
self._last_executed_nsrt: Optional[_GroundNSRT] = None
# Indicator that tells us whether the last executed NSRT used
# a random sample for the purposes of exploration, or whether
# it used something from the current learned sampler distribution
# (exploitation).
self._last_executed_nsrt_was_exploration: Optional[bool] = None
self._nsrt_to_explorer_sampler = nsrt_to_explorer_sampler
self._seen_train_task_idxs = seen_train_task_idxs
self._pursue_task_goal_first = pursue_task_goal_first
@@ -73,7 +80,7 @@ def __init__(self, predicates: Set[Predicate],

# Tasks created through re-planning.
n = CFG.active_sampler_explorer_planning_progress_max_replan_tasks
self._replanning_tasks: deque[Task] = deque([], maxlen=n)
self._replanning_tasks: Deque[Task] = deque([], maxlen=n)

@classmethod
def get_name(cls) -> str:
@@ -105,7 +112,7 @@ def _get_exploration_strategy(self, train_task_idx: int,
current_task_repeat_goal: Optional[Set[GroundAtom]] = None
using_random = False

def _option_policy(state: State) -> _Option:
def _option_policy(state: State) -> Tuple[_Option, bool]:
logging.info("[Explorer] Option policy called.")
nonlocal assigned_task_finished, current_policy, \
next_practice_nsrt, using_random, assigned_task_horizon
@@ -115,7 +122,7 @@ def _option_policy(state: State) -> _Option:

if using_random:
logging.info("[Explorer] Using random option policy.")
return self._get_random_option(state)
return (self._get_random_option(state), False)

# Record if we've reached the assigned goal; can now practice.
if not assigned_task_finished and \
@@ -143,13 +150,20 @@ def _option_policy(state: State) -> _Option:
f"[Explorer] Practicing NSRT: {next_practice_nsrt}")
exploration_sampler = self._nsrt_to_explorer_sampler[
next_practice_nsrt.parent]
practice_nsrt_for_exploration = next_practice_nsrt.copy_with(
_sampler=exploration_sampler)
option = practice_nsrt_for_exploration.sample_option(
state, g, self._rng)
# We want to generate a sample to use to ground the option,
# and also save the epsilon indicator.
params, indicator = exploration_sampler(
state, g, self._rng, next_practice_nsrt.option_objs)
# Clip the params into the params_space of self.option,
# for safety.
low = next_practice_nsrt.option.params_space.low
high = next_practice_nsrt.option.params_space.high
params = np.clip(params, low, high)
option = next_practice_nsrt.option.ground(
next_practice_nsrt.option_objs, params)
next_practice_nsrt = None
current_policy = None
return option
return option, indicator

# Check if it's time to select a new goal and re-plan.
if current_policy is None:
Expand Down Expand Up @@ -249,12 +263,12 @@ def generate_goals() -> Iterator[Set[GroundAtom]]:
logging.info("[Explorer] No reachable goal found. "
"Switching to random exploration.")
using_random = True
return self._get_random_option(state)
return (self._get_random_option(state), False)
# Query the current policy.
assert current_policy is not None
try:
act = current_policy(state)
return act
return (act, False)
except utils.OptionExecutionFailure:
logging.info("[Explorer] Option execution failure!")
current_policy = None
@@ -273,11 +287,12 @@ def _wrapped_option_policy(state: State) -> _Option:
# Update ground_op_hist.
self._update_ground_op_hist(state)
# Record last executed NSRT.
option = _option_policy(state)
option, exploration_indicator = _option_policy(state)
ground_nsrt = utils.option_to_ground_nsrt(option, self._nsrts)
logging.info(f"[Explorer] Starting NSRT: {ground_nsrt.name}"
f"{ground_nsrt.objects}")
self._last_executed_nsrt = ground_nsrt
self._last_executed_nsrt_was_exploration = exploration_indicator
return option

# Finalize policy.
@@ -303,8 +318,10 @@ def _wrapped_policy(state: State) -> Action:
def _update_ground_op_hist(self, state: State) -> None:
"""Should be called when an NSRT has just terminated."""
nsrt = self._last_executed_nsrt
exploration_indicator = self._last_executed_nsrt_was_exploration
if nsrt is None:
return
assert exploration_indicator is not None
# NOTE: checking just the add effects doesn't work in general, but
# is probably fine for now. The right thing to do here is check
# the necessary atoms, which we will compute with a utility function
@@ -322,7 +339,10 @@ def _update_ground_op_hist(self, state: State) -> None:
skill_name = f"{last_executed_op.name}{last_executed_op.objects}"
model = create_competence_model(model_name, skill_name)
self._competence_models[last_executed_op] = model
self._competence_models[last_executed_op].observe(success)
# Only update the competence model if this action was not an
# exploratory action.
if not exploration_indicator:
self._competence_models[last_executed_op].observe(success)

def _get_option_policy_for_task(self,
task: Task) -> Callable[[State], _Option]:
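
The indicator threaded through _wrapped_option_policy exists so that competence models only see outcomes of greedily sampled actions; random epsilon draws would otherwise skew the estimated skill competence. A toy, stand-alone illustration of that gating follows (ToyCompetenceModel is a hypothetical stand-in for the repository's SkillCompetenceModel; only the observe name mirrors the diff).

```python
class ToyCompetenceModel:
    """Minimal success-rate tracker standing in for SkillCompetenceModel."""

    def __init__(self) -> None:
        self.successes = 0
        self.trials = 0

    def observe(self, success: bool) -> None:
        self.trials += 1
        self.successes += int(success)

    def competence(self) -> float:
        return self.successes / self.trials if self.trials else 0.5

model = ToyCompetenceModel()
# (success, was_exploration) outcomes for one skill.
outcomes = [(True, False), (False, True), (True, False)]
for success, was_exploration in outcomes:
    if not was_exploration:  # mirror the diff: skip exploratory actions
        model.observe(success)
print(model.competence())  # 1.0 -- the random (exploratory) failure is not counted
```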
6 changes: 6 additions & 0 deletions predicators/structs.py
@@ -1798,6 +1798,12 @@ def __len__(self) -> int:
Datastore = List[Tuple[Segment, VarToObjSub]]
NSRTSampler = Callable[
[State, Set[GroundAtom], np.random.Generator, Sequence[Object]], Array]
# NSRT Sampler that also returns a boolean indicating whether the sample was
# generated randomly (for exploration) or from the current learned
# distribution.
NSRTSamplerWithEpsilonIndicator = Callable[
[State, Set[GroundAtom], np.random.Generator, Sequence[Object]],
Tuple[Array, bool]]
Metrics = DefaultDict[str, float]
LiftedOrGroundAtom = TypeVar("LiftedOrGroundAtom", LiftedAtom, GroundAtom,
_Atom)
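
For reference, a callable satisfying the new alias could look like the sketch below. The signature and the (Array, bool) return shape follow the alias defined above; the body (a 0.1 epsilon and a two-dimensional parameter vector) is purely illustrative.

```python
from typing import Any, Sequence, Set, Tuple

import numpy as np

def toy_sampler_with_indicator(
        state: Any, goal: Set[Any], rng: np.random.Generator,
        objects: Sequence[Any]) -> Tuple[np.ndarray, bool]:
    """Toy NSRTSamplerWithEpsilonIndicator: params plus an exploration flag."""
    explore = bool(rng.uniform() < 0.1)  # assumed epsilon, illustration only
    params = rng.uniform(size=2) if explore else np.zeros(2)
    return params, explore
```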