diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 10d1348..42ccb88 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -4,7 +4,7 @@ on: pull_request: branches: [ master ] push: - branches: [ master ] + jobs: test: @@ -33,7 +33,7 @@ jobs: path: ${{ steps.pip-cache.outputs.dir }} key: ${{ runner.os }}-${{ hashFiles('**/setup.py') }}-pip-cache restore-keys: | - ${{ runner.os }}-pip-cache + ${{ runner.os }}-pip-cache - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/docs/backends.rst b/docs/backends.rst index 83734ea..bc5d776 100644 --- a/docs/backends.rst +++ b/docs/backends.rst @@ -34,19 +34,20 @@ amount of round-trips the video frames take between devices. If you wish to use batching in your capsule, you may call the ``send_to_batch`` method in ``process_frame`` instead of doing analysis in that method directly. -The ``send_to_batch`` method collects one or more input objects into a list and -routinely calls your backend's ``batch_predict`` method with this list. As a -result, users of ``send_to_batch`` must override the ``batch_predict`` method -in addition to the other required methods. +The ``send_to_batch`` method sends the input to a ``BatchExecutor`` which collects +inference requests for this capsule from different streams. Then, the +``BatchExecutor`` routinely calls your backend's ``batch_predict`` method with a +list of the collected inputs. As a result, users of ``send_to_batch`` must +override the ``batch_predict`` method in addition to the other required methods. The ``send_to_batch`` method is asynchronous. Instead of immediately returning -analysis results, it returns a ``queue.Queue`` where the result will be provided. +analysis results, it returns a ``concurrent.futures.Future`` where the result will be provided. Simple batching capsules may call ``send_to_batch``, then immediately call -``get`` to block for the result. +``result`` to block for the result. .. code-block:: python - result = self.send_to_batch(frame).get() + result = self.send_to_batch(frame).result() An argument of any type may be provided to ``send_to_batch``, as the argument will be passed in a list to ``batch_predict`` without modification. In many diff --git a/tests/test_batch_executor.py b/tests/test_batch_executor.py new file mode 100644 index 0000000..46513f1 --- /dev/null +++ b/tests/test_batch_executor.py @@ -0,0 +1,127 @@ +import random +from concurrent.futures import Future +from typing import Any, Generator, List, Tuple + +import pytest + +from vcap.batch_executor import BatchExecutor, _Request + + +@pytest.fixture() +def batch_executor(): + """To use this fixture, replace batch_executor.batch_fn with your own + batch function.""" + + def batch_fn(inputs): + raise NotImplemented + + batch_executor = BatchExecutor(batch_fn=batch_fn) + yield batch_executor + batch_executor.close() + + +def batch_fn_base(inputs: List[int], raises: bool) \ + -> Generator[Any, None, None]: + """Process results and yield them as they are processed + + This function is to be used as a base for other test cases for batch_fn + variants. + + :param inputs: A list of inputs + :param raises: If True, raises an error on the 5th input. If False, + no exception will be raised. + """ + for i in inputs: + if i == 5 and raises: + # This occurs on the 5th input if raises=True. + # This is used to test BatchExecutor's handling of exceptions + raise RuntimeError("Oh no, a batch_fn error has occurred!") + yield i * 100 + + +def batch_fn_returns_generator(inputs: List[int]) \ + -> Generator[Any, None, None]: + return (o for o in batch_fn_base(inputs, raises=False)) + + +def batch_fn_returns_generator_raises(inputs: List[int]) \ + -> Generator[Any, None, None]: + return (o for o in batch_fn_base(inputs, raises=True)) + + +def batch_fn_returns_list(inputs: List[int]) -> List[Any]: + """Process results and yield them at the end, as a list.""" + return list(batch_fn_base(inputs, raises=False)) + + +def batch_fn_returns_list_raises(inputs: List[int]) -> List[Any]: + return list(batch_fn_base(inputs, raises=True)) + + +@pytest.mark.parametrize( + argnames=["batch_fn", "expect_partial_results"], + argvalues=[ + (batch_fn_returns_generator_raises, True), + (batch_fn_returns_list_raises, False) + ] +) +def test_exceptions_during_batch_fn( + batch_executor, batch_fn, expect_partial_results): + """Test that BatchExecutor catches exceptions that occur in the batch_fn + and propagates them through the requests Future objects. + + If an exception occurs after processing some of the batch, the expectation + is that the unprocessed inputs of the batch will get an exception + set (expect_partial_results=True). If the exception happens before + receiving any results, all future objects should have exceptions set. + """ + batch_executor.batch_fn = batch_fn + request_batch = [ + _Request( + future=Future(), + input_data=i) + for i in range(10) + ] + batch_executor._on_requests_ready(request_batch) + for i, request in enumerate(request_batch): + if expect_partial_results and i < 5: + result = request.future.result(timeout=5) + assert result == request.input_data * 100, \ + "The result for this future doesn't match the input that " \ + "was supposed to have been routed to it!" + else: + with pytest.raises(RuntimeError): + request.future.result(timeout=5) + + +@pytest.mark.parametrize( + argnames=["batch_fn"], + argvalues=[ + (batch_fn_returns_generator,), + (batch_fn_returns_list,) + ] +) +def test_relevant_input_outputs_match(batch_executor, batch_fn): + """Test the output for any given input is routed to the correct + Future object. """ + batch_executor.batch_fn = batch_fn + + # Submit input values in a random order + request_inputs = list(range(10000)) + random.seed("vcap? More like vgood") + random.shuffle(request_inputs) + + # Submit inputs to the BatchExecutor and keep track of their futures + inputs_and_futures: List[Tuple[int, Future]] = [] + for input_data in request_inputs: + future = batch_executor.submit(input_data) + inputs_and_futures.append((input_data, future)) + + # Verify that all outputs are the expected ones for their respective input + for input_data, future in inputs_and_futures: + result = future.result(timeout=5) + assert result == input_data * 100, \ + "The result for this future doesn't match the input that " \ + "was supposed to have been routed to it!" + + assert batch_executor.total_imgs_in_pipeline == 0 diff --git a/vcap/examples/classifier_gait_example/backend.py b/vcap/examples/classifier_gait_example/backend.py index 754b420..bb7ad85 100644 --- a/vcap/examples/classifier_gait_example/backend.py +++ b/vcap/examples/classifier_gait_example/backend.py @@ -20,7 +20,7 @@ def process_frame(self, frame: np.ndarray, .pad_percent(top=10, bottom=10, left=10, right=10) .apply(frame)) - prediction = self.send_to_batch(crop).get() + prediction = self.send_to_batch(crop).result() detection_node.attributes[config.category] = prediction.name detection_node.extra_data[config.extra_data] = prediction.confidence diff --git a/vcap/examples/detector_person_example/backend.py b/vcap/examples/detector_person_example/backend.py index 66ff8b7..dc0abe8 100644 --- a/vcap/examples/detector_person_example/backend.py +++ b/vcap/examples/detector_person_example/backend.py @@ -24,7 +24,7 @@ def process_frame(self, frame: np.ndarray, max_height=max_frame_side_length) frame = clamp.apply() - predictions = self.send_to_batch(frame).get() + predictions = self.send_to_batch(frame).result() results = [] diff --git a/vcap/setup.py b/vcap/setup.py index 72faf75..fd04d33 100644 --- a/vcap/setup.py +++ b/vcap/setup.py @@ -1,8 +1,13 @@ #!/usr/bin/env python3 import os +from pathlib import Path from setuptools import setup, find_namespace_packages +# Get package version/metadata +about = {} +exec(Path("vcap/version.py").read_text(), about) + test_packages = ["pytest", "mock"] PRE_RELEASE_SUFFIX = os.environ.get("PRE_RELEASE_SUFFIX", "") @@ -10,9 +15,9 @@ setup( name='vcap', description="A library for creating OpenVisionCapsules in Python", - author="Dilili Labs", + author="Aotu.ai", packages=find_namespace_packages(include=["vcap*"]), - version="0.2.5" + PRE_RELEASE_SUFFIX, + version=about["__version__"] + PRE_RELEASE_SUFFIX, install_requires=[ "pycryptodomex==3.9.7", diff --git a/vcap/vcap/__init__.py b/vcap/vcap/__init__.py index 5a87a36..9392f4b 100644 --- a/vcap/vcap/__init__.py +++ b/vcap/vcap/__init__.py @@ -1,3 +1,5 @@ +from .version import __version__ +from .deprecation import deprecated from .capsule import BaseCapsule from .stream_state import BaseStreamState from .backend import BaseBackend diff --git a/vcap/vcap/backend.py b/vcap/vcap/backend.py index 6ff5f87..23d76bf 100644 --- a/vcap/vcap/backend.py +++ b/vcap/vcap/backend.py @@ -1,11 +1,11 @@ import abc -from queue import Queue -from typing import Any, Dict, List, Union +from concurrent.futures import Future +from typing import Any, Dict, List import numpy as np +from vcap.batch_executor import BatchExecutor from vcap.node_description import DETECTION_NODE_TYPE -from vcap.ovens import Oven from vcap.options import OPTION_TYPE from vcap.stream_state import BaseStreamState @@ -16,18 +16,18 @@ class BaseBackend(abc.ABC): """ def __init__(self): - self._oven = Oven(self.batch_predict) + self._batch_executor = BatchExecutor(self.batch_predict) - def send_to_batch(self, input_data: Any) -> Queue: + def send_to_batch(self, input_data: Any) -> Future: """Sends the given object to the batch_predict method for processing. This call does not block. Instead, the result will be provided on the - returned queue. The batch_predict method must be overridden on the + returned Future. The batch_predict method must be overridden on the backend this method is being called on. :param input_data: The input object to send to batch_predict - :return: A queue where results will be stored + :return: A Future where results will be stored """ - return self._oven.submit(input_data) + return self._batch_executor.submit(input_data) @property def workload(self) -> float: @@ -36,7 +36,7 @@ def workload(self) -> float: is intended to give the scheduler the ability to pick the least busy backend. """ - return self._oven.total_imgs_in_pipeline + return self._batch_executor.total_imgs_in_pipeline @abc.abstractmethod def process_frame(self, @@ -90,4 +90,4 @@ def close(self) -> None: The backend will stop receiving frames before this method is called, and will not receive frames again. """ - self._oven.close() + self._batch_executor.close() diff --git a/vcap/vcap/ovens.py b/vcap/vcap/batch_executor.py similarity index 55% rename from vcap/vcap/ovens.py rename to vcap/vcap/batch_executor.py index 420e6f2..c3f1834 100644 --- a/vcap/vcap/ovens.py +++ b/vcap/vcap/batch_executor.py @@ -1,36 +1,37 @@ -"""Defines ovens. - -Ovens are classes that know how to do some kind of generic work in batches. -""" -import logging import queue +from concurrent.futures import Future from queue import Queue from threading import Thread -from typing import Any, Callable, List, NamedTuple, Optional +from typing import Any, Callable, Iterable, List, NamedTuple, Optional +from vcap import deprecated -class _OvenRequest(NamedTuple): - """A request that is sent to an oven to do some work on an image, and - push predictions into the output_queue - output_queue: The queue for the oven to put the results in - img_bgr: An OpenCV BGR image to run detection on +class _Request(NamedTuple): + """Used by BatchExecutor to keep track of requests and their respective + future objects. """ - output_queue: Queue + + future: Future + """The Future object for the BatchExecutor to return the output""" + input_data: Any + """A unit of input data expected by the batch_fn.""" -class Oven: - """This class simplifies receiving work from a multitude of sources and +class BatchExecutor: + """Feeds jobs into batch_fn in batches, returns results through Futures. + + This class simplifies centralizing work from a multitude of sources and running that work in a batched predict function, then returning that - work to the respective output queues.""" + work to the respective Futures. + """ - def __init__(self, batch_fn: Callable[[List[Any]], List[Any]], + def __init__(self, + batch_fn: Callable[[List[Any]], Iterable[Any]], max_batch_size=40, num_workers: int = 1): - """Initialize a new oven. - that the oven will wait between running a batch regardless of batch - size. + """Initialize a new BatchExecutor :param batch_fn: A function that takes in a list of inputs and iterates the outputs in the same order as the inputs. @@ -39,10 +40,10 @@ def __init__(self, batch_fn: Callable[[List[Any]], List[Any]], """ self.batch_fn = batch_fn self.max_batch_size = max_batch_size - self._request_queue = Queue() + self._request_queue: Queue[_Request] = Queue() self.workers = [Thread(target=self._worker, daemon=True, - name="OvenThread") + name="BatchExecutorThread") for _ in range(num_workers)] # The number of images currently in the work queue or being processed @@ -57,33 +58,44 @@ def __init__(self, batch_fn: Callable[[List[Any]], List[Any]], def total_imgs_in_pipeline(self) -> int: return self._request_queue.qsize() + self._num_imgs_being_processed - def submit(self, input_data: Any, output_queue: Queue = None) -> Queue: - """Creates an OvenRequest for you and returns the output queue""" - output_queue = output_queue if output_queue else Queue() - self._request_queue.put(_OvenRequest( - output_queue=output_queue, + def submit(self, input_data: Any, future: Future = None) -> Future: + """Submits a job and returns a Future that will be fulfilled later.""" + future = future or Future() + + # Add backwards compatibility for 0.2 + future.get = future.result + future.get = deprecated( + message="Use future.result() in place of future.get()", + remove_in="0.3.0" + )(future.get) + + self._request_queue.put(_Request( + future=future, input_data=input_data)) - return output_queue + return future - def _on_requests_ready(self, batch: List[_OvenRequest]) -> None: - """Push images through the given prediction backend + def _on_requests_ready(self, batch: List[_Request]) -> None: + """Push inputs through the given prediction backend :param batch: A list of requests to work on """ - # Extract the images from the requests + # Extract the futures from the requests inputs: List[Any] = [req.input_data for req in batch] - output_queues: List[Queue] = [req.output_queue for req in batch] + futures: List[Future] = [req.future for req in batch] # Route the results to each request - predictions: List[Any] = self.batch_fn(inputs) - - response_count = 0 - for response_count, output in enumerate(predictions, start=1): - output_queues[response_count - 1].put(output) - - if response_count != len(inputs): - logging.error(f"CRITICAL ERROR: Backend returned {response_count} " - f"responses. Expected {len(inputs)}") + try: + for prediction in self.batch_fn(inputs): + # Popping the futures ensures that if an error occurs, only + # the futures that haven't had a result set will have + # set_exception called + futures.pop(0).set_result(prediction) + except BaseException as exc: + # Catch exceptions and pass them to the futures, similar to the + # ThreadPoolExecutor implementation: + # https://github.com/python/cpython/blob/91e93794/Lib/concurrent/futures/thread.py#L51 + for future in futures: + future.set_exception(exc) def _worker(self): self._running = True @@ -106,12 +118,12 @@ def _worker(self): self._running = False - def _get_next_batch(self) -> Optional[List[_OvenRequest]]: + def _get_next_batch(self) -> Optional[List[_Request]]: """A helper function to help make the main thread loop more readable :returns: A non-empty list of collected items, or None if the worker is no longer running (i.e. self._continue == False) """ - batch: List[_OvenRequest] = [] + batch: List[_Request] = [] while len(batch) < self.max_batch_size: # Check if there's a new request try: @@ -135,7 +147,7 @@ def _get_next_batch(self) -> Optional[List[_OvenRequest]]: return batch def close(self) -> None: - """Stop the oven gracefully.""" + """Stop the BatchExecutor gracefully.""" self._running = False for worker in self.workers: worker.join() diff --git a/vcap/vcap/capsule.py b/vcap/vcap/capsule.py index 4333d04..5bfaf86 100644 --- a/vcap/vcap/capsule.py +++ b/vcap/vcap/capsule.py @@ -10,7 +10,7 @@ from .caching import cache from .device_mapping import DeviceMapper, get_all_devices from .node_description import DETECTION_NODE_TYPE, NodeDescription -from .options import Option, OPTION_TYPE +from .options import OPTION_TYPE, Option from .stream_state import BaseStreamState @@ -91,9 +91,11 @@ def process_frame(self, options: Dict[str, OPTION_TYPE], state: BaseStreamState) \ -> DETECTION_NODE_TYPE: - """Find the Backend that has an oven with the least amount of images - in the pipeline, and run with that Backend. In multi-GPU scenarios, - this is the logic that allows even usage across all GPUs.""" + """ + Find the Backend that has a BatchExecutor with the least amount of + images in the pipeline, and run with that Backend. In multi-GPU + scenarios, this is the logic that allows even usage across all GPUs. + """ with self.backends_lock: # Pick the backend with the lowest current 'workload' and # send the image there for inferencing. @@ -119,9 +121,7 @@ def clean_up(self, stream_id: int) -> None: del self._stream_states[stream_id] def close(self) -> None: - """Stops the capsule, de-initializing all backends.""" - # This method MUST close the oven first, then de-initialize the - # backend so as to clear up memory. + """De-initializes all backends.""" if self.backends is not None: with self.backends_lock: for backend in self.backends: diff --git a/vcap/vcap/deprecation.py b/vcap/vcap/deprecation.py new file mode 100644 index 0000000..30603cc --- /dev/null +++ b/vcap/vcap/deprecation.py @@ -0,0 +1,42 @@ +import warnings +from typing import Optional +from pkg_resources import parse_version + +import functools + +from . import __version__ + + +def deprecated(message: str = "", + remove_in: Optional[str] = None, + current_version=__version__): + """Mark a function as deprecated + :param message: Extra details to be added to the warning. For example, + the details may point users to a replacement method. + :param remove_in: The version when the decorated method will be removed. + The default is None, specifying that the function is not currently planned + to be removed. + :param current_version: Defaults to the vcaps current version. + """ + + def wrapper(function): + warning_msg = f"'{function.__qualname__}' is deprecated " + warning_msg += (f"and is scheduled to be removed in {remove_in}. " + if remove_in is not None else ". ") + warning_msg += str(message) + + # Only show the DeprecationWarning on the first call + warnings.simplefilter("once", DeprecationWarning) + + @functools.wraps(function) + def inner(*args, **kwargs): + if remove_in is not None and \ + parse_version(current_version) >= parse_version(remove_in): + raise DeprecationWarning(warning_msg) + else: + warnings.warn(warning_msg, category=DeprecationWarning) + return function(*args, **kwargs) + + return inner + + return wrapper diff --git a/vcap/vcap/testing/input_output_validation.py b/vcap/vcap/testing/input_output_validation.py index e90e5f0..5c6684c 100644 --- a/vcap/vcap/testing/input_output_validation.py +++ b/vcap/vcap/testing/input_output_validation.py @@ -229,9 +229,9 @@ def perform_capsule_tests(unpackaged_capsule_dir: Union[Path, str], # Since the __del__ method was wrapped, mock will not actually call the # underlying method when the object gets garbage collected- thus the - # capsule will never get 'close' called on it, thus the oven threads - # will not close. To fix that, we call 'close' now, and check that - # __del__ WOULD have been called. + # capsule will never get 'close' called on it, thus the BatchExecutor + # threads will not close. To fix that, we call 'close' now, and check + # that __del__ WOULD have been called. capsule.close() # Check that capsule.close() successfully killed any worker threads diff --git a/vcap/vcap/version.py b/vcap/vcap/version.py new file mode 100644 index 0000000..01ef120 --- /dev/null +++ b/vcap/vcap/version.py @@ -0,0 +1 @@ +__version__ = "0.2.6" diff --git a/vcap_utils/setup.py b/vcap_utils/setup.py index b75dae0..ac8de63 100644 --- a/vcap_utils/setup.py +++ b/vcap_utils/setup.py @@ -1,8 +1,13 @@ #!/usr/bin/env python3 import os +from pathlib import Path from setuptools import setup, find_namespace_packages +# Get package version/metadata +about = {} +exec(Path("vcap_utils/version.py").read_text(), about) + test_packages = ["pytest", "mock"] PRE_RELEASE_SUFFIX = os.environ.get("PRE_RELEASE_SUFFIX", "") @@ -10,9 +15,9 @@ setup( name='vcap-utils', description="Utilities for creating OpenVisionCapsules easily in Python", - author="Dilili Labs", + author="Aotu.ai", packages=find_namespace_packages(include=["vcap_utils*"]), - version="0.2.5" + PRE_RELEASE_SUFFIX, + version=about["__version__"] + PRE_RELEASE_SUFFIX, install_requires=[ "vcap", diff --git a/vcap_utils/vcap_utils/__init__.py b/vcap_utils/vcap_utils/__init__.py index 8864ce1..3ca0e86 100644 --- a/vcap_utils/vcap_utils/__init__.py +++ b/vcap_utils/vcap_utils/__init__.py @@ -1,2 +1,3 @@ +from .version import __version__ from .backends import * from .algorithms import * diff --git a/vcap_utils/vcap_utils/backends/base_openvino.py b/vcap_utils/vcap_utils/backends/base_openvino.py index d3370f6..f3a0ce4 100644 --- a/vcap_utils/vcap_utils/backends/base_openvino.py +++ b/vcap_utils/vcap_utils/backends/base_openvino.py @@ -1,11 +1,11 @@ import logging from threading import Event, RLock from typing import Dict, List, Tuple -from queue import Queue +from concurrent.futures import Future import numpy as np -from vcap import Resize, BaseBackend, DetectionNode +from vcap import Resize, BaseBackend, DetectionNode, deprecated _SUPPORTED_METRICS = "SUPPORTED_METRICS" _RANGE_FOR_ASYNC_INFER_REQUESTS = "RANGE_FOR_ASYNC_INFER_REQUESTS" @@ -112,17 +112,17 @@ def workload(self) -> float: This won't affect much unless a custom DeviceMapper filter is used that allows multiple Backends to be loaded, eg, a backend for CPU and a - backend for HDDL. In those cases, this workload measurement will be used - heavily to decide which backend is busier. + backend for HDDL. In those cases, this workload measurement will be + used heavily to decide which backend is busier. """ return self._num_ongoing_requests / self._total_requests - def send_to_batch(self, input_data: OV_INPUT_TYPE) -> Queue: + def send_to_batch(self, input_data: OV_INPUT_TYPE) -> Future: """Efficiently send the input to be inferenced by the network :param input_data: Input to the network - :returns: A queue that will yield 1 result, the output from the network + :returns: A Future that will be filled with the output from the network """ - out_queue = Queue(maxsize=1) + future = Future() with self._get_free_request_lock: # Try to get at least one request @@ -141,7 +141,7 @@ def send_to_batch(self, input_data: OV_INPUT_TYPE) -> Queue: request_free = self._request_free_events[request_id] def on_result(*args): - out_queue.put(request.outputs) + future.set_result(request.outputs) request_free.set() with self._num_ongoing_requests_lock: self._num_ongoing_requests -= 1 @@ -155,7 +155,15 @@ def on_result(*args): with self._num_ongoing_requests_lock: self._num_ongoing_requests += 1 request.async_infer(input_data) - return out_queue + + # Add backwards compatibility for 0.2 + future.get = future.result + future.get = deprecated( + message="Use future.result() in place of future.get()", + remove_in="0.3.0" + )(future.get) + + return future def prepare_inputs(self, frame: np.ndarray, frame_input_name: str = None) \ -> Tuple[OV_INPUT_TYPE, Resize]: diff --git a/vcap_utils/vcap_utils/version.py b/vcap_utils/vcap_utils/version.py new file mode 100644 index 0000000..01ef120 --- /dev/null +++ b/vcap_utils/vcap_utils/version.py @@ -0,0 +1 @@ +__version__ = "0.2.6"