Skip to content

Commit

Permalink
logitech_receiver: Move exceptions into own module
Browse files Browse the repository at this point in the history
Related #1097
  • Loading branch information
MattHag committed Feb 14, 2024
1 parent b516b12 commit ae45c4a
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 90 deletions.
2 changes: 0 additions & 2 deletions lib/logitech_receiver/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@
import logging

from . import listener, status # noqa: F401
from .base import DeviceUnreachable, NoReceiver, NoSuchDevice # noqa: F401
from .common import strhex # noqa: F401
from .device import Device # noqa: F401
from .hidpp20 import FeatureCallError, FeatureNotSupported # noqa: F401
from .receiver import Receiver # noqa: F401

logger = logging.getLogger(__name__)
Expand Down
52 changes: 15 additions & 37 deletions lib/logitech_receiver/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@

import hidapi as _hid

from . import hidpp10 as _hidpp10
from . import exceptions
from . import hidpp10_constants as _hidpp10_constants
from . import hidpp20 as _hidpp20
from . import hidpp20_constants as _hidpp20_constants
from .base_usb import ALL as _RECEIVER_USB_IDS
from .base_usb import DEVICES as _DEVICE_IDS
from .base_usb import other_device_check as _other_device_check
from .common import KwException as _KwException
from .common import strhex as _strhex

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -69,29 +70,6 @@
# when pinging, be extra patient (no longer)
_PING_TIMEOUT = DEFAULT_TIMEOUT

#
# Exceptions that may be raised by this API.
#


class NoReceiver(_KwException):
"""Raised when trying to talk through a previously open handle, when the
receiver is no longer available. Should only happen if the receiver is
physically disconnected from the machine, or its kernel driver module is
unloaded."""
pass


class NoSuchDevice(_KwException):
"""Raised when trying to reach a device number not paired to the receiver."""
pass


class DeviceUnreachable(_KwException):
"""Raised when a request is made to an unreachable (turned off) device."""
pass


#
#
#
Expand Down Expand Up @@ -219,7 +197,7 @@ def write(handle, devnumber, data, long_message=False):
except Exception as reason:
logger.error('write failed, assuming handle %r no longer available', handle)
close(handle)
raise NoReceiver(reason=reason)
raise exceptions.NoReceiver(reason=reason)


def read(handle, timeout=DEFAULT_TIMEOUT):
Expand Down Expand Up @@ -268,7 +246,7 @@ def _read(handle, timeout):
except Exception as reason:
logger.warning('read failed, assuming handle %r no longer available', handle)
close(handle)
raise NoReceiver(reason=reason)
raise exceptions.NoReceiver(reason=reason)

if data and check_message(data): # ignore messages that fail check
report_id = ord(data[:1])
Expand Down Expand Up @@ -299,7 +277,7 @@ def _skip_incoming(handle, ihandle, notifications_hook):
except Exception as reason:
logger.error('read failed, assuming receiver %s no longer available', handle)
close(handle)
raise NoReceiver(reason=reason)
raise exceptions.NoReceiver(reason=reason)

