diff --git a/lib/logitech_receiver/base.py b/lib/logitech_receiver/base.py index 0713b5e066..c8ae17993e 100644 --- a/lib/logitech_receiver/base.py +++ b/lib/logitech_receiver/base.py @@ -28,7 +28,7 @@ from random import getrandbits from time import time -import hidapi as _hid +import hidapi from . import base_usb from . import common @@ -42,10 +42,6 @@ _hidpp20 = hidpp20.Hidpp20() -# -# -# - def _wired_device(product_id, interface): return {"vendor_id": 1133, "product_id": product_id, "bus_id": 3, "usb_interface": interface, "isDevice": True} @@ -140,7 +136,7 @@ def filter_receivers(bus_id, vendor_id, product_id, hidpp_short=False, hidpp_lon def receivers(): """Enumerate all the receivers attached to the machine.""" - yield from _hid.enumerate(filter_receivers) + yield from hidapi.enumerate(filter_receivers) def filter(bus_id, vendor_id, product_id, hidpp_short=False, hidpp_long=False): @@ -159,12 +155,12 @@ def filter(bus_id, vendor_id, product_id, hidpp_short=False, hidpp_long=False): def receivers_and_devices(): """Enumerate all the receivers and devices directly attached to the machine.""" - yield from _hid.enumerate(filter) + yield from hidapi.enumerate(filter) def notify_on_receivers_glib(callback): """Watch for matching devices and notifies the callback on the GLib thread.""" - return _hid.monitor_glib(callback, filter) + return hidapi.monitor_glib(callback, filter) # @@ -185,7 +181,7 @@ def open_path(path): :returns: an open receiver handle if this is the right Linux device, or ``None``. """ - return _hid.open_path(path) + return hidapi.open_path(path) def open(): @@ -204,13 +200,11 @@ def close(handle): if handle: try: if isinstance(handle, int): - _hid.close(handle) + hidapi.close(handle) else: handle.close() - # logger.info("closed receiver handle %r", handle) return True except Exception: - # logger.exception("closing receiver handle %r", handle) pass return False @@ -248,7 +242,7 @@ def write(handle, devnumber, data, long_message=False): ) try: - _hid.write(int(handle), wdata) + hidapi.write(int(handle), wdata) except Exception as reason: logger.error("write failed, assuming handle %r no longer available", handle) close(handle) @@ -297,7 +291,7 @@ def _read(handle, timeout): try: # convert timeout to milliseconds, the hidapi expects it timeout = int(timeout * 1000) - data = _hid.read(int(handle), _MAX_READ_SIZE, timeout) + data = hidapi.read(int(handle), _MAX_READ_SIZE, timeout) except Exception as reason: logger.warning("read failed, assuming handle %r no longer available", handle) close(handle) @@ -331,7 +325,7 @@ def _skip_incoming(handle, ihandle, notifications_hook): while True: try: # read whatever is already in the buffer, if any - data = _hid.read(ihandle, _MAX_READ_SIZE, 0) + data = hidapi.read(ihandle, _MAX_READ_SIZE, 0) except Exception as reason: logger.error("read failed, assuming receiver %s no longer available", handle) close(handle) diff --git a/lib/logitech_receiver/common.py b/lib/logitech_receiver/common.py index 8dae8cf857..01ab88c9e0 100644 --- a/lib/logitech_receiver/common.py +++ b/lib/logitech_receiver/common.py @@ -14,6 +14,8 @@ ## You should have received a copy of the GNU General Public License along ## with this program; if not, write to the Free Software Foundation, Inc., ## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +from __future__ import annotations + import binascii import dataclasses @@ -21,7 +23,7 @@ from typing import Optional from typing import Union -import yaml as _yaml +import yaml from solaar.i18n import _ @@ -355,8 +357,8 @@ def to_yaml(cls, dumper, data): return dumper.represent_mapping("!NamedInt", {"value": int(data), "name": data.name}, flow_style=True) -_yaml.SafeLoader.add_constructor("!NamedInt", NamedInt.from_yaml) -_yaml.add_representer(NamedInt, NamedInt.to_yaml) +yaml.SafeLoader.add_constructor("!NamedInt", NamedInt.from_yaml) +yaml.add_representer(NamedInt, NamedInt.to_yaml) class NamedInts: diff --git a/lib/logitech_receiver/device.py b/lib/logitech_receiver/device.py index f705e711b3..01185ba13b 100644 --- a/lib/logitech_receiver/device.py +++ b/lib/logitech_receiver/device.py @@ -22,7 +22,7 @@ from typing import Optional -import hidapi as _hid +import hidapi import solaar.configuration as _configuration from . import base @@ -69,7 +69,16 @@ class Device: read_register = hidpp10.read_register write_register = hidpp10.write_register - def __init__(self, receiver, number, online, pairing_info=None, handle=None, device_info=None, setting_callback=None): + def __init__( + self, + receiver, + number, + online, + pairing_info=None, + handle=None, + device_info=None, + setting_callback=None, + ): assert receiver or device_info if receiver: assert 0 < number <= 15 # some receivers have devices past their max # of devices @@ -116,7 +125,7 @@ def __init__(self, receiver, number, online, pairing_info=None, handle=None, dev self.cleanups = [] # functions to run on the device when it is closed if not self.path: - self.path = _hid.find_paired_node(receiver.path, number, 1) if receiver else None + self.path = hidapi.find_paired_node(receiver.path, number, 1) if receiver else None if not self.handle: try: self.handle = base.open_path(self.path) if self.path else None @@ -551,3 +560,4 @@ def __str__(self): def __del__(self): self.close() + # pass diff --git a/lib/logitech_receiver/diversion.py b/lib/logitech_receiver/diversion.py index cf7b135111..9919b5b0cc 100644 --- a/lib/logitech_receiver/diversion.py +++ b/lib/logitech_receiver/diversion.py @@ -14,17 +14,16 @@ ## with this program; if not, write to the Free Software Foundation, Inc., ## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -import ctypes as _ctypes +import ctypes import logging import math import numbers -import os as _os -import os.path as _path -import platform as _platform +import os +import platform import socket import subprocess -import sys as _sys -import time as _time +import sys +import time from typing import Dict from typing import Tuple @@ -37,7 +36,7 @@ # There is no evdev on macOS or Windows. Diversion will not work without # it but other Solaar functionality is available. -if _platform.system() in ("Darwin", "Windows"): +if platform.system() in ("Darwin", "Windows"): evdev = None else: import evdev @@ -103,7 +102,7 @@ if logger.isEnabledFor(logging.INFO): logger.info("GDK Keymap %sset up", "" if gkeymap else "not ") -wayland = _os.getenv("WAYLAND_DISPLAY") # is this Wayland? +wayland = os.getenv("WAYLAND_DISPLAY") # is this Wayland? if wayland: logger.warning( "rules cannot access modifier keys in Wayland, " @@ -138,26 +137,26 @@ _dbus_interface = None -class XkbDisplay(_ctypes.Structure): +class XkbDisplay(ctypes.Structure): """opaque struct""" -class XkbStateRec(_ctypes.Structure): +class XkbStateRec(ctypes.Structure): _fields_ = [ - ("group", _ctypes.c_ubyte), - ("locked_group", _ctypes.c_ubyte), - ("base_group", _ctypes.c_ushort), - ("latched_group", _ctypes.c_ushort), - ("mods", _ctypes.c_ubyte), - ("base_mods", _ctypes.c_ubyte), - ("latched_mods", _ctypes.c_ubyte), - ("locked_mods", _ctypes.c_ubyte), - ("compat_state", _ctypes.c_ubyte), - ("grab_mods", _ctypes.c_ubyte), - ("compat_grab_mods", _ctypes.c_ubyte), - ("lookup_mods", _ctypes.c_ubyte), - ("compat_lookup_mods", _ctypes.c_ubyte), - ("ptr_buttons", _ctypes.c_ushort), + ("group", ctypes.c_ubyte), + ("locked_group", ctypes.c_ubyte), + ("base_group", ctypes.c_ushort), + ("latched_group", ctypes.c_ushort), + ("mods", ctypes.c_ubyte), + ("base_mods", ctypes.c_ubyte), + ("latched_mods", ctypes.c_ubyte), + ("locked_mods", ctypes.c_ubyte), + ("compat_state", ctypes.c_ubyte), + ("grab_mods", ctypes.c_ubyte), + ("compat_grab_mods", ctypes.c_ubyte), + ("lookup_mods", ctypes.c_ubyte), + ("compat_lookup_mods", ctypes.c_ubyte), + ("ptr_buttons", ctypes.c_ushort), ] # something strange is happening here but it is not being used @@ -177,7 +176,7 @@ def x11_setup(): if logger.isEnabledFor(logging.INFO): logger.info("X11 library loaded and display set up") except Exception: - logger.warning("X11 not available - some rule capabilities inoperable", exc_info=_sys.exc_info()) + logger.warning("X11 not available - some rule capabilities inoperable", exc_info=sys.exc_info()) _x11 = False xtest_available = False return _x11 @@ -192,7 +191,7 @@ def gnome_dbus_interface_setup(): remote_object = bus.get_object("org.gnome.Shell", "/io/github/pwr_solaar/solaar") _dbus_interface = dbus.Interface(remote_object, "io.github.pwr_solaar.solaar") except dbus.exceptions.DBusException: - logger.warning("Solaar Gnome extension not installed - some rule capabilities inoperable", exc_info=_sys.exc_info()) + logger.warning("Solaar Gnome extension not installed - some rule capabilities inoperable", exc_info=sys.exc_info()) _dbus_interface = False return _dbus_interface @@ -202,14 +201,14 @@ def xkb_setup(): if Xkbdisplay is not None: return Xkbdisplay try: # set up to get keyboard state using ctypes interface to libx11 - X11Lib = _ctypes.cdll.LoadLibrary("libX11.so") - X11Lib.XOpenDisplay.restype = _ctypes.POINTER(XkbDisplay) - X11Lib.XkbGetState.argtypes = [_ctypes.POINTER(XkbDisplay), _ctypes.c_uint, _ctypes.POINTER(XkbStateRec)] + X11Lib = ctypes.cdll.LoadLibrary("libX11.so") + X11Lib.XOpenDisplay.restype = ctypes.POINTER(XkbDisplay) + X11Lib.XkbGetState.argtypes = [ctypes.POINTER(XkbDisplay), ctypes.c_uint, ctypes.POINTER(XkbStateRec)] Xkbdisplay = X11Lib.XOpenDisplay(None) if logger.isEnabledFor(logging.INFO): logger.info("XKB display set up") except Exception: - logger.warning("XKB display not available - rules cannot access keyboard group", exc_info=_sys.exc_info()) + logger.warning("XKB display not available - rules cannot access keyboard group", exc_info=sys.exc_info()) Xkbdisplay = False return Xkbdisplay @@ -261,7 +260,7 @@ def setup_uinput(): def kbdgroup(): if xkb_setup(): state = XkbStateRec() - X11Lib.XkbGetState(Xkbdisplay, XkbUseCoreKbd, _ctypes.pointer(state)) + X11Lib.XkbGetState(Xkbdisplay, XkbUseCoreKbd, ctypes.pointer(state)) return state.group else: return None @@ -1213,7 +1212,7 @@ def evaluate(self, feature, notification, device, last_result): self.keyDown(self.key_symbols, current) if self.action != DEPRESS: self.keyUp(reversed(self.key_symbols), current) - _time.sleep(0.01) + time.sleep(0.01) else: logger.warning("no keymap so cannot determine which keycode to send") return None @@ -1252,7 +1251,7 @@ def evaluate(self, feature, notification, device, last_result): logger.info("MouseScroll action: %s %s %s", self.amounts, last_result, amounts) dx, dy = amounts simulate_scroll(dx, dy) - _time.sleep(0.01) + time.sleep(0.01) return None def data(self): @@ -1288,7 +1287,7 @@ def evaluate(self, feature, notification, device, last_result): logger.info(f"MouseClick action: {int(self.count)} {self.button}") if self.button and self.count: click(buttons[self.button], self.count) - _time.sleep(0.01) + time.sleep(0.01) return None def data(self): @@ -1502,8 +1501,8 @@ def process_notification(device, notification, feature): GLib.idle_add(evaluate_rules, feature, notification, device) -_XDG_CONFIG_HOME = _os.environ.get("XDG_CONFIG_HOME") or _path.expanduser(_path.join("~", ".config")) -_file_path = _path.join(_XDG_CONFIG_HOME, "solaar", "rules.yaml") +_XDG_CONFIG_HOME = os.environ.get("XDG_CONFIG_HOME") or os.path.expanduser(os.path.join("~", ".config")) +_file_path = os.path.join(_XDG_CONFIG_HOME, "solaar", "rules.yaml") rules = built_in_rules @@ -1560,7 +1559,7 @@ def load_config_rule_file(): """Loads user configured rules.""" global rules - if _path.isfile(_file_path): + if os.path.isfile(_file_path): rules = _load_rule_config(_file_path) diff --git a/lib/logitech_receiver/exceptions.py b/lib/logitech_receiver/exceptions.py index 27989aa1d0..0f96276e51 100644 --- a/lib/logitech_receiver/exceptions.py +++ b/lib/logitech_receiver/exceptions.py @@ -15,14 +15,14 @@ ## with this program; if not, write to the Free Software Foundation, Inc., ## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -from .common import KwException as _KwException +from .common import KwException # # Exceptions that may be raised by this API. # -class NoReceiver(_KwException): +class NoReceiver(KwException): """Raised when trying to talk through a previously open handle, when the receiver is no longer available. Should only happen if the receiver is physically disconnected from the machine, or its kernel driver module is @@ -31,25 +31,25 @@ class NoReceiver(_KwException): pass -class NoSuchDevice(_KwException): +class NoSuchDevice(KwException): """Raised when trying to reach a device number not paired to the receiver.""" pass -class DeviceUnreachable(_KwException): +class DeviceUnreachable(KwException): """Raised when a request is made to an unreachable (turned off) device.""" pass -class FeatureNotSupported(_KwException): +class FeatureNotSupported(KwException): """Raised when trying to request a feature not supported by the device.""" pass -class FeatureCallError(_KwException): +class FeatureCallError(KwException): """Raised if the device replied to a feature call with an error.""" pass diff --git a/lib/logitech_receiver/hidpp20.py b/lib/logitech_receiver/hidpp20.py index 1acb30a8d8..9b1d0863ec 100644 --- a/lib/logitech_receiver/hidpp20.py +++ b/lib/logitech_receiver/hidpp20.py @@ -24,7 +24,7 @@ from typing import List from typing import Optional -import yaml as _yaml +import yaml from solaar.i18n import _ @@ -949,11 +949,11 @@ def __eq__(self, other): return type(self) == type(other) and self.to_bytes() == other.to_bytes() def __str__(self): - return _yaml.dump(self, width=float("inf")).rstrip("\n") + return yaml.dump(self, width=float("inf")).rstrip("\n") -_yaml.SafeLoader.add_constructor("!LEDEffectSetting", LEDEffectSetting.from_yaml) -_yaml.add_representer(LEDEffectSetting, LEDEffectSetting.to_yaml) +yaml.SafeLoader.add_constructor("!LEDEffectSetting", LEDEffectSetting.from_yaml) +yaml.add_representer(LEDEffectSetting, LEDEffectSetting.to_yaml) class LEDEffectInfo: # an effect that a zone can do @@ -1132,8 +1132,8 @@ def __repr__(self): ) -_yaml.SafeLoader.add_constructor("!Button", Button.from_yaml) -_yaml.add_representer(Button, Button.to_yaml) +yaml.SafeLoader.add_constructor("!Button", Button.from_yaml) +yaml.add_representer(Button, Button.to_yaml) class OnboardProfile: @@ -1172,7 +1172,7 @@ def from_bytes(cls, sector, enabled, buttons, gbuttons, bytes): po_timeout=_unpack(" dict: - wpid = _hid.find_paired_node_wpid(self.path, number) # extract WPID from udev path + wpid = hidapi.find_paired_node_wpid(self.path, number) # extract WPID from udev path if not wpid: logger.error("Unable to get wpid from udev for device %d of %s", number, self) raise exceptions.NoSuchDevice(number=number, receiver=self, error="Not present 27Mhz device") @@ -491,9 +491,9 @@ def create_receiver(device_info, setting_callback=None) -> Optional[Receiver]: """Opens a Logitech Receiver found attached to the machine, by Linux device path.""" try: - handle = _base.open_path(device_info.path) + handle = base.open_path(device_info.path) if handle: - product_info = _base.product_information(device_info.product_id) + product_info = base.product_information(device_info.product_id) if not product_info: logger.warning("Unknown receiver type: %s", device_info.product_id) product_info = {} @@ -502,7 +502,7 @@ def create_receiver(device_info, setting_callback=None) -> Optional[Receiver]: return rclass(kind, product_info, handle, device_info.path, device_info.product_id, setting_callback) except OSError as e: logger.exception("open %s", device_info) - if e.errno == _errno.EACCES: + if e.errno == errno.EACCES: raise except Exception: logger.exception("open %s", device_info) diff --git a/lib/logitech_receiver/special_keys.py b/lib/logitech_receiver/special_keys.py index ca351c5163..5d113a8431 100644 --- a/lib/logitech_receiver/special_keys.py +++ b/lib/logitech_receiver/special_keys.py @@ -17,15 +17,15 @@ # Reprogrammable keys information # Mostly from Logitech documentation, but with some edits for better Lunix compatibility -import os as _os +import os -import yaml as _yaml +import yaml from .common import NamedInts as _NamedInts from .common import UnsortedNamedInts as _UnsortedNamedInts -_XDG_CONFIG_HOME = _os.environ.get("XDG_CONFIG_HOME") or _os.path.expanduser(_os.path.join("~", ".config")) -_keys_file_path = _os.path.join(_XDG_CONFIG_HOME, "solaar", "keys.yaml") +_XDG_CONFIG_HOME = os.environ.get("XDG_CONFIG_HOME") or os.path.expanduser(os.path.join("~", ".config")) +_keys_file_path = os.path.join(_XDG_CONFIG_HOME, "solaar", "keys.yaml") #