From 5969668706108ad1a10a4b9c093909b603ec398c Mon Sep 17 00:00:00 2001 From: Katie Hughes Date: Fri, 26 Jan 2024 13:38:43 -0500 Subject: [PATCH] More manual mypy formatting. Fixed a bug that removed an unusedfunction argument that broke a test --- spot_wrapper/spot_graph_nav.py | 6 +-- spot_wrapper/wrapper.py | 68 ++++++++++++++++++++++++---------- 2 files changed, 52 insertions(+), 22 deletions(-) diff --git a/spot_wrapper/spot_graph_nav.py b/spot_wrapper/spot_graph_nav.py index 38ad086..13b3ff9 100644 --- a/spot_wrapper/spot_graph_nav.py +++ b/spot_wrapper/spot_graph_nav.py @@ -53,7 +53,7 @@ def _get_lease(self) -> Lease: def _init_current_graph_nav_state(self) -> None: # Store the most recent knowledge of the state of the robot based on rpc calls. - self._current_graph = None + self._current_graph: typing.Optional[map_pb2.Graph] = None self._current_edges: typing.Dict[str, typing.List[str]] = {} # maps to_waypoint to list(from_waypoint) self._current_waypoint_snapshots = {} # maps id to waypoint snapshot self._current_edge_snapshots = {} # maps id to edge snapshot @@ -781,8 +781,8 @@ def _update_waypoints_and_edges( self, graph: map_pb2.Graph, localization_id: str, logger: logging.Logger ) -> typing.Tuple[typing.Dict[str, str], typing.Dict[str, str]]: """Update and print waypoint ids and edge ids.""" - name_to_id = dict() - edges = dict() + name_to_id: typing.Dict[str, str] = dict() + edges: typing.Dict[str, str] = dict() short_code_to_count = {} waypoint_to_timestamp = [] diff --git a/spot_wrapper/wrapper.py b/spot_wrapper/wrapper.py index 12ba1e2..b02b787 100644 --- a/spot_wrapper/wrapper.py +++ b/spot_wrapper/wrapper.py @@ -10,6 +10,7 @@ manipulation_api_pb2, point_cloud_pb2, robot_command_pb2, + robot_id_pb2, robot_state_pb2, world_object_pb2, ) @@ -118,14 +119,18 @@ class AsyncRobotState(AsyncPeriodicQuery): """ def __init__( - self, client: RobotCommandClient, logger: logging.Logger, rate: float, callback: typing.Callable + self, + client: RobotCommandClient, + logger: logging.Logger, + rate: float, + callback: typing.Optional[typing.Callable], ) -> None: super(AsyncRobotState, self).__init__("robot-state", client, logger, period_sec=1.0 / max(rate, 1.0)) self._callback = None if rate > 0.0: self._callback = callback - def _start_query(self): + def _start_query(self): # TODO [mypy] this probably refers to the ROS future message type if self._callback: callback_future = self._client.get_robot_state_async() callback_future.add_done_callback(self._callback) @@ -144,14 +149,18 @@ class AsyncMetrics(AsyncPeriodicQuery): """ def __init__( - self, client: RobotCommandClient, logger: logging.Logger, rate: float, callback: typing.Callable + self, + client: RobotCommandClient, + logger: logging.Logger, + rate: float, + callback: typing.Optional[typing.Callable], ) -> None: super(AsyncMetrics, self).__init__("robot-metrics", client, logger, period_sec=1.0 / max(rate, 1.0)) self._callback = None if rate > 0.0: self._callback = callback - def _start_query(self): + def _start_query(self): # TODO [mypy] this probably refers to the ROS future message type if self._callback: callback_future = self._client.get_robot_metrics_async() callback_future.add_done_callback(self._callback) @@ -170,14 +179,18 @@ class AsyncLease(AsyncPeriodicQuery): """ def __init__( - self, client: RobotCommandClient, logger: logging.Logger, rate: float, callback: typing.Callable + self, + client: RobotCommandClient, + logger: logging.Logger, + rate: float, + callback: typing.Optional[typing.Callable], ) -> None: super(AsyncLease, self).__init__("lease", client, logger, period_sec=1.0 / max(rate, 1.0)) self._callback = None if rate > 0.0: self._callback = callback - def _start_query(self): + def _start_query(self): # TODO [mypy] this probably refers to the ROS future message type if self._callback: callback_future = self._client.list_leases_async() callback_future.add_done_callback(self._callback) @@ -194,7 +207,7 @@ def __init__( client: RobotCommandClient, logger: logging.Logger, rate: float, - spot_wrapper, + spot_wrapper, # TODO [mypy] have this recognize SpotWrapper type ) -> None: """ Attributes: @@ -308,8 +321,9 @@ class AsyncEStopMonitor(AsyncPeriodicQuery): """ def __init__(self, client: RobotCommandClient, logger: logging.Logger, rate: float, spot_wrapper) -> None: + # TODO [mypy] have this recognize SpotWrapper type super(AsyncEStopMonitor, self).__init__("estop_alive", client, logger, period_sec=1.0 / rate) - self._spot_wrapper = spot_wrapper + self._spot_wrapper: SpotWrapper = spot_wrapper def _start_query(self) -> None: if not self._spot_wrapper._estop_keepalive: @@ -506,7 +520,7 @@ def __init__( self._idle_task = AsyncIdle(self._robot_command_client, self._logger, 10.0, self) self._estop_monitor = AsyncEStopMonitor(self._estop_client, self._logger, 20.0, self) - self._estop_endpoint = None + self._estop_endpoint: typing.Optional[EstopEndpoint] = None self._estop_keepalive: typing.Optional[EstopKeepAlive] = None robot_tasks = [ @@ -592,8 +606,8 @@ def __init__( if self._is_licensed_for_choreography: self._spot_dance = SpotDance(self._robot, self._choreography_client, self._logger) - self._robot_id: typing.Optional[str] = None - self._lease = None + self._robot_id: typing.Optional[robot_id_pb2.RobotId] = None + self._lease: typing.Optional[lease_pb2.Lease] = None def decorate_functions(self) -> None: """ @@ -602,7 +616,7 @@ def decorate_functions(self) -> None: is the case, assuming the get_lease_on_action variable is true. Otherwise, it is up to the user to ensure that the lease is claimed and the power is on before running commands, otherwise the commands will fail. """ - decorated_funcs = [ + decorated_funcs: typing.List[typing.Callable] = [ self.stop, self.self_right, self.sit, @@ -615,7 +629,7 @@ def decorate_functions(self) -> None: self._robot_command, self._manipulation_request, ] - decorated_funcs_no_power = [ + decorated_funcs_no_power: typing.List[typing.Callable] = [ self.stop, self.power_on, self.safe_power_off, @@ -745,7 +759,7 @@ def is_valid(self) -> bool: return self._valid @property - def id(self) -> typing.Optional[str]: + def id(self) -> typing.Optional[robot_id_pb2.RobotId]: """Return robot's ID""" return self._robot_id @@ -790,9 +804,12 @@ def world_objects(self) -> world_object_pb2.ListWorldObjectResponse: return self.spot_world_objects.async_task.proto @property - def point_clouds(self) -> typing.List[point_cloud_pb2.PointCloudResponse]: + def point_clouds(self) -> typing.Optional[typing.List[point_cloud_pb2.PointCloudResponse]]: """Return latest proto from the _point_cloud_task""" - return self.spot_eap_lidar.async_task.proto + if self.spot_eap_lidar: + return self.spot_eap_lidar.async_task.proto + else: + return None @property def is_standing(self) -> bool: @@ -957,6 +974,8 @@ def assertEStop(self, severe: bool = True) -> typing.Tuple[bool, str]: severe: Default True - If true, will cut motor power immediately. If false, will try to settle the robot on the ground first """ + if self._estop_keepalive is None: + return False, "Already in estop state" try: if severe: self._estop_keepalive.stop() @@ -969,6 +988,8 @@ def assertEStop(self, severe: bool = True) -> typing.Tuple[bool, str]: def disengageEStop(self) -> typing.Tuple[bool, str]: """Disengages the E-Stop""" + if self._estop_keepalive is None: + return False, "Already in estop state" try: self._estop_keepalive.allow() return True, "Success" @@ -1041,7 +1062,12 @@ def _robot_command( self._logger.error(f"Unable to execute robot command: {e}") return False, str(e), None - def _manipulation_request(self, request_proto, end_time_secs=None, timesync_endpoint=None): + def _manipulation_request( + self, + request_proto: manipulation_api_pb2.ManipulationApiRequest, + end_time_secs: typing.Optional[float] = None, + timesync_endpoint: TimeSyncEndpoint = None, + ) -> typing.Tuple[bool, str, typing.Optional[str]]: """Generic function for sending requests to the manipulation api of a robot. Args: @@ -1316,7 +1342,9 @@ def trajectory_cmd( self.last_trajectory_command = response[2] return response[0], response[1] - def robot_command(self, robot_command: robot_command_pb2.RobotCommand) -> typing.Tuple[bool, str]: + def robot_command( + self, robot_command: robot_command_pb2.RobotCommand + ) -> typing.Tuple[bool, str, typing.Optional[str]]: end_time = time.time() + MAX_COMMAND_DURATION return self._robot_command( robot_command, @@ -1324,7 +1352,9 @@ def robot_command(self, robot_command: robot_command_pb2.RobotCommand) -> typing timesync_endpoint=self._robot.time_sync.endpoint, ) - def manipulation_command(self, request): + def manipulation_command( + self, request: manipulation_api_pb2.ManipulationApiRequest + ) -> typing.Tuple[bool, str, typing.Optional[str]]: end_time = time.time() + MAX_COMMAND_DURATION return self._manipulation_request( request,