Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract constants and exception into a module #2287

Closed
wants to merge 11 commits into from
53 changes: 16 additions & 37 deletions lib/logitech_receiver/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@

import hidapi as _hid

from . import hidpp10 as _hidpp10
from . import exceptions
from . import hidpp10_constants as _hidpp10_constants
from . import hidpp20 as _hidpp20
from . import hidpp20_constants as _hidpp20_constants
from .base_usb import ALL as _RECEIVER_USB_IDS
from .common import KwException as _KwException
from .common import strhex as _strhex
from .descriptors import DEVICES as _DEVICES

Expand Down Expand Up @@ -113,29 +114,6 @@ def product_information(usb_id):
# when pinging, be extra patient (no longer)
_PING_TIMEOUT = DEFAULT_TIMEOUT

#
# Exceptions that may be raised by this API.
#


class NoReceiver(_KwException):
"""Raised when trying to talk through a previously open handle, when the
receiver is no longer available. Should only happen if the receiver is
physically disconnected from the machine, or its kernel driver module is
unloaded."""
pass


class NoSuchDevice(_KwException):
"""Raised when trying to reach a device number not paired to the receiver."""
pass


class DeviceUnreachable(_KwException):
"""Raised when a request is made to an unreachable (turned off) device."""
pass


#
#
#
Expand Down Expand Up @@ -263,7 +241,7 @@ def write(handle, devnumber, data, long_message=False):
except Exception as reason:
logger.error('write failed, assuming handle %r no longer available', handle)
close(handle)
raise NoReceiver(reason=reason)
raise exceptions.NoReceiver(reason=reason)


def read(handle, timeout=DEFAULT_TIMEOUT):
Expand Down Expand Up @@ -312,7 +290,7 @@ def _read(handle, timeout):
except Exception as reason:
logger.warning('read failed, assuming handle %r no longer available', handle)
close(handle)
raise NoReceiver(reason=reason)
raise exceptions.NoReceiver(reason=reason)

if data and check_message(data): # ignore messages that fail check
report_id = ord(data[:1])
Expand Down Expand Up @@ -343,7 +321,7 @@ def _skip_incoming(handle, ihandle, notifications_hook):
except Exception as reason:
logger.error('read failed, assuming receiver %s no longer available', handle)
close(handle)
raise NoReceiver(reason=reason)
raise exceptions.NoReceiver(reason=reason)

