Skip to content

Commit

Permalink
device: clean up receiver code
Browse files Browse the repository at this point in the history
  • Loading branch information
pfps committed Feb 22, 2024
1 parent 320d880 commit 1d2c908
Showing 1 changed file with 29 additions and 38 deletions.
67 changes: 29 additions & 38 deletions lib/logitech_receiver/receiver.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# -*- python-mode -*-

## Copyright (C) 2012-2013 Daniel Pavel
## Copyright (C) 2014-2024 Solaar Contributors https://pwr-solaar.github.io/Solaar/
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
Expand All @@ -22,21 +21,13 @@
import hidapi as _hid

from . import base as _base
from . import exceptions
from . import hidpp10 as _hidpp10
from . import hidpp10_constants as _hidpp10_constants
from .base import product_information as _product_information
from .common import strhex as _strhex
from . import exceptions, hidpp10, hidpp10_constants
from .device import Device

logger = logging.getLogger(__name__)

_R = _hidpp10_constants.REGISTERS
_IR = _hidpp10_constants.INFO_SUBREGISTERS

#
#
#
_R = hidpp10_constants.REGISTERS
_IR = hidpp10_constants.INFO_SUBREGISTERS


class Receiver:
Expand All @@ -55,7 +46,7 @@ def __init__(self, handle, path, product_id, setting_callback=None):
self.path = path
self.product_id = product_id
self.setting_callback = setting_callback
product_info = _product_information(self.product_id)
product_info = _base.product_information(self.product_id)
if not product_info:
logger.warning("Unknown receiver type: %s", self.product_id)
product_info = {}
Expand All @@ -64,13 +55,13 @@ def __init__(self, handle, path, product_id, setting_callback=None):
# read the serial immediately, so we can find out max_devices
if self.receiver_kind == "bolt":
serial_reply = self.read_register(_R.bolt_uniqueId)
self.serial = _strhex(serial_reply)
self.serial = serial_reply.hex().upper()
self.max_devices = product_info.get("max_devices", 1)
self.may_unpair = product_info.get("may_unpair", False)
else:
serial_reply = self.read_register(_R.receiver_info, _IR.receiver_information)
if serial_reply:
self.serial = _strhex(serial_reply[1:5])
self.serial = serial_reply[1:5].hex().upper()
self.max_devices = ord(serial_reply[6:7])
if self.max_devices <= 0 or self.max_devices > 6:
self.max_devices = product_info.get("max_devices", 1)
Expand Down Expand Up @@ -107,7 +98,7 @@ def __del__(self):
@property
def firmware(self):
if self._firmware is None and self.handle:
self._firmware = _hidpp10.get_firmware(self)
self._firmware = hidpp10.get_firmware(self)
return self._firmware

# how many pairings remain (None for unknown, -1 for unlimited)
Expand All @@ -127,19 +118,19 @@ def enable_connection_notifications(self, enable=True):

if enable:
set_flag_bits = (
_hidpp10_constants.NOTIFICATION_FLAG.battery_status
| _hidpp10_constants.NOTIFICATION_FLAG.wireless
| _hidpp10_constants.NOTIFICATION_FLAG.software_present
hidpp10_constants.NOTIFICATION_FLAG.battery_status
| hidpp10_constants.NOTIFICATION_FLAG.wireless
| hidpp10_constants.NOTIFICATION_FLAG.software_present
)
else:
set_flag_bits = 0
ok = _hidpp10.set_notification_flags(self, set_flag_bits)
ok = hidpp10.set_notification_flags(self, set_flag_bits)
if ok is None:
logger.warning("%s: failed to %s receiver notifications", self, "enable" if enable else "disable")
return None

