From 36c7fa5d76280418a76b8721489431034e430ee7 Mon Sep 17 00:00:00 2001
From: Alexander Bilz
Date: Tue, 16 Jan 2024 17:35:09 +0100
Subject: [PATCH] refactor: use WrappedIndexDB instead of FastIndexedDB

---
 pyproject.toml             |   2 +-
 src/forensicsim/backend.py | 245 +++++++------------------------------
 2 files changed, 45 insertions(+), 202 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index ef39a50..0361102 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -20,7 +20,7 @@ dependencies = [
 "dataclasses-json",
 "pause",
 "pyautogui",
-# "pywinauto"
+"pywinauto"
 ]
 dynamic = ["version"]
 
diff --git a/src/forensicsim/backend.py b/src/forensicsim/backend.py
index 002dfd3..9df24d2 100644
--- a/src/forensicsim/backend.py
+++ b/src/forensicsim/backend.py
@@ -24,28 +24,19 @@
 import io
 import json
-from collections.abc import Iterator
+import os
 from pathlib import Path
-from typing import Any, Optional, Union
 
 from chromedb import (
-    ccl_blink_value_deserializer,
     ccl_chromium_indexeddb,
     ccl_chromium_localstorage,
     ccl_chromium_sessionstorage,
-    ccl_leveldb,
-    ccl_v8_value_deserializer,
-)
-from chromedb.ccl_chromium_indexeddb import (
-    DatabaseMetadata,
-    DatabaseMetadataType,
-    GlobalMetadata,
-    ObjectStoreMetadata,
-    ObjectStoreMetadataType,
 )
 
 TEAMS_DB_OBJECT_STORES = ["replychains", "conversations", "people", "buddylist"]
 
+ENCODING = "iso-8859-1"
+
 """
 The following code is heavily adopted from the RawLevelDb and IndexedDB processing proposed by CCL Group
@@ -56,195 +47,38 @@
 Additionally, it has a flag to filter for datastores, which are interesting for us.
 """
 
+def parse_db(filepath, do_not_filter=False):
+    # Open raw access to a LevelDB and deserialize the records.
+    wrapper = ccl_chromium_indexeddb.WrappedIndexDB(filepath)
 
-class FastIndexedDB:
-    def __init__(self, leveldb_dir: Path):
-        self._db = ccl_leveldb.RawLevelDb(leveldb_dir)
-        self._fetched_records: list[ccl_leveldb.Record] = []
-        self.global_metadata: GlobalMetadata
-        self.database_metadata: DatabaseMetadata
-        self.object_store_meta: ObjectStoreMetadata
-        self.fetch_data()
-
-    def fetch_data(self) -> None:
-        global_metadata_raw: dict[bytes, ccl_leveldb.Records] = {}
-        database_metadata_raw: dict[tuple, Any] = {}
-        objectstore_metadata_raw: dict[tuple, Any] = {}
-
-        self._fetched_records.clear()
-
-        # Fetch the records only once
-        for record in self._db.iterate_records_raw():
-            self._fetched_records.append(record)
-
-        for record in self._fetched_records:
-            # Global Metadata
-            if (
-                record.key.startswith(b"\x00\x00\x00\x00")
-                and record.state == ccl_leveldb.KeyState.Live
-            ) and (
-                record.key not in global_metadata_raw
-                or global_metadata_raw[record.key].seq < record.seq
-            ):
-                global_metadata_raw[record.key] = record
-
-        # Convert the raw metadata to a nice GlobalMetadata Object
-        global_metadata = ccl_chromium_indexeddb.GlobalMetadata(global_metadata_raw)
-
-        # Loop through the database IDs
-        for db_id in global_metadata.db_ids:
-            if db_id.dbid_no == None:
-                continue
-
-            if db_id.dbid_no > 0x7F:
-                raise NotImplementedError(
-                    "there could be this many dbs, but I don't support it yet"
-                )
-
-            # Database keys end with 0
-            prefix_database = bytes([0, db_id.dbid_no, 0, 0])
-
-            # Objetstore keys end with 50
-            prefix_objectstore = bytes([0, db_id.dbid_no, 0, 0, 50])
-
-            for record in reversed(self._fetched_records):
-                if (
-                    record.key.startswith(prefix_database)
-                    and record.state == ccl_leveldb.KeyState.Live
-                ):
-                    # we only want live keys and the newest version thereof (highest seq)
-                    meta_type = record.key[len(prefix_database)]
-
-                    old_version = database_metadata_raw.get((db_id.dbid_no, meta_type))
-                    if old_version is None or old_version.seq < record.seq:
-                        database_metadata_raw[(db_id.dbid_no, meta_type)] = record
-                if (
-                    record.key.startswith(prefix_objectstore)
-                    and record.state == ccl_leveldb.KeyState.Live
-                ):
-                    # we only want live keys and the newest version thereof (highest seq)
-                    try:
-                        (
-                            objstore_id,
-                            varint_raw,
-                        ) = ccl_chromium_indexeddb.le_varint_from_bytes(
-                            record.key[len(prefix_objectstore) :]
-                        )
-                    except TypeError:
-                        continue
-
-                    meta_type = record.key[len(prefix_objectstore) + len(varint_raw)]
-
-                    old_version = objectstore_metadata_raw.get((
-                        db_id.dbid_no,
-                        objstore_id,
-                        meta_type,
-                    ))
-
-                    if old_version is None or old_version.seq < record.seq:
-                        objectstore_metadata_raw[
-                            (db_id.dbid_no, objstore_id, meta_type)
-                        ] = record
-
-        self.global_metadata = global_metadata
-        self.database_metadata = ccl_chromium_indexeddb.DatabaseMetadata(
-            database_metadata_raw
-        )
-        self.object_store_meta = ccl_chromium_indexeddb.ObjectStoreMetadata(
-            objectstore_metadata_raw
-        )
+    extracted_values = []
 
-    def get_database_metadata(
-        self, db_id: int, meta_type: DatabaseMetadataType
-    ) -> Optional[Union[str, int]]:
-        return self.database_metadata.get_meta(db_id, meta_type)
+    for db_info in wrapper.database_ids:
+        # Skip databases without a valid dbid_no
+        if db_info.dbid_no is None:
+            continue
 
-    def get_object_store_metadata(
-        self, db_id: int, obj_store_id: int, meta_type: ObjectStoreMetadataType
-    ) -> Optional[Any]:
-        return self.object_store_meta.get_meta(db_id, obj_store_id, meta_type)
+        db = wrapper[db_info.dbid_no]
 
-    def __iter__(self) -> Iterator[dict[str, Any]]:
-        blink_deserializer = ccl_blink_value_deserializer.BlinkV8Deserializer()
-        # Loop through the databases and object stores based on their ids
-        for global_id in self.global_metadata.db_ids:
-            # print(f"Processing database: {global_id.name}")
-            if global_id.dbid_no == None:
-                print(f"WARNING: Skipping database {global_id.name}")
+        for obj_store_name in db.object_store_names:
+            # Skip empty object stores
+            if obj_store_name is None:
                 continue
-
-            max_object_stores = self.database_metadata.get_meta(
-                global_id.dbid_no, DatabaseMetadataType.MaximumObjectStoreId
-            )
-            max_object_stores = max_object_stores if max_object_stores else 0
-
-            for object_store_id in range(
-                1,
-                max_object_stores + 1,
-            ):
-                datastore = self.object_store_meta.get_meta(
-                    global_id.dbid_no,
-                    object_store_id,
-                    ObjectStoreMetadataType.StoreName,
-                )
-
-                # print(f"\t Processing object store: {datastore}")
-                if datastore in TEAMS_DB_OBJECT_STORES:
-                    prefix = bytes([0, global_id.dbid_no, object_store_id, 1])
-                    for record in self._fetched_records:
-                        if record.key.startswith(prefix):
-                            # Skip records with empty values as these cant properly decoded
-                            if record.value == b"":
-                                continue
-                            (
-                                _value_version,
-                                varint_raw,
-                            ) = ccl_chromium_indexeddb.le_varint_from_bytes(
-                                record.value
-                            )
-                            val_idx = len(varint_raw)
-                            # read the blink envelope
-                            blink_type_tag = record.value[val_idx]
-                            if blink_type_tag != 0xFF:
-                                print("Blink type tag not present")
-                            val_idx += 1
-
-                            (
-                                _,
-                                varint_raw,
-                            ) = ccl_chromium_indexeddb.le_varint_from_bytes(
-                                record.value[val_idx:]
-                            )
-
-                            val_idx += len(varint_raw)
-
-                            # read the raw value of the record.
-                            obj_raw = io.BytesIO(record.value[val_idx:])
-                            try:
-                                # Initialize deserializer and try deserialization.
-                                deserializer = ccl_v8_value_deserializer.Deserializer(
-                                    obj_raw,
-                                    host_object_delegate=blink_deserializer.read,
-                                )
-                                value = deserializer.read()
-                                yield {
-                                    "key": record.key,
-                                    "value": value,
-                                    "origin_file": record.origin_file,
-                                    "store": datastore,
-                                    "state": record.state,
-                                    "seq": record.seq,
-                                }
-                            except Exception:
-                                # TODO Some proper error handling wouldn't hurt
-                                continue
-
-
-def parse_db(filepath: Path) -> list[dict]:
-    db = FastIndexedDB(filepath)
-    return list(db)
+            if obj_store_name in TEAMS_DB_OBJECT_STORES or do_not_filter:
+                obj_store = db[obj_store_name]
+                records_per_object_store = 0
+                for record in obj_store.iterate_records():
+                    records_per_object_store += 1
+                    sourcefile = str(filepath)
+                    # TODO: Replace None values with actual values
+                    state = None
+                    seq = None
+                    extracted_values.append({"key": record.key.raw_key, "value": record.value, "origin_file": sourcefile, "store": obj_store_name, "state": state, "seq": seq})
+                print(f"{obj_store_name} {db.name} (Records: {records_per_object_store})")
+    return extracted_values
 
 
-def parse_localstorage(filepath: Path) -> list[dict]:
+def parse_localstorage(filepath):
     local_store = ccl_chromium_localstorage.LocalStoreDb(filepath)
     extracted_values = []
     for record in local_store.iter_all_records():
@@ -255,7 +89,7 @@ def parse_localstorage(filepath: Path) -> list[dict]:
     return extracted_values
 
 
-def parse_sessionstorage(filepath: Path) -> list[dict]:
+def parse_sessionstorage(filepath):
     session_storage = ccl_chromium_sessionstorage.SessionStoreDb(filepath)
     extracted_values = []
     for host in session_storage:
@@ -276,12 +110,21 @@ def parse_sessionstorage(filepath: Path) -> list[dict]:
     return extracted_values
 
 
-def write_results_to_json(data: object, outputpath: Path) -> None:
-    with outputpath.open("w", encoding="utf-8") as f:
-        json.dump(data, f, indent=4, sort_keys=True, default=str, ensure_ascii=False)
+def write_results_to_json(data, outputpath):
+    # Dump messages into a json file
+    try:
+        with open(outputpath, "w", encoding="utf-8") as f:
+            json.dump(
+                data, f, indent=4, sort_keys=True, default=str, ensure_ascii=False
+            )
+    except OSError as e:
+        print(e)
 
 
-def parse_json() -> Any:
+def parse_json():
     # read data from a file. This is only for testing purpose.
-    with Path("teams.json").open() as json_file:
-        return json.load(json_file)
+    try:
+        with Path("teams.json").open() as json_file:
+            return json.load(json_file)
+    except OSError as e:
+        print(e)
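
For context, a minimal usage sketch of the refactored entry points, assuming this patch is applied as-is. The input and output paths below are purely illustrative; the snippet simply chains the new parse_db() into the existing write_results_to_json():

    from pathlib import Path

    from forensicsim.backend import parse_db, write_results_to_json

    # Hypothetical locations -- point these at the artefacts under analysis.
    leveldb_path = Path("IndexedDB/https_teams.microsoft.com_0.indexeddb.leveldb")
    output_path = Path("teams.json")

    # With do_not_filter=False only the stores listed in TEAMS_DB_OBJECT_STORES
    # (replychains, conversations, people, buddylist) are extracted;
    # do_not_filter=True dumps every object store in the database.
    records = parse_db(leveldb_path, do_not_filter=False)
    write_results_to_json(records, output_path)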