Skip to content

Commit

Permalink
refact: update _load_and_append_yubikeys, support loading from multip…
Browse files Browse the repository at this point in the history
…le yubikeys
  • Loading branch information
renatav committed Jan 20, 2025
1 parent fcc9a27 commit ef4aedd
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 28 deletions.
4 changes: 3 additions & 1 deletion taf/api/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from logging import ERROR
from typing import Dict, List, Optional, Tuple
from logdecorator import log_on_error
from taf.yubikey.yubikey_manager import PinManager
from tuf.api.metadata import Snapshot, Timestamp

from taf.api.utils._git import check_if_clean
Expand Down Expand Up @@ -88,6 +89,7 @@ def print_expiration_dates(
@check_if_clean
def update_metadata_expiration_date(
path: str,
pin_manager: PinManager,
roles: List[str],
interval: Optional[int] = None,
keystore: Optional[str] = None,
Expand Down Expand Up @@ -126,7 +128,7 @@ def update_metadata_expiration_date(
None
"""

auth_repo = AuthenticationRepository(path=path)
auth_repo = AuthenticationRepository(path=path, pin_manager=pin_manager)
if start_date is None:
start_date = datetime.now()

Expand Down
36 changes: 26 additions & 10 deletions taf/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,21 +197,23 @@ def _load_and_append_yubikeys(
role,
retry_on_failure,
hide_already_loaded_message,
loaded_yubikeys,
signers_yubikeys,
threshold,
initial_num_of_signatures,
signing_keys_num,
):

key_names = [count + 1 for count in range(initial_num_of_signatures, signing_keys_num)]
key_names = [
count + 1 for count in range(initial_num_of_signatures, signing_keys_num)
]

prompt_message = f"Please insert {role} YubiKey(s) and press ENTER.\nThreshold is {threshold}"
prompt_message = (
f"Please insert {role} YubiKey(s) and press ENTER.\nThreshold is {threshold}"
)
inserted_yubikeys = yk.yubikey_prompt(
key_name=key_names,
key_names=key_names,
role=role,
taf_repo=taf_repo,
loaded_yubikeys=loaded_yubikeys,
prompt_message=prompt_message,
retry_on_failure=retry_on_failure,
hide_already_loaded_message=hide_already_loaded_message,
Expand Down Expand Up @@ -308,7 +310,14 @@ def load_signers(
# that can be used to sign the current role, but either read the name from the
# metadata, or assign a role + counter name
num_of_loaded_keys = _load_and_append_yubikeys(
taf_repo, role, False, True, loaded_yubikeys, signers_yubikeys, threshold, num_of_signatures, signing_keys_num
taf_repo,
role,
False,
True,
signers_yubikeys,
threshold,
num_of_signatures,
signing_keys_num,
)

if num_of_loaded_keys:
Expand All @@ -321,10 +330,17 @@ def load_signers(
prompt_for_yubikey = False

if use_yubikey_for_signing_confirmed:
if _load_and_append_yubikeys(
taf_repo, key_name, role, True, False, loaded_yubikeys, signers_yubikeys
):
num_of_signatures += 1
num_of_loaded_keys = _load_and_append_yubikeys(
taf_repo,
role,
True,
False,
signers_yubikeys,
threshold,
num_of_signatures,
signing_keys_num,
)
num_of_signatures += num_of_loaded_keys
continue

if prompt_for_keys and click.confirm(f"Manually enter {role} key?"):
Expand Down
5 changes: 4 additions & 1 deletion taf/tools/metadata/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from taf.constants import DEFAULT_RSA_SIGNATURE_SCHEME
from taf.exceptions import TAFError
from taf.tools.cli import catch_cli_exception, find_repository
from taf.tools.repo import pin_managed
from taf.utils import ISO_DATE_PARAM_TYPE as ISO_DATE
import datetime

Expand Down Expand Up @@ -58,12 +59,14 @@ def update_expiration_dates_command():
@click.option("--start-date", default=datetime.datetime.now(), type=ISO_DATE, help="Date to which the interval is added")
@click.option("--no-commit", is_flag=True, default=False, help="Indicates that the changes should not be committed automatically")
@click.option("--prompt-for-keys", is_flag=True, default=False, help="Whether to ask the user to enter their key if not located inside the keystore directory")
def update_expiration_dates(path, role, interval, keystore, scheme, start_date, no_commit, prompt_for_keys):
@pin_managed
def update_expiration_dates(path, role, interval, keystore, scheme, start_date, no_commit, prompt_for_keys, pin_manager):
if not len(role):
print("Specify at least one role")
return
update_metadata_expiration_date(
path=path,
pin_manager=pin_manager,
roles=role,
interval=interval,
keystore=keystore,
Expand Down
11 changes: 7 additions & 4 deletions taf/tuf/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,11 @@ def _filter_if_can_be_added(roles):

return added_keys, already_added_keys, invalid_keys

def add_signers_to_cache(self, roles_signers: Dict):
for role, signers in roles_signers.items():
if self._role_obj(role):
self._load_role_signers(role, signers)

def add_target_files_to_role(self, added_data: Dict[str, Dict]) -> None:
"""Add target files to top-level targets metadata.
Expand Down Expand Up @@ -606,13 +611,13 @@ def _process_keys(self, signers, additional_verification_keys):
for role_name, role_signers in signers.items():
public_keys[role_name] = {}
for signer in role_signers:
key_id = self._get_legacy_keyid(signer.public_key)
key_id = _get_legacy_keyid(signer.public_key)
public_keys[role_name][key_id] = signer.public_key

if additional_verification_keys:
for role_name, keys in additional_verification_keys.items():
for public_key in keys:
key_id = self._get_legacy_keyid(public_key)
key_id = _get_legacy_keyid(public_key)
public_keys[role_name][key_id] = public_key
return public_keys

Expand Down Expand Up @@ -1374,7 +1379,6 @@ def modify_targets(
)
return targets_role


def load_key_names(self):
# TODO target roles need to be handled too
root_metadata = self.signed_obj("root")
Expand All @@ -1386,7 +1390,6 @@ def load_key_names(self):
name_mapping[key_id] = name_data["name"]
return name_mapping


def _modify_targets_role(
self,
added_target_files: List[TargetFile],
Expand Down
23 changes: 11 additions & 12 deletions taf/yubikey/yubikey.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,6 @@ def _read_and_check_single_yubikey(
retrying,
):


if retrying:
if prompt_message is None:
prompt_message = f"Please insert {key_name} YubiKey and press ENTER"
Expand All @@ -296,14 +295,17 @@ def _read_and_check_single_yubikey(
# make sure that YubiKey is inserted
try:
serials = get_serial_num()

not_loaded = [
serial
for serial in serials
if not taf_repo.yubikey_store.is_loaded(serial)
serial for serial in serials if not taf_repo.yubikey_store.is_loaded(serial)
]
if len(not_loaded) > 1:
print("\nPlease insert only one not previously inserted YubiKey\n")
return None

if not len(not_loaded):
return None

# no need to try loading keys that we know were previously loaded
serials = not_loaded

Expand All @@ -317,9 +319,7 @@ def _read_and_check_single_yubikey(
yubikeys = []
# read the public key, unless a new key needs to be generated on the yubikey
public_key = (
get_piv_public_key_tuf(serial=serial_num)
if not creating_new_key
else None
get_piv_public_key_tuf(serial=serial_num) if not creating_new_key else None
)
# check if this yubikey is can be used for signing the provided role's metadata
# if the key was already registered as that role's key
Expand All @@ -331,9 +331,7 @@ def _read_and_check_single_yubikey(
if creating_new_key:
pin = get_pin_for(key_name, pin_confirm, pin_repeat)
else:
pin = get_and_validate_pin(
key_name, pin_confirm, pin_repeat, serial_num
)
pin = get_and_validate_pin(key_name, pin_confirm, pin_repeat, serial_num)
taf_repo.pin_manager.add_pin(serial_num, pin)

# when reusing the same yubikey, public key will already be in the public keys dictionary
Expand All @@ -343,6 +341,7 @@ def _read_and_check_single_yubikey(

return yubikeys


def _read_and_check_yubikeys(
role,
taf_repo,
Expand Down Expand Up @@ -375,7 +374,7 @@ def _read_and_check_yubikeys(
public_key = get_piv_public_key_tuf(serial=serial_num)
# check if this yubikey is can be used for signing the provided role's metadata
# if the key was already registered as that role's key
if role is not None and taf_repo is not None:
if role is not None and taf_repo is not None:
if not taf_repo.is_valid_metadata_yubikey(role, public_key):
invalid_keys.append(serial_num)
# print(f"The inserted YubiKey is not a valid {role} key")
Expand Down Expand Up @@ -559,7 +558,7 @@ def yubikey_prompt(

retry_counter = 0
while True:
retrying=retry_counter > 0
retrying = retry_counter > 0
if registering_new_key or creating_new_key:
yubikeys = _read_and_check_single_yubikey(
role,
Expand Down

0 comments on commit ef4aedd

Please sign in to comment.