Skip to content

Commit

Permalink
Better MDM push certificate lifecycle
Browse files Browse the repository at this point in the history
 - Create MDM push certificate with only a private key
 - Upload the certificate later
 - CSR download
 - Store signed CSR in the Push Certificate
 - Signed CSR download
  • Loading branch information
np5 committed Apr 3, 2024
1 parent 46743f0 commit c228400
Show file tree
Hide file tree
Showing 13 changed files with 425 additions and 75 deletions.
8 changes: 8 additions & 0 deletions server/base/templatetags/ui_extras.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ def flex_field_width(value):
'icon': "bi bi-download",
'tooltip': "Download",
},
'REFRESH': {
'icon': "bi bi-arrow-counterclockwise",
'tooltip': "Renew",
},
'UPLOAD': {
'icon': "bi bi-upload",
'tooltip': "Upload",
},
'LINK': {
'icon': "bi bi-link-45deg",
'tooltip': "External Link"
Expand Down
182 changes: 162 additions & 20 deletions tests/mdm/test_setup_push_certificate.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,29 +60,29 @@ def test_push_certificate_rewrap_secret(self):
push_certificate.rewrap_secrets()
self.assertEqual(push_certificate.get_private_key(), private_key)

# add push certificate
# upload push certificate

def test_add_push_certificate_redirect(self):
self._login_redirect(reverse("mdm:add_push_certificate"))
def test_upload_push_certificate_redirect(self):
self._login_redirect(reverse("mdm:upload_push_certificate"))

def test_add_push_certificate_permission_denied(self):
def test_upload_push_certificate_permission_denied(self):
self._login()
response = self.client.get(reverse("mdm:add_push_certificate"))
response = self.client.get(reverse("mdm:upload_push_certificate"))
self.assertEqual(response.status_code, 403)

def test_add_push_certificate_get(self):
def test_upload_push_certificate_get(self):
self._login("mdm.add_pushcertificate")
response = self.client.get(reverse("mdm:add_push_certificate"))
response = self.client.get(reverse("mdm:upload_push_certificate"))
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, "mdm/pushcertificate_form.html")
self.assertContains(response, "Add a MDM push certificate")
self.assertContains(response, "Upload MDM push certificate and key")

def test_add_push_certificate_post(self):
def test_upload_push_certificate_post(self):
self._login("mdm.add_pushcertificate", "mdm.view_pushcertificate")
name = get_random_string(12)
topic = get_random_string(12)
cert_pem, privkey_pem, privkey_password = force_push_certificate_material(topic)
response = self.client.post(reverse("mdm:add_push_certificate"),
response = self.client.post(reverse("mdm:upload_push_certificate"),
{"name": name,
"certificate_file": SimpleUploadedFile("cert.pem", cert_pem),
"key_file": SimpleUploadedFile("key.pem", privkey_pem),
Expand All @@ -100,6 +100,45 @@ def test_add_push_certificate_post(self):
serialization.load_pem_private_key(privkey_pem, privkey_password).private_numbers()
)

# create push certificate

def test_create_push_certificate_redirect(self):
self._login_redirect(reverse("mdm:create_push_certificate"))

def test_create_push_certificate_permission_denied(self):
self._login("mdm.view_pushcertificate")
response = self.client.get(reverse("mdm:create_push_certificate"))
self.assertEqual(response.status_code, 403)

def test_create_push_certificate_get(self):
self._login("mdm.add_pushcertificate")
response = self.client.get(reverse("mdm:create_push_certificate"))
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, "mdm/pushcertificate_form.html")
self.assertContains(response, "Create MDM push certificate")

def test_create_push_certificate_name_collision(self):
push_certificate = force_push_certificate()
self._login("mdm.add_pushcertificate")
response = self.client.post(reverse("mdm:create_push_certificate"),
{"name": push_certificate.name})
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, "mdm/pushcertificate_form.html")
self.assertFormError(response.context["form"], "name", "Push certificate with this Name already exists.")

def test_create_push_certificate_name_post(self):
self._login("mdm.add_pushcertificate", "mdm.view_pushcertificate")
name = get_random_string(12)
response = self.client.post(reverse("mdm:create_push_certificate"),
{"name": name},
follow=True)
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, "mdm/pushcertificate_detail.html")
push_certificate = response.context["object"]
self.assertEqual(push_certificate.name, name)
self.assertIsNotNone(push_certificate.private_key)
self.assertIsNone(push_certificate.certificate)

# view push certificate

def test_view_push_certificate_redirect(self):
Expand Down Expand Up @@ -130,33 +169,136 @@ def test_no_delete_push_certificate_link(self):
self.assertNotContains(response, reverse("mdm:delete_push_certificate",
args=(enrollment.push_certificate.pk,)))

# update push certificate
# push certificate CSR

def test_push_certificate_csr_redirect(self):
push_certificate = force_push_certificate()
self._login_redirect(reverse("mdm:push_certificate_csr", args=(push_certificate.pk,)))

def test_push_certificate_csr_permission_denied(self):
push_certificate = force_push_certificate()
self._login()
response = self.client.get(reverse("mdm:push_certificate_csr", args=(push_certificate.pk,)))
self.assertEqual(response.status_code, 403)

