diff --git a/taf/keys.py b/taf/keys.py index 69e93e57..2cf97286 100644 --- a/taf/keys.py +++ b/taf/keys.py @@ -143,7 +143,6 @@ def _sort_roles(roles): verification_keys: Dict = {} keys_name_mappings: Dict = {} - for role in keystore_roles: if role.name in existing_roles: continue @@ -193,6 +192,38 @@ def _load_signer_from_keystore( return None +def _load_and_append_yubikeys( + taf_repo, + key_name, + role, + retry_on_failure, + hide_already_loaded_message, + loaded_yubikeys, + signers_yubikeys, +): + inserted_yubikeys = yk.yubikey_prompt( + key_name, + role, + taf_repo, + loaded_yubikeys=loaded_yubikeys, + retry_on_failure=retry_on_failure, + hide_already_loaded_message=hide_already_loaded_message, + ) + + num_of_loaded_keys = 0 + for public_key, serial_num in inserted_yubikeys: + if public_key is not None and public_key.keyid not in signer.public_key.keyid: + signer = YkSigner( + public_key, + serial_num, + partial(yk.yk_secrets_handler, serial_num=serial_num), + ) + signers_yubikeys.append(signer) + num_of_loaded_keys += 1 + taf_logger.info(f"Successfully loaded {key_name} from inserted YubiKey") + return num_of_loaded_keys + + @log_on_start(INFO, "Loading signing keys of '{role:s}'", logger=taf_logger) def load_signers( taf_repo: TUFRepository, @@ -213,7 +244,6 @@ def load_signers( num_of_signatures = 0 signers_keystore = [] signers_yubikeys = [] - yubikeys = [] # first try to sign using yubikey # if that is not possible, try to load key from a keystore file @@ -229,32 +259,6 @@ def load_signers( keystore_path = Path(keystore).expanduser().resolve() if keystore else None - def _load_and_append_yubikeys( - key_name, role, retry_on_failure, hide_already_loaded_message - ): - inserted_yubikeys = yk.yubikey_prompt( - key_name, - role, - taf_repo, - loaded_yubikeys=loaded_yubikeys, - retry_on_failure=retry_on_failure, - hide_already_loaded_message=hide_already_loaded_message, - ) - - num_of_loaded_keys = 0 - for public_key, serial_num in inserted_yubikeys: - if public_key is not None and public_key.keyid not in yubikeys: - signer = YkSigner( - public_key, serial_num, partial(yk.yk_secrets_handler, serial_num=serial_num) - ) - signers_yubikeys.append(signer) - yubikeys.append(public_key.keyid) - num_of_loaded_keys += 1 - taf_logger.info(f"Successfully loaded {key_name} from inserted YubiKey") - return num_of_loaded_keys - - - keystore_files = [] if keystore is not None: keystore_files = get_keystore_keys_of_role(keystore, role) @@ -285,7 +289,9 @@ def _load_and_append_yubikeys( # try to load from the inserted YubiKey, without asking the user to insert it key_name = get_key_name(role, num_of_signatures, signing_keys_num) - num_of_loaded_keys = _load_and_append_yubikeys(key_name, role, False, True) + num_of_loaded_keys = _load_and_append_yubikeys( + key_name, role, False, True, loaded_yubikeys, signers_yubikeys + ) if num_of_loaded_keys: num_of_signatures += num_of_loaded_keys @@ -297,7 +303,9 @@ def _load_and_append_yubikeys( prompt_for_yubikey = False if use_yubikey_for_signing_confirmed: - if _load_and_append_yubikeys(key_name, role, True, False): + if _load_and_append_yubikeys( + key_name, role, True, False, loaded_yubikeys, signers_yubikeys + ): num_of_signatures += 1 continue @@ -403,12 +411,14 @@ def _setup_yubikey_roles_keys( ) loaded_keys_num += 1 signer = YkSigner( - public_key, serial_num, partial(yk.yk_secrets_handler, serial_num=serial_num) + public_key, + serial_num, + partial(yk.yk_secrets_handler, serial_num=serial_num), ) signers.append(signer) keyid_name_mapping[_get_legacy_keyid(public_key)] = key_name - if loaded_keys_num < role.number: #role.threshold: + if loaded_keys_num < role.number: # role.threshold: print(f"Threshold of role {role.name} is {role.threshold}") while loaded_keys_num < role.threshold: loaded_keys = [] diff --git a/taf/tools/yubikey/__init__.py b/taf/tools/yubikey/__init__.py index bf10203c..68519426 100644 --- a/taf/tools/yubikey/__init__.py +++ b/taf/tools/yubikey/__init__.py @@ -123,4 +123,3 @@ def attach_to_group(group): group.add_command(list_key_command(), name='list-key') group.add_command(setup_signing_key_command(), name='setup-signing-key') group.add_command(setup_test_key_command(), name='setup-test-key') - diff --git a/taf/tuf/keys.py b/taf/tuf/keys.py index 995bcd0c..57f5ddc4 100644 --- a/taf/tuf/keys.py +++ b/taf/tuf/keys.py @@ -185,7 +185,9 @@ class YkSigner(Signer): _SECRET_PROMPT = "pin" - def __init__(self, public_key: SSlibKey, serial_num: str, pin_handler: SecretsHandler): + def __init__( + self, public_key: SSlibKey, serial_num: str, pin_handler: SecretsHandler + ): self._public_key = public_key self._pin_handler = pin_handler diff --git a/taf/tuf/repository.py b/taf/tuf/repository.py index 681e56fb..bc19eb3a 100644 --- a/taf/tuf/repository.py +++ b/taf/tuf/repository.py @@ -504,7 +504,7 @@ def create( roles_keys_data: RolesKeysData, signers: dict, additional_verification_keys: Optional[dict] = None, - key_name_mappings: Optional[Dict[str, str]] = None + key_name_mappings: Optional[Dict[str, str]] = None, ) -> None: """Create a new metadata repository on disk. @@ -602,9 +602,11 @@ def create( signed.version = 0 # `close` will bump to initial valid verison 1 self.close(name, Metadata(signed)) - def create_delegated_roles( - self, roles_data: List[TargetsRole], signers: Dict[str, List[CryptoSigner]], key_name_mappings: Optional[Dict[str, str]]=None + self, + roles_data: List[TargetsRole], + signers: Dict[str, List[CryptoSigner]], + key_name_mappings: Optional[Dict[str, str]] = None, ) -> Tuple[List, List]: """ Create a new delegated roles, signes them using the provided signers and diff --git a/taf/yubikey.py b/taf/yubikey.py index 168e6ec7..b9266527 100644 --- a/taf/yubikey.py +++ b/taf/yubikey.py @@ -120,14 +120,16 @@ def _yk_piv_ctrl(serial=None): try: session = PivSession(connection) sessions.append((session, info.serial)) - devices_info.append((connection, session)) # Store to manage cleanup + devices_info.append( + (connection, session) + ) # Store to manage cleanup if serial is not None: break except Exception as e: connection.close() # Ensure we close connection on error raise e if serial is not None: - session, serial = sessions[0] + session, serial = sessions[0] yield session, serial else: yield sessions @@ -137,7 +139,6 @@ def _yk_piv_ctrl(serial=None): connection.close() - def is_inserted(): """Checks if YubiKey is inserted. @@ -244,7 +245,7 @@ def export_piv_pub_key(pub_key_format=serialization.Encoding.PEM, serial=None): @raise_yubikey_err("Cannot export yk certificate.") -def export_yk_certificate(certs_dir, key: SSlibKey, serial: str) : +def export_yk_certificate(certs_dir, key: SSlibKey, serial: str): if certs_dir is None: certs_dir = Path.home() else: @@ -292,6 +293,87 @@ def list_connected_yubikeys(): print(f" Form Factor: {info.form_factor}") +# TODO +# need to pass in multiple key names +def _read_and_check_yubikeys( + key_name, + role, + taf_repo, + registering_new_key, + creating_new_key, + loaded_yubikeys, + pin_confirm, + pin_repeat, + prompt_message, + retrying, +): + + if retrying: + if prompt_message is None: + prompt_message = f"Please insert {key_name} YubiKey and press ENTER" + getpass(prompt_message) + # make sure that YubiKey is inserted + try: + serials = get_serial_num() + except Exception: + taf_logger.log("NOTICE", "No YubiKeys inserted") + return [False, None, None] + + # check if this key is already loaded as the provided role's key (we can use the same key + # to sign different metadata) + yubikeys = [] + already_loaded_keys = [] + invalid_keys = [] + for serial_num in serials: + if ( + loaded_yubikeys is not None + and serial_num in loaded_yubikeys + and role in loaded_yubikeys[serial_num] + ): + already_loaded_keys.append(serial_num) + else: + # read the public key, unless a new key needs to be generated on the yubikey + public_key = ( + get_piv_public_key_tuf(serial=serial_num) + if not creating_new_key + else None + ) + # check if this yubikey is can be used for signing the provided role's metadata + # if the key was already registered as that role's key + if not registering_new_key and role is not None and taf_repo is not None: + if not taf_repo.is_valid_metadata_yubikey(role, public_key): + invalid_keys.append(serial_num) + # print(f"The inserted YubiKey is not a valid {role} key") + continue + + if get_key_pin(serial_num) is None: + if creating_new_key: + pin = get_pin_for(key_name, pin_confirm, pin_repeat) + else: + pin = get_and_validate_pin( + key_name, pin_confirm, pin_repeat, serial_num + ) + add_key_pin(serial_num, pin) + + if get_key_public_key(serial_num) is None and public_key is not None: + add_key_public_key(serial_num, public_key) + + # when reusing the same yubikey, public key will already be in the public keys dictionary + # but the key name still needs to be added to the key id mapping dictionary + add_key_id_mapping(serial_num, key_name) + + if role is not None: + if loaded_yubikeys is None: + loaded_yubikeys = {serial_num: [role]} + else: + loaded_yubikeys.setdefault(serial_num, []).append(role) + + yubikeys.append((public_key, serial_num)) + + # TODO error messages + return yubikeys + + @raise_yubikey_err("Cannot sign data.") def sign_piv_rsa_pkcs1v15(data, pin, serial=None): """Sign data with key from YubiKey's piv slot. @@ -454,84 +536,9 @@ def yubikey_prompt( if len(serials) == 1: break else: - prompt_message = f"Please insert only one YubiKey and press ENTER" + prompt_message = "Please insert only one YubiKey and press ENTER" getpass(prompt_message) - - # TODO - # need to pass in multiple key names - def _read_and_check_yubikeys( - key_name, - role, - taf_repo, - registering_new_key, - creating_new_key, - loaded_yubikeys, - pin_confirm, - pin_repeat, - prompt_message, - retrying, - ): - - if retrying: - if prompt_message is None: - prompt_message = f"Please insert {key_name} YubiKey and press ENTER" - getpass(prompt_message) - # make sure that YubiKey is inserted - try: - serials = get_serial_num() - except Exception: - taf_logger.log("NOTICE", "No YubiKeys inserted") - return [False, None, None] - - # check if this key is already loaded as the provided role's key (we can use the same key - # to sign different metadata) - yubikeys = [] - already_loaded_keys = [] - invalid_keys = [] - for serial_num in serials: - if ( - loaded_yubikeys is not None - and serial_num in loaded_yubikeys - and role in loaded_yubikeys[serial_num] - ): - already_loaded_keys.append(serial_num) - else: - # read the public key, unless a new key needs to be generated on the yubikey - public_key = get_piv_public_key_tuf(serial=serial_num) if not creating_new_key else None - # check if this yubikey is can be used for signing the provided role's metadata - # if the key was already registered as that role's key - if not registering_new_key and role is not None and taf_repo is not None: - if not taf_repo.is_valid_metadata_yubikey(role, public_key): - invalid_keys.append(serial_num) - # print(f"The inserted YubiKey is not a valid {role} key") - continue - - if get_key_pin(serial_num) is None: - if creating_new_key: - pin = get_pin_for(key_name, pin_confirm, pin_repeat) - else: - pin = get_and_validate_pin(key_name, pin_confirm, pin_repeat, serial_num) - add_key_pin(serial_num, pin) - - if get_key_public_key(serial_num) is None and public_key is not None: - add_key_public_key(serial_num, public_key) - - # when reusing the same yubikey, public key will already be in the public keys dictionary - # but the key name still needs to be added to the key id mapping dictionary - add_key_id_mapping(serial_num, key_name) - - if role is not None: - if loaded_yubikeys is None: - loaded_yubikeys = {serial_num: [role]} - else: - loaded_yubikeys.setdefault(serial_num, []).append(role) - - yubikeys.append((public_key, serial_num)) - - #TODO error messages - return yubikeys - retry_counter = 0 while True: