-
-
Notifications
You must be signed in to change notification settings - Fork 10
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #512 from lukpueh/tuf-upgrade
Add alternative TUF metadata repo implementation (WIP)
- Loading branch information
Showing
10 changed files
with
596 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
from pathlib import Path | ||
|
||
|
||
# TODO: de-duplicate with conftest.py constants | ||
TEST_DATA_PATH = Path(__file__).parent.parent / "data" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
import pytest | ||
|
||
from taf.tests.tuf import TEST_DATA_PATH | ||
from taf.tuf.keys import load_public_key_from_file, load_signer_from_file | ||
from tuf.api.metadata import Metadata, Root | ||
from securesystemslib.exceptions import UnverifiedSignatureError | ||
|
||
|
||
class TestKeys: | ||
def test_keys(self): | ||
"""Smoke test for key functions. | ||
Test loading public and private keys, and compatiblity with existing | ||
metadata: | ||
- newly loaded keys can verify old signatures on metadata | ||
- old keys in metadata can verify signatures from newly loaded signers | ||
""" | ||
root_path = ( | ||
TEST_DATA_PATH | ||
/ "repos" | ||
/ "test-repository-tool" | ||
/ "test-happy-path-pkcs1v15" | ||
/ "taf" | ||
/ "metadata" | ||
/ "root.json" | ||
) | ||
|
||
root = Metadata[Root].from_file(root_path) | ||
store_path = TEST_DATA_PATH / "keystores" / "keystore" | ||
for name in ["root1", "root2", "root3", "snapshot", "targets", "timestamp"]: | ||
public_key = load_public_key_from_file(store_path / f"{name}.pub") | ||
|
||
# assert hard-coded scheme and correct legacy keyid | ||
assert public_key.scheme == "rsa-pkcs1v15-sha256" | ||
assert public_key.keyid in root.signed.keys | ||
|
||
signer = load_signer_from_file(store_path / name, None) | ||
|
||
# assert public key loaded from disk matches public key derived | ||
# from private key loaded from disk | ||
assert public_key == signer.public_key | ||
|
||
# assert existing keys verify new signatures | ||
sig = signer.sign(b"DATA") | ||
existing_key = root.signed.keys[public_key.keyid] | ||
existing_key.verify_signature(sig, b"DATA") | ||
with pytest.raises(UnverifiedSignatureError): | ||
existing_key.verify_signature(sig, b"NOT DATA") | ||
|
||
# assert newly loaded keys verify existing signatures | ||
if name.startswith("root"): # there are only root sigs on root metadata | ||
existing_sig = root.signatures[public_key.keyid] | ||
public_key.verify_signature(existing_sig, root.signed_bytes) | ||
with pytest.raises(UnverifiedSignatureError): | ||
existing_key.verify_signature(sig, b"NOT DATA") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,167 @@ | ||
import pytest | ||
from securesystemslib.exceptions import StorageError | ||
from taf.tuf.repository import MetadataRepository | ||
from taf.tuf.keys import load_signer_from_file | ||
|
||
from tuf.api.metadata import TargetFile | ||
|
||
from taf.tests.tuf import TEST_DATA_PATH | ||
|
||
|
||
@pytest.fixture | ||
def test_signer(): | ||
"""Create signer from some rsa test key.""" | ||
key_path = TEST_DATA_PATH / "keystores" / "keystore" / "root1" | ||
return load_signer_from_file(key_path, None) | ||
|
||
|
||
@pytest.fixture | ||
def test_signer2(): | ||
"""Create signer from some other rsa test key.""" | ||
key_path = TEST_DATA_PATH / "keystores" / "keystore" / "root2" | ||
return load_signer_from_file(key_path, None) | ||
|
||
|
||
@pytest.fixture | ||
def test_signers(test_signer): | ||
"""Dict of signers per role""" | ||
signers = {} | ||
for role in ["root", "timestamp", "snapshot", "targets"]: | ||
signers[role] = {test_signer.public_key.keyid: test_signer} | ||
return signers | ||
|
||
|
||
class TestMetadataRepository: | ||
def test_open(self): | ||
repo = MetadataRepository( | ||
TEST_DATA_PATH | ||
/ "repos" | ||
/ "test-repository-tool" | ||
/ "test-delegated-roles-pkcs1v15" | ||
/ "taf" | ||
) | ||
|
||
# assert existing role metadata can be opened | ||
for role in [ | ||
"root", | ||
"timestamp", | ||
"snapshot", | ||
"targets", | ||
"delegated_role1", | ||
"delegated_role2", | ||
"inner_delegated_role", | ||
]: | ||
assert repo.open(role) | ||
|
||
# assert non-existing role metadata cannot be opened | ||
with pytest.raises(StorageError): | ||
repo.open("foo") | ||
|
||
def test_create(self, tmp_path, test_signer, test_signers): | ||
# Create new metadata repository | ||
repo = MetadataRepository(tmp_path) | ||
repo.create(test_signers) | ||
|
||
# assert metadata files were created | ||
assert sorted([f.name for f in repo.metadata_path.glob("*")]) == [ | ||
"1.root.json", | ||
"root.json", | ||
"snapshot.json", | ||
"targets.json", | ||
"timestamp.json", | ||
] | ||
|
||
# assert correct initial version | ||
assert repo.root().version == 1 | ||
assert repo.timestamp().version == 1 | ||
assert repo.snapshot().version == 1 | ||
assert repo.targets().version == 1 | ||
|
||
# assert correct top-level delegation | ||
keyid = test_signer.public_key.keyid | ||
assert list(repo.root().keys.keys()) == [keyid] | ||
assert repo.root().roles["root"].keyids == [keyid] | ||
assert repo.root().roles["timestamp"].keyids == [keyid] | ||
assert repo.root().roles["snapshot"].keyids == [keyid] | ||
assert repo.root().roles["targets"].keyids == [keyid] | ||
|
||
# assert correct snapshot and timestamp meta | ||
assert repo.timestamp().snapshot_meta.version == 1 | ||
assert repo.snapshot().meta["root.json"].version == 1 | ||
assert repo.snapshot().meta["targets.json"].version == 1 | ||
assert len(repo.snapshot().meta) == 2 | ||
|
||
# assert repo cannot be created twice | ||
with pytest.raises(FileExistsError): | ||
repo.create(test_signers) | ||
|
||
def test_add_target_files(self, tmp_path, test_signers): | ||
"""Edit metadata repository. | ||
If we edit manually, we need to make sure to create a valid snapshot. | ||
""" | ||
# Create new metadata repository | ||
repo = MetadataRepository(tmp_path) | ||
repo.create(test_signers) | ||
|
||
target_file = TargetFile.from_data("foo.txt", b"foo", ["sha256", "sha512"]) | ||
|
||
# assert add target file and correct version bumps | ||
repo.add_target_files([target_file]) | ||
assert repo.targets().targets[target_file.path] == target_file | ||
assert repo.root().version == 1 | ||
assert repo.timestamp().version == 2 | ||
assert repo.snapshot().version == 2 | ||
assert repo.targets().version == 2 | ||
assert repo.timestamp().snapshot_meta.version == 2 | ||
assert repo.snapshot().meta["root.json"].version == 1 | ||
assert repo.snapshot().meta["targets.json"].version == 2 | ||
|
||
def test_add_keys(self, tmp_path, test_signers, test_signer2): | ||
repo = MetadataRepository(tmp_path) | ||
repo.create(test_signers) | ||
|
||
# assert add new root key and version bumps (all but targets) | ||
repo.add_keys([test_signer2], "root") | ||
assert test_signer2.public_key.keyid in repo.root().keys | ||
assert test_signer2.public_key.keyid in repo.root().roles["root"].keyids | ||
assert repo.root().version == 2 | ||
assert repo.timestamp().version == 2 | ||
assert repo.snapshot().version == 2 | ||
assert repo.targets().version == 1 | ||
assert repo.timestamp().snapshot_meta.version == 2 | ||
assert repo.snapshot().meta["root.json"].version == 2 | ||
assert repo.snapshot().meta["targets.json"].version == 1 | ||
|
||
# assert add new timestamp key and version bumps (all but targets) | ||
repo.add_keys([test_signer2], "timestamp") | ||
assert test_signer2.public_key.keyid in repo.root().roles["timestamp"].keyids | ||
assert repo.root().version == 3 | ||
assert repo.timestamp().version == 3 | ||
assert repo.snapshot().version == 3 | ||
assert repo.targets().version == 1 | ||
assert repo.timestamp().snapshot_meta.version == 3 | ||
assert repo.snapshot().meta["root.json"].version == 3 | ||
assert repo.snapshot().meta["targets.json"].version == 1 | ||
|
||
# assert add new snapshot key and version bumps (all but targets) | ||
repo.add_keys([test_signer2], "snapshot") | ||
assert test_signer2.public_key.keyid in repo.root().roles["snapshot"].keyids | ||
assert repo.root().version == 4 | ||
assert repo.timestamp().version == 4 | ||
assert repo.snapshot().version == 4 | ||
assert repo.targets().version == 1 | ||
assert repo.timestamp().snapshot_meta.version == 4 | ||
assert repo.snapshot().meta["root.json"].version == 4 | ||
assert repo.snapshot().meta["targets.json"].version == 1 | ||
|
||
# assert add new targets key and version bumps (all) | ||
repo.add_keys([test_signer2], "targets") | ||
assert test_signer2.public_key.keyid in repo.root().roles["targets"].keyids | ||
assert repo.root().version == 5 | ||
assert repo.timestamp().version == 5 | ||
assert repo.snapshot().version == 5 | ||
assert repo.targets().version == 2 | ||
assert repo.timestamp().snapshot_meta.version == 5 | ||
assert repo.snapshot().meta["root.json"].version == 5 | ||
assert repo.snapshot().meta["targets.json"].version == 2 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
"""Test YkSigner""" | ||
|
||
import os | ||
import pytest | ||
|
||
from taf.tuf.keys import YkSigner | ||
|
||
from securesystemslib.exceptions import UnverifiedSignatureError | ||
|
||
|
||
# Test data to sign | ||
_DATA = b"DATA" | ||
_NOT_DATA = b"NOT DATA" | ||
|
||
# Test public key | ||
# >>> with open("taf/tests/data/keystores/keystore/root1.pub", "rb") as f: | ||
# >>> _PUB = f.read() | ||
_PUB = b"-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA5EGVh9xqVFFHnGGIofks\ncA3vHWFs1QP60QTX+ZJUPiUJdDb8wuJ6mu9d8bKojE3SEVHCLpJeV4+muMnLtZWq\nAipiuFUU9QDpOYaqQ5SD5n/9sZfiWDzjVsqZA4WMj0OCd/Bkn+umz3ljHFe0EJUE\nCxYRvmArC05UyJej7fCaQ/cD7QELrpmBaE2qLcG0Vfirz9NekaXixGiKNiIjHAj6\nYwIfES9SycVo42LEOskGFciqgfZJVtSaTIurW+KnOToStazEWY8okon91s+5ltIN\nOS68TtBLtph5PXcLhqSozE8SqMW3gZni6zXHHQtuouFLdGkgw+0V2YLX15Ka78zj\nhQIDAQAB\n-----END PUBLIC KEY-----" | ||
|
||
# Test signature | ||
# >>> signer = load_signer_from_file("taf/tests/data/keystores/keystore/root1", None) | ||
# >>> sig = signer.sign(_DATA) | ||
# >>> _SIG = bytes.fromhex(sig.signature) | ||
_SIG = b"\xc1}\xaa\xec\xf6#;\xe6\x89\xc26\x81\x1a;\xd3\xb2\x7f\xce\xe3}\x9a6w}P\xe0d\x8d\xeb\xbcb\xba8\x8c\x96tS\xf2_\xf37\xe8Z\xc4\xf4\x1a\xaa\xdd\xdd%AB#w\x93\xc9\x0f\x8d\xe4\x93)\x9f\xa4)\x0b\xbb\xce\xf4\x9e\x8b\xaa\x1c\xda\xb8\x9ex\xe2\xc8\x9c\x02\\\xb7\x89\x88g\xd3\xb2\x0be\xf4S\x0c*\x0c\xce\xfe\x8aL=\x07\xfa\xe9\xa2\xe1\xed\x1cA\xf9\xbeZR\x91\xae@\x12\xfe<n\xe9;\xa3\xcdr\xabB\x87\x02N\xe5\x8a\x0b3>\xbey`\x07 /)Z_\xd0\xca\x7f\xcey\xe6\x1ee~\x01\x0c\xcfQZ=a\xf6\xe9\xabm_\x12\x8e\xda\xb0\xd4\xaeb1W\x0e\xf0\x909\xae\x05}\x8f\xba\xf7\xa0\\Rx\xe9\x98\x0f4j86\x87\x17\xf5\xff\xc2U\x80oh\xad\xb2\xaf\xa5\x91\x9a\xafI,\xadj\xd5\x02$\xc6\xf8\xf2`y\xd2\xa6\xf3\xce[;\r\xb6y\xd4\xa5\x96y$}{!r\xc1\xfb@\x1e<\xd9\xa0\xe6\x7f\xf1\x17\xe5\x0c\x8e\xbd\xf3\xba" | ||
|
||
|
||
class TestYkSigner: | ||
"""Test YkSigner""" | ||
|
||
def test_fake_yk(self, monkeypatch): | ||
"""Test public key export and signing with fake Yubikey.""" | ||
monkeypatch.setattr("taf.tuf.keys.export_piv_pub_key", lambda **kw: _PUB) | ||
monkeypatch.setattr("taf.tuf.keys.sign_piv_rsa_pkcs1v15", lambda *a, **kw: _SIG) | ||
|
||
key = YkSigner.import_() | ||
signer = YkSigner(key, lambda sec: None) | ||
|
||
sig = signer.sign(_DATA) | ||
key.verify_signature(sig, _DATA) | ||
with pytest.raises(UnverifiedSignatureError): | ||
key.verify_signature(sig, _NOT_DATA) | ||
|
||
@pytest.mark.skipif( | ||
not os.environ.get("REAL_YK"), | ||
reason="Run test with REAL_YK=1 (test will prompt for pin)", | ||
) | ||
def test_real_yk(self): | ||
"""Test public key export and signing with real Yubikey.""" | ||
from getpass import getpass | ||
|
||
def sec_handler(secret_name: str) -> str: | ||
return getpass(f"Enter {secret_name}: ") | ||
|
||
key = YkSigner.import_() | ||
signer = YkSigner(key, sec_handler) | ||
|
||
sig = signer.sign(_DATA) | ||
key.verify_signature(sig, _DATA) | ||
with pytest.raises(UnverifiedSignatureError): | ||
key.verify_signature(sig, _NOT_DATA) |
Oops, something went wrong.