Skip to content

Commit

Permalink
refactoring(logitech_receiver/notifications): create tests before cre…
Browse files Browse the repository at this point in the history
…ating BOLT_PAIRING_ERRORS enums
  • Loading branch information
rloutrel committed Oct 11, 2024
1 parent 15659a1 commit 620e7a6
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 27 deletions.
69 changes: 42 additions & 27 deletions lib/logitech_receiver/notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@
from . import hidpp20
from . import hidpp20_constants
from . import settings_templates

This comment has been minimized.

Copy link
@MattHag

MattHag Oct 11, 2024

Collaborator

Put type hint related imports into a condition, so they o not become a hard dependency.
if typing.TYPE_CHECK...

from .base import HIDPPNotification
from .common import Alert
from .common import BatteryStatus
from .common import Notification
from .hidpp10_constants import Registers
from .receiver import Receiver

logger = logging.getLogger(__name__)

Expand All @@ -55,29 +57,42 @@ def process(device, notification):
return _process_device_notification(device, notification)


def _process_receiver_notification(receiver, n):
# supposedly only 0x4x notifications arrive for the receiver
assert n.sub_id & 0x40 == 0x40

if n.sub_id == Notification.PAIRING_LOCK:
receiver.pairing.lock_open = bool(n.address & 0x01)
def _process_receiver_notification(receiver: Receiver, hidpp_notification: HIDPPNotification):

This comment has been minimized.

Copy link
@MattHag

MattHag Oct 11, 2024

Collaborator

Add return type too. I think it is bool | None.

# supposedly only 0x4x notifications arrive for the receiver:
# Explicitly specify the list

This comment has been minimized.

Copy link
@MattHag

MattHag Oct 11, 2024

Collaborator

You can drop this line, it is clear by the code

assert hidpp_notification.sub_id in [
Notification.CONNECT_DISCONNECT,
Notification.DJ_PAIRING,
Notification.CONNECTED,
Notification.RAW_INPUT,
Notification.PAIRING_LOCK,
Notification.POWER,
Registers.DEVICE_DISCOVERY_NOTIFICATION,
Registers.DISCOVERY_STATUS_NOTIFICATION,
Registers.PAIRING_STATUS_NOTIFICATION,
Registers.PASSKEY_PRESSED_NOTIFICATION,
Registers.PASSKEY_REQUEST_NOTIFICATION
]

if hidpp_notification.sub_id == Notification.PAIRING_LOCK:
receiver.pairing.lock_open = bool(hidpp_notification.address & 0x01)
reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed")
if logger.isEnabledFor(logging.INFO):
logger.info("%s: %s", receiver, reason)
receiver.pairing.error = None
if receiver.pairing.lock_open:
receiver.pairing.new_device = None
pair_error = ord(n.data[:1])
pair_error = ord(hidpp_notification.data[:1])
if pair_error:
receiver.pairing.error = error_string = hidpp10_constants.PAIRING_ERRORS[pair_error]
receiver.pairing.new_device = None
logger.warning("pairing error %d: %s", pair_error, error_string)
receiver.changed(reason=reason)
return True

elif n.sub_id == Registers.DISCOVERY_STATUS_NOTIFICATION: # Bolt pairing
elif hidpp_notification.sub_id == Registers.DISCOVERY_STATUS_NOTIFICATION: # Bolt pairing
with notification_lock:
receiver.pairing.discovering = n.address == 0x00
receiver.pairing.discovering = hidpp_notification.address == 0x00
reason = _("discovery lock is open") if receiver.pairing.discovering else _("discovery lock is closed")
if logger.isEnabledFor(logging.INFO):
logger.info("%s: %s", receiver, reason)
Expand All @@ -86,33 +101,33 @@ def _process_receiver_notification(receiver, n):
receiver.pairing.counter = receiver.pairing.device_address = None
receiver.pairing.device_authentication = receiver.pairing.device_name = None
receiver.pairing.device_passkey = None
discover_error = ord(n.data[:1])
discover_error = ord(hidpp_notification.data[:1])
if discover_error:
receiver.pairing.error = discover_string = hidpp10_constants.BOLT_PAIRING_ERRORS[discover_error]
logger.warning("bolt discovering error %d: %s", discover_error, discover_string)
receiver.changed(reason=reason)
return True

elif n.sub_id == Registers.DEVICE_DISCOVERY_NOTIFICATION: # Bolt pairing
elif hidpp_notification.sub_id == Registers.DEVICE_DISCOVERY_NOTIFICATION: # Bolt pairing
with notification_lock:
counter = n.address + n.data[0] * 256 # notification counter
counter = hidpp_notification.address + hidpp_notification.data[0] * 256 # notification counter
if receiver.pairing.counter is None:
receiver.pairing.counter = counter
else:
if not receiver.pairing.counter == counter:
return None
if n.data[1] == 0:
receiver.pairing.device_kind = n.data[3]
receiver.pairing.device_address = n.data[6:12]
receiver.pairing.device_authentication = n.data[14]
elif n.data[1] == 1:
receiver.pairing.device_name = n.data[3 : 3 + n.data[2]].decode("utf-8")
if hidpp_notification.data[1] == 0:
receiver.pairing.device_kind = hidpp_notification.data[3]
receiver.pairing.device_address = hidpp_notification.data[6:12]
receiver.pairing.device_authentication = hidpp_notification.data[14]
elif hidpp_notification.data[1] == 1:
receiver.pairing.device_name = hidpp_notification.data[3: 3 + hidpp_notification.data[2]].decode("utf-8")
return True

