diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000000..4ff7f5b12f --- /dev/null +++ b/.coveragerc @@ -0,0 +1,22 @@ +[run] +branch = True + +source = + hid_parser + hidapi + keysyms + logitech_receiver + solaar + +omit = + */tests/* + */setup.py + */__main__.py + +[report] +exclude_lines = + pragma: no cover + if __name__ == '__main__': + if typing.TYPE_CHECKING + +fail_under = 40 diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index a61726b83f..bdd22bd63b 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -31,6 +31,17 @@ jobs: run: | make test + - name: Upload coverage to Codecov + if: github.ref == 'refs/heads/master' + uses: codecov/codecov-action@v4.5.0 + with: + directory: ./coverage/reports/ + env_vars: OS, PYTHON + files: ./coverage.xml + flags: unittests + name: codecov-umbrella + token: ${{ secrets.CODECOV_TOKEN }} + macos-tests: runs-on: macos-latest @@ -55,4 +66,14 @@ jobs: make install_pip PIP_ARGS='.["test"]' - name: Run tests on macOS run: | - export DYLD_LIBRARY_PATH=$(brew --prefix hidapi)/lib:$DYLD_LIBRARY_PATH && pytest --cov=lib/ tests/ + export DYLD_LIBRARY_PATH=$(brew --prefix hidapi)/lib:$DYLD_LIBRARY_PATH && pytest --cov --cov-report=xml + - name: Upload coverage to Codecov + if: github.ref == 'refs/heads/master' + uses: codecov/codecov-action@v4.5.0 + with: + directory: ./coverage/reports/ + env_vars: OS, PYTHON + files: ./coverage.xml + flags: unittests + name: codecov-umbrella + token: ${{ secrets.CODECOV_TOKEN }} diff --git a/Makefile b/Makefile index 05261fc514..204f23f60a 100644 --- a/Makefile +++ b/Makefile @@ -66,4 +66,4 @@ lint: test: @echo "Running Solaar tests" - pytest --cov=lib/ tests/ + pytest --cov --cov-report=xml diff --git a/README.md b/README.md index e6987eb6d4..3cddc45e8c 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ that are otherwise ignored by the Linux input system. Manual Installation +[![codecov](https://codecov.io/gh/pwr-Solaar/Solaar/graph/badge.svg?token=D7YWFEWID6)](https://codecov.io/gh/pwr-Solaar/Solaar) [![License: GPL v2](https://img.shields.io/badge/License-GPL%20v2+-blue.svg)](../LICENSE.txt)
diff --git a/lib/hidapi/__init__.py b/lib/hidapi/__init__.py
index 06fd2d641e..e69de29bb2 100644
--- a/lib/hidapi/__init__.py
+++ b/lib/hidapi/__init__.py
@@ -1,47 +0,0 @@
-## Copyright (C) 2012-2013 Daniel Pavel
-##
-## This program is free software; you can redistribute it and/or modify
-## it under the terms of the GNU General Public License as published by
-## the Free Software Foundation; either version 2 of the License, or
-## (at your option) any later version.
-##
-## This program is distributed in the hope that it will be useful,
-## but WITHOUT ANY WARRANTY; without even the implied warranty of
-## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-## GNU General Public License for more details.
-##
-## You should have received a copy of the GNU General Public License along
-## with this program; if not, write to the Free Software Foundation, Inc.,
-## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-"""Generic Human Interface Device API."""
-
-import platform
-
-if platform.system() in ("Darwin", "Windows"):
- from hidapi.hidapi_impl import close # noqa: F401
- from hidapi.hidapi_impl import enumerate # noqa: F401
- from hidapi.hidapi_impl import find_paired_node # noqa: F401
- from hidapi.hidapi_impl import find_paired_node_wpid # noqa: F401
- from hidapi.hidapi_impl import get_manufacturer # noqa: F401
- from hidapi.hidapi_impl import get_product # noqa: F401
- from hidapi.hidapi_impl import get_serial # noqa: F401
- from hidapi.hidapi_impl import monitor_glib # noqa: F401
- from hidapi.hidapi_impl import open # noqa: F401
- from hidapi.hidapi_impl import open_path # noqa: F401
- from hidapi.hidapi_impl import read # noqa: F401
- from hidapi.hidapi_impl import write # noqa: F401
-else:
- from hidapi.udev_impl import close # noqa: F401
- from hidapi.udev_impl import enumerate # noqa: F401
- from hidapi.udev_impl import find_paired_node # noqa: F401
- from hidapi.udev_impl import find_paired_node_wpid # noqa: F401
- from hidapi.udev_impl import get_manufacturer # noqa: F401
- from hidapi.udev_impl import get_product # noqa: F401
- from hidapi.udev_impl import get_serial # noqa: F401
- from hidapi.udev_impl import monitor_glib # noqa: F401
- from hidapi.udev_impl import open # noqa: F401
- from hidapi.udev_impl import open_path # noqa: F401
- from hidapi.udev_impl import read # noqa: F401
- from hidapi.udev_impl import write # noqa: F401
-
-__version__ = "0.9"
diff --git a/lib/hidapi/hidapi_impl.py b/lib/hidapi/hidapi_impl.py
index 6d70ba5610..13d834a7ed 100644
--- a/lib/hidapi/hidapi_impl.py
+++ b/lib/hidapi/hidapi_impl.py
@@ -34,11 +34,11 @@
from threading import Thread
from time import sleep
-import gi
-
from hidapi.common import DeviceInfo
if typing.TYPE_CHECKING:
+ import gi
+
gi.require_version("Gdk", "3.0")
from gi.repository import GLib # NOQA: E402
@@ -263,10 +263,10 @@ def _match(action, device, filterfn):
if not device["hidpp_short"] and not device["hidpp_long"]:
return None
- filter = filterfn(bus_id, vid, pid, device["hidpp_short"], device["hidpp_long"])
- if not filter:
+ filter_func = filterfn(bus_id, vid, pid, device["hidpp_short"], device["hidpp_long"])
+ if not filter_func:
return
- isDevice = filter.get("isDevice")
+ isDevice = filter_func.get("isDevice")
if action == "add":
d_info = DeviceInfo(
@@ -305,12 +305,12 @@ def _match(action, device, filterfn):
return d_info
-def find_paired_node(receiver_path, index, timeout):
+def find_paired_node(receiver_path: str, index: int, timeout: int):
"""Find the node of a device paired with a receiver"""
return None
-def find_paired_node_wpid(receiver_path, index):
+def find_paired_node_wpid(receiver_path: str, index: int):
"""Find the node of a device paired with a receiver, get wpid from udev"""
return None
diff --git a/lib/hidapi/hidconsole.py b/lib/hidapi/hidconsole.py
index e418e94575..11b9ff9ced 100644
--- a/lib/hidapi/hidconsole.py
+++ b/lib/hidapi/hidconsole.py
@@ -17,6 +17,7 @@
import argparse
import os
import os.path
+import platform
import readline
import sys
import time
@@ -27,7 +28,10 @@
from threading import Lock
from threading import Thread
-import hidapi
+if platform.system() == "Linux":
+ import hidapi.udev_impl as hidapi
+else:
+ import hidapi.hidapi_impl as hidapi
LOGITECH_VENDOR_ID = 0x046D
diff --git a/lib/hidapi/udev_impl.py b/lib/hidapi/udev_impl.py
index 4d1fe2dbc3..58dc6644cf 100644
--- a/lib/hidapi/udev_impl.py
+++ b/lib/hidapi/udev_impl.py
@@ -37,12 +37,13 @@
from time import sleep
from time import time
-import gi
import pyudev
from hidapi.common import DeviceInfo
if typing.TYPE_CHECKING:
+ import gi
+
gi.require_version("Gdk", "3.0")
from gi.repository import GLib # NOQA: E402
@@ -77,7 +78,7 @@ def exit():
# The filterfn is used to determine whether this is a device of interest to Solaar.
# It is given the bus id, vendor id, and product id and returns a dictionary
# with the required hid_driver and usb_interface and whether this is a receiver or device.
-def _match(action, device, filterfn):
+def _match(action, device, filter_func: typing.Callable[[int, int, int, bool, bool], dict[str, typing.Any]]):
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Dbus event {action} {device}")
hid_device = device.find_parent("hid")
@@ -112,11 +113,11 @@ def _match(action, device, filterfn):
"Report Descriptor not processed for DEVICE %s BID %s VID %s PID %s: %s", device.device_node, bid, vid, pid, e
)
- filter = filterfn(int(bid, 16), int(vid, 16), int(pid, 16), hidpp_short, hidpp_long)
- if not filter:
+ filtered_result = filter_func(int(bid, 16), int(vid, 16), int(pid, 16), hidpp_short, hidpp_long)
+ if not filtered_result:
return
- interface_number = filter.get("usb_interface")
- isDevice = filter.get("isDevice")
+ interface_number = filtered_result.get("usb_interface")
+ isDevice = filtered_result.get("isDevice")
if action == "add":
hid_driver_name = hid_device.properties.get("DRIVER")
@@ -175,7 +176,7 @@ def _match(action, device, filterfn):
return d_info
-def find_paired_node(receiver_path, index, timeout):
+def find_paired_node(receiver_path: str, index: int, timeout: int):
"""Find the node of a device paired with a receiver"""
context = pyudev.Context()
receiver_phys = pyudev.Devices.from_device_file(context, receiver_path).find_parent("hid").get("HID_PHYS")
@@ -259,7 +260,7 @@ def _process_udev_event(monitor, condition, cb, filterfn):
m.start()
-def enumerate(filterfn):
+def enumerate(filter_func: typing.Callable[[int, int, int, bool, bool], dict[str, typing.Any]]):
"""Enumerate the HID Devices.
List all the HID devices attached to the system, optionally filtering by
@@ -271,7 +272,7 @@ def enumerate(filterfn):
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Starting dbus enumeration")
for dev in pyudev.Context().list_devices(subsystem="hidraw"):
- dev_info = _match("add", dev, filterfn)
+ dev_info = _match("add", dev, filter_func)
if dev_info:
yield dev_info
diff --git a/lib/keysyms/generate.py b/lib/keysyms/generate.py
index 5368f0c7c4..f913036a61 100755
--- a/lib/keysyms/generate.py
+++ b/lib/keysyms/generate.py
@@ -1,12 +1,12 @@
#!/usr/bin/env python3
+"""Extract key symbol encodings from X11 header files."""
+
from pathlib import Path
from pprint import pprint
from re import findall
from subprocess import run
from tempfile import TemporaryDirectory
-repo = "https://github.com/freedesktop/xorg-proto-x11proto.git"
-xx = "https://gitlab.freedesktop.org/xorg/proto/xorgproto/-/tree/master/include/X11/"
repo = "https://gitlab.freedesktop.org/xorg/proto/xorgproto.git"
pattern = r"#define XK_(\w+)\s+0x(\w+)(?:\s+/\*\s+U\+(\w+))?"
xf86pattern = r"#define XF86XK_(\w+)\s+0x(\w+)(?:\s+/\*\s+U\+(\w+))?"
@@ -14,28 +14,24 @@
def main():
keysymdef = {}
+ keysym_files = [
+ ("include/X11/keysymdef.h", pattern, ""),
+ ("include/X11/XF86keysym.h", xf86pattern, "XF86_"),
+ ]
with TemporaryDirectory() as temp:
run(["git", "clone", repo, "."], cwd=temp)
- # text = Path(temp, 'keysymdef.h').read_text()
- text = Path(temp, "include/X11/keysymdef.h").read_text()
- for name, sym, uni in findall(pattern, text):
- sym = int(sym, 16)
- uni = int(uni, 16) if uni else None
- if keysymdef.get(name, None):
- print("KEY DUP", name)
- keysymdef[name] = sym
- # text = Path(temp, 'keysymdef.h').read_text()
- text = Path(temp, "include/X11/XF86keysym.h").read_text()
- for name, sym, uni in findall(xf86pattern, text):
- sym = int(sym, 16)
- uni = int(uni, 16) if uni else None
- if keysymdef.get("XF86_" + name, None):
- print("KEY DUP", "XF86_" + name)
- keysymdef["XF86_" + name] = sym
+
+ for filename, extraction_pattern, prefix in keysym_files:
+ text = Path(temp, filename).read_text()
+ for name, sym, _ in findall(extraction_pattern, text):
+ sym = int(sym, 16)
+ if keysymdef.get(f"{prefix}{name}", None):
+ print(f"KEY DUP {prefix}{name}")
+ keysymdef[f"{prefix}{name}"] = sym
with open("keysymdef.py", "w") as f:
- f.write("# flake8: noqa\nkeysymdef = \\\n")
+ f.write("# flake8: noqa\nkey_symbols = \\\n")
pprint(keysymdef, f)
diff --git a/lib/keysyms/keysymdef.py b/lib/keysyms/keysymdef.py
index 0f2ba6e7d2..1d377d607c 100644
--- a/lib/keysyms/keysymdef.py
+++ b/lib/keysyms/keysymdef.py
@@ -1,5 +1,5 @@
# flake8: noqa
-keysymdef = {
+key_symbols = {
"0": 48,
"1": 49,
"2": 50,
diff --git a/lib/logitech_receiver/base.py b/lib/logitech_receiver/base.py
index 7685c37f84..da3ac43bc5 100644
--- a/lib/logitech_receiver/base.py
+++ b/lib/logitech_receiver/base.py
@@ -14,13 +14,13 @@
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-# Base low-level functions used by the API proper.
-# Unlikely to be used directly unless you're expanding the API.
+"""Base low-level functions as API for upper layers."""
from __future__ import annotations
import dataclasses
import logging
+import platform
import struct
import threading
import typing
@@ -31,14 +31,12 @@
from typing import Any
import gi
-import hidapi
from . import base_usb
from . import common
from . import descriptors
from . import exceptions
from . import hidpp10_constants
-from . import hidpp20
from . import hidpp20_constants
from .common import LOGITECH_VENDOR_ID
from .common import BusID
@@ -47,9 +45,32 @@
gi.require_version("Gdk", "3.0")
from gi.repository import GLib # NOQA: E402
+if platform.system() == "Linux":
+ import hidapi.udev_impl as hidapi
+else:
+ import hidapi.hidapi_impl as hidapi
+
logger = logging.getLogger(__name__)
-_hidpp20 = hidpp20.Hidpp20()
+
+_SHORT_MESSAGE_SIZE = 7
+_LONG_MESSAGE_SIZE = 20
+_MEDIUM_MESSAGE_SIZE = 15
+_MAX_READ_SIZE = 32
+
+HIDPP_SHORT_MESSAGE_ID = 0x10
+HIDPP_LONG_MESSAGE_ID = 0x11
+DJ_MESSAGE_ID = 0x20
+
+
+"""Default timeout on read (in seconds)."""
+DEFAULT_TIMEOUT = 4
+# the receiver itself should reply very fast, within 500ms
+_RECEIVER_REQUEST_TIMEOUT = 0.9
+# devices may reply a lot slower, as the call has to go wireless to them and come back
+_DEVICE_REQUEST_TIMEOUT = DEFAULT_TIMEOUT
+# when pinging, be extra patient (no longer)
+_PING_TIMEOUT = DEFAULT_TIMEOUT
@dataclasses.dataclass
@@ -89,57 +110,26 @@ def _bluetooth_device(product_id: int) -> dict[str, Any]:
KNOWN_DEVICE_IDS.append(_bluetooth_device(d.btid))
-def other_device_check(bus_id: int, vendor_id: int, product_id: int):
+def other_device_check(bus_id: int, vendor_id: int, product_id: int) -> dict[str, Any] | None:
"""Check whether product is a Logitech USB-connected or Bluetooth device based on bus, vendor, and product IDs
This allows Solaar to support receiverless HID++ 2.0 devices that it knows nothing about"""
if vendor_id != LOGITECH_VENDOR_ID:
return
- if bus_id == BusID.USB:
- if product_id >= 0xC07D and product_id <= 0xC094 or product_id >= 0xC32B and product_id <= 0xC344:
- return _usb_device(product_id, 2)
- elif bus_id == BusID.BLUETOOTH:
- if product_id >= 0xB012 and product_id <= 0xB0FF or product_id >= 0xB317 and product_id <= 0xB3FF:
- return _bluetooth_device(product_id)
+ device_info = None
+ if bus_id == BusID.USB and (0xC07D <= product_id <= 0xC094 or 0xC32B <= product_id <= 0xC344):
+ device_info = _usb_device(product_id, 2)
+ elif bus_id == BusID.BLUETOOTH and (0xB012 <= product_id <= 0xB0FF or 0xB317 <= product_id <= 0xB3FF):
+ device_info = _bluetooth_device(product_id)
+ return device_info
-def product_information(usb_id: int | str) -> dict:
- if isinstance(usb_id, str):
- usb_id = int(usb_id, 16)
+def product_information(usb_id: int) -> dict[str, Any]:
+ """Returns hardcoded information from USB receiver."""
+ return base_usb.get_receiver_info(usb_id)
- for r in base_usb.ALL:
- if usb_id == r.get("product_id"):
- return r
- return {}
-
-_SHORT_MESSAGE_SIZE = 7
-_LONG_MESSAGE_SIZE = 20
-_MEDIUM_MESSAGE_SIZE = 15
-_MAX_READ_SIZE = 32
-
-HIDPP_SHORT_MESSAGE_ID = 0x10
-HIDPP_LONG_MESSAGE_ID = 0x11
-DJ_MESSAGE_ID = 0x20
-
-# mapping from report_id to message length
-report_lengths = {
- HIDPP_SHORT_MESSAGE_ID: _SHORT_MESSAGE_SIZE,
- HIDPP_LONG_MESSAGE_ID: _LONG_MESSAGE_SIZE,
- DJ_MESSAGE_ID: _MEDIUM_MESSAGE_SIZE,
- 0x21: _MAX_READ_SIZE,
-}
-"""Default timeout on read (in seconds)."""
-DEFAULT_TIMEOUT = 4
-# the receiver itself should reply very fast, within 500ms
-_RECEIVER_REQUEST_TIMEOUT = 0.9
-# devices may reply a lot slower, as the call has to go wireless to them and come back
-_DEVICE_REQUEST_TIMEOUT = DEFAULT_TIMEOUT
-# when pinging, be extra patient (no longer)
-_PING_TIMEOUT = DEFAULT_TIMEOUT
-
-
-def match(record, bus_id, vendor_id, product_id):
+def _match(record: dict[str, Any], bus_id: int, vendor_id: int, product_id: int):
return (
(record.get("bus_id") is None or record.get("bus_id") == bus_id)
and (record.get("vendor_id") is None or record.get("vendor_id") == vendor_id)
@@ -147,11 +137,22 @@ def match(record, bus_id, vendor_id, product_id):
)
-def filter_receivers(bus_id, vendor_id, product_id, hidpp_short=False, hidpp_long=False):
- """Check that this product is a Logitech receiver and if so return the receiver record for further checking"""
- for record in base_usb.ALL: # known receivers
- if match(record, bus_id, vendor_id, product_id):
+def filter_receivers(
+ bus_id: int, vendor_id: int, product_id: int, hidpp_short: bool = False, hidpp_long: bool = False
+) -> dict[str, Any]:
+ """Check that this product is a Logitech receiver.
+
+ Filters based on bus_id, vendor_id and product_id.
+
+ If so return the receiver record for further checking.
+ """
+ try:
+ record = base_usb.get_receiver_info(product_id)
+ if _match(record, bus_id, vendor_id, product_id):
return record
+ except ValueError:
+ pass
+
if vendor_id == LOGITECH_VENDOR_ID and 0xC500 <= product_id <= 0xC5FF: # unknown receiver
return {"vendor_id": vendor_id, "product_id": product_id, "bus_id": bus_id, "isDevice": False}
@@ -161,13 +162,16 @@ def receivers():
yield from hidapi.enumerate(filter_receivers)
-def filter(bus_id, vendor_id, product_id, hidpp_short=False, hidpp_long=False):
+def filter_products_of_interest(
+ bus_id: int, vendor_id: int, product_id: int, hidpp_short: bool = False, hidpp_long: bool = False
+) -> dict[str, Any] | None:
"""Check that this product is of interest and if so return the device record for further checking"""
record = filter_receivers(bus_id, vendor_id, product_id, hidpp_short, hidpp_long)
if record: # known or unknown receiver
return record
+
for record in KNOWN_DEVICE_IDS:
- if match(record, bus_id, vendor_id, product_id):
+ if _match(record, bus_id, vendor_id, product_id):
return record
if hidpp_short or hidpp_long: # unknown devices that use HID++
return {"vendor_id": vendor_id, "product_id": product_id, "bus_id": bus_id, "isDevice": True}
@@ -177,7 +181,7 @@ def filter(bus_id, vendor_id, product_id, hidpp_short=False, hidpp_long=False):
def receivers_and_devices():
"""Enumerate all the receivers and devices directly attached to the machine."""
- yield from hidapi.enumerate(filter)
+ yield from hidapi.enumerate(filter_products_of_interest)
def notify_on_receivers_glib(glib: GLib, callback):
@@ -188,7 +192,7 @@ def notify_on_receivers_glib(glib: GLib, callback):
glib
GLib instance.
"""
- return hidapi.monitor_glib(glib, callback, filter)
+ return hidapi.monitor_glib(glib, callback, filter_products_of_interest)
def open_path(path):
@@ -290,11 +294,23 @@ def read(handle, timeout=DEFAULT_TIMEOUT):
return reply
-# sanity checks on message report id and size
-def check_message(data):
+def is_relevant_message(data: bytes) -> bool:
+ """Checks if given id is a HID++ or DJ message.
+
+ Applies sanity checks on message report ID and message size.
+ """
assert isinstance(data, bytes), (repr(data), type(data))
+
+ # mapping from report_id to message length
+ report_lengths = {
+ HIDPP_SHORT_MESSAGE_ID: _SHORT_MESSAGE_SIZE,
+ HIDPP_LONG_MESSAGE_ID: _LONG_MESSAGE_SIZE,
+ DJ_MESSAGE_ID: _MEDIUM_MESSAGE_SIZE,
+ 0x21: _MAX_READ_SIZE,
+ }
+
report_id = ord(data[:1])
- if report_id in report_lengths: # is this an HID++ or DJ message?
+ if report_id in report_lengths:
if report_lengths.get(report_id) == len(data):
return True
else:
@@ -320,7 +336,7 @@ def _read(handle, timeout):
close(handle)
raise exceptions.NoReceiver(reason=reason) from reason
- if data and check_message(data): # ignore messages that fail check
+ if data and is_relevant_message(data): # ignore messages that fail check
report_id = ord(data[:1])
devnumber = ord(data[1:2])
@@ -350,7 +366,7 @@ def _skip_incoming(handle, ihandle, notifications_hook):
raise exceptions.NoReceiver(reason=reason) from reason
if data:
- if check_message(data): # only process messages that pass check
+ if is_relevant_message(data): # only process messages that pass check
# report_id = ord(data[:1])
if notifications_hook:
n = make_notification(ord(data[:1]), ord(data[1:2]), data[2:])
@@ -421,13 +437,46 @@ def acquire_timeout(lock, handle, timeout):
lock.release()
-# cycle the HID++ 2.0 software ID from x2 to xF, inclusive, to separate results from each other, notifications, and driver
-sw_id = 0xF
+def _get_next_sw_id() -> int:
+ """Returns 'random' software ID to separate replies from different devices.
+
+ Cycle the HID++ 2.0 software ID from 0x2 to 0xF to separate
+ results and notifications.
+ """
+ if not hasattr(_get_next_sw_id, "software_id"):
+ _get_next_sw_id.software_id = 0xF
+
+ if _get_next_sw_id.software_id < 0xF:
+ _get_next_sw_id.software_id += 1
+ else:
+ _get_next_sw_id.software_id = 2
+ return _get_next_sw_id.software_id
+
+
+def find_paired_node(receiver_path: str, index: int, timeout: int):
+ """Find the node of a device paired with a receiver."""
+ return hidapi.find_paired_node(receiver_path, index, timeout)
+
+
+def find_paired_node_wpid(receiver_path: str, index: int):
+ """Find the node of a device paired with a receiver.
+
+ Get wpid from udev.
+ """
+ return hidapi.find_paired_node_wpid(receiver_path, index)
# a very few requests (e.g., host switching) do not expect a reply, but use no_reply=True with extreme caution
-def request(handle, devnumber, request_id, *params, no_reply=False, return_error=False, long_message=False, protocol=1.0):
- global sw_id
+def request(
+ handle,
+ devnumber,
+ request_id: int,
+ *params,
+ no_reply: bool = False,
+ return_error: bool = False,
+ long_message: bool = False,
+ protocol: float = 1.0,
+):
"""Makes a feature call to a device and waits for a matching reply.
:param handle: an open UR handle.
:param devnumber: attached device number.
@@ -438,12 +487,10 @@ def request(handle, devnumber, request_id, *params, no_reply=False, return_error
with acquire_timeout(handle_lock(handle), handle, 10.0):
assert isinstance(request_id, int)
if (devnumber != 0xFF or protocol >= 2.0) and request_id < 0x8000:
- # For HID++ 2.0 feature requests, randomize the SoftwareId to make it
- # easier to recognize the reply for this request. also, always set the
- # most significant bit (8) in SoftwareId, to make notifications easier
- # to distinguish from request replies.
+ # Always set the most significant bit (8) in SoftwareId,
+ # to make notifications easier to distinguish from request replies.
# This only applies to peripheral requests, ofc.
- sw_id = sw_id + 1 if sw_id < 0xF else 2
+ sw_id = _get_next_sw_id()
request_id = (request_id & 0xFFF0) | sw_id # was 0x08 | getrandbits(3)
timeout = _RECEIVER_REQUEST_TIMEOUT if devnumber == 0xFF else _DEVICE_REQUEST_TIMEOUT
@@ -544,11 +591,10 @@ def request(handle, devnumber, request_id, *params, no_reply=False, return_error
# raise DeviceUnreachable(number=devnumber, request=request_id)
-def ping(handle, devnumber, long_message=False):
+def ping(handle, devnumber, long_message: bool = False):
"""Check if a device is connected to the receiver.
:returns: The HID protocol supported by the device, as a floating point number, if the device is active.
"""
- global sw_id
if logger.isEnabledFor(logging.DEBUG):
logger.debug("(%s) pinging device %d", handle, devnumber)
with acquire_timeout(handle_lock(handle), handle, 10.0):
@@ -560,8 +606,7 @@ def ping(handle, devnumber, long_message=False):
return
# randomize the mark byte to be able to identify the ping reply
- # cycle the sw_id byte from 2 to 15 (see above)
- sw_id = sw_id + 1 if sw_id < 0xF else 2
+ sw_id = _get_next_sw_id()
request_id = 0x0010 | sw_id # was 0x0018 | getrandbits(3)
request_data = struct.pack("!HBBB", request_id, 0, 0, getrandbits(8))
write(int(handle), devnumber, request_data, long_message)
diff --git a/lib/logitech_receiver/base_usb.py b/lib/logitech_receiver/base_usb.py
index c5f0246e5a..41e572b8a2 100644
--- a/lib/logitech_receiver/base_usb.py
+++ b/lib/logitech_receiver/base_usb.py
@@ -14,19 +14,20 @@
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-## According to Logitech, they use the following product IDs (as of September 2020)
-## USB product IDs for receivers: 0xC526 - 0xC5xx
-## Wireless PIDs for hidpp10 devices: 0x2006 - 0x2019
-## Wireless PIDs for hidpp20 devices: 0x4002 - 0x4097, 0x4101 - 0x4102
-## USB product IDs for hidpp20 devices: 0xC07D - 0xC094, 0xC32B - 0xC344
-## Bluetooth product IDs (for hidpp20 devices): 0xB012 - 0xB0xx, 0xB32A - 0xB3xx
+"""Collection of known Logitech product IDs.
-# USB ids of Logitech wireless receivers.
-# Only receivers supporting the HID++ protocol can go in here.
+According to Logitech, they use the following product IDs (as of September 2020)
+USB product IDs for receivers: 0xC526 - 0xC5xx
+Wireless PIDs for hidpp10 devices: 0x2006 - 0x2019
+Wireless PIDs for hidpp20 devices: 0x4002 - 0x4097, 0x4101 - 0x4102
+USB product IDs for hidpp20 devices: 0xC07D - 0xC094, 0xC32B - 0xC344
+Bluetooth product IDs (for hidpp20 devices): 0xB012 - 0xB0xx, 0xB32A - 0xB3xx
-from solaar.i18n import _
+USB ids of Logitech wireless receivers.
+Only receivers supporting the HID++ protocol can go in here.
+"""
-from logitech_receiver.common import LOGITECH_VENDOR_ID
+from solaar.i18n import _
# max_devices is only used for receivers that do not support reading from Registers.RECEIVER_INFO offset 0x03, default
# to 1.
@@ -36,8 +37,10 @@
## should this last be changed so that may_unpair is used for all receivers? writing to Registers.RECEIVER_PAIRING
## doesn't seem right
+LOGITECH_VENDOR_ID = 0x046D
-def _bolt_receiver(product_id):
+
+def _bolt_receiver(product_id: int) -> dict:
return {
"vendor_id": LOGITECH_VENDOR_ID,
"product_id": product_id,
@@ -49,7 +52,7 @@ def _bolt_receiver(product_id):
}
-def _unifying_receiver(product_id):
+def _unifying_receiver(product_id: int) -> dict:
return {
"vendor_id": LOGITECH_VENDOR_ID,
"product_id": product_id,
@@ -60,7 +63,7 @@ def _unifying_receiver(product_id):
}
-def _nano_receiver(product_id):
+def _nano_receiver(product_id: int) -> dict:
return {
"vendor_id": LOGITECH_VENDOR_ID,
"product_id": product_id,
@@ -72,7 +75,7 @@ def _nano_receiver(product_id):
}
-def _nano_receiver_no_unpair(product_id):
+def _nano_receiver_no_unpair(product_id: int) -> dict:
return {
"vendor_id": LOGITECH_VENDOR_ID,
"product_id": product_id,
@@ -85,7 +88,7 @@ def _nano_receiver_no_unpair(product_id):
}
-def _nano_receiver_max2(product_id):
+def _nano_receiver_max2(product_id: int) -> dict:
return {
"vendor_id": LOGITECH_VENDOR_ID,
"product_id": product_id,
@@ -98,20 +101,7 @@ def _nano_receiver_max2(product_id):
}
-def _nano_receiver_maxn(product_id, max):
- return {
- "vendor_id": LOGITECH_VENDOR_ID,
- "product_id": product_id,
- "usb_interface": 1,
- "name": _("Nano Receiver"),
- "receiver_kind": "nano",
- "max_devices": max,
- "may_unpair": False,
- "re_pairs": True,
- }
-
-
-def _lenovo_receiver(product_id):
+def _lenovo_receiver(product_id: int) -> dict:
return {
"vendor_id": 6127,
"product_id": product_id,
@@ -122,7 +112,7 @@ def _lenovo_receiver(product_id):
}
-def _lightspeed_receiver(product_id):
+def _lightspeed_receiver(product_id: int) -> dict:
return {
"vendor_id": LOGITECH_VENDOR_ID,
"product_id": product_id,
@@ -133,7 +123,7 @@ def _lightspeed_receiver(product_id):
}
-def _ex100_receiver(product_id):
+def _ex100_receiver(product_id: int) -> dict:
return {
"vendor_id": LOGITECH_VENDOR_ID,
"product_id": product_id,
@@ -147,7 +137,7 @@ def _ex100_receiver(product_id):
# Receivers added here should also be listed in
-# share/solaar/io.github.pwr_solaar.solaar.metainfo.xml
+# share/solaar/io.github.pwr_solaar.solaar.meta-info.xml
# Look in https://github.com/torvalds/linux/blob/master/drivers/hid/hid-ids.h
# Bolt receivers (marked with the yellow lightning bolt logo)
@@ -170,7 +160,6 @@ def _ex100_receiver(product_id):
NANO_RECEIVER_C534 = _nano_receiver_max2(0xC534)
NANO_RECEIVER_C535 = _nano_receiver(0xC535) # branded as Dell
NANO_RECEIVER_C537 = _nano_receiver(0xC537)
-# NANO_RECEIVER_C542 = _nano_receiver(0xc542) # does not use HID++
NANO_RECEIVER_6042 = _lenovo_receiver(0x6042)
# Lightspeed receivers (usually sold with gaming devices)
@@ -183,11 +172,9 @@ def _ex100_receiver(product_id):
LIGHTSPEED_RECEIVER_C547 = _lightspeed_receiver(0xC547)
# EX100 old style receiver pre-unifying protocol
-# EX100_27MHZ_RECEIVER_C50C = _ex100_receiver(0xc50C) # in hid/hid-ids.h
EX100_27MHZ_RECEIVER_C517 = _ex100_receiver(0xC517)
-# EX100_27MHZ_RECEIVER_C51B = _ex100_receiver(0xc51B) # in hid/hid-ids.h
-ALL = (
+KNOWN_RECEIVERS = (
BOLT_RECEIVER_C548,
UNIFYING_RECEIVER_C52B,
UNIFYING_RECEIVER_C532,
@@ -203,7 +190,6 @@ def _ex100_receiver(product_id):
NANO_RECEIVER_C534,
NANO_RECEIVER_C535,
NANO_RECEIVER_C537,
- # NANO_RECEIVER_C542, # does not use HID++
NANO_RECEIVER_6042,
LIGHTSPEED_RECEIVER_C539,
LIGHTSPEED_RECEIVER_C53A,
@@ -214,3 +200,23 @@ def _ex100_receiver(product_id):
LIGHTSPEED_RECEIVER_C547,
EX100_27MHZ_RECEIVER_C517,
)
+
+
+def get_receiver_info(product_id: int) -> dict:
+ """Returns hardcoded information about Logitech receiver.
+
+ Parameters
+ ----------
+ product_id
+ Product ID of receiver e.g. 0xC548 for a Logitech Bolt receiver.
+
+ Returns
+ -------
+ dict
+ Product info with mandatory vendor_id, product_id,
+ usb_interface, name, receiver_kind
+ """
+ for receiver in KNOWN_RECEIVERS:
+ if product_id == receiver.get("product_id"):
+ return receiver
+ raise ValueError(f"Unknown product ID '0x{product_id:02X}")
diff --git a/lib/logitech_receiver/notify.py b/lib/logitech_receiver/desktop_notifications.py
similarity index 73%
rename from lib/logitech_receiver/notify.py
rename to lib/logitech_receiver/desktop_notifications.py
index cd6e38b19f..a9cf767625 100644
--- a/lib/logitech_receiver/notify.py
+++ b/lib/logitech_receiver/desktop_notifications.py
@@ -14,29 +14,46 @@
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+"""Implements the desktop notification service."""
+
+import importlib
import logging
logger = logging.getLogger(__name__)
-try:
- import gi
- gi.require_version("Notify", "0.7")
- gi.require_version("Gtk", "3.0")
- from gi.repository import GLib # this import is allowed to fail making the entire feature unavailable
- from gi.repository import Gtk # this import is allowed to fail making the entire feature unavailable
- from gi.repository import Notify # this import is allowed to fail making the entire feature unavailable
+def notifications_available():
+ """Checks if notification service is available."""
+ notifications_supported = False
+ try:
+ import gi
+
+ gi.require_version("Notify", "0.7")
+ gi.require_version("Gtk", "3.0")
+
+ importlib.util.find_spec("gi.repository.GLib")
+ importlib.util.find_spec("gi.repository.Gtk")
+ importlib.util.find_spec("gi.repository.Notify")
+
+ notifications_supported = True
+ except ValueError as e:
+ logger.warning(f"Notification service is not available: {e}")
+ return notifications_supported
+
- available = True
-except (ValueError, ImportError):
- available = False
+available = notifications_available()
if available:
+ from gi.repository import GLib
+ from gi.repository import Gtk
+ from gi.repository import Notify
+
# cache references to shown notifications here to allow reuse
_notifications = {}
+ _ICON_LISTS = {}
def init():
- """Init the notifications system."""
+ """Initialize desktop notifications."""
global available
if available:
if not Notify.is_initted():
@@ -50,13 +67,14 @@ def init():
return available and Notify.is_initted()
def uninit():
+ """Stop desktop notifications."""
if available and Notify.is_initted():
if logger.isEnabledFor(logging.INFO):
logger.info("stopping desktop notifications")
_notifications.clear()
Notify.uninit()
- def show(dev, message, icon=None):
+ def show(dev, message: str, icon=None):
"""Show a notification with title and text."""
if available and (Notify.is_initted() or init()):
summary = dev.name
@@ -68,13 +86,9 @@ def show(dev, message, icon=None):
n.set_urgency(Notify.Urgency.NORMAL)
n.set_hint("desktop-entry", GLib.Variant("s", "solaar")) # replace with better name late
try:
- # if logger.isEnabledFor(logging.DEBUG):
- # logger.debug("showing %s", n)
n.show()
except Exception:
- logger.exception("showing %s", n)
-
- _ICON_LISTS = {}
+ logger.exception(f"showing {n}")
def device_icon_list(name="_", kind=None):
icon_list = _ICON_LISTS.get(name)
@@ -82,16 +96,17 @@ def device_icon_list(name="_", kind=None):
# names of possible icons, in reverse order of likelihood
# the theme will hopefully pick up the most appropriate
icon_list = ["preferences-desktop-peripherals"]
+ kind = str(kind)
if kind:
- if str(kind) == "numpad":
+ if kind == "numpad":
icon_list += ("input-keyboard", "input-dialpad")
- elif str(kind) == "touchpad":
+ elif kind == "touchpad":
icon_list += ("input-mouse", "input-tablet")
- elif str(kind) == "trackball":
+ elif kind == "trackball":
icon_list += ("input-mouse",)
- elif str(kind) == "headset":
+ elif kind == "headset":
icon_list += ("audio-headphones", "audio-headset")
- icon_list += ("input-" + str(kind),)
+ icon_list += (f"input-{kind}",)
_ICON_LISTS[name] = icon_list
return icon_list
diff --git a/lib/logitech_receiver/device.py b/lib/logitech_receiver/device.py
index a627e6fc5b..0e8f1396b6 100644
--- a/lib/logitech_receiver/device.py
+++ b/lib/logitech_receiver/device.py
@@ -19,16 +19,13 @@
import threading
import time
+from typing import Any
from typing import Callable
from typing import Optional
from typing import Protocol
-from typing import cast
-
-import hidapi
from solaar import configuration
-from . import base
from . import descriptors
from . import exceptions
from . import hidpp10
@@ -47,7 +44,10 @@
class LowLevelInterface(Protocol):
- def open_path(self, path):
+ def open_path(self, path) -> Any:
+ ...
+
+ def find_paired_node(self, receiver_path: str, index: int, timeout: int):
...
def ping(self, handle, number, long_message: bool):
@@ -60,35 +60,30 @@ def close(self, handle, *args, **kwargs) -> bool:
...
-low_level_interface = cast(LowLevelInterface, base)
-
-
-class DeviceFactory:
- @staticmethod
- def create_device(low_level: LowLevelInterface, device_info, setting_callback=None):
- """Opens a Logitech Device found attached to the machine, by Linux device path.
- :returns: An open file handle for the found receiver, or None.
- """
- try:
- handle = low_level.open_path(device_info.path)
- if handle:
- # a direct connected device might not be online (as reported by user)
- return Device(
- low_level,
- None,
- None,
- None,
- handle=handle,
- device_info=device_info,
- setting_callback=setting_callback,
- )
- except OSError as e:
- logger.exception("open %s", device_info)
- if e.errno == errno.EACCES:
- raise
- except Exception:
- logger.exception("open %s", device_info)
+def create_device(low_level: LowLevelInterface, device_info, setting_callback=None):
+ """Opens a Logitech Device found attached to the machine, by Linux device path.
+ :returns: An open file handle for the found receiver, or None.
+ """
+ try:
+ handle = low_level.open_path(device_info.path)
+ if handle:
+ # a direct connected device might not be online (as reported by user)
+ return Device(
+ low_level,
+ None,
+ None,
+ None,
+ handle=handle,
+ device_info=device_info,
+ setting_callback=setting_callback,
+ )
+ except OSError as e:
+ logger.exception("open %s", device_info)
+ if e.errno == errno.EACCES:
raise
+ except Exception:
+ logger.exception("open %s", device_info)
+ raise
class Device:
@@ -154,7 +149,7 @@ def __init__(
self.cleanups = [] # functions to run on the device when it is closed
if not self.path:
- self.path = hidapi.find_paired_node(receiver.path, number, 1) if receiver else None
+ self.path = self.low_level.find_paired_node(receiver.path, number, 1) if receiver else None
if not self.handle:
try:
self.handle = self.low_level.open_path(self.path) if self.path else None
diff --git a/lib/logitech_receiver/diversion.py b/lib/logitech_receiver/diversion.py
index 78acbb6a72..2adebdc611 100644
--- a/lib/logitech_receiver/diversion.py
+++ b/lib/logitech_receiver/diversion.py
@@ -81,7 +81,7 @@
# Xtest extension to X11 - provides input simulation, partly works under Wayland
# Wayland - provides input simulation
-XK_KEYS: Dict[str, int] = keysymdef.keysymdef
+XK_KEYS: Dict[str, int] = keysymdef.key_symbols
# Event codes - can't use Xlib.X codes because Xlib might not be available
_KEY_RELEASE = 0
diff --git a/lib/logitech_receiver/receiver.py b/lib/logitech_receiver/receiver.py
index b63085d89a..d41848c784 100644
--- a/lib/logitech_receiver/receiver.py
+++ b/lib/logitech_receiver/receiver.py
@@ -23,14 +23,10 @@
from typing import Callable
from typing import Optional
from typing import Protocol
-from typing import cast
-
-import hidapi
from solaar.i18n import _
from solaar.i18n import ngettext
-from . import base
from . import exceptions
from . import hidpp10
from . import hidpp10_constants
@@ -49,6 +45,9 @@ class LowLevelInterface(Protocol):
def open_path(self, path):
...
+ def find_paired_node_wpid(self, receiver_path: str, index: int):
+ ...
+
def ping(self, handle, number, long_message=False):
...
@@ -59,9 +58,6 @@ def close(self, handle):
...
-low_level_interface = cast(LowLevelInterface, base)
-
-
@dataclass
class Pairing:
"""Information about the current or most recent pairing"""
@@ -88,8 +84,18 @@ class Receiver:
number = 0xFF
kind = None
- def __init__(self, receiver_kind, product_info, handle, path, product_id, setting_callback=None):
+ def __init__(
+ self,
+ low_level: LowLevelInterface,
+ receiver_kind,
+ product_info,
+ handle,
+ path,
+ product_id,
+ setting_callback=None,
+ ):
assert handle
+ self.low_level = low_level
self.isDevice = False # some devices act as receiver so we need a property to distinguish them
self.handle = handle
self.path = path
@@ -128,7 +134,7 @@ def close(self):
if d:
d.close()
self._devices.clear()
- return handle and base.close(handle)
+ return handle and self.low_level.close(handle)
def __del__(self):
self.close()
@@ -248,7 +254,7 @@ def register_new_device(self, number, notification=None):
logger.warning("mismatch on device kind %s %s", info["kind"], nkind)
else:
online = True
- dev = Device(low_level_interface, self, number, online, pairing_info=info, setting_callback=self.setting_callback)
+ dev = Device(self.low_level, self, number, online, pairing_info=info, setting_callback=self.setting_callback)
if logger.isEnabledFor(logging.INFO):
logger.info("%s: found new device %d (%s)", self, number, dev.wpid)
self._devices[number] = dev
@@ -273,7 +279,7 @@ def count(self):
def request(self, request_id, *params):
if bool(self):
- return base.request(self.handle, 0xFF, request_id, *params)
+ return self.low_level.request(self.handle, 0xFF, request_id, *params)
def reset_pairing(self):
self.pairing = Pairing()
@@ -437,18 +443,17 @@ def _unpair_device_per_receiver(self, key):
class UnifyingReceiver(Receiver):
- def __init__(self, receiver_kind, product_info, handle, path, product_id, setting_callback=None):
- super().__init__(receiver_kind, product_info, handle, path, product_id, setting_callback)
+ pass
class NanoReceiver(Receiver):
- def __init__(self, receiver_kind, product_info, handle, path, product_id, setting_callback=None):
- super().__init__(receiver_kind, product_info, handle, path, product_id, setting_callback)
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
class LightSpeedReceiver(Receiver):
- def __init__(self, receiver_kind, product_info, handle, path, product_id, setting_callback=None):
- super().__init__(receiver_kind, product_info, handle, path, product_id, setting_callback)
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
class Ex100Receiver(Receiver):
@@ -471,7 +476,8 @@ def notification_information(self, number, notification):
return online, encrypted, wpid, kind
def device_pairing_information(self, number: int) -> dict:
- wpid = hidapi.find_paired_node_wpid(self.path, number) # extract WPID from udev path
+ # extract WPID from udev path
+ wpid = self.low_level.find_paired_node_wpid(self.path, number)
if not wpid:
logger.error("Unable to get wpid from udev for device %d of %s", number, self)
raise exceptions.NoSuchDevice(number=number, receiver=self, error="Not present 27Mhz device")
@@ -505,24 +511,33 @@ def _get_kind_from_index(receiver, index):
}
-class ReceiverFactory:
- @staticmethod
- def create_receiver(device_info, setting_callback=None) -> Optional[Receiver]:
- """Opens a Logitech Receiver found attached to the machine, by Linux device path."""
-
- try:
- handle = base.open_path(device_info.path)
- if handle:
- product_info = base.product_information(device_info.product_id)
- if not product_info:
- logger.warning("Unknown receiver type: %s", device_info.product_id)
- product_info = {}
- kind = product_info.get("receiver_kind", "unknown")
- rclass = receiver_class_mapping.get(kind, Receiver)
- return rclass(kind, product_info, handle, device_info.path, device_info.product_id, setting_callback)
- except OSError as e:
- logger.exception("open %s", device_info)
- if e.errno == errno.EACCES:
- raise
- except Exception:
- logger.exception("open %s", device_info)
+def create_receiver(low_level: LowLevelInterface, device_info, setting_callback=None) -> Optional[Receiver]:
+ """Opens a Logitech Receiver found attached to the machine, by Linux device path."""
+
+ try:
+ handle = low_level.open_path(device_info.path)
+ if handle:
+ usb_id = device_info.product_id
+ if isinstance(usb_id, str):
+ usb_id = int(usb_id, 16)
+ try:
+ product_info = low_level.product_information(usb_id)
+ except ValueError:
+ product_info = {}
+ kind = product_info.get("receiver_kind", "unknown")
+ rclass = receiver_class_mapping.get(kind, Receiver)
+ return rclass(
+ low_level,
+ kind,
+ product_info,
+ handle,
+ device_info.path,
+ device_info.product_id,
+ setting_callback,
+ )
+ except OSError as e:
+ logger.exception("open %s", device_info)
+ if e.errno == errno.EACCES:
+ raise
+ except Exception:
+ logger.exception("open %s", device_info)
diff --git a/lib/logitech_receiver/settings_templates.py b/lib/logitech_receiver/settings_templates.py
index f259384c06..b753ac7325 100644
--- a/lib/logitech_receiver/settings_templates.py
+++ b/lib/logitech_receiver/settings_templates.py
@@ -20,17 +20,18 @@
import traceback
from time import time
+from typing import Callable
from solaar.i18n import _
from . import base
from . import common
from . import descriptors
+from . import desktop_notifications
from . import diversion
from . import hidpp10_constants
from . import hidpp20
from . import hidpp20_constants
-from . import notify
from . import settings
from . import special_keys
from .hidpp10_constants import Registers
@@ -728,6 +729,15 @@ def build(cls, setting_class, device):
class DpiSlidingXY(settings.RawXYProcessing):
+ def __init__(
+ self,
+ *args,
+ show_notification: Callable[[str, str], bool],
+ **kwargs,
+ ):
+ super().__init__(*args, **kwargs)
+ self._show_notification = show_notification
+
def activate_action(self):
self.dpiSetting = next(filter(lambda s: s.name == "dpi" or s.name == "dpi_extended", self.device.settings), None)
self.dpiChoices = list(self.dpiSetting.choices)
@@ -745,12 +755,11 @@ def setNewDpi(self, newDpiIdx):
self.device.setting_callback(self.device, type(self.dpiSetting), [newDpi])
def displayNewDpi(self, newDpiIdx):
- if notify.available:
- selected_dpi = self.dpiChoices[newDpiIdx]
- min_dpi = self.dpiChoices[0]
- max_dpi = self.dpiChoices[-1]
- reason = f"DPI {selected_dpi} [min {min_dpi}, max {max_dpi}]"
- notify.show(self.device, reason)
+ selected_dpi = self.dpiChoices[newDpiIdx]
+ min_dpi = self.dpiChoices[0]
+ max_dpi = self.dpiChoices[-1]
+ reason = f"DPI {selected_dpi} [min {min_dpi}, max {max_dpi}]"
+ self._show_notification(self.device, reason)
def press_action(self, key): # start tracking
self.starting = True
@@ -912,7 +921,9 @@ def build(cls, setting_class, device):
if _F.ADJUSTABLE_DPI in device.features:
choices[k.key] = setting_class.choices_universe
if sliding is None:
- sliding = DpiSlidingXY(device, name="DpiSlding")
+ sliding = DpiSlidingXY(
+ device, name="DpiSliding", show_notification=desktop_notifications.show
+ )
else:
choices[k.key] = setting_class.choices_divert
if not choices:
diff --git a/lib/solaar/cli/__init__.py b/lib/solaar/cli/__init__.py
index 927b871ccf..65d6505d03 100644
--- a/lib/solaar/cli/__init__.py
+++ b/lib/solaar/cli/__init__.py
@@ -106,7 +106,7 @@ def _receivers(dev_path=None):
if dev_path is not None and dev_path != dev_info.path:
continue
try:
- r = receiver.ReceiverFactory.create_receiver(dev_info)
+ r = receiver.create_receiver(base, dev_info)
if logger.isEnabledFor(logging.DEBUG):
logger.debug("[%s] => %s", dev_info.path, r)
if r:
@@ -122,9 +122,9 @@ def _receivers_and_devices(dev_path=None):
continue
try:
if dev_info.isDevice:
- d = device.DeviceFactory.create_device(base, dev_info)
+ d = device.create_device(base, dev_info)
else:
- d = receiver.ReceiverFactory.create_receiver(dev_info)
+ d = receiver.create_receiver(base, dev_info)
if logger.isEnabledFor(logging.DEBUG):
logger.debug("[%s] => %s", dev_info.path, d)
diff --git a/lib/solaar/dbus.py b/lib/solaar/dbus.py
index d470184a1f..ff4e58a695 100644
--- a/lib/solaar/dbus.py
+++ b/lib/solaar/dbus.py
@@ -14,9 +14,12 @@
## You should have received a copy of the GNU General Public License along
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+from __future__ import annotations
import logging
+from typing import Callable
+
logger = logging.getLogger(__name__)
try:
@@ -49,14 +52,22 @@ def _suspend_or_resume(suspend):
_LOGIND_INTERFACE = "org.freedesktop.login1.Manager"
-def watch_suspend_resume(on_resume_callback=None, on_suspend_callback=None):
+def watch_suspend_resume(
+ on_resume_callback: Callable[[], None] | None = None,
+ on_suspend_callback: Callable[[], None] | None = None,
+):
"""Register callback for suspend/resume events.
They are called only if the system DBus is running, and the Login daemon is available."""
global _resume_callback, _suspend_callback
_suspend_callback = on_suspend_callback
_resume_callback = on_resume_callback
if bus is not None and on_resume_callback is not None or on_suspend_callback is not None:
- bus.add_signal_receiver(_suspend_or_resume, "PrepareForSleep", dbus_interface=_LOGIND_INTERFACE, path=_LOGIND_PATH)
+ bus.add_signal_receiver(
+ _suspend_or_resume,
+ "PrepareForSleep",
+ dbus_interface=_LOGIND_INTERFACE,
+ path=_LOGIND_PATH,
+ )
if logger.isEnabledFor(logging.INFO):
logger.info("connected to system dbus, watching for suspend/resume events")
diff --git a/lib/solaar/listener.py b/lib/solaar/listener.py
index ef76d8a834..efac229ec1 100644
--- a/lib/solaar/listener.py
+++ b/lib/solaar/listener.py
@@ -255,9 +255,9 @@ def _start(device_info):
assert _status_callback and _setting_callback
isDevice = device_info.isDevice
if not isDevice:
- receiver_ = logitech_receiver.receiver.ReceiverFactory.create_receiver(device_info, _setting_callback)
+ receiver_ = logitech_receiver.receiver.create_receiver(base, device_info, _setting_callback)
else:
- receiver_ = logitech_receiver.device.DeviceFactory.create_device(base, device_info, _setting_callback)
+ receiver_ = logitech_receiver.device.create_device(base, device_info, _setting_callback)
if receiver_:
configuration.attach_to(receiver_)
if receiver_.bluetooth and receiver_.hid_serial:
diff --git a/lib/solaar/ui/__init__.py b/lib/solaar/ui/__init__.py
index b73e3c05fe..1510db0023 100644
--- a/lib/solaar/ui/__init__.py
+++ b/lib/solaar/ui/__init__.py
@@ -17,6 +17,8 @@
import logging
+from typing import Callable
+
import gi
import yaml
@@ -28,8 +30,8 @@
from solaar.ui.window import find_device
from . import common
+from . import desktop_notifications
from . import diversion_rules
-from . import notify
from . import tray
from . import window
@@ -43,11 +45,14 @@
assert Gtk.get_major_version() > 2, "Solaar requires Gtk 3 python bindings"
+APP_ID = "io.github.pwr_solaar.solaar"
+
+
def _startup(app, startup_hook, use_tray, show_window):
if logger.isEnabledFor(logging.DEBUG):
logger.debug("startup registered=%s, remote=%s", app.get_is_registered(), app.get_is_remote())
common.start_async()
- notify.init()
+ desktop_notifications.init()
if use_tray:
tray.init(lambda _ignore: window.destroy())
window.init(show_window, use_tray)
@@ -85,15 +90,24 @@ def _shutdown(app, shutdown_hook):
shutdown_hook()
common.stop_async()
tray.destroy()
- notify.uninit()
+ desktop_notifications.uninit()
-def run_loop(startup_hook, shutdown_hook, use_tray, show_window):
+def run_loop(
+ startup_hook: Callable[[], None],
+ shutdown_hook: Callable[[], None],
+ use_tray: bool,
+ show_window: bool,
+):
assert use_tray or show_window, "need either tray or visible window"
- APP_ID = "io.github.pwr_solaar.solaar"
+
application = Gtk.Application.new(APP_ID, Gio.ApplicationFlags.HANDLES_COMMAND_LINE)
- application.connect("startup", lambda app, startup_hook: _startup(app, startup_hook, use_tray, show_window), startup_hook)
+ application.connect(
+ "startup",
+ lambda app, startup_hook: _startup(app, startup_hook, use_tray, show_window),
+ startup_hook,
+ )
application.connect("command-line", _command_line)
application.connect("activate", _activate)
application.connect("shutdown", _shutdown, shutdown_hook)
@@ -120,7 +134,7 @@ def _status_changed(device, alert, reason, refresh=False):
diversion_rules.update_devices()
if alert & (Alert.NOTIFICATION | Alert.ATTENTION):
- notify.show(device, reason)
+ desktop_notifications.show(device, reason)
def status_changed(device, alert=Alert.NONE, reason=None, refresh=False):
diff --git a/lib/solaar/ui/about.py b/lib/solaar/ui/about.py
deleted file mode 100644
index 52494f4b4a..0000000000
--- a/lib/solaar/ui/about.py
+++ /dev/null
@@ -1,102 +0,0 @@
-## Copyright (C) 2012-2013 Daniel Pavel
-## Revisions Copyright (C) Contributors to the Solaar project.
-##
-## This program is free software; you can redistribute it and/or modify
-## it under the terms of the GNU General Public License as published by
-## the Free Software Foundation; either version 2 of the License, or
-## (at your option) any later version.
-##
-## This program is distributed in the hope that it will be useful,
-## but WITHOUT ANY WARRANTY; without even the implied warranty of
-## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-## GNU General Public License for more details.
-##
-## You should have received a copy of the GNU General Public License along
-## with this program; if not, write to the Free Software Foundation, Inc.,
-## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-import logging
-
-from gi.repository import Gtk
-
-from solaar import NAME
-from solaar import __version__
-from solaar.i18n import _
-
-_dialog = None
-
-
-def _create():
- about = Gtk.AboutDialog()
-
- about.set_program_name(NAME)
- about.set_version(__version__)
- about.set_comments(_("Manages Logitech receivers,\nkeyboards, mice, and tablets."))
- about.set_icon_name(NAME.lower())
- about.set_logo_icon_name(NAME.lower())
-
- about.set_copyright("© 2012-2024 Daniel Pavel and contributors to the Solaar project")
- about.set_license_type(Gtk.License.GPL_2_0)
-
- about.set_authors(("Daniel Pavel http://github.com/pwr",))
- try:
- about.add_credit_section(_("Additional Programming"), ("Filipe Laíns", "Peter F. Patel-Schneider"))
- about.add_credit_section(_("GUI design"), ("Julien Gascard", "Daniel Pavel"))
- about.add_credit_section(
- _("Testing"),
- (
- "Douglas Wagner",
- "Julien Gascard",
- "Peter Wu http://www.lekensteyn.nl/logitech-unifying.html",
- ),
- )
- about.add_credit_section(
- _("Logitech documentation"),
- (
- "Julien Danjou http://julien.danjou.info/blog/2012/logitech-unifying-upower",
- "Nestor Lopez Casado http://drive.google.com/folderview?id=0BxbRzx7vEV7eWmgwazJ3NUFfQ28",
- ),
- )
- except TypeError:
- # gtk3 < ~3.6.4 has incorrect gi bindings
- logging.exception("failed to fully create the about dialog")
- except Exception:
- # the Gtk3 version may be too old, and the function does not exist
- logging.exception("failed to fully create the about dialog")
-
- about.set_translator_credits(
- "\n".join(
- (
- "gogo (croatian)",
- "Papoteur, David Geiger, Damien Lallement (français)",
- "Michele Olivo (italiano)",
- "Adrian Piotrowicz (polski)",
- "Drovetto, JrBenito (Portuguese-BR)",
- "Daniel Pavel (română)",
- "Daniel Zippert, Emelie Snecker (svensk)",
- "Dimitriy Ryazantcev (Russian)",
- "El Jinete Sin Cabeza (Español)",
- "Ferdina Kusumah (Indonesia)",
- )
- )
- )
-
- about.set_website("https://pwr-solaar.github.io/Solaar")
- about.set_website_label(NAME)
-
- about.connect("response", lambda x, y: x.hide())
-
- def _hide(dialog, event):
- dialog.hide()
- return True
-
- about.connect("delete-event", _hide)
-
- return about
-
-
-def show_window(trigger=None):
- global _dialog
- if _dialog is None:
- _dialog = _create()
- _dialog.present()
diff --git a/lib/solaar/ui/about/__init__.py b/lib/solaar/ui/about/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/lib/solaar/ui/about/about.py b/lib/solaar/ui/about/about.py
new file mode 100644
index 0000000000..7975bfef4c
--- /dev/null
+++ b/lib/solaar/ui/about/about.py
@@ -0,0 +1,36 @@
+## Copyright (C) Solaar Contributors
+##
+## This program is free software; you can redistribute it and/or modify
+## it under the terms of the GNU General Public License as published by
+## the Free Software Foundation; either version 2 of the License, or
+## (at your option) any later version.
+##
+## This program is distributed in the hope that it will be useful,
+## but WITHOUT ANY WARRANTY; without even the implied warranty of
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+## GNU General Public License for more details.
+##
+## You should have received a copy of the GNU General Public License along
+## with this program; if not, write to the Free Software Foundation, Inc.,
+## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+from solaar.ui.about.model import AboutModel
+from solaar.ui.about.presenter import Presenter
+from solaar.ui.about.view import AboutView
+
+
+def show(_=None, model=None, view=None):
+ """Opens the About dialog."""
+ if model is None:
+ model = AboutModel()
+ if view is None:
+ view = AboutView()
+ presenter = Presenter(model, view)
+ presenter.run()
+
+
+if __name__ == "__main__":
+ from gi.repository import Gtk
+
+ show(None)
+ Gtk.main()
diff --git a/lib/solaar/ui/about/model.py b/lib/solaar/ui/about/model.py
new file mode 100644
index 0000000000..8b489c6c61
--- /dev/null
+++ b/lib/solaar/ui/about/model.py
@@ -0,0 +1,82 @@
+## Copyright (C) Solaar Contributors
+##
+## This program is free software; you can redistribute it and/or modify
+## it under the terms of the GNU General Public License as published by
+## the Free Software Foundation; either version 2 of the License, or
+## (at your option) any later version.
+##
+## This program is distributed in the hope that it will be useful,
+## but WITHOUT ANY WARRANTY; without even the implied warranty of
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+## GNU General Public License for more details.
+##
+## You should have received a copy of the GNU General Public License along
+## with this program; if not, write to the Free Software Foundation, Inc.,
+## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+from __future__ import annotations
+
+from datetime import datetime
+from typing import List
+from typing import Tuple
+
+from solaar import __version__
+from solaar.i18n import _
+
+
+def _get_current_year() -> int:
+ return datetime.now().year
+
+
+class AboutModel:
+ def get_version(self) -> str:
+ return __version__
+
+ def get_description(self) -> str:
+ return _("Manages Logitech receivers,\nkeyboards, mice, and tablets.")
+
+ def get_copyright(self) -> str:
+ return f"© 2012-{_get_current_year()} Daniel Pavel and contributors to the Solaar project"
+
+ def get_authors(self) -> List[str]:
+ return [
+ "Daniel Pavel http://github.com/pwr",
+ ]
+
+ def get_translators(self) -> List[str]:
+ return [
+ "gogo (croatian)",
+ "Papoteur, David Geiger, Damien Lallement (français)",
+ "Michele Olivo (italiano)",
+ "Adrian Piotrowicz (polski)",
+ "Drovetto, JrBenito (Portuguese-BR)",
+ "Daniel Pavel (română)",
+ "Daniel Zippert, Emelie Snecker (svensk)",
+ "Dimitriy Ryazantcev (Russian)",
+ "El Jinete Sin Cabeza (Español)",
+ "Ferdina Kusumah (Indonesia)",
+ ]
+
+ def get_credit_sections(self) -> List[Tuple[str, List[str]]]:
+ return [
+ (_("Additional Programming"), ["Filipe Laíns", "Peter F. Patel-Schneider"]),
+ (_("GUI design"), ["Julien Gascard", "Daniel Pavel"]),
+ (
+ _("Testing"),
+ [
+ "Douglas Wagner",
+ "Julien Gascard",
+ "Peter Wu http://www.lekensteyn.nl/logitech-unifying.html",
+ ],
+ ),
+ (
+ _("Logitech documentation"),
+ [
+ "Julien Danjou http://julien.danjou.info/blog/2012/logitech-unifying-upower",
+ "Nestor Lopez Casado http://drive.google.com/folderview?id=0BxbRzx7vEV7eWmgwazJ3NUFfQ28",
+ ],
+ ),
+ ]
+
+ def get_website(self):
+ return "https://pwr-solaar.github.io/Solaar"
diff --git a/lib/solaar/ui/about/presenter.py b/lib/solaar/ui/about/presenter.py
new file mode 100644
index 0000000000..b63791de2a
--- /dev/null
+++ b/lib/solaar/ui/about/presenter.py
@@ -0,0 +1,95 @@
+## Copyright (C) Solaar Contributors
+##
+## This program is free software; you can redistribute it and/or modify
+## it under the terms of the GNU General Public License as published by
+## the Free Software Foundation; either version 2 of the License, or
+## (at your option) any later version.
+##
+## This program is distributed in the hope that it will be useful,
+## but WITHOUT ANY WARRANTY; without even the implied warranty of
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+## GNU General Public License for more details.
+##
+## You should have received a copy of the GNU General Public License along
+## with this program; if not, write to the Free Software Foundation, Inc.,
+## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+from __future__ import annotations
+
+from typing_extensions import Protocol
+
+from solaar.ui.about.model import AboutModel
+
+
+class AboutViewProtocol(Protocol):
+ def init_ui(self) -> None:
+ ...
+
+ def update_version_info(self, version: str) -> None:
+ ...
+
+ def update_description(self, comments: str) -> None:
+ ...
+
+ def update_copyright(self, copyright):
+ ...
+
+ def update_authors(self, authors: list[str]) -> None:
+ ...
+
+ def update_translators(self, translators: list[str]) -> None:
+ ...
+
+ def update_website(self, website):
+ ...
+
+ def update_credits(self, credit_sections: list[tuple[str, list[str]]]) -> None:
+ ...
+
+ def show(self) -> None:
+ ...
+
+
+class Presenter:
+ def __init__(self, model: AboutModel, view: AboutViewProtocol) -> None:
+ self.model = model
+ self.view = view
+
+ def update_version_info(self) -> None:
+ version = self.model.get_version()
+ self.view.update_version_info(version)
+
+ def update_credits(self) -> None:
+ credit_sections = self.model.get_credit_sections()
+ self.view.update_credits(credit_sections)
+
+ def update_description(self) -> None:
+ comments = self.model.get_description()
+ self.view.update_description(comments)
+
+ def update_copyright(self) -> None:
+ copyright = self.model.get_copyright()
+ self.view.update_copyright(copyright)
+
+ def update_authors(self) -> None:
+ authors = self.model.get_authors()
+ self.view.update_authors(authors)
+
+ def update_translators(self) -> None:
+ translators = self.model.get_translators()
+ self.view.update_translators(translators)
+
+ def update_website(self) -> None:
+ website = self.model.get_website()
+ self.view.update_website(website)
+
+ def run(self) -> None:
+ self.view.init_ui()
+ self.update_version_info()
+ self.update_description()
+ self.update_website()
+ self.update_copyright()
+ self.update_authors()
+ self.update_credits()
+ self.update_translators()
+ self.view.show()
diff --git a/lib/solaar/ui/about/view.py b/lib/solaar/ui/about/view.py
new file mode 100644
index 0000000000..6670ac4625
--- /dev/null
+++ b/lib/solaar/ui/about/view.py
@@ -0,0 +1,67 @@
+## Copyright (C) Solaar Contributors
+##
+## This program is free software; you can redistribute it and/or modify
+## it under the terms of the GNU General Public License as published by
+## the Free Software Foundation; either version 2 of the License, or
+## (at your option) any later version.
+##
+## This program is distributed in the hope that it will be useful,
+## but WITHOUT ANY WARRANTY; without even the implied warranty of
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+## GNU General Public License for more details.
+##
+## You should have received a copy of the GNU General Public License along
+## with this program; if not, write to the Free Software Foundation, Inc.,
+## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+
+from typing import List
+from typing import Tuple
+from typing import Union
+
+from gi.repository import Gtk
+
+from solaar import NAME
+
+
+class AboutView:
+ def __init__(self) -> None:
+ self.view: Union[Gtk.AboutDialog, None] = None
+
+ def init_ui(self) -> None:
+ self.view = Gtk.AboutDialog()
+ self.view.set_program_name(NAME)
+ self.view.set_icon_name(NAME.lower())
+ self.view.set_license_type(Gtk.License.GPL_2_0)
+
+ self.view.connect("response", lambda x, y: self.handle_close(x))
+
+ def update_version_info(self, version: str) -> None:
+ self.view.set_version(version)
+
+ def update_description(self, comments: str) -> None:
+ self.view.set_comments(comments)
+
+ def update_copyright(self, copyright_text: str):
+ self.view.set_copyright(copyright_text)
+
+ def update_authors(self, authors: List[str]) -> None:
+ self.view.set_authors(authors)
+
+ def update_credits(self, credit_sections: List[Tuple[str, List[str]]]) -> None:
+ for section_name, people in credit_sections:
+ self.view.add_credit_section(section_name, people)
+
+ def update_translators(self, translators: List[str]) -> None:
+ translator_credits = "\n".join(translators)
+ self.view.set_translator_credits(translator_credits)
+
+ def update_website(self, website):
+ self.view.set_website_label(NAME)
+ self.view.set_website(website)
+
+ def show(self) -> None:
+ self.view.present()
+
+ def handle_close(self, event) -> None:
+ event.hide()
diff --git a/lib/solaar/ui/notify.py b/lib/solaar/ui/desktop_notifications.py
similarity index 86%
rename from lib/solaar/ui/notify.py
rename to lib/solaar/ui/desktop_notifications.py
index 619f87da0e..67864d78ff 100644
--- a/lib/solaar/ui/notify.py
+++ b/lib/solaar/ui/desktop_notifications.py
@@ -13,9 +13,9 @@
## You should have received a copy of the GNU General Public License along
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+import importlib
# Optional desktop notifications.
-
import logging
from solaar import NAME
@@ -26,27 +26,35 @@
logger = logging.getLogger(__name__)
-try:
- import gi
+def notifications_available():
+ """Checks if notification service is available."""
+ notifications_supported = False
+ try:
+ import gi
- gi.require_version("Notify", "0.7")
- # this import is allowed to fail, in which case the entire feature is unavailable
- from gi.repository import GLib
- from gi.repository import Notify
+ gi.require_version("Notify", "0.7")
+
+ importlib.util.find_spec("gi.repository.GLib")
+ importlib.util.find_spec("gi.repository.Notify")
+
+ notifications_supported = True
+ except ValueError as e:
+ logger.warning(f"Notification service is not available: {e}")
+ return notifications_supported
- # assumed to be working since the import succeeded
- available = True
-except (ValueError, ImportError):
- available = False
+available = notifications_available()
if available:
+ from gi.repository import GLib
+ from gi.repository import Notify
+
# cache references to shown notifications here, so if another status comes
# while its notification is still visible we don't create another one
_notifications = {}
def init():
- """Init the notifications system."""
+ """Initialize desktop notifications."""
global available
if available:
if not Notify.is_initted():
@@ -60,6 +68,7 @@ def init():
return available and Notify.is_initted()
def uninit():
+ """Stop desktop notifications."""
if available and Notify.is_initted():
if logger.isEnabledFor(logging.INFO):
logger.info("stopping desktop notifications")
@@ -117,7 +126,7 @@ def show(dev, reason=None, icon=None, progress=None):
try:
n.show()
except Exception:
- logger.exception("showing %s", n)
+ logger.exception(f"showing {n}")
else:
diff --git a/lib/solaar/ui/tray.py b/lib/solaar/ui/tray.py
index 69c0e4a373..a076b6ce4b 100644
--- a/lib/solaar/ui/tray.py
+++ b/lib/solaar/ui/tray.py
@@ -31,10 +31,10 @@
from solaar import NAME
from solaar.i18n import _
-from . import about
from . import action
from . import icons
from . import window
+from .about import about
logger = logging.getLogger(__name__)
@@ -51,7 +51,7 @@ def _create_menu(quit_handler):
menu.append(no_receiver)
menu.append(Gtk.SeparatorMenuItem.new())
- menu.append(action.make_image_menu_item(_("About %s") % NAME, "help-about", about.show_window))
+ menu.append(action.make_image_menu_item(_("About %s") % NAME, "help-about", about.show))
menu.append(action.make_image_menu_item(_("Quit %s") % NAME, "application-exit", quit_handler))
menu.show_all()
diff --git a/lib/solaar/ui/window.py b/lib/solaar/ui/window.py
index f28ad7ce83..89b034ee36 100644
--- a/lib/solaar/ui/window.py
+++ b/lib/solaar/ui/window.py
@@ -29,11 +29,11 @@
from solaar.i18n import _
from solaar.i18n import ngettext
-from . import about
from . import action
from . import config_panel
from . import diversion_rules
from . import icons
+from .about import about
from .common import ui_async
gi.require_version("Gdk", "3.0")
@@ -305,7 +305,7 @@ def _create_window_layout():
bottom_buttons_box.set_spacing(20)
quit_button = _new_button(_("Quit %s") % NAME, "application-exit", _SMALL_BUTTON_ICON_SIZE, clicked=destroy)
bottom_buttons_box.add(quit_button)
- about_button = _new_button(_("About %s") % NAME, "help-about", _SMALL_BUTTON_ICON_SIZE, clicked=about.show_window)
+ about_button = _new_button(_("About %s") % NAME, "help-about", _SMALL_BUTTON_ICON_SIZE, clicked=about.show)
bottom_buttons_box.add(about_button)
diversion_button = _new_button(
_("Rule Editor"), "", _SMALL_BUTTON_ICON_SIZE, clicked=lambda *_trigger: diversion_rules.show_window(_model)
diff --git a/setup.py b/setup.py
index 21dd4c7df0..9a154bd601 100755
--- a/setup.py
+++ b/setup.py
@@ -3,6 +3,7 @@
from glob import glob
from os.path import dirname
+from pathlib import Path
from setuptools import find_packages
@@ -12,14 +13,11 @@
from distutils.core import setup
NAME = "Solaar"
-
-with open("lib/solaar/version", "r") as vfile:
- version = vfile.read().strip()
+version = Path("lib/solaar/version").read_text().strip()
try: # get commit from git describe
commit = subprocess.check_output(["git", "describe", "--always"], stderr=subprocess.DEVNULL).strip().decode()
- with open("lib/solaar/commit", "w") as vfile:
- vfile.write(f"{commit}\n")
+ Path("lib/solaar/commit").write_text(f"{commit}\n")
except Exception: # get commit from Ubuntu dpkg-parsechangelog
try:
commit = (
@@ -28,8 +26,7 @@
.decode()
)
commit = commit.split("~")
- with open("lib/solaar/commit", "w") as vfile:
- vfile.write(f"{commit[0]}\n")
+ Path("lib/solaar/commit").write_text(f"{commit[0]}\n")
except Exception as e:
print("Exception using dpkg-parsechangelog", e)
diff --git a/tests/hidapi/test_hidapi.py b/tests/hidapi/test_hidapi.py
index 2695e425c3..aef76891cc 100644
--- a/tests/hidapi/test_hidapi.py
+++ b/tests/hidapi/test_hidapi.py
@@ -1,6 +1,11 @@
+import platform
+
from unittest import mock
-import hidapi
+if platform.system() == "Linux":
+ import hidapi.udev_impl as hidapi
+else:
+ import hidapi.hidapi_impl as hidapi
def test_find_paired_node():
diff --git a/tests/logitech_receiver/hidpp.py b/tests/logitech_receiver/fake_hidpp.py
similarity index 100%
rename from tests/logitech_receiver/hidpp.py
rename to tests/logitech_receiver/fake_hidpp.py
diff --git a/tests/logitech_receiver/test_base.py b/tests/logitech_receiver/test_base.py
index 1df0da8e92..c84afb440d 100644
--- a/tests/logitech_receiver/test_base.py
+++ b/tests/logitech_receiver/test_base.py
@@ -6,16 +6,70 @@
@pytest.mark.parametrize(
"usb_id, expected_name, expected_receiver_kind",
[
- ("0xC548", "Bolt Receiver", "bolt"),
- ("0xC52B", "Unifying Receiver", "unifying"),
- ("0xC531", "Nano Receiver", "nano"),
- ("0xC53F", "Lightspeed Receiver", None),
- ("0xC517", "EX100 Receiver 27 Mhz", "27Mhz"),
+ (0xC548, "Bolt Receiver", "bolt"),
+ (0xC52B, "Unifying Receiver", "unifying"),
+ (0xC531, "Nano Receiver", "nano"),
+ (0xC53F, "Lightspeed Receiver", None),
+ (0xC517, "EX100 Receiver 27 Mhz", "27Mhz"),
],
)
def test_product_information(usb_id, expected_name, expected_receiver_kind):
res = base.product_information(usb_id)
assert res["name"] == expected_name
+ assert isinstance(res["vendor_id"], int)
+ assert isinstance(res["product_id"], int)
+
if expected_receiver_kind:
assert res["receiver_kind"] == expected_receiver_kind
+
+
+def test_filter_receivers_known():
+ bus_id = 2
+ vendor_id = 0x046D
+ product_id = 0xC548
+
+ receiver_info = base.filter_receivers(bus_id, vendor_id, product_id)
+
+ assert receiver_info["name"] == "Bolt Receiver"
+ assert receiver_info["receiver_kind"] == "bolt"
+
+
+def test_filter_receivers_unknown():
+ bus_id = 1
+ vendor_id = 0x046D
+ product_id = 0xC500
+
+ receiver_info = base.filter_receivers(bus_id, vendor_id, product_id)
+
+ assert receiver_info["bus_id"] == bus_id
+ assert receiver_info["product_id"] == product_id
+
+
+@pytest.mark.parametrize(
+ "hidpp_short, hidpp_long",
+ [(True, False), (False, True), (False, False)],
+)
+def test_filter_products_of_interest(hidpp_short, hidpp_long):
+ bus_id = 3
+ vendor_id = 0x046D
+ product_id = 0xC07E
+
+ receiver_info = base.filter_products_of_interest(
+ bus_id,
+ vendor_id,
+ product_id,
+ hidpp_short=hidpp_short,
+ hidpp_long=hidpp_long,
+ )
+
+ assert receiver_info["bus_id"] == bus_id
+ assert receiver_info["product_id"] == product_id
+
+
+def test_get_next_sw_id():
+ res1 = base._get_next_sw_id()
+ res2 = base._get_next_sw_id()
+
+ assert res1 == 2
+ assert res2 == 3
diff --git a/tests/logitech_receiver/test_desktop_notifications.py b/tests/logitech_receiver/test_desktop_notifications.py
new file mode 100644
index 0000000000..341f204ecd
--- /dev/null
+++ b/tests/logitech_receiver/test_desktop_notifications.py
@@ -0,0 +1,23 @@
+from unittest import mock
+
+from logitech_receiver import desktop_notifications
+
+
+def test_notifications_available():
+ result = desktop_notifications.notifications_available()
+
+ assert not result
+
+
+def test_init():
+ assert not desktop_notifications.init()
+
+
+def test_uninit():
+ assert desktop_notifications.uninit() is None
+
+
+def test_show():
+ dev = mock.MagicMock()
+ reason = "unknown"
+ assert desktop_notifications.show(dev, reason) is None
diff --git a/tests/logitech_receiver/test_device.py b/tests/logitech_receiver/test_device.py
index 56367db7da..2dd0dc02a6 100644
--- a/tests/logitech_receiver/test_device.py
+++ b/tests/logitech_receiver/test_device.py
@@ -17,7 +17,6 @@
from dataclasses import dataclass
from functools import partial
from typing import Optional
-from unittest import mock
import pytest
@@ -25,7 +24,7 @@
from logitech_receiver import device
from logitech_receiver import hidpp20
-from . import hidpp
+from . import fake_hidpp
class LowLevelInterfaceFake:
@@ -33,14 +32,17 @@ def __init__(self, responses=None):
self.responses = responses
def open_path(self, path):
- return hidpp.open_path(path)
+ return fake_hidpp.open_path(path)
+
+ def find_paired_node(self, receiver_path: str, index: int, timeout: int):
+ return None
def request(self, response, *args, **kwargs):
- func = partial(hidpp.request, self.responses)
+ func = partial(fake_hidpp.request, self.responses)
return func(response, *args, **kwargs)
def ping(self, response, *args, **kwargs):
- func = partial(hidpp.ping, self.responses)
+ func = partial(fake_hidpp.ping, self.responses)
return func(response, *args, **kwargs)
def close(self, *args, **kwargs):
@@ -70,29 +72,33 @@ class DeviceInfoStub:
@pytest.mark.parametrize(
"device_info, responses, expected_success",
- [(di_bad_handle, hidpp.r_empty, None), (di_error, hidpp.r_empty, False), (di_CCCC, hidpp.r_empty, True)],
+ [
+ (di_bad_handle, fake_hidpp.r_empty, None),
+ (di_error, fake_hidpp.r_empty, False),
+ (di_CCCC, fake_hidpp.r_empty, True),
+ ],
)
def test_create_device(device_info, responses, expected_success):
low_level_mock = LowLevelInterfaceFake(responses)
if expected_success is None:
with pytest.raises(PermissionError):
- device.DeviceFactory.create_device(low_level_mock, device_info)
+ device.create_device(low_level_mock, device_info)
elif not expected_success:
with pytest.raises(TypeError):
- device.DeviceFactory.create_device(low_level_mock, device_info)
+ device.create_device(low_level_mock, device_info)
else:
- test_device = device.DeviceFactory.create_device(low_level_mock, device_info)
+ test_device = device.create_device(low_level_mock, device_info)
assert bool(test_device) == expected_success
@pytest.mark.parametrize(
"device_info, responses, expected_codename, expected_name, expected_kind",
- [(di_CCCC, hidpp.r_empty, "?? (CCCC)", "Unknown device CCCC", "?")],
+ [(di_CCCC, fake_hidpp.r_empty, "?? (CCCC)", "Unknown device CCCC", "?")],
)
def test_device_name(device_info, responses, expected_codename, expected_name, expected_kind):
low_level = LowLevelInterfaceFake(responses)
- test_device = device.DeviceFactory.create_device(low_level, device_info)
+ test_device = device.create_device(low_level, device_info)
assert test_device.codename == expected_codename
assert test_device.name == expected_name
@@ -103,7 +109,14 @@ def test_device_name(device_info, responses, expected_codename, expected_name, e
"device_info, responses, handle, _name, _codename, number, protocol, registers",
zip(
[di_CCCC, di_C318, di_B530, di_C068, di_C08A, di_DDDD],
- [hidpp.r_empty, hidpp.r_keyboard_1, hidpp.r_keyboard_2, hidpp.r_mouse_1, hidpp.r_mouse_2, hidpp.r_mouse_3],
+ [
+ fake_hidpp.r_empty,
+ fake_hidpp.r_keyboard_1,
+ fake_hidpp.r_keyboard_2,
+ fake_hidpp.r_mouse_1,
+ fake_hidpp.r_mouse_2,
+ fake_hidpp.r_mouse_3,
+ ],
[0x11, 0x11, 0x11, 0x11, 0x11, 0x11],
[None, "Illuminated Keyboard", "Craft Advanced Keyboard", "G700 Gaming Mouse", "MX Vertical Wireless Mouse", None],
[None, "Illuminated", "Craft", "G700", "MX Vertical", None],
@@ -141,12 +154,6 @@ def __contains__(self, dev):
return True
-@pytest.fixture
-def mock_hid():
- with mock.patch("hidapi.find_paired_node", return_value=None) as find_paired_node:
- yield find_paired_node
-
-
pi_CCCC = {"wpid": "CCCC", "kind": 0, "serial": None, "polling": "1ms", "power_switch": "top"}
pi_2011 = {"wpid": "2011", "kind": 1, "serial": "1234", "polling": "2ms", "power_switch": "bottom"}
pi_4066 = {"wpid": "4066", "kind": 1, "serial": "5678", "polling": "4ms", "power_switch": "left"}
@@ -160,7 +167,14 @@ def mock_hid():
zip(
range(1, 7),
[pi_CCCC, pi_2011, pi_4066, pi_1007, pi_407B, pi_DDDD],
- [hidpp.r_empty, hidpp.r_keyboard_1, hidpp.r_keyboard_2, hidpp.r_mouse_1, hidpp.r_mouse_2, hidpp.r_mouse_3],
+ [
+ fake_hidpp.r_empty,
+ fake_hidpp.r_keyboard_1,
+ fake_hidpp.r_keyboard_2,
+ fake_hidpp.r_mouse_1,
+ fake_hidpp.r_mouse_2,
+ fake_hidpp.r_mouse_3,
+ ],
[0x11, 0x11, 0x11, 0x11, 0x11, 0x11],
[None, "Wireless Keyboard K520", "Craft Advanced Keyboard", "MX Air", "MX Vertical Wireless Mouse", None],
["CODE", "K520", "Craft", "MX Air", "MX Vertical", "CODE"],
@@ -176,12 +190,10 @@ def mock_hid():
],
),
)
-def test_device_receiver(number, pairing_info, responses, handle, _name, codename, p, p2, name, mock_hid):
- mock_hid.side_effect = lambda x, y, z: x
-
+def test_device_receiver(number, pairing_info, responses, handle, _name, codename, p, p2, name):
low_level = LowLevelInterfaceFake(responses)
- low_level.request = partial(hidpp.request, hidpp.replace_number(responses, number))
- low_level.ping = partial(hidpp.ping, hidpp.replace_number(responses, number))
+ low_level.request = partial(fake_hidpp.request, fake_hidpp.replace_number(responses, number))
+ low_level.ping = partial(fake_hidpp.ping, fake_hidpp.replace_number(responses, number))
test_device = device.Device(low_level, FakeReceiver(codename="CODE"), number, True, pairing_info, handle=handle)
test_device.receiver.device = test_device
@@ -207,7 +219,14 @@ def test_device_receiver(number, pairing_info, responses, handle, _name, codenam
zip(
range(1, 7),
[pi_CCCC, pi_2011, pi_4066, pi_1007, pi_407B, pi_DDDD],
- [hidpp.r_empty, hidpp.r_keyboard_1, hidpp.r_keyboard_2, hidpp.r_mouse_1, hidpp.r_mouse_2, hidpp.r_mouse_3],
+ [
+ fake_hidpp.r_empty,
+ fake_hidpp.r_keyboard_1,
+ fake_hidpp.r_keyboard_2,
+ fake_hidpp.r_mouse_1,
+ fake_hidpp.r_mouse_2,
+ fake_hidpp.r_mouse_3,
+ ],
[None, 0x11, 0x11, 0x11, 0x11, 0x11],
[None, None, "12345678", None, None, "12345679"], # unitId
[None, None, "1234567890AB", None, None, "123456780000"], # modelId
@@ -220,12 +239,10 @@ def test_device_receiver(number, pairing_info, responses, handle, _name, codenam
["1ms", "2ms", "4ms", "8ms", "1ms", "9ms"], # polling rate
),
)
-def test_device_ids(number, info, responses, handle, unitId, modelId, tid, kind, firmware, serial, id, psl, rate, mock_hid):
- mock_hid.side_effect = lambda x, y, z: x
-
+def test_device_ids(number, info, responses, handle, unitId, modelId, tid, kind, firmware, serial, id, psl, rate):
low_level = LowLevelInterfaceFake(responses)
- low_level.request = partial(hidpp.request, hidpp.replace_number(responses, number))
- low_level.ping = partial(hidpp.ping, hidpp.replace_number(responses, number))
+ low_level.request = partial(fake_hidpp.request, fake_hidpp.replace_number(responses, number))
+ low_level.ping = partial(fake_hidpp.ping, fake_hidpp.replace_number(responses, number))
test_device = device.Device(low_level, FakeReceiver(), number, True, info, handle=handle)
@@ -244,19 +261,19 @@ def __init__(self, responses, *args, **kwargs):
self.responses = responses
super().__init__(LowLevelInterfaceFake(responses), *args, **kwargs)
- request = hidpp.Device.request
- ping = hidpp.Device.ping
+ request = fake_hidpp.Device.request
+ ping = fake_hidpp.Device.ping
@pytest.mark.parametrize(
"device_info, responses, protocol, led, keys, remap, gestures, backlight, profiles",
[
- (di_CCCC, hidpp.r_empty, 1.0, type(None), None, None, None, None, None),
- (di_C318, hidpp.r_empty, 1.0, type(None), None, None, None, None, None),
- (di_B530, hidpp.r_keyboard_1, 1.0, type(None), None, None, None, None, None),
- (di_B530, hidpp.r_keyboard_2, 2.0, type(None), 4, 0, 0, None, None),
- (di_B530, hidpp.complex_responses_1, 4.5, hidpp20.LEDEffectsInfo, 0, 0, 0, None, None),
- (di_B530, hidpp.complex_responses_2, 4.5, hidpp20.RGBEffectsInfo, 8, 3, 1, True, True),
+ (di_CCCC, fake_hidpp.r_empty, 1.0, type(None), None, None, None, None, None),
+ (di_C318, fake_hidpp.r_empty, 1.0, type(None), None, None, None, None, None),
+ (di_B530, fake_hidpp.r_keyboard_1, 1.0, type(None), None, None, None, None, None),
+ (di_B530, fake_hidpp.r_keyboard_2, 2.0, type(None), 4, 0, 0, None, None),
+ (di_B530, fake_hidpp.complex_responses_1, 4.5, hidpp20.LEDEffectsInfo, 0, 0, 0, None, None),
+ (di_B530, fake_hidpp.complex_responses_2, 4.5, hidpp20.RGBEffectsInfo, 8, 3, 1, True, True),
],
)
def test_device_complex(device_info, responses, protocol, led, keys, remap, gestures, backlight, profiles, mocker):
@@ -289,12 +306,12 @@ def test_device_complex(device_info, responses, protocol, led, keys, remap, gest
@pytest.mark.parametrize(
"device_info, responses, protocol, p, persister, settings",
[
- (di_CCCC, hidpp.r_empty, 1.0, None, None, 0),
- (di_C318, hidpp.r_empty, 1.0, {}, {}, 0),
- (di_C318, hidpp.r_keyboard_1, 1.0, {"n": "n"}, {"n": "n"}, 1),
- (di_B530, hidpp.r_keyboard_2, 4.5, {"m": "m"}, {"m": "m"}, 1),
- (di_C068, hidpp.r_mouse_1, 1.0, {"o": "o"}, {"o": "o"}, 2),
- (di_C08A, hidpp.r_mouse_2, 4.5, {"p": "p"}, {"p": "p"}, 0),
+ (di_CCCC, fake_hidpp.r_empty, 1.0, None, None, 0),
+ (di_C318, fake_hidpp.r_empty, 1.0, {}, {}, 0),
+ (di_C318, fake_hidpp.r_keyboard_1, 1.0, {"n": "n"}, {"n": "n"}, 1),
+ (di_B530, fake_hidpp.r_keyboard_2, 4.5, {"m": "m"}, {"m": "m"}, 1),
+ (di_C068, fake_hidpp.r_mouse_1, 1.0, {"o": "o"}, {"o": "o"}, 2),
+ (di_C08A, fake_hidpp.r_mouse_2, 4.5, {"p": "p"}, {"p": "p"}, 0),
],
)
def test_device_settings(device_info, responses, protocol, p, persister, settings, mocker):
@@ -310,9 +327,21 @@ def test_device_settings(device_info, responses, protocol, p, persister, setting
@pytest.mark.parametrize(
"device_info, responses, protocol, battery, changed",
[
- (di_C318, hidpp.r_empty, 1.0, None, {"active": True, "alert": 0, "reason": None}),
- (di_C318, hidpp.r_keyboard_1, 1.0, common.Battery(50, None, 0, None), {"active": True, "alert": 0, "reason": None}),
- (di_B530, hidpp.r_keyboard_2, 4.5, common.Battery(18, 52, None, None), {"active": True, "alert": 0, "reason": None}),
+ (di_C318, fake_hidpp.r_empty, 1.0, None, {"active": True, "alert": 0, "reason": None}),
+ (
+ di_C318,
+ fake_hidpp.r_keyboard_1,
+ 1.0,
+ common.Battery(50, None, 0, None),
+ {"active": True, "alert": 0, "reason": None},
+ ),
+ (
+ di_B530,
+ fake_hidpp.r_keyboard_2,
+ 4.5,
+ common.Battery(18, 52, None, None),
+ {"active": True, "alert": 0, "reason": None},
+ ),
],
)
def test_device_battery(device_info, responses, protocol, battery, changed, mocker):
diff --git a/tests/logitech_receiver/test_hidpp20_complex.py b/tests/logitech_receiver/test_hidpp20_complex.py
index 632d30b7ed..b8dc5612db 100644
--- a/tests/logitech_receiver/test_hidpp20_complex.py
+++ b/tests/logitech_receiver/test_hidpp20_complex.py
@@ -23,16 +23,18 @@
from logitech_receiver import hidpp20_constants
from logitech_receiver import special_keys
-from . import hidpp
+from . import fake_hidpp
_hidpp20 = hidpp20.Hidpp20()
-device_offline = hidpp.Device("REGISTERS", False)
-device_registers = hidpp.Device("OFFLINE", True, 1.0)
-device_nofeatures = hidpp.Device("NOFEATURES", True, 4.5)
-device_zerofeatures = hidpp.Device("ZEROFEATURES", True, 4.5, [hidpp.Response("0000", 0x0000, "0001")])
-device_broken = hidpp.Device("BROKEN", True, 4.5, [hidpp.Response("0500", 0x0000, "0001"), hidpp.Response(None, 0x0100)])
-device_standard = hidpp.Device("STANDARD", True, 4.5, hidpp.r_keyboard_2)
+device_offline = fake_hidpp.Device("REGISTERS", False)
+device_registers = fake_hidpp.Device("OFFLINE", True, 1.0)
+device_nofeatures = fake_hidpp.Device("NOFEATURES", True, 4.5)
+device_zerofeatures = fake_hidpp.Device("ZEROFEATURES", True, 4.5, [fake_hidpp.Response("0000", 0x0000, "0001")])
+device_broken = fake_hidpp.Device(
+ "BROKEN", True, 4.5, [fake_hidpp.Response("0500", 0x0000, "0001"), fake_hidpp.Response(None, 0x0100)]
+)
+device_standard = fake_hidpp.Device("STANDARD", True, 4.5, fake_hidpp.r_keyboard_2)
@pytest.mark.parametrize(
@@ -210,15 +212,15 @@ def test_ReprogrammableKeyV4_key(device, index, cid, tid, flags, pos, group, gma
@pytest.mark.parametrize(
"responses, index, mapped_to, remappable_to, mapping_flags",
[
- (hidpp.responses_key, 1, "Right Click", common.UnsortedNamedInts(Right_Click=81, Left_Click=80), []),
- (hidpp.responses_key, 2, "Left Click", None, ["diverted"]),
- (hidpp.responses_key, 3, "Mouse Back Button", None, ["diverted", "persistently diverted"]),
- (hidpp.responses_key, 4, "Mouse Forward Button", None, ["diverted", "raw XY diverted"]),
+ (fake_hidpp.responses_key, 1, "Right Click", common.UnsortedNamedInts(Right_Click=81, Left_Click=80), []),
+ (fake_hidpp.responses_key, 2, "Left Click", None, ["diverted"]),
+ (fake_hidpp.responses_key, 3, "Mouse Back Button", None, ["diverted", "persistently diverted"]),
+ (fake_hidpp.responses_key, 4, "Mouse Forward Button", None, ["diverted", "raw XY diverted"]),
],
)
# these fields need access all the key data, so start by setting up a device and its key data
def test_ReprogrammableKeyV4_query(responses, index, mapped_to, remappable_to, mapping_flags):
- device = hidpp.Device("KEY", responses=responses, feature=hidpp20_constants.FEATURE.REPROG_CONTROLS_V4, offset=5)
+ device = fake_hidpp.Device("KEY", responses=responses, feature=hidpp20_constants.FEATURE.REPROG_CONTROLS_V4, offset=5)
device._keys = _hidpp20.get_keys(device)
key = device.keys[index]
@@ -231,15 +233,15 @@ def test_ReprogrammableKeyV4_query(responses, index, mapped_to, remappable_to, m
@pytest.mark.parametrize(
"responses, index, diverted, persistently_diverted, rawXY_reporting, remap, sets",
[
- (hidpp.responses_key, 1, True, False, True, 0x52, ["0051080000"]),
- (hidpp.responses_key, 2, False, True, False, 0x51, ["0052020000", "0052200000", "0052000051"]),
- (hidpp.responses_key, 3, False, True, True, 0x50, ["0053020000", "00530C0000", "0053300000", "0053000050"]),
- (hidpp.responses_key, 4, False, False, False, 0x50, ["0056020000", "0056080000", "0056200000", "0056000050"]),
+ (fake_hidpp.responses_key, 1, True, False, True, 0x52, ["0051080000"]),
+ (fake_hidpp.responses_key, 2, False, True, False, 0x51, ["0052020000", "0052200000", "0052000051"]),
+ (fake_hidpp.responses_key, 3, False, True, True, 0x50, ["0053020000", "00530C0000", "0053300000", "0053000050"]),
+ (fake_hidpp.responses_key, 4, False, False, False, 0x50, ["0056020000", "0056080000", "0056200000", "0056000050"]),
],
)
def test_ReprogrammableKeyV4_set(responses, index, diverted, persistently_diverted, rawXY_reporting, remap, sets, mocker):
- responses += [hidpp.Response(r, 0x530, r) for r in sets]
- device = hidpp.Device("KEY", responses=responses, feature=hidpp20_constants.FEATURE.REPROG_CONTROLS_V4, offset=5)
+ responses += [fake_hidpp.Response(r, 0x530, r) for r in sets]
+ device = fake_hidpp.Device("KEY", responses=responses, feature=hidpp20_constants.FEATURE.REPROG_CONTROLS_V4, offset=5)
device._keys = _hidpp20.get_keys(device)
device._keys._ensure_all_keys_queried() # do this now so that the last requests are sets
spy_request = mocker.spy(device, "request")
@@ -275,23 +277,26 @@ def test_ReprogrammableKeyV4_set(responses, index, diverted, persistently_divert
key.remap(remap)
assert (key.mapped_to == remap) or (remap not in key.remappable_to and remap != 0)
- hidpp.match_requests(len(sets), responses, spy_request.call_args_list)
+ fake_hidpp.match_requests(len(sets), responses, spy_request.call_args_list)
@pytest.mark.parametrize(
"r, index, cid, actionId, remapped, mask, status, action, modifiers, byts, remap",
[
- (hidpp.responses_key, 1, 0x0051, 0x02, 0x0002, 0x01, 0, "Mouse Button: 2", "Cntrl+", "02000201", "01000400"),
- (hidpp.responses_key, 2, 0x0052, 0x01, 0x0001, 0x00, 1, "Key: 1", "", "01000100", "02005004"),
- (hidpp.responses_key, 3, 0x0053, 0x02, 0x0001, 0x00, 1, "Mouse Button: 1", "", "02000100", "7FFFFFFF"),
+ (fake_hidpp.responses_key, 1, 0x0051, 0x02, 0x0002, 0x01, 0, "Mouse Button: 2", "Cntrl+", "02000201", "01000400"),
+ (fake_hidpp.responses_key, 2, 0x0052, 0x01, 0x0001, 0x00, 1, "Key: 1", "", "01000100", "02005004"),
+ (fake_hidpp.responses_key, 3, 0x0053, 0x02, 0x0001, 0x00, 1, "Mouse Button: 1", "", "02000100", "7FFFFFFF"),
],
)
def test_RemappableAction(r, index, cid, actionId, remapped, mask, status, action, modifiers, byts, remap, mocker):
if int(remap, 16) == special_keys.KEYS_Default:
- responses = r + [hidpp.Response("040000", 0x0000, "1C00"), hidpp.Response("00", 0x450, f"{cid:04X}" + "FF")]
+ responses = r + [fake_hidpp.Response("040000", 0x0000, "1C00"), fake_hidpp.Response("00", 0x450, f"{cid:04X}" + "FF")]
else:
- responses = r + [hidpp.Response("040000", 0x0000, "1C00"), hidpp.Response("00", 0x440, f"{cid:04X}" + "FF" + remap)]
- device = hidpp.Device("KEY", responses=responses, feature=hidpp20_constants.FEATURE.REPROG_CONTROLS_V4, offset=5)
+ responses = r + [
+ fake_hidpp.Response("040000", 0x0000, "1C00"),
+ fake_hidpp.Response("00", 0x440, f"{cid:04X}" + "FF" + remap),
+ ]
+ device = fake_hidpp.Device("KEY", responses=responses, feature=hidpp20_constants.FEATURE.REPROG_CONTROLS_V4, offset=5)
key = hidpp20.PersistentRemappableAction(device, index, cid, actionId, remapped, mask, status)
spy_request = mocker.spy(device, "request")
@@ -312,7 +317,7 @@ def test_RemappableAction(r, index, cid, actionId, remapped, mask, status, actio
assert key.data_bytes.hex().upper() == (byts if int(remap, 16) == special_keys.KEYS_Default else remap)
if int(remap, 16) != special_keys.KEYS_Default:
- hidpp.match_requests(1, responses, spy_request.call_args_list)
+ fake_hidpp.match_requests(1, responses, spy_request.call_args_list)
# KeysArray methods tested in KeysArrayV4
@@ -378,7 +383,9 @@ def test_KeysArrayV4_index(key, index):
assert result == index
-device_key = hidpp.Device("KEY", responses=hidpp.responses_key, feature=hidpp20_constants.FEATURE.REPROG_CONTROLS_V4, offset=5)
+device_key = fake_hidpp.Device(
+ "KEY", responses=fake_hidpp.responses_key, feature=hidpp20_constants.FEATURE.REPROG_CONTROLS_V4, offset=5
+)
@pytest.mark.parametrize(
@@ -434,13 +441,13 @@ def test_KeysArrayPersistent_index_error(device, index):
@pytest.mark.parametrize(
"responses, key, index, mapped_to, capabilities",
[
- (hidpp.responses_remap, special_keys.CONTROL.Left_Button, 0, common.NamedInt(0x01, "Mouse Button Left"), 0x41),
- (hidpp.responses_remap, special_keys.CONTROL.Right_Button, 1, common.NamedInt(0x01, "Mouse Button Left"), 0x41),
- (hidpp.responses_remap, special_keys.CONTROL.Middle_Button, 2, common.NamedInt(0x51, "DOWN"), 0x41),
+ (fake_hidpp.responses_remap, special_keys.CONTROL.Left_Button, 0, common.NamedInt(0x01, "Mouse Button Left"), 0x41),
+ (fake_hidpp.responses_remap, special_keys.CONTROL.Right_Button, 1, common.NamedInt(0x01, "Mouse Button Left"), 0x41),
+ (fake_hidpp.responses_remap, special_keys.CONTROL.Middle_Button, 2, common.NamedInt(0x51, "DOWN"), 0x41),
],
)
def test_KeysArrayPersistent_key(responses, key, index, mapped_to, capabilities):
- device = hidpp.Device("REMAP", responses=responses, feature=hidpp20_constants.FEATURE.PERSISTENT_REMAPPABLE_ACTION)
+ device = fake_hidpp.Device("REMAP", responses=responses, feature=hidpp20_constants.FEATURE.PERSISTENT_REMAPPABLE_ACTION)
device._remap_keys = _hidpp20.get_remap_keys(device)
device._remap_keys._ensure_all_keys_queried()
@@ -494,13 +501,13 @@ def test_Gesture(device, low, high, next_index, next_diversion_index, name, cbe,
@pytest.mark.parametrize(
"responses, gest, enabled, diverted, set_result, unset_result, divert_result, undivert_result",
[
- (hidpp.responses_gestures, 20, None, None, None, None, None, None),
- (hidpp.responses_gestures, 1, True, False, "01", "00", "01", "00"),
- (hidpp.responses_gestures, 45, False, None, "01", "00", None, None),
+ (fake_hidpp.responses_gestures, 20, None, None, None, None, None, None),
+ (fake_hidpp.responses_gestures, 1, True, False, "01", "00", "01", "00"),
+ (fake_hidpp.responses_gestures, 45, False, None, "01", "00", None, None),
],
)
def test_Gesture_set(responses, gest, enabled, diverted, set_result, unset_result, divert_result, undivert_result):
- device = hidpp.Device("GESTURE", responses=responses, feature=hidpp20_constants.FEATURE.GESTURE_2)
+ device = fake_hidpp.Device("GESTURE", responses=responses, feature=hidpp20_constants.FEATURE.GESTURE_2)
gestures = _hidpp20.get_gestures(device)
gesture = gestures.gesture(gest)
@@ -516,11 +523,11 @@ def test_Gesture_set(responses, gest, enabled, diverted, set_result, unset_resul
@pytest.mark.parametrize(
"responses, prm, id, index, size, value, default_value, write1, write2",
[
- (hidpp.responses_gestures, 4, common.NamedInt(4, "ScaleFactor"), 0, 2, 256, 256, "0080", "0180"),
+ (fake_hidpp.responses_gestures, 4, common.NamedInt(4, "ScaleFactor"), 0, 2, 256, 256, "0080", "0180"),
],
)
def test_Param(responses, prm, id, index, size, value, default_value, write1, write2):
- device = hidpp.Device("GESTURE", responses=responses, feature=hidpp20_constants.FEATURE.GESTURE_2)
+ device = fake_hidpp.Device("GESTURE", responses=responses, feature=hidpp20_constants.FEATURE.GESTURE_2)
gestures = _hidpp20.get_gestures(device)
param = gestures.param(prm)
@@ -539,13 +546,13 @@ def test_Param(responses, prm, id, index, size, value, default_value, write1, wr
@pytest.mark.parametrize(
"responses, id, s, byte_count, value, string",
[
- (hidpp.responses_gestures, 1, "DVI field width", 1, 8, "[DVI field width=8]"),
- (hidpp.responses_gestures, 2, "field widths", 1, 8, "[field widths=8]"),
- (hidpp.responses_gestures, 3, "period unit", 2, 2048, "[period unit=2048]"),
+ (fake_hidpp.responses_gestures, 1, "DVI field width", 1, 8, "[DVI field width=8]"),
+ (fake_hidpp.responses_gestures, 2, "field widths", 1, 8, "[field widths=8]"),
+ (fake_hidpp.responses_gestures, 3, "period unit", 2, 2048, "[period unit=2048]"),
],
)
def test_Spec(responses, id, s, byte_count, value, string):
- device = hidpp.Device("GESTURE", responses=responses, feature=hidpp20_constants.FEATURE.GESTURE_2)
+ device = fake_hidpp.Device("GESTURE", responses=responses, feature=hidpp20_constants.FEATURE.GESTURE_2)
gestures = _hidpp20.get_gestures(device)
spec = gestures.specs[id]
@@ -558,7 +565,9 @@ def test_Spec(responses, id, s, byte_count, value, string):
def test_Gestures():
- device = hidpp.Device("GESTURES", responses=hidpp.responses_gestures, feature=hidpp20_constants.FEATURE.GESTURE_2)
+ device = fake_hidpp.Device(
+ "GESTURES", responses=fake_hidpp.responses_gestures, feature=hidpp20_constants.FEATURE.GESTURE_2
+ )
gestures = _hidpp20.get_gestures(device)
assert gestures
@@ -584,11 +593,11 @@ def test_Gestures():
responses_backlight = [
- hidpp.Response("010118000001020003000400", 0x0400),
- hidpp.Response("0101FF00020003000400", 0x0410, "0101FF00020003000400"),
+ fake_hidpp.Response("010118000001020003000400", 0x0400),
+ fake_hidpp.Response("0101FF00020003000400", 0x0410, "0101FF00020003000400"),
]
-device_backlight = hidpp.Device("BACKLIGHT", responses=responses_backlight, feature=hidpp20_constants.FEATURE.BACKLIGHT2)
+device_backlight = fake_hidpp.Device("BACKLIGHT", responses=responses_backlight, feature=hidpp20_constants.FEATURE.BACKLIGHT2)
def test_Backlight():
@@ -642,12 +651,26 @@ def test_LEDEffectSetting(hex, ID, color, speed, period, intensity, ramp, form):
@pytest.mark.parametrize(
"feature, function, response, ID, capabilities, period",
[
- [hidpp20_constants.FEATURE.COLOR_LED_EFFECTS, 0x20, hidpp.Response("0102000300040005", 0x0420, "010200"), 3, 4, 5],
- [hidpp20_constants.FEATURE.COLOR_LED_EFFECTS, 0x20, hidpp.Response("0102000700080009", 0x0420, "010200"), 7, 8, 9],
+ [
+ hidpp20_constants.FEATURE.COLOR_LED_EFFECTS,
+ 0x20,
+ fake_hidpp.Response("0102000300040005", 0x0420, "010200"),
+ 3,
+ 4,
+ 5,
+ ],
+ [
+ hidpp20_constants.FEATURE.COLOR_LED_EFFECTS,
+ 0x20,
+ fake_hidpp.Response("0102000700080009", 0x0420, "010200"),
+ 7,
+ 8,
+ 9,
+ ],
],
)
def test_LEDEffectInfo(feature, function, response, ID, capabilities, period):
- device = hidpp.Device(feature=feature, responses=[response])
+ device = fake_hidpp.Device(feature=feature, responses=[response])
info = hidpp20.LEDEffectInfo(feature, function, device, 1, 2)
@@ -661,12 +684,12 @@ def test_LEDEffectInfo(feature, function, response, ID, capabilities, period):
@pytest.mark.parametrize(
"feature, function, offset, effect_function, responses, index, location, count, id_1",
[
- [hidpp20_constants.FEATURE.COLOR_LED_EFFECTS, 0x10, 0, 0x20, hidpp.zone_responses_1, 0, 1, 2, 0xB],
- [hidpp20_constants.FEATURE.RGB_EFFECTS, 0x00, 1, 0x00, hidpp.zone_responses_2, 0, 1, 2, 2],
+ [hidpp20_constants.FEATURE.COLOR_LED_EFFECTS, 0x10, 0, 0x20, fake_hidpp.zone_responses_1, 0, 1, 2, 0xB],
+ [hidpp20_constants.FEATURE.RGB_EFFECTS, 0x00, 1, 0x00, fake_hidpp.zone_responses_2, 0, 1, 2, 2],
],
)
def test_LEDZoneInfo(feature, function, offset, effect_function, responses, index, location, count, id_1):
- device = hidpp.Device(feature=feature, responses=responses, offset=0x07)
+ device = fake_hidpp.Device(feature=feature, responses=responses, offset=0x07)
zone = hidpp20.LEDZoneInfo(feature, function, offset, effect_function, device, index)
@@ -680,13 +703,17 @@ def test_LEDZoneInfo(feature, function, offset, effect_function, responses, inde
@pytest.mark.parametrize(
"responses, setting, expected_command",
[
- [hidpp.zone_responses_1, hidpp20.LEDEffectSetting(ID=0), None],
- [hidpp.zone_responses_1, hidpp20.LEDEffectSetting(ID=3, period=0x20, intensity=0x50), "000000000000000020500000"],
- [hidpp.zone_responses_1, hidpp20.LEDEffectSetting(ID=0xB, color=0x808080, period=0x20), "000180808000002000000000"],
+ [fake_hidpp.zone_responses_1, hidpp20.LEDEffectSetting(ID=0), None],
+ [fake_hidpp.zone_responses_1, hidpp20.LEDEffectSetting(ID=3, period=0x20, intensity=0x50), "000000000000000020500000"],
+ [
+ fake_hidpp.zone_responses_1,
+ hidpp20.LEDEffectSetting(ID=0xB, color=0x808080, period=0x20),
+ "000180808000002000000000",
+ ],
],
)
def test_LEDZoneInfo_to_command(responses, setting, expected_command):
- device = hidpp.Device(feature=hidpp20_constants.FEATURE.COLOR_LED_EFFECTS, responses=responses, offset=0x07)
+ device = fake_hidpp.Device(feature=hidpp20_constants.FEATURE.COLOR_LED_EFFECTS, responses=responses, offset=0x07)
zone = hidpp20.LEDZoneInfo(hidpp20_constants.FEATURE.COLOR_LED_EFFECTS, 0x10, 0, 0x20, device, 0)
command = zone.to_command(setting)
@@ -697,12 +724,12 @@ def test_LEDZoneInfo_to_command(responses, setting, expected_command):
@pytest.mark.parametrize(
"feature, cls, responses, readable, count, count_0",
[
- [hidpp20_constants.FEATURE.COLOR_LED_EFFECTS, hidpp20.LEDEffectsInfo, hidpp.effects_responses_1, 1, 1, 2],
- [hidpp20_constants.FEATURE.RGB_EFFECTS, hidpp20.RGBEffectsInfo, hidpp.effects_responses_2, 1, 1, 2],
+ [hidpp20_constants.FEATURE.COLOR_LED_EFFECTS, hidpp20.LEDEffectsInfo, fake_hidpp.effects_responses_1, 1, 1, 2],
+ [hidpp20_constants.FEATURE.RGB_EFFECTS, hidpp20.RGBEffectsInfo, fake_hidpp.effects_responses_2, 1, 1, 2],
],
)
def test_LED_RGB_EffectsInfo(feature, cls, responses, readable, count, count_0):
- device = hidpp.Device(feature=feature, responses=responses, offset=0x07)
+ device = fake_hidpp.Device(feature=feature, responses=responses, offset=0x07)
effects = cls(device)
@@ -721,7 +748,7 @@ def test_LED_RGB_EffectsInfo(feature, cls, responses, readable, count, count_0):
("80020454", 0x8, None, None, 0x02, 0x54, 0x04, None, None),
("80030454", 0x8, None, None, 0x03, 0x0454, None, None, None),
("900AFF01", 0x9, None, None, None, 0x0A, None, 0x01, None),
- ("709090A0", 0x7, None, None, None, None, None, None, b"\x70\x90\x90\xA0"),
+ ("709090A0", 0x7, None, None, None, None, None, None, b"\x70\x90\x90\xa0"),
],
)
def test_button_bytes(hex, behavior, sector, address, typ, val, modifiers, data, byt):
@@ -820,13 +847,15 @@ def test_OnboardProfile_bytes(hex, name, sector, enabled, buttons, gbuttons, res
@pytest.mark.parametrize(
"responses, name, count, buttons, gbuttons, sectors, size",
[
- (hidpp.responses_profiles, "ONB", 1, 2, 2, 1, 254),
- (hidpp.responses_profiles_rom, "ONB", 1, 2, 2, 1, 254),
- (hidpp.responses_profiles_rom_2, "ONB", 1, 2, 2, 1, 254),
+ (fake_hidpp.responses_profiles, "ONB", 1, 2, 2, 1, 254),
+ (fake_hidpp.responses_profiles_rom, "ONB", 1, 2, 2, 1, 254),
+ (fake_hidpp.responses_profiles_rom_2, "ONB", 1, 2, 2, 1, 254),
],
)
def test_OnboardProfiles_device(responses, name, count, buttons, gbuttons, sectors, size):
- device = hidpp.Device(name, True, 4.5, responses=responses, feature=hidpp20_constants.FEATURE.ONBOARD_PROFILES, offset=0x9)
+ device = fake_hidpp.Device(
+ name, True, 4.5, responses=responses, feature=hidpp20_constants.FEATURE.ONBOARD_PROFILES, offset=0x9
+ )
device._profiles = None
profiles = _hidpp20.get_profiles(device)
diff --git a/tests/logitech_receiver/test_hidpp20_simple.py b/tests/logitech_receiver/test_hidpp20_simple.py
index 5817a4d105..983322f2d2 100644
--- a/tests/logitech_receiver/test_hidpp20_simple.py
+++ b/tests/logitech_receiver/test_hidpp20_simple.py
@@ -20,18 +20,18 @@
from logitech_receiver import hidpp20
from logitech_receiver import hidpp20_constants
-from . import hidpp
+from . import fake_hidpp
_hidpp20 = hidpp20.Hidpp20()
def test_get_firmware():
responses = [
- hidpp.Response("02FFFF", 0x0400),
- hidpp.Response("01414243030401000101000102030405", 0x0410, "00"),
- hidpp.Response("02414243030401000101000102030405", 0x0410, "01"),
+ fake_hidpp.Response("02FFFF", 0x0400),
+ fake_hidpp.Response("01414243030401000101000102030405", 0x0410, "00"),
+ fake_hidpp.Response("02414243030401000101000102030405", 0x0410, "01"),
]
- device = hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.DEVICE_FW_VERSION)
+ device = fake_hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.DEVICE_FW_VERSION)
result = _hidpp20.get_firmware(device)
@@ -41,8 +41,8 @@ def test_get_firmware():
def test_get_ids():
- responses = [hidpp.Response("FF12345678000D123456789ABC", 0x0400)]
- device = hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.DEVICE_FW_VERSION)
+ responses = [fake_hidpp.Response("FF12345678000D123456789ABC", 0x0400)]
+ device = fake_hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.DEVICE_FW_VERSION)
unitId, modelId, tid_map = _hidpp20.get_ids(device)
@@ -52,8 +52,8 @@ def test_get_ids():
def test_get_kind():
- responses = [hidpp.Response("00", 0x0420)]
- device = hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.DEVICE_NAME)
+ responses = [fake_hidpp.Response("00", 0x0420)]
+ device = fake_hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.DEVICE_NAME)
result = _hidpp20.get_kind(device)
@@ -63,11 +63,11 @@ def test_get_kind():
def test_get_name():
responses = [
- hidpp.Response("12", 0x0400),
- hidpp.Response("4142434445464748494A4B4C4D4E4F", 0x0410, "00"),
- hidpp.Response("505152530000000000000000000000", 0x0410, "0F"),
+ fake_hidpp.Response("12", 0x0400),
+ fake_hidpp.Response("4142434445464748494A4B4C4D4E4F", 0x0410, "00"),
+ fake_hidpp.Response("505152530000000000000000000000", 0x0410, "0F"),
]
- device = hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.DEVICE_NAME)
+ device = fake_hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.DEVICE_NAME)
result = _hidpp20.get_name(device)
@@ -76,11 +76,11 @@ def test_get_name():
def test_get_friendly_name():
responses = [
- hidpp.Response("12", 0x0400),
- hidpp.Response("004142434445464748494A4B4C4D4E", 0x0410, "00"),
- hidpp.Response("0E4F50515253000000000000000000", 0x0410, "0E"),
+ fake_hidpp.Response("12", 0x0400),
+ fake_hidpp.Response("004142434445464748494A4B4C4D4E", 0x0410, "00"),
+ fake_hidpp.Response("0E4F50515253000000000000000000", 0x0410, "0E"),
]
- device = hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.DEVICE_FRIENDLY_NAME)
+ device = fake_hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.DEVICE_FRIENDLY_NAME)
result = _hidpp20.get_friendly_name(device)
@@ -88,8 +88,8 @@ def test_get_friendly_name():
def test_get_battery_status():
- responses = [hidpp.Response("502000FFFF", 0x0400)]
- device = hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.BATTERY_STATUS)
+ responses = [fake_hidpp.Response("502000FFFF", 0x0400)]
+ device = fake_hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.BATTERY_STATUS)
feature, battery = _hidpp20.get_battery_status(device)
@@ -100,8 +100,8 @@ def test_get_battery_status():
def test_get_battery_voltage():
- responses = [hidpp.Response("1000FFFFFF", 0x0400)]
- device = hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.BATTERY_VOLTAGE)
+ responses = [fake_hidpp.Response("1000FFFFFF", 0x0400)]
+ device = fake_hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.BATTERY_VOLTAGE)
feature, battery = _hidpp20.get_battery_voltage(device)
@@ -112,8 +112,8 @@ def test_get_battery_voltage():
def test_get_battery_unified():
- responses = [hidpp.Response("500100FFFF", 0x0410)]
- device = hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.UNIFIED_BATTERY)
+ responses = [fake_hidpp.Response("500100FFFF", 0x0410)]
+ device = fake_hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.UNIFIED_BATTERY)
feature, battery = _hidpp20.get_battery_unified(device)
@@ -123,8 +123,8 @@ def test_get_battery_unified():
def test_get_adc_measurement():
- responses = [hidpp.Response("100003", 0x0400)]
- device = hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.ADC_MEASUREMENT)
+ responses = [fake_hidpp.Response("100003", 0x0400)]
+ device = fake_hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.ADC_MEASUREMENT)
feature, battery = _hidpp20.get_adc_measurement(device)
@@ -135,8 +135,8 @@ def test_get_adc_measurement():
def test_get_battery():
- responses = [hidpp.Response("502000FFFF", 0x0400)]
- device = hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.BATTERY_STATUS)
+ responses = [fake_hidpp.Response("502000FFFF", 0x0400)]
+ device = fake_hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.BATTERY_STATUS)
feature, battery = _hidpp20.get_battery(device, hidpp20_constants.FEATURE.BATTERY_STATUS)
@@ -148,11 +148,11 @@ def test_get_battery():
def test_get_battery_none():
responses = [
- hidpp.Response(None, 0x0000, f"{hidpp20_constants.FEATURE.BATTERY_STATUS:0>4X}"),
- hidpp.Response(None, 0x0000, f"{hidpp20_constants.FEATURE.BATTERY_VOLTAGE:0>4X}"),
- hidpp.Response("500100ffff", 0x0410),
+ fake_hidpp.Response(None, 0x0000, f"{hidpp20_constants.FEATURE.BATTERY_STATUS:0>4X}"),
+ fake_hidpp.Response(None, 0x0000, f"{hidpp20_constants.FEATURE.BATTERY_VOLTAGE:0>4X}"),
+ fake_hidpp.Response("500100ffff", 0x0410),
]
- device = hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.UNIFIED_BATTERY)
+ device = fake_hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.UNIFIED_BATTERY)
feature, battery = _hidpp20.get_battery(device, None)
@@ -169,8 +169,8 @@ def test_get_battery_none():
def test_get_mouse_pointer_info():
- responses = [hidpp.Response("01000A", 0x0400)]
- device = hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.MOUSE_POINTER)
+ responses = [fake_hidpp.Response("01000A", 0x0400)]
+ device = fake_hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.MOUSE_POINTER)
result = _hidpp20.get_mouse_pointer_info(device)
@@ -183,8 +183,8 @@ def test_get_mouse_pointer_info():
def test_get_vertical_scrolling_info():
- responses = [hidpp.Response("01080C", 0x0400)]
- device = hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.VERTICAL_SCROLLING)
+ responses = [fake_hidpp.Response("01080C", 0x0400)]
+ device = fake_hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.VERTICAL_SCROLLING)
result = _hidpp20.get_vertical_scrolling_info(device)
@@ -192,8 +192,8 @@ def test_get_vertical_scrolling_info():
def test_get_hi_res_scrolling_info():
- responses = [hidpp.Response("0102", 0x0400)]
- device = hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.HI_RES_SCROLLING)
+ responses = [fake_hidpp.Response("0102", 0x0400)]
+ device = fake_hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.HI_RES_SCROLLING)
mode, resolution = _hidpp20.get_hi_res_scrolling_info(device)
@@ -202,8 +202,8 @@ def test_get_hi_res_scrolling_info():
def test_get_pointer_speed_info():
- responses = [hidpp.Response("0102", 0x0400)]
- device = hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.POINTER_SPEED)
+ responses = [fake_hidpp.Response("0102", 0x0400)]
+ device = fake_hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.POINTER_SPEED)
result = _hidpp20.get_pointer_speed_info(device)
@@ -211,8 +211,8 @@ def test_get_pointer_speed_info():
def test_get_lowres_wheel_status():
- responses = [hidpp.Response("01", 0x0400)]
- device = hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.LOWRES_WHEEL)
+ responses = [fake_hidpp.Response("01", 0x0400)]
+ device = fake_hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.LOWRES_WHEEL)
result = _hidpp20.get_lowres_wheel_status(device)
@@ -221,11 +221,11 @@ def test_get_lowres_wheel_status():
def test_get_hires_wheel():
responses = [
- hidpp.Response("010C", 0x0400),
- hidpp.Response("05FF", 0x0410),
- hidpp.Response("03FF", 0x0430),
+ fake_hidpp.Response("010C", 0x0400),
+ fake_hidpp.Response("05FF", 0x0410),
+ fake_hidpp.Response("03FF", 0x0430),
]
- device = hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.HIRES_WHEEL)
+ device = fake_hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.HIRES_WHEEL)
multi, has_invert, has_ratchet, inv, res, target, ratchet = _hidpp20.get_hires_wheel(device)
@@ -239,8 +239,8 @@ def test_get_hires_wheel():
def test_get_new_fn_inversion():
- responses = [hidpp.Response("0300", 0x0400)]
- device = hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.NEW_FN_INVERSION)
+ responses = [fake_hidpp.Response("0300", 0x0400)]
+ device = fake_hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.NEW_FN_INVERSION)
result = _hidpp20.get_new_fn_inversion(device)
@@ -255,25 +255,25 @@ def mock_gethostname(mocker):
@pytest.mark.parametrize(
"responses, expected_result",
[
- ([hidpp.Response(None, 0x0400)], {}),
- ([hidpp.Response("02000000", 0x0400)], {}),
+ ([fake_hidpp.Response(None, 0x0400)], {}),
+ ([fake_hidpp.Response("02000000", 0x0400)], {}),
(
[
- hidpp.Response("03000200", 0x0400),
- hidpp.Response("FF01FFFF05FFFF", 0x0410, "00"),
- hidpp.Response("0000414243444500FFFFFFFFFF", 0x0430, "0000"),
- hidpp.Response("FF01FFFF10FFFF", 0x0410, "01"),
- hidpp.Response("01004142434445464748494A4B4C4D", 0x0430, "0100"),
- hidpp.Response("01134E4F5000FFFFFFFFFFFFFFFFFF", 0x0430, "010E"),
- hidpp.Response("000000000008", 0x0410, "00"),
- hidpp.Response("0208", 0x0440, "000041424344454647"),
+ fake_hidpp.Response("03000200", 0x0400),
+ fake_hidpp.Response("FF01FFFF05FFFF", 0x0410, "00"),
+ fake_hidpp.Response("0000414243444500FFFFFFFFFF", 0x0430, "0000"),
+ fake_hidpp.Response("FF01FFFF10FFFF", 0x0410, "01"),
+ fake_hidpp.Response("01004142434445464748494A4B4C4D", 0x0430, "0100"),
+ fake_hidpp.Response("01134E4F5000FFFFFFFFFFFFFFFFFF", 0x0430, "010E"),
+ fake_hidpp.Response("000000000008", 0x0410, "00"),
+ fake_hidpp.Response("0208", 0x0440, "000041424344454647"),
],
{0: (True, "ABCDEFG"), 1: (True, "ABCDEFGHIJKLMNO")},
),
],
)
def test_get_host_names(responses, expected_result, mock_gethostname):
- device = hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.HOSTS_INFO)
+ device = fake_hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.HOSTS_INFO)
result = _hidpp20.get_host_names(device)
@@ -283,28 +283,28 @@ def test_get_host_names(responses, expected_result, mock_gethostname):
@pytest.mark.parametrize(
"responses, expected_result",
[
- ([hidpp.Response(None, 0x0400)], None),
+ ([fake_hidpp.Response(None, 0x0400)], None),
(
[
- hidpp.Response("03000002", 0x0400),
- hidpp.Response("000000000008", 0x0410, "02"),
- hidpp.Response("020E", 0x0440, "02004142434445464748494A4B4C4D4E"),
+ fake_hidpp.Response("03000002", 0x0400),
+ fake_hidpp.Response("000000000008", 0x0410, "02"),
+ fake_hidpp.Response("020E", 0x0440, "02004142434445464748494A4B4C4D4E"),
],
True,
),
(
[
- hidpp.Response("03000002", 0x0400),
- hidpp.Response("000000000014", 0x0410, "02"),
- hidpp.Response("020E", 0x0440, "02004142434445464748494A4B4C4D4E"),
- hidpp.Response("0214", 0x0440, "020E4F505152535455565758"),
+ fake_hidpp.Response("03000002", 0x0400),
+ fake_hidpp.Response("000000000014", 0x0410, "02"),
+ fake_hidpp.Response("020E", 0x0440, "02004142434445464748494A4B4C4D4E"),
+ fake_hidpp.Response("0214", 0x0440, "020E4F505152535455565758"),
],
True,
),
],
)
def test_set_host_name(responses, expected_result):
- device = hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.HOSTS_INFO)
+ device = fake_hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.HOSTS_INFO)
result = _hidpp20.set_host_name(device, "ABCDEFGHIJKLMNOPQRSTUVWX")
@@ -312,8 +312,8 @@ def test_set_host_name(responses, expected_result):
def test_get_onboard_mode():
- responses = [hidpp.Response("03FFFFFFFF", 0x0420)]
- device = hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.ONBOARD_PROFILES)
+ responses = [fake_hidpp.Response("03FFFFFFFF", 0x0420)]
+ device = fake_hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.ONBOARD_PROFILES)
result = _hidpp20.get_onboard_mode(device)
@@ -321,8 +321,8 @@ def test_get_onboard_mode():
def test_set_onboard_mode():
- responses = [hidpp.Response("03FFFFFFFF", 0x0410, "03")]
- device = hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.ONBOARD_PROFILES)
+ responses = [fake_hidpp.Response("03FFFFFFFF", 0x0410, "03")]
+ device = fake_hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.ONBOARD_PROFILES)
res = _hidpp20.set_onboard_mode(device, 0x3)
@@ -332,11 +332,11 @@ def test_set_onboard_mode():
@pytest.mark.parametrize(
"responses, expected_result",
[
- ([hidpp.Response("03FFFF", 0x0420)], "1ms"),
+ ([fake_hidpp.Response("03FFFF", 0x0420)], "1ms"),
(
[
- hidpp.Response(None, 0x0000, f"{hidpp20_constants.FEATURE.REPORT_RATE:0>4X}"),
- hidpp.Response("04FFFF", 0x0420),
+ fake_hidpp.Response(None, 0x0000, f"{hidpp20_constants.FEATURE.REPORT_RATE:0>4X}"),
+ fake_hidpp.Response("04FFFF", 0x0420),
],
"500us",
),
@@ -346,7 +346,7 @@ def test_get_polling_rate(
responses,
expected_result,
):
- device = hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.EXTENDED_ADJUSTABLE_REPORT_RATE)
+ device = fake_hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.EXTENDED_ADJUSTABLE_REPORT_RATE)
result = _hidpp20.get_polling_rate(device)
@@ -354,8 +354,8 @@ def test_get_polling_rate(
def test_get_remaining_pairing():
- responses = [hidpp.Response("03FFFF", 0x0400)]
- device = hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.REMAINING_PAIRING)
+ responses = [fake_hidpp.Response("03FFFF", 0x0400)]
+ device = fake_hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.REMAINING_PAIRING)
result = _hidpp20.get_remaining_pairing(device)
@@ -363,8 +363,8 @@ def test_get_remaining_pairing():
def test_config_change():
- responses = [hidpp.Response("03FFFF", 0x0410, "02")]
- device = hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.CONFIG_CHANGE)
+ responses = [fake_hidpp.Response("03FFFF", 0x0410, "02")]
+ device = fake_hidpp.Device(responses=responses, feature=hidpp20_constants.FEATURE.CONFIG_CHANGE)
result = _hidpp20.config_change(device, 0x2)
@@ -383,7 +383,7 @@ def test_decipher_battery_status():
def test_decipher_battery_voltage():
- report = b"\x10\x00\xFF\xff\xff"
+ report = b"\x10\x00\xff\xff\xff"
feature, battery = hidpp20.decipher_battery_voltage(report)
diff --git a/tests/logitech_receiver/test_receiver.py b/tests/logitech_receiver/test_receiver.py
index c2b31a2823..1b3978223d 100644
--- a/tests/logitech_receiver/test_receiver.py
+++ b/tests/logitech_receiver/test_receiver.py
@@ -1,16 +1,40 @@
-import platform
-
from dataclasses import dataclass
from functools import partial
from unittest import mock
import pytest
+from logitech_receiver import base
from logitech_receiver import common
from logitech_receiver import exceptions
from logitech_receiver import receiver
-from . import hidpp
+from . import fake_hidpp
+
+
+class LowLevelInterfaceFake:
+ def __init__(self, responses=None):
+ self.responses = responses
+
+ def open_path(self, path):
+ return fake_hidpp.open_path(path)
+
+ def product_information(self, usb_id: int) -> dict:
+ return base.product_information(usb_id)
+
+ def find_paired_node(self, receiver_path: str, index: int, timeout: int):
+ return None
+
+ def request(self, response, *args, **kwargs):
+ func = partial(fake_hidpp.request, self.responses)
+ return func(response, *args, **kwargs)
+
+ def ping(self, response, *args, **kwargs):
+ func = partial(fake_hidpp.ping, self.responses)
+ return func(response, *args, **kwargs)
+
+ def close(self, *args, **kwargs):
+ pass
@pytest.mark.parametrize(
@@ -47,46 +71,39 @@ def mock_request():
yield mock_request
-@pytest.fixture
-def mock_base():
- with mock.patch("logitech_receiver.base.open_path", return_value=None) as mock_open_path:
- with mock.patch("logitech_receiver.base.request", return_value=None) as mock_request:
- yield mock_open_path, mock_request
-
-
responses_unifying = [
- hidpp.Response("000000", 0x8003, "FF"),
- hidpp.Response("000300", 0x8102),
- hidpp.Response("0316CC9CB40506220000000000000000", 0x83B5, "03"),
- hidpp.Response("20200840820402020700000000000000", 0x83B5, "20"),
- hidpp.Response("21211420110400010D1A000000000000", 0x83B5, "21"),
- hidpp.Response("22220840660402010700000000020000", 0x83B5, "22"),
- hidpp.Response("30198E3EB80600000001000000000000", 0x83B5, "30"),
- hidpp.Response("31811119511A40000002000000000000", 0x83B5, "31"),
- hidpp.Response("32112C46EA1E40000003000000000000", 0x83B5, "32"),
- hidpp.Response("400B4D58204D61737465722033000000", 0x83B5, "40"),
- hidpp.Response("41044B35323020202020202020202020", 0x83B5, "41"),
- hidpp.Response("42054372616674000000000000000000", 0x83B5, "42"),
- hidpp.Response("012411", 0x81F1, "01"),
- hidpp.Response("020036", 0x81F1, "02"),
- hidpp.Response("03AAAC", 0x81F1, "03"),
- hidpp.Response("040209", 0x81F1, "04"),
+ fake_hidpp.Response("000000", 0x8003, "FF"),
+ fake_hidpp.Response("000300", 0x8102),
+ fake_hidpp.Response("0316CC9CB40506220000000000000000", 0x83B5, "03"),
+ fake_hidpp.Response("20200840820402020700000000000000", 0x83B5, "20"),
+ fake_hidpp.Response("21211420110400010D1A000000000000", 0x83B5, "21"),
+ fake_hidpp.Response("22220840660402010700000000020000", 0x83B5, "22"),
+ fake_hidpp.Response("30198E3EB80600000001000000000000", 0x83B5, "30"),
+ fake_hidpp.Response("31811119511A40000002000000000000", 0x83B5, "31"),
+ fake_hidpp.Response("32112C46EA1E40000003000000000000", 0x83B5, "32"),
+ fake_hidpp.Response("400B4D58204D61737465722033000000", 0x83B5, "40"),
+ fake_hidpp.Response("41044B35323020202020202020202020", 0x83B5, "41"),
+ fake_hidpp.Response("42054372616674000000000000000000", 0x83B5, "42"),
+ fake_hidpp.Response("012411", 0x81F1, "01"),
+ fake_hidpp.Response("020036", 0x81F1, "02"),
+ fake_hidpp.Response("03AAAC", 0x81F1, "03"),
+ fake_hidpp.Response("040209", 0x81F1, "04"),
]
responses_c534 = [
- hidpp.Response("000000", 0x8003, "FF", handle=0x12),
- hidpp.Response("000209", 0x8102, handle=0x12),
- hidpp.Response("0316CC9CB40502220000000000000000", 0x83B5, "03", handle=0x12),
- hidpp.Response("00000445AB", 0x83B5, "04", handle=0x12),
+ fake_hidpp.Response("000000", 0x8003, "FF", handle=0x12),
+ fake_hidpp.Response("000209", 0x8102, handle=0x12),
+ fake_hidpp.Response("0316CC9CB40502220000000000000000", 0x83B5, "03", handle=0x12),
+ fake_hidpp.Response("00000445AB", 0x83B5, "04", handle=0x12),
]
responses_unusual = [
- hidpp.Response("000000", 0x8003, "FF", handle=0x13),
- hidpp.Response("000300", 0x8102, handle=0x13),
- hidpp.Response("00000445AB", 0x83B5, "04", handle=0x13),
- hidpp.Response("0326CC9CB40508220000000000000000", 0x83B5, "03", handle=0x13),
+ fake_hidpp.Response("000000", 0x8003, "FF", handle=0x13),
+ fake_hidpp.Response("000300", 0x8102, handle=0x13),
+ fake_hidpp.Response("00000445AB", 0x83B5, "04", handle=0x13),
+ fake_hidpp.Response("0326CC9CB40508220000000000000000", 0x83B5, "03", handle=0x13),
]
responses_lacking = [
- hidpp.Response("000000", 0x8003, "FF", handle=0x14),
- hidpp.Response("000300", 0x8102, handle=0x14),
+ fake_hidpp.Response("000000", 0x8003, "FF", handle=0x14),
+ fake_hidpp.Response("000300", 0x8102, handle=0x14),
]
mouse_info = {
@@ -99,37 +116,34 @@ def mock_base():
c534_info = {"kind": common.NamedInt(0, "unknown"), "polling": "", "power_switch": "(unknown)", "serial": None, "wpid": "45AB"}
-@pytest.mark.skipif(platform.system() == "Darwin", reason="Fails on macOS")
@pytest.mark.parametrize(
"device_info, responses, handle, serial, max_devices, ",
[
- (DeviceInfo(None), [], False, None, None),
- (DeviceInfo(11), [], None, None, None),
- (DeviceInfo("11"), responses_unifying, 0x11, "16CC9CB4", 6),
- (DeviceInfo("12", product_id=0xC534), responses_c534, 0x12, "16CC9CB4", 2),
- (DeviceInfo("12", product_id=0xC539), responses_c534, 0x12, "16CC9CB4", 2),
- (DeviceInfo("13"), responses_unusual, 0x13, "26CC9CB4", 1),
- (DeviceInfo("14"), responses_lacking, 0x14, None, 1),
+ (DeviceInfo(path=None), [], False, None, None),
+ (DeviceInfo(path=11), [], None, None, None),
+ (DeviceInfo(path="11"), responses_unifying, 0x11, "16CC9CB4", 6),
+ (DeviceInfo(path="12", product_id=0xC534), responses_c534, 0x12, "16CC9CB4", 2),
+ (DeviceInfo(path="12", product_id=0xC539), responses_c534, 0x12, "16CC9CB4", 2),
+ (DeviceInfo(path="13"), responses_unusual, 0x13, "26CC9CB4", 1),
+ (DeviceInfo(path="14"), responses_lacking, 0x14, None, 1),
],
)
-def test_ReceiverFactory_create_receiver(device_info, responses, handle, serial, max_devices, mock_base):
- mock_base[0].side_effect = hidpp.open_path
- mock_base[1].side_effect = partial(hidpp.request, responses)
+def test_receiver_factory_create_receiver(device_info, responses, handle, serial, max_devices):
+ mock_low_level = LowLevelInterfaceFake(responses)
if handle is False:
with pytest.raises(Exception): # noqa: B017
- r = receiver.ReceiverFactory.create_receiver(device_info, lambda x: x)
+ receiver.create_receiver(mock_low_level, device_info, lambda x: x)
elif handle is None:
- r = receiver.ReceiverFactory.create_receiver(device_info, lambda x: x)
+ r = receiver.create_receiver(mock_low_level, device_info, lambda x: x)
assert r is None
else:
- r = receiver.ReceiverFactory.create_receiver(device_info, lambda x: x)
+ r = receiver.create_receiver(mock_low_level, device_info, lambda x: x)
assert r.handle == handle
assert r.serial == serial
assert r.max_devices == max_devices
-@pytest.mark.skipif(platform.system() == "Darwin", reason="Fails on macOS")
@pytest.mark.parametrize(
"device_info, responses, firmware, codename, remaining_pairings, pairing_info, count",
[
@@ -138,11 +152,10 @@ def test_ReceiverFactory_create_receiver(device_info, responses, handle, serial,
(DeviceInfo("13", product_id=0xCCCC), responses_unusual, None, None, -1, c534_info, 3),
],
)
-def test_ReceiverFactory_props(device_info, responses, firmware, codename, remaining_pairings, pairing_info, count, mock_base):
- mock_base[0].side_effect = hidpp.open_path
- mock_base[1].side_effect = partial(hidpp.request, responses)
+def test_receiver_factory_props(device_info, responses, firmware, codename, remaining_pairings, pairing_info, count):
+ mock_low_level = LowLevelInterfaceFake(responses)
- r = receiver.ReceiverFactory.create_receiver(device_info, lambda x: x)
+ r = receiver.create_receiver(mock_low_level, device_info, lambda x: x)
assert len(r.firmware) == firmware if firmware is not None else firmware is None
assert r.device_codename(2) == codename
@@ -151,7 +164,6 @@ def test_ReceiverFactory_props(device_info, responses, firmware, codename, remai
assert r.count() == count
-@pytest.mark.skipif(platform.system() == "Darwin", reason="Fails on macOS")
@pytest.mark.parametrize(
"device_info, responses, status_str, strng",
[
@@ -160,17 +172,15 @@ def test_ReceiverFactory_props(device_info, responses, firmware, codename, remai
(DeviceInfo("13", product_id=0xCCCC), responses_unusual, "No paired devices.", "