Skip to content

Commit

Permalink
Added more ruff rules (#59)
Browse files Browse the repository at this point in the history
  • Loading branch information
amessing-bdai authored Jan 26, 2024
1 parent 6cae625 commit 9fe8e88
Show file tree
Hide file tree
Showing 42 changed files with 512 additions and 411 deletions.
8 changes: 5 additions & 3 deletions bdai_ros2_wrappers/bdai_ros2_wrappers/action_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ def __init__(self, action_type: Type[Action], action_name: str, node: Optional[N
self._node.get_logger().info("Found server")

def send_goal_and_wait(
self, action_name: str, goal: Action.Goal, timeout_sec: Optional[float] = None
self,
action_name: str,
goal: Action.Goal,
timeout_sec: Optional[float] = None,
) -> Optional[Action.Result]:
"""Sends an action goal and waits for the result
Expand Down Expand Up @@ -80,8 +83,7 @@ def send_goal_async_handle(
feedback_callback: Optional[Callable[[Action.Feedback], None]] = None,
on_failure_callback: Optional[Callable[[], None]] = None,
) -> ActionHandle:
"""Sends an action goal asynchronously and create an ActionHandle for managing the asynchronous lifecycle of
the action request
"""Sends an action goal asynchronously and create an `ActionHandle`
Args:
action_name (str): A representative name of the action for logging
Expand Down
13 changes: 7 additions & 6 deletions bdai_ros2_wrappers/bdai_ros2_wrappers/action_handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
from bdai_ros2_wrappers.type_hints import Action


class ActionHandle(object):
"""Handles the two stage process that is getting a result after sending an ActionGoal to an ActionServer as well
class ActionHandle:
"""Handle for the lifecycle of an action.
Handles the two stage process that is getting a result after sending an ActionGoal to an ActionServer as well
as holding the various callbacks for sending an ActionGoal (cancel, failure, feedback, result)
"""

Expand All @@ -23,6 +25,7 @@ def __init__(self, action_name: str, logger: Optional[RcutilsLogger] = None, con
Args:
action_name (str): The name of the action (for logging purposes)
logger (Optional[RcutilsLogger]): An optional logger to use. If none is provided, one is created
context (Optional[Context]): An optional ros context
"""
if context is None:
context = get_default_context()
Expand Down Expand Up @@ -72,8 +75,7 @@ def wait_for_result(self, timeout_sec: Optional[float] = None) -> bool:
)

def wait_for_acceptance(self, timeout_sec: Optional[float] = None) -> bool:
"""
Waits until goal is accepted or timeout.
"""Waits until goal is accepted or timeout.
Args:
timeout_sec (Optional[float]): A timeout in seconds. No timeout is used if None
Expand All @@ -86,8 +88,7 @@ def wait_for_acceptance(self, timeout_sec: Optional[float] = None) -> bool:
)

def set_send_goal_future(self, send_goal_future: Future) -> None:
    """Sets the future received from sending the `Action.Goal`.

    Registers `_goal_response_callback` to run once the action server
    accepts or rejects the goal.
    """
    self._send_goal_future = send_goal_future
    send_goal_future.add_done_callback(self._goal_response_callback)

Expand Down
29 changes: 26 additions & 3 deletions bdai_ros2_wrappers/bdai_ros2_wrappers/callback_groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@


class NonReentrantCallbackGroup(rclpy.callback_groups.CallbackGroup):
"""
A callback group to prevent concurrent execution of the same callback
while allowing it for different callbacks.
"""A callback group to prevent concurrent execution of the same callback while allowing it for different callbacks.
Note this behavior sits in between that offered by
rclpy.callback_groups.MutuallyExclusiveCallbackGroup and
Expand All @@ -23,16 +21,36 @@ class NonReentrantCallbackGroup(rclpy.callback_groups.CallbackGroup):
"""

def __init__(self) -> None:
    """Initializes an empty, unlocked callback group."""
    super().__init__()
    # Entities whose callbacks are currently executing.
    self._active_entities: typing.Set[rclpy.executors.WaitableEntityType] = set()
    # Guards _active_entities against concurrent executor threads.
    self._lock = threading.Lock()

def can_execute(self, entity: rclpy.executors.WaitableEntityType) -> bool:
    """Determine whether this group allows the given entity's callback to run.

    Args:
        entity: A subscription, timer, client, service, or waitable instance.

    Returns:
        `True` if the entity's callback may be executed, `False` otherwise.
    """
    with self._lock:
        assert weakref.ref(entity) in self.entities
        is_busy = entity in self._active_entities
        return not is_busy

def beginning_execution(self, entity: rclpy.executors.WaitableEntityType) -> bool:
"""Get permission for the callback from the group to begin executing an entity.
If this returns `True` then `CallbackGroup.ending_execution` must be called after
the callback has been executed.
Args:
    entity: A subscription, timer, client, service, or waitable instance.
Returns:
`True` if the callback can be executed, `False` otherwise.
"""
with self._lock:
assert weakref.ref(entity) in self.entities
if entity not in self._active_entities:
Expand All @@ -41,6 +59,11 @@ def beginning_execution(self, entity: rclpy.executors.WaitableEntityType) -> boo
return False

def ending_execution(self, entity: rclpy.executors.WaitableEntityType) -> None:
    """Notify group that a callback has finished executing.

    Args:
        entity: A subscription, timer, client, service, or waitable instance.
    """
    with self._lock:
        # The entity must have been granted execution via beginning_execution().
        assert entity in self._active_entities
        self._active_entities.remove(entity)
3 changes: 1 addition & 2 deletions bdai_ros2_wrappers/bdai_ros2_wrappers/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@


def wait_for_shutdown(*, timeout_sec: Optional[float] = None, context: Optional[Context] = None) -> bool:
"""
Wait for context shutdown.
"""Wait for context shutdown.
Args:
timeout_sec: optional timeout for wait, wait indefinitely by default.
Expand Down
72 changes: 44 additions & 28 deletions bdai_ros2_wrappers/bdai_ros2_wrappers/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@


class AutoScalingThreadPool(concurrent.futures.Executor):
"""
A concurrent.futures.Executor subclass based on a thread pool.
"""A concurrent.futures.Executor subclass based on a thread pool.
Akin to the concurrent.futures.ThreadPoolExecutor class but with
autoscaling capabilities. Within a given range, the number of
Expand Down Expand Up @@ -109,8 +108,7 @@ class Worker(threading.Thread):
"""A worker in its own daemonized OS thread."""

def __init__(self, executor_weakref: weakref.ref, stop_on_timeout: bool = True) -> None:
"""
Initializes the worker.
"""Initializes the worker.
Args:
executor_weakref: a weak reference to the parent autoscaling thread pool.
Expand Down Expand Up @@ -209,8 +207,7 @@ def __init__(
max_idle_time: typing.Optional[float] = None,
logger: typing.Optional[logging.Logger] = None,
):
"""
Initializes the thread pool.
"""Initializes the thread pool.
Args:
min_workers: optional minimum number of workers in the pool, 0 by default.
Expand Down Expand Up @@ -273,7 +270,7 @@ def __init__(
self._scaling_event = threading.Condition()

self._waitqueues: typing.Dict[typing.Callable[..., typing.Any], collections.deque] = collections.defaultdict(
collections.deque
collections.deque,
)
self._runlists: typing.Dict[typing.Callable[..., typing.Any], typing.List[AutoScalingThreadPool.Work]] = (
collections.defaultdict(list)
Expand Down Expand Up @@ -334,8 +331,7 @@ def capped(self) -> bool:
return any(work.pending() for waitqueue in self._waitqueues.values() for work in waitqueue)

def wait(self, timeout: typing.Optional[float] = None) -> bool:
"""
Waits for all work in the pool to complete.
"""Waits for all work in the pool to complete.
Only ongoing work at the time of invocation is watched after.
Work added during the wait will not be considered.
Expand Down Expand Up @@ -403,10 +399,13 @@ def _do_submit(self, work: "AutoScalingThreadPool.Work") -> None:
# NOTE(hidmic): cannot recreate type signature for method override
# See https://github.com/python/typeshed/blob/main/stdlib/concurrent/futures/_base.pyi.
def submit( # type: ignore
self, fn: typing.Callable[..., typing.Any], /, *args: typing.Any, **kwargs: typing.Any
self,
fn: typing.Callable[..., typing.Any],
/,
*args: typing.Any,
**kwargs: typing.Any,
) -> concurrent.futures.Future:
"""
Submits work to the pool.
"""Submits work to the pool.
Args:
fn: a callable to execute. Must be immutable and hashable for
Expand Down Expand Up @@ -457,8 +456,7 @@ def submit( # type: ignore
return future

def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None:
"""
Shuts down the pool.
"""Shuts down the pool.
Args:
wait: whether to wait for all worker threads to shutdown.
Expand Down Expand Up @@ -499,8 +497,7 @@ def predicate() -> bool:


class AutoScalingMultiThreadedExecutor(rclpy.executors.Executor):
"""
An rclpy.executors.Executor subclass based on an AutoScalingThreadPool.
"""An rclpy.executors.Executor subclass based on an AutoScalingThreadPool.
Akin to the rclpy.executors.MultiThreadedExecutor class but with autoscaling capabilities.
Moreover, a concurrency quota can be defined on a per callback + callback group basis to
Expand All @@ -519,12 +516,17 @@ def __init__(self, task: rclpy.task.Task, entity: typing.Optional[rclpy.executor
self.callback_group = entity.callback_group if entity is not None else None

def __call__(self) -> None:
    """Run or resume the wrapped task.

    See `rclpy.task.Task` documentation for further reference.
    """
    self.task()

def __getattr__(self, name: str) -> typing.Any:
    """Delegates any attribute not defined here to the wrapped task."""
    return getattr(self.task, name)

def cancel(self) -> None:
"""Cancels the task"""
# This is a re-implementation to cope with rclpy.task.Task
# leaving coroutines unawaited upon cancellation.
schedule_callbacks = False
Expand Down Expand Up @@ -552,8 +554,7 @@ def __init__(
context: typing.Optional[rclpy.context.Context] = None,
logger: typing.Optional[logging.Logger] = None,
) -> None:
"""
Initializes the executor.
"""Initializes the executor.
Args:
max_threads: optional maximum number of threads to spin at any given time.
Expand All @@ -564,6 +565,8 @@ def __init__(
max_threads_per_callback_group: optional maximum number of concurrent
callbacks to service for a given callback group. Useful to avoid
reentrant callback groups from starving the pool.
context: An optional instance of the ros context
logger: An optional logger instance
"""
super().__init__(context=context)
if logger is None:
Expand Down Expand Up @@ -610,11 +613,11 @@ def _do_spin_once(self, *args: typing.Any, **kwargs: typing.Any) -> None:
for task in list(self._wip):
if task.done():
del self._wip[task]
try:

# ignore concurrent entity destruction
with contextlib.suppress(rclpy.executors.InvalidHandle):
task.result()
except rclpy.executors.InvalidHandle:
# ignore concurrent entity destruction
pass

except rclpy.executors.ConditionReachedException:
pass
except rclpy.executors.ExternalShutdownException:
Expand All @@ -625,15 +628,29 @@ def _do_spin_once(self, *args: typing.Any, **kwargs: typing.Any) -> None:
pass

def spin_once(self, timeout_sec: typing.Optional[float] = None) -> None:
    """Complete all immediately available work.

    Args:
        timeout_sec: optional time to wait for work to become available,
            in seconds. Waits indefinitely if None.
    """
    self._do_spin_once(timeout_sec)

def spin_once_until_future_complete(
    self,
    future: rclpy.task.Future,
    timeout_sec: typing.Optional[float] = None,
) -> None:
    """Complete all work until the provided future is done.

    Args:
        future: The ros future instance
        timeout_sec: The timeout for working
    """
    # Wake the executor as soon as the future resolves so the spin loop
    # re-evaluates its exit condition promptly.
    future.add_done_callback(lambda _: self.wake())
    self._do_spin_once(timeout_sec, condition=future.done)

def shutdown(self, timeout_sec: typing.Optional[float] = None) -> bool:
"""Shutdown the executor.
Args:
timeout_sec: The timeout for shutting down
"""
with self._shutdown_lock:
# Before actual shutdown and resource cleanup, all pending work
# must be waited on. Work tracking in rclpy.executors.Executor
Expand All @@ -656,8 +673,7 @@ def shutdown(self, timeout_sec: typing.Optional[float] = None) -> bool:

@contextlib.contextmanager
def background(executor: rclpy.executors.Executor) -> typing.Iterator[rclpy.executors.Executor]:
"""
Pushes an executor to a background thread.
"""Pushes an executor to a background thread.
Upon context entry, the executor starts spinning in a background thread.
Upon context exit, the executor is shutdown and the background thread is joined.
Expand All @@ -673,7 +689,8 @@ def background(executor: rclpy.executors.Executor) -> typing.Iterator[rclpy.exec
executor.spin_once = bind_to_thread(executor.spin_once, background_thread)
executor.spin_until_future_complete = bind_to_thread(executor.spin_until_future_complete, background_thread)
executor.spin_once_until_future_complete = bind_to_thread(
executor.spin_once_until_future_complete, background_thread
executor.spin_once_until_future_complete,
background_thread,
)
background_thread.start()
try:
Expand All @@ -685,8 +702,7 @@ def background(executor: rclpy.executors.Executor) -> typing.Iterator[rclpy.exec

@contextlib.contextmanager
def foreground(executor: rclpy.executors.Executor) -> typing.Iterator[rclpy.executors.Executor]:
"""
Manages an executor in the current thread.
"""Manages an executor in the current thread.
Upon context exit, the executor is shutdown.
Expand Down
19 changes: 15 additions & 4 deletions bdai_ros2_wrappers/bdai_ros2_wrappers/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
tree = ET.parse(os.path.join(share_directory, "package.xml"))
version = tree.getroot().find("version")
if packaging.version.parse(version.text) >= packaging.version.parse("3.9.0"):
warnings.warn("TODO: use subloggers in RcutilsLogHandler implementation")
warnings.warn("TODO: use subloggers in RcutilsLogHandler implementation", stacklevel=1)


class RcutilsLogHandler(logging.Handler):
Expand All @@ -43,6 +43,7 @@ class RcutilsLogHandler(logging.Handler):
default_formatter = logging.Formatter("(logging.%(name)s) %(message)s")

def __init__(self, node: Node, level: typing.Union[int, str] = logging.NOTSET) -> None:
"""Constructor"""
super().__init__(level=level)
self.node = node
self.logger = node.get_logger()
Expand All @@ -51,26 +52,36 @@ def __init__(self, node: Node, level: typing.Union[int, str] = logging.NOTSET) -
self.setFormatter(self.default_formatter)

def setLevel(self, level: typing.Union[int, str]) -> None:
    """Sets the threshold for this handler to level.

    Logging messages which are less severe than level will be ignored. When a handler is created, the level is set
    to NOTSET (which causes all messages to be processed).

    The severity threshold is also mirrored to the underlying ROS 2 logger.
    """
    super().setLevel(level)
    # Keep the rcutils logger threshold in sync with this handler's level.
    self.logger.set_level(SEVERITY_MAP[self.level])

def emit(self, record: logging.LogRecord) -> None:
    """Do whatever it takes to actually log the specified logging record."""
    try:
        formatted = self.format(record)
        severity = SEVERITY_MAP[record.levelno]
        # NOTE(hidmic): this bypasses the rclpy logger API to avoid the extra meaningless
        # computations (e.g. call stack inspection).
        impl.rclpy_logging_rcutils_log(
            severity,
            self.logger.name,
            formatted,
            record.funcName,
            record.pathname,
            record.lineno,
        )
    except Exception:
        # Per logging.Handler convention, never let logging errors propagate.
        self.handleError(record)


@contextlib.contextmanager
def logs_to_ros(node: Node) -> typing.Iterator[None]:
"""
Forwards root `logging.Logger` logs to the ROS 2 logging system.
"""Forwards root `logging.Logger` logs to the ROS 2 logging system.
Note that logs are subject to severity level thresholds and propagation
semantics at both the `logging` module and the ROS 2 logging system. For
Expand Down
Loading

0 comments on commit 9fe8e88

Please sign in to comment.