elif n.sub_id == Registers.PAIRING_STATUS_NOTIFICATION: # Bolt pairing
elif hidpp_notification.sub_id == Registers.PAIRING_STATUS_NOTIFICATION: # Bolt pairing
with notification_lock:
receiver.pairing.device_passkey = None
receiver.pairing.lock_open = n.address == 0x00
receiver.pairing.lock_open = hidpp_notification.address == 0x00
reason = _("pairing lock is open") if receiver.pairing.lock_open else _("pairing lock is closed")
if logger.isEnabledFor(logging.INFO):
logger.info("%s: %s", receiver, reason)
Expand All @@ -122,27 +137,27 @@ def _process_receiver_notification(receiver, n):
receiver.pairing.device_address = None
receiver.pairing.device_authentication = None
receiver.pairing.device_name = None
pair_error = n.data[0]
pair_error = hidpp_notification.data[0]
if receiver.pairing.lock_open:
receiver.pairing.new_device = None
elif n.address == 0x02 and not pair_error:
receiver.pairing.new_device = receiver.register_new_device(n.data[7])
elif hidpp_notification.address == 0x02 and not pair_error:
receiver.pairing.new_device = receiver.register_new_device(hidpp_notification.data[7])
if pair_error:
receiver.pairing.error = error_string = hidpp10_constants.BOLT_PAIRING_ERRORS[pair_error]
receiver.pairing.new_device = None
logger.warning("pairing error %d: %s", pair_error, error_string)
receiver.changed(reason=reason)
return True

elif n.sub_id == Registers.PASSKEY_REQUEST_NOTIFICATION: # Bolt pairing
elif hidpp_notification.sub_id == Registers.PASSKEY_REQUEST_NOTIFICATION: # Bolt pairing
with notification_lock:
receiver.pairing.device_passkey = n.data[0:6].decode("utf-8")
receiver.pairing.device_passkey = hidpp_notification.data[0:6].decode("utf-8")
return True

elif n.sub_id == Registers.PASSKEY_PRESSED_NOTIFICATION: # Bolt pairing
elif hidpp_notification.sub_id == Registers.PASSKEY_PRESSED_NOTIFICATION: # Bolt pairing
return True

logger.warning("%s: unhandled notification %s", receiver, n)
logger.warning("%s: unhandled notification %s", receiver, hidpp_notification)


def _process_device_notification(device, n):
Expand Down
44 changes: 44 additions & 0 deletions tests/logitech_receiver/test_notifications.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from logitech_receiver import notifications, hidpp10_constants
from logitech_receiver.base import HIDPPNotification
from logitech_receiver.hidpp10_constants import Registers
from logitech_receiver.receiver import Receiver


# Create a mock LowLevelInterface for testing
class MockLowLevelInterface:

This comment has been minimized.

Copy link
@MattHag

MattHag Oct 11, 2024

Collaborator

mock.Mock() can propbably replace this class.

def open_path(self, path):
pass

def find_paired_node_wpid(self, receiver_path: str, index: int):
pass

def ping(self, handle, number, long_message=False):
pass

def request(self, handle, devnumber, request_id, *params, **kwargs):
pass

def close(self, handle):
pass


def test__process_receiver_notification_discovery_status_notification_bolt_pairing_error():

This comment has been minimized.

Copy link
@MattHag

MattHag Oct 11, 2024

Collaborator

With @pytest.mark you can parametrize a test, as they are they need 3 configurable inputs. Then it is a single line to cover an additonal case.

And please avoid __, I don't take care of them for the test name and always use single _.

receiver: Receiver = Receiver(MockLowLevelInterface(), None, {}, True, None, None)
notification = HIDPPNotification(0, 0, Registers.DISCOVERY_STATUS_NOTIFICATION, 0, b'\x01')

result = notifications._process_receiver_notification(receiver, notification)

assert result
assert receiver.pairing.error == hidpp10_constants.BOLT_PAIRING_ERRORS['device_timeout']
assert receiver.pairing.new_device is None


def test__process_receiver_notification_pairing_status_notification_bolt_pairing_error():
receiver: Receiver = Receiver(MockLowLevelInterface(), None, {}, True, None, None)
notification = HIDPPNotification(0, 0, Registers.PAIRING_STATUS_NOTIFICATION, 0, b'\x02')

result = notifications._process_receiver_notification(receiver, notification)

assert result
assert receiver.pairing.error == hidpp10_constants.BOLT_PAIRING_ERRORS['failed']
assert receiver.pairing.new_device is None

1 comment on commit 620e7a6

@MattHag
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, that covers a quite huge function and makes it more readable 👏

Please sign in to comment.