refactor: use WrappedIndexDB instead of
lxndrblz committed Jan 16, 2024
1 parent fab3913 commit 36c7fa5
Showing 2 changed files with 45 additions and 202 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -20,7 +20,7 @@ dependencies = [
     "dataclasses-json",
     "pause",
     "pyautogui",
-    # "pywinauto"
+    "pywinauto"
 ]
 
 dynamic = ["version"]
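
Side note on the dependency change: pywinauto, previously commented out, becomes a hard requirement here, and in practice it only installs cleanly on Windows. A hedged sketch of how a consumer might guard the import on other platforms (the helper name and title pattern are hypothetical, not from this repository):

import sys

# pywinauto is Windows-only in practice; degrade gracefully elsewhere (hypothetical guard).
try:
    from pywinauto import Application
except ImportError:
    Application = None

def connect_to_teams(title_re: str = ".*Teams.*"):
    # Hypothetical helper: attach to a running Teams window via UI Automation.
    if Application is None or not sys.platform.startswith("win"):
        raise RuntimeError("pywinauto-based automation requires Windows")
    return Application(backend="uia").connect(title_re=title_re)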
245 changes: 44 additions & 201 deletions src/forensicsim/backend.py
@@ -24,28 +24,19 @@

 import io
 import json
-from collections.abc import Iterator
+import os
 from pathlib import Path
-from typing import Any, Optional, Union
 
 from chromedb import (
-    ccl_blink_value_deserializer,
     ccl_chromium_indexeddb,
     ccl_chromium_localstorage,
     ccl_chromium_sessionstorage,
-    ccl_leveldb,
-    ccl_v8_value_deserializer,
 )
-from chromedb.ccl_chromium_indexeddb import (
-    DatabaseMetadata,
-    DatabaseMetadataType,
-    GlobalMetadata,
-    ObjectStoreMetadata,
-    ObjectStoreMetadataType,
-)
 
 TEAMS_DB_OBJECT_STORES = ["replychains", "conversations", "people", "buddylist"]
 
 ENCODING = "iso-8859-1"
 
 """
 The following code is heavily adapted from the RawLevelDb and IndexedDB processing proposed by CCL Group
@@ -56,195 +47,38 @@
 Additionally, it has a flag to filter for datastores, which are interesting for us.
 """

+def parse_db(filepath, do_not_filter=False):
+    # Open raw access to a LevelDB and deserialize the records.
+    wrapper = ccl_chromium_indexeddb.WrappedIndexDB(filepath)

-class FastIndexedDB:
-    def __init__(self, leveldb_dir: Path):
-        self._db = ccl_leveldb.RawLevelDb(leveldb_dir)
-        self._fetched_records: list[ccl_leveldb.Record] = []
-        self.global_metadata: GlobalMetadata
-        self.database_metadata: DatabaseMetadata
-        self.object_store_meta: ObjectStoreMetadata
-        self.fetch_data()
-
-    def fetch_data(self) -> None:
-        global_metadata_raw: dict[bytes, ccl_leveldb.Records] = {}
-        database_metadata_raw: dict[tuple, Any] = {}
-        objectstore_metadata_raw: dict[tuple, Any] = {}
-
-        self._fetched_records.clear()
-
-        # Fetch the records only once
-        for record in self._db.iterate_records_raw():
-            self._fetched_records.append(record)
-
-        for record in self._fetched_records:
-            # Global Metadata
-            if (
-                record.key.startswith(b"\x00\x00\x00\x00")
-                and record.state == ccl_leveldb.KeyState.Live
-            ) and (
-                record.key not in global_metadata_raw
-                or global_metadata_raw[record.key].seq < record.seq
-            ):
-                global_metadata_raw[record.key] = record
-
-        # Convert the raw metadata to a nice GlobalMetadata Object
-        global_metadata = ccl_chromium_indexeddb.GlobalMetadata(global_metadata_raw)
-
-        # Loop through the database IDs
-        for db_id in global_metadata.db_ids:
-            if db_id.dbid_no == None:
-                continue
-
-            if db_id.dbid_no > 0x7F:
-                raise NotImplementedError(
-                    "there could be this many dbs, but I don't support it yet"
-                )
-
-            # Database keys end with 0
-            prefix_database = bytes([0, db_id.dbid_no, 0, 0])
-
-            # Object store keys end with 50
-            prefix_objectstore = bytes([0, db_id.dbid_no, 0, 0, 50])
-
-            for record in reversed(self._fetched_records):
-                if (
-                    record.key.startswith(prefix_database)
-                    and record.state == ccl_leveldb.KeyState.Live
-                ):
-                    # we only want live keys and the newest version thereof (highest seq)
-                    meta_type = record.key[len(prefix_database)]
-                    old_version = database_metadata_raw.get((db_id.dbid_no, meta_type))
-                    if old_version is None or old_version.seq < record.seq:
-                        database_metadata_raw[(db_id.dbid_no, meta_type)] = record
-                if (
-                    record.key.startswith(prefix_objectstore)
-                    and record.state == ccl_leveldb.KeyState.Live
-                ):
-                    # we only want live keys and the newest version thereof (highest seq)
-                    try:
-                        (
-                            objstore_id,
-                            varint_raw,
-                        ) = ccl_chromium_indexeddb.le_varint_from_bytes(
-                            record.key[len(prefix_objectstore) :]
-                        )
-                    except TypeError:
-                        continue
-
-                    meta_type = record.key[len(prefix_objectstore) + len(varint_raw)]
-
-                    old_version = objectstore_metadata_raw.get((
-                        db_id.dbid_no,
-                        objstore_id,
-                        meta_type,
-                    ))
-
-                    if old_version is None or old_version.seq < record.seq:
-                        objectstore_metadata_raw[
-                            (db_id.dbid_no, objstore_id, meta_type)
-                        ] = record
-
-        self.global_metadata = global_metadata
-        self.database_metadata = ccl_chromium_indexeddb.DatabaseMetadata(
-            database_metadata_raw
-        )
-        self.object_store_meta = ccl_chromium_indexeddb.ObjectStoreMetadata(
-            objectstore_metadata_raw
-        )
+    extracted_values = []

-    def get_database_metadata(
-        self, db_id: int, meta_type: DatabaseMetadataType
-    ) -> Optional[Union[str, int]]:
-        return self.database_metadata.get_meta(db_id, meta_type)
+    for db_info in wrapper.database_ids:
+        # Skip databases without a valid dbid_no
+        if db_info.dbid_no is None:
+            continue

-    def get_object_store_metadata(
-        self, db_id: int, obj_store_id: int, meta_type: ObjectStoreMetadataType
-    ) -> Optional[Any]:
-        return self.object_store_meta.get_meta(db_id, obj_store_id, meta_type)
+        db = wrapper[db_info.dbid_no]

-    def __iter__(self) -> Iterator[dict[str, Any]]:
-        blink_deserializer = ccl_blink_value_deserializer.BlinkV8Deserializer()
-        # Loop through the databases and object stores based on their ids
-        for global_id in self.global_metadata.db_ids:
-            # print(f"Processing database: {global_id.name}")
-            if global_id.dbid_no == None:
-                print(f"WARNING: Skipping database {global_id.name}")
-                continue
+        for obj_store_name in db.object_store_names:
+            # Skip empty object stores
+            if obj_store_name is None:
+                continue

-            max_object_stores = self.database_metadata.get_meta(
-                global_id.dbid_no, DatabaseMetadataType.MaximumObjectStoreId
-            )
-            max_object_stores = max_object_stores if max_object_stores else 0
-
-            for object_store_id in range(
-                1,
-                max_object_stores + 1,
-            ):
-                datastore = self.object_store_meta.get_meta(
-                    global_id.dbid_no,
-                    object_store_id,
-                    ObjectStoreMetadataType.StoreName,
-                )
-
-                # print(f"\t Processing object store: {datastore}")
-                if datastore in TEAMS_DB_OBJECT_STORES:
-                    prefix = bytes([0, global_id.dbid_no, object_store_id, 1])
-                    for record in self._fetched_records:
-                        if record.key.startswith(prefix):
-                            # Skip records with empty values as these can't be properly decoded
-                            if record.value == b"":
-                                continue
-                            (
-                                _value_version,
-                                varint_raw,
-                            ) = ccl_chromium_indexeddb.le_varint_from_bytes(
-                                record.value
-                            )
-                            val_idx = len(varint_raw)
-                            # read the blink envelope
-                            blink_type_tag = record.value[val_idx]
-                            if blink_type_tag != 0xFF:
-                                print("Blink type tag not present")
-                            val_idx += 1
-
-                            (
-                                _,
-                                varint_raw,
-                            ) = ccl_chromium_indexeddb.le_varint_from_bytes(
-                                record.value[val_idx:]
-                            )
-
-                            val_idx += len(varint_raw)
-
-                            # read the raw value of the record.
-                            obj_raw = io.BytesIO(record.value[val_idx:])
-                            try:
-                                # Initialize deserializer and try deserialization.
-                                deserializer = ccl_v8_value_deserializer.Deserializer(
-                                    obj_raw,
-                                    host_object_delegate=blink_deserializer.read,
-                                )
-                                value = deserializer.read()
-                                yield {
-                                    "key": record.key,
-                                    "value": value,
-                                    "origin_file": record.origin_file,
-                                    "store": datastore,
-                                    "state": record.state,
-                                    "seq": record.seq,
-                                }
-                            except Exception:
-                                # TODO Some proper error handling wouldn't hurt
-                                continue


-def parse_db(filepath: Path) -> list[dict]:
-    db = FastIndexedDB(filepath)
-    return list(db)
+            if obj_store_name in TEAMS_DB_OBJECT_STORES or do_not_filter:
+                obj_store = db[obj_store_name]
+                records_per_object_store = 0
+                for record in obj_store.iterate_records():
+                    records_per_object_store += 1
+                    sourcefile = str(filepath)
+                    # TODO: Replace None values with actual values
+                    state = None
+                    seq = None
+                    extracted_values.append({"key": record.key.raw_key, "value": record.value, "origin_file": sourcefile, "store": obj_store_name, "state": state, "seq": seq})
+                print(f"{obj_store_name} {db.name} (Records: {records_per_object_store})")
+    return extracted_values


-def parse_localstorage(filepath: Path) -> list[dict]:
+def parse_localstorage(filepath):
     local_store = ccl_chromium_localstorage.LocalStoreDb(filepath)
     extracted_values = []
     for record in local_store.iter_all_records():
@@ -255,7 +89,7 @@ def parse_localstorage(filepath: Path) -> list[dict]:
     return extracted_values


-def parse_sessionstorage(filepath: Path) -> list[dict]:
+def parse_sessionstorage(filepath):
     session_storage = ccl_chromium_sessionstorage.SessionStoreDb(filepath)
     extracted_values = []
     for host in session_storage:
@@ -276,12 +110,21 @@ def parse_sessionstorage(filepath: Path) -> list[dict]:
     return extracted_values


-def write_results_to_json(data: object, outputpath: Path) -> None:
-    with outputpath.open("w", encoding="utf-8") as f:
-        json.dump(data, f, indent=4, sort_keys=True, default=str, ensure_ascii=False)
+def write_results_to_json(data, outputpath):
+    # Dump messages into a json file
+    try:
+        with open(outputpath, "w", encoding="utf-8") as f:
+            json.dump(
+                data, f, indent=4, sort_keys=True, default=str, ensure_ascii=False
+            )
+    except OSError as e:
+        print(e)


-def parse_json() -> Any:
+def parse_json():
     # Read data from a file. This is only for testing purposes.
-    with Path("teams.json").open() as json_file:
-        return json.load(json_file)
+    try:
+        with Path("teams.json").open() as json_file:
+            return json.load(json_file)
+    except OSError as e:
+        print(e)
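
For orientation, a minimal end-to-end sketch of the refactored entry points; this is not part of the commit, and the evidence path below is illustrative, assuming the chromedb fork is installed:

from pathlib import Path

from forensicsim.backend import parse_db, write_results_to_json

# Illustrative path to an IndexedDB LevelDB folder extracted from a Teams profile.
leveldb_folder = Path("evidence/https_teams.microsoft.com_0.indexeddb.leveldb")

# With do_not_filter=False, only the stores named in TEAMS_DB_OBJECT_STORES
# (replychains, conversations, people, buddylist) are extracted.
records = parse_db(leveldb_folder, do_not_filter=False)

# Each record is a dict with key, value, origin_file, store, state, and seq;
# note the new code path still fills state and seq with None (see the TODO above).
write_results_to_json(records, Path("teams.json"))
print(f"Extracted {len(records)} records")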