if data:
if check_message(data): # only process messages that pass check
Expand Down Expand Up @@ -420,7 +398,7 @@ def request(handle, devnumber, request_id, *params, no_reply=False, return_error
notifications_hook = getattr(handle, 'notifications_hook', None)
try:
_skip_incoming(handle, ihandle, notifications_hook)
except NoReceiver:
except exceptions.NoReceiver:
logger.warning('device or receiver disconnected')
return None
write(ihandle, devnumber, request_data, long_message)
Expand All @@ -445,15 +423,15 @@ def request(handle, devnumber, request_id, *params, no_reply=False, return_error
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
'(%s) device 0x%02X error on request {%04X}: %d = %s', handle, devnumber, request_id, error,
_hidpp10.ERROR[error]
_hidpp10_constants.ERROR[error]
)
return _hidpp10.ERROR[error] if return_error else None
return _hidpp10_constants.ERROR[error] if return_error else None
if reply_data[:1] == b'\xFF' and reply_data[1:3] == request_data[:2]:
# a HID++ 2.0 feature call returned with an error
error = ord(reply_data[3:4])
logger.error(
'(%s) device %d error on feature request {%04X}: %d = %s', handle, devnumber, request_id, error,
_hidpp20.ERROR[error]
_hidpp20_constants.ERROR[error]
)
raise _hidpp20.FeatureCallError(number=devnumber, request=request_id, error=error, params=params)

Expand Down Expand Up @@ -505,7 +483,7 @@ def ping(handle, devnumber, long_message=False):
notifications_hook = getattr(handle, 'notifications_hook', None)
try:
_skip_incoming(handle, int(handle), notifications_hook)
except NoReceiver:
except exceptions.NoReceiver:
logger.warning('device or receiver disconnected')
return

Expand All @@ -530,13 +508,13 @@ def ping(handle, devnumber, long_message=False):
if report_id == HIDPP_SHORT_MESSAGE_ID and reply_data[:1] == b'\x8F' and \
reply_data[1:3] == request_data[:2]: # error response
error = ord(reply_data[3:4])
if error == _hidpp10.ERROR.invalid_SubID__command: # a valid reply from a HID++ 1.0 device
if error == _hidpp10_constants.ERROR.invalid_SubID__command: # a valid reply from a HID++ 1.0 device
return 1.0
if error == _hidpp10.ERROR.resource_error or error == _hidpp10.ERROR.connection_request_failed:
if error == _hidpp10_constants.ERROR.resource_error or error == _hidpp10_constants.ERROR.connection_request_failed:
return # device unreachable
if error == _hidpp10.ERROR.unknown_device: # no paired device with that number
if error == _hidpp10_constants.ERROR.unknown_device: # no paired device with that number
logger.error('(%s) device %d error on ping request: unknown device', handle, devnumber)
raise NoSuchDevice(number=devnumber, request=request_id)
raise exceptions.NoSuchDevice(number=devnumber, request=request_id)

if notifications_hook:
n = make_notification(report_id, reply_devnumber, reply_data)
Expand Down
9 changes: 5 additions & 4 deletions lib/logitech_receiver/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

from . import base as _base
from . import descriptors as _descriptors
from . import exceptions
from . import hidpp10 as _hidpp10
from . import hidpp20 as _hidpp20
from .common import strhex as _strhex
Expand Down Expand Up @@ -128,15 +129,15 @@ def __init__(
self.wpid = _hid.find_paired_node_wpid(receiver.path, number)
if not self.wpid:
logger.error('Unable to get wpid from udev for device %d of %s', number, receiver)
raise _base.NoSuchDevice(number=number, receiver=receiver, error='Not present 27Mhz device')
raise exceptions.NoSuchDevice(number=number, receiver=receiver, error='Not present 27Mhz device')
kind = receiver.get_kind_from_index(number)
self._kind = _hidpp10.DEVICE_KIND[kind]
else: # get information from pairing registers
self.online = True
self.update_pairing_information()
self.update_extended_pairing_information()
if not self.wpid and not self._serial: # if neither then the device almost certainly wasn't found
raise _base.NoSuchDevice(number=number, receiver=receiver, error='no wpid or serial')
raise exceptions.NoSuchDevice(number=number, receiver=receiver, error='no wpid or serial')

# the wpid is set to None on this object when the device is unpaired
assert self.wpid is not None, 'failed to read wpid: device %d of %s' % (number, receiver)
Expand Down Expand Up @@ -208,7 +209,7 @@ def name(self):
if not self.online: # be very defensive
try:
self.ping()
except _base.NoSuchDevice:
except exceptions.NoSuchDevice:
pass
if self.online and self.protocol >= 2.0:
self._name = _hidpp20.get_name(self)
Expand Down Expand Up @@ -514,7 +515,7 @@ def __bool__(self):
def __str__(self):
try:
name = self.name or self.codename or '?'
except _base.NoSuchDevice:
except exceptions.NoSuchDevice:
name = 'name not available'
return '<Device(%d,%s,%s,%s)>' % (self.number, self.wpid or self.product_id, name, self.serial)

Expand Down
33 changes: 33 additions & 0 deletions lib/logitech_receiver/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from .common import KwException as _KwException

#
# Exceptions that may be raised by this API.
#


class NoReceiver(_KwException):
"""Raised when trying to talk through a previously open handle, when the
receiver is no longer available. Should only happen if the receiver is
physically disconnected from the machine, or its kernel driver module is
unloaded."""
pass


class NoSuchDevice(_KwException):
"""Raised when trying to reach a device number not paired to the receiver."""
pass


class DeviceUnreachable(_KwException):
"""Raised when a request is made to an unreachable (turned off) device."""
pass


class FeatureNotSupported(_KwException):
"""Raised when trying to request a feature not supported by the device."""
pass


class FeatureCallError(_KwException):
"""Raised if the device replied to a feature call with an error."""
pass
32 changes: 8 additions & 24 deletions lib/logitech_receiver/hidpp20.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@

import yaml as _yaml

from . import special_keys
from . import exceptions, special_keys
from .common import BATTERY_APPROX as _BATTERY_APPROX
from .common import FirmwareInfo as _FirmwareInfo
from .common import KwException as _KwException
from .common import NamedInt as _NamedInt
from .common import NamedInts as _NamedInts
from .common import UnsortedNamedInts as _UnsortedNamedInts
Expand Down Expand Up @@ -227,21 +226,6 @@ def hexint_presenter(dumper, data):
#


class FeatureNotSupported(_KwException):
"""Raised when trying to request a feature not supported by the device."""
pass


class FeatureCallError(_KwException):
"""Raised if the device replied to a feature call with an error."""
pass


#
#
#


class FeaturesArray(dict):

def __init__(self, device):
Expand Down Expand Up @@ -462,8 +446,8 @@ def _getCidReporting(self):
mapping_flags_2 = 0
self._mapping_flags = mapping_flags_1 | (mapping_flags_2 << 8)
else:
raise FeatureCallError(msg='No reply from device.')
except FeatureCallError: # if the key hasn't ever been configured then the read may fail so only produce a warning
raise exceptions.FeatureCallError(msg='No reply from device.')
except exceptions.FeatureCallError: # if the key hasn't ever been configured then the read may fail so only produce a warning
if logger.isEnabledFor(logging.WARNING):
logger.warn(
f'Feature Call Error in _getCidReporting on device {self._device} for cid {self._cid} - use defaults'
Expand Down Expand Up @@ -498,7 +482,7 @@ def _setCidReporting(self, flags=None, remap=0):
bfield = 0
for f, v in flags.items():
if v and FLAG_TO_CAPABILITY[f] not in self.flags:
raise FeatureNotSupported(
raise exceptions.FeatureNotSupported(
msg=f'Tried to set mapping flag "{f}" on control "{self.key}" ' +
f'which does not support "{FLAG_TO_CAPABILITY[f]}" on device {self._device}.'
)
Expand All @@ -511,7 +495,7 @@ def _setCidReporting(self, flags=None, remap=0):
self._mapping_flags &= ~int(f)

if remap != 0 and remap not in self.remappable_to:
raise FeatureNotSupported(
raise exceptions.FeatureNotSupported(
msg=f'Tried to remap control "{self.key}" to a control ID {remap} which it is not remappable to ' +
f'on device {self._device}.'
)
Expand Down Expand Up @@ -1044,7 +1028,7 @@ def value(self):
def read(self):
try:
value = feature_request(self._device, FEATURE.GESTURE_2, 0x50, self.id, 0xFF)
except FeatureCallError: # some calls produce an error (notably spec 5 multiplier on K400Plus)
except exceptions.FeatureCallError: # some calls produce an error (notably spec 5 multiplier on K400Plus)
if logger.isEnabledFor(logging.WARNING):
logger.warn(f'Feature Call Error reading Gesture Spec on device {self._device} for spec {self.id} - use None')
return None
Expand Down Expand Up @@ -1129,7 +1113,7 @@ class Backlight:
def __init__(self, device):
response = device.feature_request(FEATURE.BACKLIGHT2, 0x00)
if not response:
raise FeatureCallError(msg='No reply from device.')
raise exceptions.FeatureCallError(msg='No reply from device.')
self.device = device
self.enabled, self.options, supported, effects, self.level, self.dho, self.dhi, self.dpow = _unpack(
'<BBBHBHHH', response[:12]
Expand Down Expand Up @@ -1818,7 +1802,7 @@ def get_adc_measurement(device):
report = feature_request(device, FEATURE.ADC_MEASUREMENT)
if report is not None:
return decipher_adc_measurement(report)
except FeatureCallError:
except exceptions.FeatureCallError:
return FEATURE.ADC_MEASUREMENT if FEATURE.ADC_MEASUREMENT in device.features else None


Expand Down
3 changes: 2 additions & 1 deletion lib/logitech_receiver/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import threading as _threading

from . import base as _base
from . import exceptions

# from time import time as _timestamp

Expand Down Expand Up @@ -163,7 +164,7 @@ def run(self):
if self._queued_notifications.empty():
try:
n = _base.read(self.receiver.handle, _EVENT_READ_TIMEOUT)
except _base.NoReceiver:
except exceptions.NoReceiver:
logger.warning('%s disconnected', self.receiver.name)
self.receiver.close()
break
Expand Down
Loading

0 comments on commit ae45c4a

Please sign in to comment.