if data:
if check_message(data): # only process messages that pass check
Expand Down Expand Up @@ -464,7 +442,7 @@ def request(handle, devnumber, request_id, *params, no_reply=False, return_error
notifications_hook = getattr(handle, 'notifications_hook', None)
try:
_skip_incoming(handle, ihandle, notifications_hook)
except NoReceiver:
except exceptions.NoReceiver:
logger.warning('device or receiver disconnected')
return None
write(ihandle, devnumber, request_data, long_message)
Expand All @@ -489,15 +467,15 @@ def request(handle, devnumber, request_id, *params, no_reply=False, return_error
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
'(%s) device 0x%02X error on request {%04X}: %d = %s', handle, devnumber, request_id, error,
_hidpp10.ERROR[error]
_hidpp10_constants.ERROR[error]
)
return _hidpp10.ERROR[error] if return_error else None
return _hidpp10_constants.ERROR[error] if return_error else None
if reply_data[:1] == b'\xFF' and reply_data[1:3] == request_data[:2]:
# a HID++ 2.0 feature call returned with an error
error = ord(reply_data[3:4])
logger.error(
'(%s) device %d error on feature request {%04X}: %d = %s', handle, devnumber, request_id, error,
_hidpp20.ERROR[error]
_hidpp20_constants.ERROR[error]
)
raise _hidpp20.FeatureCallError(number=devnumber, request=request_id, error=error, params=params)

Expand Down Expand Up @@ -549,7 +527,7 @@ def ping(handle, devnumber, long_message=False):
notifications_hook = getattr(handle, 'notifications_hook', None)
try:
_skip_incoming(handle, int(handle), notifications_hook)
except NoReceiver:
except exceptions.NoReceiver:
logger.warning('device or receiver disconnected')
return

Expand All @@ -574,13 +552,14 @@ def ping(handle, devnumber, long_message=False):
if report_id == HIDPP_SHORT_MESSAGE_ID and reply_data[:1] == b'\x8F' and \
reply_data[1:3] == request_data[:2]: # error response
error = ord(reply_data[3:4])
if error == _hidpp10.ERROR.invalid_SubID__command: # a valid reply from a HID++ 1.0 device
if error == _hidpp10_constants.ERROR.invalid_SubID__command: # a valid reply from a HID++ 1.0 device
return 1.0
if error == _hidpp10.ERROR.resource_error or error == _hidpp10.ERROR.connection_request_failed:
if error == _hidpp10_constants.ERROR.resource_error or \
error == _hidpp10_constants.ERROR.connection_request_failed:
return # device unreachable
if error == _hidpp10.ERROR.unknown_device: # no paired device with that number
if error == _hidpp10_constants.ERROR.unknown_device: # no paired device with that number
logger.error('(%s) device %d error on ping request: unknown device', handle, devnumber)
raise NoSuchDevice(number=devnumber, request=request_id)
raise exceptions.NoSuchDevice(number=devnumber, request=request_id)

if notifications_hook:
n = make_notification(report_id, reply_devnumber, reply_data)
Expand Down
4 changes: 2 additions & 2 deletions lib/logitech_receiver/descriptors.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
# - the device uses a USB interface other than 2
# - the name or codename should be different from what the device reports

from .hidpp10 import DEVICE_KIND as _DK
from .hidpp10 import REGISTERS as _R
from .hidpp10_constants import DEVICE_KIND as _DK
from .hidpp10_constants import REGISTERS as _R

#
#
Expand Down
31 changes: 17 additions & 14 deletions lib/logitech_receiver/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,20 @@

from . import base as _base
from . import descriptors as _descriptors
from . import exceptions
from . import hidpp10 as _hidpp10
from . import hidpp10_constants as _hidpp10_constants
from . import hidpp20 as _hidpp20
from . import hidpp20_constants as _hidpp20_constants
from .common import strhex as _strhex
from .settings_templates import check_feature_settings as _check_feature_settings

logger = logging.getLogger(__name__)

_R = _hidpp10.REGISTERS
_IR = _hidpp10.INFO_SUBREGISTERS
_R = _hidpp10_constants.REGISTERS
_IR = _hidpp10_constants.INFO_SUBREGISTERS

KIND_MAP = {kind: _hidpp10.DEVICE_KIND[str(kind)] for kind in _hidpp20.DEVICE_KIND}
KIND_MAP = {kind: _hidpp10_constants.DEVICE_KIND[str(kind)] for kind in _hidpp20_constants.DEVICE_KIND}

#
#
Expand Down Expand Up @@ -123,20 +126,20 @@ def __init__(
if receiver.receiver_kind == '27Mhz': # 27 Mhz receiver
self.wpid = '00' + _strhex(link_notification.data[2:3])
kind = receiver.get_kind_from_index(number)
self._kind = _hidpp10.DEVICE_KIND[kind]
self._kind = _hidpp10_constants.DEVICE_KIND[kind]
elif receiver.receiver_kind == '27Mhz': # 27 Mhz receiver doesn't have pairing registers
self.wpid = _hid.find_paired_node_wpid(receiver.path, number)
if not self.wpid:
logger.error('Unable to get wpid from udev for device %d of %s', number, receiver)
raise _base.NoSuchDevice(number=number, receiver=receiver, error='Not present 27Mhz device')
raise exceptions.NoSuchDevice(number=number, receiver=receiver, error='Not present 27Mhz device')
kind = receiver.get_kind_from_index(number)
self._kind = _hidpp10.DEVICE_KIND[kind]
self._kind = _hidpp10_constants.DEVICE_KIND[kind]
else: # get information from pairing registers
self.online = True
self.update_pairing_information()
self.update_extended_pairing_information()
if not self.wpid and not self._serial: # if neither then the device almost certainly wasn't found
raise _base.NoSuchDevice(number=number, receiver=receiver, error='no wpid or serial')
raise exceptions.NoSuchDevice(number=number, receiver=receiver, error='no wpid or serial')

# the wpid is set to None on this object when the device is unpaired
assert self.wpid is not None, 'failed to read wpid: device %d of %s' % (number, receiver)
Expand Down Expand Up @@ -207,7 +210,7 @@ def name(self):
if not self.online: # be very defensive
try:
self.ping()
except _base.NoSuchDevice:
except exceptions.NoSuchDevice:
pass
if self.online and self.protocol >= 2.0:
self._name = _hidpp20.get_name(self)
Expand Down Expand Up @@ -414,10 +417,10 @@ def enable_connection_notifications(self, enable=True):

if enable:
set_flag_bits = (
_hidpp10.NOTIFICATION_FLAG.battery_status
| _hidpp10.NOTIFICATION_FLAG.keyboard_illumination
| _hidpp10.NOTIFICATION_FLAG.wireless
| _hidpp10.NOTIFICATION_FLAG.software_present
_hidpp10_constants.NOTIFICATION_FLAG.battery_status
| _hidpp10_constants.NOTIFICATION_FLAG.keyboard_illumination
| _hidpp10_constants.NOTIFICATION_FLAG.wireless
| _hidpp10_constants.NOTIFICATION_FLAG.software_present
)
else:
set_flag_bits = 0
Expand All @@ -426,7 +429,7 @@ def enable_connection_notifications(self, enable=True):
logger.warning('%s: failed to %s device notifications', self, 'enable' if enable else 'disable')

flag_bits = _hidpp10.get_notification_flags(self)
flag_names = None if flag_bits is None else tuple(_hidpp10.NOTIFICATION_FLAG.flag_names(flag_bits))
flag_names = None if flag_bits is None else tuple(_hidpp10_constants.NOTIFICATION_FLAG.flag_names(flag_bits))
if logger.isEnabledFor(logging.INFO):
logger.info('%s: device notifications %s %s', self, 'enabled' if enable else 'disabled', flag_names)
return flag_bits if ok else None
Expand Down Expand Up @@ -513,7 +516,7 @@ def __bool__(self):
def __str__(self):
try:
name = self.name or self.codename or '?'
except _base.NoSuchDevice:
except exceptions.NoSuchDevice:
name = 'name not available'
return '<Device(%d,%s,%s,%s)>' % (self.number, self.wpid or self.product_id, name, self.serial)

Expand Down
33 changes: 33 additions & 0 deletions lib/logitech_receiver/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from .common import KwException as _KwException

#
# Exceptions that may be raised by this API.
#


class NoReceiver(_KwException):
"""Raised when trying to talk through a previously open handle, when the
receiver is no longer available. Should only happen if the receiver is
physically disconnected from the machine, or its kernel driver module is
unloaded."""
pass


class NoSuchDevice(_KwException):
"""Raised when trying to reach a device number not paired to the receiver."""
pass


class DeviceUnreachable(_KwException):
"""Raised when a request is made to an unreachable (turned off) device."""
pass


class FeatureNotSupported(_KwException):
"""Raised when trying to request a feature not supported by the device."""
pass


class FeatureCallError(_KwException):
"""Raised if the device replied to a feature call with an error."""
pass
Loading
Loading