Skip to content

Commit

Permalink
refact: move generate_roles_description to the new repository class
Browse files Browse the repository at this point in the history
  • Loading branch information
renatav committed Nov 5, 2024
1 parent feb7b53 commit 3202900
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 118 deletions.
117 changes: 0 additions & 117 deletions taf/repository_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,123 +355,6 @@ def delete_unregistered_target_files(self, targets_role="targets"):
(self.targets_path / file_rel_path).unlink()




def get_key_length_and_scheme_from_metadata(self, parent_role, keyid):
try:
metadata = json.loads(
Path(
self.path, METADATA_DIRECTORY_NAME, f"{parent_role}.json"
).read_text()
)
metadata = metadata["signed"]
if "delegations" in metadata:
metadata = metadata["delegations"]
scheme = metadata["keys"][keyid]["scheme"]
pub_key_pem = metadata["keys"][keyid]["keyval"]["public"]
pub_key = serialization.load_pem_public_key(
pub_key_pem.encode(), backend=default_backend()
)
return pub_key, scheme
except Exception:
return None, None

def generate_roles_description(self) -> Dict:
roles_description = {}

def _get_delegations(role_name):
delegations_info = {}
delegations = self.get_delegations_info(role_name)
if len(delegations):
for role_info in delegations.get("roles"):
delegations_info[role_info["name"]] = {
"threshold": role_info["threshold"],
"number": len(role_info["keyids"]),
"paths": role_info["paths"],
"terminating": role_info["terminating"],
}
pub_key, scheme = self.get_key_length_and_scheme_from_metadata(
role_name, role_info["keyids"][0]
)
delegations_info[role_info["name"]]["scheme"] = scheme
delegations_info[role_info["name"]]["length"] = pub_key.key_size
inner_roles_data = _get_delegations(role_info["name"])
if len(inner_roles_data):
delegations_info[role_info["name"]][
"delegations"
] = inner_roles_data
return delegations_info

for role_name in MAIN_ROLES:
role_obj = self._role_obj(role_name)
roles_description[role_name] = {
"threshold": role_obj.threshold,
"number": len(role_obj.keys),
}
pub_key, scheme = self.get_key_length_and_scheme_from_metadata(
"root", role_obj.keys[0]
)
roles_description[role_name]["scheme"] = scheme
roles_description[role_name]["length"] = pub_key.key_size
if role_name == "targets":
delegations_info = _get_delegations(role_name)
if len(delegations_info):
roles_description[role_name]["delegations"] = delegations_info
return {"roles": roles_description}



def get_delegated_role_property(self, property_name, role_name, parent_role=None):
"""
Extract value of the specified property of the provided delegated role from
its parent's role info.
Args:
- property_name: Name of the property (like threshold)
- role_name: Role
- parent_role: Parent role
Returns:
The specified property's value
"""
# TUF raises an error when asking for properties like threshold and signing keys
# of a delegated role (see https://github.com/theupdateframework/tuf/issues/574)
# The following workaround presumes that one every delegated role is a deegation
# of exactly one delegated role
if parent_role is None:
parent_role = self.find_delegated_roles_parent(role_name)
delegations = self.get_delegations_info(parent_role)
for delegated_role in delegations["roles"]:
if delegated_role["name"] == role_name:
return delegated_role[property_name]
return None

def get_role_keys(self, role, parent_role=None):
"""Get keyids of the given role
Args:
- role(str): TUF role (root, targets, timestamp, snapshot or delegated one)
- parent_role(str): Name of the parent role of the delegated role. If not specified,
it will be set automatically, but this might be slow if there
are many delegations.
Returns:
List of the role's keyids (i.e., keyids of the keys).
Raises:
- securesystemslib.exceptions.FormatError: If the arguments are improperly formatted.
- securesystemslib.exceptions.UnknownRoleError: If 'rolename' has not been delegated by this
targets object.
"""
role_obj = self._role_obj(role)
if role_obj is None:
return None
try:
return role_obj.keys
except KeyError:
pass
return self.get_delegated_role_property("keyids", role, parent_role)


