From 0663ff7c132200fddf7065270d1597daf50a97dd Mon Sep 17 00:00:00 2001 From: Ariel Mendelzon Date: Sat, 7 Dec 2024 02:42:04 +1300 Subject: [PATCH] SGX attestation gathering (#228) - Unified certificates in certificate module - Moved existing HSMCertificate and HSMCertificateElement to certificate_v1 - Added new HSMCertificateV2 and HSMCertificateV2Element (and subclasses) - Added new attestation gathering operation for adm_sgx - Added new attestation gathering module used within adm_sgx - Factored out user-defined value gathering into admin/misc module - Current SGX attestation saving to file is directly translated from value grabbed with HSM2Dongle - Added and updated unit tests --- middleware/adm_ledger.py | 4 +- middleware/adm_sgx.py | 15 +- middleware/admin/certificate.py | 230 +--------------- middleware/admin/certificate_v1.py | 251 ++++++++++++++++++ middleware/admin/certificate_v2.py | 49 ++++ middleware/admin/ledger_attestation.py | 28 +- middleware/admin/misc.py | 28 ++ middleware/admin/sgx_attestation.py | 71 +++++ middleware/tests/admin/test_adm_sgx.py | 40 ++- ..._certificate.py => test_certificate_v1.py} | 4 +- ...ment.py => test_certificate_v1_element.py} | 2 +- .../tests/admin/test_ledger_attestation.py | 81 +----- middleware/tests/admin/test_misc.py | 63 +++++ .../tests/admin/test_sgx_attestation.py | 113 ++++++++ setup.cfg | 4 +- 15 files changed, 654 insertions(+), 329 deletions(-) create mode 100644 middleware/admin/certificate_v1.py create mode 100644 middleware/admin/certificate_v2.py create mode 100644 middleware/admin/sgx_attestation.py rename middleware/tests/admin/{test_certificate.py => test_certificate_v1.py} (99%) rename middleware/tests/admin/{test_certificate_element.py => test_certificate_v1_element.py} (99%) create mode 100644 middleware/tests/admin/test_misc.py create mode 100644 middleware/tests/admin/test_sgx_attestation.py diff --git a/middleware/adm_ledger.py b/middleware/adm_ledger.py index 758060e6..6ba72dce 100644 --- a/middleware/adm_ledger.py +++ b/middleware/adm_ledger.py @@ -25,7 +25,7 @@ import logging from ledger.hsm2dongle import HSM2DongleError from comm.platform import Platform -from admin.misc import not_implemented, info, AdminError +from admin.misc import not_implemented, info, AdminError, DEFAULT_ATT_UD_SOURCE from admin.unlock import do_unlock from admin.onboard import do_onboard from admin.pubkeys import do_get_pubkeys @@ -34,8 +34,6 @@ from admin.verify_ledger_attestation import do_verify_attestation from admin.authorize_signer import do_authorize_signer -DEFAULT_ATT_UD_SOURCE = "https://public-node.rsk.co" - def main(): logging.disable(logging.CRITICAL) diff --git a/middleware/adm_sgx.py b/middleware/adm_sgx.py index 6a4dcf0c..d6a8f289 100644 --- a/middleware/adm_sgx.py +++ b/middleware/adm_sgx.py @@ -25,11 +25,12 @@ import logging from ledger.hsm2dongle import HSM2DongleError from comm.platform import Platform -from admin.misc import not_implemented, info, AdminError +from admin.misc import not_implemented, info, AdminError, DEFAULT_ATT_UD_SOURCE from admin.unlock import do_unlock from admin.onboard import do_onboard from admin.pubkeys import do_get_pubkeys from admin.changepin import do_changepin +from admin.sgx_attestation import do_attestation def main(): @@ -40,6 +41,7 @@ def main(): "onboard": do_onboard, "pubkeys": do_get_pubkeys, "changepin": do_changepin, + "attestation": do_attestation, } parser = ArgumentParser(description="SGX powHSM Administrative tool") @@ -79,7 +81,7 @@ def main(): "-o", "--output", dest="output_file_path", - help="Output file (only valid for 'onboard' and 'pubkeys' " + help="Output file (only valid for 'onboard', 'pubkeys' and 'attestation' " "operations).", ) parser.add_argument( @@ -92,6 +94,15 @@ def main(): default=False, const=True, ) + parser.add_argument( + "--attudsource", + dest="attestation_ud_source", + default=DEFAULT_ATT_UD_SOURCE, + help="JSON-RPC endpoint used to retrieve the latest RSK block hash used " + "as the user defined value for the attestation (defaults to " + f"{DEFAULT_ATT_UD_SOURCE}). Can also specify a 32-byte hex string to use as" + " the value.", + ) parser.add_argument( "-v", "--verbose", diff --git a/middleware/admin/certificate.py b/middleware/admin/certificate.py index 29ce300a..64b3bb6b 100644 --- a/middleware/admin/certificate.py +++ b/middleware/admin/certificate.py @@ -20,231 +20,5 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. -import json -import hmac -import secp256k1 as ec -import hashlib -from .utils import is_nonempty_hex_string - - -class HSMCertificate: - VERSION = 1 # Only supported version - ROOT_ELEMENT = "root" - - @staticmethod - def from_jsonfile(path): - try: - with open(path, "r") as file: - certificate_map = json.loads(file.read()) - - if type(certificate_map) != dict: - raise ValueError( - "JSON file must contain an object as a top level element") - - return HSMCertificate(certificate_map) - except (ValueError, json.JSONDecodeError) as e: - raise ValueError('Unable to read HSM certificate from "%s": %s' % - (path, str(e))) - - def __init__(self, certificate_map=None): - self._targets = [] - self._elements = {} - - if certificate_map is not None: - self._parse(certificate_map) - - def validate_and_get_values(self, raw_root_pubkey_hex): - # Parse the root public key - try: - root_pubkey = ec.PublicKey(bytes.fromhex(raw_root_pubkey_hex), raw=True) - except Exception: - return dict([(target, (False, self.ROOT_ELEMENT)) - for target in self._targets]) - - result = {} - for target in self._targets: - # Build the chain from the target to the root - chain = [] - current = self._elements[target] - while True: - if current.signed_by == self.ROOT_ELEMENT: - break - chain.append(current) - current = self._elements[current.signed_by] - - # Validate the chain from root to leaf - # If valid, return True and the value of the leaf - # If not valid, return False and the name of the element that - # failed the validation - current_pubkey = root_pubkey - while True: - # Validate this element - if not current.is_valid(current_pubkey): - result[target] = (False, current.name) - break - # Reached the leaf? => valid! - if len(chain) == 0: - result[target] = (True, current.get_value(), current.tweak) - break - - current_pubkey = ec.PublicKey(bytes.fromhex(current.get_value()), - raw=True) - current = chain.pop() - - return result - - def add_element(self, element): - if type(element) != HSMCertificateElement: - raise ValueError( - f"Expected an HSMCertificateElement but got a {type(element)}") - self._elements[element.name] = element - - def clear_targets(self): - self._targets = [] - - def add_target(self, target): - if target not in self._elements: - raise ValueError(f"Target {target} not in elements") - self._targets.append(target) - - def to_dict(self): - return { - "version": self.VERSION, - "targets": self._targets, - "elements": list(map(lambda e: e.to_dict(), self._elements.values())), - } - - def save_to_jsonfile(self, path): - with open(path, "w") as file: - file.write("%s\n" % json.dumps(self.to_dict(), indent=2)) - - def _parse(self, certificate_map): - if "version" not in certificate_map or certificate_map["version"] != self.VERSION: - raise ValueError( - "Invalid or unsupported HSM certificate version " - f"(current version is {self.VERSION})" - ) - - if "targets" not in certificate_map or type(certificate_map["targets"]) != list: - raise ValueError("Missing or invalid targets") - - self._targets = certificate_map["targets"] - - if "elements" not in certificate_map: - raise ValueError("Missing elements") - - for item in certificate_map["elements"]: - element = HSMCertificateElement(item) - self._elements[item["name"]] = element - - # Sanity: check each target has a path to the root authority - for target in self._targets: - if target not in self._elements: - raise ValueError(f"Target {target} not in elements") - - visited = [] - current = self._elements[target] - while True: - if current.name in visited: - raise ValueError( - f"Target {target} has not got a path to the root authority") - if current.signed_by == self.ROOT_ELEMENT: - break - if current.signed_by not in self._elements: - raise ValueError(f"Signer {current.signed_by} not in elements") - visited.append(current.name) - current = self._elements[current.signed_by] - - -class HSMCertificateElement: - VALID_NAMES = ["device", "attestation", "ui", "signer"] - EXTRACTORS = { - "device": lambda b: b[-65:], - "attestation": lambda b: b[1:], - "ui": lambda b: b[:], - "signer": lambda b: b[:], - } - - def __init__(self, element_map): - if ("name" not in element_map - or element_map["name"] not in self.VALID_NAMES): - raise ValueError("Missing or invalid name for HSM certificate element") - self._name = element_map["name"] - - if "signed_by" not in element_map: - raise ValueError("Missing certifier for HSM certificate element") - self._signed_by = element_map["signed_by"] - - self._tweak = None - if "tweak" in element_map: - if not is_nonempty_hex_string(element_map["tweak"]): - raise ValueError( - f"Invalid signer tweak for HSM certificate element {self.name}") - self._tweak = element_map["tweak"] - - if "message" not in element_map or not is_nonempty_hex_string( - element_map["message"]): - raise ValueError( - f"Missing or invalid message for HSM certificate element {self.name}") - self._message = element_map["message"] - - if "signature" not in element_map or not is_nonempty_hex_string( - element_map["signature"]): - raise ValueError( - f"Missing or invalid signature for HSM certificate element {self.name}") - self._signature = element_map["signature"] - - @property - def name(self): - return self._name - - @property - def signed_by(self): - return self._signed_by - - @property - def tweak(self): - return self._tweak - - @property - def message(self): - return self._message - - @property - def signature(self): - return self._signature - - def to_dict(self): - result = { - "name": self.name, - "message": self.message, - "signature": self.signature, - "signed_by": self.signed_by, - } - - if self.tweak is not None: - result["tweak"] = self.tweak - - return result - - def is_valid(self, certifier_pubkey): - try: - message = bytes.fromhex(self.message) - - verifier_pubkey = certifier_pubkey - if self.tweak is not None: - tweak = hmac.new( - bytes.fromhex(self.tweak), - certifier_pubkey.serialize(compressed=False), - hashlib.sha256, - ).digest() - - verifier_pubkey = verifier_pubkey.tweak_add(tweak) - - return verifier_pubkey.ecdsa_verify( - message, verifier_pubkey.ecdsa_deserialize(bytes.fromhex(self.signature))) - except Exception: - return False - - def get_value(self): - return self.EXTRACTORS[self.name](bytes.fromhex(self.message)).hex() +from .certificate_v1 import HSMCertificate, HSMCertificateElement +from .certificate_v2 import HSMCertificateV2, HSMCertificateV2Element diff --git a/middleware/admin/certificate_v1.py b/middleware/admin/certificate_v1.py new file mode 100644 index 00000000..a37802ed --- /dev/null +++ b/middleware/admin/certificate_v1.py @@ -0,0 +1,251 @@ +# The MIT License (MIT) +# +# Copyright (c) 2021 RSK Labs Ltd +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to +# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +# of the Software, and to permit persons to whom the Software is furnished to do +# so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import json +import hmac +import secp256k1 as ec +import hashlib +from .utils import is_nonempty_hex_string + + +class HSMCertificateElement: + VALID_NAMES = ["device", "attestation", "ui", "signer"] + EXTRACTORS = { + "device": lambda b: b[-65:], + "attestation": lambda b: b[1:], + "ui": lambda b: b[:], + "signer": lambda b: b[:], + } + + def __init__(self, element_map): + if ("name" not in element_map + or element_map["name"] not in self.VALID_NAMES): + raise ValueError("Missing or invalid name for HSM certificate element") + self._name = element_map["name"] + + if "signed_by" not in element_map: + raise ValueError("Missing certifier for HSM certificate element") + self._signed_by = element_map["signed_by"] + + self._tweak = None + if "tweak" in element_map: + if not is_nonempty_hex_string(element_map["tweak"]): + raise ValueError( + f"Invalid signer tweak for HSM certificate element {self.name}") + self._tweak = element_map["tweak"] + + if "message" not in element_map or not is_nonempty_hex_string( + element_map["message"]): + raise ValueError( + f"Missing or invalid message for HSM certificate element {self.name}") + self._message = element_map["message"] + + if "signature" not in element_map or not is_nonempty_hex_string( + element_map["signature"]): + raise ValueError( + f"Missing or invalid signature for HSM certificate element {self.name}") + self._signature = element_map["signature"] + + @property + def name(self): + return self._name + + @property + def signed_by(self): + return self._signed_by + + @property + def tweak(self): + return self._tweak + + @property + def message(self): + return self._message + + @property + def signature(self): + return self._signature + + def to_dict(self): + result = { + "name": self.name, + "message": self.message, + "signature": self.signature, + "signed_by": self.signed_by, + } + + if self.tweak is not None: + result["tweak"] = self.tweak + + return result + + def is_valid(self, certifier_pubkey): + try: + message = bytes.fromhex(self.message) + + verifier_pubkey = certifier_pubkey + if self.tweak is not None: + tweak = hmac.new( + bytes.fromhex(self.tweak), + certifier_pubkey.serialize(compressed=False), + hashlib.sha256, + ).digest() + + verifier_pubkey = verifier_pubkey.tweak_add(tweak) + + return verifier_pubkey.ecdsa_verify( + message, verifier_pubkey.ecdsa_deserialize(bytes.fromhex(self.signature))) + except Exception: + return False + + def get_value(self): + return self.EXTRACTORS[self.name](bytes.fromhex(self.message)).hex() + + +class HSMCertificate: + VERSION = 1 # Only supported version + ROOT_ELEMENT = "root" + ELEMENT_BASE_CLASS = HSMCertificateElement + + @staticmethod + def from_jsonfile(path): + try: + with open(path, "r") as file: + certificate_map = json.loads(file.read()) + + if type(certificate_map) != dict: + raise ValueError( + "JSON file must contain an object as a top level element") + + return HSMCertificate(certificate_map) + except (ValueError, json.JSONDecodeError) as e: + raise ValueError('Unable to read HSM certificate from "%s": %s' % + (path, str(e))) + + def __init__(self, certificate_map=None): + self._targets = [] + self._elements = {} + + if certificate_map is not None: + self._parse(certificate_map) + + def validate_and_get_values(self, raw_root_pubkey_hex): + # Parse the root public key + try: + root_pubkey = ec.PublicKey(bytes.fromhex(raw_root_pubkey_hex), raw=True) + except Exception: + return dict([(target, (False, self.ROOT_ELEMENT)) + for target in self._targets]) + + result = {} + for target in self._targets: + # Build the chain from the target to the root + chain = [] + current = self._elements[target] + while True: + if current.signed_by == self.ROOT_ELEMENT: + break + chain.append(current) + current = self._elements[current.signed_by] + + # Validate the chain from root to leaf + # If valid, return True and the value of the leaf + # If not valid, return False and the name of the element that + # failed the validation + current_pubkey = root_pubkey + while True: + # Validate this element + if not current.is_valid(current_pubkey): + result[target] = (False, current.name) + break + # Reached the leaf? => valid! + if len(chain) == 0: + result[target] = (True, current.get_value(), current.tweak) + break + + current_pubkey = ec.PublicKey(bytes.fromhex(current.get_value()), + raw=True) + current = chain.pop() + + return result + + def add_element(self, element): + if not isinstance(element, self.ELEMENT_BASE_CLASS): + raise ValueError( + f"Expected an HSMCertificateElement but got a {type(element)}") + self._elements[element.name] = element + + def clear_targets(self): + self._targets = [] + + def add_target(self, target): + if target not in self._elements: + raise ValueError(f"Target {target} not in elements") + self._targets.append(target) + + def to_dict(self): + return { + "version": self.VERSION, + "targets": self._targets, + "elements": list(map(lambda e: e.to_dict(), self._elements.values())), + } + + def save_to_jsonfile(self, path): + with open(path, "w") as file: + file.write("%s\n" % json.dumps(self.to_dict(), indent=2)) + + def _parse(self, certificate_map): + if "version" not in certificate_map or certificate_map["version"] != self.VERSION: + raise ValueError( + "Invalid or unsupported HSM certificate version " + f"(current version is {self.VERSION})" + ) + + if "targets" not in certificate_map or type(certificate_map["targets"]) != list: + raise ValueError("Missing or invalid targets") + + self._targets = certificate_map["targets"] + + if "elements" not in certificate_map: + raise ValueError("Missing elements") + + for item in certificate_map["elements"]: + element = HSMCertificateElement(item) + self._elements[item["name"]] = element + + # Sanity: check each target has a path to the root authority + for target in self._targets: + if target not in self._elements: + raise ValueError(f"Target {target} not in elements") + + visited = [] + current = self._elements[target] + while True: + if current.name in visited: + raise ValueError( + f"Target {target} has not got a path to the root authority") + if current.signed_by == self.ROOT_ELEMENT: + break + if current.signed_by not in self._elements: + raise ValueError(f"Signer {current.signed_by} not in elements") + visited.append(current.name) + current = self._elements[current.signed_by] diff --git a/middleware/admin/certificate_v2.py b/middleware/admin/certificate_v2.py new file mode 100644 index 00000000..61e74ce5 --- /dev/null +++ b/middleware/admin/certificate_v2.py @@ -0,0 +1,49 @@ +# The MIT License (MIT) +# +# Copyright (c) 2021 RSK Labs Ltd +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to +# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +# of the Software, and to permit persons to whom the Software is furnished to do +# so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +from .certificate_v1 import HSMCertificate + + +class HSMCertificateV2Element: + # TODO: actual logic and subclasses + def __init__(self, element_map): + self.element_map = element_map + + # Stub + def name(self): + return "attestation" + + def to_dict(self): + return self.element_map + + +class HSMCertificateV2(HSMCertificate): + VERSION = 2 + ELEMENT_BASE_CLASS = HSMCertificateV2Element + + def validate_and_get_values(self, raw_root_pubkey_hex): + # TODO + pass + + def _parse(self, certificate_map): + # TODO + pass diff --git a/middleware/admin/ledger_attestation.py b/middleware/admin/ledger_attestation.py index a10fa2e0..9242d08e 100644 --- a/middleware/admin/ledger_attestation.py +++ b/middleware/admin/ledger_attestation.py @@ -20,13 +20,10 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. -from .misc import info, head, get_hsm, dispose_hsm, AdminError, wait_for_reconnection -from .utils import is_hex_string_of_length, normalize_hex_string +from .misc import info, head, get_hsm, dispose_hsm, AdminError, wait_for_reconnection, \ + get_ud_value_for_attestation from .unlock import do_unlock from .certificate import HSMCertificate, HSMCertificateElement -from .rsk_client import RskClient, RskClientError - -UD_VALUE_LENGTH = 32 def do_attestation(options): @@ -46,24 +43,9 @@ def do_attestation(options): except Exception as e: raise AdminError(f"While loading the attestation certificate file: {str(e)}") - # Get the UD value for the UI attestation - info("Gathering user-defined UI attestation value... ", options.verbose) - - if is_hex_string_of_length(options.attestation_ud_source, - UD_VALUE_LENGTH, - allow_prefix=True): - ud_value = normalize_hex_string(options.attestation_ud_source) - else: - try: - rsk_client = RskClient(options.attestation_ud_source) - best_block = rsk_client.get_block_by_number( - rsk_client.get_best_block_number()) - ud_value = best_block["hash"][2:] - if not is_hex_string_of_length(ud_value, UD_VALUE_LENGTH): - raise ValueError(f"Got invalid best block from RSK server: {ud_value}") - except RskClientError as e: - raise AdminError(f"While fetching the best RSK block hash: {str(e)}") - + # Get the UD value for the attestations + info("Gathering user-defined attestation value... ", options.verbose) + ud_value = get_ud_value_for_attestation(options.attestation_ud_source) info(f"Using {ud_value} as the user-defined attestation value") # Attempt to unlock the device without exiting the UI diff --git a/middleware/admin/misc.py b/middleware/admin/misc.py index c8dcd59b..e72e46f7 100644 --- a/middleware/admin/misc.py +++ b/middleware/admin/misc.py @@ -29,6 +29,9 @@ from .dongle_admin import DongleAdmin from .dongle_eth import DongleEth from comm.platform import Platform +from .utils import is_hex_string_of_length, normalize_hex_string +from .rsk_client import RskClient, RskClientError + PIN_ERROR_MESSAGE = ("Invalid pin given. It must be exactly 8 alphanumeric " "characters with at least one alphabetic character.") @@ -37,6 +40,9 @@ SIGNER_WAIT_TIME = 3 # seconds +ATTESTATION_UD_VALUE_LENGTH = 32 # bytes +DEFAULT_ATT_UD_SOURCE = "https://public-node.rsk.co" + class AdminError(RuntimeError): pass @@ -127,3 +133,25 @@ def ask_for_pin(any_pin): def wait_for_reconnection(): time.sleep(SIGNER_WAIT_TIME) + + +def get_ud_value_for_attestation(user_provided_ud_source): + if is_hex_string_of_length(user_provided_ud_source, + ATTESTATION_UD_VALUE_LENGTH, + allow_prefix=True): + # Final value provided by user + ud_value = normalize_hex_string(user_provided_ud_source) + else: + # Final value taken from a specific Rootstock node + try: + rsk_client = RskClient(user_provided_ud_source) + best_block = rsk_client.get_block_by_number( + rsk_client.get_best_block_number()) + ud_value = best_block["hash"][2:] + if not is_hex_string_of_length(ud_value, ATTESTATION_UD_VALUE_LENGTH): + raise ValueError("Got invalid best block from " + f"Rootstock server: {ud_value}") + except RskClientError as e: + raise AdminError(f"While fetching the best Rootstock block hash: {str(e)}") + + return ud_value diff --git a/middleware/admin/sgx_attestation.py b/middleware/admin/sgx_attestation.py new file mode 100644 index 00000000..c261f0ba --- /dev/null +++ b/middleware/admin/sgx_attestation.py @@ -0,0 +1,71 @@ +# The MIT License (MIT) +# +# Copyright (c) 2021 RSK Labs Ltd +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to +# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +# of the Software, and to permit persons to whom the Software is furnished to do +# so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +from .misc import info, head, get_hsm, AdminError, get_ud_value_for_attestation +from .unlock import do_unlock +from .certificate import HSMCertificateV2, HSMCertificateV2Element + + +def do_attestation(options): + head("### -> Get powHSM attestation", fill="#") + hsm = None + + # Require an output file + if options.output_file_path is None: + raise AdminError("No output file path given") + + # Get the UD value for the attestation + info("Gathering user-defined attestation value... ", options.verbose) + ud_value = get_ud_value_for_attestation(options.attestation_ud_source) + info(f"Using {ud_value} as the user-defined attestation value") + + # Attempt to unlock the device + if not options.no_unlock: + try: + do_unlock(options, label=False) + except Exception as e: + raise AdminError(f"Failed to unlock device: {str(e)}") + + # Connection + hsm = get_hsm(options.verbose) + + # powHSM attestation + info("Gathering powHSM attestation... ", options.verbose) + try: + powhsm_attestation = hsm.get_powhsm_attestation(ud_value) + except Exception as e: + raise AdminError(f"Failed to gather powHSM attestation: {str(e)}") + info("powHSM attestation gathered") + + hsm.disconnect() + + # Generate and save the attestation certificate + info("Generating the attestation certificate... ", options.verbose) + + att_cert = HSMCertificateV2() + # TODO: + # 1. Parse envelope + # 2. Add actual elements of the certificate + att_cert.add_element(HSMCertificateV2Element(powhsm_attestation)) + att_cert.save_to_jsonfile(options.output_file_path) + + info(f"Attestation certificate saved to {options.output_file_path}") diff --git a/middleware/tests/admin/test_adm_sgx.py b/middleware/tests/admin/test_adm_sgx.py index d5a5f5cb..53c139cd 100644 --- a/middleware/tests/admin/test_adm_sgx.py +++ b/middleware/tests/admin/test_adm_sgx.py @@ -30,7 +30,7 @@ logging.disable(logging.CRITICAL) -class TestAdmLedger(TestCase): +class TestAdmSgx(TestCase): def setUp(self): self.old_stdout = sys.stdout self.DEFAULT_OPTIONS = { @@ -39,6 +39,7 @@ def setUp(self): "any_pin": False, "new_pin": None, "no_unlock": False, + "attestation_ud_source": "https://public-node.rsk.co", "operation": None, "output_file_path": None, "pin": None, @@ -169,3 +170,40 @@ def test_changepin(self, do_changepin): self.assertTrue(do_changepin.called) self.assertEqual(do_changepin.call_count, 2) self.assertEqual(expected_call_args_list, do_changepin.call_args_list) + + @patch("adm_sgx.do_attestation") + def test_attestation(self, do_attestation): + expected_options = { + **self.DEFAULT_OPTIONS, + 'attestation_ud_source': 'user-defined-source', + 'operation': 'attestation', + 'output_file_path': 'out-path', + 'pin': 'a-pin', + } + expected_call_args_list = [ + call(Namespace(**expected_options)), + call(Namespace(**{**expected_options, "no_unlock": True})) + ] + + with patch('sys.argv', ['adm_sgx.py', + '-p', 'a-pin', + '-o', 'out-path', + '--attudsource', 'user-defined-source', + 'attestation']): + with self.assertRaises(SystemExit) as e: + main() + self.assertEqual(e.exception.code, 0) + + with patch('sys.argv', ['adm_sgx.py', + '--pin', 'a-pin', + '--output', 'out-path', + '--attudsource', 'user-defined-source', + '--nounlock', + 'attestation']): + with self.assertRaises(SystemExit) as e: + main() + self.assertEqual(e.exception.code, 0) + + self.assertTrue(do_attestation.called) + self.assertEqual(do_attestation.call_count, 2) + self.assertEqual(expected_call_args_list, do_attestation.call_args_list) diff --git a/middleware/tests/admin/test_certificate.py b/middleware/tests/admin/test_certificate_v1.py similarity index 99% rename from middleware/tests/admin/test_certificate.py rename to middleware/tests/admin/test_certificate_v1.py index 856e414e..591cfc91 100644 --- a/middleware/tests/admin/test_certificate.py +++ b/middleware/tests/admin/test_certificate_v1.py @@ -29,7 +29,7 @@ from admin.certificate import HSMCertificate, HSMCertificateElement -class TestCertificate(TestCase): +class TestHSMCertificate(TestCase): def test_create_valid_certificate_ok(self): cert = HSMCertificate({ "version": 1, @@ -155,7 +155,7 @@ def test_create_certificate_missing_elements(self): "targets": ["attestation", "device"] }) - @patch('admin.certificate.HSMCertificateElement') + @patch('admin.certificate_v1.HSMCertificateElement') def test_create_certificate_invalid_element(self, certElementMock): certElementMock.side_effect = ValueError() with self.assertRaises(ValueError): diff --git a/middleware/tests/admin/test_certificate_element.py b/middleware/tests/admin/test_certificate_v1_element.py similarity index 99% rename from middleware/tests/admin/test_certificate_element.py rename to middleware/tests/admin/test_certificate_v1_element.py index 74087f1d..ff2f206c 100644 --- a/middleware/tests/admin/test_certificate_element.py +++ b/middleware/tests/admin/test_certificate_v1_element.py @@ -30,7 +30,7 @@ from admin.certificate import HSMCertificateElement -class TestCertificateElement(TestCase): +class TestHSMCertificateElement(TestCase): def test_create_certificate_element_ok(self): element = HSMCertificateElement({ "name": "device", diff --git a/middleware/tests/admin/test_ledger_attestation.py b/middleware/tests/admin/test_ledger_attestation.py index 935b1cc2..27ab2672 100644 --- a/middleware/tests/admin/test_ledger_attestation.py +++ b/middleware/tests/admin/test_ledger_attestation.py @@ -28,7 +28,6 @@ from admin.ledger_attestation import do_attestation from admin.certificate import HSMCertificate from admin.misc import AdminError -from admin.rsk_client import RskClientError @patch("sys.stdout.write") @@ -79,68 +78,11 @@ def setupDefaultOptions(self): options.attestation_ud_source = 'aa' * 32 return options - @patch('admin.ledger_attestation.RskClient') - def test_attestation_ok_provided_ud_value(self, - RskClient, - from_jsonfile, - get_hsm, - *_): + @patch('admin.ledger_attestation.get_ud_value_for_attestation') + def test_attestation_ok(self, get_ud_value_for_attestation, + from_jsonfile, get_hsm, *_): self.setupMocks(from_jsonfile, get_hsm) - options = self.setupDefaultOptions() - with patch('builtins.open', mock_open()) as file_mock: - do_attestation(options) - - self.assertEqual([call(options.attestation_ud_source)], - get_hsm.return_value.get_ui_attestation.call_args_list) - self.assertEqual([], RskClient.call_args_list) - self.assertEqual([call(options.attestation_certificate_file_path)], - from_jsonfile.call_args_list) - self.assertEqual([call(options.verbose), call(options.verbose)], - get_hsm.call_args_list) - self.assertEqual([call(options.output_file_path, 'w')], file_mock.call_args_list) - self.assertEqual([call("%s\n" % json.dumps({ - 'version': 1, - 'targets': [ - 'ui', - 'signer' - ], - 'elements': [ - { - "name": "attestation", - "message": '11' * 32, - "signature": '22' * 32, - "signed_by": "device" - }, - { - "name": "device", - "message": '33' * 32, - "signature": '44' * 32, - "signed_by": "root" - }, - { - 'name': 'ui', - 'message': 'aa' * 32, - 'signature': 'bb' * 32, - 'signed_by': 'attestation', - 'tweak': 'cc' * 32 - }, - { - 'name': 'signer', - 'message': 'dd' * 32, - 'signature': 'ee' * 32, - 'signed_by': 'attestation', - 'tweak': 'ff' * 32 - } - ] - }, indent=2))], - file_mock.return_value.write.call_args_list) - - @patch('admin.ledger_attestation.RskClient') - def test_attestation_ok_get_ud_value(self, RskClient, from_jsonfile, get_hsm, *_): - self.setupMocks(from_jsonfile, get_hsm) - RskClient.return_value = Mock() - rsk_client = RskClient.return_value - rsk_client.get_block_by_number = Mock(return_value={'hash': '0x' + 'bb' * 32}) + get_ud_value_for_attestation.return_value = 'bb'*32 options = self.setupDefaultOptions() options.attestation_ud_source = 'an-url' @@ -149,8 +91,7 @@ def test_attestation_ok_get_ud_value(self, RskClient, from_jsonfile, get_hsm, *_ self.assertEqual([call(options.attestation_certificate_file_path)], from_jsonfile.call_args_list) - self.assertEqual([call('an-url')], RskClient.call_args_list) - self.assertTrue(rsk_client.get_block_by_number.called) + self.assertEqual([call('an-url')], get_ud_value_for_attestation.call_args_list) self.assertNotEqual([call(options.attestation_ud_source)], get_hsm.return_value.get_ui_attestation.call_args_list) self.assertEqual([call('bb' * 32)], @@ -228,15 +169,19 @@ def test_attestation_invalid_jsonfile(self, from_jsonfile, get_hsm, *_): self.assertFalse(get_hsm.called) self.assertFalse(file_mock.return_value.write.called) - @patch('admin.ledger_attestation.RskClient') - def test_attestation_rsk_client_error(self, RskClient, from_jsonfile, get_hsm, *_): + @patch('admin.ledger_attestation.get_ud_value_for_attestation') + def test_attestation_get_ud_value_for_attestation_error(self, + get_ud_value_for_attestation, + from_jsonfile, get_hsm, *_): self.setupMocks(from_jsonfile, get_hsm) - RskClient.side_effect = RskClientError('error-msg') + get_ud_value_for_attestation.side_effect = AdminError('error-msg') options = self.setupDefaultOptions() - options.attestation_ud_source = 'an-url' + options.attestation_ud_source = 'another-url' with patch('builtins.open', mock_open()) as file_mock: with self.assertRaises(AdminError): do_attestation(options) + self.assertEqual([call('another-url')], + get_ud_value_for_attestation.call_args_list) self.assertTrue(from_jsonfile.called) self.assertFalse(get_hsm.called) self.assertFalse(file_mock.return_value.write.called) diff --git a/middleware/tests/admin/test_misc.py b/middleware/tests/admin/test_misc.py new file mode 100644 index 00000000..d6ac6279 --- /dev/null +++ b/middleware/tests/admin/test_misc.py @@ -0,0 +1,63 @@ +# The MIT License (MIT) +# +# Copyright (c) 2021 RSK Labs Ltd +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to +# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +# of the Software, and to permit persons to whom the Software is furnished to do +# so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +from unittest import TestCase +from unittest.mock import Mock, patch +from admin.misc import get_ud_value_for_attestation, RskClientError, AdminError + +import logging + +logging.disable(logging.CRITICAL) + + +class TestGetUdValueForAttestation(TestCase): + def test_hex_string(self): + self.assertEqual("aa"*32, get_ud_value_for_attestation("aa"*32)) + self.assertEqual("aa"*32, get_ud_value_for_attestation("0x"+"aa"*32)) + + @patch("admin.misc.RskClient") + def test_ud_source_ok(self, RskClient): + rsk_client = Mock() + RskClient.return_value = rsk_client + rsk_client.get_best_block_number.return_value = "the-best-block-number" + rsk_client.get_block_by_number.return_value = {"hash": "0x" + "bb"*32} + + self.assertEqual("bb"*32, get_ud_value_for_attestation("a-ud-source")) + + RskClient.assert_called_with("a-ud-source") + rsk_client.get_best_block_number.assert_called() + rsk_client.get_block_by_number.assert_called_with("the-best-block-number") + + @patch("admin.misc.RskClient") + def test_ud_source_client_error(self, RskClient): + rsk_client = Mock() + RskClient.return_value = rsk_client + rsk_client.get_best_block_number.return_value = "the-best-block-number" + rsk_client.get_block_by_number.side_effect = RskClientError("an-error") + + with self.assertRaises(AdminError) as e: + get_ud_value_for_attestation("a-ud-source") + self.assertIn("an-error", str(e.exception)) + + RskClient.assert_called_with("a-ud-source") + rsk_client.get_best_block_number.assert_called() + rsk_client.get_block_by_number.assert_called_with("the-best-block-number") diff --git a/middleware/tests/admin/test_sgx_attestation.py b/middleware/tests/admin/test_sgx_attestation.py new file mode 100644 index 00000000..2f05f2da --- /dev/null +++ b/middleware/tests/admin/test_sgx_attestation.py @@ -0,0 +1,113 @@ +# The MIT License (MIT) +# +# Copyright (c) 2021 RSK Labs Ltd +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to +# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +# of the Software, and to permit persons to whom the Software is furnished to do +# so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +from types import SimpleNamespace +from unittest import TestCase +from unittest.mock import Mock, patch +from parameterized import parameterized +from admin.sgx_attestation import do_attestation +from admin.misc import AdminError + + +@patch("sys.stdout") +@patch("admin.sgx_attestation.HSMCertificateV2Element") +@patch("admin.sgx_attestation.HSMCertificateV2") +@patch("admin.sgx_attestation.do_unlock") +@patch("admin.sgx_attestation.get_ud_value_for_attestation") +@patch("admin.sgx_attestation.get_hsm") +class TestSgxAttestation(TestCase): + def setUp(self): + options = SimpleNamespace() + options.output_file_path = "an-output-file" + options.no_unlock = False + options.verbose = "is-verbose" + options.attestation_ud_source = "an-ud-source" + self.options = options + + def setupMocks(self, get_hsm, get_ud_value_for_attestation, do_unlock, + HSMCertificateV2, HSMCertificateV2Element): + self.get_hsm = get_hsm + self.get_ud_value_for_attestation = get_ud_value_for_attestation + self.do_unlock = do_unlock + self.HSMCertificateV2 = HSMCertificateV2 + self.HSMCertificateV2Element = HSMCertificateV2Element + + self.hsm = Mock() + self.hsm.get_powhsm_attestation.return_value = "the-attestation" + get_hsm.return_value = self.hsm + get_ud_value_for_attestation.return_value = "some-random-value" + + @parameterized.expand([ + ("unlock", False), + ("no_unlock", True), + ]) + def test_ok(self, *args): + self.setupMocks(*args[:-3]) + self.options.no_unlock = args[-1] + + do_attestation(self.options) + + self.get_ud_value_for_attestation.assert_called_with("an-ud-source") + if self.options.no_unlock: + self.do_unlock.assert_not_called() + else: + self.do_unlock.assert_called_with(self.options, label=False) + self.get_hsm.assert_called_with("is-verbose") + self.hsm.get_powhsm_attestation.assert_called_with("some-random-value") + self.hsm.disconnect.assert_called() + self.HSMCertificateV2Element.assert_called_with("the-attestation") + elem = self.HSMCertificateV2Element.return_value + cert = self.HSMCertificateV2.return_value + cert.add_element.assert_called_with(elem) + cert.save_to_jsonfile.assert_called_with("an-output-file") + + def test_no_output_path(self, *args): + self.setupMocks(*args[:-1]) + self.options.output_file_path = None + + with self.assertRaises(AdminError) as e: + do_attestation(self.options) + self.assertIn("output file", str(e.exception)) + + self.get_ud_value_for_attestation.assert_not_called() + self.get_hsm.assert_not_called() + self.hsm.get_powhsm_attestation.assert_not_called() + self.do_unlock.assert_not_called() + self.HSMCertificateV2.assert_not_called() + self.HSMCertificateV2Element.assert_not_called() + + def test_adm_err(self, *args): + self.setupMocks(*args[:-1]) + + self.hsm.get_powhsm_attestation.side_effect = RuntimeError("an error") + + with self.assertRaises(RuntimeError) as e: + do_attestation(self.options) + self.assertIn("an error", str(e.exception)) + + self.get_ud_value_for_attestation.assert_called_with("an-ud-source") + self.do_unlock.assert_called_with(self.options, label=False) + self.get_hsm.assert_called_with("is-verbose") + self.hsm.get_powhsm_attestation.assert_called_with("some-random-value") + self.hsm.disconnect.assert_not_called() + self.HSMCertificateV2.assert_not_called() + self.HSMCertificateV2Element.assert_not_called() diff --git a/setup.cfg b/setup.cfg index bd38e3cf..b11ff18e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -22,7 +22,9 @@ max-line-length = 90 indent-size = 4 # Ignore unused imports in __init__.py files -per-file-ignores = __init__.py:F401 +per-file-ignores = + __init__.py:F401, + middleware/admin/certificate.py:F401, show-source = False statistics = True