def test_push_certificate_csr(self):
push_certificate = force_push_certificate(with_material=True)
self._login("mdm.view_pushcertificate")
response = self.client.get(reverse("mdm:push_certificate_csr", args=(push_certificate.pk,)))
self.assertEqual(response.status_code, 200)
self.assertEqual(response.headers["Content-Type"], "application/pkcs10")
self.assertEqual(response.headers["Content-Disposition"],
f'attachment; filename="push_certificate_{push_certificate.pk}.csr"')

# push certificate signed CSR

def test_push_certificate_signed_csr_redirect(self):
push_certificate = force_push_certificate()
self._login_redirect(reverse("mdm:push_certificate_signed_csr", args=(push_certificate.pk,)))

def test_push_certificate_signed_csr_permission_denied(self):
push_certificate = force_push_certificate()
self._login()
response = self.client.get(reverse("mdm:push_certificate_signed_csr", args=(push_certificate.pk,)))
self.assertEqual(response.status_code, 403)

def test_push_certificate_signed_csr_empty(self):
push_certificate = force_push_certificate()
self.assertIsNone(push_certificate.signed_csr)
self._login("mdm.view_pushcertificate")
response = self.client.get(reverse("mdm:push_certificate_signed_csr", args=(push_certificate.pk,)))
self.assertEqual(response.status_code, 404)

def test_push_certificate_signed_csr(self):
push_certificate = force_push_certificate()
push_certificate.signed_csr = b"1234"
push_certificate.save()
self._login("mdm.view_pushcertificate")
response = self.client.get(reverse("mdm:push_certificate_signed_csr", args=(push_certificate.pk,)))
self.assertEqual(response.status_code, 200)
self.assertEqual(b''.join(response.streaming_content), push_certificate.signed_csr)
self.assertEqual(response.headers["Content-Type"], "application/octet-stream")
self.assertEqual(response.headers["Content-Disposition"],
f'attachment; filename="push_certificate_{push_certificate.pk}_signed_csr.b64"')

# upload push certificate certificate

def test_upload_push_certificate_certificate_redirect(self):
push_certificate = force_push_certificate()
self._login_redirect(reverse("mdm:upload_push_certificate_certificate", args=(push_certificate.pk,)))

def test_upload_push_certificate_certificate_permission_denied(self):
push_certificate = force_push_certificate()
self._login("mdm.view_pushcertificate")
response = self.client.get(reverse("mdm:upload_push_certificate_certificate", args=(push_certificate.pk,)))
self.assertEqual(response.status_code, 403)

def test_upload_push_certificate_certificate_get(self):
push_certificate = force_push_certificate()
self._login("mdm.change_pushcertificate")
response = self.client.get(reverse("mdm:upload_push_certificate_certificate", args=(push_certificate.pk,)))
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, "mdm/pushcertificate_form.html")
self.assertContains(response, "Upload MDM push certificate")

def test_upload_push_certificate_certificate_post(self):
push_certificate = force_push_certificate()
push_certificate.topic = None
push_certificate.save()
topic = get_random_string(12)
cert_pem, privkey_pem, _ = force_push_certificate_material(topic=topic, encrypt_key=False)
push_certificate.set_private_key(privkey_pem)
push_certificate.save()
self._login("mdm.change_pushcertificate", "mdm.view_pushcertificate")
response = self.client.post(reverse("mdm:upload_push_certificate_certificate", args=(push_certificate.pk,)),
{"name": push_certificate.name,
"certificate_file": SimpleUploadedFile("cert.pem", cert_pem)},
follow=True)
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, "mdm/pushcertificate_detail.html")
self.assertEqual(response.context["object"], push_certificate)
push_certificate.refresh_from_db()
self.assertEqual(push_certificate.topic, topic)

def test_upload_push_certificate_certificate_cert_key_mismatch(self):
push_certificate = force_push_certificate(with_material=True)
cert_pem, _, _ = force_push_certificate_material(topic=push_certificate.topic, encrypt_key=False)
self._login("mdm.change_pushcertificate", "mdm.view_pushcertificate")
response = self.client.post(reverse("mdm:upload_push_certificate_certificate", args=(push_certificate.pk,)),
{"name": push_certificate.name,
"certificate_file": SimpleUploadedFile("cert.pem", cert_pem)},
follow=True)
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, "mdm/pushcertificate_form.html")
self.assertFormError(response.context["form"], None, "The certificate and key do not form a pair")

# renew push certificate

def test_update_push_certificate_redirect(self):
def test_renew_push_certificate_redirect(self):
push_certificate = force_push_certificate(with_material=True)
self._login_redirect(reverse("mdm:update_push_certificate", args=(push_certificate.pk,)))
self._login_redirect(reverse("mdm:renew_push_certificate", args=(push_certificate.pk,)))

def test_update_push_certificate_permission_denied(self):
def test_renew_push_certificate_permission_denied(self):
push_certificate = force_push_certificate(with_material=True)
self._login()
response = self.client.get(reverse("mdm:update_push_certificate", args=(push_certificate.pk,)))
response = self.client.get(reverse("mdm:renew_push_certificate", args=(push_certificate.pk,)))
self.assertEqual(response.status_code, 403)

