Skip to content

Commit

Permalink
Refactor to avoid using the executor (#139)
Browse files Browse the repository at this point in the history
  • Loading branch information
bdraco authored Mar 10, 2024
1 parent 47753dc commit e10db85
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 48 deletions.
57 changes: 22 additions & 35 deletions haffmpeg/core.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
"""Base functionality of ffmpeg HA wrapper."""
import asyncio
import functools
import logging
import re
import shlex
import subprocess
from typing import List, Optional
from typing import List, Optional, Set

from .timeout import asyncio_timeout

_LOGGER = logging.getLogger(__name__)

FFMPEG_STDOUT = "stdout"
FFMPEG_STDERR = "stderr"

_BACKGROUND_TASKS: Set[asyncio.Task] = set()


class HAFFmpeg:
"""HA FFmpeg process async.
Expand All @@ -24,10 +26,10 @@ def __init__(self, ffmpeg_bin: str):
self._loop = asyncio.get_running_loop()
self._ffmpeg = ffmpeg_bin
self._argv = None
self._proc = None
self._proc: Optional["asyncio.subprocess.Process"] = None

@property
def process(self) -> subprocess.Popen:
def process(self) -> "asyncio.subprocess.Process":
"""Return a Popen object or None of not running."""
return self._proc

Expand Down Expand Up @@ -112,8 +114,8 @@ async def open(
stderr_pipe: bool = False,
) -> bool:
"""Start a ffmpeg instance and pipe output."""
stdout = subprocess.PIPE if stdout_pipe else subprocess.DEVNULL
stderr = subprocess.PIPE if stderr_pipe else subprocess.DEVNULL
stdout = asyncio.subprocess.PIPE if stdout_pipe else asyncio.subprocess.DEVNULL
stderr = asyncio.subprocess.PIPE if stderr_pipe else asyncio.subprocess.DEVNULL

if self.is_running:
_LOGGER.warning("FFmpeg is already running!")
Expand All @@ -125,16 +127,14 @@ async def open(
# start ffmpeg
_LOGGER.debug("Start FFmpeg with %s", str(self._argv))
try:
proc_func = functools.partial(
subprocess.Popen,
self._argv,
self._proc = await asyncio.create_subprocess_exec(
*self._argv,
bufsize=0,
stdin=subprocess.PIPE,
stdin=asyncio.subprocess.PIPE,
stdout=stdout,
stderr=stderr,
close_fds=False,
)
self._proc = await self._loop.run_in_executor(None, proc_func)
except Exception as err: # pylint: disable=broad-except
_LOGGER.exception("FFmpeg fails %s", err)
self._clear()
Expand All @@ -149,17 +149,14 @@ async def close(self, timeout=5) -> None:
return

# Can't use communicate because we attach the output to a streamreader
def _close():
"""Close ffmpeg."""
self._proc.stdin.write(b"q")
self._proc.wait(timeout=timeout)

# send stop to ffmpeg
try:
await self._loop.run_in_executor(None, _close)
self._proc.stdin.write(b"q")
async with asyncio_timeout(timeout):
await self._proc.wait()
_LOGGER.debug("Close FFmpeg process")

except (subprocess.TimeoutExpired, ValueError):
except (asyncio.TimeoutError, ValueError):
_LOGGER.warning("Timeout while waiting of FFmpeg")
self.kill()

Expand All @@ -169,25 +166,15 @@ def _close():
def kill(self) -> None:
"""Kill ffmpeg job."""
self._proc.kill()
self._loop.run_in_executor(None, self._proc.communicate)
background_task = asyncio.create_task(self._proc.communicate())
_BACKGROUND_TASKS.add(background_task)
background_task.add_done_callback(_BACKGROUND_TASKS.remove)

async def get_reader(self, source=FFMPEG_STDOUT) -> asyncio.StreamReader:
"""Create and return streamreader."""
reader = asyncio.StreamReader()
reader_protocol = asyncio.StreamReaderProtocol(reader)

# Attach stream
if source == FFMPEG_STDOUT:
await self._loop.connect_read_pipe(
lambda: reader_protocol, self._proc.stdout
)
else:
await self._loop.connect_read_pipe(
lambda: reader_protocol, self._proc.stderr
)

# Start reader
return reader
return self._proc.stdout
return self._proc.stderr


class HAFFmpegWorker(HAFFmpeg):
Expand Down Expand Up @@ -234,7 +221,7 @@ async def _process_lines(self, pattern: Optional[str] = None) -> None:
await self._queue.put(line)

try:
await self._loop.run_in_executor(None, self._proc.wait)
await self._proc.wait()
finally:
await self._queue.put(None)
_LOGGER.debug("Stopped reading ffmpeg output.")
Expand Down
9 changes: 4 additions & 5 deletions haffmpeg/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
import logging
import re
from time import time
from typing import Callable, Optional, Coroutine

import async_timeout
from typing import Callable, Coroutine, Optional

from .core import FFMPEG_STDOUT, HAFFmpegWorker
from .timeout import asyncio_timeout

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -72,7 +71,7 @@ async def _worker_process(self) -> None:
while True:
try:
_LOGGER.debug("Reading State: %d, timeout: %s", state, timeout)
with async_timeout.timeout(timeout):
async with asyncio_timeout(timeout):
data = await self._queue.get()
timeout = None
if data is None:
Expand Down Expand Up @@ -189,7 +188,7 @@ async def _worker_process(self) -> None:
while True:
try:
_LOGGER.debug("Reading State: %d, timeout: %s", state, timeout)
with async_timeout.timeout(timeout):
async with asyncio_timeout(timeout):
data = await self._queue.get()
if data is None:
self._loop.call_soon(self._callback, None)
Expand Down
8 changes: 8 additions & 0 deletions haffmpeg/timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
"""Timeouts."""
import sys

if sys.version_info[:2] < (3, 11):
# pylint: disable-next=unused-import
from async_timeout import timeout as asyncio_timeout # noqa: F401
else:
from asyncio import timeout as asyncio_timeout # noqa: F401
17 changes: 9 additions & 8 deletions haffmpeg/tools.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
"""For HA varios tools."""
import functools
import asyncio
import logging
import re
import subprocess
from typing import Optional

from .core import HAFFmpeg
from .timeout import asyncio_timeout

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -40,12 +40,13 @@ async def get_image(
return None

# read image

try:
proc_func = functools.partial(self._proc.communicate, timeout=timeout)
image, _ = await self._loop.run_in_executor(None, proc_func)
async with asyncio_timeout(timeout):
image, _ = await self._proc.communicate()
return image

except (subprocess.TimeoutExpired, ValueError):
except (asyncio.TimeoutError, ValueError):
_LOGGER.warning("Timeout reading image.")
self.kill()
return None
Expand All @@ -71,14 +72,14 @@ async def get_version(self, timeout: int = 15) -> Optional[str]:

# read output
try:
proc_func = functools.partial(self._proc.communicate, timeout=timeout)
output, _ = await self._loop.run_in_executor(None, proc_func)
async with asyncio_timeout(timeout):
output, _ = await self._proc.communicate()

result = re.search(r"ffmpeg version (\S*)", output.decode())
if result is not None:
return result.group(1)

except (subprocess.TimeoutExpired, ValueError):
except (asyncio.TimeoutError, ValueError):
_LOGGER.warning("Timeout reading stdout.")
self.kill()

Expand Down

0 comments on commit e10db85

Please sign in to comment.