Test that handlers tolerate a non-empty unknown chunk prefix/suffix #372

Open · wants to merge 4 commits into main
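The gist of the change, as a minimal sketch (the two constants are copied from the new `tests/test_handlers.py` below; the sample bytes are a placeholder for any known-good integration-test input):

```python
# Constants copied from the new test below.
BLOCK_SHIFTING_PREFIX = bytes([0]) + b"unique unknown prefix" + bytes([0])
PADDING_CHECK_SUFFIX = bytes([0] * 511) + b"unique unknown suffix" + bytes([0])

# Placeholder for any known-good integration-test input.
original = b"<any known-good sample bytes>"

# The wrapped input shifts every known chunk to a non-aligned offset (the
# prefix is 23 bytes) and follows it with 511 zero bytes that look like
# padding but belong to an unknown chunk. Extracting `altered` should give
# the same results as extracting `original`, modulo the two extra unknown chunks.
altered = BLOCK_SHIFTING_PREFIX + original + PADDING_CHECK_SUFFIX
```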
1 change: 1 addition & 0 deletions .github/workflows/main.yml
@@ -66,4 +66,5 @@ jobs:
uses: ./.github/actions/setup-git-lfs

- name: Run pytest
timeout-minutes: 4
run: poetry run pytest -vvv
5 changes: 4 additions & 1 deletion setup.cfg
@@ -1,6 +1,9 @@
[flake8]
# We start with a strict setting; this is ranked as A in radon
max-complexity = 8
ignore = E501,W503
ignore =
E501 # line too long
W503 # line break before binary operator
E203 # whitespace before ':' - known black/flake8 incompatibility
exclude =
__init__.py
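For context, the E203 exemption is exercised by this very PR: black formats slices whose bounds are non-trivial expressions with a space before the colon (see `content[len(BLOCK_SHIFTING_PREFIX) :]` in the new test), which flake8 would otherwise report. A minimal illustration:

```python
PREFIX = b"\x00unique unknown prefix\x00"
data = PREFIX + b"payload"

# black keeps a space before ":" here because len(PREFIX) is a non-trivial
# slice bound; flake8 reports that space as E203 unless it is ignored.
payload = data[len(PREFIX) :]
assert payload == b"payload"
```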
108 changes: 108 additions & 0 deletions tests/test_handlers.py
@@ -7,10 +7,13 @@
- The expected output in the __output__ folder.
"""

import hashlib
import inspect
from collections import Counter
from pathlib import Path
from typing import Type

import attr
import pytest

from unblob import handlers
@@ -39,6 +42,111 @@ def test_all_handlers(
check_output_is_the_same(output_dir, extraction_config.extract_root)


BLOCK_SHIFTING_PREFIX = bytes([0]) + b"unique unknown prefix" + bytes([0])
PADDING_CHECK_SUFFIX = bytes([0] * 511) + b"unique unknown suffix" + bytes([0])
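# The prefix is 23 bytes, so every known chunk is shifted to an odd,
# non-block-aligned start offset; the suffix opens with 511 zero bytes, so a
# handler that greedily consumes trailing padding would swallow bytes that
# belong to the following unknown chunk.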


@pytest.mark.parametrize(
"input_dir, output_dir", gather_integration_tests(TEST_DATA_PATH)
)
@pytest.mark.parametrize(
"prefix, suffix",
[
# pytest.param(b"", b"", id="no-extras"),
pytest.param(BLOCK_SHIFTING_PREFIX, b"", id="block-shifted"),
pytest.param(b"", PADDING_CHECK_SUFFIX, id="padding-check"),
pytest.param(
BLOCK_SHIFTING_PREFIX,
PADDING_CHECK_SUFFIX,
id="block-shifted-padding-check",
),
],
)
def test_all_handlers_chunk_stability(
input_dir: Path,
output_dir: Path,
extraction_config: ExtractionConfig,
tmp_path: Path,
prefix: bytes,
suffix: bytes,
):
"""Test that handlers tolerate a non-empty unknown chunk prefix/suffix"""
altered_input_file = tmp_path / "input_file"

for input_file in input_dir.iterdir():
altered_input_file.write_bytes(prefix + input_file.read_bytes() + suffix)

config = attr.evolve(
extraction_config,
extract_root=extraction_config.extract_root / input_file.name,
)
reports = process_file(config, altered_input_file)
check_result(reports)

check_output_do_not_change_much_due_to_extras(
input_file,
expected=output_dir / (input_file.name + config.extract_suffix),
actual=config.extract_root,
)


def hash_bytes(data: bytes) -> str:
return hashlib.sha512(data).hexdigest()


def hash_dir(root: Path, print=lambda *_: None) -> Counter:
"""Hash all files under an unblob extraction directory :root:, ignoring test prefix/suffix.

Directory structures having the same set of files will result in the same output,
even if the file names are different or the directory structure is different.

Test prefix/suffix is excluded from hash calculation, so that unknown chunks extended
with them will produce the same hash.

Returns: count of each hash found
"""
hash_counter = Counter()
for path in sorted(root.rglob("*")):
if not path.is_file() or path.name == ".gitkeep":
continue

content = path.read_bytes()
# ignore newly introduced unknown chunk files
if content in (BLOCK_SHIFTING_PREFIX, PADDING_CHECK_SUFFIX):
continue

# strip the test-introduced extras before hashing
if content.startswith(BLOCK_SHIFTING_PREFIX):
content = content[len(BLOCK_SHIFTING_PREFIX) :]
if content.endswith(PADDING_CHECK_SUFFIX):
content = content[: -len(PADDING_CHECK_SUFFIX)]

hash = hash_bytes(content)
hash_counter[hash] += 1
# Output for debugging failures
print(f" {path} =\n {hash}")
return hash_counter
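# For example, hash_dir() over a tree {"a/fw.bin": b"data"} and over
# {"b/renamed.bin": b"data"} returns equal Counters: only the
# (extras-stripped) file contents are counted, never names or layout.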


def check_output_do_not_change_much_due_to_extras(
original_input_file: Path, expected: Path, actual: Path
):
print(f"{expected=}")
expected_counts = hash_dir(expected, print=print)
print(f"{actual=}")
actual_counts = hash_dir(actual, print=print)

# The original input will show up in the extraction of the modified input due to the extra
# unknown chunks, but it might not show up in the expected output if it was a "whole file chunk".
hash_original_input = hash_bytes(original_input_file.read_bytes())
if hash_original_input not in expected_counts:
print(f"Warn: hash of original input: {hash_original_input} not in expected")
assert actual_counts[hash_original_input] <= 1
del actual_counts[hash_original_input]

assert expected_counts == actual_counts


@pytest.mark.parametrize(
"handler",
(pytest.param(handler, id=handler.NAME) for handler in handlers.BUILTIN_HANDLERS),
27 changes: 15 additions & 12 deletions unblob/handlers/filesystem/romfs.py
@@ -10,7 +10,7 @@

from unblob.extractor import is_safe_path

from ...file_utils import Endian, InvalidInputFormat, read_until_past, round_up
from ...file_utils import Endian, InvalidInputFormat, round_up
from ...models import Extractor, File, HexString, StructHandler, ValidChunk

logger = get_logger()
Expand All @@ -25,6 +25,7 @@
WORLD_RWX = 0o777
ROMFS_HEADER_SIZE = 512
ROMFS_SIGNATURE = b"-rom1fs-"
PADDING_ALIGNMENT = 1024


@unique
@@ -361,29 +362,31 @@ class RomFSFSHandler(StructHandler):

def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:

if not valid_checksum(file.read(512)):
checksum = file.read(512)
if not valid_checksum(checksum):
raise InvalidInputFormat("Invalid RomFS checksum.")

file.seek(-512, io.SEEK_CUR)
file.seek(-len(checksum), io.SEEK_CUR)

# Every multi byte value must be in big endian order.
header = self.parse_header(file, Endian.BIG)

# The zero terminated name of the volume, padded to 16 byte boundary.
get_string(file)

# seek filesystem size (number of accessible bytes in this fs)
# from the actual end of the header
file.seek(header.full_size, io.SEEK_CUR)

# Another thing to note is that romfs works on file headers and data
# aligned to 16 byte boundaries, but most hardware devices and the block
# device drivers are unable to cope with smaller than block-sized data.
# To overcome this limitation, the whole size of the file system must be
# padded to an 1024 byte boundary.
read_until_past(file, b"\x00")
end_offset = start_offset + round_up(
len(header) + header.full_size, PADDING_ALIGNMENT
)
fs_size = len(header) + header.full_size
padding_size = round_up(fs_size, PADDING_ALIGNMENT) - fs_size
if padding_size:
file.seek(start_offset + fs_size, io.SEEK_SET)
if file.read(padding_size) != bytes([0]) * padding_size:
raise InvalidInputFormat("Invalid padding.")
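# Unlike the read_until_past() scan it replaces, this computes the chunk end
# from the header alone and merely validates the padding, so zero bytes that
# belong to a following unknown chunk are no longer swallowed into this chunk.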

return ValidChunk(
start_offset=start_offset,
end_offset=file.tell(),
end_offset=end_offset,
)
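For reference, a worked example of the padding math above, with `round_up` sketched under its assumed semantics (the real helper lives in `unblob.file_utils`):

```python
def round_up(size: int, alignment: int) -> int:
    # assumed semantics of unblob.file_utils.round_up
    return ((size + alignment - 1) // alignment) * alignment

# e.g. a 1500-byte filesystem image (header + full_size) is padded to the
# next 1024-byte boundary, so 548 trailing NUL bytes are expected:
fs_size = 1500
padding_size = round_up(fs_size, 1024) - fs_size
assert (round_up(fs_size, 1024), padding_size) == (2048, 548)
```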
2 changes: 1 addition & 1 deletion unblob/testing.py
@@ -41,7 +41,7 @@ def gather_integration_tests(test_data_path: Path):
@pytest.fixture
def extraction_config(tmp_path: Path):
config = ExtractionConfig(
extract_root=tmp_path,
extract_root=tmp_path / "extract_root",
entropy_depth=0,
keep_extracted_chunks=True,
)
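# extract_root gets a dedicated sub-directory so tests can place scratch files
# (such as the altered input file in the new stability test) directly under
# tmp_path without them ending up inside the extraction output.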