From ab51987f75ad0e79aedfb55fcffdb22caa1fa0a7 Mon Sep 17 00:00:00 2001 From: Italo Sampaio Date: Thu, 5 Sep 2024 12:32:51 -0300 Subject: [PATCH] Changes after code review - Using regex to validate UI and Signer headers - Added unit tests for header validation functions --- middleware/admin/verify_attestation.py | 33 ++++-------- .../tests/admin/test_verify_attestation.py | 54 +++++++++++++++++-- 2 files changed, 61 insertions(+), 26 deletions(-) diff --git a/middleware/admin/verify_attestation.py b/middleware/admin/verify_attestation.py index 514c2d60..dce67200 100644 --- a/middleware/admin/verify_attestation.py +++ b/middleware/admin/verify_attestation.py @@ -23,6 +23,7 @@ import json import hashlib import secp256k1 as ec +import re from .misc import info, head, AdminError from .utils import is_nonempty_hex_string from .certificate import HSMCertificate @@ -46,23 +47,13 @@ def validate_ui_message_header(ui_message): - minor_offset = len(UI_MESSAGE_HEADER) - 1 - if ui_message[:minor_offset] != UI_MESSAGE_HEADER[:minor_offset]: - raise AdminError() - version_minor = ui_message[minor_offset] - # The minor version must be a single digit between 0 and 9 - if version_minor < 48 or version_minor > 57: - raise AdminError() + header = ui_message[:len(UI_MESSAGE_HEADER)] + return re.compile(b"^HSM:UI:5.[0-9]$").match(header) is not None def validate_signer_message_header(signer_message): - minor_offset = len(SIGNER_MESSAGE_HEADER) - 1 - if signer_message[:minor_offset] != SIGNER_MESSAGE_HEADER[:minor_offset]: - raise AdminError() - version_minor = signer_message[minor_offset] - # The minor version must be a single digit between 0 and 9 - if version_minor < 48 or version_minor > 57: - raise AdminError() + header = signer_message[:len(SIGNER_MESSAGE_HEADER)] + return re.compile(b"^HSM:SIGNER:5.[0-9]$").match(header) is not None def do_verify_attestation(options): @@ -142,13 +133,12 @@ def do_verify_attestation(options): ui_message = bytes.fromhex(ui_result[1]) ui_hash = bytes.fromhex(ui_result[2]) mh_len = len(UI_MESSAGE_HEADER) - try: - validate_ui_message_header(ui_message) - except Exception: + if not validate_ui_message_header(ui_message): raise AdminError( f"Invalid UI attestation message header: {ui_message[:mh_len].hex()}") - # Extract UD value, UI public key and signer version from message + # Extract UI version, UD value, UI public key and signer version from message + ui_version = re.match(b"^HSM:UI:(5.[0-9])$", ui_message[:mh_len]).group(1) ud_value = ui_message[mh_len:mh_len + UD_VALUE_LENGTH].hex() ui_public_key = ui_message[mh_len + UD_VALUE_LENGTH:mh_len + UD_VALUE_LENGTH + PUBKEY_COMPRESSED_LENGTH].hex() @@ -160,7 +150,6 @@ def do_verify_attestation(options): mh_len + UD_VALUE_LENGTH + PUBKEY_COMPRESSED_LENGTH + SIGNER_HASH_LENGTH + SIGNER_ITERATION_LENGTH] signer_iteration = int.from_bytes(signer_iteration, byteorder='big', signed=False) - ui_version = ui_message[mh_len - 3:mh_len] head( [ @@ -187,19 +176,17 @@ def do_verify_attestation(options): signer_message = bytes.fromhex(signer_result[1]) signer_hash = bytes.fromhex(signer_result[2]) mh_len = len(SIGNER_MESSAGE_HEADER) - try: - validate_signer_message_header(signer_message) - except Exception: + if not validate_signer_message_header(signer_message): raise AdminError( f"Invalid Signer attestation message header: {signer_message[:mh_len].hex()}") + signer_version = re.match(b"^HSM:SIGNER:(5.[0-9])$", signer_message[:mh_len]).group(1) if signer_message[mh_len:] != pubkeys_hash: reported = signer_message[mh_len:].hex() raise AdminError( f"Signer attestation public keys hash mismatch: expected {pubkeys_hash.hex()}" f" but attestation reports {reported}" ) - signer_version = signer_message[mh_len - 3:mh_len] head( ["Signer verified with public keys:"] + pubkeys_output + [ diff --git a/middleware/tests/admin/test_verify_attestation.py b/middleware/tests/admin/test_verify_attestation.py index c651c2c7..37d1f467 100644 --- a/middleware/tests/admin/test_verify_attestation.py +++ b/middleware/tests/admin/test_verify_attestation.py @@ -25,7 +25,11 @@ from unittest.mock import Mock, call, patch, mock_open from admin.misc import AdminError from admin.pubkeys import PATHS -from admin.verify_attestation import do_verify_attestation +from admin.verify_attestation import ( + do_verify_attestation, + validate_ui_message_header, + validate_signer_message_header +) import ecdsa import hashlib import logging @@ -33,6 +37,8 @@ logging.disable(logging.CRITICAL) EXPECTED_UI_DERIVATION_PATH = "m/44'/0'/0'/0/0" +SIGNER_HEADER = b"HSM:SIGNER:5.1" +UI_HEADER = b"HSM:UI:5.1" @patch("sys.stdout.write") @@ -65,14 +71,14 @@ def setUp(self): ) self.pubkeys_hash = pubkeys_hash.digest() - self.ui_msg = b"HSM:UI:5.1" + \ + self.ui_msg = UI_HEADER + \ bytes.fromhex("aa"*32) + \ bytes.fromhex("bb"*33) + \ bytes.fromhex("cc"*32) + \ bytes.fromhex("0123") self.ui_hash = bytes.fromhex("ee" * 32) - self.signer_msg = b"HSM:SIGNER:5.1" + \ + self.signer_msg = SIGNER_HEADER + \ bytes.fromhex(self.pubkeys_hash.hex()) self.signer_hash = bytes.fromhex("ff" * 32) @@ -278,3 +284,45 @@ def test_verify_attestation_invalid_signer_att(self, self.assertEqual([call(self.pubkeys_path, 'r')], file_mock.call_args_list) self.assertEqual(("Invalid Signer attestation: error validating 'signer'"), str(e.exception)) + + def test_validate_ui_message_header_valid_header(self, _): + valid_headers = [ + UI_HEADER, + b"HSM:UI:5.0", + b"HSM:UI:5.5", + b"HSM:UI:5.9", + ] + for header in valid_headers: + ui_message = header + self.ui_msg[:len(UI_HEADER)] + self.assertTrue(validate_ui_message_header(ui_message)) + + def test_validate_ui_message_header_invalid_header(self, _): + invalid_headers = [ + SIGNER_HEADER, + b"HSM:UI:4.0", + b"HSM:UI:5.X", + ] + for header in invalid_headers: + ui_message = header + self.ui_msg[len(UI_HEADER):] + self.assertFalse(validate_ui_message_header(ui_message)) + + def test_validate_signer_message_header_valid_header(self, _): + valid_headers = [ + SIGNER_HEADER, + b"HSM:SIGNER:5.0", + b"HSM:SIGNER:5.5", + b"HSM:SIGNER:5.9", + ] + for header in valid_headers: + signer_message = header + self.signer_msg[len(SIGNER_HEADER):] + self.assertTrue(validate_signer_message_header(signer_message)) + + def test_validate_signer_message_header_invalid_header(self, _): + invalid_headers = [ + UI_HEADER, + b"HSM:SIGNER:4.0", + b"HSM:SIGNER:5.X", + ] + for header in invalid_headers: + signer_message = header + self.signer_msg[len(SIGNER_HEADER):] + self.assertFalse(validate_signer_message_header(signer_message))