Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deferred message filter/feed activation for race-free setups #132

Merged
merged 4 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 31 additions & 16 deletions bdai_ros2_wrappers/bdai_ros2_wrappers/feeds.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def __init__(
self._link.registerCallback(
lambda *msgs: self._tape.write(msgs if len(msgs) > 1 else msgs[0]),
)
node.context.on_shutdown(self._tape.close)
node.context.on_shutdown(self.close)

@property
def link(self) -> Filter:
Expand Down Expand Up @@ -178,10 +178,17 @@ def stream(
timeout_sec=timeout_sec,
)

def close(self) -> None:
"""Closes the message feed."""
def start(self) -> None:
"""Start the message feed."""
self._link.start()

def stop(self) -> None:
"""Stop the message feed."""
self._link.stop()
self._tape.close()

close = stop


class AdaptedMessageFeed(MessageFeed[MessageT]):
"""A message feed decorator to simplify adapter patterns."""
Expand All @@ -190,6 +197,7 @@ def __init__(
self,
feed: MessageFeed,
fn: Callable[..., MessageT],
autostart: bool = True,
mhidalgo-bdai marked this conversation as resolved.
Show resolved Hide resolved
**kwargs: Any,
) -> None:
"""Initializes the message feed.
Expand All @@ -199,19 +207,20 @@ def __init__(
fn: message adapting callable.
kwargs: all other keyword arguments are forwarded
for `MessageFeed` initialization.
autostart: whether to start feeding messages immediately or not.
"""
super().__init__(Adapter(feed.link, fn), **kwargs)
super().__init__(Adapter(feed.link, fn, autostart=autostart), **kwargs)
self._feed = feed

@property
def feed(self) -> MessageFeed:
"""Gets the upstream message feed."""
return self._feed

def close(self) -> None:
"""Closes this message feed and the upstream one as well."""
self._feed.close()
super().close()
def stop(self) -> None:
"""Stop this message feed and the upstream one as well."""
self._feed.stop()
super().stop()


class FramedMessageFeed(MessageFeed[MessageT]):
Expand All @@ -226,6 +235,7 @@ def __init__(
tf_buffer: Optional[tf2_ros.Buffer] = None,
history_length: Optional[int] = None,
node: Optional[Node] = None,
autostart: bool = True,
) -> None:
"""Initializes the message feed.

Expand All @@ -238,6 +248,7 @@ def __init__(
history_length: optional historic data size, defaults to 1.
node: optional node for the underlying native subscription, defaults to
the current process node.
autostart: whether to start feeding messages immediately or not.
"""
if node is None:
node = scope.ensure_node()
Expand All @@ -251,6 +262,7 @@ def __init__(
tf_buffer,
tolerance_sec,
node.get_logger(),
autostart=autostart,
),
history_length=history_length,
node=node,
Expand All @@ -262,10 +274,10 @@ def feed(self) -> MessageFeed[MessageT]:
"""Gets the upstream message feed."""
return self._feed

def close(self) -> None:
"""Closes this message feed and the upstream one as well."""
self._feed.close()
super().close()
def stop(self) -> None:
"""Stop this message feed and the upstream one as well."""
self._feed.stop()
super().stop()


class SynchronizedMessageFeed(MessageFeed):
Expand All @@ -279,6 +291,7 @@ def __init__(
allow_headerless: bool = False,
history_length: Optional[int] = None,
node: Optional[Node] = None,
autostart: bool = True,
) -> None:
"""Initializes the message feed.

Expand All @@ -291,13 +304,15 @@ def __init__(
history_length: optional historic data size, defaults to 1.
node: optional node for the underlying native subscription, defaults to
the current process node.
autostart: whether to start feeding messages immediately or not.
"""
super().__init__(
ApproximateTimeSynchronizer(
[f.link for f in feeds],
queue_size,
delay,
allow_headerless=allow_headerless,
autostart=autostart,
),
history_length=history_length,
node=node,
Expand All @@ -309,8 +324,8 @@ def feeds(self) -> Iterable[MessageFeed]:
"""Gets all aggregated message feeds."""
return self._feeds

def close(self) -> None:
"""Closes this message feed and all upstream ones as well."""
def stop(self) -> None:
"""Stop this message feed and all upstream ones as well."""
for feed in self._feeds:
feed.close()
super().close()
feed.stop()
super().stop()
Loading
Loading