diff --git a/predicators/spot_utils/mock_env/mock_env_creator_base.py b/predicators/spot_utils/mock_env/mock_env_creator_base.py index fcfc160d8..6f2505e12 100644 --- a/predicators/spot_utils/mock_env/mock_env_creator_base.py +++ b/predicators/spot_utils/mock_env/mock_env_creator_base.py @@ -5,6 +5,7 @@ - RGB-D observations with object detections - Task-specific configurations - Planning and visualization utilities +- Support for belief space planning (when enabled in environment) The environment data is stored in a directory specified by CFG.mock_env_data_dir. This includes: @@ -12,8 +13,6 @@ - images/: Directory containing RGB-D images for each state - transitions/: Directory containing transition graph visualizations - transitions/transitions.png: Main transition graph visualization -- transitions/single_block.png: Single block task transition graph -- transitions/two_blocks.png: Two blocks task transition graph - transitions/cup_emptiness.png: Cup emptiness belief task transition graph Configuration: @@ -21,8 +20,6 @@ seed (int): Random seed for reproducibility sesame_task_planning_heuristic (str): Heuristic for task planning sesame_max_skeletons_optimized (int): Maximum number of skeletons to optimize - sesame_use_necessary_atoms (bool): Whether to use necessary atoms in planning - sesame_check_expected_atoms (bool): Whether to check expected atoms in planning Example usage: ```python @@ -31,15 +28,15 @@ # Create objects and predicates robot = Object("robot", creator.types["robot"]) - block = Object("block", creator.types["movable_object"]) + cup = Object("cup", creator.types["container"]) - # Create initial and goal atoms + # Create initial and goal atoms for belief space planning init_atoms = { GroundAtom(creator.predicates["HandEmpty"], [robot]), - GroundAtom(creator.predicates["On"], [block, table]) + GroundAtom(creator.predicates["ContainingWaterUnknown"], [cup]) } goal_atoms = { - GroundAtom(creator.predicates["Inside"], [block, container]) + GroundAtom(creator.predicates["ContainingWaterKnown"], [cup]) } # Plan and visualize transitions @@ -72,14 +69,6 @@ from predicators.settings import CFG from predicators.planning import task_plan_grounding, task_plan -# Keep the mock action info class but comment it out for now -# class MockActionExtraInfo: -# """Mock action info for mock environment.""" -# def __init__(self, name: str, objects: Sequence[Object], fn: Any, fn_args: Tuple) -> None: -# self.action_name = name -# self.operator_objects = objects -# self.real_world_fn = fn -# self.real_world_fn_args = fn_args class MockEnvCreatorBase(ABC): """Base class for mock environment creators. @@ -90,6 +79,7 @@ class MockEnvCreatorBase(ABC): - Define transitions between states - Generate task-specific state sequences - Plan and visualize transitions + - Support belief space planning when enabled in environment The environment data is stored in a directory specified by CFG.mock_env_data_dir, which is set during initialization. This includes: @@ -104,7 +94,7 @@ class MockEnvCreatorBase(ABC): transitions_dir (str): Directory for transition graph visualizations env (MockSpotEnv): Mock Spot environment instance types (Dict[str, Type]): Available object types - predicates (Dict[str, Predicate]): Available predicates + predicates (Dict[str, Predicate]): Available predicates (including belief predicates if enabled) options (Dict[str, ParameterizedOption]): Available options nsrts (Set[NSRT]): Available NSRTs console (Console): Rich console for pretty printing @@ -136,67 +126,35 @@ def __init__(self, path_dir: str) -> None: # Get environment info self.types: Dict[str, Type] = {t.name: t for t in self.env.types} + # Get predicates directly from environment - this will include belief predicates if enabled self.predicates: Dict[str, Predicate] = {p.name: p for p in self.env.predicates} - # Add observation predicates - self._add_observation_predicates() - # Create options self.options: Dict[str, ParameterizedOption] = {o.name: o for o in get_gt_options(self.env.get_name())} - # Add observation options - self._add_observation_options() - # Get NSRTs from factory self.nsrts: Set[NSRT] = self._create_nsrts() # Initialize rich console for pretty printing self.console = Console() - def _add_observation_predicates(self) -> None: - """Add predicates for cup observation and belief state.""" - # Get types - robot_type = self.types["robot"] - container_type = self.types["container"] - movable_type = self.types["movable_object"] - - # Create observation predicates - self.predicates.update({ - # Keep these predicates but comment out for now - # "ContainingWaterUnknown": Predicate("ContainingWaterUnknown", [container_type], lambda s, o: True), - # "ContainingWaterKnown": Predicate("ContainingWaterKnown", [container_type], lambda s, o: True), - # "ContainingWater": Predicate("ContainingWater", [container_type], lambda s, o: True), - # "NotContainingWater": Predicate("NotContainingWater", [container_type], lambda s, o: True), - # "InHandViewFromTop": Predicate("InHandViewFromTop", [robot_type, movable_type], lambda s, o: True) - }) - - def _add_observation_options(self) -> None: - """Add options for cup observation.""" - # TODO: Implement observation options after fixing base pick-place functionality - pass - def _create_nsrts(self) -> Set[NSRT]: - """Create NSRTs including observation operators.""" - # Get base NSRTs from factory + """Create NSRTs from the environment's predicates and options. + + Returns NSRTs that can work with both physical and belief predicates + (when belief space operators are enabled in the environment). + """ + # Get NSRTs from factory factory = MockSpotGroundTruthNSRTFactory() base_nsrts = factory.get_nsrts( self.env.get_name(), self.types, - self.predicates, + self.predicates, # This now uses predicates from env including belief ones if enabled self.options ) - # TODO: Add observation NSRTs after fixing base pick-place functionality - # observation_nsrts = self._create_observation_nsrts() - # return base_nsrts | observation_nsrts - return base_nsrts - def _create_observation_nsrts(self) -> Set[NSRT]: - """Create NSRTs for cup observation.""" - # TODO: Implement observation NSRTs after fixing base pick-place functionality - return set() - def plan_and_visualize(self, init_atoms: Set[GroundAtom], goal_atoms: Set[GroundAtom], objects: Set[Object], output_file: str = "transitions", timeout: float = 10.0) -> None: