Skip to content

Commit

Permalink
receiver: Pass hidpp10 instance to remove tight coupling
Browse files Browse the repository at this point in the history
Introduce Protocol and pass instance of Hidpp10 at instantiation of a
Receiver. This simplifies testing and helps to decouple device from
hidpp10.

Related #2350
  • Loading branch information
MattHag committed Mar 3, 2024
1 parent 9b69aae commit 5c27679
Showing 1 changed file with 88 additions and 24 deletions.
112 changes: 88 additions & 24 deletions lib/logitech_receiver/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import errno as _errno
import logging

from typing import Any, Protocol

import hidapi as _hid

from . import base as _base
Expand All @@ -26,11 +28,21 @@

logger = logging.getLogger(__name__)

_hidpp10 = hidpp10.Hidpp10()
_R = hidpp10_constants.REGISTERS
_IR = hidpp10_constants.INFO_SUBREGISTERS


class Hidpp10Protocol(Protocol):
def get_firmware(self, device) -> tuple:
...

def get_notification_flags(self, device) -> Any:
...

def set_notification_flags(self, device, flags) -> Any:
...


class ReceiverFactory:
@staticmethod
def create_receiver(device_info, setting_callback=None):
Expand All @@ -46,19 +58,62 @@ def create_receiver(device_info, setting_callback=None):

handle = _base.open_path(device_info.path)
if handle:
hidpp_instance = hidpp10.Hidpp10()
receiver_kind = product_info.get("receiver_kind", "unknown")
if receiver_kind == "bolt":
return BoltReceiver(product_info, handle, device_info.path, device_info.product_id, setting_callback)
return BoltReceiver(
hidpp_instance,
product_info,
handle,
device_info.path,
device_info.product_id,
setting_callback,
)
elif receiver_kind == "unifying":
return UnifyingReceiver(product_info, handle, device_info.path, device_info.product_id, setting_callback)
return UnifyingReceiver(
hidpp_instance,
product_info,
handle,
device_info.path,
device_info.product_id,
setting_callback,
)
elif receiver_kind == "lightspeed":
return LightSpeedReceiver(product_info, handle, device_info.path, device_info.product_id, setting_callback)
return LightSpeedReceiver(
hidpp_instance,
product_info,
handle,
device_info.path,
device_info.product_id,
setting_callback,
)
elif receiver_kind == "nano":
return NanoReceiver(product_info, handle, device_info.path, device_info.product_id, setting_callback)
return NanoReceiver(
hidpp_instance,
product_info,
handle,
device_info.path,
device_info.product_id,
setting_callback,
)
elif receiver_kind == "27Mhz":
return Ex100Receiver(product_info, handle, device_info.path, device_info.product_id, setting_callback)
return Ex100Receiver(
hidpp_instance,
product_info,
handle,
device_info.path,
device_info.product_id,
setting_callback,
)
else:
return Receiver(product_info, handle, device_info.path, device_info.product_id, setting_callback)
return Receiver(
hidpp_instance,
product_info,
handle,
device_info.path,
device_info.product_id,
setting_callback,
)
except OSError as e:
logger.exception("open %s", device_info)
if e.errno == _errno.EACCES:
Expand All @@ -75,9 +130,21 @@ class Receiver:

number = 0xFF
kind = None
read_register = hidpp10.read_register
write_register = hidpp10.write_register

def __init__(self, receiver_kind, product_info, handle, path, product_id, setting_callback=None):
def __init__(
self,
hidpp10_instance: Hidpp10Protocol,
receiver_kind,
product_info,
handle,
path,
product_id,
setting_callback=None,
):
assert handle
self.hidpp10 = hidpp10_instance
self.isDevice = False # some devices act as receiver so we need a property to distinguish them
self.handle = handle
self.path = path
Expand Down Expand Up @@ -131,7 +198,7 @@ def __del__(self):
@property
def firmware(self):
if self._firmware is None and self.handle:
self._firmware = _hidpp10.get_firmware(self)
self._firmware = self.hidpp10.get_firmware(self)
return self._firmware

# how many pairings remain (None for unknown, -1 for unlimited)
Expand All @@ -157,12 +224,12 @@ def enable_connection_notifications(self, enable=True):
)
else:
set_flag_bits = 0
ok = _hidpp10.set_notification_flags(self, set_flag_bits)
ok = self.hidpp10.set_notification_flags(self, set_flag_bits)
if ok is None:
logger.warning("%s: failed to %s receiver notifications", self, "enable" if enable else "disable")
return None

flag_bits = _hidpp10.get_notification_flags(self)
flag_bits = self.hidpp10.get_notification_flags(self)
flag_names = None if flag_bits is None else tuple(hidpp10_constants.NOTIFICATION_FLAG.flag_names(flag_bits))
if logger.isEnabledFor(logging.INFO):
logger.info("%s: receiver notifications %s => %s", self, "enabled" if enable else "disabled", flag_names)
Expand Down Expand Up @@ -287,9 +354,6 @@ def request(self, request_id, *params):
if bool(self):
return _base.request(self.handle, 0xFF, request_id, *params)

read_register = hidpp10.read_register
write_register = hidpp10.write_register

def __iter__(self):
connected_devices = self.count()
found_devices = 0
Expand Down Expand Up @@ -386,8 +450,8 @@ def __str__(self):


class BoltReceiver(Receiver):
def __init__(self, product_info, handle, path, product_id, setting_callback=None):
super().__init__("bolt", product_info, handle, path, product_id, setting_callback)
def __init__(self, hidpp10_instance, product_info, handle, path, product_id, setting_callback=None):
super().__init__(hidpp10_instance, "bolt", product_info, handle, path, product_id, setting_callback)

def initialize(self, product_info: dict):
serial_reply = self.read_register(_R.bolt_uniqueId)
Expand Down Expand Up @@ -435,20 +499,20 @@ def _unpair_device_per_receiver(self, key):


class UnifyingReceiver(Receiver):
def __init__(self, product_info, handle, path, product_id, setting_callback=None):
super().__init__("unifying", product_info, handle, path, product_id, setting_callback)
def __init__(self, hidpp10_instance, product_info, handle, path, product_id, setting_callback=None):
super().__init__(hidpp10_instance, "unifying", product_info, handle, path, product_id, setting_callback)


class NanoReceiver(Receiver):
def __init__(self, product_info, handle, path, product_id, setting_callback=None):
super().__init__("nano", product_info, handle, path, product_id, setting_callback)
def __init__(self, hidpp10_instance, product_info, handle, path, product_id, setting_callback=None):
super().__init__(hidpp10_instance, "nano", product_info, handle, path, product_id, setting_callback)


class LightSpeedReceiver(Receiver):
def __init__(self, product_info, handle, path, product_id, setting_callback=None):
super().__init__("lightspeed", product_info, handle, path, product_id, setting_callback)
def __init__(self, hidpp10_instance, product_info, handle, path, product_id, setting_callback=None):
super().__init__(hidpp10_instance, "lightspeed", product_info, handle, path, product_id, setting_callback)


class Ex100Receiver(Receiver):
def __init__(self, product_info, handle, path, product_id, setting_callback=None):
super().__init__("27Mhz", product_info, handle, path, product_id, setting_callback)
def __init__(self, hidpp10_instance, product_info, handle, path, product_id, setting_callback=None):
super().__init__(hidpp10_instance, "27Mhz", product_info, handle, path, product_id, setting_callback)

0 comments on commit 5c27679

Please sign in to comment.