From a25008b58d24153fa06fa0efcf996347ab4690ec Mon Sep 17 00:00:00 2001 From: Matthias Hagmann <16444067+MattHag@users.noreply.github.com> Date: Sun, 3 Mar 2024 21:43:52 +0100 Subject: [PATCH 1/2] receiver: create subclasses of receiver for different variants Related #2350 --- lib/logitech_receiver/receiver.py | 182 +++++++++++++++++++----------- 1 file changed, 119 insertions(+), 63 deletions(-) diff --git a/lib/logitech_receiver/receiver.py b/lib/logitech_receiver/receiver.py index a0c97c7810..b28bf88730 100644 --- a/lib/logitech_receiver/receiver.py +++ b/lib/logitech_receiver/receiver.py @@ -46,7 +46,19 @@ def create_receiver(device_info, setting_callback=None): handle = _base.open_path(device_info.path) if handle: - return Receiver(product_info, handle, device_info.path, device_info.product_id, setting_callback) + receiver_kind = product_info.get("receiver_kind", "unknown") + if receiver_kind == "bolt": + return BoltReceiver(product_info, handle, device_info.path, device_info.product_id, setting_callback) + elif receiver_kind == "unifying": + return UnifyingReceiver(product_info, handle, device_info.path, device_info.product_id, setting_callback) + elif receiver_kind == "lightspeed": + return LightSpeedReceiver(product_info, handle, device_info.path, device_info.product_id, setting_callback) + elif receiver_kind == "nano": + return NanoReceiver(product_info, handle, device_info.path, device_info.product_id, setting_callback) + elif receiver_kind == "27Mhz": + return Ex100Receiver(product_info, handle, device_info.path, device_info.product_id, setting_callback) + else: + return Receiver(product_info, handle, device_info.path, device_info.product_id, setting_callback) except OSError as e: logger.exception("open %s", device_info) if e.errno == _errno.EACCES: @@ -64,33 +76,21 @@ class Receiver: number = 0xFF kind = None - def __init__(self, product_info, handle, path, product_id, setting_callback=None): + def __init__(self, receiver_kind, product_info, handle, path, product_id, setting_callback=None): assert handle self.isDevice = False # some devices act as receiver so we need a property to distinguish them self.handle = handle self.path = path self.product_id = product_id self.setting_callback = setting_callback - self.receiver_kind = product_info.get("receiver_kind", "unknown") + self.receiver_kind = receiver_kind + self.serial = None + self.max_devices = None + self.may_unpair = None - # read the serial immediately, so we can find out max_devices - if self.receiver_kind == "bolt": - serial_reply = self.read_register(_R.bolt_uniqueId) - self.serial = serial_reply.hex().upper() - self.max_devices = product_info.get("max_devices", 1) - self.may_unpair = product_info.get("may_unpair", False) - else: - serial_reply = self.read_register(_R.receiver_info, _IR.receiver_information) - if serial_reply: - self.serial = serial_reply[1:5].hex().upper() - self.max_devices = ord(serial_reply[6:7]) - if self.max_devices <= 0 or self.max_devices > 6: - self.max_devices = product_info.get("max_devices", 1) - self.may_unpair = product_info.get("may_unpair", False) - else: # handle receivers that don't have a serial number specially (i.e., c534 and Bolt receivers) - self.serial = None - self.max_devices = product_info.get("max_devices", 1) - self.may_unpair = product_info.get("may_unpair", False) + self._firmware = None + self._devices = {} + self._remaining_pairings = None self.name = product_info.get("name", "Receiver") self.re_pairs = product_info.get("re_pairs", False) @@ -101,9 +101,21 @@ def __init__(self, product_info, handle, path, product_id, setting_callback=None self.handle, ) - self._firmware = None - self._devices = {} - self._remaining_pairings = None + self.initialize(product_info) + + def initialize(self, product_info: dict): + # read the serial immediately, so we can find out max_devices + serial_reply = self.read_register(_R.receiver_info, _IR.receiver_information) + if serial_reply: + self.serial = serial_reply[1:5].hex().upper() + self.max_devices = ord(serial_reply[6:7]) + if self.max_devices <= 0 or self.max_devices > 6: + self.max_devices = product_info.get("max_devices", 1) + self.may_unpair = product_info.get("may_unpair", False) + else: # handle receivers that don't have a serial number specially (i.e., c534 and Bolt receivers) + self.serial = None + self.max_devices = product_info.get("max_devices", 1) + self.may_unpair = product_info.get("may_unpair", False) def close(self): handle, self.handle = self.handle, None @@ -157,28 +169,13 @@ def enable_connection_notifications(self, enable=True): return flag_bits def device_codename(self, n): - if self.receiver_kind == "bolt": - codename = self.read_register(_R.receiver_info, _IR.bolt_device_name + n, 0x01) - if codename: - codename = codename[3 : 3 + min(14, ord(codename[2:3]))] - return codename.decode("ascii") - else: - codename = self.read_register(_R.receiver_info, _IR.device_name + n - 1) - if codename: - codename = codename[2 : 2 + ord(codename[1:2])] - return codename.decode("ascii") + codename = self.read_register(_R.receiver_info, _IR.device_name + n - 1) + if codename: + codename = codename[2 : 2 + ord(codename[1:2])] + return codename.decode("ascii") def device_pairing_information(self, n: int) -> dict: """Return information from pairing registers (and elsewhere when necessary)""" - if self.receiver_kind == "bolt": - pair_info = self.read_register(_R.receiver_info, _IR.bolt_pairing_information + n) - if pair_info: - wpid = (pair_info[3:4] + pair_info[2:3]).hex().upper() - kind = hidpp10_constants.DEVICE_KIND[pair_info[1] & 0x0F] - serial = pair_info[4:8].hex().upper() - return {"wpid": wpid, "kind": kind, "polling": None, "serial": serial, "power_switch": "(unknown)"} - else: - raise exceptions.NoSuchDevice(number=n, receiver=self, error="can't read Bolt pairing register") polling_rate = "" serial = None power_switch = "(unknown)" @@ -202,6 +199,7 @@ def device_pairing_information(self, n: int) -> dict: raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information - non-unifying") else: raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information") + pair_info = self.read_register(_R.receiver_info, _IR.extended_pairing_information + n - 1) if pair_info: power_switch = hidpp10_constants.POWER_SWITCH_LOCATION[pair_info[9] & 0x0F] @@ -272,23 +270,11 @@ def set_lock(self, lock_closed=True, device=0, timeout=0): return True logger.warning("%s: failed to %s the receiver lock", self, "close" if lock_closed else "open") - def discover(self, cancel=False, timeout=30): # Bolt device discovery - assert self.receiver_kind == "bolt" - if self.handle: - action = 0x02 if cancel else 0x01 - reply = self.write_register(_R.bolt_device_discovery, timeout, action) - if reply: - return True - logger.warning("%s: failed to %s device discovery", self, "cancel" if cancel else "start") + def discover(self, cancel=False, timeout=30): + pass - def pair_device(self, pair=True, slot=0, address=b"\0\0\0\0\0\0", authentication=0x00, entropy=20): # Bolt pairing - assert self.receiver_kind == "bolt" - if self.handle: - action = 0x01 if pair is True else 0x03 if pair is False else 0x02 - reply = self.write_register(_R.bolt_pairing, action, slot, address, authentication, entropy) - if reply: - return True - logger.warning("%s: failed to %s device %s", self, "pair" if pair else "unpair", address) + def pair_device(self, pair=True, slot=0, address=b"\0\0\0\0\0\0", authentication=0x00, entropy=20): + pass def count(self): count = self.read_register(_R.receiver_connection) @@ -356,10 +342,7 @@ def _unpair_device(self, key, force=False): del self._devices[key] logger.warning("%s removed device %s", self, dev) else: - if self.receiver_kind == "bolt": - reply = self.write_register(_R.bolt_pairing, 0x03, key) - else: - reply = self.write_register(_R.receiver_pairing, 0x03, key) + reply = self._unpair_device_per_receiver(key) if reply: # invalidate the device dev.online = False @@ -372,6 +355,10 @@ def _unpair_device(self, key, force=False): logger.error("%s failed to unpair device %s", self, dev) raise Exception("failed to unpair device %s: %s" % (dev.name, key)) + def _unpair_device_per_receiver(self, key): + """Receiver specific unpairing.""" + return self.write_register(_R.receiver_pairing, 0x03, key) + def __len__(self): return len([d for d in self._devices.values() if d is not None]) @@ -396,3 +383,72 @@ def __str__(self): __repr__ = __str__ __bool__ = __nonzero__ = lambda self: self.handle is not None + + +class BoltReceiver(Receiver): + def __init__(self, product_info, handle, path, product_id, setting_callback=None): + super().__init__("bolt", product_info, handle, path, product_id, setting_callback) + + def initialize(self, product_info: dict): + serial_reply = self.read_register(_R.bolt_uniqueId) + self.serial = serial_reply.hex().upper() + self.max_devices = product_info.get("max_devices", 1) + self.may_unpair = product_info.get("may_unpair", False) + + def device_codename(self, n): + codename = self.read_register(_R.receiver_info, _IR.bolt_device_name + n, 0x01) + if codename: + codename = codename[3 : 3 + min(14, ord(codename[2:3]))] + return codename.decode("ascii") + + def device_pairing_information(self, n: int) -> dict: + pair_info = self.read_register(_R.receiver_info, _IR.bolt_pairing_information + n) + if pair_info: + wpid = (pair_info[3:4] + pair_info[2:3]).hex().upper() + kind = hidpp10_constants.DEVICE_KIND[pair_info[1] & 0x0F] + serial = pair_info[4:8].hex().upper() + return {"wpid": wpid, "kind": kind, "polling": None, "serial": serial, "power_switch": "(unknown)"} + else: + raise exceptions.NoSuchDevice(number=n, receiver=self, error="can't read Bolt pairing register") + + def discover(self, cancel=False, timeout=30): + """Discover Logitech Bolt devices.""" + if self.handle: + action = 0x02 if cancel else 0x01 + reply = self.write_register(_R.bolt_device_discovery, timeout, action) + if reply: + return True + logger.warning("%s: failed to %s device discovery", self, "cancel" if cancel else "start") + + def pair_device(self, pair=True, slot=0, address=b"\0\0\0\0\0\0", authentication=0x00, entropy=20): + """Pair a Bolt device.""" + if self.handle: + action = 0x01 if pair is True else 0x03 if pair is False else 0x02 + reply = self.write_register(_R.bolt_pairing, action, slot, address, authentication, entropy) + if reply: + return True + logger.warning("%s: failed to %s device %s", self, "pair" if pair else "unpair", address) + + def _unpair_device_per_receiver(self, key): + """Receiver specific unpairing.""" + return self.write_register(_R.bolt_pairing, 0x03, key) + + +class UnifyingReceiver(Receiver): + def __init__(self, product_info, handle, path, product_id, setting_callback=None): + super().__init__("unifying", product_info, handle, path, product_id, setting_callback) + + +class NanoReceiver(Receiver): + def __init__(self, product_info, handle, path, product_id, setting_callback=None): + super().__init__("nano", product_info, handle, path, product_id, setting_callback) + + +class LightSpeedReceiver(Receiver): + def __init__(self, product_info, handle, path, product_id, setting_callback=None): + super().__init__("lightspeed", product_info, handle, path, product_id, setting_callback) + + +class Ex100Receiver(Receiver): + def __init__(self, product_info, handle, path, product_id, setting_callback=None): + super().__init__("27Mhz", product_info, handle, path, product_id, setting_callback) From dea496f117ddd560254761a8381d912aa5861e9f Mon Sep 17 00:00:00 2001 From: "Peter F. Patel-Schneider" Date: Tue, 5 Mar 2024 14:16:20 -0500 Subject: [PATCH 2/2] receiver: move more method code to subclasses --- lib/logitech_receiver/receiver.py | 228 ++++++++++++++++-------------- 1 file changed, 120 insertions(+), 108 deletions(-) diff --git a/lib/logitech_receiver/receiver.py b/lib/logitech_receiver/receiver.py index b28bf88730..2dc71dc998 100644 --- a/lib/logitech_receiver/receiver.py +++ b/lib/logitech_receiver/receiver.py @@ -18,6 +18,8 @@ import errno as _errno import logging +from typing import Optional + import hidapi as _hid from . import base as _base @@ -31,45 +33,8 @@ _IR = hidpp10_constants.INFO_SUBREGISTERS -class ReceiverFactory: - @staticmethod - def create_receiver(device_info, setting_callback=None): - """Opens a Logitech Receiver found attached to the machine, by Linux device path. - - :returns: An open file handle for the found receiver, or ``None``. - """ - try: - product_info = _base.product_information(device_info.product_id) - if not product_info: - logger.warning("Unknown receiver type: %s", device_info.product_id) - product_info = {} - - handle = _base.open_path(device_info.path) - if handle: - receiver_kind = product_info.get("receiver_kind", "unknown") - if receiver_kind == "bolt": - return BoltReceiver(product_info, handle, device_info.path, device_info.product_id, setting_callback) - elif receiver_kind == "unifying": - return UnifyingReceiver(product_info, handle, device_info.path, device_info.product_id, setting_callback) - elif receiver_kind == "lightspeed": - return LightSpeedReceiver(product_info, handle, device_info.path, device_info.product_id, setting_callback) - elif receiver_kind == "nano": - return NanoReceiver(product_info, handle, device_info.path, device_info.product_id, setting_callback) - elif receiver_kind == "27Mhz": - return Ex100Receiver(product_info, handle, device_info.path, device_info.product_id, setting_callback) - else: - return Receiver(product_info, handle, device_info.path, device_info.product_id, setting_callback) - except OSError as e: - logger.exception("open %s", device_info) - if e.errno == _errno.EACCES: - raise - except Exception: - logger.exception("open %s", device_info) - - class Receiver: - """A Unifying Receiver instance. - + """A generic Receiver instance, mostly implementing the interface used on Unifying, Nano, and LightSpeed receivers" The paired devices are available through the sequence interface. """ @@ -86,36 +51,25 @@ def __init__(self, receiver_kind, product_info, handle, path, product_id, settin self.receiver_kind = receiver_kind self.serial = None self.max_devices = None - self.may_unpair = None - self._firmware = None - self._devices = {} self._remaining_pairings = None - + self._devices = {} self.name = product_info.get("name", "Receiver") + self.may_unpair = product_info.get("may_unpair", False) self.re_pairs = product_info.get("re_pairs", False) - self._str = "<%s(%s,%s%s)>" % ( - self.name.replace(" ", ""), - self.path, - "" if isinstance(self.handle, int) else "T", - self.handle, - ) - self.initialize(product_info) def initialize(self, product_info: dict): - # read the serial immediately, so we can find out max_devices + # read the receiver information subregister, so we can find out max_devices serial_reply = self.read_register(_R.receiver_info, _IR.receiver_information) if serial_reply: self.serial = serial_reply[1:5].hex().upper() - self.max_devices = ord(serial_reply[6:7]) + self.max_devices = serial_reply[6] if self.max_devices <= 0 or self.max_devices > 6: self.max_devices = product_info.get("max_devices", 1) - self.may_unpair = product_info.get("may_unpair", False) - else: # handle receivers that don't have a serial number specially (i.e., c534 and Bolt receivers) + else: # handle receivers that don't have a serial number specially (i.e., c534) self.serial = None self.max_devices = product_info.get("max_devices", 1) - self.may_unpair = product_info.get("may_unpair", False) def close(self): handle, self.handle = self.handle, None @@ -174,63 +128,52 @@ def device_codename(self, n): codename = codename[2 : 2 + ord(codename[1:2])] return codename.decode("ascii") + def notify_devices(self): + """Scan all devices.""" + if self.handle: + if not self.write_register(_R.receiver_connection, 0x02): + logger.warning("%s: failed to trigger device link notifications", self) + + def notification_information(self, number, notification): + """Extract information from unifying-style notification""" + assert notification.address != 0x02 + online = not bool(notification.data[0] & 0x40) + encrypted = bool(notification.data[0] & 0x20) or notification.address == 0x10 + kind = hidpp10_constants.DEVICE_KIND[notification.data[0] & 0x0F] + wpid = (notification.data[2:3] + notification.data[1:2]).hex().upper() + return online, encrypted, wpid, kind + def device_pairing_information(self, n: int) -> dict: """Return information from pairing registers (and elsewhere when necessary)""" polling_rate = "" serial = None power_switch = "(unknown)" pair_info = self.read_register(_R.receiver_info, _IR.pairing_information + n - 1) - if pair_info: # either a Unifying receiver or a Unifying-ready receiver + if pair_info: # a receiver that uses Unifying-style pairing registers wpid = pair_info[3:5].hex().upper() kind = hidpp10_constants.DEVICE_KIND[pair_info[7] & 0x0F] - polling_rate = str(ord(pair_info[2:3])) + "ms" - elif self.receiver_kind == "27Mz": # 27Mhz receiver, extract WPID from udev path - wpid = _hid.find_paired_node_wpid(self.path, n) - if not wpid: - logger.error("Unable to get wpid from udev for device %d of %s", n, self) - raise exceptions.NoSuchDevice(number=n, receiver=self, error="Not present 27Mhz device") - kind = hidpp10_constants.DEVICE_KIND[self.get_kind_from_index(n)] + polling_rate = str(pair_info[2]) + "ms" elif not self.receiver_kind == "unifying": # may be an old Nano receiver - device_info = self.read_register(_R.receiver_info, 0x04) + device_info = self.read_register(_R.receiver_info, 0x04) # undocumented if device_info: + logger.warning("using undocumented register for device wpid") wpid = device_info[3:5].hex().upper() kind = hidpp10_constants.DEVICE_KIND[0x00] # unknown kind else: raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information - non-unifying") else: raise exceptions.NoSuchDevice(number=n, receiver=self, error="read pairing information") - pair_info = self.read_register(_R.receiver_info, _IR.extended_pairing_information + n - 1) if pair_info: power_switch = hidpp10_constants.POWER_SWITCH_LOCATION[pair_info[9] & 0x0F] - else: # some Nano receivers? - pair_info = self.read_register(0x2D5) - if pair_info: serial = pair_info[1:5].hex().upper() + else: # some Nano receivers? + pair_info = self.read_register(0x2D5) # undocumented and questionable + if pair_info: + logger.warning("using undocumented register for device serial number") + serial = pair_info[1:5].hex().upper() return {"wpid": wpid, "kind": kind, "polling": polling_rate, "serial": serial, "power_switch": power_switch} - def get_kind_from_index(self, index): - """Get device kind from 27Mhz device index""" - # From drivers/hid/hid-logitech-dj.c - if index == 1: # mouse - kind = 2 - elif index == 2: # mouse - kind = 2 - elif index == 3: # keyboard - kind = 1 - elif index == 4: # numpad - kind = 3 - else: # unknown device number on 27Mhz receiver - logger.error("failed to calculate device kind for device %d of %s", index, self) - raise exceptions.NoSuchDevice(number=index, receiver=self, error="Unknown 27Mhz device number") - return kind - - def notify_devices(self): - """Scan all devices.""" - if self.handle: - if not self.write_register(_R.receiver_connection, 0x02): - logger.warning("%s: failed to trigger device link notifications", self) - def register_new_device(self, number, notification=None): if self._devices.get(number) is not None: raise IndexError("%s: device number %d already registered" % (self, number)) @@ -241,14 +184,15 @@ def register_new_device(self, number, notification=None): try: info = self.device_pairing_information(number) if notification is not None: - online = not bool(ord(notification.data[0:1]) & 0x40) - # the rest may be redundant, but keep it around for now - info["wpid"] = (notification.data[2:3] + notification.data[1:2]).hex().upper() - kind = ord(notification.data[0:1]) & 0x0F - if self.receiver_kind == "27Mhz": # get 27Mhz wpid and set kind based on index - info["wpid"] = "00" + notification.data[2:3].hex().upper() - kind = self.get_kind_from_index(number) - info["kind"] = hidpp10_constants.DEVICE_KIND[kind] + online, _e, nwpid, nkind = self.notification_information(number, notification) + if info["wpid"] is None: + info["wpid"] = nwpid + elif nwpid is not None and info["wpid"] != nwpid: + logger.warning("mismatch on device WPID %s %s", info["wpid"], nwpid) + if info["kind"] is None: + info["kind"] = nkind + elif nkind is not None and info["kind"] != nkind: + logger.warning("mismatch on device kind %s %s", info["kind"], nkind) else: online = True dev = Device(self, number, online, pairing_info=info, setting_callback=self.setting_callback) @@ -270,19 +214,10 @@ def set_lock(self, lock_closed=True, device=0, timeout=0): return True logger.warning("%s: failed to %s the receiver lock", self, "close" if lock_closed else "open") - def discover(self, cancel=False, timeout=30): - pass - - def pair_device(self, pair=True, slot=0, address=b"\0\0\0\0\0\0", authentication=0x00, entropy=20): - pass - def count(self): count = self.read_register(_R.receiver_connection) return 0 if count is None else ord(count[1:2]) - # def has_devices(self): - # return len(self) > 0 or self.count() > 0 - def request(self, request_id, *params): if bool(self): return _base.request(self.handle, 0xFF, request_id, *params) @@ -378,7 +313,12 @@ def __hash__(self): return self.path.__hash__() def __str__(self): - return self._str + return "<%s(%s,%s%s)>" % ( + self.name.replace(" ", ""), + self.path, + "" if isinstance(self.handle, int) else "T", + self.handle, + ) __repr__ = __str__ @@ -386,6 +326,8 @@ def __str__(self): class BoltReceiver(Receiver): + """Bolt receivers use a different pairing prototol and have different pairing registers""" + def __init__(self, product_info, handle, path, product_id, setting_callback=None): super().__init__("bolt", product_info, handle, path, product_id, setting_callback) @@ -393,7 +335,6 @@ def initialize(self, product_info: dict): serial_reply = self.read_register(_R.bolt_uniqueId) self.serial = serial_reply.hex().upper() self.max_devices = product_info.get("max_devices", 1) - self.may_unpair = product_info.get("may_unpair", False) def device_codename(self, n): codename = self.read_register(_R.receiver_info, _IR.bolt_device_name + n, 0x01) @@ -450,5 +391,76 @@ def __init__(self, product_info, handle, path, product_id, setting_callback=None class Ex100Receiver(Receiver): + """A very old style receiver, somewhat different from newer receivers""" + def __init__(self, product_info, handle, path, product_id, setting_callback=None): super().__init__("27Mhz", product_info, handle, path, product_id, setting_callback) + + def initialize(self, product_info: dict): + self.serial = None + self.max_devices = product_info.get("max_devices", 1) + + def notification_information(self, number, notification): + """Extract information from 27Mz-style notification and device index""" + assert notification.address == 0x02 + online = True + encrypted = bool(notification.data[0] & 0x80) + kind = hidpp10_constants.DEVICE_KIND[self.get_kind_from_index(number)] + wpid = "00" + notification.data[2:3].hex().upper() + return online, encrypted, wpid, kind + + def device_pairing_information(self, number: int) -> dict: + wpid = _hid.find_paired_node_wpid(self.path, number) # extract WPID from udev path + if not wpid: + logger.error("Unable to get wpid from udev for device %d of %s", number, self) + raise exceptions.NoSuchDevice(number=number, receiver=self, error="Not present 27Mhz device") + kind = hidpp10_constants.DEVICE_KIND[self.get_kind_from_index(number)] + return {"wpid": wpid, "kind": kind, "polling": "", "serial": None, "power_switch": "(unknown)"} + + def get_kind_from_index(self, index): + """Get device kind from 27Mhz device index""" + # From drivers/hid/hid-logitech-dj.c + if index == 1: # mouse + kind = 2 + elif index == 2: # mouse + kind = 2 + elif index == 3: # keyboard + kind = 1 + elif index == 4: # numpad + kind = 3 + else: # unknown device number on 27Mhz receiver + logger.error("failed to calculate device kind for device %d of %s", index, self) + raise exceptions.NoSuchDevice(number=index, receiver=self, error="Unknown 27Mhz device number") + return kind + + +receiver_class_mapping = { + "bolt": BoltReceiver, + "unifying": UnifyingReceiver, + "lightspeed": LightSpeedReceiver, + "nano": NanoReceiver, + "27Mhz": Ex100Receiver, +} + + +class ReceiverFactory: + @staticmethod + def create_receiver(device_info, setting_callback=None) -> Optional[Receiver]: + """Opens a Logitech Receiver found attached to the machine, by Linux device path.""" + + try: + handle = _base.open_path(device_info.path) + if handle: + product_info = _base.product_information(device_info.product_id) + if not product_info: + logger.warning("Unknown receiver type: %s", device_info.product_id) + product_info = {} + receiver_kind = product_info.get("receiver_kind", "unknown") + receiver_class = receiver_class_mapping.get(receiver_kind, Receiver) + return receiver_class(product_info, handle, device_info.path, device_info.product_id, setting_callback) + except OSError as e: + logger.exception("open %s", device_info) + if e.errno == _errno.EACCES: + raise + except Exception: + logger.exception("open %s", device_info)