feat: minor refactorings
KarelZe committed Jan 17, 2024
1 parent d74dfeb commit 10b0a87
Showing 4 changed files with 61 additions and 57 deletions.
23 changes: 12 additions & 11 deletions src/forensicsim/backend.py
@@ -22,22 +22,13 @@
SOFTWARE.
"""

-import io
import json
-import os
from pathlib import Path

from chromedb import (
-    ccl_blink_value_deserializer,
ccl_chromium_indexeddb,
ccl_chromium_localstorage,
ccl_chromium_sessionstorage,
-    ccl_leveldb,
-    ccl_v8_value_deserializer,
)
-from chromedb.ccl_chromium_indexeddb import (
-    DatabaseMetadataType,
-    ObjectStoreMetadataType,
-)

TEAMS_DB_OBJECT_STORES = ["replychains", "conversations", "people", "buddylist"]
@@ -54,6 +45,7 @@
Additionally, it has a flag to filter for datastores, which are interesting for us.
"""

+
def parse_db(filepath, do_not_filter=False):
# Open raw access to a LevelDB and deserialize the records.
wrapper = ccl_chromium_indexeddb.WrappedIndexDB(filepath)
@@ -80,8 +72,17 @@ def parse_db(filepath, do_not_filter=False):
# TODO: Fix None values
state = None
seq = None
-            extracted_values.append({"key": record.key.raw_key, "value": record.value, "origin_file": sourcefile, "store": obj_store_name, "state": state, "seq": seq})
-        print(f"{obj_store_name} {db.name} (Records: {records_per_object_store})")
+            extracted_values.append({
+                "key": record.key.raw_key,
+                "value": record.value,
+                "origin_file": sourcefile,
+                "store": obj_store_name,
+                "state": state,
+                "seq": seq,
+            })
+        print(
+            f"{obj_store_name} {db.name} (Records: {records_per_object_store})"
+        )
return extracted_values


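For orientation, a hedged sketch of how the refactored parse_db is typically driven. The LevelDB folder path is invented, and write_results_to_json's argument order is an assumption based on the `from forensicsim.backend import parse_db, write_results_to_json` import visible in parser.py below:

```python
# Illustrative only: run parse_db over an extracted Teams LevelDB folder.
# Path and output name are made up; write_results_to_json signature assumed.
from pathlib import Path

from forensicsim.backend import parse_db, write_results_to_json

leveldb_folder = Path("evidence/https_teams.microsoft.com_0.indexeddb.leveldb")
records = parse_db(leveldb_folder)  # filters to TEAMS_DB_OBJECT_STORES by default
write_results_to_json(records, Path("records.json"))
```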
89 changes: 46 additions & 43 deletions src/forensicsim/parser.py
@@ -3,7 +3,7 @@
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
-from typing import Any, Optional
+from typing import Any, Optional, Union

from bs4 import BeautifulSoup, MarkupResemblesLocatorWarning
from dataclasses_json import (
@@ -16,15 +16,16 @@
from forensicsim.backend import parse_db, write_results_to_json

# Suppress Beautiful Soup warnings
-warnings.filterwarnings('ignore', category=MarkupResemblesLocatorWarning)
+warnings.filterwarnings("ignore", category=MarkupResemblesLocatorWarning)

-def strip_html_tags(value):
+
+def strip_html_tags(value: str) -> str:
# Get the text of any embedded html, such as divs, a href links
soup = BeautifulSoup(value, features="html.parser")
return soup.get_text()


-def decode_dict(properties):
+def decode_dict(properties: Union[bytes, str, dict]) -> dict[str, Any]:
if isinstance(properties, bytes):
soup = BeautifulSoup(properties, features="html.parser")
properties = properties.decode(soup.original_encoding)
@@ -38,11 +39,11 @@ def decode_dict(properties):
return json.loads(properties, strict=False)


-def decode_timestamp(content_utf8_encoded) -> datetime:
+def decode_timestamp(content_utf8_encoded: str) -> datetime:
return datetime.utcfromtimestamp(int(content_utf8_encoded) / 1000)


-def encode_timestamp(timestamp) -> Optional[str]:
+def encode_timestamp(timestamp: Optional[datetime]) -> Optional[str]:
if timestamp is not None:
return timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f")
return None
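A quick round trip through the two helpers above, with an example input value: Teams persists timestamps as Unix epoch milliseconds, which decode_timestamp converts to naive UTC datetimes and encode_timestamp serializes back out.

```python
# Sketch of the timestamp helpers; the input string is an illustrative value.
ts = decode_timestamp("1705449600000")
print(ts)                      # 2024-01-17 00:00:00
print(encode_timestamp(ts))    # 2024-01-17T00:00:00.000000
print(encode_timestamp(None))  # None
```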
@@ -71,13 +72,17 @@ class Meeting(DataClassJsonMixin):
default="meeting", metadata=config(field_name="record_type")
)

-    def __eq__(self, other):
+    def __eq__(self, other: object) -> bool:
+        if not isinstance(other, Meeting):
+            return NotImplemented
return self.cached_deduplication_key == other.cachedDeduplicationKey

-    def __hash__(self):
+    def __hash__(self) -> int:
return hash(self.cached_deduplication_key)

-    def __lt__(self, other):
+    def __lt__(self, other: object) -> bool:
+        if not isinstance(other, Meeting):
+            return NotImplemented
return self.cached_deduplication_key < other.cached_deduplication_key


@@ -119,19 +124,23 @@ class Message(DataClassJsonMixin):
default="message", metadata=config(field_name="record_type")
)

-    def __post_init__(self):
+    def __post_init__(self) -> None:
if self.cached_deduplication_key is None:
self.cached_deduplication_key = str(self.creator) + str(
self.clientmessageid
)

-    def __eq__(self, other):
+    def __eq__(self, other: object) -> bool:
+        if not isinstance(other, Message):
+            return NotImplemented
return self.cached_deduplication_key == other.cached_deduplication_key

-    def __hash__(self):
+    def __hash__(self) -> int:
return hash(self.cached_deduplication_key)

-    def __lt__(self, other):
+    def __lt__(self, other: object) -> bool:
+        if not isinstance(other, Message):
+            return NotImplemented
return self.cached_deduplication_key < other.cached_deduplication_key


@@ -151,25 +160,24 @@ class Contact(DataClassJsonMixin):
default="contact", metadata=config(field_name="record_type")
)

-    def __eq__(self, other):
+    def __eq__(self, other: object) -> bool:
+        if not isinstance(other, Contact):
+            return NotImplemented
return self.mri == other.mri

-    def __hash__(self):
+    def __hash__(self) -> int:
return hash(self.mri)

-    def __lt__(self, other):
+    def __lt__(self, other: object) -> bool:
+        if not isinstance(other, Contact):
+            return NotImplemented
return self.mri < other.mri
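The pattern repeated across Meeting, Message, and Contact exists because the parser deduplicates records via set() and orders them via sorted(); the isinstance guards added in this commit return NotImplemented instead of raising AttributeError on foreign types. A minimal sketch with hypothetical MRIs, assuming the remaining Contact fields default to None:

```python
# Hypothetical values, not data from the commit.
a = Contact(mri="8:orgid:alice", display_name="Alice")
b = Contact(mri="8:orgid:bob", display_name="Bob")
contacts = {a, b, Contact(mri="8:orgid:alice", display_name="Alice")}
assert len(contacts) == 2               # __eq__/__hash__ collapse the duplicate
assert min(contacts) == a               # __lt__ makes sorted()/min() work
assert a.__eq__(42) is NotImplemented   # guard instead of a crash
```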


def _parse_people(people: list[dict]) -> set[Contact]:
parsed_people = set()
for p in people:
-
p |= p.get("value", {})
-        p |= {"display_name": p.get("displayName")}
-        p |= {"email": p.get("email")}
-        p |= {"mri": p.get("mri")}
-        p |= {"user_principal_name": p.get("userPrincipalName")}
p |= {"origin_file": p.get("origin_file")}

parsed_people.add(Contact.from_dict(p))
@@ -181,7 +189,6 @@ def _parse_buddies(buddies: list[dict]) -> set[Contact]:
for b in buddies:
buddies_of_b = b.get("value", {}).get("buddies", [])
for b_of_b in buddies_of_b:
-
b_of_b |= {"origin_file": b.get("origin_file")}
parsed_buddies.add(Contact.from_dict(b_of_b))
return parsed_buddies
@@ -190,21 +197,12 @@
def _parse_conversations(conversations: list[dict]) -> set[Meeting]:
cleaned_conversations = set()
for c in conversations:
-
-
-        if c.get("value", {}).get("type", "") == "Meeting" and "meeting" in c.get("value", {}).get(
-            "threadProperties", {}
-        ):
-            last_message = c.get("value", {}).get("lastMessage", {})
-            meeting_properties = c.get("value", {}).get("threadProperties", {})
-            c |= c.get("value", {})
-            c |= {"client_update_time": c.get("clientUpdateTime")}
-            c |= {"id": c.get("id")}
-            c |= {"members": c.get("members")}
-            c |= {"thread_properties": meeting_properties}
-            c |= {"client_update_time": c.get("clientUpdateTime")}
-            c |= {"version": c.get("version")}
-            c |= {"last_message": last_message}
+        if c.get("value", {}).get("type", "") == "Meeting" and "meeting" in c.get(
+            "value", {}
+        ).get("threadProperties", {}):
+            c_value = c.get("value", {})
+            c |= c_value
+            c |= {"thread_properties": c_value.get("threadProperties", {})}
c |= {"cached_deduplication_key": c.get("id")}
cleaned_conversations.add(Meeting.from_dict(c))
return cleaned_conversations
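The `c |= c_value` lines rely on the in-place dict union added in Python 3.9 (PEP 584): keys from the right operand overwrite those on the left. A toy example with made-up keys, unrelated to real Teams records:

```python
record = {"id": "19:meeting_x", "value": {"type": "Meeting", "version": 2}}
record |= record["value"]  # hoist the nested value keys onto the record itself
record |= {"thread_properties": record.get("threadProperties", {})}
assert record["type"] == "Meeting" and record["thread_properties"] == {}
```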
@@ -214,9 +212,11 @@ def _parse_reply_chains(reply_chains: list[dict]) -> set[Message]:
cleaned_reply_chains = set()
for rc in reply_chains:
rc |= {"origin_file": rc.get("origin_file")}

message_dict = {}
-        if rc.get("value", {}).get("messageMap", {}) or rc.get("value", {}).get("messages", {}):
+        if rc.get("value", {}).get("messageMap", {}) or rc.get("value", {}).get(
+            "messages", {}
+        ):
if rc.get("value", {}).get("messageMap", {}):
message_dict = rc.get("value", {}).get("messageMap", {})
else:
@@ -225,7 +225,10 @@ def _parse_reply_chains(reply_chains: list[dict]) -> set[Message]:
for k in message_dict:
md = message_dict[k]

-            if md.get("messageType", "") == "RichText/Html" or md.get("messageType", "") == "Text":
+            if (
+                md.get("messageType", "") == "RichText/Html"
+                or md.get("messageType", "") == "Text"
+            ):
rc |= rc.get("value", {})
rc |= {"cached_deduplication_key": md.get("dedupeKey")}
rc |= {"clientmessageid": md.get("clientMessageId")}
@@ -242,7 +245,7 @@ def _parse_reply_chains(reply_chains: list[dict]) -> set[Message]:
rc |= {"original_arrival_time": md.get("clientArrivalTime")}
rc |= {"version": md.get("version")}
rc |= {"properties": md.get("properties")}

cleaned_reply_chains.add(Message.from_dict(rc))

return cleaned_reply_chains
@@ -265,14 +268,14 @@ def parse_records(records: list[dict]) -> list[dict]:
# sort within groups i.e., Contacts, Meetings, Conversations
parsed_records = (
sorted(_parse_people(people))
-        # + sorted(_parse_buddies(buddies))
+        # + sorted(_parse_buddies(buddies))
+ sorted(_parse_reply_chains(reply_chains))
+ sorted(_parse_conversations(conversations))
)
return [r.to_dict() for r in parsed_records]


-def process_db(input_path: Path, output_path: Path):
+def process_db(input_path: Path, output_path: Path) -> None:
if not input_path.parts[-1].endswith(".leveldb"):
raise ValueError(f"Expected a leveldb folder. Path: {input_path}")

2 changes: 1 addition & 1 deletion tools/Forensicsim_Parser.py
@@ -88,7 +88,7 @@
# Factory that defines the name and details of the module and allows Autopsy
# to create instances of the modules that will do the analysis.
class ForensicIMIngestModuleFactory(IngestModuleFactoryAdapter):
-    def __init__(self):
+    def __init__(self) -> None:
self.settings = None

moduleName = "Microsoft Teams Parser"
4 changes: 2 additions & 2 deletions tools/main.py
@@ -31,7 +31,7 @@
from forensicsim.parser import parse_records


-def process_db(input_path: Path, output_path: Path):
+def process_db(input_path: Path, output_path: Path) -> None:
if not input_path.parts[-1].endswith(".leveldb"):
raise ValueError(f"Expected a leveldb folder. Path: {input_path}")

@@ -57,7 +57,7 @@ def process_db(input_path: Path, output_path: Path):
required=True,
help="File path to the processed output.",
)
-def process_cmd(filepath, outputpath):
+def process_cmd(filepath: Path, outputpath: Path) -> None:
click.echo(XTRACT_HEADER)
process_db(filepath, outputpath)
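process_cmd is a click entry point, so the annotated filepath/outputpath values arrive from the decorated options partially shown above. A hedged sketch of calling the underlying function directly; the paths are invented, and the .leveldb suffix check in process_db rejects anything else:

```python
from pathlib import Path

process_db(Path("export/teams.leveldb"), Path("report.json"))  # OK
process_db(Path("export/teams.sqlite"), Path("report.json"))   # raises ValueError
```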