flag_bits = _hidpp10.get_notification_flags(self)
flag_names = None if flag_bits is None else tuple(_hidpp10_constants.NOTIFICATION_FLAG.flag_names(flag_bits))
flag_bits = hidpp10.get_notification_flags(self)
flag_names = None if flag_bits is None else tuple(hidpp10_constants.NOTIFICATION_FLAG.flag_names(flag_bits))
if logger.isEnabledFor(logging.INFO):
logger.info("%s: receiver notifications %s => %s", self, "enabled" if enable else "disabled", flag_names)
return flag_bits
Expand All @@ -161,9 +152,9 @@ def device_pairing_information(self, n: int) -> dict:
if self.receiver_kind == "bolt":
pair_info = self.read_register(_R.receiver_info, _IR.bolt_pairing_information + n)
if pair_info:
wpid = _strhex(pair_info[3:4]) + _strhex(pair_info[2:3])
kind = _hidpp10_constants.DEVICE_KIND[pair_info[1] & 0x0F]
serial = _strhex(pair_info[4:8])
wpid = (pair_info[3:4] + pair_info[2:3]).hex().upper()
kind = hidpp10_constants.DEVICE_KIND[pair_info[1] & 0x0F]
serial = pair_info[4:8].hex().upper()
return {"wpid": wpid, "kind": kind, "polling": None, "serial": serial, "power_switch": "(unknown)"}
else:
raise exceptions.NoSuchDevice(number=n, receiver=self, error="can't read Bolt pairing register")
Expand All @@ -172,31 +163,31 @@ def device_pairing_information(self, n: int) -> dict:
power_switch = "(unknown)"
pair_info = self.read_register(_R.receiver_info, _IR.pairing_information + n - 1)
if pair_info: # either a Unifying receiver or a Unifying-ready receiver
wpid = _strhex(pair_info[3:5])
kind = _hidpp10_constants.DEVICE_KIND[pair_info[7] & 0x0F]
wpid = pair_info[3:5].hex().upper()
kind = hidpp10_constants.DEVICE_KIND[pair_info[7] & 0x0F]
polling_rate = str(ord(pair_info[2:3])) + "ms"
elif self.receiver_kind == "27Mz": # 27Mhz receiver, extract WPID from udev path
wpid = _hid.find_paired_node_wpid(self.path, n)
if not wpid:
logger.error("Unable to get wpid from udev for device %d of %s", n, self)
raise exceptions.NoSuchDevice(number=n, receiver=self, error="Not present 27Mhz device")
kind = _hidpp10_constants.DEVICE_KIND[self.get_kind_from_index(n)]
kind = hidpp10_constants.DEVICE_KIND[self.get_kind_from_index(n)]
elif not self.receiver_kind == "unifying": # may be an old Nano receiver
device_info = self.read_register(_R.receiver_info, 0x04)
if device_info:
wpid = _strhex(device_info[3:5])
kind = _hidpp10_constants.DEVICE_KIND[0x00] # unknown kind
wpid = device_info[3:5].hex().upper()
kind = hidpp10_constants.DEVICE_KIND[0x00] # unknown kind
else:
raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information - non-unifying")
else:
raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information")
pair_info = self.read_register(_R.receiver_info, _IR.extended_pairing_information + n - 1)
if pair_info:
power_switch = _hidpp10_constants.POWER_SWITCH_LOCATION[pair_info[9] & 0x0F]
power_switch = hidpp10_constants.POWER_SWITCH_LOCATION[pair_info[9] & 0x0F]
else: # some Nano receivers?
pair_info = self.read_register(0x2D5)
if pair_info:
serial = _strhex(pair_info[1:5])
serial = pair_info[1:5].hex().upper()
return {"wpid": wpid, "kind": kind, "polling": polling_rate, "serial": serial, "power_switch": power_switch}

def get_kind_from_index(self, index):
Expand Down Expand Up @@ -233,12 +224,12 @@ def register_new_device(self, number, notification=None):
if notification is not None:
online = not bool(ord(notification.data[0:1]) & 0x40)
# the rest may be redundant, but keep it around for now
info["wpid"] = _strhex(notification.data[2:3] + notification.data[1:2])
info["wpid"] = (notification.data[2:3] + notification.data[1:2]).hex().upper()
kind = ord(notification.data[0:1]) & 0x0F
if self.receiver_kind == "27Mhz": # get 27Mhz wpid and set kind based on index
info["wpid"] = "00" + _strhex(notification.data[2:3])
info["wpid"] = "00" + notification.data[2:3].hex().upper()
kind = self.get_kind_from_index(number)
info["kind"] = _hidpp10_constants.DEVICE_KIND[kind]
info["kind"] = hidpp10_constants.DEVICE_KIND[kind]
else:
online = True
dev = Device(self, number, online, pairing_info=info, setting_callback=self.setting_callback)
Expand Down Expand Up @@ -289,8 +280,8 @@ def request(self, request_id, *params):
if bool(self):
return _base.request(self.handle, 0xFF, request_id, *params)

read_register = _hidpp10.read_register
write_register = _hidpp10.write_register
read_register = hidpp10.read_register
write_register = hidpp10.write_register

def __iter__(self):
connected_devices = self.count()
Expand Down

0 comments on commit 1d2c908

Please sign in to comment.