Skip to content

Commit

Permalink
Convert Ovens to use a Future API, and rename Ovens to BatchExecutor
Browse files Browse the repository at this point in the history
This PR also adds a @deprecated wrapper, and changes where version info is stored for vcap/ and vcap_utils/
  • Loading branch information
apockill authored Nov 6, 2020
1 parent 17e0083 commit cdd7923
Show file tree
Hide file tree
Showing 17 changed files with 292 additions and 87 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
pull_request:
branches: [ master ]
push:
branches: [ master ]


jobs:
test:
Expand Down Expand Up @@ -33,7 +33,7 @@ jobs:
path: ${{ steps.pip-cache.outputs.dir }}
key: ${{ runner.os }}-${{ hashFiles('**/setup.py') }}-pip-cache
restore-keys: |
${{ runner.os }}-pip-cache
${{ runner.os }}-pip-cache
- name: Install dependencies
run: |
python -m pip install --upgrade pip
Expand Down
15 changes: 8 additions & 7 deletions docs/backends.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,20 @@ amount of round-trips the video frames take between devices.

If you wish to use batching in your capsule, you may call the ``send_to_batch``
method in ``process_frame`` instead of doing analysis in that method directly.
The ``send_to_batch`` method collects one or more input objects into a list and
routinely calls your backend's ``batch_predict`` method with this list. As a
result, users of ``send_to_batch`` must override the ``batch_predict`` method
in addition to the other required methods.
The ``send_to_batch`` method sends the input to a ``BatchExecutor`` which collects
inference requests for this capsule from different streams. Then, the
``BatchExecutor`` routinely calls your backend's ``batch_predict`` method with a
list of the collected inputs. As a result, users of ``send_to_batch`` must
override the ``batch_predict`` method in addition to the other required methods.

The ``send_to_batch`` method is asynchronous. Instead of immediately returning
analysis results, it returns a ``queue.Queue`` where the result will be provided.
analysis results, it returns a ``concurrent.futures.Future`` where the result will be provided.
Simple batching capsules may call ``send_to_batch``, then immediately call
``get`` to block for the result.
``result`` to block for the result.

.. code-block:: python
result = self.send_to_batch(frame).get()
result = self.send_to_batch(frame).result()
An argument of any type may be provided to ``send_to_batch``, as the argument
will be passed in a list to ``batch_predict`` without modification. In many
Expand Down
127 changes: 127 additions & 0 deletions tests/test_batch_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import random
from concurrent.futures import Future
from typing import Any, Generator, List, Tuple

import pytest

from vcap.batch_executor import BatchExecutor, _Request


@pytest.fixture()
def batch_executor():
"""To use this fixture, replace batch_executor.batch_fn with your own
batch function."""

def batch_fn(inputs):
raise NotImplemented

batch_executor = BatchExecutor(batch_fn=batch_fn)
yield batch_executor
batch_executor.close()


def batch_fn_base(inputs: List[int], raises: bool) \
-> Generator[Any, None, None]:
"""Process results and yield them as they are processed
This function is to be used as a base for other test cases for batch_fn
variants.
:param inputs: A list of inputs
:param raises: If True, raises an error on the 5th input. If False,
no exception will be raised.
"""
for i in inputs:
if i == 5 and raises:
# This occurs on the 5th input if raises=True.
# This is used to test BatchExecutor's handling of exceptions
raise RuntimeError("Oh no, a batch_fn error has occurred!")
yield i * 100


def batch_fn_returns_generator(inputs: List[int]) \
-> Generator[Any, None, None]:
return (o for o in batch_fn_base(inputs, raises=False))


def batch_fn_returns_generator_raises(inputs: List[int]) \
-> Generator[Any, None, None]:
return (o for o in batch_fn_base(inputs, raises=True))


def batch_fn_returns_list(inputs: List[int]) -> List[Any]:
"""Process results and yield them at the end, as a list."""
return list(batch_fn_base(inputs, raises=False))


def batch_fn_returns_list_raises(inputs: List[int]) -> List[Any]:
return list(batch_fn_base(inputs, raises=True))


@pytest.mark.parametrize(
argnames=["batch_fn", "expect_partial_results"],
argvalues=[
(batch_fn_returns_generator_raises, True),
(batch_fn_returns_list_raises, False)
]
)
def test_exceptions_during_batch_fn(
batch_executor, batch_fn, expect_partial_results):
"""Test that BatchExecutor catches exceptions that occur in the batch_fn
and propagates them through the requests Future objects.
If an exception occurs after processing some of the batch, the expectation
is that the unprocessed inputs of the batch will get an exception
set (expect_partial_results=True). If the exception happens before
receiving any results, all future objects should have exceptions set.
"""
batch_executor.batch_fn = batch_fn
request_batch = [
_Request(
future=Future(),
input_data=i)
for i in range(10)
]
batch_executor._on_requests_ready(request_batch)
for i, request in enumerate(request_batch):
if expect_partial_results and i < 5:
result = request.future.result(timeout=5)
assert result == request.input_data * 100, \
"The result for this future doesn't match the input that " \
"was supposed to have been routed to it!"
else:
with pytest.raises(RuntimeError):
request.future.result(timeout=5)


