From 10b0a875c6903243ecf92b9a02f868437b67b37f Mon Sep 17 00:00:00 2001
From: Markus Bilz
Date: Wed, 17 Jan 2024 18:14:23 +0100
Subject: [PATCH] feat: minor refactorings

Remove unused imports from backend.py, add type hints throughout
parser.py and the tools scripts, guard the comparison dunders with
isinstance checks, and reformat long literals and conditions.
---
 src/forensicsim/backend.py  | 23 +++++-----
 src/forensicsim/parser.py   | 89 +++++++++++++++++++------------------
 tools/Forensicsim_Parser.py |  2 +-
 tools/main.py               |  4 +-
 4 files changed, 61 insertions(+), 57 deletions(-)

diff --git a/src/forensicsim/backend.py b/src/forensicsim/backend.py
index 6533254..3aab92e 100644
--- a/src/forensicsim/backend.py
+++ b/src/forensicsim/backend.py
@@ -22,22 +22,13 @@ SOFTWARE.
 """
 
-import io
 import json
-import os
 from pathlib import Path
 
 from chromedb import (
-    ccl_blink_value_deserializer,
     ccl_chromium_indexeddb,
     ccl_chromium_localstorage,
     ccl_chromium_sessionstorage,
-    ccl_leveldb,
-    ccl_v8_value_deserializer,
-)
-from chromedb.ccl_chromium_indexeddb import (
-    DatabaseMetadataType,
-    ObjectStoreMetadataType,
 )
 
 TEAMS_DB_OBJECT_STORES = ["replychains", "conversations", "people", "buddylist"]
@@ -54,6 +45,7 @@
 Additionally, it has a flag to filter for datastores, which are interesting for us.
 """
 
+
 def parse_db(filepath, do_not_filter=False):
     # Open raw access to a LevelDB and deserialize the records.
     wrapper = ccl_chromium_indexeddb.WrappedIndexDB(filepath)
@@ -80,8 +72,17 @@ def parse_db(filepath, do_not_filter=False):
                 # TODO: Fix None values
                 state = None
                 seq = None
-                extracted_values.append({"key": record.key.raw_key, "value": record.value, "origin_file": sourcefile, "store": obj_store_name, "state": state, "seq": seq})
-        print(f"{obj_store_name} {db.name} (Records: {records_per_object_store})")
+                extracted_values.append({
+                    "key": record.key.raw_key,
+                    "value": record.value,
+                    "origin_file": sourcefile,
+                    "store": obj_store_name,
+                    "state": state,
+                    "seq": seq,
+                })
+        print(
+            f"{obj_store_name} {db.name} (Records: {records_per_object_store})"
+        )
     return extracted_values
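
For reviewers, a minimal sketch of how the reshaped record literal above
surfaces to callers of parse_db; the .leveldb path is hypothetical:

    from pathlib import Path

    from forensicsim.backend import parse_db

    # Hypothetical input folder; any Chromium IndexedDB .leveldb directory.
    records = parse_db(Path("teams.leveldb"))

    # Every entry carries the six keys of the reformatted dict literal.
    for r in records[:3]:
        print(r["store"], r["origin_file"], r["key"])
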
diff --git a/src/forensicsim/parser.py b/src/forensicsim/parser.py
index bcfb656..218d391 100644
--- a/src/forensicsim/parser.py
+++ b/src/forensicsim/parser.py
@@ -3,7 +3,7 @@
 from dataclasses import dataclass, field
 from datetime import datetime
 from pathlib import Path
-from typing import Any, Optional
+from typing import Any, Optional, Union
 
 from bs4 import BeautifulSoup, MarkupResemblesLocatorWarning
 from dataclasses_json import (
@@ -16,15 +16,16 @@
 from forensicsim.backend import parse_db, write_results_to_json
 
 # Suppress Beautiful Soup warnings
-warnings.filterwarnings('ignore', category=MarkupResemblesLocatorWarning)
+warnings.filterwarnings("ignore", category=MarkupResemblesLocatorWarning)
+
 
-def strip_html_tags(value):
+def strip_html_tags(value: str) -> str:
     # Get the text of any embedded html, such as divs, a href links
     soup = BeautifulSoup(value, features="html.parser")
     return soup.get_text()
 
 
-def decode_dict(properties):
+def decode_dict(properties: Union[bytes, str, dict]) -> dict[str, Any]:
     if isinstance(properties, bytes):
         soup = BeautifulSoup(properties, features="html.parser")
         properties = properties.decode(soup.original_encoding)
@@ -38,11 +39,11 @@ def decode_dict(properties):
     return json.loads(properties, strict=False)
 
 
-def decode_timestamp(content_utf8_encoded) -> datetime:
+def decode_timestamp(content_utf8_encoded: str) -> datetime:
     return datetime.utcfromtimestamp(int(content_utf8_encoded) / 1000)
 
 
-def encode_timestamp(timestamp) -> Optional[str]:
+def encode_timestamp(timestamp: Optional[datetime]) -> Optional[str]:
     if timestamp is not None:
         return timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f")
     return None
@@ -71,13 +72,17 @@ class Meeting(DataClassJsonMixin):
         default="meeting", metadata=config(field_name="record_type")
     )
 
-    def __eq__(self, other):
+    def __eq__(self, other: object) -> bool:
+        if not isinstance(other, Meeting):
+            return NotImplemented
         return self.cached_deduplication_key == other.cached_deduplication_key
 
-    def __hash__(self):
+    def __hash__(self) -> int:
         return hash(self.cached_deduplication_key)
 
-    def __lt__(self, other):
+    def __lt__(self, other: object) -> bool:
+        if not isinstance(other, Meeting):
+            return NotImplemented
         return self.cached_deduplication_key < other.cached_deduplication_key
 
 
@@ -119,19 +124,23 @@ class Message(DataClassJsonMixin):
         default="message", metadata=config(field_name="record_type")
     )
 
-    def __post_init__(self):
+    def __post_init__(self) -> None:
         if self.cached_deduplication_key is None:
             self.cached_deduplication_key = str(self.creator) + str(
                 self.clientmessageid
             )
 
-    def __eq__(self, other):
+    def __eq__(self, other: object) -> bool:
+        if not isinstance(other, Message):
+            return NotImplemented
         return self.cached_deduplication_key == other.cached_deduplication_key
 
-    def __hash__(self):
+    def __hash__(self) -> int:
         return hash(self.cached_deduplication_key)
 
-    def __lt__(self, other):
+    def __lt__(self, other: object) -> bool:
+        if not isinstance(other, Message):
+            return NotImplemented
         return self.cached_deduplication_key < other.cached_deduplication_key
 
 
@@ -151,25 +160,24 @@ class Contact(DataClassJsonMixin):
         default="contact", metadata=config(field_name="record_type")
     )
 
-    def __eq__(self, other):
+    def __eq__(self, other: object) -> bool:
+        if not isinstance(other, Contact):
+            return NotImplemented
         return self.mri == other.mri
 
-    def __hash__(self):
+    def __hash__(self) -> int:
         return hash(self.mri)
 
-    def __lt__(self, other):
+    def __lt__(self, other: object) -> bool:
+        if not isinstance(other, Contact):
+            return NotImplemented
         return self.mri < other.mri
 
 
 def _parse_people(people: list[dict]) -> set[Contact]:
     parsed_people = set()
     for p in people:
-        p |= p.get("value", {})
-        p |= {"display_name": p.get("displayName")}
-        p |= {"email": p.get("email")}
-        p |= {"mri": p.get("mri")}
-        p |= {"user_principal_name": p.get("userPrincipalName")}
         p |= {"origin_file": p.get("origin_file")}
         parsed_people.add(Contact.from_dict(p))
 
     return parsed_people
@@ -181,7 +189,6 @@ def _parse_buddies(buddies: list[dict]) -> set[Contact]:
     for b in buddies:
         buddies_of_b = b.get("value", {}).get("buddies", [])
         for b_of_b in buddies_of_b:
-
             b_of_b |= {"origin_file": b.get("origin_file")}
             parsed_buddies.add(Contact.from_dict(b_of_b))
     return parsed_buddies
@@ -190,21 +197,12 @@ def _parse_buddies(buddies: list[dict]) -> set[Contact]:
 def _parse_conversations(conversations: list[dict]) -> set[Meeting]:
     cleaned_conversations = set()
     for c in conversations:
-
-
-        if c.get("value", {}).get("type", "") == "Meeting" and "meeting" in c.get("value", {}).get(
-            "threadProperties", {}
-        ):
-            last_message = c.get("value", {}).get("lastMessage", {})
-            meeting_properties = c.get("value", {}).get("threadProperties", {})
-            c |= c.get("value", {})
-            c |= {"client_update_time": c.get("clientUpdateTime")}
-            c |= {"id": c.get("id")}
-            c |= {"members": c.get("members")}
-            c |= {"thread_properties": meeting_properties}
-            c |= {"client_update_time": c.get("clientUpdateTime")}
-            c |= {"version": c.get("version")}
-            c |= {"last_message": last_message}
+        if c.get("value", {}).get("type", "") == "Meeting" and "meeting" in c.get(
+            "value", {}
+        ).get("threadProperties", {}):
+            c_value = c.get("value", {})
+            c |= c_value
+            c |= {"thread_properties": c_value.get("threadProperties", {})}
             c |= {"cached_deduplication_key": c.get("id")}
             cleaned_conversations.add(Meeting.from_dict(c))
     return cleaned_conversations
@@ -214,9 +212,11 @@ def _parse_reply_chains(reply_chains: list[dict]) -> set[Message]:
     cleaned_reply_chains = set()
     for rc in reply_chains:
         rc |= {"origin_file": rc.get("origin_file")}
-
+
         message_dict = {}
-        if rc.get("value", {}).get("messageMap", {}) or rc.get("value", {}).get("messages", {}):
+        if rc.get("value", {}).get("messageMap", {}) or rc.get("value", {}).get(
+            "messages", {}
+        ):
             if rc.get("value", {}).get("messageMap", {}):
                 message_dict = rc.get("value", {}).get("messageMap", {})
             else:
@@ -225,7 +225,10 @@ def _parse_reply_chains(reply_chains: list[dict]) -> set[Message]:
 
         for k in message_dict:
             md = message_dict[k]
-            if md.get("messageType", "") == "RichText/Html" or md.get("messageType", "") == "Text":
+            if (
+                md.get("messageType", "") == "RichText/Html"
+                or md.get("messageType", "") == "Text"
+            ):
                 rc |= rc.get("value", {})
                 rc |= {"cached_deduplication_key": md.get("dedupeKey")}
                 rc |= {"clientmessageid": md.get("clientMessageId")}
@@ -242,7 +245,7 @@ def _parse_reply_chains(reply_chains: list[dict]) -> set[Message]:
                 rc |= {"original_arrival_time": md.get("clientArrivalTime")}
                 rc |= {"version": md.get("version")}
                 rc |= {"properties": md.get("properties")}
-
+
                 cleaned_reply_chains.add(Message.from_dict(rc))
     return cleaned_reply_chains
@@ -265,14 +268,14 @@ def parse_records(records: list[dict]) -> list[dict]:
     # sort within groups i.e., Contacts, Meetings, Conversations
     parsed_records = (
         sorted(_parse_people(people))
-        #  + sorted(_parse_buddies(buddies))
+        # + sorted(_parse_buddies(buddies))
        + sorted(_parse_reply_chains(reply_chains))
         + sorted(_parse_conversations(conversations))
     )
     return [r.to_dict() for r in parsed_records]
 
 
-def process_db(input_path: Path, output_path: Path):
+def process_db(input_path: Path, output_path: Path) -> None:
     if not input_path.parts[-1].endswith(".leveldb"):
         raise ValueError(f"Expected a leveldb folder. Path: {input_path}")
 
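
The isinstance guards added to Meeting, Message, and Contact above follow the
standard rich-comparison pattern: returning NotImplemented lets Python try the
other operand's comparison instead of raising AttributeError on foreign types.
A standalone sketch; Record is an illustrative stand-in, not part of forensicsim:

    from dataclasses import dataclass

    @dataclass
    class Record:
        key: str

        # dataclass skips generating __eq__ because one is defined here.
        def __eq__(self, other: object) -> bool:
            if not isinstance(other, Record):
                return NotImplemented  # defer to the other operand's __eq__
            return self.key == other.key

        def __hash__(self) -> int:
            return hash(self.key)

    print(Record("a") == Record("a"))       # True
    print(Record("a") == "a")               # False: both sides defer, so identity
    print(len({Record("a"), Record("a")}))  # 1: explicit __hash__ keeps it hashable
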
diff --git a/tools/Forensicsim_Parser.py b/tools/Forensicsim_Parser.py
index 3b35d94..ce35ec1 100644
--- a/tools/Forensicsim_Parser.py
+++ b/tools/Forensicsim_Parser.py
@@ -88,7 +88,7 @@
 # Factory that defines the name and details of the module and allows Autopsy
 # to create instances of the modules that will do the analysis.
 class ForensicIMIngestModuleFactory(IngestModuleFactoryAdapter):
-    def __init__(self):
+    def __init__(self) -> None:
         self.settings = None
 
     moduleName = "Microsoft Teams Parser"
diff --git a/tools/main.py b/tools/main.py
index 10bda5a..71f30e4 100644
--- a/tools/main.py
+++ b/tools/main.py
@@ -31,7 +31,7 @@
 from forensicsim.parser import parse_records
 
 
-def process_db(input_path: Path, output_path: Path):
+def process_db(input_path: Path, output_path: Path) -> None:
     if not input_path.parts[-1].endswith(".leveldb"):
         raise ValueError(f"Expected a leveldb folder. Path: {input_path}")
 
@@ -57,7 +57,7 @@ def process_db(input_path: Path, output_path: Path):
     required=True,
     help="File path to the processed output.",
 )
-def process_cmd(filepath, outputpath):
+def process_cmd(filepath: Path, outputpath: Path) -> None:
     click.echo(XTRACT_HEADER)
     process_db(filepath, outputpath)
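
For completeness, a hedged end-to-end sketch of the annotated entry point; both
paths are hypothetical, and process_db raises ValueError unless the input
folder name ends in ".leveldb":

    from pathlib import Path

    from forensicsim.parser import process_db

    # Hypothetical paths; the input must be an IndexedDB LevelDB folder.
    process_db(
        input_path=Path("evidence/https_teams.microsoft.com_0.indexeddb.leveldb"),
        output_path=Path("out/records.json"),
    )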