Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework Camera interface. #39

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,8 @@ services:
- "127.0.0.1:5672:5672"
volumes:
- "./config/rabbitmq:/etc/rabbitmq:ro"
# using playback server to test Camera on a stream for now. consider using something more general like gstreamer's videotestsrc?
playback-server:
image: waggle/wes-playback-server:0.1.0
ports:
- "127.0.0.1:8090:8090"
1 change: 0 additions & 1 deletion docs/writing-a-plugin.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ source:
architectures:
- "linux/amd64"
- "linux/arm64"
- "linux/arm/v7"
```

This file contains metadata about what your plugin is called and what it's supposed to do. It is used by the [Edge Code Repository](https://portal.sagecontinuum.org/apps/explore) when submitting plugins.
Expand Down
288 changes: 167 additions & 121 deletions src/waggle/data/vision.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from shutil import which
import ffmpeg
import logging
from contextlib import ExitStack, contextmanager


logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -90,13 +92,13 @@ def __init__(self, path, timestamp, format=RGB):
def __enter__(self):
self.capture = cv2.VideoCapture(self.path)
if not self.capture.isOpened():
raise RuntimeError(
f"unable to open video capture for file {self.path!r}"
)
raise RuntimeError(f"unable to open video capture for file {self.path!r}")
self.fps = self.capture.get(cv2.CAP_PROP_FPS)
if self.fps > 100.:
self.fps = 0.
logger.debug(f'pywaggle cannot calculate timestamp because the fps ({self.fps}) is too high.')
if self.fps > 100.0:
self.fps = 0.0
logger.debug(
f"pywaggle cannot calculate timestamp because the fps ({self.fps}) is too high."
)
self.timestamp_delta = 0
else:
self.timestamp_delta = 1 / self.fps
Expand All @@ -113,31 +115,39 @@ def __iter__(self):

def __next__(self):
if self.capture == None or not self.capture.isOpened():
raise RuntimeError("video is not opened. use the Python WITH statement to open the video")
raise RuntimeError(
"video is not opened. use the Python WITH statement to open the video"
)
ok, data = self.capture.read()
if not ok or data is None:
raise StopIteration
# timestamp must be an integer in nanoseconds
approx_timestamp = self.timestamp + int(self.timestamp_delta * self._frame_count * 1e9)
approx_timestamp = self.timestamp + int(
self.timestamp_delta * self._frame_count * 1e9
)
self._frame_count += 1
return ImageSample(data=data, timestamp=approx_timestamp, format=self.format)


INPUT_TYPE_FILE = "file"
INPUT_TYPE_OTHER = "other"


def resolve_device(device):
if isinstance(device, Path):
return resolve_device_from_path(device)
return resolve_device_from_path(device), INPUT_TYPE_FILE
# objects that are not paths or strings are considered already resolved
if not isinstance(device, str):
return device
return device, INPUT_TYPE_OTHER
match = re.match(r"([A-Za-z0-9]+)://(.*)$", device)
# non-url like paths refer to data shim devices
if match is None:
return resolve_device_from_data_config(device)
return resolve_device_from_data_config(device), INPUT_TYPE_OTHER
# return file:// urls as path
if match.group(1) == "file":
return resolve_device_from_path(Path(match.group(2)))
return resolve_device_from_path(Path(match.group(2))), INPUT_TYPE_FILE
# return other urls as-is
return device
return device, INPUT_TYPE_OTHER


def resolve_device_from_path(path):
Expand All @@ -154,74 +164,152 @@ def resolve_device_from_data_config(device):
except KeyError:
raise KeyError(f"missing .handler.args.url field for device {device!r}.")

class Camera:
INPUT_TYPE_FILE = "file"
INPUT_TYPE_OTHER = "other"

class Camera:
def __init__(self, device=0, format=RGB):
self.capture = _Capture(resolve_device(device), format)
match = re.match(r"([A-Za-z0-9]+)://(.*)$", device)
if match is not None and match.group(1) == "file":
self.input_type = self.INPUT_TYPE_FILE
self.es = ExitStack()

device, input_type = resolve_device(device)
self.device = device
self.format = format
if input_type == "file":
self.capture_class = FileCapture
elif input_type == "other":
self.capture_class = StreamCapture
else:
self.input_type = self.INPUT_TYPE_OTHER
raise RuntimeError(f"invalid camera input type for device {device}")

def __enter__(self):
if self.input_type == self.INPUT_TYPE_FILE:
logger.info(f'input is a file. the background thread disabled for grabbing frames')
self.capture.enable_daemon = False
else:
self.capture.enable_daemon = True
self.capture.__enter__()
return self
capture = self.capture_class(self.device, self.format)
self.es.callback(capture.close)
return capture

def __exit__(self, exc_type, exc_val, exc_tb):
self.capture.__exit__(exc_type, exc_val, exc_tb)
self.es.close()

def snapshot(self):
with self.capture:
return self.capture.snapshot()
with self as capture:
return capture.snapshot()

def stream(self):
with self.capture:
yield from self.capture.stream()
with self as capture:
yield from capture.stream()

def record(self, duration, file_path="./sample.mp4", skip_second=1):
return self.capture.record(duration, file_path, skip_second)
if which("ffmpeg") is None:
raise RuntimeError(
"ffmpeg does not exist to record video. please install ffmpeg"
)
# TODO find cross platform option for webcams since likely to be used during tutorials
if isinstance(self.device, int):
c = ffmpeg.input(str(self.device), ss=skip_second)
elif isinstance(self.device, str) and self.device.startswith("rtsp"):
c = ffmpeg.input(self.device, rtsp_transport="tcp", ss=skip_second)
else:
c = ffmpeg.input(self.device, ss=skip_second)
c = ffmpeg.output(
c, file_path, codec="copy", f="mp4", t=duration
).overwrite_output()
timestamp = get_timestamp()

try:
ffmpeg.run(c, quiet=True)
except ffmpeg.Error as e:
raise RuntimeError(f"error while recording: {e.stderr.decode()}")

return VideoSample(path=file_path, timestamp=timestamp)


class FileCapture:
def __init__(self, device, format):
self.device = device
self.format = format

self.capture = cv2.VideoCapture(self.device)
if not self.capture.isOpened():
raise RuntimeError(
f"unable to open video capture for device {self.device!r}"
)

def close(self):
self.capture.release()

def snapshot(self):
return self._grab_frame()

def stream(self):
try:
while True:
yield self._grab_frame()
except RuntimeError:
return

def record(self):
raise RuntimeError(
"Camera already opened. Camera.record must be called outside of a with block."
)

def _grab_frame(self):
ok = self.capture.grab()
if not ok:
raise RuntimeError("failed to grab frame")
timestamp = get_timestamp()
ok, data = self.capture.retrieve()
if not ok:
raise RuntimeError("failed to decode frame")
return ImageSample(data=data, timestamp=timestamp, format=self.format)


class _Capture:
class StreamCapture:
def __init__(self, device, format):
self.device = device
self.format = format
self.context_depth = 0
self.enable_daemon = False

self.capture = cv2.VideoCapture(self.device)
if not self.capture.isOpened():
raise RuntimeError(
f"unable to open video capture for device {self.device!r}"
)

self.lock = threading.Lock()
self.daemon_need_to_stop = threading.Event()
self._ready_for_next_frame = threading.Event()
self.daemon = threading.Thread(target=self._run, daemon=True)
self.lock = threading.Lock()
self.stopped = threading.Event()
threading.Thread(target=self._run, daemon=True).start()

def __enter__(self):
if self.context_depth == 0:
self.capture = cv2.VideoCapture(self.device)
if not self.capture.isOpened():
raise RuntimeError(
f"unable to open video capture for device {self.device!r}"
)
# spin up a thread to keep up with the camera frame rate
if self.enable_daemon:
self.daemon_need_to_stop.clear()
self.daemon.start()
self.context_depth += 1
return self
def close(self):
self.daemon_need_to_stop.set()
self.stopped.wait(timeout=10)
self.capture.release()

def snapshot(self):
return self._grab_frame()

def stream(self):
try:
while True:
yield self._grab_frame()
except RuntimeError:
return

def record(self):
raise RuntimeError(
"Camera already opened. Camera.record must be called outside of a with block."
)

def _grab_frame(self):
if not self._ready_for_next_frame.wait(timeout=10.0):
raise RuntimeError(
"failed to grab a frame from the background thread: timed out"
)
self._ready_for_next_frame.clear()
with acquire_with_timeout(self.lock, timeout=1.0):
timestamp = self.timestamp
ok, data = self.capture.retrieve()
if not ok:
raise RuntimeError("failed to retrieve the taken snapshot")
return ImageSample(data=data, timestamp=timestamp, format=self.format)

def __exit__(self, exc_type, exc_val, exc_tb):
self.context_depth -= 1
if self.context_depth == 0:
if self.enable_daemon:
self.daemon_need_to_stop.set()
self.capture.release()

def _run(self):
# we sleep slighly shorter than FPS to drain the buffer efficiently
# NOTE: OpenCV's FPS get function is inaccurate as a USB webcam gives 1 FPS while
Expand All @@ -232,69 +320,17 @@ def _run(self):
# if fps > 0 and fps < 100:
# sleep = 1 / (fps + 1)
# logging.debug(f'camera FPS is {fps}. the background thread sleeps {sleep} seconds in between grab()')
while not self.daemon_need_to_stop.is_set():
try:
self.lock.acquire()
ok = self.capture.grab()
if not ok:
raise RuntimeError("failed to grab a frame")
self.timestamp = get_timestamp()
finally:
self.lock.release()
self._ready_for_next_frame.set()
time.sleep(sleep)

def grab_frame(self):
if self.daemon.is_alive():
if not self._ready_for_next_frame.wait(timeout=10.):
raise RuntimeError("failed to grab a frame from the background thread: timed out")
self._ready_for_next_frame.clear()
try:
self.lock.acquire(timeout=1)
timestamp = self.timestamp
ok, data = self.capture.retrieve()
if not ok:
raise RuntimeError("failed to retrieve the taken snapshot")
finally:
self.lock.release()
return ImageSample(data=data, timestamp=timestamp, format=self.format)
else:
ok = self.capture.grab()
if not ok:
raise RuntimeError("failed to take a snapshot")
timestamp = get_timestamp()
ok, data = self.capture.retrieve()
if not ok:
raise RuntimeError("failed to retrieve the taken snapshot")
return ImageSample(data=data, timestamp=timestamp, format=self.format)

def snapshot(self):
return self.grab_frame()

def stream(self):
try:
while True:
yield self.grab_frame()
except:
pass

def record(self, duration, file_path="./sample.mp4", skip_second=1):
if which("ffmpeg") == None:
raise RuntimeError("ffmpeg does not exist to record video. please install ffmpeg")
if self.context_depth > 0:
raise RuntimeError(f'the stream {self.device} is already open. please close first or use without the Python\'s WITH statement')
if isinstance(self.device, str) and self.device.startswith("rtsp"):
c = ffmpeg.input(self.device, rtsp_transport="tcp", ss=skip_second)
else:
c = ffmpeg.input(self.device, ss=skip_second)
c = ffmpeg.output(c, file_path, codec="copy", f='mp4', t=duration).overwrite_output()
timestamp = get_timestamp()
_, stderr = ffmpeg.run(c, quiet=True)
if os.path.exists(file_path) and os.path.getsize(file_path) > 0:
return VideoSample(path=file_path, timestamp=timestamp)
else:
raise RuntimeError(f'error while recording: {stderr}')

while not self.daemon_need_to_stop.is_set():
with acquire_with_timeout(self.lock, timeout=10.0):
ok = self.capture.grab()
if not ok:
raise RuntimeError("failed to grab a frame")
self.timestamp = get_timestamp()
self._ready_for_next_frame.set()
time.sleep(sleep)
finally:
self.stopped.set()


class ImageFolder:
Expand All @@ -320,3 +356,13 @@ def __getitem__(self, i):

def __repr__(self):
return f"ImageFolder{self.files!r}"


@contextmanager
def acquire_with_timeout(lock, timeout):
if not lock.acquire(timeout=timeout):
raise TimeoutError("timed out when acquiring lock")
try:
yield
finally:
lock.release()
Binary file added tests/test.mp4
Binary file not shown.
Loading