Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
wip
Browse files Browse the repository at this point in the history
MattHag committed May 16, 2024
1 parent ac98f58 commit 6bb5c51
Showing 8 changed files with 68 additions and 63 deletions.
24 changes: 9 additions & 15 deletions lib/logitech_receiver/base.py
Original file line number Diff line number Diff line change
@@ -28,7 +28,7 @@
from random import getrandbits
from time import time

import hidapi as _hid
import hidapi

from . import base_usb
from . import common
@@ -42,10 +42,6 @@

_hidpp20 = hidpp20.Hidpp20()

#
#
#


def _wired_device(product_id, interface):
return {"vendor_id": 1133, "product_id": product_id, "bus_id": 3, "usb_interface": interface, "isDevice": True}
@@ -140,7 +136,7 @@ def filter_receivers(bus_id, vendor_id, product_id, hidpp_short=False, hidpp_lon

def receivers():
"""Enumerate all the receivers attached to the machine."""
yield from _hid.enumerate(filter_receivers)
yield from hidapi.enumerate(filter_receivers)


def filter(bus_id, vendor_id, product_id, hidpp_short=False, hidpp_long=False):
@@ -159,12 +155,12 @@ def filter(bus_id, vendor_id, product_id, hidpp_short=False, hidpp_long=False):

def receivers_and_devices():
"""Enumerate all the receivers and devices directly attached to the machine."""
yield from _hid.enumerate(filter)
yield from hidapi.enumerate(filter)


def notify_on_receivers_glib(callback):
"""Watch for matching devices and notifies the callback on the GLib thread."""
return _hid.monitor_glib(callback, filter)
return hidapi.monitor_glib(callback, filter)


#
@@ -185,7 +181,7 @@ def open_path(path):
:returns: an open receiver handle if this is the right Linux device, or
``None``.
"""
return _hid.open_path(path)
return hidapi.open_path(path)


def open():
@@ -204,13 +200,11 @@ def close(handle):
if handle:
try:
if isinstance(handle, int):
_hid.close(handle)
hidapi.close(handle)
else:
handle.close()
# logger.info("closed receiver handle %r", handle)
return True
except Exception:
# logger.exception("closing receiver handle %r", handle)
pass

return False
@@ -248,7 +242,7 @@ def write(handle, devnumber, data, long_message=False):
)

try:
_hid.write(int(handle), wdata)
hidapi.write(int(handle), wdata)
except Exception as reason:
logger.error("write failed, assuming handle %r no longer available", handle)
close(handle)
@@ -297,7 +291,7 @@ def _read(handle, timeout):
try:
# convert timeout to milliseconds, the hidapi expects it
timeout = int(timeout * 1000)
data = _hid.read(int(handle), _MAX_READ_SIZE, timeout)
data = hidapi.read(int(handle), _MAX_READ_SIZE, timeout)
except Exception as reason:
logger.warning("read failed, assuming handle %r no longer available", handle)
close(handle)
@@ -331,7 +325,7 @@ def _skip_incoming(handle, ihandle, notifications_hook):
while True:
try:
# read whatever is already in the buffer, if any
data = _hid.read(ihandle, _MAX_READ_SIZE, 0)
data = hidapi.read(ihandle, _MAX_READ_SIZE, 0)
except Exception as reason:
logger.error("read failed, assuming receiver %s no longer available", handle)
close(handle)
8 changes: 5 additions & 3 deletions lib/logitech_receiver/common.py
Original file line number Diff line number Diff line change
@@ -14,14 +14,16 @@
## You should have received a copy of the GNU General Public License along
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
from __future__ import annotations

import binascii
import dataclasses

from collections import namedtuple
from typing import Optional
from typing import Union

import yaml as _yaml
import yaml

from solaar.i18n import _

@@ -355,8 +357,8 @@ def to_yaml(cls, dumper, data):
return dumper.represent_mapping("!NamedInt", {"value": int(data), "name": data.name}, flow_style=True)


_yaml.SafeLoader.add_constructor("!NamedInt", NamedInt.from_yaml)
_yaml.add_representer(NamedInt, NamedInt.to_yaml)
yaml.SafeLoader.add_constructor("!NamedInt", NamedInt.from_yaml)
yaml.add_representer(NamedInt, NamedInt.to_yaml)


class NamedInts:
16 changes: 13 additions & 3 deletions lib/logitech_receiver/device.py
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@

from typing import Optional

import hidapi as _hid
import hidapi
import solaar.configuration as _configuration

from . import base
@@ -69,7 +69,16 @@ class Device:
read_register = hidpp10.read_register
write_register = hidpp10.write_register

def __init__(self, receiver, number, online, pairing_info=None, handle=None, device_info=None, setting_callback=None):
def __init__(
self,
receiver,
number,
online,
pairing_info=None,
handle=None,
device_info=None,
setting_callback=None,
):
assert receiver or device_info
if receiver:
assert 0 < number <= 15 # some receivers have devices past their max # of devices
@@ -116,7 +125,7 @@ def __init__(self, receiver, number, online, pairing_info=None, handle=None, dev
self.cleanups = [] # functions to run on the device when it is closed

if not self.path:
self.path = _hid.find_paired_node(receiver.path, number, 1) if receiver else None
self.path = hidapi.find_paired_node(receiver.path, number, 1) if receiver else None
if not self.handle:
try:
self.handle = base.open_path(self.path) if self.path else None
@@ -551,3 +560,4 @@ def __str__(self):

def __del__(self):
self.close()
# pass
11 changes: 5 additions & 6 deletions lib/logitech_receiver/diversion.py
Original file line number Diff line number Diff line change
@@ -18,8 +18,7 @@
import logging
import math
import numbers
import os as _os
import os.path as _path
import os
import platform as _platform
import socket
import subprocess
@@ -103,7 +102,7 @@
if logger.isEnabledFor(logging.INFO):
logger.info("GDK Keymap %sset up", "" if gkeymap else "not ")

wayland = _os.getenv("WAYLAND_DISPLAY") # is this Wayland?
wayland = os.getenv("WAYLAND_DISPLAY") # is this Wayland?
if wayland:
logger.warning(
"rules cannot access modifier keys in Wayland, "
@@ -1502,8 +1501,8 @@ def process_notification(device, notification, feature):
GLib.idle_add(evaluate_rules, feature, notification, device)


_XDG_CONFIG_HOME = _os.environ.get("XDG_CONFIG_HOME") or _path.expanduser(_path.join("~", ".config"))
_file_path = _path.join(_XDG_CONFIG_HOME, "solaar", "rules.yaml")
_XDG_CONFIG_HOME = os.environ.get("XDG_CONFIG_HOME") or os.path.expanduser(os.path.join("~", ".config"))
_file_path = os.path.join(_XDG_CONFIG_HOME, "solaar", "rules.yaml")

rules = built_in_rules

@@ -1560,7 +1559,7 @@ def load_config_rule_file():
"""Loads user configured rules."""
global rules

if _path.isfile(_file_path):
if os.path.isfile(_file_path):
rules = _load_rule_config(_file_path)


12 changes: 6 additions & 6 deletions lib/logitech_receiver/exceptions.py
Original file line number Diff line number Diff line change
@@ -15,14 +15,14 @@
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

from .common import KwException as _KwException
from .common import KwException

#
# Exceptions that may be raised by this API.
#


class NoReceiver(_KwException):
class NoReceiver(KwException):
"""Raised when trying to talk through a previously open handle, when the
receiver is no longer available. Should only happen if the receiver is
physically disconnected from the machine, or its kernel driver module is
@@ -31,25 +31,25 @@ class NoReceiver(_KwException):
pass


class NoSuchDevice(_KwException):
class NoSuchDevice(KwException):
"""Raised when trying to reach a device number not paired to the receiver."""

pass


class DeviceUnreachable(_KwException):
class DeviceUnreachable(KwException):
"""Raised when a request is made to an unreachable (turned off) device."""

pass


class FeatureNotSupported(_KwException):
class FeatureNotSupported(KwException):
"""Raised when trying to request a feature not supported by the device."""

pass


class FeatureCallError(_KwException):
class FeatureCallError(KwException):
"""Raised if the device replied to a feature call with an error."""

pass
26 changes: 13 additions & 13 deletions lib/logitech_receiver/hidpp20.py
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@
from typing import List
from typing import Optional

import yaml as _yaml
import yaml

from solaar.i18n import _

@@ -949,11 +949,11 @@ def __eq__(self, other):
return type(self) == type(other) and self.to_bytes() == other.to_bytes()

def __str__(self):
return _yaml.dump(self, width=float("inf")).rstrip("\n")
return yaml.dump(self, width=float("inf")).rstrip("\n")


_yaml.SafeLoader.add_constructor("!LEDEffectSetting", LEDEffectSetting.from_yaml)
_yaml.add_representer(LEDEffectSetting, LEDEffectSetting.to_yaml)
yaml.SafeLoader.add_constructor("!LEDEffectSetting", LEDEffectSetting.from_yaml)
yaml.add_representer(LEDEffectSetting, LEDEffectSetting.to_yaml)


class LEDEffectInfo: # an effect that a zone can do
@@ -1132,8 +1132,8 @@ def __repr__(self):
)


