From fcc9a277a924e120e4b62c2db8a1c16664d10136 Mon Sep 17 00:00:00 2001 From: Renata Date: Mon, 20 Jan 2025 05:21:37 -0500 Subject: [PATCH] feat: work on loading multiple keys when signing and using key names fegined in metadata --- taf/api/yubikey.py | 2 +- taf/keys.py | 30 ++++--- taf/tuf/repository.py | 23 +++++- taf/yubikey/yubikey.py | 138 ++++++++++++++++++++++++--------- taf/yubikey/yubikey_manager.py | 5 ++ 5 files changed, 148 insertions(+), 50 deletions(-) diff --git a/taf/api/yubikey.py b/taf/api/yubikey.py index 8d3e02aa..0f5090b5 100644 --- a/taf/api/yubikey.py +++ b/taf/api/yubikey.py @@ -150,7 +150,7 @@ def setup_signing_yubikey( ): return _, serial_num = yk.yubikey_prompt( - "new Yubikey", + ["new Yubikey"], creating_new_key=True, pin_confirm=True, pin_repeat=True, diff --git a/taf/keys.py b/taf/keys.py index badea92a..8027a012 100644 --- a/taf/keys.py +++ b/taf/keys.py @@ -194,18 +194,25 @@ def _load_signer_from_keystore( def _load_and_append_yubikeys( taf_repo, - key_name, role, retry_on_failure, hide_already_loaded_message, loaded_yubikeys, signers_yubikeys, + threshold, + initial_num_of_signatures, + signing_keys_num, ): + + key_names = [count + 1 for count in range(initial_num_of_signatures, signing_keys_num)] + + prompt_message = f"Please insert {role} YubiKey(s) and press ENTER.\nThreshold is {threshold}" inserted_yubikeys = yk.yubikey_prompt( - key_name, - role, - taf_repo, + key_name=key_names, + role=role, + taf_repo=taf_repo, loaded_yubikeys=loaded_yubikeys, + prompt_message=prompt_message, retry_on_failure=retry_on_failure, hide_already_loaded_message=hide_already_loaded_message, ) @@ -214,6 +221,7 @@ def _load_and_append_yubikeys( num_of_loaded_keys = 0 for public_key, serial_num in inserted_yubikeys: if public_key is not None and public_key.keyid not in loaded_keyids: + key_name = taf_repo.yubikey_store.get_name_by_serial(serial_num) signer = YkSigner( public_key, serial_num, @@ -228,6 +236,7 @@ def _load_and_append_yubikeys( loaded_keyids.append(public_key.keyid) num_of_loaded_keys += 1 taf_logger.info(f"Successfully loaded {key_name} from inserted YubiKey") + return num_of_loaded_keys @@ -294,10 +303,12 @@ def load_signers( # loading from keystore files, couldn't load from all of them, but loaded enough break - # try to load from the inserted YubiKey, without asking the user to insert it - key_name = get_key_name(role, num_of_signatures, signing_keys_num) + # try to load from the inserted YubiKeys, without asking the user to insert it + # in case of yubikeys, instead of asking for a particular key, ask to insert all + # that can be used to sign the current role, but either read the name from the + # metadata, or assign a role + counter name num_of_loaded_keys = _load_and_append_yubikeys( - taf_repo, key_name, role, False, True, loaded_yubikeys, signers_yubikeys + taf_repo, role, False, True, loaded_yubikeys, signers_yubikeys, threshold, num_of_signatures, signing_keys_num ) if num_of_loaded_keys: @@ -571,14 +582,13 @@ def _setup_yubikey( raise YubikeyError("Yubikey setup canceled") continue yubikeys = yk.yubikey_prompt( - key_name, + [key_name], role_name, taf_repo=auth_repo, registering_new_key=True, creating_new_key=not use_existing, pin_confirm=True, pin_repeat=True, - require_single_yubikey=True, ) if yubikeys is not None: key, serial_num = yubikeys[0] @@ -605,7 +615,7 @@ def _load_and_verify_yubikey( return None while True: yk_public_key, _ = yk.yubikey_prompt( - key_name, + [key_name], role_name, taf_repo=taf_repo, registering_new_key=True, diff --git a/taf/tuf/repository.py b/taf/tuf/repository.py index e898e3fd..5a695a1f 100644 --- a/taf/tuf/repository.py +++ b/taf/tuf/repository.py @@ -145,7 +145,13 @@ def __init__(self, path: Union[Path, str], *args, **kwargs) -> None: self._metadata_to_keep_open: Set[str] = set() self.pin_manager = pin_manager self.yubikey_store = YubiKeyStore() - self.keys_name_mappings: Dict[str, str] = {} + self._keys_name_mappings: Optional[Dict[str, str]] = None + + @property + def keys_name_mappings(self): + if self._keys_name_mappings is None: + self._keys_name_mappings = self.load_key_names() + return self._keys_name_mappings @property def metadata_path(self) -> Path: @@ -524,7 +530,7 @@ def create( key_name_mappings: A dictionary whose keys are key ids and values are custom names of those keys """ if keys_name_mappings: - self.keys_name_mappings = keys_name_mappings + self._keys_name_mappings = keys_name_mappings self.metadata_path.mkdir(parents=True) self.signer_cache = defaultdict(dict) @@ -1368,6 +1374,19 @@ def modify_targets( ) return targets_role + + def load_key_names(self): + # TODO target roles need to be handled too + root_metadata = self.signed_obj("root") + name_mapping = {} + keys = root_metadata.keys + for key_id, key_obj in keys.items(): + name_data = key_obj.unrecognized_fields + if name_data is not None and "name" in name_data: + name_mapping[key_id] = name_data["name"] + return name_mapping + + def _modify_targets_role( self, added_target_files: List[TargetFile], diff --git a/taf/yubikey/yubikey.py b/taf/yubikey/yubikey.py index 1ab8d656..9bc0783f 100644 --- a/taf/yubikey/yubikey.py +++ b/taf/yubikey/yubikey.py @@ -275,19 +275,19 @@ def list_connected_yubikeys(): print(f" Form Factor: {info.form_factor}") -def _read_and_check_yubikeys( - key_name, +def _read_and_check_single_yubikey( role, + key_name, taf_repo, registering_new_key, creating_new_key, pin_confirm, pin_repeat, prompt_message, - require_single_yubikey, retrying, ): + if retrying: if prompt_message is None: prompt_message = f"Please insert {key_name} YubiKey and press ENTER" @@ -296,52 +296,105 @@ def _read_and_check_yubikeys( # make sure that YubiKey is inserted try: serials = get_serial_num() - if require_single_yubikey: - not_loaded = [ - serial - for serial in serials - if not taf_repo.yubikey_store.is_loaded(serial) - ] - if len(not_loaded) > 1: - print("\nPlease insert only one YubiKey\n") - return None + not_loaded = [ + serial + for serial in serials + if not taf_repo.yubikey_store.is_loaded(serial) + ] + if len(not_loaded) > 1: + print("\nPlease insert only one not previously inserted YubiKey\n") + return None + # no need to try loading keys that we know were previously loaded + serials = not_loaded except Exception: taf_logger.log("NOTICE", "No YubiKeys inserted") return None + serial_num = serials[0] + # check if this key is already loaded as the provided role's key (we can use the same key + # to sign different metadata) + yubikeys = [] + # read the public key, unless a new key needs to be generated on the yubikey + public_key = ( + get_piv_public_key_tuf(serial=serial_num) + if not creating_new_key + else None + ) + # check if this yubikey is can be used for signing the provided role's metadata + # if the key was already registered as that role's key + if not registering_new_key and role is not None and taf_repo is not None: + if not taf_repo.is_valid_metadata_yubikey(role, public_key): + return None + + if taf_repo.pin_manager.get_pin(serial_num) is None: + if creating_new_key: + pin = get_pin_for(key_name, pin_confirm, pin_repeat) + else: + pin = get_and_validate_pin( + key_name, pin_confirm, pin_repeat, serial_num + ) + taf_repo.pin_manager.add_pin(serial_num, pin) + + # when reusing the same yubikey, public key will already be in the public keys dictionary + # but the key name still needs to be added to the key id mapping dictionary + taf_repo.yubikey_store.add_key_data(key_name, serial_num, public_key) + yubikeys.append((public_key, serial_num)) + + return yubikeys + +def _read_and_check_yubikeys( + role, + taf_repo, + pin_confirm, + pin_repeat, + prompt_message, + key_names, + retrying, +): + + if retrying: + if prompt_message is None: + prompt_message = f"Please insert {role} YubiKey(s) and press ENTER" + getpass(prompt_message) + + # make sure that YubiKey is inserted + try: + serials = get_serial_num() + except Exception: + taf_logger.log("NOTICE", "No YubiKeys inserted") + return None + # check if this key is already loaded as the provided role's key (we can use the same key # to sign different metadata) yubikeys = [] invalid_keys = [] - for serial_num in serials: + for index, serial_num in enumerate(serials): if not taf_repo.yubikey_store.is_loaded(serial_num): # read the public key, unless a new key needs to be generated on the yubikey - public_key = ( - get_piv_public_key_tuf(serial=serial_num) - if not creating_new_key - else None - ) + public_key = get_piv_public_key_tuf(serial=serial_num) # check if this yubikey is can be used for signing the provided role's metadata # if the key was already registered as that role's key - if not registering_new_key and role is not None and taf_repo is not None: + if role is not None and taf_repo is not None: if not taf_repo.is_valid_metadata_yubikey(role, public_key): invalid_keys.append(serial_num) # print(f"The inserted YubiKey is not a valid {role} key") continue - if creating_new_key: - pin = get_pin_for(key_name, pin_confirm, pin_repeat) + if taf_repo.keys_name_mappings: + key_name = taf_repo.keys_name_mappings.get(public_key.keyid) else: + key_name = key_names[index] + + if taf_repo.pin_manager.get_pin(serial_num) is None: pin = get_and_validate_pin( key_name, pin_confirm, pin_repeat, serial_num ) - taf_repo.pin_manager.add_pin(serial_num, pin) + taf_repo.pin_manager.add_pin(serial_num, pin) # when reusing the same yubikey, public key will already be in the public keys dictionary # but the key name still needs to be added to the key id mapping dictionary taf_repo.yubikey_store.add_key_data(key_name, serial_num, public_key) - yubikeys.append((public_key, serial_num)) return yubikeys @@ -492,7 +545,7 @@ def _check_if_yk_inserted(): def yubikey_prompt( - key_name, + key_names, role=None, taf_repo=None, registering_new_key=False, @@ -502,23 +555,34 @@ def yubikey_prompt( prompt_message=None, retry_on_failure=True, hide_already_loaded_message=False, - require_single_yubikey=True, ): retry_counter = 0 while True: - yubikeys = _read_and_check_yubikeys( - key_name, - role, - taf_repo, - registering_new_key, - creating_new_key, - pin_confirm, - pin_repeat, - prompt_message, - require_single_yubikey=require_single_yubikey, - retrying=retry_counter > 0, - ) + retrying=retry_counter > 0 + if registering_new_key or creating_new_key: + yubikeys = _read_and_check_single_yubikey( + role, + key_names[0], + taf_repo, + registering_new_key, + creating_new_key, + pin_confirm, + pin_repeat, + prompt_message, + retrying, + ) + else: + yubikeys = _read_and_check_yubikeys( + role, + taf_repo, + pin_confirm, + pin_repeat, + prompt_message, + key_names, + retrying, + ) + if not yubikeys and not retry_on_failure: return [(None, None)] if yubikeys: diff --git a/taf/yubikey/yubikey_manager.py b/taf/yubikey/yubikey_manager.py index ca560e80..3a446a51 100644 --- a/taf/yubikey/yubikey_manager.py +++ b/taf/yubikey/yubikey_manager.py @@ -33,6 +33,11 @@ def get_key_data(self, key_name: str) -> Tuple[str, SSlibKey]: key_data = self._yubikeys_data.get(key_name) return key_data["public_key"], key_data["serial"] + def get_name_by_serial(self, serial_num: str) -> str: + for key_name, data in self._yubikeys_data.items(): + if data["serial"] == serial_num: + return key_name + def remove_key_data(self, key_name: str) -> bool: """Remove data associated with a given YubiKey name if it exists.""" if self.is_key_name_loaded(key_name):