def get_role_repositories(self, role, parent_role=None):
"""Get repositories of the given role
Expand Down
42 changes: 42 additions & 0 deletions taf/tests/tuf/test_query_repo/test_query_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,4 +163,46 @@ def test_get_target_file_hashes(tuf_repo_with_delegations):
with pytest.raises(TAFError):
tuf_repo_with_delegations.get_target_file_hashes("doesntexist")

def test_get_key_length_and_scheme_from_metadata(tuf_repo_with_delegations):
keyid = tuf_repo_with_delegations._role_obj("targets").keyids[0]
actual = tuf_repo_with_delegations.get_key_length_and_scheme_from_metadata("root", keyid)
key, scheme = actual
assert key is not None
assert scheme == "rsa-pkcs1v15-sha256"

def test_generate_roles_description(tuf_repo_with_delegations):
actual = tuf_repo_with_delegations.generate_roles_description()
roles_data = actual["roles"]
root_data = roles_data["root"]
assert root_data["threshold"] == 2
assert root_data["number"] == 3
assert root_data["scheme"] == "rsa-pkcs1v15-sha256"
assert root_data["length"] == 3072
targets_data = roles_data["targets"]
assert targets_data["threshold"] == 1
assert targets_data["number"] == 2
assert targets_data["scheme"] == "rsa-pkcs1v15-sha256"
assert targets_data["length"] == 3072
snapshot_data = roles_data["snapshot"]
assert snapshot_data["threshold"] == 1
assert snapshot_data["number"] == 1
assert snapshot_data["scheme"] == "rsa-pkcs1v15-sha256"
assert snapshot_data["length"] == 3072
timestamp_data = roles_data["timestamp"]
assert timestamp_data["threshold"] == 1
assert timestamp_data["number"] == 1
assert timestamp_data["scheme"] == "rsa-pkcs1v15-sha256"
assert timestamp_data["length"] == 3072
assert targets_data["delegations"]
delegated_role_data = targets_data["delegations"]["delegated_role"]
assert delegated_role_data["threshold"] == 2
assert delegated_role_data["number"] == 2
assert delegated_role_data["scheme"] == "rsa-pkcs1v15-sha256"
assert delegated_role_data["length"] == 3072
assert delegated_role_data["delegations"]
inner_role_data = delegated_role_data["delegations"]["inner_role"]
assert inner_role_data["threshold"] == 1
assert inner_role_data["number"] == 1
assert inner_role_data["scheme"] == "rsa-pkcs1v15-sha256"
assert inner_role_data["length"] == 3072

96 changes: 95 additions & 1 deletion taf/tuf/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
import shutil
from typing import Dict, List, Optional, Set, Tuple
from securesystemslib.exceptions import StorageError
from cryptography.hazmat.primitives import serialization

from securesystemslib.signer import Signer

from taf.utils import get_file_details, on_rm_error
from taf.utils import default_backend, get_file_details, on_rm_error
from tuf.api.metadata import (
Metadata,
MetaFile,
Expand Down Expand Up @@ -624,6 +625,99 @@ def get_target_file_hashes(self, target_path, hash_func=HASH_FUNCTION):
except KeyError:
raise TAFError(f"Target {target_path} does not exist")

def get_key_length_and_scheme_from_metadata(self, parent_role, keyid):
try:
metadata = json.loads(
Path(
self._path, METADATA_DIRECTORY_NAME, f"{parent_role}.json"
).read_text()
)
metadata = metadata["signed"]
if "delegations" in metadata:
metadata = metadata["delegations"]
scheme = metadata["keys"][keyid]["scheme"]
pub_key_pem = metadata["keys"][keyid]["keyval"]["public"]
pub_key = serialization.load_pem_public_key(
pub_key_pem.encode(), backend=default_backend()
)
return pub_key, scheme
except Exception:
return None, None

# TODO
def generate_roles_description(self) -> Dict:
roles_description = {}

def _get_delegations(role_name):
delegations_info = {}
targets_signed = self._signed_obj(role_name)
for delegation in targets_signed.delegations.roles:
delegated_role = self._role_obj(delegation)
delegations_info[delegation] = {
"threshold": delegated_role.threshold,
"number": len(delegated_role.keyids),
"paths": delegated_role.paths,
"terminating": delegated_role.terminating,
}
pub_key, scheme = self.get_key_length_and_scheme_from_metadata(
role_name, delegated_role.keyids[0]
)

delegations_info[delegation]["scheme"] = scheme
delegations_info[delegation]["length"] = pub_key.key_size
delegated_signed = self._signed_obj(delegation)
if delegated_signed.delegations:
inner_roles_data = _get_delegations(delegation)
if len(inner_roles_data):
delegations_info[delegation][
"delegations"
] = inner_roles_data
return delegations_info

for role_name in MAIN_ROLES:
role_obj = self._role_obj(role_name)
roles_description[role_name] = {
"threshold": role_obj.threshold,
"number": len(role_obj.keyids),
}
pub_key, scheme = self.get_key_length_and_scheme_from_metadata(
"root", role_obj.keyids[0]
)
roles_description[role_name]["scheme"] = scheme
roles_description[role_name]["length"] = pub_key.key_size
if role_name == "targets":
targets_signed = self._signed_obj(role_name)
if targets_signed.delegations:
delegations_info = _get_delegations(role_name)
if len(delegations_info):
roles_description[role_name]["delegations"] = delegations_info
return {"roles": roles_description}

def get_role_keys(self, role, parent_role=None):
"""Get keyids of the given role
Args:
- role(str): TUF role (root, targets, timestamp, snapshot or delegated one)
- parent_role(str): Name of the parent role of the delegated role. If not specified,
it will be set automatically, but this might be slow if there
are many delegations.
Returns:
List of the role's keyids (i.e., keyids of the keys).
Raises:
- securesystemslib.exceptions.FormatError: If the arguments are improperly formatted.
- securesystemslib.exceptions.UnknownRoleError: If 'rolename' has not been delegated by this
targets object.
"""
role_obj = self._role_obj(role)
if role_obj is None:
return None
try:
return role_obj.keys
except KeyError:
pass
return self.get_delegated_role_property("keyids", role, parent_role)

def map_signing_roles(self, target_filenames):
"""
Expand Down

0 comments on commit 3202900

Please sign in to comment.