def test_update_push_certificate_get(self):
def test_renew_push_certificate_get(self):
push_certificate = force_push_certificate(with_material=True)
self._login("mdm.change_pushcertificate")
response = self.client.get(reverse("mdm:update_push_certificate", args=(push_certificate.pk,)))
response = self.client.get(reverse("mdm:renew_push_certificate", args=(push_certificate.pk,)))
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, "mdm/pushcertificate_form.html")
self.assertContains(response, f"Update MDM push certificate <i>{push_certificate.name}</i>")
self.assertContains(response, "Renew MDM push certificate and key")

def test_update_push_certificate_post(self):
def test_renew_push_certificate_post(self):
topic = get_random_string(12)
push_certificate = force_push_certificate(topic=topic)
new_name = get_random_string(12)
cert_pem, privkey_pem, privkey_password = force_push_certificate_material(topic)
self._login("mdm.change_pushcertificate", "mdm.view_pushcertificate")
response = self.client.post(reverse("mdm:update_push_certificate", args=(push_certificate.pk,)),
response = self.client.post(reverse("mdm:renew_push_certificate", args=(push_certificate.pk,)),
{"name": new_name,
"certificate_file": SimpleUploadedFile("cert.pem", cert_pem),
"key_file": SimpleUploadedFile("key.pem", privkey_pem),
Expand Down
11 changes: 8 additions & 3 deletions tests/mdm/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def force_realm_user(realm=None):
# push certificate


def force_push_certificate_material(topic=None, reduced_key_size=True):
def force_push_certificate_material(topic=None, reduced_key_size=True, encrypt_key=True):
privkey = rsa.generate_private_key(
public_exponent=65537,
key_size=512 if reduced_key_size else 2048,
Expand All @@ -83,11 +83,16 @@ def force_push_certificate_material(topic=None, reduced_key_size=True):
cert_pem = cert.public_bytes(
encoding=serialization.Encoding.PEM
)
privkey_password = get_random_string(12).encode("utf-8")
if encrypt_key:
privkey_password = get_random_string(12).encode("utf-8")
encryption_algorithm = serialization.BestAvailableEncryption(privkey_password)
else:
privkey_password = None
encryption_algorithm = serialization.NoEncryption()
privkey_pem = privkey.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.BestAvailableEncryption(privkey_password)
encryption_algorithm=encryption_algorithm,
)
return cert_pem, privkey_pem, privkey_password

Expand Down
30 changes: 25 additions & 5 deletions zentral/contrib/mdm/crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from cryptography import x509
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec, padding
from cryptography.hazmat.primitives.asymmetric import ec, padding, rsa
from cryptography.x509.oid import NameOID
from django.utils.crypto import get_random_string
from django.utils.functional import SimpleLazyObject
Expand Down Expand Up @@ -196,7 +196,7 @@ def encrypt_cms_payload(payload, public_key_bytes, raw_output=False):
with NamedTemporaryFile() as tmp_pubkey_file:
tmp_pubkey_file.write(public_key_bytes)
tmp_pubkey_file.flush()
# encrypt the paload
# encrypt the payload
p = subprocess.Popen(["/usr/bin/openssl", "smime", "-encrypt", tmp_pubkey_file.name],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
Expand All @@ -210,6 +210,26 @@ def encrypt_cms_payload(payload, public_key_bytes, raw_output=False):
# push certificate


def generate_push_certificate_key_bytes(key_size=2048):
key = rsa.generate_private_key(
public_exponent=65537,
key_size=key_size,
)
return key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.PKCS8,
serialization.NoEncryption()
)


def generate_push_certificate_csr_der_bytes(push_certificate):
key = serialization.load_pem_private_key(push_certificate.get_private_key(), None)
csr = x509.CertificateSigningRequestBuilder().subject_name(x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, f"Zentral Push Certificate {push_certificate.pk}")
])).sign(key, hashes.SHA256())
return csr.public_bytes(serialization.Encoding.DER)


def load_push_certificate_and_key(cert_pem_bytes, key_pem_bytes, password=None):
if password:
if isinstance(password, str):
Expand All @@ -231,7 +251,7 @@ def load_push_certificate_and_key(cert_pem_bytes, key_pem_bytes, password=None):
# (TODO verify <1024bit with padding.OAEP → error)
pad = padding.PKCS1v15()
try:
key.decrypt(cert.public_key().encrypt(message, pad), pad)
assert key.decrypt(cert.public_key().encrypt(message, pad), pad) == message
except Exception:
raise ValueError("The certificate and key do not form a pair")
try:
Expand All @@ -244,6 +264,6 @@ def load_push_certificate_and_key(cert_pem_bytes, key_pem_bytes, password=None):
serialization.PrivateFormat.PKCS8,
serialization.NoEncryption()
),
"not_before": cert.not_valid_before,
"not_after": cert.not_valid_after,
"not_before": cert.not_valid_before_utc,
"not_after": cert.not_valid_after_utc,
"topic": topic}
Loading

0 comments on commit c228400

Please sign in to comment.