_yaml.SafeLoader.add_constructor("!Button", Button.from_yaml)
_yaml.add_representer(Button, Button.to_yaml)
yaml.SafeLoader.add_constructor("!Button", Button.from_yaml)
yaml.add_representer(Button, Button.to_yaml)


class OnboardProfile:
@@ -1172,7 +1172,7 @@ def from_bytes(cls, sector, enabled, buttons, gbuttons, bytes):
po_timeout=_unpack("<H", bytes[30:32])[0],
buttons=[Button.from_bytes(bytes[32 + i * 4 : 32 + i * 4 + 4]) for i in range(0, buttons)],
gbuttons=[Button.from_bytes(bytes[96 + i * 4 : 96 + i * 4 + 4]) for i in range(0, gbuttons)],
name=bytes[160:208].decode("utf-16le").rstrip("\x00").rstrip("\uFFFF"),
name=bytes[160:208].decode("utf-16le").rstrip("\x00").rstrip("\uffff"),
lighting=[LEDEffectSetting.from_bytes(bytes[208 + i * 11 : 219 + i * 11]) for i in range(0, 4)],
)

@@ -1219,8 +1219,8 @@ def dump(self):
print(" G-BUTTON", i + 1, self.gbuttons[i])


_yaml.SafeLoader.add_constructor("!OnboardProfile", OnboardProfile.from_yaml)
_yaml.add_representer(OnboardProfile, OnboardProfile.to_yaml)
yaml.SafeLoader.add_constructor("!OnboardProfile", OnboardProfile.from_yaml)
yaml.add_representer(OnboardProfile, OnboardProfile.to_yaml)

