Skip to content

Commit

Permalink
major update to mock spot/robot perceiver; still in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
lf-zhao committed Jan 14, 2025
1 parent 25b0beb commit 5750b91
Showing 1 changed file with 66 additions and 57 deletions.
123 changes: 66 additions & 57 deletions predicators/perception/mock_spot_perceiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,18 @@

from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Set
from typing import Dict, Optional, Set

import numpy as np

from predicators.envs import get_or_create_env
from predicators.spot_utils.perception.perception_structs import RGBDImageWithContext
from predicators.perception.base_perceiver import BasePerceiver
from predicators.structs import Action, DefaultState, EnvironmentTask, GoalDescription, GroundAtom, Observation, Predicate, State, Task, Video
from predicators.settings import CFG


class MockSpotPerceiver:
class MockSpotPerceiver(BasePerceiver):
"""A mock perceiver for the Spot environment that returns simulated observations.
This class provides a simplified interface for testing perception-dependent behaviors
Expand All @@ -54,9 +58,7 @@ class MockSpotPerceiver:
- Objects visible to the robot
- Objects held in the gripper
- Gripper state (open/closed)
The perceiver is designed to work with MockSpotEnv for testing planning and
manipulation strategies that depend on visual perception.
- VLM predicates and atoms for perception-based planning
"""

def __init__(self, data_dir: str) -> None:
Expand All @@ -67,84 +69,91 @@ def __init__(self, data_dir: str) -> None:
the RGBD images that will be used during testing. Images can be
saved here using the save_image method.
"""
super().__init__()
self._data_dir = Path(data_dir)
self._current_rgbd: Optional[RGBDImageWithContext] = None
self._gripper_open: bool = True
self._objects_in_view: Set[str] = set()
self._objects_in_hand: Set[str] = set()

def get_name(self) -> str:
"""Get the name of this perceiver.

Returns:
The string identifier "mock_spot" for this perceiver type.
"""
# VLM-related state
self._camera_images = None
self._vlm_atom_dict = None
self._vlm_predicates = None
self._curr_env = None

@classmethod
def get_name(cls) -> str:
"""Get the name of this perceiver."""
return "mock_spot"

def get_observation(self) -> "MockSpotObservation":
"""Get the current observation of the environment.
This method returns a MockSpotObservation containing:
- The current RGBD image with camera context (if any)
- Current gripper state
- Set of objects currently visible
- Set of objects currently held
Returns:
A MockSpotObservation containing the current state.
"""
"""Get the current observation of the environment."""
return MockSpotObservation(
rgbd=self._current_rgbd,
gripper_open=self._gripper_open,
objects_in_view=self._objects_in_view,
objects_in_hand=self._objects_in_hand
objects_in_hand=self._objects_in_hand,
images=self._camera_images,
vlm_atom_dict=self._vlm_atom_dict,
vlm_predicates=self._vlm_predicates
)

def save_image(self, rgbd: RGBDImageWithContext) -> None:
"""Save a mock RGBD image to be returned in future observations.
Args:
rgbd: The RGBD image with context to save. This should include:
- RGB and depth image arrays
- Camera pose information
- Camera intrinsics and parameters
- Frame transformation data
"""
"""Save a mock RGBD image to be returned in future observations."""
self._current_rgbd = rgbd

def update_state(self, gripper_open: bool, objects_in_view: Set[str],
objects_in_hand: Set[str]) -> None:
"""Update the mock environment state.
This method allows updating the full state of the environment at once,
which is useful for testing different scenarios and transitions.
Args:
gripper_open: Whether the gripper is open (True) or closed (False)
objects_in_view: Complete set of objects currently visible to the robot.
Previous objects not in this set will be removed from view.
objects_in_hand: Complete set of objects currently held by the gripper.
Previous objects not in this set will be removed from hand.
"""
def update_state(self, gripper_open: bool, objects_in_view: Set[str],
objects_in_hand: Set[str], camera_images=None,
vlm_atom_dict=None, vlm_predicates=None) -> None:
"""Update the current state of the environment."""
self._gripper_open = gripper_open
self._objects_in_view = objects_in_view
self._objects_in_hand = objects_in_hand
if CFG.spot_vlm_eval_predicate:
self._camera_images = camera_images
self._vlm_atom_dict = vlm_atom_dict
self._vlm_predicates = vlm_predicates

def reset(self, env_task: EnvironmentTask) -> Task:
"""Reset the perceiver for a new task."""
self._current_rgbd = None
self._gripper_open = True
self._objects_in_view = set()
self._objects_in_hand = set()
self._camera_images = None
self._vlm_atom_dict = None
self._vlm_predicates = None
self._curr_env = get_or_create_env(CFG.env)
# NOTE: this seems to come from "dry run" version - we don't need it here! check and remove
return env_task.task

def update_perceiver_with_action(self, action: Action) -> None:
"""Update the perceiver with an action."""
pass # No action tracking needed for mock perceiver

def step(self, observation: Observation) -> State:
"""Process a new observation and return the current state."""
assert isinstance(observation, MockSpotObservation)
if CFG.spot_vlm_eval_predicate:
self._camera_images = observation.images
self._vlm_atom_dict = observation.vlm_atom_dict
self._vlm_predicates = observation.vlm_predicates
return DefaultState

def render_mental_images(self, observation: Optional[Observation] = None,
env_task: Optional[EnvironmentTask] = None) -> Video:
"""Render mental images for visualization."""
return [] # No mental image rendering needed for mock perceiver


@dataclass
class MockSpotObservation:
"""An observation from the mock Spot environment.
This dataclass encapsulates all the information that would normally be
perceived by the real Spot robot's sensors, including:
Attributes:
rgbd: Optional RGBD image with camera context. None if no image is available.
gripper_open: Boolean indicating if the gripper is currently open.
objects_in_view: Set of string identifiers for objects currently visible.
objects_in_hand: Set of string identifiers for objects currently held.
"""
"""An observation from the mock Spot environment."""
rgbd: Optional[RGBDImageWithContext]
gripper_open: bool
objects_in_view: Set[str]
objects_in_hand: Set[str]
images: Optional[Dict] = None
vlm_atom_dict: Optional[Dict] = None
vlm_predicates: Optional[Set[Predicate]] = None

0 comments on commit 5750b91

Please sign in to comment.