@pytest.mark.parametrize(
argnames=["batch_fn"],
argvalues=[
(batch_fn_returns_generator,),
(batch_fn_returns_list,)
]
)
def test_relevant_input_outputs_match(batch_executor, batch_fn):
"""Test the output for any given input is routed to the correct
Future object. """
batch_executor.batch_fn = batch_fn

# Submit input values in a random order
request_inputs = list(range(10000))
random.seed("vcap? More like vgood")
random.shuffle(request_inputs)

# Submit inputs to the BatchExecutor and keep track of their futures
inputs_and_futures: List[Tuple[int, Future]] = []
for input_data in request_inputs:
future = batch_executor.submit(input_data)
inputs_and_futures.append((input_data, future))

# Verify that all outputs are the expected ones for their respective input
for input_data, future in inputs_and_futures:
result = future.result(timeout=5)
assert result == input_data * 100, \
"The result for this future doesn't match the input that " \
"was supposed to have been routed to it!"

assert batch_executor.total_imgs_in_pipeline == 0
2 changes: 1 addition & 1 deletion vcap/examples/classifier_gait_example/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def process_frame(self, frame: np.ndarray,
.pad_percent(top=10, bottom=10, left=10, right=10)
.apply(frame))

prediction = self.send_to_batch(crop).get()
prediction = self.send_to_batch(crop).result()

detection_node.attributes[config.category] = prediction.name
detection_node.extra_data[config.extra_data] = prediction.confidence
Expand Down
2 changes: 1 addition & 1 deletion vcap/examples/detector_person_example/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def process_frame(self, frame: np.ndarray,
max_height=max_frame_side_length)
frame = clamp.apply()

predictions = self.send_to_batch(frame).get()
predictions = self.send_to_batch(frame).result()

results = []

Expand Down
9 changes: 7 additions & 2 deletions vcap/setup.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
#!/usr/bin/env python3
import os
from pathlib import Path

from setuptools import setup, find_namespace_packages

# Get package version/metadata
about = {}
exec(Path("vcap/version.py").read_text(), about)

test_packages = ["pytest", "mock"]

PRE_RELEASE_SUFFIX = os.environ.get("PRE_RELEASE_SUFFIX", "")

setup(
name='vcap',
description="A library for creating OpenVisionCapsules in Python",
author="Dilili Labs",
author="Aotu.ai",
packages=find_namespace_packages(include=["vcap*"]),
version="0.2.5" + PRE_RELEASE_SUFFIX,
version=about["__version__"] + PRE_RELEASE_SUFFIX,

install_requires=[
"pycryptodomex==3.9.7",
Expand Down
2 changes: 2 additions & 0 deletions vcap/vcap/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from .version import __version__
from .deprecation import deprecated
from .capsule import BaseCapsule
from .stream_state import BaseStreamState
from .backend import BaseBackend
Expand Down
20 changes: 10 additions & 10 deletions vcap/vcap/backend.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import abc
from queue import Queue
from typing import Any, Dict, List, Union
from concurrent.futures import Future
from typing import Any, Dict, List

import numpy as np

from vcap.batch_executor import BatchExecutor
from vcap.node_description import DETECTION_NODE_TYPE
from vcap.ovens import Oven
from vcap.options import OPTION_TYPE
from vcap.stream_state import BaseStreamState

Expand All @@ -16,18 +16,18 @@ class BaseBackend(abc.ABC):
"""

def __init__(self):
self._oven = Oven(self.batch_predict)
self._batch_executor = BatchExecutor(self.batch_predict)

def send_to_batch(self, input_data: Any) -> Queue:
def send_to_batch(self, input_data: Any) -> Future:
"""Sends the given object to the batch_predict method for processing.
This call does not block. Instead, the result will be provided on the
returned queue. The batch_predict method must be overridden on the
returned Future. The batch_predict method must be overridden on the
backend this method is being called on.
:param input_data: The input object to send to batch_predict
:return: A queue where results will be stored
:return: A Future where results will be stored
"""
return self._oven.submit(input_data)
return self._batch_executor.submit(input_data)

@property
def workload(self) -> float:
Expand All @@ -36,7 +36,7 @@ def workload(self) -> float:
is intended to give the scheduler the ability to pick the least busy
backend.
"""
return self._oven.total_imgs_in_pipeline
return self._batch_executor.total_imgs_in_pipeline

@abc.abstractmethod
def process_frame(self,
Expand Down Expand Up @@ -90,4 +90,4 @@ def close(self) -> None:
The backend will stop receiving frames before this method is
called, and will not receive frames again.
"""
self._oven.close()
self._batch_executor.close()
Loading

0 comments on commit cdd7923

Please sign in to comment.