Skip to content

Commit

Permalink
feat, refact: check if yk inserted before signing, move check if load…
Browse files Browse the repository at this point in the history
…ed to yubikey store
  • Loading branch information
renatav committed Jan 18, 2025
1 parent d8cd3a3 commit 0c5363a
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 29 deletions.
5 changes: 2 additions & 3 deletions taf/api/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def create_repository(

roles_keys_data = from_dict(roles_key_infos_dict, RolesKeysData)
auth_repo = AuthenticationRepository(path=path, pin_manager=pin_manager)
signers, verification_keys, key_name_mappings = load_sorted_keys_of_new_roles(
signers, verification_keys, keys_name_mappings = load_sorted_keys_of_new_roles(
roles=roles_keys_data.roles,
auth_repo=auth_repo,
yubikeys_data=roles_keys_data.yubikeys,
Expand All @@ -87,8 +87,7 @@ def create_repository(
if signers is None:
return

repository = AuthenticationRepository(path=path)
repository.create(roles_keys_data, signers, verification_keys, key_name_mappings)
auth_repo.create(roles_keys_data, signers, verification_keys, keys_name_mappings)
if commit:
auth_repo.init_repo()
commit_msg = git_commit_message("create-repo")
Expand Down
6 changes: 4 additions & 2 deletions taf/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ def _sort_roles(roles):
for signer in keystore_signers:
signers.setdefault(role.name, []).append(signer)

# TODO add to repository...
keys_name_mappings.update(key_name_mapping)

for role in yubikey_roles:
Expand Down Expand Up @@ -220,6 +219,7 @@ def _load_and_append_yubikeys(
public_key,
serial_num,
partial(yk.yk_secrets_handler, pin_manager=taf_repo.pin_manager, serial_num=serial_num),
key_name=key_name,
)
signers_yubikeys.append(signer)
loaded_keyids.append(public_key.keyid)
Expand Down Expand Up @@ -421,6 +421,7 @@ def _setup_yubikey_roles_keys(
public_key,
serial_num,
partial(yk.yk_secrets_handler, pin_manager=auth_repo.pin_manager, serial_num=serial_num),
key_name=key_name,
)
signers.append(signer)
keyid_name_mapping[_get_legacy_keyid(public_key)] = key_name
Expand All @@ -443,7 +444,8 @@ def _setup_yubikey_roles_keys(
loaded_keys.append(key_name)
signer = YkSigner(
public_key,
partial(yk.yk_secrets_handler, pin_manager=auth_repo.pin_manager, serial_num=serial_num)
partial(yk.yk_secrets_handler, pin_manager=auth_repo.pin_manager, serial_num=serial_num),
key_name=key_name
)
signers.append(signer)
if loaded_keys_num == role.threshold:
Expand Down
11 changes: 8 additions & 3 deletions taf/tuf/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,13 @@ class YkSigner(Signer):
_SECRET_PROMPT = "pin"

def __init__(
self, public_key: SSlibKey, serial_num: str, pin_handler: SecretsHandler
self, public_key: SSlibKey, serial_num: str, pin_handler: SecretsHandler, key_name: str
):

self._public_key = public_key
self._pin_handler = pin_handler
self._serial_num = serial_num
self._key_name = key_name

@property
def public_key(self) -> SSlibKey:
Expand All @@ -201,6 +202,10 @@ def public_key(self) -> SSlibKey:
def serial_num(self) -> str:
return self._serial_num

@property
def key_name(self) -> str:
return self._key_name

@classmethod
def import_(cls) -> SSlibKey:
"""Import rsa public key from Yubikey.
Expand All @@ -221,8 +226,8 @@ def import_(cls) -> SSlibKey:

def sign(self, payload: bytes) -> Signature:
pin = self._pin_handler(self._SECRET_PROMPT)
from taf.yubikey.yubikey import sign_piv_rsa_pkcs1v15

from taf.yubikey.yubikey import sign_piv_rsa_pkcs1v15, verify_yk_inserted
verify_yk_inserted(self.serial_num, self.key_name)
sig = sign_piv_rsa_pkcs1v15(payload, pin, serial=self.serial_num)
return Signature(self.public_key.keyid, sig.hex())

Expand Down
11 changes: 6 additions & 5 deletions taf/tuf/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ def __init__(self, path: Union[Path, str], *args, **kwargs) -> None:
self._metadata_to_keep_open: Set[str] = set()
self.pin_manager = pin_manager
self.yubikey_store = YubiKeyStore()
self.keys_name_mappings: Dict[str, str] = {}

@property
def metadata_path(self) -> Path:
Expand Down Expand Up @@ -504,7 +505,7 @@ def create(
roles_keys_data: RolesKeysData,
signers: dict,
additional_verification_keys: Optional[dict] = None,
key_name_mappings: Optional[Dict[str, str]] = None,
keys_name_mappings: Optional[Dict[str, str]] = None,
) -> None:
"""Create a new metadata repository on disk.
Expand All @@ -523,8 +524,8 @@ def create(
present at the time of the repository's creation
key_name_mappings: A dictionary whose keys are key ids and values are custom names of those keys
"""
# TODO add verification keys
# support yubikeys
if keys_name_mappings:
self.keys_name_mappings = keys_name_mappings
self.metadata_path.mkdir(parents=True)
self.signer_cache = defaultdict(dict)

Expand Down Expand Up @@ -557,8 +558,8 @@ def create(
self.signer_cache[role.name][key_id] = signer
for public_key in public_keys[role.name].values():
key_id = _get_legacy_keyid(public_key)
if key_id in key_name_mappings:
public_key.unrecognized_fields["name"] = key_name_mappings[key_id]
if key_id in self.keys_name_mappings:
public_key.unrecognized_fields["name"] = self.keys_name_mappings[key_id]
root.add_key(public_key, role.name)
root.roles[role.name].threshold = role.threshold

Expand Down
42 changes: 28 additions & 14 deletions taf/yubikey/yubikey.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,21 @@ def export_yk_certificate(certs_dir, key: SSlibKey, serial: str):
f.write(export_piv_x509(serial=serial))


def get_and_validate_pin(key_name, pin_confirm=True, pin_repeat=True, serial=None):
valid_pin = False
while not valid_pin:
pin = get_pin_for(key_name, pin_confirm, pin_repeat)
valid_pin, retries = is_valid_pin(pin, serial)
if not valid_pin and not retries:
raise InvalidPINError("No retries left. YubiKey locked.")
if not valid_pin:
if not click.confirm(
f"Incorrect PIN. Do you want to try again? {retries} retires left."
):
raise InvalidPINError("PIN input cancelled")
return pin


@raise_yubikey_err("Cannot get public key in TUF format.")
def get_piv_public_key_tuf(
scheme=DEFAULT_RSA_SIGNATURE_SCHEME, serial=None
Expand Down Expand Up @@ -290,7 +305,7 @@ def _read_and_check_yubikeys(
yubikeys = []
invalid_keys = []
for serial_num in serials:
if not taf_repo.pin_manager.is_loaded(serial_num):
if not taf_repo.yubikey_store.is_loaded(serial_num):
# read the public key, unless a new key needs to be generated on the yubikey
public_key = (
get_piv_public_key_tuf(serial=serial_num)
Expand Down Expand Up @@ -456,20 +471,19 @@ def setup_new_yubikey(
return key


def get_and_validate_pin(key_name, pin_confirm=True, pin_repeat=True, serial=None):
valid_pin = False
while not valid_pin:
pin = get_pin_for(key_name, pin_confirm, pin_repeat)
valid_pin, retries = is_valid_pin(pin, serial)
if not valid_pin and not retries:
raise InvalidPINError("No retries left. YubiKey locked.")
if not valid_pin:
if not click.confirm(
f"Incorrect PIN. Do you want to try again? {retries} retires left."
):
raise InvalidPINError("PIN input cancelled")
return pin
def verify_yk_inserted(serial_num, key_name):

def _check_if_yk_inserted():
try:
serials = get_serial_num()
except Exception:
return False

return serial_num in serials

while not _check_if_yk_inserted():
prompt_message = f"Please insert {key_name} YubiKey and press ENTER"
getpass(prompt_message)

def yubikey_prompt(
key_name,
Expand Down
5 changes: 3 additions & 2 deletions taf/yubikey/yubikey_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ def __init__(self):
# Initializes the dictionary to store YubiKey data
self._yubikeys_data = defaultdict(dict)

def is_loaded(self, serial_number):
return any(data["serial"] == serial_number for data in self._yubikeys_data.values())

def is_key_name_loaded(self, key_name: str) -> bool:
"""Check if the key name is already loaded."""
return key_name in self._yubikeys_data
Expand Down Expand Up @@ -50,8 +53,6 @@ def clear_pins(self):
def get_pin(self, serial_number):
return self._pins.get(serial_number)

def is_loaded(self, serial_number):
return self.get_pin(serial_number) is not None


@contextlib.contextmanager
Expand Down

0 comments on commit 0c5363a

Please sign in to comment.