Skip to content

Commit

Permalink
feat: moved get threshold to the new repository class, implement dele…
Browse files Browse the repository at this point in the history
…gated role lookup
  • Loading branch information
renatav committed Oct 29, 2024
1 parent 70927b7 commit 9bf3fb9
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 85 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
"click==8.*",
"colorama>=0.3.9",
"tuf==5.*",
"cryptography>=40.0.0",
# "cryptography>=40.0.0",
"securesystemslib==1.*",
"loguru==0.7.*",
'pygit2==1.9.*; python_version < "3.11"',
Expand Down
60 changes: 0 additions & 60 deletions taf/repository_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -962,30 +962,6 @@ def get_delegations_info(self, role_name):
self._repository
return tuf.roledb.get_roleinfo(role_name, self.name).get("delegations")

def get_role_threshold(self, role, parent_role=None):
"""Get threshold of the given role
Args:
- role(str): TUF role (root, targets, timestamp, snapshot or delegated one)
- parent_role(str): Name of the parent role of the delegated role. If not specified,
it will be set automatically, but this might be slow if there
are many delegations.
Returns:
Role's signatures threshold
Raises:
- securesystemslib.exceptions.FormatError: If the arguments are improperly formatted.
- securesystemslib.exceptions.UnknownRoleError: If 'rolename' has not been delegated by this
"""
role_obj = self._role_obj(role)
if role_obj is None:
return None
try:
return role_obj.threshold
except KeyError:
pass
return self.get_delegated_role_property("threshold", role, parent_role)

def get_signable_metadata(self, role):
"""Return signable portion of newly generate metadata for given role.
Expand Down Expand Up @@ -1139,42 +1115,6 @@ def roles_yubikeys_update_method(self, role_name):
"targets": self.update_targets_yubikeys,
}.get(role_name, self.update_targets_yubikeys)

def set_metadata_expiration_date(self, role, start_date=None, interval=None):
"""Set expiration date of the provided role.
Args:
- role(str): TUF role (root, targets, timestamp, snapshot or delegated one)
- start_date(datetime): Date to which the specified interval is added when calculating
expiration date. If a value is not provided, it is set to the
current time.
- interval(int): A number of days added to the start date.
If not provided, the default value is set based on the role:
root - 365 days
targets - 90 days
snapshot - 7 days
timestamp - 1 day
all other roles (delegations) - same as targets
Returns:
None
Raises:
- securesystemslib.exceptions.FormatError: If the arguments are improperly formatted.
- securesystemslib.exceptions.UnknownRoleError: If 'rolename' has not been delegated by
this targets object.
"""
role_obj = self._role_obj(role)
if start_date is None:
start_date = datetime.datetime.now()
if interval is None:
try:
interval = expiration_intervals[role]
except KeyError:
interval = expiration_intervals["targets"]

expiration_date = start_date + datetime.timedelta(interval)
role_obj.expiration = expiration_date

def set_delegated_role_property(self, property_name, role, value, parent_role=None):
"""
Expand Down
23 changes: 23 additions & 0 deletions taf/tests/test_repository/test_repo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import pytest
from taf.exceptions import TAFError
from taf.tests.conftest import TEST_DATA_REPOS_PATH
from taf.tuf.repository import MetadataRepository

def test_get_threshold_no_delegations():
test_group_dir = TEST_DATA_REPOS_PATH / "test-repository-tool/test-happy-path-pkcs1v15" / "taf"
tuf_repo = MetadataRepository(test_group_dir)
assert tuf_repo.get_role_threshold("root") == 2
assert tuf_repo.get_role_threshold("targets") == 1
assert tuf_repo.get_role_threshold("snapshot") == 1
assert tuf_repo.get_role_threshold("timestamp") == 1
with pytest.raises(TAFError):
tuf_repo.get_role_threshold("doestexist")

def test_get_threshold_delegations():
test_group_dir = TEST_DATA_REPOS_PATH / "test-repository-tool/test-delegated-roles-pkcs1v15" / "taf"
tuf_repo = MetadataRepository(test_group_dir)
assert tuf_repo.get_role_threshold("delegated_role1") == 2
assert tuf_repo.get_role_threshold("delegated_role2") == 1
assert tuf_repo.get_role_threshold("inner_delegated_role") == 1


169 changes: 145 additions & 24 deletions taf/tuf/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,29 @@
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Dict, List
from securesystemslib.exceptions import StorageError

from securesystemslib.signer import Signer

from tuf.api.metadata import (
Metadata,
MetaFile,
Role,
Root,
Snapshot,
Targets,
TargetFile,
Timestamp,
)
from taf.exceptions import TAFError
from tuf.repository import Repository

logger = logging.getLogger(__name__)

METADATA_DIRECTORY_NAME = "metadata"

MAIN_ROLES = ["root", "targets", "snapshot", "timestamp"]


class MetadataRepository(Repository):
"""TUF metadata repository implementation for on-disk top-level roles.
Expand All @@ -41,7 +46,7 @@ class MetadataRepository(Repository):
role is used to sign the related role metadata.
"""

expiry_period = timedelta(days=1)
expiration_intervals = {"root": 365, "targets": 90, "snapshot": 7, "timestamp": 1}

def __init__(self, path: Path) -> None:
self.signer_cache: Dict[str, Dict[str, Signer]] = {}
Expand All @@ -64,14 +69,44 @@ def snapshot_info(self) -> MetaFile:
# tracks snapshot metadata changes, needed in `do_timestamp`
return self._snapshot_info

