From 198aee8b90820a9a3f4a81788994286782c837c9 Mon Sep 17 00:00:00 2001 From: Ariel Mendelzon Date: Fri, 13 Dec 2024 10:23:50 -0300 Subject: [PATCH] Certificate V2 generator - Added individual certificate v2 element types with serialization logic: SgxQuote, SgxAttestationKey and X509 - Added CStruct abstraction to simplify representing C structs - Added struct definitions as CStruct subtypes based on OpenEnclave's endorsement type definitions - Updated SGX attestation gathering logic to parse the envelope and generate the actual certificate elements - Added and updated unit tests - Ignoring long line linting in envelope unit tests --- middleware/admin/certificate.py | 4 +- middleware/admin/certificate_v2.py | 60 ++++- middleware/admin/sgx_attestation.py | 74 +++++- middleware/comm/cstruct.py | 163 +++++++++++++ middleware/sgx/envelope.py | 192 +++++++++++++++ middleware/tests/admin/test_certificate_v1.py | 2 +- middleware/tests/admin/test_certificate_v2.py | 91 +++++++ .../tests/admin/test_sgx_attestation.py | 130 +++++++++- middleware/tests/comm/test_cstruct.py | 223 ++++++++++++++++++ middleware/tests/sgx/test_envelope.py | 112 +++++++++ setup.cfg | 1 + 11 files changed, 1026 insertions(+), 26 deletions(-) create mode 100644 middleware/comm/cstruct.py create mode 100644 middleware/sgx/envelope.py create mode 100644 middleware/tests/admin/test_certificate_v2.py create mode 100644 middleware/tests/comm/test_cstruct.py create mode 100644 middleware/tests/sgx/test_envelope.py diff --git a/middleware/admin/certificate.py b/middleware/admin/certificate.py index 64b3bb6b..1b86f791 100644 --- a/middleware/admin/certificate.py +++ b/middleware/admin/certificate.py @@ -21,4 +21,6 @@ # SOFTWARE. from .certificate_v1 import HSMCertificate, HSMCertificateElement -from .certificate_v2 import HSMCertificateV2, HSMCertificateV2Element +from .certificate_v2 import HSMCertificateV2, HSMCertificateV2ElementSGXQuote, \ + HSMCertificateV2ElementSGXAttestationKey, \ + HSMCertificateV2ElementX509 diff --git a/middleware/admin/certificate_v2.py b/middleware/admin/certificate_v2.py index 61e74ce5..962fa3a0 100644 --- a/middleware/admin/certificate_v2.py +++ b/middleware/admin/certificate_v2.py @@ -24,16 +24,62 @@ class HSMCertificateV2Element: - # TODO: actual logic and subclasses - def __init__(self, element_map): - self.element_map = element_map + pass - # Stub - def name(self): - return "attestation" + +class HSMCertificateV2ElementSGXQuote(HSMCertificateV2Element): + def __init__(self, name, message, custom_data, signature, signed_by): + self.name = name + self.message = message + self.custom_data = custom_data + self.signature = signature + self.signed_by = signed_by + + def to_dict(self): + return { + "name": self.name, + "type": "sgx_quote", + "message": self.message.hex(), + "custom_data": self.custom_data.hex(), + "signature": self.signature.hex(), + "signed_by": self.signed_by, + } + + +class HSMCertificateV2ElementSGXAttestationKey(HSMCertificateV2Element): + def __init__(self, name, message, key, auth_data, signature, signed_by): + self.name = name + self.message = message + self.key = key + self.auth_data = auth_data + self.signature = signature + self.signed_by = signed_by + + def to_dict(self): + return { + "name": self.name, + "type": "sgx_attestation_key", + "message": self.message.hex(), + "key": self.key.hex(), + "auth_data": self.auth_data.hex(), + "signature": self.signature.hex(), + "signed_by": self.signed_by, + } + + +class HSMCertificateV2ElementX509(HSMCertificateV2Element): + def __init__(self, name, message, signed_by): + self.name = name + self.message = message + self.signed_by = signed_by def to_dict(self): - return self.element_map + return { + "name": self.name, + "type": "x509_pem", + "message": self.message.decode('ASCII'), + "signed_by": self.signed_by, + } class HSMCertificateV2(HSMCertificate): diff --git a/middleware/admin/sgx_attestation.py b/middleware/admin/sgx_attestation.py index c261f0ba..364df5b2 100644 --- a/middleware/admin/sgx_attestation.py +++ b/middleware/admin/sgx_attestation.py @@ -20,9 +20,13 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. +import ecdsa from .misc import info, head, get_hsm, AdminError, get_ud_value_for_attestation from .unlock import do_unlock -from .certificate import HSMCertificateV2, HSMCertificateV2Element +from sgx.envelope import SgxEnvelope +from .certificate import HSMCertificateV2, HSMCertificateV2ElementSGXQuote, \ + HSMCertificateV2ElementSGXAttestationKey, \ + HSMCertificateV2ElementX509 def do_attestation(options): @@ -58,14 +62,72 @@ def do_attestation(options): hsm.disconnect() + # Parse envelope + info("Parsing the powHSM attestation envelope...") + try: + envelope = SgxEnvelope( + bytes.fromhex(powhsm_attestation["envelope"]), + bytes.fromhex(powhsm_attestation["message"])) + except Exception as e: + raise AdminError(f"SGX envelope parse error: {str(e)}") + + # Conversions + quote_signature = ecdsa.util.sigdecode_string( + envelope.quote_auth_data.signature.r + + envelope.quote_auth_data.signature.s, + ecdsa.NIST256p.order) + quote_signature = ecdsa.util.sigencode_der( + quote_signature[0], + quote_signature[1], + ecdsa.NIST256p.order) + att_key = ecdsa.VerifyingKey.from_string( + envelope.quote_auth_data.attestation_key.x + + envelope.quote_auth_data.attestation_key.y, + ecdsa.NIST256p) + qe_rb_signature = ecdsa.util.sigdecode_string( + envelope.quote_auth_data.qe_report_body_signature.r + + envelope.quote_auth_data.qe_report_body_signature.s, + ecdsa.NIST256p.order) + qe_rb_signature = ecdsa.util.sigencode_der( + qe_rb_signature[0], + qe_rb_signature[1], + ecdsa.NIST256p.order) + # Generate and save the attestation certificate info("Generating the attestation certificate... ", options.verbose) - att_cert = HSMCertificateV2() - # TODO: - # 1. Parse envelope - # 2. Add actual elements of the certificate - att_cert.add_element(HSMCertificateV2Element(powhsm_attestation)) + + att_cert.add_element( + HSMCertificateV2ElementSGXQuote( + name="quote", + message=envelope.quote.get_raw_data(), + custom_data=envelope.custom_message, + signature=quote_signature, + signed_by="attestation", + )) + att_cert.add_element( + HSMCertificateV2ElementSGXAttestationKey( + name="attestation", + message=envelope.quote_auth_data.qe_report_body.get_raw_data(), + key=att_key.to_string("uncompressed"), + auth_data=envelope.qe_auth_data.data, + signature=qe_rb_signature, + signed_by="quoting_enclave", + )) + att_cert.add_element( + HSMCertificateV2ElementX509( + name="quoting_enclave", + message=envelope.qe_cert_data.certs[0], + signed_by="platform_ca", + )) + att_cert.add_element( + HSMCertificateV2ElementX509( + name="platform_ca", + message=envelope.qe_cert_data.certs[1], + signed_by="sgx_root", + )) + + att_cert.add_target("quote") att_cert.save_to_jsonfile(options.output_file_path) info(f"Attestation certificate saved to {options.output_file_path}") diff --git a/middleware/comm/cstruct.py b/middleware/comm/cstruct.py new file mode 100644 index 00000000..77203988 --- /dev/null +++ b/middleware/comm/cstruct.py @@ -0,0 +1,163 @@ +# The MIT License (MIT) +# +# Copyright (c) 2021 RSK Labs Ltd +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to +# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +# of the Software, and to permit persons to whom the Software is furnished to do +# so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + + +import struct +import re + + +class CStruct: + MAP = { + "uint8_t": ["B", "s"], + "uint16_t": "H", + "uint32_t": "I", + "uint64_t": "Q", + } + SPEC = None + TYPENAME = None + + @classmethod + def _spec(cls, little=True): + if cls.SPEC is None or little not in cls.SPEC: + fmt = "<" if little else ">" + atrmap = {} + names = [] + types = [] + index = 0 + typename = None + for line in cls.__doc__.split("\n")[1:]: + line = re.sub(r"\s+", " ", line.strip()) + if line == "": + continue + if typename is None: + typename = line + continue + tspec = line.split(" ") + + length = "" if len(tspec) < 3 else str(int(tspec[2].strip(), 10)) + + typ = tspec[0].strip() + actual_type = None + derived_type = None + if typ not in cls.MAP.keys(): + for kls in cls.__base__.__subclasses__(): + if cls != kls: + if typ == kls._typename(): + actual_type = kls + derived_type = kls + if derived_type is None: + raise ValueError(f"Invalid type: {typ}") + else: + actual_type = cls.MAP[typ] + + if length != "" and not isinstance(actual_type, list): + raise ValueError(f"Invalid type spec: {line}") + + name = tspec[1].strip() + + if isinstance(actual_type, list): + actual_type = actual_type[0] if length == "" else actual_type[1] + elif not isinstance(actual_type, str) and \ + issubclass(actual_type, cls.__base__): + actual_type = str(actual_type.get_bytelength(little)) + "s" + + fmt += length + actual_type + names.append(name) + types.append(derived_type) + atrmap[name] = index + index += 1 + if cls.SPEC is None: + cls.SPEC = {} + cls.SPEC[little] = (struct.Struct(fmt), atrmap, names, types, typename) + + return cls.SPEC[little] + + @classmethod + def _struct(cls, little=True): + return cls._spec(little)[0] + + @classmethod + def _atrmap(cls, little=True): + return cls._spec(little)[1] + + @classmethod + def _names(cls, little=True): + return cls._spec(little)[2] + + @classmethod + def _types(cls, little=True): + return cls._spec(little)[3] + + @classmethod + def _typename(cls): + if cls.TYPENAME is None: + for line in cls.__doc__.split("\n"): + line = re.sub(r"\s+", " ", line.strip()) + if line == "": + continue + cls.TYPENAME = line + break + + return cls.TYPENAME + + @classmethod + def get_bytelength(cls, little=True): + return cls._struct(little).size + + def __init__(self, value, offset=0, little=True): + self._offset = offset + self._little = little + self._raw_value = value + + try: + self._parsed = list(self._struct(little).unpack_from(value, offset)) + except Exception as e: + raise ValueError(f"While parsing: {e}") + + for index, derived_type in enumerate(self._types(little)): + if derived_type is not None: + self._parsed[index] = derived_type(self._parsed[index], little=little) + + def _value(self, name): + amap = self._atrmap(self._little) + if name in amap: + return self._parsed[amap[name]] + raise NameError(f"Property {name} does not exist") + + def __getattr__(self, name): + return self._value(name) + + def get_raw_data(self): + return self._raw_value[ + self._offset:self._offset+self.get_bytelength(self._little)] + + def to_dict(self): + result = {} + for name in self._names(self._little): + value = self._value(name) + if isinstance(value, bytes): + value = value.hex() + result[name] = value.to_dict() if isinstance(value, CStruct) else value + return result + + def __repr__(self): + return f"<{self.__class__.__name__}: {self.to_dict()}>" diff --git a/middleware/sgx/envelope.py b/middleware/sgx/envelope.py new file mode 100644 index 00000000..39f86aab --- /dev/null +++ b/middleware/sgx/envelope.py @@ -0,0 +1,192 @@ +# The MIT License (MIT) +# +# Copyright (c) 2021 RSK Labs Ltd +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to +# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +# of the Software, and to permit persons to whom the Software is furnished to do +# so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +from comm.cstruct import CStruct + + +class SgxEnvelope(CStruct): + """ + sgx_envelope_t + + sgx_quote_t quote + sgx_quote_tail_t quote_tail + sgx_quote_auth_data_t quote_auth_data + """ + + def __init__(self, envelope_bytes, custom_message_bytes, offset=0, little=True): + super().__init__(envelope_bytes, offset, little) + offset += self.get_bytelength() + + qead = SgxQeAuthData(envelope_bytes, offset, little) + offset += qead.get_total_bytelength() + self.qe_auth_data = qead + + qecd = SgxQeCertData(envelope_bytes, offset, little) + offset += qecd.get_total_bytelength() + self.qe_cert_data = qecd + + if envelope_bytes[offset:] != custom_message_bytes: + raise ValueError("Unexpected custom message in envelope tail") + self.custom_message = custom_message_bytes + +############################################################################## +# Types below taken from OpenEnclave's include/openenclave/bits/sgx/sgxtypes.h +############################################################################## + + +class SgxAttributes(CStruct): + """ + sgx_attributes_t + + uint64_t flags + uint64_t xfrm + """ + + +class SgxReportData(CStruct): + """ + sgx_report_data_t + + uint8_t field 64 + """ + + +class SgxReportBody(CStruct): + """ + sgx_report_body_t + + uint8_t cpusvn 16 + uint32_t miscselect + uint8_t reserved1 12 + uint8_t isvextprodid 16 + sgx_attributes_t attributes + uint8_t mrenclave 32 + uint8_t reserved2 32 + uint8_t mrsigner 32 + uint8_t reserved3 32 + uint8_t configid 64 + uint16_t isvprodid + uint16_t isvsvn + uint16_t configsvn + uint8_t reserved4 42 + uint8_t isvfamilyid 16 + sgx_report_data_t report_data + """ + + +class SgxEcdsa256Signature(CStruct): + """ + sgx_ecdsa256_signature_t + + uint8_t r 32 + uint8_t s 32 + """ + + +class SgxEcdsa256Key(CStruct): + """ + sgx_ecdsa256_key_t + + uint8_t x 32 + uint8_t y 32 + """ + + +class SgxQuote(CStruct): + """ + sgx_quote_t + + uint16_t version + uint16_t sign_type + uint32_t tee_type + uint16_t qe_svn + uint16_t pce_svn + uint8_t uuid 16 + uint8_t user_data 20 + sgx_report_body_t report_body + """ + + +# This is actually part of sgx_quote_t, separated +# for pratical reasons since the signature doesn't include +# this field +class SgxQuoteTail(CStruct): + """ + sgx_quote_tail_t + + uint32_t signature_len + """ + + +class SgxQuoteAuthData(CStruct): + """ + sgx_quote_auth_data_t + + sgx_ecdsa256_signature_t signature + sgx_ecdsa256_key_t attestation_key + sgx_report_body_t qe_report_body + sgx_ecdsa256_signature_t qe_report_body_signature + """ + +#################################################################### +# The following two structs are augmented with content parsing logic +#################################################################### + + +class SgxQeAuthData(CStruct): + """ + sgx_qe_auth_data_t + + uint16_t size + """ + + def __init__(self, value, offset=0, little=True): + super().__init__(value, offset, little) + os = offset + self.get_bytelength() + data = value[os:os+self.size] + if len(data) != self.size: + raise ValueError(f"Expected {self.size} data bytes but only got {len(data)}") + self.data = data + + def get_total_bytelength(self): + return self.get_bytelength() + len(self.data) + + +class SgxQeCertData(SgxQeAuthData): + """ + sgx_qe_cert_data_t + + uint16_t type + uint32_t size + """ + SPEC = None + TYPENAME = None + + X509_START_MARKER = b"-----BEGIN CERTIFICATE-----\n" + X509_END_MARKER = b"\n-----END CERTIFICATE-----\n" + + def __init__(self, value, offset=0, little=True): + super().__init__(value, offset, little) + self.certs = list(map(lambda c: c.replace(self.X509_START_MARKER, b""), + filter(lambda c: + c.strip().startswith(self.X509_START_MARKER), + self.data.split(self.X509_END_MARKER)))) diff --git a/middleware/tests/admin/test_certificate_v1.py b/middleware/tests/admin/test_certificate_v1.py index 591cfc91..f7503bba 100644 --- a/middleware/tests/admin/test_certificate_v1.py +++ b/middleware/tests/admin/test_certificate_v1.py @@ -26,7 +26,7 @@ from unittest import TestCase from unittest.mock import call, patch, mock_open -from admin.certificate import HSMCertificate, HSMCertificateElement +from admin.certificate_v1 import HSMCertificate, HSMCertificateElement class TestHSMCertificate(TestCase): diff --git a/middleware/tests/admin/test_certificate_v2.py b/middleware/tests/admin/test_certificate_v2.py new file mode 100644 index 00000000..f4a16da6 --- /dev/null +++ b/middleware/tests/admin/test_certificate_v2.py @@ -0,0 +1,91 @@ +# The MIT License (MIT) +# +# Copyright (c) 2021 RSK Labs Ltd +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to +# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +# of the Software, and to permit persons to whom the Software is furnished to do +# so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +from unittest import TestCase +from admin.certificate_v1 import HSMCertificate +from admin.certificate_v2 import HSMCertificateV2, HSMCertificateV2ElementSGXQuote, \ + HSMCertificateV2ElementSGXAttestationKey, \ + HSMCertificateV2ElementX509 + + +class TestHSMCertificateV2(TestCase): + def test_behavior_inherited(self): + self.assertTrue(issubclass(HSMCertificateV2, HSMCertificate)) + + def test_create_empty_certificate_ok(self): + cert = HSMCertificateV2() + self.assertEqual({'version': 2, 'targets': [], 'elements': []}, cert.to_dict()) + + +class TestHSMCertificateV2ElementSGXQuote(TestCase): + def test_dict_ok(self): + elem = HSMCertificateV2ElementSGXQuote( + "thename", + bytes.fromhex("aabbcc"), + bytes.fromhex("ddeeff"), + bytes.fromhex("112233"), + "whosigned" + ) + self.assertEqual({ + "name": "thename", + "type": "sgx_quote", + "message": "aabbcc", + "custom_data": "ddeeff", + "signature": "112233", + "signed_by": "whosigned", + }, elem.to_dict()) + + +class TestHSMCertificateV2ElementSGXAttestationKey(TestCase): + def test_dict_ok(self): + elem = HSMCertificateV2ElementSGXAttestationKey( + "thename", + bytes.fromhex("aabbcc"), + bytes.fromhex("ddeeff"), + bytes.fromhex("112233"), + bytes.fromhex("44556677"), + "whosigned" + ) + self.assertEqual({ + "name": "thename", + "type": "sgx_attestation_key", + "message": "aabbcc", + "key": "ddeeff", + "auth_data": "112233", + "signature": "44556677", + "signed_by": "whosigned", + }, elem.to_dict()) + + +class TestHSMCertificateV2ElementX509(TestCase): + def test_dict_ok(self): + elem = HSMCertificateV2ElementX509( + "thename", + b"this is an ascii message", + "whosigned" + ) + self.assertEqual({ + "name": "thename", + "type": "x509_pem", + "message": "this is an ascii message", + "signed_by": "whosigned", + }, elem.to_dict()) diff --git a/middleware/tests/admin/test_sgx_attestation.py b/middleware/tests/admin/test_sgx_attestation.py index 2f05f2da..4b60fadb 100644 --- a/middleware/tests/admin/test_sgx_attestation.py +++ b/middleware/tests/admin/test_sgx_attestation.py @@ -20,17 +20,21 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. +import ecdsa from types import SimpleNamespace from unittest import TestCase -from unittest.mock import Mock, patch +from unittest.mock import Mock, patch, call from parameterized import parameterized from admin.sgx_attestation import do_attestation from admin.misc import AdminError @patch("sys.stdout") -@patch("admin.sgx_attestation.HSMCertificateV2Element") +@patch("admin.sgx_attestation.HSMCertificateV2ElementX509") +@patch("admin.sgx_attestation.HSMCertificateV2ElementSGXAttestationKey") +@patch("admin.sgx_attestation.HSMCertificateV2ElementSGXQuote") @patch("admin.sgx_attestation.HSMCertificateV2") +@patch("admin.sgx_attestation.SgxEnvelope") @patch("admin.sgx_attestation.do_unlock") @patch("admin.sgx_attestation.get_ud_value_for_attestation") @patch("admin.sgx_attestation.get_hsm") @@ -44,15 +48,56 @@ def setUp(self): self.options = options def setupMocks(self, get_hsm, get_ud_value_for_attestation, do_unlock, - HSMCertificateV2, HSMCertificateV2Element): + SgxEnvelope, HSMCertificateV2, HSMCertificateV2ElementSGXQuote, + HSMCertificateV2ElementSGXAttestationKey, HSMCertificateV2ElementX509): self.get_hsm = get_hsm self.get_ud_value_for_attestation = get_ud_value_for_attestation self.do_unlock = do_unlock + self.SgxEnvelope = SgxEnvelope self.HSMCertificateV2 = HSMCertificateV2 - self.HSMCertificateV2Element = HSMCertificateV2Element + self.HSMCertificateV2ElementSGXQuote = HSMCertificateV2ElementSGXQuote + self.HSMCertificateV2ElementSGXAttestationKey = \ + HSMCertificateV2ElementSGXAttestationKey + self.HSMCertificateV2ElementX509 = HSMCertificateV2ElementX509 self.hsm = Mock() - self.hsm.get_powhsm_attestation.return_value = "the-attestation" + self.hsm.get_powhsm_attestation.return_value = { + "envelope": "11"*32, + "message": "22"*32, + } + quote = SimpleNamespace(**{"get_raw_data": lambda: "quote-raw-data"}) + sig = SimpleNamespace(**{"r": b"a"*32, "s": b"a"*32}) + self.att_key = ecdsa.SigningKey.generate(curve=ecdsa.NIST256p).\ + get_verifying_key() + att_key_str = self.att_key.to_string() + attkey = SimpleNamespace(**{"x": att_key_str[:32], "y": att_key_str[32:]}) + qesig = SimpleNamespace(**{"r": b"c"*32, "s": b"c"*32}) + qad = SimpleNamespace(**{ + "signature": sig, + "attestation_key": attkey, + "qe_report_body": SimpleNamespace(**{ + "get_raw_data": lambda: "qerb-raw-data"}), + "qe_report_body_signature": qesig, + }) + qead = SimpleNamespace(**{ + "data": "qead-data", + }) + qecd = SimpleNamespace(**{ + "certs": ["qecd-cert-0", "qecd-cert-1"], + }) + envelope = SimpleNamespace(**{ + "quote": quote, + "quote_auth_data": qad, + "qe_auth_data": qead, + "qe_cert_data": qecd, + "custom_message": "a-custom-message", + }) + self.SgxEnvelope.return_value = envelope + + self.HSMCertificateV2ElementSGXQuote.return_value = "quote_elem" + self.HSMCertificateV2ElementSGXAttestationKey.return_value = "attkey_elem" + self.HSMCertificateV2ElementX509.side_effect = ["cert0_elem", "cert1_elem"] + get_hsm.return_value = self.hsm get_ud_value_for_attestation.return_value = "some-random-value" @@ -74,10 +119,44 @@ def test_ok(self, *args): self.get_hsm.assert_called_with("is-verbose") self.hsm.get_powhsm_attestation.assert_called_with("some-random-value") self.hsm.disconnect.assert_called() - self.HSMCertificateV2Element.assert_called_with("the-attestation") - elem = self.HSMCertificateV2Element.return_value + self.SgxEnvelope.assert_called_with( + bytes.fromhex("11"*32), + bytes.fromhex("22"*32), + ) + self.HSMCertificateV2ElementSGXQuote.assert_called_with( + name="quote", + message="quote-raw-data", + custom_data="a-custom-message", + signature=bytes.fromhex("30440220"+"61"*32+"0220"+"61"*32), + signed_by="attestation", + ) + self.HSMCertificateV2ElementSGXAttestationKey.assert_called_with( + name="attestation", + message="qerb-raw-data", + key=self.att_key.to_string("uncompressed"), + auth_data="qead-data", + signature=bytes.fromhex("30440220"+"63"*32+"0220"+"63"*32), + signed_by="quoting_enclave", + ) + self.HSMCertificateV2ElementX509.assert_has_calls([ + call( + name="quoting_enclave", + message="qecd-cert-0", + signed_by="platform_ca", + ), + call( + name="platform_ca", + message="qecd-cert-1", + signed_by="sgx_root", + ) + ]) cert = self.HSMCertificateV2.return_value - cert.add_element.assert_called_with(elem) + cert.add_element.assert_has_calls([ + call("quote_elem"), + call("attkey_elem"), + call("cert0_elem"), + call("cert1_elem") + ]) cert.save_to_jsonfile.assert_called_with("an-output-file") def test_no_output_path(self, *args): @@ -92,10 +171,13 @@ def test_no_output_path(self, *args): self.get_hsm.assert_not_called() self.hsm.get_powhsm_attestation.assert_not_called() self.do_unlock.assert_not_called() + self.SgxEnvelope.assert_not_called() self.HSMCertificateV2.assert_not_called() - self.HSMCertificateV2Element.assert_not_called() + self.HSMCertificateV2ElementSGXQuote.assert_not_called() + self.HSMCertificateV2ElementSGXAttestationKey.assert_not_called() + self.HSMCertificateV2ElementX509.assert_not_called() - def test_adm_err(self, *args): + def test_adm_err_get_attestation(self, *args): self.setupMocks(*args[:-1]) self.hsm.get_powhsm_attestation.side_effect = RuntimeError("an error") @@ -109,5 +191,31 @@ def test_adm_err(self, *args): self.get_hsm.assert_called_with("is-verbose") self.hsm.get_powhsm_attestation.assert_called_with("some-random-value") self.hsm.disconnect.assert_not_called() + self.SgxEnvelope.assert_not_called() + self.HSMCertificateV2.assert_not_called() + self.HSMCertificateV2ElementSGXQuote.assert_not_called() + self.HSMCertificateV2ElementSGXAttestationKey.assert_not_called() + self.HSMCertificateV2ElementX509.assert_not_called() + + def test_adm_err_envelope_parsing(self, *args): + self.setupMocks(*args[:-1]) + + self.SgxEnvelope.side_effect = ValueError("an error") + + with self.assertRaises(AdminError) as e: + do_attestation(self.options) + self.assertIn("envelope parse error", str(e.exception)) + + self.get_ud_value_for_attestation.assert_called_with("an-ud-source") + self.do_unlock.assert_called_with(self.options, label=False) + self.get_hsm.assert_called_with("is-verbose") + self.hsm.get_powhsm_attestation.assert_called_with("some-random-value") + self.hsm.disconnect.assert_called() + self.SgxEnvelope.assert_called_with( + bytes.fromhex("11"*32), + bytes.fromhex("22"*32), + ) self.HSMCertificateV2.assert_not_called() - self.HSMCertificateV2Element.assert_not_called() + self.HSMCertificateV2ElementSGXQuote.assert_not_called() + self.HSMCertificateV2ElementSGXAttestationKey.assert_not_called() + self.HSMCertificateV2ElementX509.assert_not_called() diff --git a/middleware/tests/comm/test_cstruct.py b/middleware/tests/comm/test_cstruct.py new file mode 100644 index 00000000..8c04d47d --- /dev/null +++ b/middleware/tests/comm/test_cstruct.py @@ -0,0 +1,223 @@ +# The MIT License (MIT) +# +# Copyright (c) 2021 RSK Labs Ltd +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to +# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +# of the Software, and to permit persons to whom the Software is furnished to do +# so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +from unittest import TestCase +from parameterized import parameterized +from comm.cstruct import CStruct + +import logging + +logging.disable(logging.CRITICAL) + + +class RandomBisStruct(CStruct): + """ + random_bis_t + + uint16_t another_double + uint8_t single_arr 10 + uint32_t another_quad + """ + + +class RandomTrisStruct(CStruct): + """ + random_tris_t + + uint8_t arr_one 2 + uint8_t arr_two 3 + """ + + +class RandomStruct(CStruct): + """ + random_t + + uint8_t single_val + uint16_t double_val + uint32_t quad_val + uint64_t oct_val + + random_bis_t other_random + random_tris_t yet_other_random + """ + + +class Invalid1(CStruct): + """ + invalid_1 + + nonexistent_type something + """ + + +class Invalid2(CStruct): + """ + invalid_2 + + uint32_t withlength 5 + """ + + +class ValidWithInvalid(CStruct): + """ + valid_with_invalid + + uint8_t a_number + uint16_t another_number + invalid_2 something_invalid + """ + + +class TestCStruct(TestCase): + def setUp(self): + self.packed = bytes.fromhex( + "99" # single_val + "0102" # double_val + "03040506" # quad_val + "0708090a0b0c0d0e" # oct_val + "8899" # another_double + "00112233445566778899" # single_arr + "d1d2d3d4" # another_quad + "aabb" # arr_one + "ccddee" # arr_two + ) + + def test_expected_sizes(self): + self.assertEqual(16, RandomBisStruct.get_bytelength()) + self.assertEqual(5, RandomTrisStruct.get_bytelength()) + self.assertEqual(15 + + RandomBisStruct.get_bytelength() + + RandomTrisStruct.get_bytelength(), + RandomStruct.get_bytelength()) + + def test_parsing_default(self): + parsed = RandomStruct(self.packed) + + self.assertEqual(0x99, parsed.single_val) + self.assertEqual(0x0201, parsed.double_val) + self.assertEqual(0x06050403, parsed.quad_val) + self.assertEqual(0x0e0d0c0b0a090807, parsed.oct_val) + + self.assertEqual(0x9988, parsed.other_random.another_double) + self.assertEqual(bytes.fromhex("00112233445566778899"), + parsed.other_random.single_arr) + self.assertEqual(0xd4d3d2d1, parsed.other_random.another_quad) + + self.assertEqual(bytes.fromhex("aabb"), parsed.yet_other_random.arr_one) + self.assertEqual(bytes.fromhex("ccddee"), parsed.yet_other_random.arr_two) + + self.assertEqual({ + "single_val": 0x99, + "double_val": 0x0201, + "quad_val": 0x06050403, + "oct_val": 0x0e0d0c0b0a090807, + "other_random": { + "another_double": 0x9988, + "single_arr": "00112233445566778899", + "another_quad": 0xd4d3d2d1, + }, + "yet_other_random": { + "arr_one": "aabb", + "arr_two": "ccddee", + } + }, parsed.to_dict()) + + def test_parsing_little_offset(self): + parsed = RandomStruct(b"thisisrandom" + self.packed, offset=12, little=True) + + self.assertEqual(0x99, parsed.single_val) + self.assertEqual(0x0201, parsed.double_val) + self.assertEqual(0x06050403, parsed.quad_val) + self.assertEqual(0x0e0d0c0b0a090807, parsed.oct_val) + + self.assertEqual(0x9988, parsed.other_random.another_double) + self.assertEqual(bytes.fromhex("00112233445566778899"), + parsed.other_random.single_arr) + self.assertEqual(0xd4d3d2d1, parsed.other_random.another_quad) + + self.assertEqual(bytes.fromhex("aabb"), parsed.yet_other_random.arr_one) + self.assertEqual(bytes.fromhex("ccddee"), parsed.yet_other_random.arr_two) + + self.assertEqual({ + "single_val": 0x99, + "double_val": 0x0201, + "quad_val": 0x06050403, + "oct_val": 0x0e0d0c0b0a090807, + "other_random": { + "another_double": 0x9988, + "single_arr": "00112233445566778899", + "another_quad": 0xd4d3d2d1, + }, + "yet_other_random": { + "arr_one": "aabb", + "arr_two": "ccddee", + } + }, parsed.to_dict()) + + def test_parsing_big(self): + parsed = RandomStruct(self.packed, little=False) + + self.assertEqual(0x99, parsed.single_val) + self.assertEqual(0x0102, parsed.double_val) + self.assertEqual(0x03040506, parsed.quad_val) + self.assertEqual(0x0708090a0b0c0d0e, parsed.oct_val) + + self.assertEqual(0x8899, parsed.other_random.another_double) + self.assertEqual(bytes.fromhex("00112233445566778899"), + parsed.other_random.single_arr) + self.assertEqual(0xd1d2d3d4, parsed.other_random.another_quad) + + self.assertEqual(bytes.fromhex("aabb"), parsed.yet_other_random.arr_one) + self.assertEqual(bytes.fromhex("ccddee"), parsed.yet_other_random.arr_two) + + self.assertEqual({ + "single_val": 0x99, + "double_val": 0x0102, + "quad_val": 0x03040506, + "oct_val": 0x0708090a0b0c0d0e, + "other_random": { + "another_double": 0x8899, + "single_arr": "00112233445566778899", + "another_quad": 0xd1d2d3d4, + }, + "yet_other_random": { + "arr_one": "aabb", + "arr_two": "ccddee", + } + }, parsed.to_dict()) + + def test_parsing_toosmall(self): + with self.assertRaises(ValueError): + RandomStruct(b"thisistoosmall") + + @parameterized.expand([ + ("invalid_one", Invalid1), + ("invalid_two", Invalid2), + ("valid_with_invalid", ValidWithInvalid) + ]) + def test_invalid_spec(self, _, kls): + with self.assertRaises(ValueError): + kls.get_bytelength() + + with self.assertRaises(ValueError): + kls(b'somethingtoparse') diff --git a/middleware/tests/sgx/test_envelope.py b/middleware/tests/sgx/test_envelope.py new file mode 100644 index 00000000..132a2861 --- /dev/null +++ b/middleware/tests/sgx/test_envelope.py @@ -0,0 +1,112 @@ +# The MIT License (MIT) +# +# Copyright (c) 2021 RSK Labs Ltd +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to +# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +# of the Software, and to permit persons to whom the Software is furnished to do +# so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +from unittest import TestCase +from parameterized import parameterized +from sgx.envelope import SgxAttributes, \ + SgxReportData, \ + SgxReportBody, \ + SgxEcdsa256Signature, \ + SgxEcdsa256Key, \ + SgxQuote, \ + SgxQuoteTail, \ + SgxQuoteAuthData, \ + SgxQeCertData, \ + SgxQeAuthData, \ + SgxEnvelope + +import logging + +logging.disable(logging.CRITICAL) + +TEST_ENVELOPE = """  +""" + +TEST_MESSAGE = "746869732069732061206d657373616765" + + +class TestSgxQeAuthData(TestCase): + def test_parses_ok(self): + parsed = SgxQeAuthData(bytes.fromhex("0a00112233445566778899aa")) + self.assertEqual(10, parsed.size) + self.assertEqual(bytes.fromhex("112233445566778899aa"), parsed.data) + + def test_parses_error_tooshort(self): + with self.assertRaises(ValueError): + SgxQeAuthData(bytes.fromhex("0a0baabbcc")) + + +class TestSgxQeCertData(TestCase): + def test_parses_ok(self): + certs = \ +b""" +-----BEGIN CERTIFICATE----- +this is certificate one +-----END CERTIFICATE----- +-----BEGIN CERTIFICATE----- +this is certificate two +-----END CERTIFICATE----- +""" + parsed = SgxQeCertData( + bytes.fromhex("1122") + + len(certs).to_bytes(4, byteorder="little", signed=False) + + certs + ) + self.assertEqual(0x2211, parsed.type) + self.assertEqual(certs, parsed.data) + self.assertEqual(2, len(parsed.certs)) + + def test_parses_error_tooshort(self): + with self.assertRaises(ValueError): + SgxQeAuthData(bytes.fromhex("0a0baabbcc")) + + +class TestSgxEnvelope(TestCase): + def test_parses_ok(self): + envelope = SgxEnvelope( + bytes.fromhex(TEST_ENVELOPE), + bytes.fromhex(TEST_MESSAGE) + ) + + self.assertEqual(TEST_MESSAGE, envelope.custom_message.hex()) + + def test_parsing_fails_if_message_mismatch(self): + with self.assertRaises(ValueError): + SgxEnvelope(bytes.fromhex(TEST_ENVELOPE), b"some-other-message") + + +class TestSgxStructs(TestCase): + # Sizes taken from OpenEnclave's include/openenclave/bits/sgx/sgxtypes.h + # sgx_quote_t is smaller due to not including the last field (signature_len) + @parameterized.expand([ + ("sgx_attributes_t", SgxAttributes, 16), + ("sgx_report_data_t", SgxReportData, 64), + ("sgx_report_body_t", SgxReportBody, 384), + ("sgx_ecdsa256_signature_t", SgxEcdsa256Signature, 64), + ("sgx_ecdsa256_key_t", SgxEcdsa256Key, 64), + ("sgx_quote_t", SgxQuote, 432), + ("sgx_quote_tail_t", SgxQuoteTail, 4), + ("sgx_quote_auth_data_t", SgxQuoteAuthData, 576), + ]) + def test_sizes_ok(self, _, kls, exp_len): + self.assertEqual(exp_len, kls.get_bytelength()) diff --git a/setup.cfg b/setup.cfg index b11ff18e..da90c29b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -25,6 +25,7 @@ indent-size = 4 per-file-ignores = __init__.py:F401, middleware/admin/certificate.py:F401, + middleware/tests/sgx/test_envelope.py:E122, show-source = False statistics = True