From d8cd3a350d9b8c7bba47ea339a1bcb4f1a4a645f Mon Sep 17 00:00:00 2001 From: Renata Date: Thu, 16 Jan 2025 23:27:47 -0500 Subject: [PATCH] refact: move dictionary keeping track of loaded yubikeys to auth repo --- taf/api/repository.py | 6 +- taf/api/roles.py | 2 + taf/auth_repo.py | 4 +- taf/keys.py | 56 ++-- taf/tools/repo/__init__.py | 2 +- taf/tuf/keys.py | 4 +- taf/tuf/repository.py | 78 +++-- taf/yubikey.py | 566 --------------------------------- taf/yubikey/pin_manager.py | 36 --- taf/yubikey/yubikey.py | 79 +---- taf/yubikey/yubikey_manager.py | 71 +++++ 11 files changed, 166 insertions(+), 738 deletions(-) delete mode 100644 taf/yubikey.py delete mode 100644 taf/yubikey/pin_manager.py create mode 100644 taf/yubikey/yubikey_manager.py diff --git a/taf/api/repository.py b/taf/api/repository.py index 9be986ec..e8b33b81 100644 --- a/taf/api/repository.py +++ b/taf/api/repository.py @@ -23,7 +23,7 @@ from taf.tuf.repository import METADATA_DIRECTORY_NAME from taf.utils import ensure_pre_push_hook from taf.log import taf_logger -from taf.yubikey.pin_manager import PinManager +from taf.yubikey.yubikey_manager import PinManager @log_on_start( @@ -63,7 +63,6 @@ def create_repository( Returns: None """ - import pdb; pdb.set_trace() if not _check_if_can_create_repository(Path(path)): return @@ -76,9 +75,10 @@ def create_repository( ) roles_keys_data = from_dict(roles_key_infos_dict, RolesKeysData) - auth_repo = AuthenticationRepository(path=path) + auth_repo = AuthenticationRepository(path=path, pin_manager=pin_manager) signers, verification_keys, key_name_mappings = load_sorted_keys_of_new_roles( roles=roles_keys_data.roles, + auth_repo=auth_repo, yubikeys_data=roles_keys_data.yubikeys, keystore=keystore, skip_prompt=skip_prompt, diff --git a/taf/api/roles.py b/taf/api/roles.py index 70ceea40..916bf931 100644 --- a/taf/api/roles.py +++ b/taf/api/roles.py @@ -130,6 +130,7 @@ def add_role( signers, _, key_name_mappings = load_sorted_keys_of_new_roles( roles=new_role, + auth_repo=auth_repo, yubikeys_data=None, keystore=keystore_path, skip_prompt=skip_prompt, @@ -293,6 +294,7 @@ def add_multiple_roles( for role_to_add_data in roles_to_add_data: signers, _, key_name_mappings = load_sorted_keys_of_new_roles( roles=role_to_add_data, + auth_repo=auth_repo, yubikeys_data=None, keystore=keystore_path, skip_prompt=not prompt_for_keys, diff --git a/taf/auth_repo.py b/taf/auth_repo.py index 5d7af21f..25c395d0 100644 --- a/taf/auth_repo.py +++ b/taf/auth_repo.py @@ -16,6 +16,7 @@ get_target_path, ) from taf.constants import INFO_JSON_PATH +from taf.yubikey.yubikey_manager import PinManager class AuthenticationRepository(GitRepository): @@ -40,6 +41,7 @@ def __init__( out_of_band_authentication: Optional[str] = None, path: Optional[Union[Path, str]] = None, alias: Optional[str] = None, + pin_manager: Optional[PinManager] = None, *args, **kwargs, ): @@ -78,7 +80,7 @@ def __init__( self.conf_directory_root = conf_directory_root_path.resolve() self.out_of_band_authentication = out_of_band_authentication self._storage = GitStorageBackend() - self._tuf_repository = TUFRepository(self.path, storage=self._storage) + self._tuf_repository = TUFRepository(self.path, storage=self._storage, pin_manager=pin_manager) def __getattr__(self, item): """Delegate attribute lookup to TUFRepository instance""" diff --git a/taf/keys.py b/taf/keys.py index b7f3f3f5..29b0368a 100644 --- a/taf/keys.py +++ b/taf/keys.py @@ -5,6 +5,7 @@ import click from pathlib import Path from logdecorator import log_on_start +from taf.auth_repo import AuthenticationRepository from taf.log import taf_logger from taf.models.types import Role, RolesIterator from taf.models.models import TAFKey @@ -35,7 +36,7 @@ from securesystemslib.signer._crypto_signer import CryptoSigner try: - import taf.yubikey as yk + import taf.yubikey.yubikey as yk except ImportError: taf_logger.warning( "WARNING: yubikey-manager dependency not installed. You will not be able to use YubiKeys." @@ -91,9 +92,9 @@ def _get_attr(oid): def load_sorted_keys_of_new_roles( roles: Union[MainRoles, TargetsRole], + auth_repo: AuthenticationRepository, yubikeys_data: Optional[Dict[str, UserKeyData]], keystore: Optional[Union[Path, str]], - yubikeys: Optional[Dict[str, Dict]] = None, existing_roles: Optional[List[str]] = None, skip_prompt: Optional[bool] = False, certs_dir: Optional[Union[Path, str]] = None, @@ -111,7 +112,7 @@ def load_sorted_keys_of_new_roles( If additional details contain the public key, a user will not have to insert that YubiKey (provided that it's not necessary given the threshold of signing keys) keystore: keystore path - yubikeys:(optional): A dictionary containing previously loaded YubiKeys used to save already entered pins + pint_manager: Instance of a class for secure pin management existing_roles (optional): A list of roles whose keys were already loaded skip_prompt (optional): A flag defining if the user will be asked if they want to generate new keys or reuse existing ones in case keystore files should be used. New keys will be generated by default. @@ -132,8 +133,6 @@ def _sort_roles(roles): keystore_roles.append(role) return keystore_roles, yubikey_roles - if yubikeys is None: - yubikeys = defaultdict(dict) # load and/or generate all keys first if existing_roles is None: existing_roles = [] @@ -148,11 +147,14 @@ def _sort_roles(roles): continue keystore_signers, _, _, key_name_mapping = setup_roles_keys( role, + auth_repo, keystore=keystore, skip_prompt=skip_prompt, ) for signer in keystore_signers: signers.setdefault(role.name, []).append(signer) + + # TODO add to repository... keys_name_mappings.update(key_name_mapping) for role in yubikey_roles: @@ -160,8 +162,8 @@ def _sort_roles(roles): continue _, yubikey_keys, yubikey_signers, key_name_mapping = setup_roles_keys( role, + auth_repo, certs_dir=certs_dir, - yubikeys=yubikeys, users_yubikeys_details=yubikeys_data, skip_prompt=skip_prompt, ) @@ -217,7 +219,7 @@ def _load_and_append_yubikeys( signer = YkSigner( public_key, serial_num, - partial(yk.yk_secrets_handler, serial_num=serial_num), + partial(yk.yk_secrets_handler, pin_manager=taf_repo.pin_manager, serial_num=serial_num), ) signers_yubikeys.append(signer) loaded_keyids.append(public_key.keyid) @@ -325,9 +327,9 @@ def load_signers( def setup_roles_keys( role: Role, + auth_repo: AuthenticationRepository, certs_dir: Optional[Union[Path, str]] = None, keystore: Optional[Union[Path, str]] = None, - yubikeys: Optional[Dict] = None, users_yubikeys_details: Optional[Dict[str, UserKeyData]] = None, skip_prompt: Optional[bool] = False, key_size: int = 2048, @@ -350,7 +352,7 @@ def setup_roles_keys( if is_yubikey: yubikey_keys, yubikey_signers, keys_name_mapping = _setup_yubikey_roles_keys( - yubikey_ids, users_yubikeys_details, yubikeys, role, certs_dir, key_size + auth_repo, yubikey_ids, users_yubikeys_details, role, certs_dir, key_size ) keys_name_mappings.update(keys_name_mapping) else: @@ -376,7 +378,7 @@ def setup_roles_keys( def _setup_yubikey_roles_keys( - yubikey_ids, users_yubikeys_details, yubikeys, role, certs_dir, key_size + auth_repo, yubikey_ids, users_yubikeys_details, role, certs_dir, key_size ): loaded_keys_num = 0 yk_with_public_key = {} @@ -391,7 +393,7 @@ def _setup_yubikey_roles_keys( scheme = users_yubikeys_details[key_name].scheme public_key = get_sslib_key_from_value(public_key_text, scheme) # Check if the signing key is already loaded - if not yk.get_key_serial_by_id(key_name): + if not auth_repo.yubikey_store.is_key_name_loaded(key_name): yk_with_public_key[key_name] = public_key else: loaded_keys_num += 1 @@ -401,21 +403,24 @@ def _setup_yubikey_roles_keys( if key_name in users_yubikeys_details: key_scheme = users_yubikeys_details[key_name].scheme key_scheme = key_scheme or role.scheme - public_key, serial_num = _setup_yubikey( - yubikeys, - role.name, - key_name, - yubikey_keys, - key_scheme, - certs_dir, - key_size, - require_single_yk=True, - ) + if auth_repo.yubikey_store.is_key_name_loaded(key_name): + public_key, serial_num = auth_repo.yubikey_store.get_key_data(key_name) + else: + public_key, serial_num = _setup_yubikey( + auth_repo, + role.name, + key_name, + yubikey_keys, + key_scheme, + certs_dir, + key_size, + require_single_yk=True, + ) loaded_keys_num += 1 signer = YkSigner( public_key, serial_num, - partial(yk.yk_secrets_handler, serial_num=serial_num), + partial(yk.yk_secrets_handler, pin_manager=auth_repo.pin_manager, serial_num=serial_num), ) signers.append(signer) keyid_name_mapping[_get_legacy_keyid(public_key)] = key_name @@ -438,7 +443,7 @@ def _setup_yubikey_roles_keys( loaded_keys.append(key_name) signer = YkSigner( public_key, - partial(yk.yk_secrets_handler, serial_num=serial_num), + partial(yk.yk_secrets_handler, pin_manager=auth_repo.pin_manager, serial_num=serial_num) ) signers.append(signer) if loaded_keys_num == role.threshold: @@ -530,7 +535,7 @@ def _invalid_key_message(key_name, keystore): def _setup_yubikey( - yubikeys: Optional[Dict], + auth_repo: AuthenticationRepository, role_name: str, key_name: str, loaded_keys: List[str], @@ -552,10 +557,9 @@ def _setup_yubikey( yubikeys = yk.yubikey_prompt( key_name, role_name, - taf_repo=None, + taf_repo=auth_repo, registering_new_key=True, creating_new_key=not use_existing, - loaded_yubikeys=yubikeys, pin_confirm=True, pin_repeat=True, require_single_yubikey=True, diff --git a/taf/tools/repo/__init__.py b/taf/tools/repo/__init__.py index 7e295fc1..7e6bf54d 100644 --- a/taf/tools/repo/__init__.py +++ b/taf/tools/repo/__init__.py @@ -10,7 +10,7 @@ from taf.tools.cli import catch_cli_exception, find_repository from taf.updater.types.update import UpdateType from taf.updater.updater import OperationType, UpdateConfig, clone_repository, update_repository, validate_repository -from taf.yubikey.pin_manager import pin_managed +from taf.yubikey.yubikey_manager import pin_managed def common_update_options(f): diff --git a/taf/tuf/keys.py b/taf/tuf/keys.py index 57f5ddc4..c474a45a 100644 --- a/taf/tuf/keys.py +++ b/taf/tuf/keys.py @@ -213,7 +213,7 @@ def import_(cls) -> SSlibKey: securesystemslib signers, e.g. `HSMSigner.import_`. """ # TODO: export pyca/cryptography key to avoid duplicate deserialization - from taf.yubikey import export_piv_pub_key + from taf.yubikey.yubikey import export_piv_pub_key pem = export_piv_pub_key() pub = load_pem_public_key(pem) @@ -221,7 +221,7 @@ def import_(cls) -> SSlibKey: def sign(self, payload: bytes) -> Signature: pin = self._pin_handler(self._SECRET_PROMPT) - from taf.yubikey import sign_piv_rsa_pkcs1v15 + from taf.yubikey.yubikey import sign_piv_rsa_pkcs1v15 sig = sign_piv_rsa_pkcs1v15(payload, pin, serial=self.serial_num) return Signature(self.public_key.keyid, sig.hex()) diff --git a/taf/tuf/repository.py b/taf/tuf/repository.py index c6e5d542..3ce7ee01 100644 --- a/taf/tuf/repository.py +++ b/taf/tuf/repository.py @@ -22,10 +22,11 @@ from securesystemslib.storage import FilesystemBackend +from taf.yubikey.yubikey_manager import YubiKeyStore from tuf.api.metadata import Signed try: - import taf.yubikey as yk + import taf.yubikey.yubikey as yk except ImportError: yk = YubikeyMissingLibrary() # type: ignore @@ -143,6 +144,7 @@ def __init__(self, path: Union[Path, str], *args, **kwargs) -> None: self.storage_backend = FilesystemBackend() self._metadata_to_keep_open: Set[str] = set() self.pin_manager = pin_manager + self.yubikey_store = YubiKeyStore() @property def metadata_path(self) -> Path: @@ -172,45 +174,6 @@ def snapshot_info(self) -> MetaFile: """ return self._snapshot_info - def calculate_hashes(self, md: Metadata, algorithms: List[str]) -> Dict: - """ - Calculate hashes of the specified signed metadata after serializing - it using the previously initialized serializer. - Hashes are computed for each specified algorithm. - - Arguments: - md: Signed metadata - algorithms: A list of hash algorithms (e.g., 'sha256', 'sha512'). - Return: - A dcitionary mapping algorithms and calculated hashes - """ - hashes = {} - data = md.to_bytes(serializer=self.serializer) - for algo in algorithms: - digest_object = sslib_hash.digest(algo) - digest_object.update(data) - - hashes[algo] = digest_object.hexdigest() - return hashes - - def calculate_length(self, md: Metadata) -> int: - """ - Calculate length of the specified signed metadata after serializing - it using the previously initialized serializer. - - Arguments: - md: Signed metadata - Return: - Langth of the signed metadata - """ - data = md.to_bytes(serializer=self.serializer) - return len(data) - - def add_signers_to_cache(self, roles_signers: Dict): - for role, signers in roles_signers.items(): - if self._role_obj(role): - self._load_role_signers(role, signers) - def all_target_files(self) -> Set: """ Return a set of relative paths of all files inside the targets @@ -363,6 +326,41 @@ def open(self, role: str) -> Metadata: except StorageError: raise TAFError(f"Metadata file {path} does not exist") + def calculate_hashes(self, md: Metadata, algorithms: List[str]) -> Dict: + """ + Calculate hashes of the specified signed metadata after serializing + it using the previously initialized serializer. + Hashes are computed for each specified algorithm. + + Arguments: + md: Signed metadata + algorithms: A list of hash algorithms (e.g., 'sha256', 'sha512'). + Return: + A dcitionary mapping algorithms and calculated hashes + """ + hashes = {} + data = md.to_bytes(serializer=self.serializer) + for algo in algorithms: + digest_object = sslib_hash.digest(algo) + digest_object.update(data) + + hashes[algo] = digest_object.hexdigest() + return hashes + + def calculate_length(self, md: Metadata) -> int: + """ + Calculate length of the specified signed metadata after serializing + it using the previously initialized serializer. + + Arguments: + md: Signed metadata + Return: + Langth of the signed metadata + """ + data = md.to_bytes(serializer=self.serializer) + return len(data) + + def check_if_keys_loaded(self, role_name: str) -> bool: """ Check if at least a threshold of signers of the specified role diff --git a/taf/yubikey.py b/taf/yubikey.py deleted file mode 100644 index 9ace13cd..00000000 --- a/taf/yubikey.py +++ /dev/null @@ -1,566 +0,0 @@ -import datetime -from contextlib import contextmanager -from functools import wraps -from collections import defaultdict -from getpass import getpass -from pathlib import Path -from typing import Callable, Dict, Optional - -import click -from cryptography import x509 -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import serialization -from cryptography.hazmat.primitives.serialization import load_pem_private_key -from cryptography.hazmat.primitives import hashes -from cryptography.hazmat.primitives.asymmetric import rsa, padding - -from taf.tuf.keys import get_sslib_key_from_value -from ykman.device import list_all_devices -from yubikit.core.smartcard import SmartCardConnection -from ykman.piv import ( - KEY_TYPE, - MANAGEMENT_KEY_TYPE, - SLOT, - PivSession, - generate_random_management_key, -) -from yubikit.piv import ( - DEFAULT_MANAGEMENT_KEY, - PIN_POLICY, - InvalidPinError, -) - -from taf.constants import DEFAULT_RSA_SIGNATURE_SCHEME -from taf.exceptions import InvalidPINError, YubikeyError -from taf.utils import get_pin_for -from taf.log import taf_logger - -from securesystemslib.signer._key import SSlibKey - -DEFAULT_PIN = "123456" -DEFAULT_PUK = "12345678" -EXPIRATION_INTERVAL = 36500 - -_yks_data_dict: Dict = defaultdict(dict) - - -def add_key_id_mapping(serial_num: str, keyid: str) -> None: - if "ids" not in _yks_data_dict: - _yks_data_dict["ids"] = defaultdict(dict) - if keyid not in _yks_data_dict["ids"]: - _yks_data_dict["ids"][keyid] = serial_num - - -def add_key_pin(serial_num: str, pin: str) -> None: - _yks_data_dict[serial_num]["pin"] = pin - - -def add_key_public_key(serial_num: str, public_key: Dict) -> None: - _yks_data_dict[serial_num]["public_key"] = public_key - - -def get_key_pin(serial_num: int) -> Optional[str]: - if serial_num in _yks_data_dict: - return _yks_data_dict.get(serial_num, {}).get("pin") - return None - - -def get_key_serial_by_id(keyid: str) -> Optional[str]: - return _yks_data_dict.get("ids", {}).get(keyid) - - -def get_key_public_key(serial_num: str) -> Optional[Dict]: - if serial_num in _yks_data_dict: - return _yks_data_dict.get(serial_num, {}).get("public_key") - return None - - -def raise_yubikey_err(msg: Optional[str] = None) -> Callable: - """Decorator used to catch all errors raised by yubikey-manager and raise - YubikeyError. We don't need to handle specific cases. - """ - - def wrapper(f): - @wraps(f) - def decorator(*args, **kwargs): - try: - return f(*args, **kwargs) - except YubikeyError: - raise - except Exception as e: - err_msg = ( - f"{msg} Reason: ({type(e).__name__}) {str(e)}" if msg else str(e) - ) - raise YubikeyError(err_msg) from e - - return decorator - - return wrapper - - -@contextmanager -def _yk_piv_ctrl(serial=None): - """Context manager to open connection and instantiate Piv Session. - - Args: - - serial (str): Match Yubikey's serial multiple keys are inserted - - Returns: - - ykman.piv.PivSession - - Raises: - - YubikeyError - """ - sessions = [] - devices_info = [] - try: - for dev, info in list_all_devices(): - if serial is None or info.serial == serial: - connection = dev.open_connection(SmartCardConnection) - try: - session = PivSession(connection) - sessions.append((session, info.serial)) - devices_info.append( - (connection, session) - ) # Store to manage cleanup - if serial is not None: - break - except Exception as e: - connection.close() # Ensure we close connection on error - raise e - if serial is not None: - session, serial = sessions[0] - yield session, serial - else: - yield sessions - finally: - # Cleanup: ensure all connections are closed properly - for connection, _ in devices_info: - connection.close() - - -def is_inserted(): - """Checks if YubiKey is inserted. - - Args: - None - - Returns: - True if at least one Yubikey is inserted (bool) - - Raises: - - YubikeyError - """ - return len(list(list_all_devices())) > 0 - - -@raise_yubikey_err() -def is_valid_pin(pin, serial=None): - """Checks if given pin is valid. - - Args: - pin(str): Yubikey piv PIN - - Returns: - tuple: True if PIN is valid, otherwise False, number of PIN retries - - Raises: - - YubikeyError - """ - with _yk_piv_ctrl(serial=serial) as (ctrl, _): - try: - ctrl.verify_pin(pin) - return True, None # ctrl.get_pin_tries() fails if PIN is valid - except InvalidPinError: - return False, ctrl.get_pin_attempts() - - -@raise_yubikey_err("Cannot get serial number.") -def get_serial_num(): - """Get Yubikey serial number. - - Args: - - pub_key_pem(str): Match Yubikey's public key (PEM) if multiple keys - are inserted - - Returns: - Yubikey serial number - - Raises: - - YubikeyError - """ - serials = [] - with _yk_piv_ctrl() as sessions: - for _, serial in sessions: - # Process each session - serials.append(serial) - return serials - - -@raise_yubikey_err("Cannot export x509 certificate.") -def export_piv_x509(cert_format=serialization.Encoding.PEM, serial=None): - """Exports YubiKey's piv slot x509. - - Args: - - cert_format(str): One of 'serialization.Encoding' formats. - - pub_key_pem(str): Match Yubikey's public key (PEM) if multiple keys - are inserted - - Returns: - PIV x509 certificate in a given format (bytes) - - Raises: - - YubikeyError - """ - with _yk_piv_ctrl(serial=serial) as (ctrl, _): - x509 = ctrl.get_certificate(SLOT.SIGNATURE) - return x509.public_bytes(encoding=cert_format) - - -@raise_yubikey_err("Cannot export public key.") -def export_piv_pub_key(pub_key_format=serialization.Encoding.PEM, serial=None): - """Exports YubiKey's piv slot public key. - - Args: - - pub_key_format(str): One of 'serialization.Encoding' formats. - - pub_key_pem(str): Match Yubikey's public key (PEM) if multiple keys - are inserted - - Returns: - PIV public key in a given format (bytes) - - Raises: - - YubikeyError - """ - with _yk_piv_ctrl(serial=serial) as (ctrl, _): - try: - x509_cert = ctrl.get_certificate(SLOT.SIGNATURE) - public_key = x509_cert.public_key() - return public_key.public_bytes( - encoding=pub_key_format, - format=serialization.PublicFormat.SubjectPublicKeyInfo, - ) - except Exception as e: - raise YubikeyError(f"Failed to export public key: {str(e)}") from e - - -@raise_yubikey_err("Cannot export yk certificate.") -def export_yk_certificate(certs_dir, key: SSlibKey, serial: str): - if certs_dir is None: - certs_dir = Path.home() - else: - certs_dir = Path(certs_dir) - certs_dir.mkdir(parents=True, exist_ok=True) - cert_path = certs_dir / f"{key.keyid}.cert" - print(f"Exporting certificate to {cert_path}") - with open(cert_path, "wb") as f: - f.write(export_piv_x509(serial=serial)) - - -@raise_yubikey_err("Cannot get public key in TUF format.") -def get_piv_public_key_tuf( - scheme=DEFAULT_RSA_SIGNATURE_SCHEME, serial=None -) -> SSlibKey: - """Return public key from a Yubikey in TUF's RSAKEY_SCHEMA format. - - Args: - - scheme(str): Rsa signature scheme (default is rsa-pkcs1v15-sha256) - - pub_key_pem(str): Match Yubikey's public key (PEM) if multiple keys - are inserted - - Returns: - A dictionary containing the RSA keys and other identifying information - from inserted smart card. - Conforms to 'securesystemslib.formats.RSAKEY_SCHEMA'. - - Raises: - - YubikeyError - """ - pub_key_pem = export_piv_pub_key(serial=serial).decode("utf-8") - return get_sslib_key_from_value(pub_key_pem, scheme) - - -def list_connected_yubikeys(): - """Lists all connected YubiKeys with their serial numbers and details.""" - yubikeys = list_all_devices() - if not yubikeys: - print("No YubiKeys connected.") - else: - for index, (_, info) in enumerate(yubikeys, start=1): - print(f"YubiKey {index}:") - print(f" Serial Number: {info.serial}") - print(f" Version: {info.version}") - print(f" Form Factor: {info.form_factor}") - - -# TODO -# need to pass in multiple key names -def _read_and_check_yubikeys( - key_name, - role, - taf_repo, - registering_new_key, - creating_new_key, - loaded_yubikeys, - pin_confirm, - pin_repeat, - prompt_message, - retrying, -): - - if retrying: - if prompt_message is None: - prompt_message = f"Please insert {key_name} YubiKey and press ENTER" - getpass(prompt_message) - # make sure that YubiKey is inserted - try: - serials = get_serial_num() - except Exception: - taf_logger.log("NOTICE", "No YubiKeys inserted") - return [False, None, None] - - # check if this key is already loaded as the provided role's key (we can use the same key - # to sign different metadata) - yubikeys = [] - already_loaded_keys = [] - invalid_keys = [] - for serial_num in serials: - if ( - loaded_yubikeys is not None - and serial_num in loaded_yubikeys - and role in loaded_yubikeys[serial_num] - ): - already_loaded_keys.append(serial_num) - else: - # read the public key, unless a new key needs to be generated on the yubikey - public_key = ( - get_piv_public_key_tuf(serial=serial_num) - if not creating_new_key - else None - ) - # check if this yubikey is can be used for signing the provided role's metadata - # if the key was already registered as that role's key - if not registering_new_key and role is not None and taf_repo is not None: - if not taf_repo.is_valid_metadata_yubikey(role, public_key): - invalid_keys.append(serial_num) - # print(f"The inserted YubiKey is not a valid {role} key") - continue - - if get_key_pin(serial_num) is None: - if creating_new_key: - pin = get_pin_for(key_name, pin_confirm, pin_repeat) - else: - pin = get_and_validate_pin( - key_name, pin_confirm, pin_repeat, serial_num - ) - add_key_pin(serial_num, pin) - - if get_key_public_key(serial_num) is None and public_key is not None: - add_key_public_key(serial_num, public_key) - - # when reusing the same yubikey, public key will already be in the public keys dictionary - # but the key name still needs to be added to the key id mapping dictionary - add_key_id_mapping(serial_num, key_name) - - if role is not None: - if loaded_yubikeys is None: - loaded_yubikeys = {serial_num: [role]} - else: - loaded_yubikeys.setdefault(serial_num, []).append(role) - - yubikeys.append((public_key, serial_num)) - - # TODO error messages - return yubikeys - - -@raise_yubikey_err("Cannot sign data.") -def sign_piv_rsa_pkcs1v15(data, pin, serial=None): - """Sign data with key from YubiKey's piv slot. - - Args: - - data(bytes): Data to be signed - - pin(str): Pin for piv slot login. - - pub_key_pem(str): Match Yubikey's public key (PEM) if multiple keys - are inserted - - Returns: - Signature (bytes) - - Raises: - - YubikeyError - """ - with _yk_piv_ctrl(serial=serial) as (ctrl, _): - ctrl.verify_pin(pin) - return ctrl.sign( - SLOT.SIGNATURE, KEY_TYPE.RSA2048, data, hashes.SHA256(), padding.PKCS1v15() - ) - - -@raise_yubikey_err("Cannot setup Yubikey.") -def setup( - pin, - cert_cn, - cert_exp_days=365, - pin_retries=10, - private_key_pem=None, - mgm_key=generate_random_management_key(MANAGEMENT_KEY_TYPE.TDES), - key_size=2048, -): - """Use to setup inserted Yubikey, with following steps (order is important): - - reset to factory settings - - set management key - - generate key(RSA2048) or import given one - - generate and import self-signed certificate(X509) - - set pin retries - - set pin - - set puk(same as pin) - - Args: - - cert_cn(str): x509 common name - - cert_exp_days(int): x509 expiration (in days from now) - - pin_retries(int): Number of retries for PIN - - private_key_pem(str): Private key in PEM format. If given, it will be - imported to Yubikey. - - mgm_key(bytes): New management key - - Returns: - PIV public key in PEM format (bytes) - - Raises: - - YubikeyError - """ - - with _yk_piv_ctrl() as (ctrl, _): - # Factory reset and set PINs - ctrl.reset() - - ctrl.authenticate(MANAGEMENT_KEY_TYPE.TDES, DEFAULT_MANAGEMENT_KEY) - ctrl.set_management_key(MANAGEMENT_KEY_TYPE.TDES, mgm_key) - - # Generate RSA2048 - if private_key_pem is None: - private_key = rsa.generate_private_key(65537, key_size, default_backend()) - pub_key = private_key.public_key() - else: - try: - private_key = load_pem_private_key( - private_key_pem, None, default_backend() - ) - except TypeError: - pem_pwd = getpass("Enter pem file password:\n") - if pem_pwd: - pem_pwd = pem_pwd.encode() - private_key = load_pem_private_key( - private_key_pem, pem_pwd, default_backend() - ) - - ctrl.put_key(SLOT.SIGNATURE, private_key, PIN_POLICY.ALWAYS) - pub_key = private_key.public_key() - ctrl.authenticate(MANAGEMENT_KEY_TYPE.TDES, mgm_key) - ctrl.verify_pin(DEFAULT_PIN) - - now = datetime.datetime.now() - valid_to = now + datetime.timedelta(days=cert_exp_days) - - name = x509.Name([x509.NameAttribute(x509.NameOID.COMMON_NAME, cert_cn)]) - # Generate and import certificate - cert = ( - x509.CertificateBuilder() - .subject_name(name) - .issuer_name(name) - .public_key(pub_key) - .serial_number(x509.random_serial_number()) - .not_valid_before(now) - .not_valid_after(valid_to) - .sign(private_key, hashes.SHA256(), default_backend()) - ) - - ctrl.put_certificate(SLOT.SIGNATURE, cert) - - ctrl.set_pin_attempts(pin_attempts=pin_retries, puk_attempts=pin_retries) - ctrl.change_pin(DEFAULT_PIN, pin) - ctrl.change_puk(DEFAULT_PUK, pin) - - return pub_key.public_bytes( - serialization.Encoding.PEM, serialization.PublicFormat.SubjectPublicKeyInfo - ) - - -def setup_new_yubikey( - serial_num, scheme=DEFAULT_RSA_SIGNATURE_SCHEME, key_size=2048 -) -> SSlibKey: - pin = get_key_pin(serial_num) - cert_cn = input("Enter key holder's name: ") - print("Generating key, please wait...") - pub_key_pem = setup( - pin, cert_cn, cert_exp_days=EXPIRATION_INTERVAL, key_size=key_size - ).decode("utf-8") - scheme = DEFAULT_RSA_SIGNATURE_SCHEME - key = get_sslib_key_from_value(pub_key_pem, scheme) - return key - - -def get_and_validate_pin(key_name, pin_confirm=True, pin_repeat=True, serial=None): - valid_pin = False - while not valid_pin: - pin = get_pin_for(key_name, pin_confirm, pin_repeat) - valid_pin, retries = is_valid_pin(pin, serial) - if not valid_pin and not retries: - raise InvalidPINError("No retries left. YubiKey locked.") - if not valid_pin: - if not click.confirm( - f"Incorrect PIN. Do you want to try again? {retries} retires left." - ): - raise InvalidPINError("PIN input cancelled") - return pin - - -def yubikey_prompt( - key_name, - role=None, - taf_repo=None, - registering_new_key=False, - creating_new_key=False, - loaded_yubikeys=None, - pin_confirm=True, - pin_repeat=True, - prompt_message=None, - retry_on_failure=True, - hide_already_loaded_message=False, - require_single_yubikey=True, -): - if require_single_yubikey: - while True: - serials = get_serial_num() - if len(serials) == 1: - break - else: - prompt_message = "Please insert only one YubiKey and press ENTER" - getpass(prompt_message) - - retry_counter = 0 - while True: - yubikeys = _read_and_check_yubikeys( - key_name, - role, - taf_repo, - registering_new_key, - creating_new_key, - loaded_yubikeys, - pin_confirm, - pin_repeat, - prompt_message, - retrying=retry_counter > 0, - ) - if not yubikeys and not retry_on_failure: - return [(None, None)] - if yubikeys: - return yubikeys - retry_counter += 1 - - -def yk_secrets_handler(prompt, serial_num): - if prompt == "pin": - return get_key_pin(serial_num) - raise YubikeyError(f"Invalid prompt {prompt}") diff --git a/taf/yubikey/pin_manager.py b/taf/yubikey/pin_manager.py deleted file mode 100644 index 21b6822a..00000000 --- a/taf/yubikey/pin_manager.py +++ /dev/null @@ -1,36 +0,0 @@ -import contextlib - - -class PinManager(): - - def __init__(self): - self._pins = {} - - def set_pin(self, serial_number, pin): - self._pins[serial_number] = pin - - def get_pin(self, serial_number): - return self._pins.get(serial_number) - - def clear_pins(self): - for key in list(self._pins.keys()): - self._pins[key] = None - self._pins.clear() - - - -@contextlib.contextmanager -def manage_pins(): - pin_manager = PinManager() - try: - yield pin_manager - finally: - pin_manager.clear_pins() - - -def pin_managed(func): - def wrapper(*args, **kwargs): - with manage_pins() as pin_manager: - kwargs['pin_manager'] = pin_manager - return func(*args, **kwargs) - return wrapper diff --git a/taf/yubikey/yubikey.py b/taf/yubikey/yubikey.py index 9ace13cd..9b2a1d4d 100644 --- a/taf/yubikey/yubikey.py +++ b/taf/yubikey/yubikey.py @@ -41,39 +41,6 @@ DEFAULT_PUK = "12345678" EXPIRATION_INTERVAL = 36500 -_yks_data_dict: Dict = defaultdict(dict) - - -def add_key_id_mapping(serial_num: str, keyid: str) -> None: - if "ids" not in _yks_data_dict: - _yks_data_dict["ids"] = defaultdict(dict) - if keyid not in _yks_data_dict["ids"]: - _yks_data_dict["ids"][keyid] = serial_num - - -def add_key_pin(serial_num: str, pin: str) -> None: - _yks_data_dict[serial_num]["pin"] = pin - - -def add_key_public_key(serial_num: str, public_key: Dict) -> None: - _yks_data_dict[serial_num]["public_key"] = public_key - - -def get_key_pin(serial_num: int) -> Optional[str]: - if serial_num in _yks_data_dict: - return _yks_data_dict.get(serial_num, {}).get("pin") - return None - - -def get_key_serial_by_id(keyid: str) -> Optional[str]: - return _yks_data_dict.get("ids", {}).get(keyid) - - -def get_key_public_key(serial_num: str) -> Optional[Dict]: - if serial_num in _yks_data_dict: - return _yks_data_dict.get(serial_num, {}).get("public_key") - return None - def raise_yubikey_err(msg: Optional[str] = None) -> Callable: """Decorator used to catch all errors raised by yubikey-manager and raise @@ -301,7 +268,6 @@ def _read_and_check_yubikeys( taf_repo, registering_new_key, creating_new_key, - loaded_yubikeys, pin_confirm, pin_repeat, prompt_message, @@ -322,16 +288,9 @@ def _read_and_check_yubikeys( # check if this key is already loaded as the provided role's key (we can use the same key # to sign different metadata) yubikeys = [] - already_loaded_keys = [] invalid_keys = [] for serial_num in serials: - if ( - loaded_yubikeys is not None - and serial_num in loaded_yubikeys - and role in loaded_yubikeys[serial_num] - ): - already_loaded_keys.append(serial_num) - else: + if not taf_repo.pin_manager.is_loaded(serial_num): # read the public key, unless a new key needs to be generated on the yubikey public_key = ( get_piv_public_key_tuf(serial=serial_num) @@ -346,27 +305,23 @@ def _read_and_check_yubikeys( # print(f"The inserted YubiKey is not a valid {role} key") continue - if get_key_pin(serial_num) is None: - if creating_new_key: - pin = get_pin_for(key_name, pin_confirm, pin_repeat) - else: - pin = get_and_validate_pin( - key_name, pin_confirm, pin_repeat, serial_num - ) - add_key_pin(serial_num, pin) - - if get_key_public_key(serial_num) is None and public_key is not None: - add_key_public_key(serial_num, public_key) + if creating_new_key: + pin = get_pin_for(key_name, pin_confirm, pin_repeat) + else: + pin = get_and_validate_pin( + key_name, pin_confirm, pin_repeat, serial_num + ) + taf_repo.pin_manager.add_pin(serial_num, pin) # when reusing the same yubikey, public key will already be in the public keys dictionary # but the key name still needs to be added to the key id mapping dictionary - add_key_id_mapping(serial_num, key_name) + taf_repo.yubikey_store.add_key_data(key_name, serial_num, public_key) - if role is not None: - if loaded_yubikeys is None: - loaded_yubikeys = {serial_num: [role]} - else: - loaded_yubikeys.setdefault(serial_num, []).append(role) + # if role is not None: + # if loaded_yubikeys is None: + # loaded_yubikeys = {serial_num: [role]} + # else: + # loaded_yubikeys.setdefault(serial_num, []).append(role) yubikeys.append((public_key, serial_num)) @@ -522,7 +477,6 @@ def yubikey_prompt( taf_repo=None, registering_new_key=False, creating_new_key=False, - loaded_yubikeys=None, pin_confirm=True, pin_repeat=True, prompt_message=None, @@ -547,7 +501,6 @@ def yubikey_prompt( taf_repo, registering_new_key, creating_new_key, - loaded_yubikeys, pin_confirm, pin_repeat, prompt_message, @@ -560,7 +513,7 @@ def yubikey_prompt( retry_counter += 1 -def yk_secrets_handler(prompt, serial_num): +def yk_secrets_handler(prompt, pin_manager, serial_num): if prompt == "pin": - return get_key_pin(serial_num) + return pin_manager.get_pin(serial_num) raise YubikeyError(f"Invalid prompt {prompt}") diff --git a/taf/yubikey/yubikey_manager.py b/taf/yubikey/yubikey_manager.py new file mode 100644 index 00000000..ec086735 --- /dev/null +++ b/taf/yubikey/yubikey_manager.py @@ -0,0 +1,71 @@ +from collections import defaultdict +import contextlib +from typing import Any, Dict, Optional, Tuple +from taf.tuf.keys import SSlibKey + + +class YubiKeyStore: + def __init__(self): + # Initializes the dictionary to store YubiKey data + self._yubikeys_data = defaultdict(dict) + + def is_key_name_loaded(self, key_name: str) -> bool: + """Check if the key name is already loaded.""" + return key_name in self._yubikeys_data + + def add_key_data(self, key_name: str, serial_num: str, public_key: SSlibKey) -> None: + """Add data associated with a YubiKey.""" + if not self.is_key_name_loaded(key_name): + self._yubikeys_data[key_name] = { + "serial": serial_num, + "public_key": public_key + } + + def get_key_data(self, key_name: str) -> Tuple[str, SSlibKey]: + """Retrieve data associated with a given YubiKey name.""" + key_data = self._yubikeys_data.get(key_name) + return key_data["public_key"], key_data["serial"] + + def remove_key_data(self, key_name: str) -> bool: + """Remove data associated with a given YubiKey name if it exists.""" + if self.is_key_name_loaded(key_name): + del self._yubikeys_data[key_name] + return True + return False + + +class PinManager(): + + def __init__(self): + self._pins = {} + + def add_pin(self, serial_number, pin): + self._pins[serial_number] = pin + + def clear_pins(self): + for key in list(self._pins.keys()): + self._pins[key] = None + self._pins.clear() + + def get_pin(self, serial_number): + return self._pins.get(serial_number) + + def is_loaded(self, serial_number): + return self.get_pin(serial_number) is not None + + +@contextlib.contextmanager +def manage_pins(): + pin_manager = PinManager() + try: + yield pin_manager + finally: + pin_manager.clear_pins() + + +def pin_managed(func): + def wrapper(*args, **kwargs): + with manage_pins() as pin_manager: + kwargs['pin_manager'] = pin_manager + return func(*args, **kwargs) + return wrapper