OnboardProfilesVersion = 3

@@ -1248,7 +1248,7 @@ def get_profile_headers(cls, device):
headers = []
chunk = device.feature_request(FEATURE.ONBOARD_PROFILES, 0x50, 0, 0, 0, i)
s = 0x00
if chunk[0:4] == b"\x00\x00\x00\x00" or chunk[0:4] == b"\xFF\xFF\xFF\xFF": # look in ROM instead
if chunk[0:4] == b"\x00\x00\x00\x00" or chunk[0:4] == b"\xff\xff\xff\xff": # look in ROM instead
chunk = device.feature_request(FEATURE.ONBOARD_PROFILES, 0x50, 0x01, 0, 0, i)
s = 0x01
while chunk[0:2] != b"\xff\xff":
@@ -1337,11 +1337,11 @@ def write(self, device):
return written

def show(self):
print(_yaml.dump(self))
print(yaml.dump(self))


_yaml.SafeLoader.add_constructor("!OnboardProfiles", OnboardProfiles.from_yaml)
_yaml.add_representer(OnboardProfiles, OnboardProfiles.to_yaml)
yaml.SafeLoader.add_constructor("!OnboardProfiles", OnboardProfiles.from_yaml)
yaml.add_representer(OnboardProfiles, OnboardProfiles.to_yaml)


def feature_request(device, feature, function=0x00, *params, no_reply=False):
22 changes: 11 additions & 11 deletions lib/logitech_receiver/receiver.py
Original file line number Diff line number Diff line change
@@ -15,23 +15,23 @@
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

import errno as _errno
import errno
import logging
import time

from dataclasses import dataclass
from typing import Optional

import hidapi as _hid
import hidapi

