Skip to content

Commit

Permalink
receiver: move more method code to subclasses
Browse files Browse the repository at this point in the history
  • Loading branch information
pfps committed Mar 5, 2024
1 parent 880e7cc commit b1bdb17
Showing 1 changed file with 120 additions and 108 deletions.
228 changes: 120 additions & 108 deletions lib/logitech_receiver/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import errno as _errno
import logging

from typing import Optional

import hidapi as _hid

from . import base as _base
Expand All @@ -31,45 +33,8 @@
_IR = hidpp10_constants.INFO_SUBREGISTERS


class ReceiverFactory:
@staticmethod
def create_receiver(device_info, setting_callback=None):
"""Opens a Logitech Receiver found attached to the machine, by Linux device path.
:returns: An open file handle for the found receiver, or ``None``.
"""
try:
product_info = _base.product_information(device_info.product_id)
if not product_info:
logger.warning("Unknown receiver type: %s", device_info.product_id)
product_info = {}

handle = _base.open_path(device_info.path)
if handle:
receiver_kind = product_info.get("receiver_kind", "unknown")
if receiver_kind == "bolt":
return BoltReceiver(product_info, handle, device_info.path, device_info.product_id, setting_callback)
elif receiver_kind == "unifying":
return UnifyingReceiver(product_info, handle, device_info.path, device_info.product_id, setting_callback)
elif receiver_kind == "lightspeed":
return LightSpeedReceiver(product_info, handle, device_info.path, device_info.product_id, setting_callback)
elif receiver_kind == "nano":
return NanoReceiver(product_info, handle, device_info.path, device_info.product_id, setting_callback)
elif receiver_kind == "27Mhz":
return Ex100Receiver(product_info, handle, device_info.path, device_info.product_id, setting_callback)
else:
return Receiver(product_info, handle, device_info.path, device_info.product_id, setting_callback)
except OSError as e:
logger.exception("open %s", device_info)
if e.errno == _errno.EACCES:
raise
except Exception:
logger.exception("open %s", device_info)


