Skip to content

Commit

Permalink
update creator base for belief planning stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
lf-zhao committed Jan 13, 2025
1 parent 64b9605 commit 7c70d6d
Showing 1 changed file with 15 additions and 57 deletions.
72 changes: 15 additions & 57 deletions predicators/spot_utils/mock_env/mock_env_creator_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,21 @@
- RGB-D observations with object detections
- Task-specific configurations
- Planning and visualization utilities
- Support for belief space planning (when enabled in environment)
The environment data is stored in a directory specified by CFG.mock_env_data_dir.
This includes:
- graph.json: Contains state transitions and observations
- images/: Directory containing RGB-D images for each state
- transitions/: Directory containing transition graph visualizations
- transitions/transitions.png: Main transition graph visualization
- transitions/single_block.png: Single block task transition graph
- transitions/two_blocks.png: Two blocks task transition graph
- transitions/cup_emptiness.png: Cup emptiness belief task transition graph
Configuration:
mock_env_data_dir (str): Directory to store environment data (set during initialization)
seed (int): Random seed for reproducibility
sesame_task_planning_heuristic (str): Heuristic for task planning
sesame_max_skeletons_optimized (int): Maximum number of skeletons to optimize
sesame_use_necessary_atoms (bool): Whether to use necessary atoms in planning
sesame_check_expected_atoms (bool): Whether to check expected atoms in planning
Example usage:
```python
Expand All @@ -31,15 +28,15 @@
# Create objects and predicates
robot = Object("robot", creator.types["robot"])
block = Object("block", creator.types["movable_object"])
cup = Object("cup", creator.types["container"])
# Create initial and goal atoms
# Create initial and goal atoms for belief space planning
init_atoms = {
GroundAtom(creator.predicates["HandEmpty"], [robot]),
GroundAtom(creator.predicates["On"], [block, table])
GroundAtom(creator.predicates["ContainingWaterUnknown"], [cup])
}
goal_atoms = {
GroundAtom(creator.predicates["Inside"], [block, container])
GroundAtom(creator.predicates["ContainingWaterKnown"], [cup])
}
# Plan and visualize transitions
Expand Down Expand Up @@ -72,14 +69,6 @@
from predicators.settings import CFG
from predicators.planning import task_plan_grounding, task_plan

# Keep the mock action info class but comment it out for now
# class MockActionExtraInfo:
# """Mock action info for mock environment."""
# def __init__(self, name: str, objects: Sequence[Object], fn: Any, fn_args: Tuple) -> None:
# self.action_name = name
# self.operator_objects = objects
# self.real_world_fn = fn
# self.real_world_fn_args = fn_args

class MockEnvCreatorBase(ABC):
"""Base class for mock environment creators.
Expand All @@ -90,6 +79,7 @@ class MockEnvCreatorBase(ABC):
- Define transitions between states
- Generate task-specific state sequences
- Plan and visualize transitions
- Support belief space planning when enabled in environment
The environment data is stored in a directory specified by CFG.mock_env_data_dir,
which is set during initialization. This includes:
Expand All @@ -104,7 +94,7 @@ class MockEnvCreatorBase(ABC):
transitions_dir (str): Directory for transition graph visualizations
env (MockSpotEnv): Mock Spot environment instance
types (Dict[str, Type]): Available object types
predicates (Dict[str, Predicate]): Available predicates
predicates (Dict[str, Predicate]): Available predicates (including belief predicates if enabled)
options (Dict[str, ParameterizedOption]): Available options
nsrts (Set[NSRT]): Available NSRTs
console (Console): Rich console for pretty printing
Expand Down Expand Up @@ -136,67 +126,35 @@ def __init__(self, path_dir: str) -> None:

# Get environment info
self.types: Dict[str, Type] = {t.name: t for t in self.env.types}
# Get predicates directly from environment - this will include belief predicates if enabled
self.predicates: Dict[str, Predicate] = {p.name: p for p in self.env.predicates}

# Add observation predicates
self._add_observation_predicates()

# Create options
self.options: Dict[str, ParameterizedOption] = {o.name: o for o in get_gt_options(self.env.get_name())}

# Add observation options
self._add_observation_options()

# Get NSRTs from factory
self.nsrts: Set[NSRT] = self._create_nsrts()

# Initialize rich console for pretty printing
self.console = Console()

def _add_observation_predicates(self) -> None:
"""Add predicates for cup observation and belief state."""
# Get types
robot_type = self.types["robot"]
container_type = self.types["container"]
movable_type = self.types["movable_object"]

# Create observation predicates
self.predicates.update({
# Keep these predicates but comment out for now
# "ContainingWaterUnknown": Predicate("ContainingWaterUnknown", [container_type], lambda s, o: True),
# "ContainingWaterKnown": Predicate("ContainingWaterKnown", [container_type], lambda s, o: True),
# "ContainingWater": Predicate("ContainingWater", [container_type], lambda s, o: True),
# "NotContainingWater": Predicate("NotContainingWater", [container_type], lambda s, o: True),
# "InHandViewFromTop": Predicate("InHandViewFromTop", [robot_type, movable_type], lambda s, o: True)
})

def _add_observation_options(self) -> None:
"""Add options for cup observation."""
# TODO: Implement observation options after fixing base pick-place functionality
pass

def _create_nsrts(self) -> Set[NSRT]:
"""Create NSRTs including observation operators."""
# Get base NSRTs from factory
"""Create NSRTs from the environment's predicates and options.
Returns NSRTs that can work with both physical and belief predicates
(when belief space operators are enabled in the environment).
"""
# Get NSRTs from factory
factory = MockSpotGroundTruthNSRTFactory()
base_nsrts = factory.get_nsrts(
self.env.get_name(),
self.types,
self.predicates,
self.predicates, # This now uses predicates from env including belief ones if enabled
self.options
)

# TODO: Add observation NSRTs after fixing base pick-place functionality
# observation_nsrts = self._create_observation_nsrts()
# return base_nsrts | observation_nsrts

return base_nsrts

def _create_observation_nsrts(self) -> Set[NSRT]:
"""Create NSRTs for cup observation."""
# TODO: Implement observation NSRTs after fixing base pick-place functionality
return set()

def plan_and_visualize(self, init_atoms: Set[GroundAtom], goal_atoms: Set[GroundAtom],
objects: Set[Object], output_file: str = "transitions",
timeout: float = 10.0) -> None:
Expand Down

0 comments on commit 7c70d6d

Please sign in to comment.