From ae45c4a9cdb5748a9155402cd04cd12a0d4ab1ca Mon Sep 17 00:00:00 2001 From: Matthias Hagmann <16444067+MattHag@users.noreply.github.com> Date: Wed, 14 Feb 2024 20:37:41 +0100 Subject: [PATCH] logitech_receiver: Move exceptions into own module Related #1097 --- lib/logitech_receiver/__init__.py | 2 -- lib/logitech_receiver/base.py | 52 +++++++++-------------------- lib/logitech_receiver/device.py | 9 ++--- lib/logitech_receiver/exceptions.py | 33 ++++++++++++++++++ lib/logitech_receiver/hidpp20.py | 32 +++++------------- lib/logitech_receiver/listener.py | 3 +- lib/logitech_receiver/receiver.py | 32 +++++++++--------- lib/solaar/cli/show.py | 4 +-- lib/solaar/listener.py | 11 +++--- 9 files changed, 88 insertions(+), 90 deletions(-) create mode 100644 lib/logitech_receiver/exceptions.py diff --git a/lib/logitech_receiver/__init__.py b/lib/logitech_receiver/__init__.py index e31407edac..e1a193ee04 100644 --- a/lib/logitech_receiver/__init__.py +++ b/lib/logitech_receiver/__init__.py @@ -31,10 +31,8 @@ import logging from . import listener, status # noqa: F401 -from .base import DeviceUnreachable, NoReceiver, NoSuchDevice # noqa: F401 from .common import strhex # noqa: F401 from .device import Device # noqa: F401 -from .hidpp20 import FeatureCallError, FeatureNotSupported # noqa: F401 from .receiver import Receiver # noqa: F401 logger = logging.getLogger(__name__) diff --git a/lib/logitech_receiver/base.py b/lib/logitech_receiver/base.py index 0024b021ec..61fa12fb71 100644 --- a/lib/logitech_receiver/base.py +++ b/lib/logitech_receiver/base.py @@ -30,12 +30,13 @@ import hidapi as _hid -from . import hidpp10 as _hidpp10 +from . import exceptions +from . import hidpp10_constants as _hidpp10_constants from . import hidpp20 as _hidpp20 +from . import hidpp20_constants as _hidpp20_constants from .base_usb import ALL as _RECEIVER_USB_IDS from .base_usb import DEVICES as _DEVICE_IDS from .base_usb import other_device_check as _other_device_check -from .common import KwException as _KwException from .common import strhex as _strhex logger = logging.getLogger(__name__) @@ -69,29 +70,6 @@ # when pinging, be extra patient (no longer) _PING_TIMEOUT = DEFAULT_TIMEOUT -# -# Exceptions that may be raised by this API. -# - - -class NoReceiver(_KwException): - """Raised when trying to talk through a previously open handle, when the - receiver is no longer available. Should only happen if the receiver is - physically disconnected from the machine, or its kernel driver module is - unloaded.""" - pass - - -class NoSuchDevice(_KwException): - """Raised when trying to reach a device number not paired to the receiver.""" - pass - - -class DeviceUnreachable(_KwException): - """Raised when a request is made to an unreachable (turned off) device.""" - pass - - # # # @@ -219,7 +197,7 @@ def write(handle, devnumber, data, long_message=False): except Exception as reason: logger.error('write failed, assuming handle %r no longer available', handle) close(handle) - raise NoReceiver(reason=reason) + raise exceptions.NoReceiver(reason=reason) def read(handle, timeout=DEFAULT_TIMEOUT): @@ -268,7 +246,7 @@ def _read(handle, timeout): except Exception as reason: logger.warning('read failed, assuming handle %r no longer available', handle) close(handle) - raise NoReceiver(reason=reason) + raise exceptions.NoReceiver(reason=reason) if data and check_message(data): # ignore messages that fail check report_id = ord(data[:1]) @@ -299,7 +277,7 @@ def _skip_incoming(handle, ihandle, notifications_hook): except Exception as reason: logger.error('read failed, assuming receiver %s no longer available', handle) close(handle) - raise NoReceiver(reason=reason) + raise exceptions.NoReceiver(reason=reason) if data: if check_message(data): # only process messages that pass check @@ -420,7 +398,7 @@ def request(handle, devnumber, request_id, *params, no_reply=False, return_error notifications_hook = getattr(handle, 'notifications_hook', None) try: _skip_incoming(handle, ihandle, notifications_hook) - except NoReceiver: + except exceptions.NoReceiver: logger.warning('device or receiver disconnected') return None write(ihandle, devnumber, request_data, long_message) @@ -445,15 +423,15 @@ def request(handle, devnumber, request_id, *params, no_reply=False, return_error if logger.isEnabledFor(logging.DEBUG): logger.debug( '(%s) device 0x%02X error on request {%04X}: %d = %s', handle, devnumber, request_id, error, - _hidpp10.ERROR[error] + _hidpp10_constants.ERROR[error] ) - return _hidpp10.ERROR[error] if return_error else None + return _hidpp10_constants.ERROR[error] if return_error else None if reply_data[:1] == b'\xFF' and reply_data[1:3] == request_data[:2]: # a HID++ 2.0 feature call returned with an error error = ord(reply_data[3:4]) logger.error( '(%s) device %d error on feature request {%04X}: %d = %s', handle, devnumber, request_id, error, - _hidpp20.ERROR[error] + _hidpp20_constants.ERROR[error] ) raise _hidpp20.FeatureCallError(number=devnumber, request=request_id, error=error, params=params) @@ -505,7 +483,7 @@ def ping(handle, devnumber, long_message=False): notifications_hook = getattr(handle, 'notifications_hook', None) try: _skip_incoming(handle, int(handle), notifications_hook) - except NoReceiver: + except exceptions.NoReceiver: logger.warning('device or receiver disconnected') return @@ -530,13 +508,13 @@ def ping(handle, devnumber, long_message=False): if report_id == HIDPP_SHORT_MESSAGE_ID and reply_data[:1] == b'\x8F' and \ reply_data[1:3] == request_data[:2]: # error response error = ord(reply_data[3:4]) - if error == _hidpp10.ERROR.invalid_SubID__command: # a valid reply from a HID++ 1.0 device + if error == _hidpp10_constants.ERROR.invalid_SubID__command: # a valid reply from a HID++ 1.0 device return 1.0 - if error == _hidpp10.ERROR.resource_error or error == _hidpp10.ERROR.connection_request_failed: + if error == _hidpp10_constants.ERROR.resource_error or error == _hidpp10_constants.ERROR.connection_request_failed: return # device unreachable - if error == _hidpp10.ERROR.unknown_device: # no paired device with that number + if error == _hidpp10_constants.ERROR.unknown_device: # no paired device with that number logger.error('(%s) device %d error on ping request: unknown device', handle, devnumber) - raise NoSuchDevice(number=devnumber, request=request_id) + raise exceptions.NoSuchDevice(number=devnumber, request=request_id) if notifications_hook: n = make_notification(report_id, reply_devnumber, reply_data) diff --git a/lib/logitech_receiver/device.py b/lib/logitech_receiver/device.py index 18799f8959..a0cdb51652 100644 --- a/lib/logitech_receiver/device.py +++ b/lib/logitech_receiver/device.py @@ -27,6 +27,7 @@ from . import base as _base from . import descriptors as _descriptors +from . import exceptions from . import hidpp10 as _hidpp10 from . import hidpp20 as _hidpp20 from .common import strhex as _strhex @@ -128,7 +129,7 @@ def __init__( self.wpid = _hid.find_paired_node_wpid(receiver.path, number) if not self.wpid: logger.error('Unable to get wpid from udev for device %d of %s', number, receiver) - raise _base.NoSuchDevice(number=number, receiver=receiver, error='Not present 27Mhz device') + raise exceptions.NoSuchDevice(number=number, receiver=receiver, error='Not present 27Mhz device') kind = receiver.get_kind_from_index(number) self._kind = _hidpp10.DEVICE_KIND[kind] else: # get information from pairing registers @@ -136,7 +137,7 @@ def __init__( self.update_pairing_information() self.update_extended_pairing_information() if not self.wpid and not self._serial: # if neither then the device almost certainly wasn't found - raise _base.NoSuchDevice(number=number, receiver=receiver, error='no wpid or serial') + raise exceptions.NoSuchDevice(number=number, receiver=receiver, error='no wpid or serial') # the wpid is set to None on this object when the device is unpaired assert self.wpid is not None, 'failed to read wpid: device %d of %s' % (number, receiver) @@ -208,7 +209,7 @@ def name(self): if not self.online: # be very defensive try: self.ping() - except _base.NoSuchDevice: + except exceptions.NoSuchDevice: pass if self.online and self.protocol >= 2.0: self._name = _hidpp20.get_name(self) @@ -514,7 +515,7 @@ def __bool__(self): def __str__(self): try: name = self.name or self.codename or '?' - except _base.NoSuchDevice: + except exceptions.NoSuchDevice: name = 'name not available' return '' % (self.number, self.wpid or self.product_id, name, self.serial) diff --git a/lib/logitech_receiver/exceptions.py b/lib/logitech_receiver/exceptions.py new file mode 100644 index 0000000000..5ae67c5975 --- /dev/null +++ b/lib/logitech_receiver/exceptions.py @@ -0,0 +1,33 @@ +from .common import KwException as _KwException + +# +# Exceptions that may be raised by this API. +# + + +class NoReceiver(_KwException): + """Raised when trying to talk through a previously open handle, when the + receiver is no longer available. Should only happen if the receiver is + physically disconnected from the machine, or its kernel driver module is + unloaded.""" + pass + + +class NoSuchDevice(_KwException): + """Raised when trying to reach a device number not paired to the receiver.""" + pass + + +class DeviceUnreachable(_KwException): + """Raised when a request is made to an unreachable (turned off) device.""" + pass + + +class FeatureNotSupported(_KwException): + """Raised when trying to request a feature not supported by the device.""" + pass + + +class FeatureCallError(_KwException): + """Raised if the device replied to a feature call with an error.""" + pass diff --git a/lib/logitech_receiver/hidpp20.py b/lib/logitech_receiver/hidpp20.py index 4032d1c96c..0d4b0c37a8 100644 --- a/lib/logitech_receiver/hidpp20.py +++ b/lib/logitech_receiver/hidpp20.py @@ -27,10 +27,9 @@ import yaml as _yaml -from . import special_keys +from . import exceptions, special_keys from .common import BATTERY_APPROX as _BATTERY_APPROX from .common import FirmwareInfo as _FirmwareInfo -from .common import KwException as _KwException from .common import NamedInt as _NamedInt from .common import NamedInts as _NamedInts from .common import UnsortedNamedInts as _UnsortedNamedInts @@ -227,21 +226,6 @@ def hexint_presenter(dumper, data): # -class FeatureNotSupported(_KwException): - """Raised when trying to request a feature not supported by the device.""" - pass - - -class FeatureCallError(_KwException): - """Raised if the device replied to a feature call with an error.""" - pass - - -# -# -# - - class FeaturesArray(dict): def __init__(self, device): @@ -462,8 +446,8 @@ def _getCidReporting(self): mapping_flags_2 = 0 self._mapping_flags = mapping_flags_1 | (mapping_flags_2 << 8) else: - raise FeatureCallError(msg='No reply from device.') - except FeatureCallError: # if the key hasn't ever been configured then the read may fail so only produce a warning + raise exceptions.FeatureCallError(msg='No reply from device.') + except exceptions.FeatureCallError: # if the key hasn't ever been configured then the read may fail so only produce a warning if logger.isEnabledFor(logging.WARNING): logger.warn( f'Feature Call Error in _getCidReporting on device {self._device} for cid {self._cid} - use defaults' @@ -498,7 +482,7 @@ def _setCidReporting(self, flags=None, remap=0): bfield = 0 for f, v in flags.items(): if v and FLAG_TO_CAPABILITY[f] not in self.flags: - raise FeatureNotSupported( + raise exceptions.FeatureNotSupported( msg=f'Tried to set mapping flag "{f}" on control "{self.key}" ' + f'which does not support "{FLAG_TO_CAPABILITY[f]}" on device {self._device}.' ) @@ -511,7 +495,7 @@ def _setCidReporting(self, flags=None, remap=0): self._mapping_flags &= ~int(f) if remap != 0 and remap not in self.remappable_to: - raise FeatureNotSupported( + raise exceptions.FeatureNotSupported( msg=f'Tried to remap control "{self.key}" to a control ID {remap} which it is not remappable to ' + f'on device {self._device}.' ) @@ -1044,7 +1028,7 @@ def value(self): def read(self): try: value = feature_request(self._device, FEATURE.GESTURE_2, 0x50, self.id, 0xFF) - except FeatureCallError: # some calls produce an error (notably spec 5 multiplier on K400Plus) + except exceptions.FeatureCallError: # some calls produce an error (notably spec 5 multiplier on K400Plus) if logger.isEnabledFor(logging.WARNING): logger.warn(f'Feature Call Error reading Gesture Spec on device {self._device} for spec {self.id} - use None') return None @@ -1129,7 +1113,7 @@ class Backlight: def __init__(self, device): response = device.feature_request(FEATURE.BACKLIGHT2, 0x00) if not response: - raise FeatureCallError(msg='No reply from device.') + raise exceptions.FeatureCallError(msg='No reply from device.') self.device = device self.enabled, self.options, supported, effects, self.level, self.dho, self.dhi, self.dpow = _unpack( ' %s', self, 'enabled' if enable else 'disabled', flag_names) return flag_bits @@ -154,33 +156,33 @@ def device_pairing_information(self, n): pair_info = self.read_register(_R.receiver_info, _IR.bolt_pairing_information + n) if pair_info: wpid = _strhex(pair_info[3:4]) + _strhex(pair_info[2:3]) - kind = _hidpp10.DEVICE_KIND[ord(pair_info[1:2]) & 0x0F] + kind = _hidpp10_constants.DEVICE_KIND[ord(pair_info[1:2]) & 0x0F] return wpid, kind, 0 else: - raise _base.NoSuchDevice(number=n, receiver=self, error='read Bolt wpid') + raise exceptions.NoSuchDevice(number=n, receiver=self, error='read Bolt wpid') wpid = 0 kind = None polling_rate = None pair_info = self.read_register(_R.receiver_info, _IR.pairing_information + n - 1) if pair_info: # may be either a Unifying receiver, or an Unifying-ready receiver wpid = _strhex(pair_info[3:5]) - kind = _hidpp10.DEVICE_KIND[ord(pair_info[7:8]) & 0x0F] + kind = _hidpp10_constants.DEVICE_KIND[ord(pair_info[7:8]) & 0x0F] polling_rate = str(ord(pair_info[2:3])) + 'ms' elif self.receiver_kind == '27Mz': # 27Mhz receiver, fill extracting WPID from udev path wpid = _hid.find_paired_node_wpid(self.path, n) if not wpid: logger.error('Unable to get wpid from udev for device %d of %s', n, self) - raise _base.NoSuchDevice(number=n, receiver=self, error='Not present 27Mhz device') + raise exceptions.NoSuchDevice(number=n, receiver=self, error='Not present 27Mhz device') kind = _hidpp10.DEVICE_KIND[self.get_kind_from_index(n)] elif not self.receiver_kind == 'unifying': # unifying protocol not supported, may be an old Nano receiver device_info = self.read_register(_R.receiver_info, 0x04) if device_info: wpid = _strhex(device_info[3:5]) - kind = _hidpp10.DEVICE_KIND[0x00] # unknown kind + kind = _hidpp10_constants.DEVICE_KIND[0x00] # unknown kind else: - raise _base.NoSuchDevice(number=n, receiver=self, error='read pairing information - non-unifying') + raise exceptions.NoSuchDevice(number=n, receiver=self, error='read pairing information - non-unifying') else: - raise _base.NoSuchDevice(number=n, receiver=self, error='read pairing information') + raise exceptions.NoSuchDevice(number=n, receiver=self, error='read pairing information') return wpid, kind, polling_rate def device_extended_pairing_information(self, n): @@ -217,7 +219,7 @@ def get_kind_from_index(self, index): kind = 3 else: # unknown device number on 27Mhz receiver logger.error('failed to calculate device kind for device %d of %s', index, self) - raise _base.NoSuchDevice(number=index, receiver=self, error='Unknown 27Mhz device number') + raise exceptions.NoSuchDevice(number=index, receiver=self, error='Unknown 27Mhz device number') return kind def notify_devices(self): @@ -239,7 +241,7 @@ def register_new_device(self, number, notification=None): logger.info('%s: found new device %d (%s)', self, number, dev.wpid) self._devices[number] = dev return dev - except _base.NoSuchDevice as e: + except exceptions.NoSuchDevice as e: logger.warning('register new device failed for %s device %d error %s', e.receiver, e.number, e.error) logger.warning('%s: looked for device %d, not found', self, number) diff --git a/lib/solaar/cli/show.py b/lib/solaar/cli/show.py index 9b204d0ab1..307366ffe1 100644 --- a/lib/solaar/cli/show.py +++ b/lib/solaar/cli/show.py @@ -16,7 +16,7 @@ ## with this program; if not, write to the Free Software Foundation, Inc., ## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -from logitech_receiver import base as _base +from logitech_receiver import exceptions from logitech_receiver import hidpp10 as _hidpp10 from logitech_receiver import hidpp20 as _hidpp20 from logitech_receiver import receiver as _receiver @@ -85,7 +85,7 @@ def _print_device(dev, num=None): # try to ping the device to see if it actually exists and to wake it up try: dev.ping() - except _base.NoSuchDevice: + except exceptions.NoSuchDevice: print(' %s: Device not found' % num or dev.number) return diff --git a/lib/solaar/listener.py b/lib/solaar/listener.py index 3ddba6d5be..c3d7580d2f 100644 --- a/lib/solaar/listener.py +++ b/lib/solaar/listener.py @@ -26,7 +26,8 @@ from logitech_receiver import Device, Receiver from logitech_receiver import base as _base -from logitech_receiver import hidpp10 as _hidpp10 +from logitech_receiver import exceptions +from logitech_receiver import hidpp10_constants as _hidpp10_constants from logitech_receiver import listener as _listener from logitech_receiver import notifications as _notifications from logitech_receiver import status as _status @@ -40,8 +41,8 @@ logger = logging.getLogger(__name__) -_R = _hidpp10.REGISTERS -_IR = _hidpp10.INFO_SUBREGISTERS +_R = _hidpp10_constants.REGISTERS +_IR = _hidpp10_constants.INFO_SUBREGISTERS # # @@ -87,7 +88,7 @@ def has_started(self): logger.info('%s: notifications listener has started (%s)', self.receiver, self.receiver.handle) nfs = self.receiver.enable_connection_notifications() if logger.isEnabledFor(logging.WARNING): - if not self.receiver.isDevice and not ((nfs if nfs else 0) & _hidpp10.NOTIFICATION_FLAG.wireless): + if not self.receiver.isDevice and not ((nfs if nfs else 0) & _hidpp10_constants.NOTIFICATION_FLAG.wireless): logger.warning( 'Receiver on %s might not support connection notifications, GUI might not show its devices', self.receiver.path @@ -402,7 +403,7 @@ def _process_add(device_info, retry): _error_callback('permissions', device_info.path) else: _error_callback('nodevice', device_info.path) - except _base.NoReceiver: + except exceptions.NoReceiver: _error_callback('nodevice', device_info.path)