Skip to content

Commit

Permalink
Changes after code review
Browse files Browse the repository at this point in the history
- Using regex to validate UI and Signer headers
- Added unit tests for header validation functions
  • Loading branch information
italo-sampaio committed Sep 5, 2024
1 parent fdea642 commit ab51987
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 26 deletions.
33 changes: 10 additions & 23 deletions middleware/admin/verify_attestation.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import json
import hashlib
import secp256k1 as ec
import re
from .misc import info, head, AdminError
from .utils import is_nonempty_hex_string
from .certificate import HSMCertificate
Expand All @@ -46,23 +47,13 @@


def validate_ui_message_header(ui_message):
minor_offset = len(UI_MESSAGE_HEADER) - 1
if ui_message[:minor_offset] != UI_MESSAGE_HEADER[:minor_offset]:
raise AdminError()
version_minor = ui_message[minor_offset]
# The minor version must be a single digit between 0 and 9
if version_minor < 48 or version_minor > 57:
raise AdminError()
header = ui_message[:len(UI_MESSAGE_HEADER)]
return re.compile(b"^HSM:UI:5.[0-9]$").match(header) is not None


def validate_signer_message_header(signer_message):
minor_offset = len(SIGNER_MESSAGE_HEADER) - 1
if signer_message[:minor_offset] != SIGNER_MESSAGE_HEADER[:minor_offset]:
raise AdminError()
version_minor = signer_message[minor_offset]
# The minor version must be a single digit between 0 and 9
if version_minor < 48 or version_minor > 57:
raise AdminError()
header = signer_message[:len(SIGNER_MESSAGE_HEADER)]
return re.compile(b"^HSM:SIGNER:5.[0-9]$").match(header) is not None


def do_verify_attestation(options):
Expand Down Expand Up @@ -142,13 +133,12 @@ def do_verify_attestation(options):
ui_message = bytes.fromhex(ui_result[1])
ui_hash = bytes.fromhex(ui_result[2])
mh_len = len(UI_MESSAGE_HEADER)
try:
validate_ui_message_header(ui_message)
except Exception:
if not validate_ui_message_header(ui_message):
raise AdminError(
f"Invalid UI attestation message header: {ui_message[:mh_len].hex()}")

# Extract UD value, UI public key and signer version from message
# Extract UI version, UD value, UI public key and signer version from message
ui_version = re.match(b"^HSM:UI:(5.[0-9])$", ui_message[:mh_len]).group(1)
ud_value = ui_message[mh_len:mh_len + UD_VALUE_LENGTH].hex()
ui_public_key = ui_message[mh_len + UD_VALUE_LENGTH:mh_len + UD_VALUE_LENGTH +
PUBKEY_COMPRESSED_LENGTH].hex()
Expand All @@ -160,7 +150,6 @@ def do_verify_attestation(options):
mh_len + UD_VALUE_LENGTH + PUBKEY_COMPRESSED_LENGTH +
SIGNER_HASH_LENGTH + SIGNER_ITERATION_LENGTH]
signer_iteration = int.from_bytes(signer_iteration, byteorder='big', signed=False)
ui_version = ui_message[mh_len - 3:mh_len]

head(
[
Expand All @@ -187,19 +176,17 @@ def do_verify_attestation(options):
signer_message = bytes.fromhex(signer_result[1])
signer_hash = bytes.fromhex(signer_result[2])
mh_len = len(SIGNER_MESSAGE_HEADER)
try:
validate_signer_message_header(signer_message)
except Exception:
if not validate_signer_message_header(signer_message):
raise AdminError(
f"Invalid Signer attestation message header: {signer_message[:mh_len].hex()}")

signer_version = re.match(b"^HSM:SIGNER:(5.[0-9])$", signer_message[:mh_len]).group(1)
if signer_message[mh_len:] != pubkeys_hash:
reported = signer_message[mh_len:].hex()
raise AdminError(
f"Signer attestation public keys hash mismatch: expected {pubkeys_hash.hex()}"
f" but attestation reports {reported}"
)
signer_version = signer_message[mh_len - 3:mh_len]

head(
["Signer verified with public keys:"] + pubkeys_output + [
Expand Down
54 changes: 51 additions & 3 deletions middleware/tests/admin/test_verify_attestation.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,20 @@
from unittest.mock import Mock, call, patch, mock_open
from admin.misc import AdminError
from admin.pubkeys import PATHS
from admin.verify_attestation import do_verify_attestation
from admin.verify_attestation import (
do_verify_attestation,
validate_ui_message_header,
validate_signer_message_header
)
import ecdsa
import hashlib
import logging

logging.disable(logging.CRITICAL)

EXPECTED_UI_DERIVATION_PATH = "m/44'/0'/0'/0/0"
SIGNER_HEADER = b"HSM:SIGNER:5.1"
UI_HEADER = b"HSM:UI:5.1"


@patch("sys.stdout.write")
Expand Down Expand Up @@ -65,14 +71,14 @@ def setUp(self):
)
self.pubkeys_hash = pubkeys_hash.digest()

self.ui_msg = b"HSM:UI:5.1" + \
self.ui_msg = UI_HEADER + \
bytes.fromhex("aa"*32) + \
bytes.fromhex("bb"*33) + \
bytes.fromhex("cc"*32) + \
bytes.fromhex("0123")
self.ui_hash = bytes.fromhex("ee" * 32)

self.signer_msg = b"HSM:SIGNER:5.1" + \
self.signer_msg = SIGNER_HEADER + \
bytes.fromhex(self.pubkeys_hash.hex())
self.signer_hash = bytes.fromhex("ff" * 32)

Expand Down Expand Up @@ -278,3 +284,45 @@ def test_verify_attestation_invalid_signer_att(self,
self.assertEqual([call(self.pubkeys_path, 'r')], file_mock.call_args_list)
self.assertEqual(("Invalid Signer attestation: error validating 'signer'"),
str(e.exception))

def test_validate_ui_message_header_valid_header(self, _):
valid_headers = [
UI_HEADER,
b"HSM:UI:5.0",
b"HSM:UI:5.5",
b"HSM:UI:5.9",
]
for header in valid_headers:
ui_message = header + self.ui_msg[:len(UI_HEADER)]
self.assertTrue(validate_ui_message_header(ui_message))

def test_validate_ui_message_header_invalid_header(self, _):
invalid_headers = [
SIGNER_HEADER,
b"HSM:UI:4.0",
b"HSM:UI:5.X",
]
for header in invalid_headers:
ui_message = header + self.ui_msg[len(UI_HEADER):]
self.assertFalse(validate_ui_message_header(ui_message))

def test_validate_signer_message_header_valid_header(self, _):
valid_headers = [
SIGNER_HEADER,
b"HSM:SIGNER:5.0",
b"HSM:SIGNER:5.5",
b"HSM:SIGNER:5.9",
]
for header in valid_headers:
signer_message = header + self.signer_msg[len(SIGNER_HEADER):]
self.assertTrue(validate_signer_message_header(signer_message))

def test_validate_signer_message_header_invalid_header(self, _):
invalid_headers = [
UI_HEADER,
b"HSM:SIGNER:4.0",
b"HSM:SIGNER:5.X",
]
for header in invalid_headers:
signer_message = header + self.signer_msg[len(SIGNER_HEADER):]
self.assertFalse(validate_signer_message_header(signer_message))

0 comments on commit ab51987

Please sign in to comment.