Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple YubiKeys support #583

Merged
merged 41 commits into from
Feb 14, 2025
Merged
Changes from 1 commit
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
be4edfa
feat: add list yubikeys command
renatav Dec 13, 2024
df3f0b6
feat, WIP: Initial work on iterating over inserted yubikeys
renatav Dec 14, 2024
4bb667c
refact: initial implementation of signing using yubikeys that iterate…
renatav Dec 16, 2024
95ce03b
feat: initial work on storing names of keys in metadata files
renatav Dec 17, 2024
540f0c9
chore: merge feature/tuf-repository and resovle conflicts
renatav Jan 9, 2025
e185ef8
refact: resolve flake issues
renatav Jan 11, 2025
0c08165
feat: add key name to root.json if defined
renatav Jan 11, 2025
f55d0a5
feat: add initial pin manager
renatav Jan 16, 2025
d8cd3a3
refact: move dictionary keeping track of loaded yubikeys to auth repo
renatav Jan 17, 2025
0c5363a
feat, refact: check if yk inserted before signing, move check if load…
renatav Jan 18, 2025
40e6a6e
fix: fix check if only one yubikey inserted
renatav Jan 18, 2025
56a9694
refact: mypy and flake fixes
renatav Jan 18, 2025
fcc9a27
feat: work on loading multiple keys when signing and using key names …
renatav Jan 20, 2025
ef4aedd
refact: update _load_and_append_yubikeys, support loading from multip…
renatav Jan 20, 2025
df29f8a
feat: show names of keys when asking the user to insert yubikeys
renatav Jan 21, 2025
19da186
fix: fix loading single yubikeys and improve messages
renatav Jan 22, 2025
0164576
refact: update yubikey commands to add support for multiple yubikeys
renatav Jan 23, 2025
dfd8b08
refact: allow specification of pin manager not linked with an auth repo
renatav Jan 23, 2025
4c4471a
chore: added pin_manager to api commands and pin_managed decorator to…
renatav Jan 24, 2025
7036b27
feat: add auto continue flag to pin manager
renatav Jan 24, 2025
87eb87a
chore: fix mypy errors
renatav Jan 24, 2025
c43686f
tests: update tests after the addition of pin manager
renatav Jan 24, 2025
dadd79b
fix, refact: fix yubikeys setup when the same yubikeys can be used fo…
renatav Jan 28, 2025
12b0148
fix: fix setting names of keys when there are conflicts, fix loading …
renatav Jan 28, 2025
291d98a
feat: when adding a new role, check if a key name already exists
renatav Jan 29, 2025
16e1608
fix: fix create new role when additional verification keys defined an…
renatav Jan 29, 2025
816809d
chore: formatting and mypy fixes
renatav Jan 29, 2025
09ef0cc
test: update tests after the addition of pin manager
renatav Jan 29, 2025
891905d
fix: iterate over target roles while looking for keys
renatav Jan 29, 2025
29fa0a5
chore: flake and mypy fixes
renatav Jan 29, 2025
f56d466
test: fix sign_target_files
renatav Jan 29, 2025
c98b314
test: fix failing tests
renatav Jan 29, 2025
0e5b5f9
test: fix failing fake yk test
renatav Jan 30, 2025
73848cc
chore: update changelog
renatav Jan 30, 2025
bb97b48
Merge branch 'master' into renatav/multiple-yubikeys
renatav Jan 30, 2025
beef380
fix: ensure 'yk_piv_ctrl' returns same type
n-dusan Feb 10, 2025
bc7e658
docs: docstrings, type hints
renatav Feb 12, 2025
f17360e
chore: mypy fixes
renatav Feb 12, 2025
4d48a72
fix: minor create repo and add roles fixes
renatav Feb 13, 2025
37ced4f
Merge branch 'master' into renatav/multiple-yubikeys
renatav Feb 13, 2025
2788ea1
chore: mypy fix
renatav Feb 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix: fix setting names of keys when there are conflicts, fix loading …
…keys when public key is defined
renatav committed Jan 28, 2025
commit 12b014891be3c5282cd3b2a59da26f04ef560a35
4 changes: 2 additions & 2 deletions taf/api/repository.py
Original file line number Diff line number Diff line change
@@ -76,7 +76,7 @@ def create_repository(

roles_keys_data = from_dict(roles_key_infos_dict, RolesKeysData)
auth_repo = AuthenticationRepository(path=path, pin_manager=pin_manager)
signers, verification_keys, keys_name_mappings = load_sorted_keys_of_new_roles(
signers, verification_keys = load_sorted_keys_of_new_roles(
roles=roles_keys_data.roles,
auth_repo=auth_repo,
yubikeys_data=roles_keys_data.yubikeys,
@@ -87,7 +87,7 @@ def create_repository(
if signers is None:
return

auth_repo.create(roles_keys_data, signers, verification_keys, keys_name_mappings)
auth_repo.create(roles_keys_data, signers, verification_keys)
if commit:
auth_repo.init_repo()
commit_msg = git_commit_message("create-repo")
31 changes: 18 additions & 13 deletions taf/api/roles.py
Original file line number Diff line number Diff line change
@@ -132,7 +132,7 @@ def add_role(
new_role.threshold = threshold
new_role.yubikey = yubikey

signers, _, key_name_mappings = load_sorted_keys_of_new_roles(
signers, _ = load_sorted_keys_of_new_roles(
roles=new_role,
auth_repo=auth_repo,
yubikeys_data=None,
@@ -259,10 +259,12 @@ def add_roles(
None
"""


auth_repo = AuthenticationRepository(path=path, pin_manager=pin_manager)
roles_keys_data_new = _initialize_roles_and_keystore_for_existing_repo(
path, auth_repo, roles_key_infos, keystore,
path,
auth_repo,
roles_key_infos,
keystore,
)
roles_data = auth_repo.generate_roles_description()
roles_keys_data_current = from_dict(roles_data, RolesKeysData)
@@ -300,7 +302,7 @@ def add_roles(
):
all_signers = {}
for role_to_add_data in roles_to_add_data:
signers, _, key_name_mappings = load_sorted_keys_of_new_roles(
signers, _ = load_sorted_keys_of_new_roles(
roles=role_to_add_data,
auth_repo=auth_repo,
yubikeys_data=None,
@@ -310,6 +312,7 @@ def add_roles(
)
all_signers.update(signers)

# TODO add key name mappings
auth_repo.create_delegated_roles(roles_to_add_data, all_signers)
auth_repo.add_new_roles_to_snapshot(roles_to_add)
auth_repo.do_timestamp()
@@ -711,15 +714,11 @@ def _transform_roles_dict(data, auth_repo):
if key_name not in yubikeys_data:
yubikeys_data[key_name] = {}

transformed_roles = {
"root": {},
"snapshot": {},
"timestamp": {}
}
transformed_roles = {"root": {}, "snapshot": {}, "timestamp": {}}

if "roles" in data:
for role_name, role_data in data["roles"].items():
parent_role = role_data.pop('parent_role')
parent_role = role_data.pop("parent_role")

if parent_role == "targets":
if "targets" not in transformed_roles:
@@ -729,8 +728,12 @@ def _transform_roles_dict(data, auth_repo):
if "targets" not in transformed_roles:
transformed_roles["targets"] = {"delegations": {}}
if parent_role not in transformed_roles["targets"]["delegations"]:
transformed_roles["targets"]["delegations"][parent_role] = {"delegations": {}}
transformed_roles["targets"]["delegations"][parent_role]["delegations"][role_name] = role_data
transformed_roles["targets"]["delegations"][parent_role] = {
"delegations": {}
}
transformed_roles["targets"]["delegations"][parent_role]["delegations"][
role_name
] = role_data

transformed_data["roles"] = transformed_roles

@@ -749,7 +752,9 @@ def _initialize_roles_and_keystore_for_existing_repo(
if not roles_key_infos_dict and enter_info:
roles_key_infos_dict = _enter_roles_infos(None, roles_key_infos)
elif roles_key_infos_dict:
import pdb; pdb.set_trace()
import pdb

pdb.set_trace()
roles_key_infos_dict = _transform_roles_dict(roles_key_infos_dict, auth_repo)
roles_keys_data = from_dict(roles_key_infos_dict, RolesKeysData)
keystore = keystore or roles_keys_data.keystore
173 changes: 94 additions & 79 deletions taf/keys.py
Original file line number Diff line number Diff line change
@@ -43,6 +43,19 @@
yk = YubikeyMissingLibrary() # type: ignore


def _create_signer(auth_repo, public_key, serial_num, key_name):
return YkSigner(
public_key,
serial_num,
partial(
yk.yk_secrets_handler,
pin_manager=auth_repo.pin_manager,
serial_num=serial_num,
),
key_name=key_name,
)


def get_key_name(role_name: str, key_num: int, num_of_keys: int) -> str:
"""
Return a keystore key's name based on the role's name and total number of signing keys,
@@ -139,12 +152,11 @@ def _sort_roles(roles):
keystore_roles, yubikey_roles = _sort_roles(roles)
signers: Dict = {}
verification_keys: Dict = {}
keys_name_mappings: Dict = {}

for role in keystore_roles:
if role.name in existing_roles:
continue
keystore_signers, _, _, key_name_mapping = setup_roles_keys(
keystore_signers, _, _ = setup_roles_keys(
role,
auth_repo,
keystore=keystore,
@@ -153,12 +165,10 @@ def _sort_roles(roles):
for signer in keystore_signers:
signers.setdefault(role.name, []).append(signer)

keys_name_mappings.update(key_name_mapping)

for role in yubikey_roles:
if role.name in existing_roles:
continue
_, yubikey_keys, yubikey_signers, key_name_mapping = setup_roles_keys(
_, yubikey_keys, yubikey_signers = setup_roles_keys(
role,
auth_repo,
certs_dir=certs_dir,
@@ -167,8 +177,8 @@ def _sort_roles(roles):
)
verification_keys[role.name] = yubikey_keys
signers[role.name] = yubikey_signers
keys_name_mappings.update(key_name_mapping)
return signers, verification_keys, keys_name_mappings

return signers, verification_keys
except KeystoreError:
raise SigningError("Could not load keys of new roles")

@@ -369,13 +379,10 @@ def setup_roles_keys(
if users_yubikeys_details is None:
users_yubikeys_details = {}

keys_name_mappings: Dict = {}

if role.is_yubikey:
yubikey_keys, yubikey_signers, keys_name_mapping = _setup_yubikey_roles_keys(
yubikey_keys, yubikey_signers = _setup_yubikey_roles_keys(
auth_repo, yubikey_ids, users_yubikeys_details, role, certs_dir, key_size
)
keys_name_mappings.update(keys_name_mapping)
else:
if keystore is None:
taf_logger.error("No keystore provided and no default keystore found")
@@ -393,9 +400,9 @@ def setup_roles_keys(
skip_prompt=skip_prompt,
)
keystore_signers.append(signer)
keys_name_mappings[key_id] = key_name
auth_repo.add_key_name(key_name, key_id)

return keystore_signers, yubikey_keys, yubikey_signers, keys_name_mappings
return keystore_signers, yubikey_keys, yubikey_signers


def _setup_yubikey_roles_keys(
@@ -405,19 +412,6 @@ def _setup_yubikey_roles_keys(
yk_with_public_key = {}
yubikey_keys = []
signers = []
keyid_name_mapping = {}

def _create_signer(public_key, serial_num, key_name):
return YkSigner(
public_key,
serial_num,
partial(
yk.yk_secrets_handler,
pin_manager=auth_repo.pin_manager,
serial_num=serial_num,
),
key_name=key_name,
)

# if a key was already loaded (while setting up a different role)
# register signers and remove the key id, so that the user is not asked to enter it again
@@ -428,11 +422,13 @@ def _create_signer(public_key, serial_num, key_name):
key_data = auth_repo.yubikey_store.get_key_data(key_name)
if key_data is not None:
public_key, serial_num = key_data
auth_repo.yubikey_store.add_key_data(key_name, serial_num, public_key, role.name)
auth_repo.yubikey_store.add_key_data(
key_name, serial_num, public_key, role.name
)
yubikey_ids.remove(key_name)
yubikey_keys.append(public_key)
loaded_keys_num += 1
signer = _create_signer(public_key, serial_num, key_name)
signer = _create_signer(auth_repo, public_key, serial_num, key_name)
signers.append(signer)

# if key already loaded while setting up a different role, skip it
@@ -442,7 +438,6 @@ def _create_signer(public_key, serial_num, key_name):
for key_name, key_data in auth_repo.yubikey_store.yubikeys_data.items():
if key_name not in yubikey_ids:
yubikes_to_skip.append(key_data["serial"])

else:
yubikey_ids = [f"{role.name}{counter}" for counter in range(1, role.number + 1)]

@@ -453,8 +448,15 @@ def _create_signer(public_key, serial_num, key_name):
if public_key_text:
scheme = users_yubikeys_details[key_name].scheme
public_key = get_sslib_key_from_value(public_key_text, scheme)
yk_with_public_key[key_name] = public_key
loaded_keys_num += 1
# Check if the signing key is already loaded
if not auth_repo.yubikey_store.is_key_name_loaded(key_name):
yk_with_public_key[key_name] = public_key
else:
serial_num = auth_repo.yubikey_store.get_key_data["serial"]
auth_repo.yubikey_store.add_key_data(
key_name, serial_num, public_key, role.name
)
loaded_keys_num += 1
yubikey_keys.append(public_key)
else:
key_scheme = None
@@ -471,53 +473,24 @@ def _create_signer(public_key, serial_num, key_name):
yubikes_to_skip,
)
loaded_keys_num += 1
signer = _create_signer(public_key, serial_num, key_name)
signer = _create_signer(auth_repo, public_key, serial_num, key_name)
signers.append(signer)

key_id = _get_legacy_keyid(public_key)
if names_defined or key_id not in keyid_name_mapping:
keyid_name_mapping[key_id] = key_name
auth_repo.add_key_name(key_name, key_id, overwrite=names_defined)

if loaded_keys_num < role.number: # role.threshold:
if loaded_keys_num < role.threshold:
print(f"Threshold of role {role.name} is {role.threshold}")
while loaded_keys_num < role.threshold:
loaded_keys = []
for key_name, public_key in yk_with_public_key.items():
if (
key_name in users_yubikeys_details
and not users_yubikeys_details[key_name].present
):
continue
serial_num = _load_and_verify_yubikey(
role.name,
key_name,
public_key,
taf_repo=auth_repo,
)
if serial_num:
loaded_keys_num += 1
loaded_keys.append(key_name)
signer = YkSigner(
public_key,
partial(
yk.yk_secrets_handler,
pin_manager=auth_repo.pin_manager,
serial_num=serial_num,
),
key_name=key_name,
)
signers.append(signer)
if loaded_keys_num == role.threshold:
break
if loaded_keys_num < role.threshold:
if not click.confirm(
f"Threshold of signing keys of role {role.name} not reached. Continue?"
):
raise SigningError("Not enough signing keys")
for key_name in loaded_keys:
yk_with_public_key.pop(key_name)
_load_remaining_keys_of_role(
auth_repo,
role,
loaded_keys_num,
users_yubikeys_details,
yk_with_public_key,
signers,
)

return yubikey_keys, signers, keyid_name_mapping
return yubikey_keys, signers


def _setup_keystore_key(
@@ -640,6 +613,44 @@ def _setup_yubikey(
return key, serial_num


def _load_remaining_keys_of_role(
auth_repo: AuthenticationRepository,
role: Role,
loaded_keys_num: int,
users_yubikeys_details: UserKeyData,
yk_with_public_key: Dict,
signers: List,
):
"""
If a a yubikey's public key was specified, meaning that it can be added as a
verification key without being inserted, but the total number of signing
keys is smaller than the threshold
"""
while loaded_keys_num < role.threshold:
loaded_keys = []
for key_name, public_key in yk_with_public_key.items():
serial_num = _load_and_verify_yubikey(
role.name,
key_name,
public_key,
taf_repo=auth_repo,
)
if serial_num:
loaded_keys_num += 1
loaded_keys.append(key_name)
signer = _create_signer(auth_repo, public_key, serial_num, key_name)
signers.append(signer)
if loaded_keys_num == role.threshold:
break
if loaded_keys_num < role.threshold:
if not click.confirm(
f"Threshold of signing keys of role {role.name} not reached. Continue?"
):
raise SigningError("Not enough signing keys")
for key_name in loaded_keys:
yk_with_public_key.pop(key_name)


def _load_and_verify_yubikey(
role_name: str,
key_name: str,
@@ -649,7 +660,7 @@ def _load_and_verify_yubikey(
if not click.confirm(f"Sign using {key_name} Yubikey?"):
return None
while True:
yk_public_key, _ = yk.yubikey_prompt(
yubikeys = yk.yubikey_prompt(
[key_name],
role=role_name,
pin_manager=taf_repo.pin_manager,
@@ -659,9 +670,13 @@ def _load_and_verify_yubikey(
pin_confirm=True,
pin_repeat=True,
)

if yk_public_key["keyid"] != public_key["keyid"]:
print("Public key of the inserted key is not equal to the specified one.")
if not click.confirm("Try again?"):
return None
return yk.get_serial_num()
if yubikeys:
yubikey = yubikeys[0]
yk_pub_key_id = yubikey[0].keyid
if yk_pub_key_id != public_key.keyid:
print(
"Public key of the inserted key is not equal to the specified one."
)
if not click.confirm("Try again?"):
return None
return yubikey[1]
1 change: 1 addition & 0 deletions taf/models/types.py
Original file line number Diff line number Diff line change
@@ -61,6 +61,7 @@ def yubikey_ids(self):
return None
return self.yubikeys


@attrs.define
class RootRole(Role):
name: str = "root"
Loading