From 52fe98fb71f38351d1a2086d6b70e8bc60b9c67f Mon Sep 17 00:00:00 2001 From: walesch-yan Date: Tue, 14 Jan 2025 17:27:06 +0100 Subject: [PATCH] update MJPEGCamera --- docs/usage/cameras.md | 21 +++- video_streamer/core/camera.py | 163 +++++++++++++++++++++++++------- video_streamer/core/config.py | 43 ++++++++- video_streamer/core/streamer.py | 10 +- video_streamer/main.py | 31 +++++- 5 files changed, 223 insertions(+), 45 deletions(-) diff --git a/docs/usage/cameras.md b/docs/usage/cameras.md index f8c302c..2ca837d 100644 --- a/docs/usage/cameras.md +++ b/docs/usage/cameras.md @@ -36,6 +36,25 @@ The `MJPEGCamera` provides specialized support for *MJPEG* video streams. It is > **Note**: Currently the `MJPEGCamera` is the only camera that does not support conversion to a `Redis` Pub/Sub channel (more about streaming on a [redis channel](setup.md#dual-streaming-seamlessly-serve-mjpeg-and-redis-pubsub-video-feeds)) +#### Authentication for MJPEG Streams + +Some MJPEG streams may require authentication to access. To support such scenarios, the `MJPEGCamera` class includes built-in authentication support. Currently, both `Basic` and `Digest` authentication methods are supported. + +Below is an example of how to use the video-streamer to access a stream requiring `Basic` authentication: + +```bash +video-streamer -of MPEG1 -uri <uri> -auth Basic -user <username> -pass <password> +``` + +##### Explanation of the Parameters: +- `-of`: Specifies the output format, here `MPEG1` is used. +- `-uri`: The URL of the MJPEG stream. +- `-auth`: Specifies the authentication method (`Basic` or `Digest`) +- `-user`: The username for authentication +- `-pass`: The password required for authentication + +Replace `<uri>`, `<username>` and `<password>` with the appropriate values for your stream. Ensure you handle credentials securely and avoid exposing them in public or shared scripts! 
+ --- ## RedisCamera @@ -48,7 +67,7 @@ Instead of using a real camera, the `video-streamer` allows to use said `Redis` To use the `RedisCamera`, one can use the following command: ```bash -video-streamer -d -of MJPEG1 -uri redis://[host]:[port] -irc ExampleStream +video-streamer -d -of MPEG1 -uri redis://[host]:[port] -irc ExampleStream ``` where `host` and `port` are the respective host and port of the `Redis` server and `ExampleStream` would be the Pub/Sub channel to use for generating the stream. diff --git a/video_streamer/core/camera.py b/video_streamer/core/camera.py index bf28dd3..069d3dd 100644 --- a/video_streamer/core/camera.py +++ b/video_streamer/core/camera.py @@ -23,9 +23,11 @@ except ImportError: logging.warning("PyTango not available.") +from requests.auth import HTTPBasicAuth, HTTPDigestAuth +from video_streamer.core.config import AuthenticationConfiguration class Camera: - def __init__(self, device_uri: str, sleep_time: int, debug: bool = False, redis: str = None, redis_channel: str = None): + def __init__(self, device_uri: str, sleep_time: float, debug: bool = False, redis: str = None, redis_channel: str = None): self._device_uri = device_uri self._sleep_time = sleep_time self._debug = debug @@ -48,7 +50,7 @@ def poll_image(self, output: Union[IO, multiprocessing.queues.Queue]) -> None: self._output = output if self._redis: host, port = self._redis.split(':') - self._redis_client = redis.StrictRedis(host=host, port=port) + self._redis_client = redis.StrictRedis(host=host, port=int(port)) while True: try: @@ -63,7 +65,7 @@ def poll_image(self, output: Union[IO, multiprocessing.queues.Queue]) -> None: pass @property - def size(self) -> Tuple[float, float]: + def size(self) -> Tuple[int, int]: return (self._width, self._height) def get_jpeg(self, data, size=(0, 0)) -> bytearray: @@ -76,37 +78,131 @@ def get_jpeg(self, data, size=(0, 0)) -> bytearray: image.save(jpeg_data, format="JPEG") jpeg_data = jpeg_data.getvalue() - return jpeg_data + return 
bytearray(jpeg_data) + + def _image_to_rgb24(self, image: bytes) -> bytearray: + """ + Convert binary image data into raw RGB24-encoded byte array + Supported image types include JPEG, PNG, BMP, TIFF, GIF, ... + """ + image_array = np.frombuffer(image, dtype=np.uint8) + frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR) + rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + return bytearray(rgb_frame.tobytes()) class MJPEGCamera(Camera): - def __init__(self, device_uri: str, sleep_time: int, debug: bool = False, redis: str = None, redis_channel: str = None): + def __init__(self, device_uri: str, sleep_time: float, auth_config: AuthenticationConfiguration, debug: bool = False, redis: str = None, redis_channel: str = None): super().__init__(device_uri, sleep_time, debug, redis, redis_channel) + self._authentication=self._createAuthenticationHeader(auth_config) + self._set_size() + + def _set_size(self) -> None: + buffer = bytearray() + # To set the size, extract the first image from the MJPEG stream + try: + response = requests.get(self._device_uri, stream=True, verify=False, auth=self._authentication) + if response.status_code == 200: + boundary = self._extract_boundary(response.headers) + if not boundary: + logging.error("Boundary not found in Content-Type header.") + return + + for chunk in response.iter_content(chunk_size=8192): + buffer.extend(chunk) + + while True: + frame, buffer = self._extract_frame(buffer, boundary) + if frame is None: + break + image = Image.open(io.BytesIO(frame)) + self._width, self._height = image.size + return + else: + logging.error(f"Received unexpected status code {response.status_code}") + return + except requests.RequestException as e: + logging.exception(f"Exception occured during stream request") + return + + def _createAuthenticationHeader(self, auth_config:AuthenticationConfiguration) -> Union[None, HTTPBasicAuth, HTTPDigestAuth]: + type = auth_config.type + if type == "Basic": + return 
HTTPBasicAuth(username=auth_config.username, password=auth_config.password) + elif type == "Digest": + return HTTPDigestAuth(username=auth_config.username, password=auth_config.password) + elif type: + logging.warning("Unknown authentication Type {type}") + return None + + def _extract_boundary(self, headers): + """ + Extract the boundary marker from the Content-Type header. + """ + content_type = headers.get("Content-Type", "") + if "boundary=" in content_type: + return content_type.split("boundary=")[-1] + return None + + def _extract_frame(self, buffer: bytearray, boundary: str): + """ + Extract a single JPEG frame from the buffer if a complete frame exists. + Returns a tuple of (frame_data, remaining_buffer). + """ + boundary_bytes = f"--{boundary}".encode() + start_index = buffer.find(boundary_bytes) + if start_index == -1: + return None, buffer # Boundary not found + + # Find the next boundary after the current one + next_index = buffer.find(boundary_bytes, start_index + len(boundary_bytes)) + if next_index == -1: + return None, buffer # Complete frame not yet available + + # Extract the data between boundaries + frame_section = buffer[start_index + len(boundary_bytes):next_index] + + # Separate headers and JPEG data + header_end = frame_section.find(b"\r\n\r\n") # End of headers + if header_end == -1: + return None, buffer # Headers not fully received + + # Extract the JPEG data + frame_data = frame_section[header_end + 4:] # Skip past the headers + remaining_buffer = buffer[next_index:] # Data after the next boundary + return frame_data.strip(), remaining_buffer # Strip any extra whitespace def poll_image(self, output: Union[IO, multiprocessing.queues.Queue]) -> None: - # auth=("user", "password") - r = requests.get(self._device_uri, stream=True) + buffer = bytearray() + self._output = output - buffer = bytes() while True: try: - if r.status_code == 200: - for chunk in r.iter_content(chunk_size=1024): - buffer += chunk - + response = 
requests.get(self._device_uri, stream=True, verify=False, auth=self._authentication) + if response.status_code == 200: + boundary = self._extract_boundary(response.headers) + if not boundary: + logging.error("Boundary not found in Content-Type header.") + break + + for chunk in response.iter_content(chunk_size=8192): + buffer.extend(chunk) + + while True: + frame, buffer = self._extract_frame(buffer, boundary) + if frame is None: + break + self._write_data(self._image_to_rgb24(bytes(frame))) else: - print("Received unexpected status code {}".format(r.status_code)) - except requests.exceptions.StreamConsumedError: - output.put(buffer) - r = requests.get(self._device_uri, stream=True) - buffer = bytes() - - def get_jpeg(self, data, size=None) -> bytearray: - return data + logging.error(f"Received unexpected status code {response.status_code}") + break + except requests.RequestException as e: + logging.exception(f"Exception occured during stream request") + break class LimaCamera(Camera): - def __init__(self, device_uri: str, sleep_time: int, debug: bool = False, redis: str = None, redis_channel: str = None): + def __init__(self, device_uri: str, sleep_time: float, debug: bool = False, redis: str = None, redis_channel: str = None): super().__init__(device_uri, sleep_time, debug, redis, redis_channel) self._lima_tango_device = self._connect(self._device_uri) @@ -131,7 +227,7 @@ def _get_image(self) -> Tuple[bytearray, float, float, int]: hfmt = ">IHHqiiHHHH" hsize = struct.calcsize(hfmt) - _, _, img_mode, frame_number, width, height, _, _, _, _ = struct.unpack( + _, _, _, frame_number, width, height, _, _, _, _ = struct.unpack( hfmt, img_data[1][:hsize] ) @@ -162,7 +258,7 @@ def _poll_once(self) -> None: class RedisCamera(Camera): - def __init__(self, device_uri: str, sleep_time: int, debug: bool = False, out_redis: str = None, out_redis_channel: str = None, in_redis_channel: str = 'frames'): + def __init__(self, device_uri: str, sleep_time: float, debug: bool = False, 
out_redis: str = None, out_redis_channel: str = None, in_redis_channel: str = 'frames'): super().__init__(device_uri, sleep_time, debug, out_redis, out_redis_channel) # for this camera in_redis_... is for the input and redis_... as usual for output self._in_redis_client = self._connect(self._device_uri) @@ -203,17 +299,12 @@ def poll_image(self, output: Union[IO, multiprocessing.queues.Queue]) -> None: "frame_number": self._last_frame_number } self._redis_client.publish(self._redis_channel, json.dumps(frame_dict)) - raw_image_data = base64.b64decode(frame["data"]) - # ffmpeg needs an rgb encoded image, since we cannot be sure if the image was in rgb or - # bgr(common for cv2 image manipulation) we need these transformations - image_array = np.frombuffer(raw_image_data, dtype=np.uint8) - frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR) - rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) - self._write_data(rgb_frame.tobytes()) + raw_image_data = base64.b64decode(frame["data"]) + self._write_data(self._image_to_rgb24(raw_image_data)) class TestCamera(Camera): - def __init__(self, device_uri: str, sleep_time: int, debug: bool = False, redis: str = None, redis_channel: str = None): + def __init__(self, device_uri: str, sleep_time: float, debug: bool = False, redis: str = None, redis_channel: str = None): super().__init__(device_uri, sleep_time, debug, redis, redis_channel) self._sleep_time = 0.05 testimg_fpath = os.path.join(os.path.dirname(__file__), "fakeimg.jpg") @@ -224,7 +315,7 @@ def __init__(self, device_uri: str, sleep_time: int, debug: bool = False, redis: self._last_frame_number = -1 def _poll_once(self) -> None: - self._write_data(self._raw_data) + self._write_data(bytearray(self._raw_data)) self._last_frame_number += 1 if self._redis: @@ -239,7 +330,7 @@ def _poll_once(self) -> None: time.sleep(self._sleep_time) class VideoTestCamera(Camera): - def __init__(self, device_uri: str, sleep_time: int, debug: bool = False, redis: str = None, redis_channel: str 
= None): + def __init__(self, device_uri: str, sleep_time: float, debug: bool = False, redis: str = None, redis_channel: str = None): super().__init__(device_uri, sleep_time, debug, redis, redis_channel) self._sleep_time = 0.04 # for your testvideo, please use an uncompressed video or mjpeg codec, @@ -252,7 +343,7 @@ def __init__(self, device_uri: str, sleep_time: int, debug: bool = False, redis: def _poll_once(self) -> None: if not self._video_capture.isOpened(): - print("Video capture is not opened.") + logging.error("Video capture is not opened.") return ret, frame = self._video_capture.read() @@ -262,7 +353,7 @@ def _poll_once(self) -> None: self._video_capture = cv2.VideoCapture(self._testvideo_fpath) ret, frame = self._video_capture.read() if not ret: - print("Failed to restart video capture.") + logging.error("Failed to restart video capture.") return frame_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) @@ -283,7 +374,7 @@ def _poll_once(self) -> None: def _set_video_dimensions(self): if not self._video_capture.isOpened(): - print("Video capture is not opened.") + logging.error("Video capture is not opened.") return self._width = int(self._video_capture.get(cv2.CAP_PROP_FRAME_WIDTH)) self._height = int(self._video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT)) \ No newline at end of file diff --git a/video_streamer/core/config.py b/video_streamer/core/config.py index 7019359..fdb60b6 100644 --- a/video_streamer/core/config.py +++ b/video_streamer/core/config.py @@ -17,12 +17,32 @@ from typing_extensions import Self __all__ = ( + "AuthenticationConfiguration", "SourceConfiguration", "ServerConfiguration", "get_config_from_file", "get_config_from_dict", + "get_auth_config_from_dict", ) +class AuthenticationConfiguration(BaseModel): + """Authentication Configuration""" + + type: Union[str, None] = Field( + title="Authentication Type", + description="Type of authentication, supported types are 'Basic', 'Digest', None", + default= None, + ) + + username: 
Union[bytes, str] = Field( + title="Username", + default="", + ) + + password: Union[bytes, str] = Field( + title="Password", + default="", + ) class SourceConfiguration(BaseModel): """Source Configuration""" @@ -65,7 +85,10 @@ class SourceConfiguration(BaseModel): description= "Channel for RedisCamera to listen to", default="CameraStream", ) - + auth_config: AuthenticationConfiguration = Field( + title="Authentication Configurations", + default=AuthenticationConfiguration(type=None), + ) class ServerConfiguration(BaseModel): """Server Configuration""" @@ -130,7 +153,7 @@ def get_config_from_file(fpath: Union[str, Path]) -> Union[ServerConfiguration, def get_config_from_dict( - config_data: dict[str, Any], + config_data: Dict[str, Any], ) -> Union[ServerConfiguration, None]: """Get server configuration from dictionary. @@ -144,3 +167,19 @@ def get_config_from_dict( return ServerConfiguration.model_validate(config_data) except ValidationError: return None + +def get_auth_config_from_dict( + config_data: Dict[str, Any], +) -> Union[AuthenticationConfiguration, None]: + """Get authentication configuration from dictionary. + + Args: + config_data (dict[str, Any]): Authentication Data. + + Returns: + Union[AuthenticationConfiguration, None]: Authentication Configuration or None. 
+ """ + try: + return AuthenticationConfiguration.model_validate(config_data) + except ValidationError: + return None \ No newline at end of file diff --git a/video_streamer/core/streamer.py b/video_streamer/core/streamer.py index d31e0f3..9a5065f 100644 --- a/video_streamer/core/streamer.py +++ b/video_streamer/core/streamer.py @@ -28,7 +28,7 @@ def get_camera(self) -> Camera: elif self._config.input_uri == "videotest": return VideoTestCamera("TANGO_URI", self._expt, False, self._config.redis, self._config.redis_channel) elif self._config.input_uri.startswith("http"): - return MJPEGCamera(self._config.input_uri, self._expt, False, self._config.redis, self._config.redis_channel) + return MJPEGCamera(self._config.input_uri, self._expt, self._config.auth_config, False, self._config.redis, self._config.redis_channel) elif self._config.input_uri.startswith("redis"): return RedisCamera(self._config.input_uri, self._expt, False, self._config.redis, self._config.redis_channel, self._config.in_redis_channel) @@ -100,8 +100,8 @@ def _start_ffmpeg( :returns: Processes performing encoding :rtype: tuple """ - source_size = "%s:%s" % source_size - out_size = "%s:%s" % out_size + source_size_str = "%s:%s" % source_size + out_size_str = "%s:%s" % out_size ffmpeg_args = [ "ffmpeg", @@ -110,7 +110,7 @@ def _start_ffmpeg( "-pixel_format", "rgb24", "-s", - source_size, + source_size_str, "-i", "-", "-f", @@ -118,7 +118,7 @@ def _start_ffmpeg( "-q:v", "%s" % quality, "-vf", - "scale=%s" % out_size, + "scale=%s" % out_size_str, "-vcodec", "mpeg1video", "http://127.0.0.1:%s/video_input/" % port, diff --git a/video_streamer/main.py b/video_streamer/main.py index fdaf6ba..86a520f 100644 --- a/video_streamer/main.py +++ b/video_streamer/main.py @@ -2,7 +2,7 @@ import argparse from video_streamer.server import create_app -from video_streamer.core.config import get_config_from_dict, get_config_from_file +from video_streamer.core.config import get_config_from_dict, get_config_from_file, 
get_auth_config_from_dict def parse_args() -> argparse.Namespace: @@ -124,6 +124,30 @@ def parse_args() -> argparse.Namespace: default="CameraStream", ) + opt_parser.add_argument( + "-auth", + "--auth_type", + dest="auth_type", + help="Type of authentication request", + default="Digest", + ) + + opt_parser.add_argument( + "-user", + "--username", + dest="username", + help="Username for authentication request", + default="", + ) + + opt_parser.add_argument( + "-pass", + "--password", + dest="password", + help="Password for authentication request", + default="", + ) + return opt_parser.parse_args() @@ -151,6 +175,11 @@ def run() -> None: "hash": args.hash, "size": _size, "in_redis_channel": args.in_redis_channel, + "auth_config": get_auth_config_from_dict({ + "type": args.auth_type, + "username": args.username, + "password": args.password + }) } } }