diff --git a/lib/logitech_receiver/base.py b/lib/logitech_receiver/base.py index e9f390cad3..609d5dec44 100644 --- a/lib/logitech_receiver/base.py +++ b/lib/logitech_receiver/base.py @@ -30,9 +30,8 @@ import hidapi as _hid -from . import exceptions +from . import exceptions, hidpp20 from . import hidpp10_constants as _hidpp10_constants -from . import hidpp20 as _hidpp20 from . import hidpp20_constants as _hidpp20_constants from .base_usb import ALL as _RECEIVER_USB_IDS from .common import strhex as _strhex @@ -40,6 +39,8 @@ logger = logging.getLogger(__name__) +_hidpp20 = hidpp20.Hidpp20() + # # # diff --git a/lib/logitech_receiver/hidpp20.py b/lib/logitech_receiver/hidpp20.py index df11a43dd8..0c610632b3 100644 --- a/lib/logitech_receiver/hidpp20.py +++ b/lib/logitech_receiver/hidpp20.py @@ -41,6 +41,8 @@ logger = logging.getLogger(__name__) +KIND_MAP = {kind: _hidpp10_constants.DEVICE_KIND[str(kind)] for kind in DEVICE_KIND} + def hexint_presenter(dumper, data): return dumper.represent_int(hex(data)) @@ -1339,11 +1341,10 @@ def show(self): _yaml.SafeLoader.add_constructor("!OnboardProfiles", OnboardProfiles.from_yaml) _yaml.add_representer(OnboardProfiles, OnboardProfiles.to_yaml) + # # # - - def feature_request(device, feature, function=0x00, *params, no_reply=False): if device.online and device.features: if feature in device.features: @@ -1351,121 +1352,374 @@ def feature_request(device, feature, function=0x00, *params, no_reply=False): return device.request((feature_index << 8) + (function & 0xFF), *params, no_reply=no_reply) -def get_firmware(device): - """Reads a device's firmware info. +# voltage to remaining charge from Logitech +battery_voltage_remaining = ( + (4186, 100), + (4067, 90), + (3989, 80), + (3922, 70), + (3859, 60), + (3811, 50), + (3778, 40), + (3751, 30), + (3717, 20), + (3671, 10), + (3646, 5), + (3579, 2), + (3500, 0), + (-1000, 0), +) - :returns: a list of FirmwareInfo tuples, ordered by firmware layer. - """ - count = feature_request(device, FEATURE.DEVICE_FW_VERSION) - if count: - count = ord(count[:1]) - - fw = [] - for index in range(0, count): - fw_info = feature_request(device, FEATURE.DEVICE_FW_VERSION, 0x10, index) - if fw_info: - level = ord(fw_info[:1]) & 0x0F - if level == 0 or level == 1: - name, version_major, version_minor, build = _unpack("!3sBBH", fw_info[1:8]) - version = "%02X.%02X" % (version_major, version_minor) - if build: - version += ".B%04X" % build - extras = fw_info[9:].rstrip(b"\x00") or None - fw_info = _FirmwareInfo(FIRMWARE_KIND[level], name.decode("ascii"), version, extras) - elif level == FIRMWARE_KIND.Hardware: - fw_info = _FirmwareInfo(FIRMWARE_KIND.Hardware, "", str(ord(fw_info[1:2])), None) + +class Hidpp20: + def get_firmware(self, device): + """Reads a device's firmware info. + + :returns: a list of FirmwareInfo tuples, ordered by firmware layer. + """ + count = feature_request(device, FEATURE.DEVICE_FW_VERSION) + if count: + count = ord(count[:1]) + + fw = [] + for index in range(0, count): + fw_info = feature_request(device, FEATURE.DEVICE_FW_VERSION, 0x10, index) + if fw_info: + level = ord(fw_info[:1]) & 0x0F + if level == 0 or level == 1: + name, version_major, version_minor, build = _unpack("!3sBBH", fw_info[1:8]) + version = "%02X.%02X" % (version_major, version_minor) + if build: + version += ".B%04X" % build + extras = fw_info[9:].rstrip(b"\x00") or None + fw_info = _FirmwareInfo(FIRMWARE_KIND[level], name.decode("ascii"), version, extras) + elif level == FIRMWARE_KIND.Hardware: + fw_info = _FirmwareInfo(FIRMWARE_KIND.Hardware, "", str(ord(fw_info[1:2])), None) + else: + fw_info = _FirmwareInfo(FIRMWARE_KIND.Other, "", "", None) + + fw.append(fw_info) + # if logger.isEnabledFor(logging.DEBUG): + # logger.debug("device %d firmware %s", devnumber, fw_info) + return tuple(fw) + + def get_ids(self, device): + """Reads a device's ids (unit and model numbers)""" + ids = feature_request(device, FEATURE.DEVICE_FW_VERSION) + if ids: + unitId = ids[1:5] + modelId = ids[7:13] + transport_bits = ord(ids[6:7]) + offset = 0 + tid_map = {} + for transport, flag in [("btid", 0x1), ("btleid", 0x02), ("wpid", 0x04), ("usbid", 0x08)]: + if transport_bits & flag: + tid_map[transport] = modelId[offset : offset + 2].hex().upper() + offset = offset + 2 + return (unitId.hex().upper(), modelId.hex().upper(), tid_map) + + def get_kind(self, device): + """Reads a device's type. + + :see DEVICE_KIND: + :returns: a string describing the device type, or ``None`` if the device is + not available or does not support the ``DEVICE_NAME`` feature. + """ + kind = feature_request(device, FEATURE.DEVICE_NAME, 0x20) + if kind: + kind = ord(kind[:1]) + # if logger.isEnabledFor(logging.DEBUG): + # logger.debug("device %d type %d = %s", devnumber, kind, DEVICE_KIND[kind]) + return KIND_MAP[DEVICE_KIND[kind]] + + def get_name(self, device): + """Reads a device's name. + + :returns: a string with the device name, or ``None`` if the device is not + available or does not support the ``DEVICE_NAME`` feature. + """ + name_length = feature_request(device, FEATURE.DEVICE_NAME) + if name_length: + name_length = ord(name_length[:1]) + + name = b"" + while len(name) < name_length: + fragment = feature_request(device, FEATURE.DEVICE_NAME, 0x10, len(name)) + if fragment: + name += fragment[: name_length - len(name)] else: - fw_info = _FirmwareInfo(FIRMWARE_KIND.Other, "", "", None) - - fw.append(fw_info) - # if logger.isEnabledFor(logging.DEBUG): - # logger.debug("device %d firmware %s", devnumber, fw_info) - return tuple(fw) - - -def get_ids(device): - """Reads a device's ids (unit and model numbers)""" - ids = feature_request(device, FEATURE.DEVICE_FW_VERSION) - if ids: - unitId = ids[1:5] - modelId = ids[7:13] - transport_bits = ord(ids[6:7]) - offset = 0 - tid_map = {} - for transport, flag in [("btid", 0x1), ("btleid", 0x02), ("wpid", 0x04), ("usbid", 0x08)]: - if transport_bits & flag: - tid_map[transport] = modelId[offset : offset + 2].hex().upper() - offset = offset + 2 - return (unitId.hex().upper(), modelId.hex().upper(), tid_map) + logger.error("failed to read whole name of %s (expected %d chars)", device, name_length) + return None + return name.decode("utf-8") -KIND_MAP = {kind: _hidpp10_constants.DEVICE_KIND[str(kind)] for kind in DEVICE_KIND} + def get_friendly_name(self, device): + """Reads a device's friendly name. + :returns: a string with the device name, or ``None`` if the device is not + available or does not support the ``DEVICE_NAME`` feature. + """ + name_length = feature_request(device, FEATURE.DEVICE_FRIENDLY_NAME) + if name_length: + name_length = ord(name_length[:1]) + + name = b"" + while len(name) < name_length: + fragment = feature_request(device, FEATURE.DEVICE_FRIENDLY_NAME, 0x10, len(name)) + if fragment: + initial_null = 0 if fragment[0] else 1 # initial null actually seen on a device + name += fragment[initial_null : name_length + initial_null - len(name)] + else: + logger.error("failed to read whole name of %s (expected %d chars)", device, name_length) + return None -def get_kind(device): - """Reads a device's type. + return name.decode("utf-8") - :see DEVICE_KIND: - :returns: a string describing the device type, or ``None`` if the device is - not available or does not support the ``DEVICE_NAME`` feature. - """ - kind = feature_request(device, FEATURE.DEVICE_NAME, 0x20) - if kind: - kind = ord(kind[:1]) - # if logger.isEnabledFor(logging.DEBUG): - # logger.debug("device %d type %d = %s", devnumber, kind, DEVICE_KIND[kind]) - return KIND_MAP[DEVICE_KIND[kind]] + def get_battery_status(self, device): + report = feature_request(device, FEATURE.BATTERY_STATUS) + if report: + return self.decipher_battery_status(report) + def get_battery_unified(self, device): + report = feature_request(device, FEATURE.UNIFIED_BATTERY, 0x10) + if report is not None: + return decipher_battery_unified(report) -def get_name(device): - """Reads a device's name. + def get_battery_voltage(self, device): + report = feature_request(device, FEATURE.BATTERY_VOLTAGE) + if report is not None: + return decipher_battery_voltage(report) + + def get_adc_measurement(self, device): + try: # this feature call produces an error for headsets that are connected but inactive + report = feature_request(device, FEATURE.ADC_MEASUREMENT) + if report is not None: + return self.decipher_adc_measurement(report) + except exceptions.FeatureCallError: + return FEATURE.ADC_MEASUREMENT if FEATURE.ADC_MEASUREMENT in device.features else None + + def get_battery(self, device, feature): + """Return battery information - feature, approximate level, next, charging, voltage + or battery feature if there is one but it is not responding or None for no battery feature""" + battery_functions = { + FEATURE.BATTERY_STATUS: self.get_battery_status, + FEATURE.BATTERY_VOLTAGE: self.get_battery_voltage, + FEATURE.UNIFIED_BATTERY: self.get_battery_unified, + FEATURE.ADC_MEASUREMENT: self.get_adc_measurement, + } - :returns: a string with the device name, or ``None`` if the device is not - available or does not support the ``DEVICE_NAME`` feature. - """ - name_length = feature_request(device, FEATURE.DEVICE_NAME) - if name_length: - name_length = ord(name_length[:1]) - - name = b"" - while len(name) < name_length: - fragment = feature_request(device, FEATURE.DEVICE_NAME, 0x10, len(name)) - if fragment: - name += fragment[: name_length - len(name)] - else: - logger.error("failed to read whole name of %s (expected %d chars)", device, name_length) - return None + if feature is not None: + battery_function = battery_functions.get(feature, None) + if battery_function: + result = battery_function(device) + if result: + return result + else: + for battery_function in battery_functions.values(): + result = battery_function(device) + if result: + return result + return 0 - return name.decode("utf-8") + def get_keys(self, device): + # TODO: add here additional variants for other REPROG_CONTROLS + count = None + if FEATURE.REPROG_CONTROLS_V2 in device.features: + count = feature_request(device, FEATURE.REPROG_CONTROLS_V2) + return KeysArrayV1(device, ord(count[:1])) + elif FEATURE.REPROG_CONTROLS_V4 in device.features: + count = feature_request(device, FEATURE.REPROG_CONTROLS_V4) + return KeysArrayV4(device, ord(count[:1])) + return None + def get_remap_keys(self, device): + count = feature_request(device, FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x10) + if count: + return KeysArrayPersistent(device, ord(count[:1])) + + def get_gestures(self, device): + if getattr(device, "_gestures", None) is not None: + return device._gestures + if FEATURE.GESTURE_2 in device.features: + return Gestures(device) + + def get_backlight(self, device): + if getattr(device, "_backlight", None) is not None: + return device._backlight + if FEATURE.BACKLIGHT2 in device.features: + return Backlight(device) + + def get_profiles(self, device): + if getattr(device, "_profiles", None) is not None: + return device._profiles + if FEATURE.ONBOARD_PROFILES in device.features: + return OnboardProfiles.from_device(device) + + def get_mouse_pointer_info(self, device): + pointer_info = feature_request(device, FEATURE.MOUSE_POINTER) + if pointer_info: + dpi, flags = _unpack("!HB", pointer_info[:3]) + acceleration = ("none", "low", "med", "high")[flags & 0x3] + suggest_os_ballistics = (flags & 0x04) != 0 + suggest_vertical_orientation = (flags & 0x08) != 0 + return { + "dpi": dpi, + "acceleration": acceleration, + "suggest_os_ballistics": suggest_os_ballistics, + "suggest_vertical_orientation": suggest_vertical_orientation, + } + + def get_vertical_scrolling_info(self, device): + vertical_scrolling_info = feature_request(device, FEATURE.VERTICAL_SCROLLING) + if vertical_scrolling_info: + roller, ratchet, lines = _unpack("!BBB", vertical_scrolling_info[:3]) + roller_type = ( + "reserved", + "standard", + "reserved", + "3G", + "micro", + "normal touch pad", + "inverted touch pad", + "reserved", + )[roller] + return {"roller": roller_type, "ratchet": ratchet, "lines": lines} + + def get_hi_res_scrolling_info(self, device): + hi_res_scrolling_info = feature_request(device, FEATURE.HI_RES_SCROLLING) + if hi_res_scrolling_info: + mode, resolution = _unpack("!BB", hi_res_scrolling_info[:2]) + return mode, resolution + + def get_pointer_speed_info(self, device): + pointer_speed_info = feature_request(device, FEATURE.POINTER_SPEED) + if pointer_speed_info: + pointer_speed_hi, pointer_speed_lo = _unpack("!BB", pointer_speed_info[:2]) + # if pointer_speed_lo > 0: + # pointer_speed_lo = pointer_speed_lo + return pointer_speed_hi + pointer_speed_lo / 256 + + def get_lowres_wheel_status(self, device): + lowres_wheel_status = feature_request(device, FEATURE.LOWRES_WHEEL) + if lowres_wheel_status: + wheel_flag = _unpack("!B", lowres_wheel_status[:1])[0] + wheel_reporting = ("HID", "HID++")[wheel_flag & 0x01] + return wheel_reporting + + def get_hires_wheel(self, device): + caps = feature_request(device, FEATURE.HIRES_WHEEL, 0x00) + mode = feature_request(device, FEATURE.HIRES_WHEEL, 0x10) + ratchet = feature_request(device, FEATURE.HIRES_WHEEL, 0x030) + + if caps and mode and ratchet: + # Parse caps + multi, flags = _unpack("!BB", caps[:2]) + + has_invert = (flags & 0x08) != 0 + has_ratchet = (flags & 0x04) != 0 + + # Parse mode + wheel_mode, reserved = _unpack("!BB", mode[:2]) + + target = (wheel_mode & 0x01) != 0 + res = (wheel_mode & 0x02) != 0 + inv = (wheel_mode & 0x04) != 0 + + # Parse Ratchet switch + ratchet_mode, reserved = _unpack("!BB", ratchet[:2]) + + ratchet = (ratchet_mode & 0x01) != 0 + + return multi, has_invert, has_ratchet, inv, res, target, ratchet + + def get_new_fn_inversion(self, device): + state = feature_request(device, FEATURE.NEW_FN_INVERSION, 0x00) + if state: + inverted, default_inverted = _unpack("!BB", state[:2]) + inverted = (inverted & 0x01) != 0 + default_inverted = (default_inverted & 0x01) != 0 + return inverted, default_inverted + + def get_host_names(self, device): + state = feature_request(device, FEATURE.HOSTS_INFO, 0x00) + host_names = {} + if state: + capability_flags, _ignore, numHosts, currentHost = _unpack("!BBBB", state[:4]) + if capability_flags & 0x01: # device can get host names + for host in range(0, numHosts): + hostinfo = feature_request(device, FEATURE.HOSTS_INFO, 0x10, host) + _ignore, status, _ignore, _ignore, nameLen, _ignore = _unpack("!BBBBBB", hostinfo[:6]) + name = "" + remaining = nameLen + while remaining > 0: + name_piece = feature_request(device, FEATURE.HOSTS_INFO, 0x30, host, nameLen - remaining) + if name_piece: + name += name_piece[2 : 2 + min(remaining, 14)].decode() + remaining = max(0, remaining - 14) + else: + remaining = 0 + host_names[host] = (bool(status), name) + if host_names: # update the current host's name if it doesn't match the system name + hostname = socket.gethostname().partition(".")[0] + if host_names[currentHost][1] != hostname: + self.set_host_name(device, hostname, host_names[currentHost][1]) + host_names[currentHost] = (host_names[currentHost][0], hostname) + return host_names + + def set_host_name(self, device, name, currentName=""): + name = bytearray(name, "utf-8") + currentName = bytearray(currentName, "utf-8") + if logger.isEnabledFor(logging.INFO): + logger.info("Setting host name to %s", name) + state = feature_request(device, FEATURE.HOSTS_INFO, 0x00) + if state: + flags, _ignore, _ignore, currentHost = _unpack("!BBBB", state[:4]) + if flags & 0x02: + hostinfo = feature_request(device, FEATURE.HOSTS_INFO, 0x10, currentHost) + _ignore, _ignore, _ignore, _ignore, _ignore, maxNameLen = _unpack("!BBBBBB", hostinfo[:6]) + if name[:maxNameLen] == currentName[:maxNameLen] and False: + return True + length = min(maxNameLen, len(name)) + chunk = 0 + while chunk < length: + response = feature_request(device, FEATURE.HOSTS_INFO, 0x40, currentHost, chunk, name[chunk : chunk + 14]) + if not response: + return False + chunk += 14 + return True -def get_friendly_name(device): - """Reads a device's friendly name. + def get_onboard_mode(self, device): + state = feature_request(device, FEATURE.ONBOARD_PROFILES, 0x20) - :returns: a string with the device name, or ``None`` if the device is not - available or does not support the ``DEVICE_NAME`` feature. - """ - name_length = feature_request(device, FEATURE.DEVICE_FRIENDLY_NAME) - if name_length: - name_length = ord(name_length[:1]) - - name = b"" - while len(name) < name_length: - fragment = feature_request(device, FEATURE.DEVICE_FRIENDLY_NAME, 0x10, len(name)) - if fragment: - initial_null = 0 if fragment[0] else 1 # initial null actually seen on a device - name += fragment[initial_null : name_length + initial_null - len(name)] - else: - logger.error("failed to read whole name of %s (expected %d chars)", device, name_length) - return None + if state: + mode = _unpack("!B", state[:1])[0] + return mode - return name.decode("utf-8") + def set_onboard_mode(self, device, mode): + state = feature_request(device, FEATURE.ONBOARD_PROFILES, 0x10, mode) + return state + def get_polling_rate(self, device): + state = feature_request(device, FEATURE.REPORT_RATE, 0x10) + if state: + rate = _unpack("!B", state[:1])[0] + return str(rate) + "ms" + else: + rates = ["8ms", "4ms", "2ms", "1ms", "500us", "250us", "125us"] + state = feature_request(device, FEATURE.EXTENDED_ADJUSTABLE_REPORT_RATE, 0x20) + if state: + rate = _unpack("!B", state[:1])[0] + return rates[rate] + + def get_remaining_pairing(self, device): + result = feature_request(device, FEATURE.REMAINING_PAIRING, 0x0) + if result: + result = _unpack("!B", result[:1])[0] + FEATURE._fallback = lambda x: "unknown:%04X" % x + return result -def get_battery_status(device): - report = feature_request(device, FEATURE.BATTERY_STATUS) - if report: - return decipher_battery_status(report) + def config_change(device, configuration, no_reply=False): + return feature_request(device, FEATURE.CONFIG_CHANGE, 0x00, configuration, no_reply=no_reply) def decipher_battery_status(report): @@ -1477,56 +1731,6 @@ def decipher_battery_status(report): return FEATURE.BATTERY_STATUS, discharge, next, status, None -def get_battery_unified(device): - report = feature_request(device, FEATURE.UNIFIED_BATTERY, 0x10) - if report is not None: - return decipher_battery_unified(report) - - -def decipher_battery_unified(report): - discharge, level, status, _ignore = _unpack("!BBBB", report[:4]) - status = _BATTERY_STATUS[status] - if logger.isEnabledFor(logging.DEBUG): - logger.debug("battery unified %s%% charged, level %s, charging %s", discharge, level, status) - level = ( - _BATTERY_APPROX.full - if level == 8 # full - else _BATTERY_APPROX.good - if level == 4 # good - else _BATTERY_APPROX.low - if level == 2 # low - else _BATTERY_APPROX.critical - if level == 1 # critical - else _BATTERY_APPROX.empty - ) - return FEATURE.UNIFIED_BATTERY, discharge if discharge else level, None, status, None - - -# voltage to remaining charge from Logitech -battery_voltage_remaining = ( - (4186, 100), - (4067, 90), - (3989, 80), - (3922, 70), - (3859, 60), - (3811, 50), - (3778, 40), - (3751, 30), - (3717, 20), - (3671, 10), - (3646, 5), - (3579, 2), - (3500, 0), - (-1000, 0), -) - - -def get_battery_voltage(device): - report = feature_request(device, FEATURE.BATTERY_VOLTAGE) - if report is not None: - return decipher_battery_voltage(report) - - def decipher_battery_voltage(report): voltage, flags = _unpack(">HB", report[:3]) status = _BATTERY_STATUS.discharging @@ -1565,13 +1769,23 @@ def decipher_battery_voltage(report): return FEATURE.BATTERY_VOLTAGE, charge_lvl, None, status, voltage -def get_adc_measurement(device): - try: # this feature call produces an error for headsets that are connected but inactive - report = feature_request(device, FEATURE.ADC_MEASUREMENT) - if report is not None: - return decipher_adc_measurement(report) - except exceptions.FeatureCallError: - return FEATURE.ADC_MEASUREMENT if FEATURE.ADC_MEASUREMENT in device.features else None +def decipher_battery_unified(report): + discharge, level, status, _ignore = _unpack("!BBBB", report[:4]) + status = _BATTERY_STATUS[status] + if logger.isEnabledFor(logging.DEBUG): + logger.debug("battery unified %s%% charged, level %s, charging %s", discharge, level, status) + level = ( + _BATTERY_APPROX.full + if level == 8 # full + else _BATTERY_APPROX.good + if level == 4 # good + else _BATTERY_APPROX.low + if level == 2 # low + else _BATTERY_APPROX.critical + if level == 1 # critical + else _BATTERY_APPROX.empty + ) + return FEATURE.UNIFIED_BATTERY, discharge if discharge else level, None, status, None def decipher_adc_measurement(report): @@ -1584,247 +1798,3 @@ def decipher_adc_measurement(report): if flags & 0x01: status = _BATTERY_STATUS.recharging if flags & 0x02 else _BATTERY_STATUS.discharging return FEATURE.ADC_MEASUREMENT, charge_level, None, status, adc - - -battery_functions = { - FEATURE.BATTERY_STATUS: get_battery_status, - FEATURE.BATTERY_VOLTAGE: get_battery_voltage, - FEATURE.UNIFIED_BATTERY: get_battery_unified, - FEATURE.ADC_MEASUREMENT: get_adc_measurement, -} - - -def get_battery(device, feature): - """Return battery information - feature, approximate level, next, charging, voltage - or battery feature if there is one but it is not responding or None for no battery feature""" - if feature is not None: - battery_function = battery_functions.get(feature, None) - if battery_function: - result = battery_function(device) - if result: - return result - else: - for battery_function in battery_functions.values(): - result = battery_function(device) - if result: - return result - return 0 - - -def get_keys(device): - # TODO: add here additional variants for other REPROG_CONTROLS - count = None - if FEATURE.REPROG_CONTROLS_V2 in device.features: - count = feature_request(device, FEATURE.REPROG_CONTROLS_V2) - return KeysArrayV1(device, ord(count[:1])) - elif FEATURE.REPROG_CONTROLS_V4 in device.features: - count = feature_request(device, FEATURE.REPROG_CONTROLS_V4) - return KeysArrayV4(device, ord(count[:1])) - return None - - -def get_remap_keys(device): - count = feature_request(device, FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x10) - if count: - return KeysArrayPersistent(device, ord(count[:1])) - - -def get_gestures(device): - if getattr(device, "_gestures", None) is not None: - return device._gestures - if FEATURE.GESTURE_2 in device.features: - return Gestures(device) - - -def get_backlight(device): - if getattr(device, "_backlight", None) is not None: - return device._backlight - if FEATURE.BACKLIGHT2 in device.features: - return Backlight(device) - - -def get_profiles(device): - if getattr(device, "_profiles", None) is not None: - return device._profiles - if FEATURE.ONBOARD_PROFILES in device.features: - return OnboardProfiles.from_device(device) - - -def get_mouse_pointer_info(device): - pointer_info = feature_request(device, FEATURE.MOUSE_POINTER) - if pointer_info: - dpi, flags = _unpack("!HB", pointer_info[:3]) - acceleration = ("none", "low", "med", "high")[flags & 0x3] - suggest_os_ballistics = (flags & 0x04) != 0 - suggest_vertical_orientation = (flags & 0x08) != 0 - return { - "dpi": dpi, - "acceleration": acceleration, - "suggest_os_ballistics": suggest_os_ballistics, - "suggest_vertical_orientation": suggest_vertical_orientation, - } - - -def get_vertical_scrolling_info(device): - vertical_scrolling_info = feature_request(device, FEATURE.VERTICAL_SCROLLING) - if vertical_scrolling_info: - roller, ratchet, lines = _unpack("!BBB", vertical_scrolling_info[:3]) - roller_type = ( - "reserved", - "standard", - "reserved", - "3G", - "micro", - "normal touch pad", - "inverted touch pad", - "reserved", - )[roller] - return {"roller": roller_type, "ratchet": ratchet, "lines": lines} - - -def get_hi_res_scrolling_info(device): - hi_res_scrolling_info = feature_request(device, FEATURE.HI_RES_SCROLLING) - if hi_res_scrolling_info: - mode, resolution = _unpack("!BB", hi_res_scrolling_info[:2]) - return mode, resolution - - -def get_pointer_speed_info(device): - pointer_speed_info = feature_request(device, FEATURE.POINTER_SPEED) - if pointer_speed_info: - pointer_speed_hi, pointer_speed_lo = _unpack("!BB", pointer_speed_info[:2]) - # if pointer_speed_lo > 0: - # pointer_speed_lo = pointer_speed_lo - return pointer_speed_hi + pointer_speed_lo / 256 - - -def get_lowres_wheel_status(device): - lowres_wheel_status = feature_request(device, FEATURE.LOWRES_WHEEL) - if lowres_wheel_status: - wheel_flag = _unpack("!B", lowres_wheel_status[:1])[0] - wheel_reporting = ("HID", "HID++")[wheel_flag & 0x01] - return wheel_reporting - - -def get_hires_wheel(device): - caps = feature_request(device, FEATURE.HIRES_WHEEL, 0x00) - mode = feature_request(device, FEATURE.HIRES_WHEEL, 0x10) - ratchet = feature_request(device, FEATURE.HIRES_WHEEL, 0x030) - - if caps and mode and ratchet: - # Parse caps - multi, flags = _unpack("!BB", caps[:2]) - - has_invert = (flags & 0x08) != 0 - has_ratchet = (flags & 0x04) != 0 - - # Parse mode - wheel_mode, reserved = _unpack("!BB", mode[:2]) - - target = (wheel_mode & 0x01) != 0 - res = (wheel_mode & 0x02) != 0 - inv = (wheel_mode & 0x04) != 0 - - # Parse Ratchet switch - ratchet_mode, reserved = _unpack("!BB", ratchet[:2]) - - ratchet = (ratchet_mode & 0x01) != 0 - - return multi, has_invert, has_ratchet, inv, res, target, ratchet - - -def get_new_fn_inversion(device): - state = feature_request(device, FEATURE.NEW_FN_INVERSION, 0x00) - if state: - inverted, default_inverted = _unpack("!BB", state[:2]) - inverted = (inverted & 0x01) != 0 - default_inverted = (default_inverted & 0x01) != 0 - return inverted, default_inverted - - -def get_host_names(device): - state = feature_request(device, FEATURE.HOSTS_INFO, 0x00) - host_names = {} - if state: - capability_flags, _ignore, numHosts, currentHost = _unpack("!BBBB", state[:4]) - if capability_flags & 0x01: # device can get host names - for host in range(0, numHosts): - hostinfo = feature_request(device, FEATURE.HOSTS_INFO, 0x10, host) - _ignore, status, _ignore, _ignore, nameLen, _ignore = _unpack("!BBBBBB", hostinfo[:6]) - name = "" - remaining = nameLen - while remaining > 0: - name_piece = feature_request(device, FEATURE.HOSTS_INFO, 0x30, host, nameLen - remaining) - if name_piece: - name += name_piece[2 : 2 + min(remaining, 14)].decode() - remaining = max(0, remaining - 14) - else: - remaining = 0 - host_names[host] = (bool(status), name) - if host_names: # update the current host's name if it doesn't match the system name - hostname = socket.gethostname().partition(".")[0] - if host_names[currentHost][1] != hostname: - set_host_name(device, hostname, host_names[currentHost][1]) - host_names[currentHost] = (host_names[currentHost][0], hostname) - return host_names - - -def set_host_name(device, name, currentName=""): - name = bytearray(name, "utf-8") - currentName = bytearray(currentName, "utf-8") - if logger.isEnabledFor(logging.INFO): - logger.info("Setting host name to %s", name) - state = feature_request(device, FEATURE.HOSTS_INFO, 0x00) - if state: - flags, _ignore, _ignore, currentHost = _unpack("!BBBB", state[:4]) - if flags & 0x02: - hostinfo = feature_request(device, FEATURE.HOSTS_INFO, 0x10, currentHost) - _ignore, _ignore, _ignore, _ignore, _ignore, maxNameLen = _unpack("!BBBBBB", hostinfo[:6]) - if name[:maxNameLen] == currentName[:maxNameLen] and False: - return True - length = min(maxNameLen, len(name)) - chunk = 0 - while chunk < length: - response = feature_request(device, FEATURE.HOSTS_INFO, 0x40, currentHost, chunk, name[chunk : chunk + 14]) - if not response: - return False - chunk += 14 - return True - - -def get_onboard_mode(device): - state = feature_request(device, FEATURE.ONBOARD_PROFILES, 0x20) - - if state: - mode = _unpack("!B", state[:1])[0] - return mode - - -def set_onboard_mode(device, mode): - state = feature_request(device, FEATURE.ONBOARD_PROFILES, 0x10, mode) - return state - - -def get_polling_rate(device): - state = feature_request(device, FEATURE.REPORT_RATE, 0x10) - if state: - rate = _unpack("!B", state[:1])[0] - return str(rate) + "ms" - else: - rates = ["8ms", "4ms", "2ms", "1ms", "500us", "250us", "125us"] - state = feature_request(device, FEATURE.EXTENDED_ADJUSTABLE_REPORT_RATE, 0x20) - if state: - rate = _unpack("!B", state[:1])[0] - return rates[rate] - - -def get_remaining_pairing(device): - result = feature_request(device, FEATURE.REMAINING_PAIRING, 0x0) - if result: - result = _unpack("!B", result[:1])[0] - FEATURE._fallback = lambda x: "unknown:%04X" % x - return result - - -def config_change(device, configuration, no_reply=False): - return feature_request(device, FEATURE.CONFIG_CHANGE, 0x00, configuration, no_reply=no_reply) diff --git a/lib/logitech_receiver/notifications.py b/lib/logitech_receiver/notifications.py index a77607bace..c4a37c48ad 100644 --- a/lib/logitech_receiver/notifications.py +++ b/lib/logitech_receiver/notifications.py @@ -24,9 +24,8 @@ from struct import unpack as _unpack from . import diversion as _diversion -from . import hidpp10 +from . import hidpp10, hidpp20 from . import hidpp10_constants as _hidpp10_constants -from . import hidpp20 as _hidpp20 from . import hidpp20_constants as _hidpp20_constants from . import settings_templates as _st from .base import DJ_MESSAGE_ID as _DJ_MESSAGE_ID @@ -39,6 +38,7 @@ logger = logging.getLogger(__name__) _hidpp10 = hidpp10.Hidpp10() +_hidpp20 = hidpp20.Hidpp20() _R = _hidpp10_constants.REGISTERS _F = _hidpp20_constants.FEATURE @@ -296,7 +296,7 @@ def _process_feature_notification(device, status, n, feature): if feature == _F.BATTERY_STATUS: if n.address == 0x00: - _ignore, discharge_level, discharge_next_level, battery_status, voltage = _hidpp20.decipher_battery_status(n.data) + _ignore, discharge_level, discharge_next_level, battery_status, voltage = hidpp20.decipher_battery_status(n.data) status.set_battery_info(discharge_level, discharge_next_level, battery_status, voltage) elif n.address == 0x10: if logger.isEnabledFor(logging.INFO): @@ -306,21 +306,21 @@ def _process_feature_notification(device, status, n, feature): elif feature == _F.BATTERY_VOLTAGE: if n.address == 0x00: - _ignore, level, nextl, battery_status, voltage = _hidpp20.decipher_battery_voltage(n.data) + _ignore, level, nextl, battery_status, voltage = hidpp20.decipher_battery_voltage(n.data) status.set_battery_info(level, nextl, battery_status, voltage) else: logger.warning("%s: unknown VOLTAGE %s", device, n) elif feature == _F.UNIFIED_BATTERY: if n.address == 0x00: - _ignore, level, nextl, battery_status, voltage = _hidpp20.decipher_battery_unified(n.data) + _ignore, level, nextl, battery_status, voltage = hidpp20.decipher_battery_unified(n.data) status.set_battery_info(level, nextl, battery_status, voltage) else: logger.warning("%s: unknown UNIFIED BATTERY %s", device, n) elif feature == _F.ADC_MEASUREMENT: if n.address == 0x00: - result = _hidpp20.decipher_adc_measurement(n.data) + result = hidpp20.decipher_adc_measurement(n.data) if result: _ignore, level, nextl, battery_status, voltage = result status.set_battery_info(level, nextl, battery_status, voltage) diff --git a/lib/logitech_receiver/settings_templates.py b/lib/logitech_receiver/settings_templates.py index 9e0ff9f496..0200c4750f 100644 --- a/lib/logitech_receiver/settings_templates.py +++ b/lib/logitech_receiver/settings_templates.py @@ -27,7 +27,7 @@ from . import descriptors as _descriptors from . import hidpp10_constants as _hidpp10_constants -from . import hidpp20 as _hidpp20 +from . import hidpp20 from . import hidpp20_constants as _hidpp20_constants from . import notify as _notify from . import special_keys as _special_keys @@ -60,6 +60,7 @@ logger = logging.getLogger(__name__) +_hidpp20 = hidpp20.Hidpp20() _DK = _hidpp10_constants.DEVICE_KIND _R = _hidpp10_constants.REGISTERS _F = _hidpp20_constants.FEATURE @@ -528,7 +529,7 @@ def write(self, device, data_bytes): class validator_class(_ChoicesV): @classmethod def build(cls, setting_class, device): - headers = _hidpp20.OnboardProfiles.get_profile_headers(device) + headers = hidpp20.OnboardProfiles.get_profile_headers(device) profiles_list = [setting_class.choices_universe[0]] if headers: for sector, enabled in headers: @@ -1234,7 +1235,7 @@ class Gesture2Gestures(_BitFieldOMSetting): description = _("Tweak the mouse/touchpad behaviour.") feature = _F.GESTURE_2 rw_options = {"read_fnid": 0x10, "write_fnid": 0x20} - validator_options = {"om_method": _hidpp20.Gesture.enable_offset_mask} + validator_options = {"om_method": hidpp20.Gesture.enable_offset_mask} choices_universe = _hidpp20_constants.GESTURE _labels = _GESTURE2_GESTURES_LABELS @@ -1251,7 +1252,7 @@ class Gesture2Divert(_BitFieldOMSetting): description = _("Divert mouse/touchpad gestures.") feature = _F.GESTURE_2 rw_options = {"read_fnid": 0x30, "write_fnid": 0x40} - validator_options = {"om_method": _hidpp20.Gesture.diversion_offset_mask} + validator_options = {"om_method": hidpp20.Gesture.diversion_offset_mask} choices_universe = _hidpp20_constants.GESTURE _labels = _GESTURE2_GESTURES_LABELS @@ -1268,8 +1269,8 @@ class Gesture2Params(_LongSettings): description = _("Change numerical parameters of a mouse/touchpad.") feature = _F.GESTURE_2 rw_options = {"read_fnid": 0x70, "write_fnid": 0x80} - choices_universe = _hidpp20.PARAM - sub_items_universe = _hidpp20.SUB_PARAM + choices_universe = hidpp20.PARAM + sub_items_universe = hidpp20.SUB_PARAM # item (NamedInt) -> list/tuple of objects that have the following attributes # .id (sub-item text), .length (in bytes), .minimum and .maximum @@ -1465,7 +1466,7 @@ class LEDControl(_Setting): colors = _special_keys.COLORS -_LEDP = _hidpp20.LEDParam +_LEDP = hidpp20.LEDParam # an LED Zone has an index, a set of possible LED effects, and an LED effect setting @@ -1478,7 +1479,7 @@ class LEDZoneSetting(_Setting): speed_field = {"name": _LEDP.speed, "kind": _KIND.range, "label": _("Speed"), "min": 0, "max": 255} period_field = {"name": _LEDP.period, "kind": _KIND.range, "label": _("Period"), "min": 100, "max": 5000} intensity_field = {"name": _LEDP.intensity, "kind": _KIND.range, "label": _("Intensity"), "min": 0, "max": 100} - ramp_field = {"name": _LEDP.ramp, "kind": _KIND.choice, "label": _("Ramp"), "choices": _hidpp20.LEDRampChoices} + ramp_field = {"name": _LEDP.ramp, "kind": _KIND.choice, "label": _("Ramp"), "choices": hidpp20.LEDRampChoices} # form_field = { 'name': _LEDP.form, 'kind': _KIND.choice, 'label': _('Form'), 'choices': _hidpp20.LEDFormChoices } possible_fields = [color_field, speed_field, period_field, intensity_field, ramp_field] @@ -1489,14 +1490,14 @@ def build(cls, device): for zone in infos.zones: prefix = _int2bytes(zone.index, 1) rw = _FeatureRW(_F.COLOR_LED_EFFECTS, read_fnid=0xE0, write_fnid=0x30, prefix=prefix) - validator = _HeteroV(data_class=_hidpp20.LEDEffectSetting, options=zone.effects, readable=infos.readable) + validator = _HeteroV(data_class=hidpp20.LEDEffectSetting, options=zone.effects, readable=infos.readable) setting = cls(device, rw, validator) setting.name = cls.name + str(int(zone.location)) - setting.label = _("LEDs") + " " + str(_hidpp20.LEDZoneLocations[zone.location]) - choices = [_hidpp20.LEDEffects[e.ID][0] for e in zone.effects] + setting.label = _("LEDs") + " " + str(hidpp20.LEDZoneLocations[zone.location]) + choices = [hidpp20.LEDEffects[e.ID][0] for e in zone.effects] ID_field = {"name": "ID", "kind": _KIND.choice, "label": None, "choices": choices} setting.possible_fields = [ID_field] + cls.possible_fields - setting.fields_map = _hidpp20.LEDEffects + setting.fields_map = hidpp20.LEDEffects settings.append(setting) return settings diff --git a/lib/solaar/cli/show.py b/lib/solaar/cli/show.py index eb09407c50..ecf6cc77ca 100644 --- a/lib/solaar/cli/show.py +++ b/lib/solaar/cli/show.py @@ -16,9 +16,8 @@ ## with this program; if not, write to the Free Software Foundation, Inc., ## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -from logitech_receiver import exceptions, hidpp10 +from logitech_receiver import exceptions, hidpp10, hidpp20 from logitech_receiver import hidpp10_constants as _hidpp10_constants -from logitech_receiver import hidpp20 as _hidpp20 from logitech_receiver import hidpp20_constants as _hidpp20_constants from logitech_receiver import receiver as _receiver from logitech_receiver import settings_templates as _settings_templates @@ -28,6 +27,7 @@ from solaar import NAME, __version__ _hidpp10 = hidpp10.Hidpp10() +_hidpp20 = hidpp20.Hidpp20() def _print_receiver(receiver):