Skip to content

Commit

Permalink
refact: move dictionary keeping track of loaded yubikeys to auth repo
Browse files Browse the repository at this point in the history
  • Loading branch information
renatav committed Jan 17, 2025
1 parent f55d0a5 commit d8cd3a3
Show file tree
Hide file tree
Showing 11 changed files with 166 additions and 738 deletions.
6 changes: 3 additions & 3 deletions taf/api/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from taf.tuf.repository import METADATA_DIRECTORY_NAME
from taf.utils import ensure_pre_push_hook
from taf.log import taf_logger
from taf.yubikey.pin_manager import PinManager
from taf.yubikey.yubikey_manager import PinManager


@log_on_start(
Expand Down Expand Up @@ -63,7 +63,6 @@ def create_repository(
Returns:
None
"""
import pdb; pdb.set_trace()
if not _check_if_can_create_repository(Path(path)):
return

Expand All @@ -76,9 +75,10 @@ def create_repository(
)

roles_keys_data = from_dict(roles_key_infos_dict, RolesKeysData)
auth_repo = AuthenticationRepository(path=path)
auth_repo = AuthenticationRepository(path=path, pin_manager=pin_manager)
signers, verification_keys, key_name_mappings = load_sorted_keys_of_new_roles(
roles=roles_keys_data.roles,
auth_repo=auth_repo,
yubikeys_data=roles_keys_data.yubikeys,
keystore=keystore,
skip_prompt=skip_prompt,
Expand Down
2 changes: 2 additions & 0 deletions taf/api/roles.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ def add_role(

signers, _, key_name_mappings = load_sorted_keys_of_new_roles(
roles=new_role,
auth_repo=auth_repo,
yubikeys_data=None,
keystore=keystore_path,
skip_prompt=skip_prompt,
Expand Down Expand Up @@ -293,6 +294,7 @@ def add_multiple_roles(
for role_to_add_data in roles_to_add_data:
signers, _, key_name_mappings = load_sorted_keys_of_new_roles(
roles=role_to_add_data,
auth_repo=auth_repo,
yubikeys_data=None,
keystore=keystore_path,
skip_prompt=not prompt_for_keys,
Expand Down
4 changes: 3 additions & 1 deletion taf/auth_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
get_target_path,
)
from taf.constants import INFO_JSON_PATH
from taf.yubikey.yubikey_manager import PinManager


class AuthenticationRepository(GitRepository):
Expand All @@ -40,6 +41,7 @@ def __init__(
out_of_band_authentication: Optional[str] = None,
path: Optional[Union[Path, str]] = None,
alias: Optional[str] = None,
pin_manager: Optional[PinManager] = None,
*args,
**kwargs,
):
Expand Down Expand Up @@ -78,7 +80,7 @@ def __init__(
self.conf_directory_root = conf_directory_root_path.resolve()
self.out_of_band_authentication = out_of_band_authentication
self._storage = GitStorageBackend()
self._tuf_repository = TUFRepository(self.path, storage=self._storage)
self._tuf_repository = TUFRepository(self.path, storage=self._storage, pin_manager=pin_manager)

def __getattr__(self, item):
"""Delegate attribute lookup to TUFRepository instance"""
Expand Down
56 changes: 30 additions & 26 deletions taf/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import click
from pathlib import Path
from logdecorator import log_on_start
from taf.auth_repo import AuthenticationRepository
from taf.log import taf_logger
from taf.models.types import Role, RolesIterator
from taf.models.models import TAFKey
Expand Down Expand Up @@ -35,7 +36,7 @@
from securesystemslib.signer._crypto_signer import CryptoSigner

try:
import taf.yubikey as yk
import taf.yubikey.yubikey as yk
except ImportError:
taf_logger.warning(
"WARNING: yubikey-manager dependency not installed. You will not be able to use YubiKeys."
Expand Down Expand Up @@ -91,9 +92,9 @@ def _get_attr(oid):

def load_sorted_keys_of_new_roles(
roles: Union[MainRoles, TargetsRole],
auth_repo: AuthenticationRepository,
yubikeys_data: Optional[Dict[str, UserKeyData]],
keystore: Optional[Union[Path, str]],
yubikeys: Optional[Dict[str, Dict]] = None,
existing_roles: Optional[List[str]] = None,
skip_prompt: Optional[bool] = False,
certs_dir: Optional[Union[Path, str]] = None,
Expand All @@ -111,7 +112,7 @@ def load_sorted_keys_of_new_roles(
If additional details contain the public key, a user will not have to insert that YubiKey
(provided that it's not necessary given the threshold of signing keys)
keystore: keystore path
yubikeys:(optional): A dictionary containing previously loaded YubiKeys used to save already entered pins
pint_manager: Instance of a class for secure pin management
existing_roles (optional): A list of roles whose keys were already loaded
skip_prompt (optional): A flag defining if the user will be asked if they want to generate new keys or reuse existing
ones in case keystore files should be used. New keys will be generated by default.
Expand All @@ -132,8 +133,6 @@ def _sort_roles(roles):
keystore_roles.append(role)
return keystore_roles, yubikey_roles

if yubikeys is None:
yubikeys = defaultdict(dict)
# load and/or generate all keys first
if existing_roles is None:
existing_roles = []
Expand All @@ -148,20 +147,23 @@ def _sort_roles(roles):
continue
keystore_signers, _, _, key_name_mapping = setup_roles_keys(
role,
auth_repo,
keystore=keystore,
skip_prompt=skip_prompt,
)
for signer in keystore_signers:
signers.setdefault(role.name, []).append(signer)

# TODO add to repository...
keys_name_mappings.update(key_name_mapping)

for role in yubikey_roles:
if role.name in existing_roles:
continue
_, yubikey_keys, yubikey_signers, key_name_mapping = setup_roles_keys(
role,
auth_repo,
certs_dir=certs_dir,
yubikeys=yubikeys,
users_yubikeys_details=yubikeys_data,
skip_prompt=skip_prompt,
)
Expand Down Expand Up @@ -217,7 +219,7 @@ def _load_and_append_yubikeys(
signer = YkSigner(
public_key,
serial_num,
partial(yk.yk_secrets_handler, serial_num=serial_num),
partial(yk.yk_secrets_handler, pin_manager=taf_repo.pin_manager, serial_num=serial_num),
)
signers_yubikeys.append(signer)
loaded_keyids.append(public_key.keyid)
Expand Down Expand Up @@ -325,9 +327,9 @@ def load_signers(

def setup_roles_keys(
role: Role,
auth_repo: AuthenticationRepository,
certs_dir: Optional[Union[Path, str]] = None,
keystore: Optional[Union[Path, str]] = None,
yubikeys: Optional[Dict] = None,
users_yubikeys_details: Optional[Dict[str, UserKeyData]] = None,
skip_prompt: Optional[bool] = False,
key_size: int = 2048,
Expand All @@ -350,7 +352,7 @@ def setup_roles_keys(

if is_yubikey:
yubikey_keys, yubikey_signers, keys_name_mapping = _setup_yubikey_roles_keys(
yubikey_ids, users_yubikeys_details, yubikeys, role, certs_dir, key_size
auth_repo, yubikey_ids, users_yubikeys_details, role, certs_dir, key_size
)
keys_name_mappings.update(keys_name_mapping)
else:
Expand All @@ -376,7 +378,7 @@ def setup_roles_keys(


def _setup_yubikey_roles_keys(
yubikey_ids, users_yubikeys_details, yubikeys, role, certs_dir, key_size
auth_repo, yubikey_ids, users_yubikeys_details, role, certs_dir, key_size
):
loaded_keys_num = 0
yk_with_public_key = {}
Expand All @@ -391,7 +393,7 @@ def _setup_yubikey_roles_keys(
scheme = users_yubikeys_details[key_name].scheme
public_key = get_sslib_key_from_value(public_key_text, scheme)
# Check if the signing key is already loaded
if not yk.get_key_serial_by_id(key_name):
if not auth_repo.yubikey_store.is_key_name_loaded(key_name):
yk_with_public_key[key_name] = public_key
else:
loaded_keys_num += 1
Expand All @@ -401,21 +403,24 @@ def _setup_yubikey_roles_keys(
if key_name in users_yubikeys_details:
key_scheme = users_yubikeys_details[key_name].scheme
key_scheme = key_scheme or role.scheme
public_key, serial_num = _setup_yubikey(
yubikeys,
role.name,
key_name,
yubikey_keys,
key_scheme,
certs_dir,
key_size,
require_single_yk=True,
)
if auth_repo.yubikey_store.is_key_name_loaded(key_name):
public_key, serial_num = auth_repo.yubikey_store.get_key_data(key_name)
else:
public_key, serial_num = _setup_yubikey(
auth_repo,
role.name,
key_name,
yubikey_keys,
key_scheme,
certs_dir,
key_size,
require_single_yk=True,
)
loaded_keys_num += 1
signer = YkSigner(
public_key,
serial_num,
partial(yk.yk_secrets_handler, serial_num=serial_num),
partial(yk.yk_secrets_handler, pin_manager=auth_repo.pin_manager, serial_num=serial_num),
)
signers.append(signer)
keyid_name_mapping[_get_legacy_keyid(public_key)] = key_name
Expand All @@ -438,7 +443,7 @@ def _setup_yubikey_roles_keys(
loaded_keys.append(key_name)
signer = YkSigner(
public_key,
partial(yk.yk_secrets_handler, serial_num=serial_num),
partial(yk.yk_secrets_handler, pin_manager=auth_repo.pin_manager, serial_num=serial_num)
)
signers.append(signer)
if loaded_keys_num == role.threshold:
Expand Down Expand Up @@ -530,7 +535,7 @@ def _invalid_key_message(key_name, keystore):


def _setup_yubikey(
yubikeys: Optional[Dict],
auth_repo: AuthenticationRepository,
role_name: str,
key_name: str,
loaded_keys: List[str],
Expand All @@ -552,10 +557,9 @@ def _setup_yubikey(
yubikeys = yk.yubikey_prompt(
key_name,
role_name,
taf_repo=None,
taf_repo=auth_repo,
registering_new_key=True,
creating_new_key=not use_existing,
loaded_yubikeys=yubikeys,
pin_confirm=True,
pin_repeat=True,
require_single_yubikey=True,
Expand Down
2 changes: 1 addition & 1 deletion taf/tools/repo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from taf.tools.cli import catch_cli_exception, find_repository
from taf.updater.types.update import UpdateType
from taf.updater.updater import OperationType, UpdateConfig, clone_repository, update_repository, validate_repository
from taf.yubikey.pin_manager import pin_managed
from taf.yubikey.yubikey_manager import pin_managed


def common_update_options(f):
Expand Down
4 changes: 2 additions & 2 deletions taf/tuf/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,15 +213,15 @@ def import_(cls) -> SSlibKey:
securesystemslib signers, e.g. `HSMSigner.import_`.
"""
# TODO: export pyca/cryptography key to avoid duplicate deserialization
from taf.yubikey import export_piv_pub_key
from taf.yubikey.yubikey import export_piv_pub_key

pem = export_piv_pub_key()
pub = load_pem_public_key(pem)
return _from_crypto(pub)

def sign(self, payload: bytes) -> Signature:
pin = self._pin_handler(self._SECRET_PROMPT)
from taf.yubikey import sign_piv_rsa_pkcs1v15
from taf.yubikey.yubikey import sign_piv_rsa_pkcs1v15

sig = sign_piv_rsa_pkcs1v15(payload, pin, serial=self.serial_num)
return Signature(self.public_key.keyid, sig.hex())
Expand Down
78 changes: 38 additions & 40 deletions taf/tuf/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@

from securesystemslib.storage import FilesystemBackend

from taf.yubikey.yubikey_manager import YubiKeyStore
from tuf.api.metadata import Signed

try:
import taf.yubikey as yk
import taf.yubikey.yubikey as yk
except ImportError:
yk = YubikeyMissingLibrary() # type: ignore

Expand Down Expand Up @@ -143,6 +144,7 @@ def __init__(self, path: Union[Path, str], *args, **kwargs) -> None:
self.storage_backend = FilesystemBackend()
self._metadata_to_keep_open: Set[str] = set()
self.pin_manager = pin_manager
self.yubikey_store = YubiKeyStore()

@property
def metadata_path(self) -> Path:
Expand Down Expand Up @@ -172,45 +174,6 @@ def snapshot_info(self) -> MetaFile:
"""
return self._snapshot_info

def calculate_hashes(self, md: Metadata, algorithms: List[str]) -> Dict:
"""
Calculate hashes of the specified signed metadata after serializing
it using the previously initialized serializer.
Hashes are computed for each specified algorithm.
Arguments:
md: Signed metadata
algorithms: A list of hash algorithms (e.g., 'sha256', 'sha512').
Return:
A dcitionary mapping algorithms and calculated hashes
"""
hashes = {}
data = md.to_bytes(serializer=self.serializer)
for algo in algorithms:
digest_object = sslib_hash.digest(algo)
digest_object.update(data)

hashes[algo] = digest_object.hexdigest()
return hashes

def calculate_length(self, md: Metadata) -> int:
"""
Calculate length of the specified signed metadata after serializing
it using the previously initialized serializer.
Arguments:
md: Signed metadata
Return:
Langth of the signed metadata
"""
data = md.to_bytes(serializer=self.serializer)
return len(data)

def add_signers_to_cache(self, roles_signers: Dict):
for role, signers in roles_signers.items():
if self._role_obj(role):
self._load_role_signers(role, signers)

def all_target_files(self) -> Set:
"""
Return a set of relative paths of all files inside the targets
Expand Down Expand Up @@ -363,6 +326,41 @@ def open(self, role: str) -> Metadata:
except StorageError:
raise TAFError(f"Metadata file {path} does not exist")

def calculate_hashes(self, md: Metadata, algorithms: List[str]) -> Dict:
"""
Calculate hashes of the specified signed metadata after serializing
it using the previously initialized serializer.
Hashes are computed for each specified algorithm.
Arguments:
md: Signed metadata
algorithms: A list of hash algorithms (e.g., 'sha256', 'sha512').
Return:
A dcitionary mapping algorithms and calculated hashes
"""
hashes = {}
data = md.to_bytes(serializer=self.serializer)
for algo in algorithms:
digest_object = sslib_hash.digest(algo)
digest_object.update(data)

hashes[algo] = digest_object.hexdigest()
return hashes

def calculate_length(self, md: Metadata) -> int:
"""
Calculate length of the specified signed metadata after serializing
it using the previously initialized serializer.
Arguments:
md: Signed metadata
Return:
Langth of the signed metadata
"""
data = md.to_bytes(serializer=self.serializer)
return len(data)


def check_if_keys_loaded(self, role_name: str) -> bool:
"""
Check if at least a threshold of signers of the specified role
Expand Down
Loading

0 comments on commit d8cd3a3

Please sign in to comment.