diff --git a/tests/test_report.py b/tests/test_report.py index d27cd5d1a8..30816dc523 100644 --- a/tests/test_report.py +++ b/tests/test_report.py @@ -86,6 +86,7 @@ def test_simple_conversion(self): end_offset=384, size=384, is_encrypted=False, + metadata={}, extraction_reports=[], ) ) @@ -135,6 +136,7 @@ def test_simple_conversion(self): "handler_name": "zip", "chunk_id": "test_basic_conversion:id", "is_encrypted": False, + "metadata": {}, "size": 384, "start_offset": 0, }, @@ -180,63 +182,58 @@ def test_exotic_command_output(self): json_text = ProcessResult(results=[task_result]).to_json() decoded_report = json.loads(json_text) - assert decoded_report == [ { - "__typename__": "TaskResult", + "task": { + "path": "/nonexistent", + "depth": 0, + "chunk_id": "", + "__typename__": "Task", + }, "reports": [ { - "__typename__": "ChunkReport", + "chunk_id": "test", + "handler_name": "fail", + "start_offset": 0, "end_offset": 256, + "size": 256, + "is_encrypted": False, + "metadata": {}, "extraction_reports": [ { - "__typename__": "ExtractCommandFailedReport", - "command": "dump all bytes", - "exit_code": 1, "severity": "WARNING", + "command": "dump all bytes", + "stdout": "\x00\x01\x02\x03\x04\x05\x06\x07\x08" + "\t\n\x0b\x0c\r\x0e\x0f\x10\x11\x12\x13\x14\x15" + '\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !"#$' + "%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQR" + "STUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~" + "\x7f\udc80\udc81\udc82\udc83\udc84\udc85\udc86" + "\udc87\udc88\udc89\udc8a\udc8b\udc8c\udc8d\udc8e" + "\udc8f\udc90\udc91\udc92\udc93\udc94\udc95\udc96" + "\udc97\udc98\udc99\udc9a\udc9b\udc9c\udc9d\udc9e" + "\udc9f\udca0\udca1\udca2\udca3\udca4\udca5\udca6" + "\udca7\udca8\udca9\udcaa\udcab\udcac\udcad\udcae" + "\udcaf\udcb0\udcb1\udcb2\udcb3\udcb4\udcb5\udcb6" + "\udcb7\udcb8\udcb9\udcba\udcbb\udcbc\udcbd\udcbe" + "\udcbf\udcc0\udcc1\udcc2\udcc3\udcc4\udcc5\udcc6" + "\udcc7\udcc8\udcc9\udcca\udccb\udccc\udccd\udcce" + 
"\udccf\udcd0\udcd1\udcd2\udcd3\udcd4\udcd5\udcd6" + "\udcd7\udcd8\udcd9\udcda\udcdb\udcdc\udcdd\udcde" + "\udcdf\udce0\udce1\udce2\udce3\udce4\udce5\udce6" + "\udce7\udce8\udce9\udcea\udceb\udcec\udced\udcee" + "\udcef\udcf0\udcf1\udcf2\udcf3\udcf4\udcf5\udcf6" + "\udcf7\udcf8\udcf9\udcfa\udcfb\udcfc\udcfd\udcfe\udcff", "stderr": "stdout is pretty strange ;)", - "stdout": ( - "b'\\x00\\x01\\x02\\x03\\x04\\x05\\x06\\x07" - "\\x08\\t\\n\\x0b\\x0c\\r\\x0e\\x0f" - "\\x10\\x11\\x12\\x13\\x14\\x15\\x16\\x17" - '\\x18\\x19\\x1a\\x1b\\x1c\\x1d\\x1e\\x1f !"#' - "$%&\\'()*+,-./0123456789:;<=>?@AB" - "CDEFGHIJKLMNOPQRSTUVWXYZ[\\\\]^_`a" - "bcdefghijklmnopqrstuvwxyz{|}~\\x7f" - "\\x80\\x81\\x82\\x83\\x84\\x85\\x86\\x87" - "\\x88\\x89\\x8a\\x8b\\x8c\\x8d\\x8e\\x8f" - "\\x90\\x91\\x92\\x93\\x94\\x95\\x96\\x97" - "\\x98\\x99\\x9a\\x9b\\x9c\\x9d\\x9e\\x9f" - "\\xa0\\xa1\\xa2\\xa3\\xa4\\xa5\\xa6\\xa7" - "\\xa8\\xa9\\xaa\\xab\\xac\\xad\\xae\\xaf" - "\\xb0\\xb1\\xb2\\xb3\\xb4\\xb5\\xb6\\xb7" - "\\xb8\\xb9\\xba\\xbb\\xbc\\xbd\\xbe\\xbf" - "\\xc0\\xc1\\xc2\\xc3\\xc4\\xc5\\xc6\\xc7" - "\\xc8\\xc9\\xca\\xcb\\xcc\\xcd\\xce\\xcf" - "\\xd0\\xd1\\xd2\\xd3\\xd4\\xd5\\xd6\\xd7" - "\\xd8\\xd9\\xda\\xdb\\xdc\\xdd\\xde\\xdf" - "\\xe0\\xe1\\xe2\\xe3\\xe4\\xe5\\xe6\\xe7" - "\\xe8\\xe9\\xea\\xeb\\xec\\xed\\xee\\xef" - "\\xf0\\xf1\\xf2\\xf3\\xf4\\xf5\\xf6\\xf7" - "\\xf8\\xf9\\xfa\\xfb\\xfc\\xfd\\xfe\\xff" - "'" - ), + "exit_code": 1, + "__typename__": "ExtractCommandFailedReport", } ], - "handler_name": "fail", - "chunk_id": "test", - "is_encrypted": False, - "size": 256, - "start_offset": 0, + "__typename__": "ChunkReport", } ], "subtasks": [], - "task": { - "__typename__": "Task", - "chunk_id": "", - "depth": 0, - "path": "/nonexistent", - }, + "__typename__": "TaskResult", } ] diff --git a/unblob/file_utils.py b/unblob/file_utils.py index 58646522f2..6f2020c533 100644 --- a/unblob/file_utils.py +++ b/unblob/file_utils.py @@ -8,7 +8,7 @@ from pathlib import Path from typing import Iterator, Tuple 
def as_dict(obj) -> dict:
    """Convert a Python class instance to a dictionary.

    Conversion rules, applied in this order:

    * ``None``, ``str``, ``bytes``, ``int``, ``float`` and ``bool`` values
      are returned unchanged.
    * A ``dict`` is returned as-is (same object; its values are not
      converted).
    * A ``list`` or ``tuple`` becomes a ``list`` whose items are converted
      with these same rules.
    * An ``Instance`` becomes a new dict holding a shallow copy of its
      ``_values`` mapping.
    * Any other object becomes a dict of its public attributes (names not
      starting with ``_``), each converted recursively.

    Raises ``AttributeError`` for any other object that has no ``__dict__``.
    """
    # Scalars carry no ``__dict__``. Handing them back unchanged is what lets
    # the recursion over attributes and list items terminate instead of
    # failing on the first int/str/bytes field it meets.
    if obj is None or isinstance(obj, (str, bytes, int, float, bool)):
        return obj  # type: ignore
    if isinstance(obj, dict):
        return obj
    if isinstance(obj, (list, tuple)):
        return [as_dict(item) for item in obj]  # type: ignore
    if isinstance(obj, Instance):
        # Shallow copy only: the parsed field values are kept untouched.
        return dict(obj._values.items())  # noqa: SLF001

    return {
        key: as_dict(value)
        for key, value in obj.__dict__.items()
        if not key.startswith("_")
    }
attr.ib(default=False) + metadata: dict = attr.ib(default={}) def extract(self, inpath: Path, outdir: Path): if self.is_encrypted: @@ -108,6 +109,7 @@ def as_report(self, extraction_reports: List[Report]) -> ChunkReport: size=self.size, handler_name=self.handler.NAME, is_encrypted=self.is_encrypted, + metadata=self.metadata, extraction_reports=extraction_reports, ) @@ -190,7 +192,7 @@ def default(self, obj): try: return obj.decode() except UnicodeDecodeError: - return str(obj) + return obj.decode("utf-8", errors="surrogateescape") logger.error("JSONEncoder met a non-JSON encodable value", obj=obj) # the usual fail path of custom JSONEncoders is to call the parent and let it fail diff --git a/unblob/report.py b/unblob/report.py index 1b5bed1e71..7c8fc3beb5 100644 --- a/unblob/report.py +++ b/unblob/report.py @@ -181,6 +181,7 @@ class ChunkReport(Report): end_offset: int size: int is_encrypted: bool + metadata: dict = attr.ib(default={}) extraction_reports: List[Report]