From 1cc1169dafa89f296f9d87bd9a509a86ac404411 Mon Sep 17 00:00:00 2001 From: Quentin Kaiser Date: Fri, 14 Apr 2023 14:07:04 +0200 Subject: [PATCH] feat(reporting): report meta-data information about chunks. Allow handlers to provide a dict value as part of a ValidChunk metadata attribute. That dictionnary can contain any relevant metadata information from the perspective of the handler, but we advise handler writers to report parsed information such as header values. This metadata dict is later reported as part of our ChunkReports and available in the JSON report file if the user requested one. The idea is to expose metadata to further analysis steps through the unblob report. For example, a binary analysis toolkit would read the load address and architecture from a uImage chunk to analyze the file extracted from that chunk with the right settings. A note on the 'as_dict' implementation. The initial idea was to implement it in dissect.cstruct (see https://github.com/fox-it/dissect.cstruct/pull/29), but due to expected changes in the project's API I chose to implement it in unblob so we're not dependent on another project. --- tests/test_report.py | 83 ++++++++++++++--------------- unblob/file_utils.py | 25 ++++++++- unblob/handlers/archive/sevenzip.py | 5 +- unblob/models.py | 4 +- unblob/report.py | 1 + 5 files changed, 72 insertions(+), 46 deletions(-) diff --git a/tests/test_report.py b/tests/test_report.py index d27cd5d1a8..30816dc523 100644 --- a/tests/test_report.py +++ b/tests/test_report.py @@ -86,6 +86,7 @@ def test_simple_conversion(self): end_offset=384, size=384, is_encrypted=False, + metadata={}, extraction_reports=[], ) ) @@ -135,6 +136,7 @@ def test_simple_conversion(self): "handler_name": "zip", "chunk_id": "test_basic_conversion:id", "is_encrypted": False, + "metadata": {}, "size": 384, "start_offset": 0, }, @@ -180,63 +182,58 @@ def test_exotic_command_output(self): json_text = ProcessResult(results=[task_result]).to_json() decoded_report = json.loads(json_text) - assert decoded_report == [ { - "__typename__": "TaskResult", + "task": { + "path": "/nonexistent", + "depth": 0, + "chunk_id": "", + "__typename__": "Task", + }, "reports": [ { - "__typename__": "ChunkReport", + "chunk_id": "test", + "handler_name": "fail", + "start_offset": 0, "end_offset": 256, + "size": 256, + "is_encrypted": False, + "metadata": {}, "extraction_reports": [ { - "__typename__": "ExtractCommandFailedReport", - "command": "dump all bytes", - "exit_code": 1, "severity": "WARNING", + "command": "dump all bytes", + "stdout": "\x00\x01\x02\x03\x04\x05\x06\x07\x08" + "\t\n\x0b\x0c\r\x0e\x0f\x10\x11\x12\x13\x14\x15" + '\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !"#$' + "%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQR" + "STUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~" + "\x7f\udc80\udc81\udc82\udc83\udc84\udc85\udc86" + "\udc87\udc88\udc89\udc8a\udc8b\udc8c\udc8d\udc8e" + "\udc8f\udc90\udc91\udc92\udc93\udc94\udc95\udc96" + "\udc97\udc98\udc99\udc9a\udc9b\udc9c\udc9d\udc9e" + "\udc9f\udca0\udca1\udca2\udca3\udca4\udca5\udca6" + "\udca7\udca8\udca9\udcaa\udcab\udcac\udcad\udcae" + "\udcaf\udcb0\udcb1\udcb2\udcb3\udcb4\udcb5\udcb6" + "\udcb7\udcb8\udcb9\udcba\udcbb\udcbc\udcbd\udcbe" + "\udcbf\udcc0\udcc1\udcc2\udcc3\udcc4\udcc5\udcc6" + "\udcc7\udcc8\udcc9\udcca\udccb\udccc\udccd\udcce" + "\udccf\udcd0\udcd1\udcd2\udcd3\udcd4\udcd5\udcd6" + "\udcd7\udcd8\udcd9\udcda\udcdb\udcdc\udcdd\udcde" + "\udcdf\udce0\udce1\udce2\udce3\udce4\udce5\udce6" + "\udce7\udce8\udce9\udcea\udceb\udcec\udced\udcee" + "\udcef\udcf0\udcf1\udcf2\udcf3\udcf4\udcf5\udcf6" + "\udcf7\udcf8\udcf9\udcfa\udcfb\udcfc\udcfd\udcfe\udcff", "stderr": "stdout is pretty strange ;)", - "stdout": ( - "b'\\x00\\x01\\x02\\x03\\x04\\x05\\x06\\x07" - "\\x08\\t\\n\\x0b\\x0c\\r\\x0e\\x0f" - "\\x10\\x11\\x12\\x13\\x14\\x15\\x16\\x17" - '\\x18\\x19\\x1a\\x1b\\x1c\\x1d\\x1e\\x1f !"#' - "$%&\\'()*+,-./0123456789:;<=>?@AB" - "CDEFGHIJKLMNOPQRSTUVWXYZ[\\\\]^_`a" - "bcdefghijklmnopqrstuvwxyz{|}~\\x7f" - "\\x80\\x81\\x82\\x83\\x84\\x85\\x86\\x87" - "\\x88\\x89\\x8a\\x8b\\x8c\\x8d\\x8e\\x8f" - "\\x90\\x91\\x92\\x93\\x94\\x95\\x96\\x97" - "\\x98\\x99\\x9a\\x9b\\x9c\\x9d\\x9e\\x9f" - "\\xa0\\xa1\\xa2\\xa3\\xa4\\xa5\\xa6\\xa7" - "\\xa8\\xa9\\xaa\\xab\\xac\\xad\\xae\\xaf" - "\\xb0\\xb1\\xb2\\xb3\\xb4\\xb5\\xb6\\xb7" - "\\xb8\\xb9\\xba\\xbb\\xbc\\xbd\\xbe\\xbf" - "\\xc0\\xc1\\xc2\\xc3\\xc4\\xc5\\xc6\\xc7" - "\\xc8\\xc9\\xca\\xcb\\xcc\\xcd\\xce\\xcf" - "\\xd0\\xd1\\xd2\\xd3\\xd4\\xd5\\xd6\\xd7" - "\\xd8\\xd9\\xda\\xdb\\xdc\\xdd\\xde\\xdf" - "\\xe0\\xe1\\xe2\\xe3\\xe4\\xe5\\xe6\\xe7" - "\\xe8\\xe9\\xea\\xeb\\xec\\xed\\xee\\xef" - "\\xf0\\xf1\\xf2\\xf3\\xf4\\xf5\\xf6\\xf7" - "\\xf8\\xf9\\xfa\\xfb\\xfc\\xfd\\xfe\\xff" - "'" - ), + "exit_code": 1, + "__typename__": "ExtractCommandFailedReport", } ], - "handler_name": "fail", - "chunk_id": "test", - "is_encrypted": False, - "size": 256, - "start_offset": 0, + "__typename__": "ChunkReport", } ], "subtasks": [], - "task": { - "__typename__": "Task", - "chunk_id": "", - "depth": 0, - "path": "/nonexistent", - }, + "__typename__": "TaskResult", } ] diff --git a/unblob/file_utils.py b/unblob/file_utils.py index 58646522f2..6f2020c533 100644 --- a/unblob/file_utils.py +++ b/unblob/file_utils.py @@ -8,7 +8,7 @@ from pathlib import Path from typing import Iterator, Tuple -from dissect.cstruct import cstruct +from dissect.cstruct import Instance, cstruct from pyperscan import Scan from .logging import format_hex @@ -311,3 +311,26 @@ def read_until_past(file: File, pattern: bytes): return file.tell() if next_byte not in pattern: return file.tell() - 1 + + +def as_dict(obj) -> dict: + """Convert a Python class instance to a dictionary.""" + if isinstance(obj, dict): + return obj + if isinstance(obj, list): + return [as_dict(item) for item in obj] # type: ignore + if isinstance(obj, Instance): + result = {} + for k, v in obj._values.items(): # noqa: SLF001 + result[k] = v + return result + + result = {} + for key, value in obj.__dict__.items(): + if key.startswith("_"): + continue + if isinstance(value, (list, tuple)): + result[key] = [as_dict(item) for item in value] + else: + result[key] = as_dict(value) + return result diff --git a/unblob/handlers/archive/sevenzip.py b/unblob/handlers/archive/sevenzip.py index 040b409293..f8f7a446da 100644 --- a/unblob/handlers/archive/sevenzip.py +++ b/unblob/handlers/archive/sevenzip.py @@ -23,6 +23,7 @@ from structlog import get_logger from unblob.extractors import Command +from unblob.file_utils import as_dict from ...models import File, HexString, StructHandler, ValidChunk @@ -70,4 +71,6 @@ def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk] # We read the signature header here to get the offset to the header database first_db_header = start_offset + len(header) + header.next_header_offset end_offset = first_db_header + header.next_header_size - return ValidChunk(start_offset=start_offset, end_offset=end_offset) + return ValidChunk( + start_offset=start_offset, end_offset=end_offset, metadata=as_dict(header) + ) diff --git a/unblob/models.py b/unblob/models.py index 2b8431fa73..94f12dea2f 100644 --- a/unblob/models.py +++ b/unblob/models.py @@ -88,6 +88,7 @@ class ValidChunk(Chunk): handler: "Handler" = attr.ib(init=False, eq=False) is_encrypted: bool = attr.ib(default=False) + metadata: dict = attr.ib(default={}) def extract(self, inpath: Path, outdir: Path): if self.is_encrypted: @@ -108,6 +109,7 @@ def as_report(self, extraction_reports: List[Report]) -> ChunkReport: size=self.size, handler_name=self.handler.NAME, is_encrypted=self.is_encrypted, + metadata=self.metadata, extraction_reports=extraction_reports, ) @@ -190,7 +192,7 @@ def default(self, obj): try: return obj.decode() except UnicodeDecodeError: - return str(obj) + return obj.decode("utf-8", errors="surrogateescape") logger.error("JSONEncoder met a non-JSON encodable value", obj=obj) # the usual fail path of custom JSONEncoders is to call the parent and let it fail diff --git a/unblob/report.py b/unblob/report.py index 1b5bed1e71..7c8fc3beb5 100644 --- a/unblob/report.py +++ b/unblob/report.py @@ -181,6 +181,7 @@ class ChunkReport(Report): end_offset: int size: int is_encrypted: bool + metadata: dict = attr.ib(default={}) extraction_reports: List[Report]