diff --git a/bdai_ros2_wrappers/bdai_ros2_wrappers/utilities.py b/bdai_ros2_wrappers/bdai_ros2_wrappers/utilities.py index fddeb56..7233c25 100644 --- a/bdai_ros2_wrappers/bdai_ros2_wrappers/utilities.py +++ b/bdai_ros2_wrappers/bdai_ros2_wrappers/utilities.py @@ -7,7 +7,8 @@ import queue import threading import warnings -from collections.abc import Mapping +import weakref +from collections.abc import Mapping, MutableSet from typing import Any, Callable, Generic, Iterator, List, Optional, Tuple, TypeVar, Union import rclpy.clock @@ -142,7 +143,7 @@ def __init__(self, max_length: Optional[int] = None) -> None: max_length: optional maximum tape length. """ self._lock = threading.Lock() - self._streams: List[Tape.Stream[T]] = [] + self._streams: MutableSet[Tape.Stream[T]] = weakref.WeakSet() self._content: Optional[collections.deque] = None if max_length is None or max_length > 0: self._content = collections.deque(maxlen=max_length) @@ -249,7 +250,7 @@ def content( stream: Optional[Tape.Stream] = None if follow and not self._closed: stream = Tape.Stream(buffer_size, label) - self._streams.append(stream) + self._streams.add(stream) def _generator() -> Iterator: nonlocal content, stream diff --git a/bdai_ros2_wrappers/test/test_utilities.py b/bdai_ros2_wrappers/test/test_utilities.py index a286002..faa2f57 100644 --- a/bdai_ros2_wrappers/test/test_utilities.py +++ b/bdai_ros2_wrappers/test/test_utilities.py @@ -4,7 +4,21 @@ import pytest -from bdai_ros2_wrappers.utilities import either_or, ensure, namespace_with +from bdai_ros2_wrappers.utilities import Tape, either_or, ensure, namespace_with + + +def test_tape_drops_unused_streams() -> None: + tape: Tape[int] = Tape(max_length=0) + + stream = tape.content(follow=True) + expected_value = 42 + tape.write(expected_value) + value = next(stream) + assert value == expected_value + + del stream + + assert len(tape._streams) == 0 def test_either_or() -> None: