Skip to content

Commit

Permalink
Refine interfaces for testability
Browse files Browse the repository at this point in the history
  • Loading branch information
MattHag committed May 24, 2024
1 parent 815dce0 commit 0a9bad3
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 92 deletions.
47 changes: 39 additions & 8 deletions lib/logitech_receiver/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

from typing import Callable
from typing import Optional
from typing import Protocol
from typing import cast

import hidapi

Expand All @@ -46,23 +48,49 @@
_IR = hidpp10_constants.INFO_SUBREGISTERS


class LowLevelInterface(Protocol):
def open_path(self, path):
...

def ping(self, handle, number, long_message: bool):
...

def request(self, handle, devnumber, request_id, *params, **kwargs):
...

def close(self, handle, *args, **kwargs) -> bool:
...


low_level_interface = cast(LowLevelInterface, base)


class DeviceFactory:
@staticmethod
def create_device(device_info, setting_callback=None):
def create_device(low_level: LowLevelInterface, device_info, setting_callback=None):
"""Opens a Logitech Device found attached to the machine, by Linux device path.
:returns: An open file handle for the found receiver, or None.
"""
try:
handle = base.open_path(device_info.path)
handle = low_level.open_path(device_info.path)
if handle:
# a direct connected device might not be online (as reported by user)
return Device(None, None, None, handle=handle, device_info=device_info, setting_callback=setting_callback)
return Device(
low_level,
None,
None,
None,
handle=handle,
device_info=device_info,
setting_callback=setting_callback,
)
except OSError as e:
logger.exception("open %s", device_info)
if e.errno == errno.EACCES:
raise
except Exception:
logger.exception("open %s", device_info)
raise


class Device:
Expand All @@ -72,6 +100,7 @@ class Device:

def __init__(
self,
low_level: LowLevelInterface,
receiver,
number,
online,
Expand All @@ -83,6 +112,7 @@ def __init__(
assert receiver or device_info
if receiver:
assert 0 < number <= 15 # some receivers have devices past their max # of devices
self.low_level = low_level
self.number = number # will be None at this point for directly connected devices
self.online = online # is the device online? - gates many atempts to contact the device
self.descriptor = None
Expand Down Expand Up @@ -129,11 +159,11 @@ def __init__(
self.path = hidapi.find_paired_node(receiver.path, number, 1) if receiver else None
if not self.handle:
try:
self.handle = base.open_path(self.path) if self.path else None
self.handle = self.low_level.open_path(self.path) if self.path else None
except Exception: # maybe the device wasn't set up
try:
time.sleep(1)
self.handle = base.open_path(self.path) if self.path else None
self.handle = self.low_level.open_path(self.path) if self.path else None
except Exception: # give up
self.handle = None # should this give up completely?

Expand Down Expand Up @@ -484,7 +514,7 @@ def request(self, request_id, *params, no_reply=False):
long = self.hidpp_long is True or (
self.hidpp_long is None and (self.bluetooth or self._protocol is not None and self._protocol >= 2.0)
)
return base.request(
return self.low_level.request(
self.handle or self.receiver.handle,
self.number,
request_id,
Expand All @@ -503,7 +533,8 @@ def ping(self):
long = self.hidpp_long is True or (
self.hidpp_long is None and (self.bluetooth or self._protocol is not None and self._protocol >= 2.0)
)
protocol = base.ping(self.handle or self.receiver.handle, self.number, long_message=long)
handle = self.handle or self.receiver.handle
protocol = self.low_level.ping(handle, self.number, long_message=long)
self.online = protocol is not None
if protocol:
self._protocol = protocol
Expand All @@ -519,7 +550,7 @@ def close(self):
if hasattr(self, "cleanups"):
for cleanup in self.cleanups:
cleanup(self)
return handle and base.close(handle)
return handle and self.low_level.close(handle)

def __index__(self):
return self.number
Expand Down
21 changes: 20 additions & 1 deletion lib/logitech_receiver/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
from dataclasses import dataclass
from typing import Callable
from typing import Optional
from typing import Protocol
from typing import cast

import hidapi

Expand All @@ -42,6 +44,23 @@
_IR = hidpp10_constants.INFO_SUBREGISTERS


class LowLevelInterface(Protocol):
def open_path(self, path):
...

def ping(self, handle, number, long_message=False):
...

def request(self, handle, devnumber, request_id, *params, **kwargs):
...

def close(self, handle):
...


low_level_interface = cast(LowLevelInterface, base)


@dataclass
class Pairing:
"""Information about the current or most recent pairing"""
Expand Down Expand Up @@ -228,7 +247,7 @@ def register_new_device(self, number, notification=None):
logger.warning("mismatch on device kind %s %s", info["kind"], nkind)
else:
online = True
dev = Device(self, number, online, pairing_info=info, setting_callback=self.setting_callback)
dev = Device(low_level_interface, self, number, online, pairing_info=info, setting_callback=self.setting_callback)
if logger.isEnabledFor(logging.INFO):
logger.info("%s: found new device %d (%s)", self, number, dev.wpid)
self._devices[number] = dev
Expand Down
9 changes: 4 additions & 5 deletions lib/solaar/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
import logitech_receiver.device as _device
import logitech_receiver.receiver as _receiver

from logitech_receiver.base import receivers
from logitech_receiver.base import receivers_and_devices
from logitech_receiver import base

from solaar import NAME

Expand Down Expand Up @@ -108,7 +107,7 @@ def _create_parser():


def _receivers(dev_path=None):
for dev_info in receivers():
for dev_info in base.receivers():
if dev_path is not None and dev_path != dev_info.path:
continue
try:
Expand All @@ -123,12 +122,12 @@ def _receivers(dev_path=None):


def _receivers_and_devices(dev_path=None):
for dev_info in receivers_and_devices():
for dev_info in base.receivers_and_devices():
if dev_path is not None and dev_path != dev_info.path:
continue
try:
if dev_info.isDevice:
d = _device.DeviceFactory.create_device(dev_info)
d = _device.DeviceFactory.create_device(base, dev_info)
else:
d = _receiver.ReceiverFactory.create_receiver(dev_info)

Expand Down
2 changes: 1 addition & 1 deletion lib/solaar/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ def _start(device_info):
if not isDevice:
receiver = _receiver.ReceiverFactory.create_receiver(device_info, _setting_callback)
else:
receiver = _device.DeviceFactory.create_device(device_info, _setting_callback)
receiver = _device.DeviceFactory.create_device(_base, device_info, _setting_callback)
if receiver:
configuration.attach_to(receiver)
if receiver.bluetooth and receiver.hid_serial:
Expand Down
Loading

0 comments on commit 0a9bad3

Please sign in to comment.