def add_target_files(self, target_files: List[TargetFile]) -> None:
"""Add target files to top-level targets metadata."""
with self.edit_targets() as targets:
for target_file in target_files:
targets.targets[target_file.path] = target_file

self.do_snapshot()
self.do_timestamp()

def add_keys(self, signers: List[Signer], role: str) -> None:
"""Add signer public keys for role to root and update signer cache."""
with self.edit_root() as root:
for signer in signers:
key = signer.public_key
self.signer_cache[role][key.keyid] = signer
root.add_key(key, role)

# Make sure the targets role gets signed with its new key, even though
# it wasn't updated itself.
if role == "targets":
with self.edit_targets():
pass

self.do_snapshot()
self.do_timestamp()

def open(self, role: str) -> Metadata:
"""Read role metadata from disk."""
return Metadata.from_file(self.metadata_path / f"{role}.json")
try:
return Metadata.from_file(self.metadata_path / f"{role}.json")
except StorageError:
raise TAFError(f"Metadata file {self.metadata_path} does not exist")

def close(self, role: str, md: Metadata) -> None:
"""Bump version and expiry, re-sign, and write role metadata to disk."""

# expiration date is updated before close is called
md.signed.version += 1
md.signed.expires = datetime.now(timezone.utc) + self.expiry_period

md.signatures.clear()
for signer in self.signer_cache[role].values():
Expand Down Expand Up @@ -122,28 +157,114 @@ def create(self, signers: Dict[str, Dict[str, Signer]]):
signed.version = 0 # `close` will bump to initial valid verison 1
self.close(signed.type, Metadata(signed))

def add_target_files(self, target_files: List[TargetFile]) -> None:
"""Add target files to top-level targets metadata."""
with self.edit_targets() as targets:
for target_file in target_files:
targets.targets[target_file.path] = target_file
def _find_delegated_role_parent(self, delegated_role, parent=None):
if parent is None:
parent = "targets"

parents = [parent]

while parents:
parent = parents.pop()
parent_obj = self._signed_obj(parent)
for delegation in parent_obj.delegations.roles:
if delegation == delegated_role:
return parent
parents.append(delegation)
return None


def _signed_obj(self, role):
md = self.open(role)
try:
singed_data = md.to_dict()["signed"]
role_to_role_class = {
"root": Root,
"targets": Targets,
"snapshot": Snapshot,
"timestamp": Timestamp
}
role_class = role_to_role_class.get(role, Targets)
return role_class.from_dict(singed_data)
except (KeyError, ValueError):
raise TAFError(f"Invalid metadata file {role}.json")

def _role_obj(self, role, parent=None):
if role in MAIN_ROLES:
md = self.open("root")
try:
data = md.to_dict()["signed"]["roles"][role]
return Role.from_dict(data)
except (KeyError, ValueError):
raise TAFError("root.json is invalid")
else:
parent_name = self._find_delegated_role_parent(role, parent)
if parent_name is None:
return None
md = self.open(parent_name)
delegations_data = md.to_dict()["signed"]["delegations"]["roles"]
for delegation in delegations_data:
if delegation["name"] == role:
try:
return Role.from_dict(delegation)
except (KeyError, ValueError):
raise TAFError(f"{delegation}.json is invalid")
return None


def get_role_threshold(self, role, parent=None):
"""Get threshold of the given role
self.do_snapshot()
self.do_timestamp()
Args:
- role(str): TUF role (root, targets, timestamp, snapshot or delegated one)
- parent_role(str): Name of the parent role of the delegated role. If not specified,
it will be set automatically, but this might be slow if there
are many delegations.
def add_keys(self, signers: List[Signer], role: str) -> None:
"""Add signer public keys for role to root and update signer cache."""
with self.edit_root() as root:
for signer in signers:
key = signer.public_key
self.signer_cache[role][key.keyid] = signer
root.add_key(key, role)
Returns:
Role's signatures threshold
# Make sure the targets role gets signed with its new key, even though
# it wasn't updated itself.
if role == "targets":
with self.edit_targets():
pass
Raises:
- TAFError if the role does not exist or if metadata files are invalid
"""
role_obj = self._role_obj(role, parent)
if role_obj is None:
raise TAFError(f"Role {role} does not exist")
return role_obj.threshold

self.do_snapshot()
self.do_timestamp()

def set_metadata_expiration_date(self, role, start_date=None, interval=None):
"""Set expiration date of the provided role.
Args:
- role(str): TUF role (root, targets, timestamp, snapshot or delegated one)
- start_date(datetime): Date to which the specified interval is added when calculating
expiration date. If a value is not provided, it is set to the
current time.
- interval(int): A number of days added to the start date.
If not provided, the default value is set based on the role:
root - 365 days
targets - 90 days
snapshot - 7 days
timestamp - 1 day
all other roles (delegations) - same as targets
Returns:
None
Raises:
- securesystemslib.exceptions.FormatError: If the arguments are improperly formatted.
- securesystemslib.exceptions.UnknownRoleError: If 'rolename' has not been delegated by
this targets object.
"""
md = self.open(role)
start_date = datetime.datetime.now()
if interval is None:
try:
interval = self.expiration_intervals[role]
except KeyError:
interval = self.expiration_intervals["targets"]
expiration_date = start_date + datetime.timedelta(interval)
md.signed.expires = expiration_date

self.close(role, md)

0 comments on commit 9bf3fb9

Please sign in to comment.