class Receiver:
"""A Unifying Receiver instance.
"""A generic Receiver instance, mostly implementing the interface used on Unifying, Nano, and LightSpeed receivers"
The paired devices are available through the sequence interface.
"""

Expand All @@ -86,36 +51,25 @@ def __init__(self, receiver_kind, product_info, handle, path, product_id, settin
self.receiver_kind = receiver_kind
self.serial = None
self.max_devices = None
self.may_unpair = None

self._firmware = None
self._devices = {}
self._remaining_pairings = None

self._devices = {}
self.name = product_info.get("name", "Receiver")
self.may_unpair = product_info.get("may_unpair", False)
self.re_pairs = product_info.get("re_pairs", False)
self._str = "<%s(%s,%s%s)>" % (
self.name.replace(" ", ""),
self.path,
"" if isinstance(self.handle, int) else "T",
self.handle,
)

self.initialize(product_info)

def initialize(self, product_info: dict):
# read the serial immediately, so we can find out max_devices
# read the receiver information subregister, so we can find out max_devices
serial_reply = self.read_register(_R.receiver_info, _IR.receiver_information)
if serial_reply:
self.serial = serial_reply[1:5].hex().upper()
self.max_devices = ord(serial_reply[6:7])
self.max_devices = serial_reply[6]
if self.max_devices <= 0 or self.max_devices > 6:
self.max_devices = product_info.get("max_devices", 1)
self.may_unpair = product_info.get("may_unpair", False)
else: # handle receivers that don't have a serial number specially (i.e., c534 and Bolt receivers)
else: # handle receivers that don't have a serial number specially (i.e., c534)
self.serial = None
self.max_devices = product_info.get("max_devices", 1)
self.may_unpair = product_info.get("may_unpair", False)

def close(self):
handle, self.handle = self.handle, None
Expand Down Expand Up @@ -174,63 +128,52 @@ def device_codename(self, n):
codename = codename[2 : 2 + ord(codename[1:2])]
return codename.decode("ascii")

def notify_devices(self):
"""Scan all devices."""
if self.handle:
if not self.write_register(_R.receiver_connection, 0x02):
logger.warning("%s: failed to trigger device link notifications", self)

def notification_information(self, number, notification):
"""Extract information from unifying-style notification"""
assert notification.address != 0x02
online = not bool(notification.data[0] & 0x40)
encrypted = bool(notification.data[0] & 0x20) or notification.address == 0x10
kind = hidpp10_constants.DEVICE_KIND[notification.data[0] & 0x0F]
wpid = (notification.data[2:3] + notification.data[1:2]).hex().upper()
return online, encrypted, wpid, kind

def device_pairing_information(self, n: int) -> dict:
"""Return information from pairing registers (and elsewhere when necessary)"""
polling_rate = ""
serial = None
power_switch = "(unknown)"
pair_info = self.read_register(_R.receiver_info, _IR.pairing_information + n - 1)
if pair_info: # either a Unifying receiver or a Unifying-ready receiver
if pair_info: # a receiver that uses Unifying-style pairing registers
wpid = pair_info[3:5].hex().upper()
kind = hidpp10_constants.DEVICE_KIND[pair_info[7] & 0x0F]
polling_rate = str(ord(pair_info[2:3])) + "ms"
elif self.receiver_kind == "27Mz": # 27Mhz receiver, extract WPID from udev path
wpid = _hid.find_paired_node_wpid(self.path, n)
if not wpid:
logger.error("Unable to get wpid from udev for device %d of %s", n, self)
raise exceptions.NoSuchDevice(number=n, receiver=self, error="Not present 27Mhz device")
kind = hidpp10_constants.DEVICE_KIND[self.get_kind_from_index(n)]
polling_rate = str(pair_info[2]) + "ms"
elif not self.receiver_kind == "unifying": # may be an old Nano receiver
device_info = self.read_register(_R.receiver_info, 0x04)
device_info = self.read_register(_R.receiver_info, 0x04) # undocumented
if device_info:
logger.warning("using undocumented register for device wpid")
wpid = device_info[3:5].hex().upper()
kind = hidpp10_constants.DEVICE_KIND[0x00] # unknown kind
else:
raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information - non-unifying")
else:
raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information")

pair_info = self.read_register(_R.receiver_info, _IR.extended_pairing_information + n - 1)
if pair_info:
power_switch = hidpp10_constants.POWER_SWITCH_LOCATION[pair_info[9] & 0x0F]
else: # some Nano receivers?
pair_info = self.read_register(0x2D5)
if pair_info:
serial = pair_info[1:5].hex().upper()
else: # some Nano receivers?
pair_info = self.read_register(0x2D5) # undocumented and questionable
if pair_info:
logger.warning("using undocumented register for device serial number")
serial = pair_info[1:5].hex().upper()
return {"wpid": wpid, "kind": kind, "polling": polling_rate, "serial": serial, "power_switch": power_switch}

def get_kind_from_index(self, index):
"""Get device kind from 27Mhz device index"""
# From drivers/hid/hid-logitech-dj.c
if index == 1: # mouse
kind = 2
elif index == 2: # mouse
kind = 2
elif index == 3: # keyboard
kind = 1
elif index == 4: # numpad
kind = 3
else: # unknown device number on 27Mhz receiver
logger.error("failed to calculate device kind for device %d of %s", index, self)
raise exceptions.NoSuchDevice(number=index, receiver=self, error="Unknown 27Mhz device number")
return kind

def notify_devices(self):
"""Scan all devices."""
if self.handle:
if not self.write_register(_R.receiver_connection, 0x02):
logger.warning("%s: failed to trigger device link notifications", self)

def register_new_device(self, number, notification=None):
if self._devices.get(number) is not None:
raise IndexError("%s: device number %d already registered" % (self, number))
Expand All @@ -241,14 +184,15 @@ def register_new_device(self, number, notification=None):
try:
info = self.device_pairing_information(number)
if notification is not None:
online = not bool(ord(notification.data[0:1]) & 0x40)
# the rest may be redundant, but keep it around for now
info["wpid"] = (notification.data[2:3] + notification.data[1:2]).hex().upper()
kind = ord(notification.data[0:1]) & 0x0F
if self.receiver_kind == "27Mhz": # get 27Mhz wpid and set kind based on index
info["wpid"] = "00" + notification.data[2:3].hex().upper()
kind = self.get_kind_from_index(number)
info["kind"] = hidpp10_constants.DEVICE_KIND[kind]
online, _e, nwpid, nkind = self.notification_information(number, notification)
if info["wpid"] is None:
info["wpid"] = nwpid
elif nwpid is not None and info["wpid"] != nwpid:
logger.warning("mismatch on device WPID %s %s", info["wpid"], nwpid)
if info["kind"] is None:
info["kind"] = nkind
elif nkind is not None and info["kind"] != nkind:
logger.warning("mismatch on device kind %s %s", info["kind"], nkind)
else:
online = True
dev = Device(self, number, online, pairing_info=info, setting_callback=self.setting_callback)
Expand All @@ -270,19 +214,10 @@ def set_lock(self, lock_closed=True, device=0, timeout=0):
return True
logger.warning("%s: failed to %s the receiver lock", self, "close" if lock_closed else "open")

def discover(self, cancel=False, timeout=30):
pass

def pair_device(self, pair=True, slot=0, address=b"\0\0\0\0\0\0", authentication=0x00, entropy=20):
pass

def count(self):
count = self.read_register(_R.receiver_connection)
return 0 if count is None else ord(count[1:2])

# def has_devices(self):
# return len(self) > 0 or self.count() > 0

def request(self, request_id, *params):
if bool(self):
return _base.request(self.handle, 0xFF, request_id, *params)
Expand Down Expand Up @@ -378,22 +313,28 @@ def __hash__(self):
return self.path.__hash__()

def __str__(self):
return self._str
return "<%s(%s,%s%s)>" % (
self.name.replace(" ", ""),
self.path,
"" if isinstance(self.handle, int) else "T",
self.handle,
)

__repr__ = __str__

__bool__ = __nonzero__ = lambda self: self.handle is not None


class BoltReceiver(Receiver):
"""Bolt receivers use a different pairing prototol and have different pairing registers"""

def __init__(self, product_info, handle, path, product_id, setting_callback=None):
super().__init__("bolt", product_info, handle, path, product_id, setting_callback)

def initialize(self, product_info: dict):
serial_reply = self.read_register(_R.bolt_uniqueId)
self.serial = serial_reply.hex().upper()
self.max_devices = product_info.get("max_devices", 1)
self.may_unpair = product_info.get("may_unpair", False)

def device_codename(self, n):
codename = self.read_register(_R.receiver_info, _IR.bolt_device_name + n, 0x01)
Expand Down Expand Up @@ -450,5 +391,76 @@ def __init__(self, product_info, handle, path, product_id, setting_callback=None


class Ex100Receiver(Receiver):
"""A very old style receiver, somewhat different from newer receivers"""

def __init__(self, product_info, handle, path, product_id, setting_callback=None):
super().__init__("27Mhz", product_info, handle, path, product_id, setting_callback)

def initialize(self, product_info: dict):
self.serial = None
self.max_devices = product_info.get("max_devices", 1)

def notification_information(self, number, notification):
"""Extract information from 27Mz-style notification and device index"""
assert notification.address == 0x02
online = True
encrypted = bool(notification.data[0] & 0x80)
kind = hidpp10_constants.DEVICE_KIND(self.get_kind_from_index(number))
wpid = "00" + notification.data[2:3].hex().upper()
return online, encrypted, wpid, kind

def device_pairing_information(self, number: int) -> dict:
wpid = _hid.find_paired_node_wpid(self.path, number) # extract WPID from udev path
if not wpid:
logger.error("Unable to get wpid from udev for device %d of %s", number, self)
raise exceptions.NoSuchDevice(number=number, receiver=self, error="Not present 27Mhz device")
kind = hidpp10_constants.DEVICE_KIND[self.get_kind_from_index(number)]
return {"wpid": wpid, "kind": kind, "polling": "", "serial": None, "power_switch": "(unknown)"}

def get_kind_from_index(self, index):
"""Get device kind from 27Mhz device index"""
# From drivers/hid/hid-logitech-dj.c
if index == 1: # mouse
kind = 2
elif index == 2: # mouse
kind = 2
elif index == 3: # keyboard
kind = 1
elif index == 4: # numpad
kind = 3
else: # unknown device number on 27Mhz receiver
logger.error("failed to calculate device kind for device %d of %s", index, self)
raise exceptions.NoSuchDevice(number=index, receiver=self, error="Unknown 27Mhz device number")
return kind


receiver_class_mapping = {
"bolt": BoltReceiver,
"unifying": UnifyingReceiver,
"lightspeed": LightSpeedReceiver,
"nano": NanoReceiver,
"27Mhz": Ex100Receiver,
}


class ReceiverFactory:
@staticmethod
def create_receiver(device_info, setting_callback=None) -> Optional[Receiver]:
"""Opens a Logitech Receiver found attached to the machine, by Linux device path."""

try:
handle = _base.open_path(device_info.path)
if handle:
product_info = _base.product_information(device_info.product_id)
if not product_info:
logger.warning("Unknown receiver type: %s", device_info.product_id)
product_info = {}
receiver_kind = product_info.get("receiver_kind", "unknown")
receiver_class = receiver_class_mapping.get(receiver_kind, Receiver)
return receiver_class(product_info, handle, device_info.path, device_info.product_id, setting_callback)
except OSError as e:
logger.exception("open %s", device_info)
if e.errno == _errno.EACCES:
raise
except Exception:
logger.exception("open %s", device_info)

0 comments on commit b1bdb17

Please sign in to comment.