Skip to content

Commit

Permalink
feat: work on loading multiple keys when signing and using key names …
Browse files Browse the repository at this point in the history
…fegined in metadata
  • Loading branch information
renatav committed Jan 20, 2025
1 parent 56a9694 commit fcc9a27
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 50 deletions.
2 changes: 1 addition & 1 deletion taf/api/yubikey.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def setup_signing_yubikey(
):
return
_, serial_num = yk.yubikey_prompt(
"new Yubikey",
["new Yubikey"],
creating_new_key=True,
pin_confirm=True,
pin_repeat=True,
Expand Down
30 changes: 20 additions & 10 deletions taf/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,18 +194,25 @@ def _load_signer_from_keystore(

def _load_and_append_yubikeys(
taf_repo,
key_name,
role,
retry_on_failure,
hide_already_loaded_message,
loaded_yubikeys,
signers_yubikeys,
threshold,
initial_num_of_signatures,
signing_keys_num,
):

key_names = [count + 1 for count in range(initial_num_of_signatures, signing_keys_num)]

prompt_message = f"Please insert {role} YubiKey(s) and press ENTER.\nThreshold is {threshold}"
inserted_yubikeys = yk.yubikey_prompt(
key_name,
role,
taf_repo,
key_name=key_names,
role=role,
taf_repo=taf_repo,
loaded_yubikeys=loaded_yubikeys,
prompt_message=prompt_message,
retry_on_failure=retry_on_failure,
hide_already_loaded_message=hide_already_loaded_message,
)
Expand All @@ -214,6 +221,7 @@ def _load_and_append_yubikeys(
num_of_loaded_keys = 0
for public_key, serial_num in inserted_yubikeys:
if public_key is not None and public_key.keyid not in loaded_keyids:
key_name = taf_repo.yubikey_store.get_name_by_serial(serial_num)
signer = YkSigner(
public_key,
serial_num,
Expand All @@ -228,6 +236,7 @@ def _load_and_append_yubikeys(
loaded_keyids.append(public_key.keyid)
num_of_loaded_keys += 1
taf_logger.info(f"Successfully loaded {key_name} from inserted YubiKey")

return num_of_loaded_keys


Expand Down Expand Up @@ -294,10 +303,12 @@ def load_signers(
# loading from keystore files, couldn't load from all of them, but loaded enough
break

# try to load from the inserted YubiKey, without asking the user to insert it
key_name = get_key_name(role, num_of_signatures, signing_keys_num)
# try to load from the inserted YubiKeys, without asking the user to insert it
# in case of yubikeys, instead of asking for a particular key, ask to insert all
# that can be used to sign the current role, but either read the name from the
# metadata, or assign a role + counter name
num_of_loaded_keys = _load_and_append_yubikeys(
taf_repo, key_name, role, False, True, loaded_yubikeys, signers_yubikeys
taf_repo, role, False, True, loaded_yubikeys, signers_yubikeys, threshold, num_of_signatures, signing_keys_num
)

if num_of_loaded_keys:
Expand Down Expand Up @@ -571,14 +582,13 @@ def _setup_yubikey(
raise YubikeyError("Yubikey setup canceled")
continue
yubikeys = yk.yubikey_prompt(
key_name,
[key_name],
role_name,
taf_repo=auth_repo,
registering_new_key=True,
creating_new_key=not use_existing,
pin_confirm=True,
pin_repeat=True,
require_single_yubikey=True,
)
if yubikeys is not None:
key, serial_num = yubikeys[0]
Expand All @@ -605,7 +615,7 @@ def _load_and_verify_yubikey(
return None
while True:
yk_public_key, _ = yk.yubikey_prompt(
key_name,
[key_name],
role_name,
taf_repo=taf_repo,
registering_new_key=True,
Expand Down
23 changes: 21 additions & 2 deletions taf/tuf/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,13 @@ def __init__(self, path: Union[Path, str], *args, **kwargs) -> None:
self._metadata_to_keep_open: Set[str] = set()
self.pin_manager = pin_manager
self.yubikey_store = YubiKeyStore()
self.keys_name_mappings: Dict[str, str] = {}
self._keys_name_mappings: Optional[Dict[str, str]] = None

@property
def keys_name_mappings(self):
if self._keys_name_mappings is None:
self._keys_name_mappings = self.load_key_names()
return self._keys_name_mappings

@property
def metadata_path(self) -> Path:
Expand Down Expand Up @@ -524,7 +530,7 @@ def create(
key_name_mappings: A dictionary whose keys are key ids and values are custom names of those keys
"""
if keys_name_mappings:
self.keys_name_mappings = keys_name_mappings
self._keys_name_mappings = keys_name_mappings
self.metadata_path.mkdir(parents=True)
self.signer_cache = defaultdict(dict)

Expand Down Expand Up @@ -1368,6 +1374,19 @@ def modify_targets(
)
return targets_role


def load_key_names(self):
# TODO target roles need to be handled too
root_metadata = self.signed_obj("root")
name_mapping = {}
keys = root_metadata.keys
for key_id, key_obj in keys.items():
name_data = key_obj.unrecognized_fields
if name_data is not None and "name" in name_data:
name_mapping[key_id] = name_data["name"]
return name_mapping


def _modify_targets_role(
self,
added_target_files: List[TargetFile],
Expand Down
138 changes: 101 additions & 37 deletions taf/yubikey/yubikey.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,19 +275,19 @@ def list_connected_yubikeys():
print(f" Form Factor: {info.form_factor}")


def _read_and_check_yubikeys(
key_name,
def _read_and_check_single_yubikey(
role,
key_name,
taf_repo,
registering_new_key,
creating_new_key,
pin_confirm,
pin_repeat,
prompt_message,
require_single_yubikey,
retrying,
):


if retrying:
if prompt_message is None:
prompt_message = f"Please insert {key_name} YubiKey and press ENTER"
Expand All @@ -296,52 +296,105 @@ def _read_and_check_yubikeys(
# make sure that YubiKey is inserted
try:
serials = get_serial_num()
if require_single_yubikey:
not_loaded = [
serial
for serial in serials
if not taf_repo.yubikey_store.is_loaded(serial)
]
if len(not_loaded) > 1:
print("\nPlease insert only one YubiKey\n")
return None
not_loaded = [
serial
for serial in serials
if not taf_repo.yubikey_store.is_loaded(serial)
]
if len(not_loaded) > 1:
print("\nPlease insert only one not previously inserted YubiKey\n")
return None
# no need to try loading keys that we know were previously loaded
serials = not_loaded

except Exception:
taf_logger.log("NOTICE", "No YubiKeys inserted")
return None

serial_num = serials[0]
# check if this key is already loaded as the provided role's key (we can use the same key
# to sign different metadata)
yubikeys = []
# read the public key, unless a new key needs to be generated on the yubikey
public_key = (
get_piv_public_key_tuf(serial=serial_num)
if not creating_new_key
else None
)
# check if this yubikey is can be used for signing the provided role's metadata
# if the key was already registered as that role's key
if not registering_new_key and role is not None and taf_repo is not None:
if not taf_repo.is_valid_metadata_yubikey(role, public_key):
return None

if taf_repo.pin_manager.get_pin(serial_num) is None:
if creating_new_key:
pin = get_pin_for(key_name, pin_confirm, pin_repeat)
else:
pin = get_and_validate_pin(
key_name, pin_confirm, pin_repeat, serial_num
)
taf_repo.pin_manager.add_pin(serial_num, pin)

# when reusing the same yubikey, public key will already be in the public keys dictionary
# but the key name still needs to be added to the key id mapping dictionary
taf_repo.yubikey_store.add_key_data(key_name, serial_num, public_key)
yubikeys.append((public_key, serial_num))

return yubikeys

def _read_and_check_yubikeys(
role,
taf_repo,
pin_confirm,
pin_repeat,
prompt_message,
key_names,
retrying,
):

if retrying:
if prompt_message is None:
prompt_message = f"Please insert {role} YubiKey(s) and press ENTER"
getpass(prompt_message)

# make sure that YubiKey is inserted
try:
serials = get_serial_num()
except Exception:
taf_logger.log("NOTICE", "No YubiKeys inserted")
return None

# check if this key is already loaded as the provided role's key (we can use the same key
# to sign different metadata)
yubikeys = []
invalid_keys = []
for serial_num in serials:
for index, serial_num in enumerate(serials):
if not taf_repo.yubikey_store.is_loaded(serial_num):
# read the public key, unless a new key needs to be generated on the yubikey
public_key = (
get_piv_public_key_tuf(serial=serial_num)
if not creating_new_key
else None
)
public_key = get_piv_public_key_tuf(serial=serial_num)
# check if this yubikey is can be used for signing the provided role's metadata
# if the key was already registered as that role's key
if not registering_new_key and role is not None and taf_repo is not None:
if role is not None and taf_repo is not None:
if not taf_repo.is_valid_metadata_yubikey(role, public_key):
invalid_keys.append(serial_num)
# print(f"The inserted YubiKey is not a valid {role} key")
continue

if creating_new_key:
pin = get_pin_for(key_name, pin_confirm, pin_repeat)
if taf_repo.keys_name_mappings:
key_name = taf_repo.keys_name_mappings.get(public_key.keyid)
else:
key_name = key_names[index]

if taf_repo.pin_manager.get_pin(serial_num) is None:
pin = get_and_validate_pin(
key_name, pin_confirm, pin_repeat, serial_num
)
taf_repo.pin_manager.add_pin(serial_num, pin)
taf_repo.pin_manager.add_pin(serial_num, pin)

# when reusing the same yubikey, public key will already be in the public keys dictionary
# but the key name still needs to be added to the key id mapping dictionary
taf_repo.yubikey_store.add_key_data(key_name, serial_num, public_key)

yubikeys.append((public_key, serial_num))

return yubikeys
Expand Down Expand Up @@ -492,7 +545,7 @@ def _check_if_yk_inserted():


def yubikey_prompt(
key_name,
key_names,
role=None,
taf_repo=None,
registering_new_key=False,
Expand All @@ -502,23 +555,34 @@ def yubikey_prompt(
prompt_message=None,
retry_on_failure=True,
hide_already_loaded_message=False,
require_single_yubikey=True,
):

retry_counter = 0
while True:
yubikeys = _read_and_check_yubikeys(
key_name,
role,
taf_repo,
registering_new_key,
creating_new_key,
pin_confirm,
pin_repeat,
prompt_message,
require_single_yubikey=require_single_yubikey,
retrying=retry_counter > 0,
)
retrying=retry_counter > 0
if registering_new_key or creating_new_key:
yubikeys = _read_and_check_single_yubikey(
role,
key_names[0],
taf_repo,
registering_new_key,
creating_new_key,
pin_confirm,
pin_repeat,
prompt_message,
retrying,
)
else:
yubikeys = _read_and_check_yubikeys(
role,
taf_repo,
pin_confirm,
pin_repeat,
prompt_message,
key_names,
retrying,
)

if not yubikeys and not retry_on_failure:
return [(None, None)]
if yubikeys:
Expand Down
5 changes: 5 additions & 0 deletions taf/yubikey/yubikey_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ def get_key_data(self, key_name: str) -> Tuple[str, SSlibKey]:
key_data = self._yubikeys_data.get(key_name)
return key_data["public_key"], key_data["serial"]

def get_name_by_serial(self, serial_num: str) -> str:
for key_name, data in self._yubikeys_data.items():
if data["serial"] == serial_num:
return key_name

def remove_key_data(self, key_name: str) -> bool:
"""Remove data associated with a given YubiKey name if it exists."""
if self.is_key_name_loaded(key_name):
Expand Down

0 comments on commit fcc9a27

Please sign in to comment.