From 0c5363a5b96eb53f65412ccf7be84f3955ab1d4e Mon Sep 17 00:00:00 2001 From: Renata Date: Fri, 17 Jan 2025 19:07:07 -0500 Subject: [PATCH] feat, refact: check if yk inserted before signing, move check if loaded to yubikey store --- taf/api/repository.py | 5 ++-- taf/keys.py | 6 +++-- taf/tuf/keys.py | 11 ++++++--- taf/tuf/repository.py | 11 +++++---- taf/yubikey/yubikey.py | 42 ++++++++++++++++++++++------------ taf/yubikey/yubikey_manager.py | 5 ++-- 6 files changed, 51 insertions(+), 29 deletions(-) diff --git a/taf/api/repository.py b/taf/api/repository.py index e8b33b81..ac548357 100644 --- a/taf/api/repository.py +++ b/taf/api/repository.py @@ -76,7 +76,7 @@ def create_repository( roles_keys_data = from_dict(roles_key_infos_dict, RolesKeysData) auth_repo = AuthenticationRepository(path=path, pin_manager=pin_manager) - signers, verification_keys, key_name_mappings = load_sorted_keys_of_new_roles( + signers, verification_keys, keys_name_mappings = load_sorted_keys_of_new_roles( roles=roles_keys_data.roles, auth_repo=auth_repo, yubikeys_data=roles_keys_data.yubikeys, @@ -87,8 +87,7 @@ def create_repository( if signers is None: return - repository = AuthenticationRepository(path=path) - repository.create(roles_keys_data, signers, verification_keys, key_name_mappings) + auth_repo.create(roles_keys_data, signers, verification_keys, keys_name_mappings) if commit: auth_repo.init_repo() commit_msg = git_commit_message("create-repo") diff --git a/taf/keys.py b/taf/keys.py index 29b0368a..2fc515f4 100644 --- a/taf/keys.py +++ b/taf/keys.py @@ -154,7 +154,6 @@ def _sort_roles(roles): for signer in keystore_signers: signers.setdefault(role.name, []).append(signer) - # TODO add to repository... keys_name_mappings.update(key_name_mapping) for role in yubikey_roles: @@ -220,6 +219,7 @@ def _load_and_append_yubikeys( public_key, serial_num, partial(yk.yk_secrets_handler, pin_manager=taf_repo.pin_manager, serial_num=serial_num), + key_name=key_name, ) signers_yubikeys.append(signer) loaded_keyids.append(public_key.keyid) @@ -421,6 +421,7 @@ def _setup_yubikey_roles_keys( public_key, serial_num, partial(yk.yk_secrets_handler, pin_manager=auth_repo.pin_manager, serial_num=serial_num), + key_name=key_name, ) signers.append(signer) keyid_name_mapping[_get_legacy_keyid(public_key)] = key_name @@ -443,7 +444,8 @@ def _setup_yubikey_roles_keys( loaded_keys.append(key_name) signer = YkSigner( public_key, - partial(yk.yk_secrets_handler, pin_manager=auth_repo.pin_manager, serial_num=serial_num) + partial(yk.yk_secrets_handler, pin_manager=auth_repo.pin_manager, serial_num=serial_num), + key_name=key_name ) signers.append(signer) if loaded_keys_num == role.threshold: diff --git a/taf/tuf/keys.py b/taf/tuf/keys.py index c474a45a..fab6f15d 100644 --- a/taf/tuf/keys.py +++ b/taf/tuf/keys.py @@ -186,12 +186,13 @@ class YkSigner(Signer): _SECRET_PROMPT = "pin" def __init__( - self, public_key: SSlibKey, serial_num: str, pin_handler: SecretsHandler + self, public_key: SSlibKey, serial_num: str, pin_handler: SecretsHandler, key_name: str ): self._public_key = public_key self._pin_handler = pin_handler self._serial_num = serial_num + self._key_name = key_name @property def public_key(self) -> SSlibKey: @@ -201,6 +202,10 @@ def public_key(self) -> SSlibKey: def serial_num(self) -> str: return self._serial_num + @property + def key_name(self) -> str: + return self._key_name + @classmethod def import_(cls) -> SSlibKey: """Import rsa public key from Yubikey. @@ -221,8 +226,8 @@ def import_(cls) -> SSlibKey: def sign(self, payload: bytes) -> Signature: pin = self._pin_handler(self._SECRET_PROMPT) - from taf.yubikey.yubikey import sign_piv_rsa_pkcs1v15 - + from taf.yubikey.yubikey import sign_piv_rsa_pkcs1v15, verify_yk_inserted + verify_yk_inserted(self.serial_num, self.key_name) sig = sign_piv_rsa_pkcs1v15(payload, pin, serial=self.serial_num) return Signature(self.public_key.keyid, sig.hex()) diff --git a/taf/tuf/repository.py b/taf/tuf/repository.py index 3ce7ee01..2f0f9c08 100644 --- a/taf/tuf/repository.py +++ b/taf/tuf/repository.py @@ -145,6 +145,7 @@ def __init__(self, path: Union[Path, str], *args, **kwargs) -> None: self._metadata_to_keep_open: Set[str] = set() self.pin_manager = pin_manager self.yubikey_store = YubiKeyStore() + self.keys_name_mappings: Dict[str, str] = {} @property def metadata_path(self) -> Path: @@ -504,7 +505,7 @@ def create( roles_keys_data: RolesKeysData, signers: dict, additional_verification_keys: Optional[dict] = None, - key_name_mappings: Optional[Dict[str, str]] = None, + keys_name_mappings: Optional[Dict[str, str]] = None, ) -> None: """Create a new metadata repository on disk. @@ -523,8 +524,8 @@ def create( present at the time of the repository's creation key_name_mappings: A dictionary whose keys are key ids and values are custom names of those keys """ - # TODO add verification keys - # support yubikeys + if keys_name_mappings: + self.keys_name_mappings = keys_name_mappings self.metadata_path.mkdir(parents=True) self.signer_cache = defaultdict(dict) @@ -557,8 +558,8 @@ def create( self.signer_cache[role.name][key_id] = signer for public_key in public_keys[role.name].values(): key_id = _get_legacy_keyid(public_key) - if key_id in key_name_mappings: - public_key.unrecognized_fields["name"] = key_name_mappings[key_id] + if key_id in self.keys_name_mappings: + public_key.unrecognized_fields["name"] = self.keys_name_mappings[key_id] root.add_key(public_key, role.name) root.roles[role.name].threshold = role.threshold diff --git a/taf/yubikey/yubikey.py b/taf/yubikey/yubikey.py index 9b2a1d4d..d8708153 100644 --- a/taf/yubikey/yubikey.py +++ b/taf/yubikey/yubikey.py @@ -224,6 +224,21 @@ def export_yk_certificate(certs_dir, key: SSlibKey, serial: str): f.write(export_piv_x509(serial=serial)) +def get_and_validate_pin(key_name, pin_confirm=True, pin_repeat=True, serial=None): + valid_pin = False + while not valid_pin: + pin = get_pin_for(key_name, pin_confirm, pin_repeat) + valid_pin, retries = is_valid_pin(pin, serial) + if not valid_pin and not retries: + raise InvalidPINError("No retries left. YubiKey locked.") + if not valid_pin: + if not click.confirm( + f"Incorrect PIN. Do you want to try again? {retries} retires left." + ): + raise InvalidPINError("PIN input cancelled") + return pin + + @raise_yubikey_err("Cannot get public key in TUF format.") def get_piv_public_key_tuf( scheme=DEFAULT_RSA_SIGNATURE_SCHEME, serial=None @@ -290,7 +305,7 @@ def _read_and_check_yubikeys( yubikeys = [] invalid_keys = [] for serial_num in serials: - if not taf_repo.pin_manager.is_loaded(serial_num): + if not taf_repo.yubikey_store.is_loaded(serial_num): # read the public key, unless a new key needs to be generated on the yubikey public_key = ( get_piv_public_key_tuf(serial=serial_num) @@ -456,20 +471,19 @@ def setup_new_yubikey( return key -def get_and_validate_pin(key_name, pin_confirm=True, pin_repeat=True, serial=None): - valid_pin = False - while not valid_pin: - pin = get_pin_for(key_name, pin_confirm, pin_repeat) - valid_pin, retries = is_valid_pin(pin, serial) - if not valid_pin and not retries: - raise InvalidPINError("No retries left. YubiKey locked.") - if not valid_pin: - if not click.confirm( - f"Incorrect PIN. Do you want to try again? {retries} retires left." - ): - raise InvalidPINError("PIN input cancelled") - return pin +def verify_yk_inserted(serial_num, key_name): + + def _check_if_yk_inserted(): + try: + serials = get_serial_num() + except Exception: + return False + return serial_num in serials + + while not _check_if_yk_inserted(): + prompt_message = f"Please insert {key_name} YubiKey and press ENTER" + getpass(prompt_message) def yubikey_prompt( key_name, diff --git a/taf/yubikey/yubikey_manager.py b/taf/yubikey/yubikey_manager.py index ec086735..26d62fd0 100644 --- a/taf/yubikey/yubikey_manager.py +++ b/taf/yubikey/yubikey_manager.py @@ -9,6 +9,9 @@ def __init__(self): # Initializes the dictionary to store YubiKey data self._yubikeys_data = defaultdict(dict) + def is_loaded(self, serial_number): + return any(data["serial"] == serial_number for data in self._yubikeys_data.values()) + def is_key_name_loaded(self, key_name: str) -> bool: """Check if the key name is already loaded.""" return key_name in self._yubikeys_data @@ -50,8 +53,6 @@ def clear_pins(self): def get_pin(self, serial_number): return self._pins.get(serial_number) - def is_loaded(self, serial_number): - return self.get_pin(serial_number) is not None @contextlib.contextmanager