From 1d2c90867db569fa1722ba35f6b2d539b3c2ecca Mon Sep 17 00:00:00 2001 From: "Peter F. Patel-Schneider" Date: Thu, 22 Feb 2024 09:33:48 -0500 Subject: [PATCH] device: clean up receiver code --- lib/logitech_receiver/receiver.py | 67 +++++++++++++------------------ 1 file changed, 29 insertions(+), 38 deletions(-) diff --git a/lib/logitech_receiver/receiver.py b/lib/logitech_receiver/receiver.py index d3b5920b49..ade67292e1 100644 --- a/lib/logitech_receiver/receiver.py +++ b/lib/logitech_receiver/receiver.py @@ -1,6 +1,5 @@ -# -*- python-mode -*- - ## Copyright (C) 2012-2013 Daniel Pavel +## Copyright (C) 2014-2024 Solaar Contributors https://pwr-solaar.github.io/Solaar/ ## ## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by @@ -22,21 +21,13 @@ import hidapi as _hid from . import base as _base -from . import exceptions -from . import hidpp10 as _hidpp10 -from . import hidpp10_constants as _hidpp10_constants -from .base import product_information as _product_information -from .common import strhex as _strhex +from . import exceptions, hidpp10, hidpp10_constants from .device import Device logger = logging.getLogger(__name__) -_R = _hidpp10_constants.REGISTERS -_IR = _hidpp10_constants.INFO_SUBREGISTERS - -# -# -# +_R = hidpp10_constants.REGISTERS +_IR = hidpp10_constants.INFO_SUBREGISTERS class Receiver: @@ -55,7 +46,7 @@ def __init__(self, handle, path, product_id, setting_callback=None): self.path = path self.product_id = product_id self.setting_callback = setting_callback - product_info = _product_information(self.product_id) + product_info = _base.product_information(self.product_id) if not product_info: logger.warning("Unknown receiver type: %s", self.product_id) product_info = {} @@ -64,13 +55,13 @@ def __init__(self, handle, path, product_id, setting_callback=None): # read the serial immediately, so we can find out max_devices if self.receiver_kind == "bolt": serial_reply = self.read_register(_R.bolt_uniqueId) - self.serial = _strhex(serial_reply) + self.serial = serial_reply.hex().upper() self.max_devices = product_info.get("max_devices", 1) self.may_unpair = product_info.get("may_unpair", False) else: serial_reply = self.read_register(_R.receiver_info, _IR.receiver_information) if serial_reply: - self.serial = _strhex(serial_reply[1:5]) + self.serial = serial_reply[1:5].hex().upper() self.max_devices = ord(serial_reply[6:7]) if self.max_devices <= 0 or self.max_devices > 6: self.max_devices = product_info.get("max_devices", 1) @@ -107,7 +98,7 @@ def __del__(self): @property def firmware(self): if self._firmware is None and self.handle: - self._firmware = _hidpp10.get_firmware(self) + self._firmware = hidpp10.get_firmware(self) return self._firmware # how many pairings remain (None for unknown, -1 for unlimited) @@ -127,19 +118,19 @@ def enable_connection_notifications(self, enable=True): if enable: set_flag_bits = ( - _hidpp10_constants.NOTIFICATION_FLAG.battery_status - | _hidpp10_constants.NOTIFICATION_FLAG.wireless - | _hidpp10_constants.NOTIFICATION_FLAG.software_present + hidpp10_constants.NOTIFICATION_FLAG.battery_status + | hidpp10_constants.NOTIFICATION_FLAG.wireless + | hidpp10_constants.NOTIFICATION_FLAG.software_present ) else: set_flag_bits = 0 - ok = _hidpp10.set_notification_flags(self, set_flag_bits) + ok = hidpp10.set_notification_flags(self, set_flag_bits) if ok is None: logger.warning("%s: failed to %s receiver notifications", self, "enable" if enable else "disable") return None - flag_bits = _hidpp10.get_notification_flags(self) - flag_names = None if flag_bits is None else tuple(_hidpp10_constants.NOTIFICATION_FLAG.flag_names(flag_bits)) + flag_bits = hidpp10.get_notification_flags(self) + flag_names = None if flag_bits is None else tuple(hidpp10_constants.NOTIFICATION_FLAG.flag_names(flag_bits)) if logger.isEnabledFor(logging.INFO): logger.info("%s: receiver notifications %s => %s", self, "enabled" if enable else "disabled", flag_names) return flag_bits @@ -161,9 +152,9 @@ def device_pairing_information(self, n: int) -> dict: if self.receiver_kind == "bolt": pair_info = self.read_register(_R.receiver_info, _IR.bolt_pairing_information + n) if pair_info: - wpid = _strhex(pair_info[3:4]) + _strhex(pair_info[2:3]) - kind = _hidpp10_constants.DEVICE_KIND[pair_info[1] & 0x0F] - serial = _strhex(pair_info[4:8]) + wpid = (pair_info[3:4] + pair_info[2:3]).hex().upper() + kind = hidpp10_constants.DEVICE_KIND[pair_info[1] & 0x0F] + serial = pair_info[4:8].hex().upper() return {"wpid": wpid, "kind": kind, "polling": None, "serial": serial, "power_switch": "(unknown)"} else: raise exceptions.NoSuchDevice(number=n, receiver=self, error="can't read Bolt pairing register") @@ -172,31 +163,31 @@ def device_pairing_information(self, n: int) -> dict: power_switch = "(unknown)" pair_info = self.read_register(_R.receiver_info, _IR.pairing_information + n - 1) if pair_info: # either a Unifying receiver or a Unifying-ready receiver - wpid = _strhex(pair_info[3:5]) - kind = _hidpp10_constants.DEVICE_KIND[pair_info[7] & 0x0F] + wpid = pair_info[3:5].hex().upper() + kind = hidpp10_constants.DEVICE_KIND[pair_info[7] & 0x0F] polling_rate = str(ord(pair_info[2:3])) + "ms" elif self.receiver_kind == "27Mz": # 27Mhz receiver, extract WPID from udev path wpid = _hid.find_paired_node_wpid(self.path, n) if not wpid: logger.error("Unable to get wpid from udev for device %d of %s", n, self) raise exceptions.NoSuchDevice(number=n, receiver=self, error="Not present 27Mhz device") - kind = _hidpp10_constants.DEVICE_KIND[self.get_kind_from_index(n)] + kind = hidpp10_constants.DEVICE_KIND[self.get_kind_from_index(n)] elif not self.receiver_kind == "unifying": # may be an old Nano receiver device_info = self.read_register(_R.receiver_info, 0x04) if device_info: - wpid = _strhex(device_info[3:5]) - kind = _hidpp10_constants.DEVICE_KIND[0x00] # unknown kind + wpid = device_info[3:5].hex().upper() + kind = hidpp10_constants.DEVICE_KIND[0x00] # unknown kind else: raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information - non-unifying") else: raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information") pair_info = self.read_register(_R.receiver_info, _IR.extended_pairing_information + n - 1) if pair_info: - power_switch = _hidpp10_constants.POWER_SWITCH_LOCATION[pair_info[9] & 0x0F] + power_switch = hidpp10_constants.POWER_SWITCH_LOCATION[pair_info[9] & 0x0F] else: # some Nano receivers? pair_info = self.read_register(0x2D5) if pair_info: - serial = _strhex(pair_info[1:5]) + serial = pair_info[1:5].hex().upper() return {"wpid": wpid, "kind": kind, "polling": polling_rate, "serial": serial, "power_switch": power_switch} def get_kind_from_index(self, index): @@ -233,12 +224,12 @@ def register_new_device(self, number, notification=None): if notification is not None: online = not bool(ord(notification.data[0:1]) & 0x40) # the rest may be redundant, but keep it around for now - info["wpid"] = _strhex(notification.data[2:3] + notification.data[1:2]) + info["wpid"] = (notification.data[2:3] + notification.data[1:2]).hex().upper() kind = ord(notification.data[0:1]) & 0x0F if self.receiver_kind == "27Mhz": # get 27Mhz wpid and set kind based on index - info["wpid"] = "00" + _strhex(notification.data[2:3]) + info["wpid"] = "00" + notification.data[2:3].hex().upper() kind = self.get_kind_from_index(number) - info["kind"] = _hidpp10_constants.DEVICE_KIND[kind] + info["kind"] = hidpp10_constants.DEVICE_KIND[kind] else: online = True dev = Device(self, number, online, pairing_info=info, setting_callback=self.setting_callback) @@ -289,8 +280,8 @@ def request(self, request_id, *params): if bool(self): return _base.request(self.handle, 0xFF, request_id, *params) - read_register = _hidpp10.read_register - write_register = _hidpp10.write_register + read_register = hidpp10.read_register + write_register = hidpp10.write_register def __iter__(self): connected_devices = self.count()