from solaar.i18n import _
from solaar.i18n import ngettext

from . import base as _base
from . import base
from . import common
from . import exceptions
from . import hidpp10
from . import hidpp10_constants
from .common import ALERT
from .device import Device

logger = logging.getLogger(__name__)
@@ -105,12 +105,12 @@ def close(self):
if d:
d.close()
self._devices.clear()
return handle and _base.close(handle)
return handle and base.close(handle)

def __del__(self):
self.close()

def changed(self, alert=ALERT.NOTIFICATION, reason=None):
def changed(self, alert=common.ALERT.NOTIFICATION, reason=None):
"""The status of the device had changed, so invoke the status callback"""
if self.status_callback is not None:
self.status_callback(self, alert=alert, reason=reason)
@@ -250,7 +250,7 @@ def count(self):

def request(self, request_id, *params):
if bool(self):
return _base.request(self.handle, 0xFF, request_id, *params)
return base.request(self.handle, 0xFF, request_id, *params)

def reset_pairing(self):
self.pairing = Pairing()
@@ -451,7 +451,7 @@ def notification_information(self, number, notification):
return online, encrypted, wpid, kind

def device_pairing_information(self, number: int) -> dict:
wpid = _hid.find_paired_node_wpid(self.path, number) # extract WPID from udev path
wpid = hidapi.find_paired_node_wpid(self.path, number) # extract WPID from udev path
if not wpid:
logger.error("Unable to get wpid from udev for device %d of %s", number, self)
raise exceptions.NoSuchDevice(number=number, receiver=self, error="Not present 27Mhz device")
@@ -491,9 +491,9 @@ def create_receiver(device_info, setting_callback=None) -> Optional[Receiver]:
"""Opens a Logitech Receiver found attached to the machine, by Linux device path."""

try:
handle = _base.open_path(device_info.path)
handle = base.open_path(device_info.path)
if handle:
product_info = _base.product_information(device_info.product_id)
product_info = base.product_information(device_info.product_id)
if not product_info:
logger.warning("Unknown receiver type: %s", device_info.product_id)
product_info = {}
@@ -502,7 +502,7 @@ def create_receiver(device_info, setting_callback=None) -> Optional[Receiver]:
return rclass(kind, product_info, handle, device_info.path, device_info.product_id, setting_callback)
except OSError as e:
logger.exception("open %s", device_info)
if e.errno == _errno.EACCES:
if e.errno == errno.EACCES:
raise
except Exception:
logger.exception("open %s", device_info)
12 changes: 6 additions & 6 deletions lib/logitech_receiver/special_keys.py
Original file line number Diff line number Diff line change
@@ -17,15 +17,15 @@
# Reprogrammable keys information
# Mostly from Logitech documentation, but with some edits for better Lunix compatibility

import os as _os
import os

import yaml as _yaml
import yaml

from .common import NamedInts as _NamedInts
from .common import UnsortedNamedInts as _UnsortedNamedInts

_XDG_CONFIG_HOME = _os.environ.get("XDG_CONFIG_HOME") or _os.path.expanduser(_os.path.join("~", ".config"))
_keys_file_path = _os.path.join(_XDG_CONFIG_HOME, "solaar", "keys.yaml")
_XDG_CONFIG_HOME = os.environ.get("XDG_CONFIG_HOME") or os.path.expanduser(os.path.join("~", ".config"))
_keys_file_path = os.path.join(_XDG_CONFIG_HOME, "solaar", "keys.yaml")

# <controls.xml awk -F\" '/<Control /{sub(/^LD_FINFO_(CTRLID_)?/, "", $2);printf("\t%s=0x%04X,\n", $2, $4)}' | sort -t= -k2
CONTROL = _NamedInts(
@@ -1529,9 +1529,9 @@ def persistent_keys(action_ids):

# load in override dictionary for KEYCODES
try:
if _os.path.isfile(_keys_file_path):
if os.path.isfile(_keys_file_path):
with open(_keys_file_path) as keys_file:
keys = _yaml.safe_load(keys_file)
keys = yaml.safe_load(keys_file)
if isinstance(keys, dict):
keys = _NamedInts(**keys)
for k in KEYCODES:

0 comments on commit 6bb5c51

Please sign in to comment.