Skip to content

Commit

Permalink
More manual mypy formatting. Fixed a bug that removed an unusedfuncti…
Browse files Browse the repository at this point in the history
…on argument that broke a test
  • Loading branch information
khughes-bdai committed Jan 26, 2024
1 parent 963a3fe commit 5969668
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 22 deletions.
6 changes: 3 additions & 3 deletions spot_wrapper/spot_graph_nav.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def _get_lease(self) -> Lease:

def _init_current_graph_nav_state(self) -> None:
# Store the most recent knowledge of the state of the robot based on rpc calls.
self._current_graph = None
self._current_graph: typing.Optional[map_pb2.Graph] = None
self._current_edges: typing.Dict[str, typing.List[str]] = {} # maps to_waypoint to list(from_waypoint)
self._current_waypoint_snapshots = {} # maps id to waypoint snapshot
self._current_edge_snapshots = {} # maps id to edge snapshot
Expand Down Expand Up @@ -781,8 +781,8 @@ def _update_waypoints_and_edges(
self, graph: map_pb2.Graph, localization_id: str, logger: logging.Logger
) -> typing.Tuple[typing.Dict[str, str], typing.Dict[str, str]]:
"""Update and print waypoint ids and edge ids."""
name_to_id = dict()
edges = dict()
name_to_id: typing.Dict[str, str] = dict()
edges: typing.Dict[str, str] = dict()

short_code_to_count = {}
waypoint_to_timestamp = []
Expand Down
68 changes: 49 additions & 19 deletions spot_wrapper/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
manipulation_api_pb2,
point_cloud_pb2,
robot_command_pb2,
robot_id_pb2,
robot_state_pb2,
world_object_pb2,
)
Expand Down Expand Up @@ -118,14 +119,18 @@ class AsyncRobotState(AsyncPeriodicQuery):
"""

def __init__(
self, client: RobotCommandClient, logger: logging.Logger, rate: float, callback: typing.Callable
self,
client: RobotCommandClient,
logger: logging.Logger,
rate: float,
callback: typing.Optional[typing.Callable],
) -> None:
super(AsyncRobotState, self).__init__("robot-state", client, logger, period_sec=1.0 / max(rate, 1.0))
self._callback = None
if rate > 0.0:
self._callback = callback

def _start_query(self):
def _start_query(self): # TODO [mypy] this probably refers to the ROS future message type
if self._callback:
callback_future = self._client.get_robot_state_async()
callback_future.add_done_callback(self._callback)
Expand All @@ -144,14 +149,18 @@ class AsyncMetrics(AsyncPeriodicQuery):
"""

def __init__(
self, client: RobotCommandClient, logger: logging.Logger, rate: float, callback: typing.Callable
self,
client: RobotCommandClient,
logger: logging.Logger,
rate: float,
callback: typing.Optional[typing.Callable],
) -> None:
super(AsyncMetrics, self).__init__("robot-metrics", client, logger, period_sec=1.0 / max(rate, 1.0))
self._callback = None
if rate > 0.0:
self._callback = callback

def _start_query(self):
def _start_query(self): # TODO [mypy] this probably refers to the ROS future message type
if self._callback:
callback_future = self._client.get_robot_metrics_async()
callback_future.add_done_callback(self._callback)
Expand All @@ -170,14 +179,18 @@ class AsyncLease(AsyncPeriodicQuery):
"""

def __init__(
self, client: RobotCommandClient, logger: logging.Logger, rate: float, callback: typing.Callable
self,
client: RobotCommandClient,
logger: logging.Logger,
rate: float,
callback: typing.Optional[typing.Callable],
) -> None:
super(AsyncLease, self).__init__("lease", client, logger, period_sec=1.0 / max(rate, 1.0))
self._callback = None
if rate > 0.0:
self._callback = callback

def _start_query(self):
def _start_query(self): # TODO [mypy] this probably refers to the ROS future message type
if self._callback:
callback_future = self._client.list_leases_async()
callback_future.add_done_callback(self._callback)
Expand All @@ -194,7 +207,7 @@ def __init__(
client: RobotCommandClient,
logger: logging.Logger,
rate: float,
spot_wrapper,
spot_wrapper, # TODO [mypy] have this recognize SpotWrapper type
) -> None:
"""
Attributes:
Expand Down Expand Up @@ -308,8 +321,9 @@ class AsyncEStopMonitor(AsyncPeriodicQuery):
"""

def __init__(self, client: RobotCommandClient, logger: logging.Logger, rate: float, spot_wrapper) -> None:
# TODO [mypy] have this recognize SpotWrapper type
super(AsyncEStopMonitor, self).__init__("estop_alive", client, logger, period_sec=1.0 / rate)
self._spot_wrapper = spot_wrapper
self._spot_wrapper: SpotWrapper = spot_wrapper

def _start_query(self) -> None:
if not self._spot_wrapper._estop_keepalive:
Expand Down Expand Up @@ -506,7 +520,7 @@ def __init__(
self._idle_task = AsyncIdle(self._robot_command_client, self._logger, 10.0, self)
self._estop_monitor = AsyncEStopMonitor(self._estop_client, self._logger, 20.0, self)

self._estop_endpoint = None
self._estop_endpoint: typing.Optional[EstopEndpoint] = None
self._estop_keepalive: typing.Optional[EstopKeepAlive] = None

robot_tasks = [
Expand Down Expand Up @@ -592,8 +606,8 @@ def __init__(
if self._is_licensed_for_choreography:
self._spot_dance = SpotDance(self._robot, self._choreography_client, self._logger)

self._robot_id: typing.Optional[str] = None
self._lease = None
self._robot_id: typing.Optional[robot_id_pb2.RobotId] = None
self._lease: typing.Optional[lease_pb2.Lease] = None

def decorate_functions(self) -> None:
"""
Expand All @@ -602,7 +616,7 @@ def decorate_functions(self) -> None:
is the case, assuming the get_lease_on_action variable is true. Otherwise, it is up to the user to ensure
that the lease is claimed and the power is on before running commands, otherwise the commands will fail.
"""
decorated_funcs = [
decorated_funcs: typing.List[typing.Callable] = [
self.stop,
self.self_right,
self.sit,
Expand All @@ -615,7 +629,7 @@ def decorate_functions(self) -> None:
self._robot_command,
self._manipulation_request,
]
decorated_funcs_no_power = [
decorated_funcs_no_power: typing.List[typing.Callable] = [
self.stop,
self.power_on,
self.safe_power_off,
Expand Down Expand Up @@ -745,7 +759,7 @@ def is_valid(self) -> bool:
return self._valid

@property
def id(self) -> typing.Optional[str]:
def id(self) -> typing.Optional[robot_id_pb2.RobotId]:
"""Return robot's ID"""
return self._robot_id

Expand Down Expand Up @@ -790,9 +804,12 @@ def world_objects(self) -> world_object_pb2.ListWorldObjectResponse:
return self.spot_world_objects.async_task.proto

@property
def point_clouds(self) -> typing.List[point_cloud_pb2.PointCloudResponse]:
def point_clouds(self) -> typing.Optional[typing.List[point_cloud_pb2.PointCloudResponse]]:
"""Return latest proto from the _point_cloud_task"""
return self.spot_eap_lidar.async_task.proto
if self.spot_eap_lidar:
return self.spot_eap_lidar.async_task.proto
else:
return None

@property
def is_standing(self) -> bool:
Expand Down Expand Up @@ -957,6 +974,8 @@ def assertEStop(self, severe: bool = True) -> typing.Tuple[bool, str]:
severe: Default True - If true, will cut motor power immediately. If false, will try to settle the robot
on the ground first
"""
if self._estop_keepalive is None:
return False, "Already in estop state"
try:
if severe:
self._estop_keepalive.stop()
Expand All @@ -969,6 +988,8 @@ def assertEStop(self, severe: bool = True) -> typing.Tuple[bool, str]:

def disengageEStop(self) -> typing.Tuple[bool, str]:
"""Disengages the E-Stop"""
if self._estop_keepalive is None:
return False, "Already in estop state"
try:
self._estop_keepalive.allow()
return True, "Success"
Expand Down Expand Up @@ -1041,7 +1062,12 @@ def _robot_command(
self._logger.error(f"Unable to execute robot command: {e}")
return False, str(e), None

def _manipulation_request(self, request_proto, end_time_secs=None, timesync_endpoint=None):
def _manipulation_request(
self,
request_proto: manipulation_api_pb2.ManipulationApiRequest,
end_time_secs: typing.Optional[float] = None,
timesync_endpoint: TimeSyncEndpoint = None,
) -> typing.Tuple[bool, str, typing.Optional[str]]:
"""Generic function for sending requests to the manipulation api of a robot.
Args:
Expand Down Expand Up @@ -1316,15 +1342,19 @@ def trajectory_cmd(
self.last_trajectory_command = response[2]
return response[0], response[1]

def robot_command(self, robot_command: robot_command_pb2.RobotCommand) -> typing.Tuple[bool, str]:
def robot_command(
self, robot_command: robot_command_pb2.RobotCommand
) -> typing.Tuple[bool, str, typing.Optional[str]]:
end_time = time.time() + MAX_COMMAND_DURATION
return self._robot_command(
robot_command,
end_time_secs=end_time,
timesync_endpoint=self._robot.time_sync.endpoint,
)

def manipulation_command(self, request):
def manipulation_command(
self, request: manipulation_api_pb2.ManipulationApiRequest
) -> typing.Tuple[bool, str, typing.Optional[str]]:
end_time = time.time() + MAX_COMMAND_DURATION
return self._manipulation_request(
request,
Expand Down

0 comments on commit 5969668

Please sign in to comment.