Skip to content

Commit

Permalink
Add Ghidra import for globals (data) (#71)
Browse files Browse the repository at this point in the history
* Implement importing globals, improve duplicate type handling
---------

Co-authored-by: jonschz <[email protected]>
  • Loading branch information
jonschz and jonschz authored Jan 15, 2025
1 parent aafe134 commit 9d9901c
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 87 deletions.
101 changes: 61 additions & 40 deletions reccmp/ghidra_scripts/import_functions_and_types_from_pdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
import logging
from pathlib import Path
import traceback
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Callable
from functools import partial

if TYPE_CHECKING:
from reccmp.ghidra_scripts.lego_util.headers import * # pylint: disable=wildcard-import # these are just for headers
Expand Down Expand Up @@ -172,6 +173,26 @@ def import_function_into_ghidra(
GLOBALS.statistics.functions_changed += 1


def do_with_error_handling(step_name: str, action: Callable[[], None]):
    """
    Run a single import step and record its outcome in `GLOBALS.statistics`.

    On success the success counter is incremented. Any exception raised by
    `action` is handled here and never propagates to the caller:

    - `Lego1Exception` is reported through `log_and_track_failure`.
    - `RuntimeError` is unwrapped: its first argument is treated as the cause.
      If that cause is a `CancelledException`, a critical message is logged and
      the step ends; otherwise the cause is reported as unexpected, together
      with the traceback.
    - Every other exception is reported as unexpected, together with the traceback.

    :param step_name: Label used in log messages to identify the step.
    :param action: Zero-argument callable performing the step.
    """
    try:
        action()
        GLOBALS.statistics.successes += 1
    except Lego1Exception as e:
        log_and_track_failure(step_name, e)
    except RuntimeError as e:
        # A RuntimeError raised without arguments has an empty `args` tuple;
        # fall back to the error itself instead of failing with an IndexError.
        cause = e.args[0] if e.args else e
        if CancelledException is not None and isinstance(cause, CancelledException):
            # NOTE(review): this only ends the current step. The exception is not
            # re-raised, so a caller that loops over steps will keep going.
            logger.critical("Import aborted by the user.")
            return

        log_and_track_failure(step_name, cause, unexpected=True)
        logger.error(traceback.format_exc())
    except Exception as e:  # pylint: disable=broad-exception-caught
        log_and_track_failure(step_name, e, unexpected=True)
        logger.error(traceback.format_exc())


def do_execute_import(
extraction: "PdbFunctionExtractor",
ignore_types: set[str],
Expand All @@ -188,36 +209,31 @@ def do_execute_import(
# pylint: disable=possibly-used-before-assignment
type_importer = PdbTypeImporter(api, extraction, ignore_types=ignore_types)

logger.info("Importing functions...")
logger.info("Importing globals...")
for glob in extraction.compare.get_variables():
do_with_error_handling(
glob.name or hex(glob.orig_addr),
partial(
import_global_into_ghidra, api, extraction.compare, type_importer, glob
),
)

logger.info("Importing functions...")
for pdb_func in pdb_functions:
func_name = pdb_func.match_info.name
orig_addr = pdb_func.match_info.orig_addr
try:
if orig_addr in ignore_functions:
logger.info(
"Skipping function '%s' at '%s' because it is on the ignore list",
func_name,
hex(orig_addr),
)
continue

import_function_into_ghidra(api, pdb_func, type_importer)
GLOBALS.statistics.successes += 1
except Lego1Exception as e:
log_and_track_failure(func_name, e)
except RuntimeError as e:
cause = e.args[0]
if CancelledException is not None and isinstance(cause, CancelledException):
# let Ghidra's CancelledException pass through
logging.critical("Import aborted by the user.")
return

log_and_track_failure(func_name, cause, unexpected=True)
logger.error(traceback.format_exc())
except Exception as e: # pylint: disable=broad-exception-caught
log_and_track_failure(func_name, e, unexpected=True)
logger.error(traceback.format_exc())
if orig_addr in ignore_functions:
logger.info(
"Skipping function '%s' at '%s' because it is on the ignore list",
func_name,
hex(orig_addr),
)
continue

do_with_error_handling(
func_name or hex(orig_addr),
partial(import_function_into_ghidra, api, pdb_func, type_importer),
)

logger.info("Finished importing functions.")

Expand All @@ -227,12 +243,12 @@ def do_execute_import(


def log_and_track_failure(
function_name: str | None, error: Exception, unexpected: bool = False
step_name: str | None, error: Exception, unexpected: bool = False
):
if GLOBALS.statistics.track_failure_and_tell_if_new(error):
logger.error(
"%s(): %s%s",
function_name,
"%s: %s%s",
step_name,
"Unexpected error: " if unexpected else "",
error,
)
Expand Down Expand Up @@ -356,29 +372,34 @@ def main():

reload_module("reccmp.isledecomp.compare.db")

reload_module("lego_util.exceptions")
reload_module("reccmp.ghidra_scripts.lego_util.exceptions")
from reccmp.ghidra_scripts.lego_util.exceptions import Lego1Exception

reload_module("lego_util.pdb_extraction")
reload_module("reccmp.ghidra_scripts.lego_util.pdb_extraction")
from reccmp.ghidra_scripts.lego_util.pdb_extraction import (
PdbFunctionExtractor,
PdbFunction,
)

reload_module("lego_util.vtable_importer")
from reccmp.ghidra_scripts.lego_util.vtable_importer import (
import_vftables_into_ghidra,
)

if GLOBALS.running_from_ghidra:
reload_module("lego_util.ghidra_helper")
reload_module("reccmp.ghidra_scripts.lego_util.ghidra_helper")

reload_module("reccmp.ghidra_scripts.lego_util.vtable_importer")
from reccmp.ghidra_scripts.lego_util.vtable_importer import (
import_vftables_into_ghidra,
)

reload_module("reccmp.ghidra_scripts.lego_util.globals_importer")
from reccmp.ghidra_scripts.lego_util.globals_importer import (
import_global_into_ghidra,
)

reload_module("lego_util.function_importer")
reload_module("reccmp.ghidra_scripts.lego_util.function_importer")
from reccmp.ghidra_scripts.lego_util.function_importer import (
PdbFunctionImporter,
)

reload_module("lego_util.type_importer")
reload_module("reccmp.ghidra_scripts.lego_util.type_importer")
from reccmp.ghidra_scripts.lego_util.type_importer import PdbTypeImporter

if __name__ == "__main__":
Expand Down
19 changes: 16 additions & 3 deletions reccmp/ghidra_scripts/lego_util/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
from typing import TYPE_CHECKING

# Disable spurious warnings in vscode / pylance
# pyright: reportMissingModuleSource=false


if TYPE_CHECKING:
from ghidra.program.model.data import DataType


class Lego1Exception(Exception):
"""
Our own base class for exceptions.
Expand Down Expand Up @@ -32,10 +42,13 @@ def __str__(self):


class MultipleTypesFoundInGhidraError(Lego1Exception):
    """
    Signals that more than one data type in Ghidra matched the requested name.

    The name and the matching types are available as `name` and `results`
    (and, for backwards compatibility, as `args[0]` and `args[1]`).
    """

    def __init__(self, name: str, results: list["DataType"]):
        super().__init__(name, results)
        self.name = name
        self.results = results

    def __str__(self):
        return f"Found multiple types matching '{self.name}' in Ghidra: {self.results}"


class StackOffsetMismatchError(Lego1Exception):
Expand Down
30 changes: 29 additions & 1 deletion reccmp/ghidra_scripts/lego_util/ghidra_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from ghidra.program.flatapi import FlatProgramAPI
from ghidra.program.model.data import DataType, DataTypeConflictHandler, PointerDataType
from ghidra.program.model.symbol import Namespace
from ghidra.program.model.symbol import Namespace, SourceType

from .exceptions import (
ClassOrNamespaceNotFoundInGhidraError,
Expand Down Expand Up @@ -158,3 +158,31 @@ def get_namespace_and_name(api: FlatProgramAPI, name: str) -> tuple[Namespace, s
name = colon_split.pop()
namespace = get_or_create_namespace(api, "::".join(colon_split))
return namespace, name


def set_ghidra_label(api: FlatProgramAPI, address: int, label_with_namespace: str):
    """
    Make sure the primary symbol at `address` carries the given name and namespace.

    `label_with_namespace` is split into a namespace and a plain name via
    `get_namespace_and_name`. If a primary symbol already exists at the address
    it is renamed (unless it already matches); otherwise a new user-defined
    label is created.

    :param api: Ghidra flat API of the current program.
    :param address: Address of the label as an integer.
    :param label_with_namespace: Label name, optionally qualified with `::`.
    """
    namespace, name = get_namespace_and_name(api, label_with_namespace)
    symbol_table = api.getCurrentProgram().getSymbolTable()
    # hex() already yields the "0x" prefix, so the log messages must not add another one
    address_hex = hex(address)
    address_ghidra = api.getAddressFactory().getAddress(address_hex)
    existing_label = symbol_table.getPrimarySymbol(address_ghidra)

    if existing_label is None:
        logger.debug("Adding label '%s' at %s", name, address_hex)
        # Create the label inside the resolved namespace so that new labels end up
        # in the same place as renamed ones.
        symbol_table.createLabel(
            address_ghidra, name, namespace, SourceType.USER_DEFINED
        )
        return

    existing_label_name = existing_label.getName()
    if (
        existing_label.getParentNamespace() == namespace
        and existing_label_name == name
    ):
        logger.debug(
            "Label '%s' at %s already exists", label_with_namespace, address_hex
        )
        return

    logger.debug(
        "Changing label at %s from '%s' to '%s'",
        address_hex,
        existing_label_name,
        label_with_namespace,
    )
    existing_label.setNameAndNamespace(name, namespace, SourceType.USER_DEFINED)
66 changes: 66 additions & 0 deletions reccmp/ghidra_scripts/lego_util/globals_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# This file can only be imported successfully when run from Ghidra using Ghidrathon.

# Disable spurious warnings in vscode / pylance
# pyright: reportMissingModuleSource=false

import logging

from ghidra.program.flatapi import FlatProgramAPI

from reccmp.isledecomp.compare.core import Compare
from reccmp.isledecomp.compare.db import ReccmpMatch

from .exceptions import Lego1Exception
from .type_importer import PdbTypeImporter
from .ghidra_helper import set_ghidra_label


logger = logging.getLogger(__name__)


def import_global_into_ghidra(
    api: FlatProgramAPI,
    compare: Compare,
    type_importer: PdbTypeImporter,
    glob: ReccmpMatch,
):
    """
    Import one matched global variable into Ghidra.

    The cvdump node whose address equals `glob.recomp_addr` supplies the name and,
    if present, the data type. With a data type, the type is imported, any data
    already defined inside the variable's address range is removed and the typed
    data is created at `glob.orig_addr`. In every case the address is labelled
    with the variable's name afterwards.

    :raises Lego1Exception: If no cvdump node matches `glob.recomp_addr`.
    """
    matching_node = None
    for candidate in compare.cvdump_analysis.nodes:
        if candidate.addr == glob.recomp_addr:
            matching_node = candidate
            break

    if matching_node is None:
        # should never happen
        raise Lego1Exception(
            f"Failed to find node for {glob.name} at LEGO1 0x{glob.orig_addr:x}"
        )

    name = matching_node.friendly_name or matching_node.decorated_name
    assert name is not None, "node.decorated_name must not be None"

    logger.info("Handling global at %s: '%s'", hex(glob.orig_addr), name)

    if matching_node.data_type is None:
        logger.debug("No datatype for variable '%s', adding label only", name)
    else:
        ghidra_type = type_importer.import_pdb_type_into_ghidra(
            matching_node.data_type.key
        )
        start_address = api.getAddressFactory().getAddress(hex(glob.orig_addr))

        # Drop whatever is currently defined at the start of the variable
        stale_data = api.getDataAt(start_address)
        if stale_data is not None:
            api.removeData(stale_data)

        end_offset = glob.orig_addr + ghidra_type.getLength()

        # Clear conflicting data (usually auto-generated by Ghidra) that begins
        # before the end of the new variable
        following = api.getDataAfter(start_address)
        while following is not None:
            following_offset = int(following.getAddress().getOffset())
            if following_offset >= end_offset:
                break
            logger.debug("Clearing conflicting data at %s", hex(following_offset))
            api.removeData(following)
            following = api.getDataAfter(start_address)

        api.createData(start_address, ghidra_type)

    set_ghidra_label(api, glob.orig_addr, name)
4 changes: 2 additions & 2 deletions reccmp/ghidra_scripts/lego_util/pdb_extraction.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from reccmp.isledecomp.formats.exceptions import InvalidVirtualAddressError
from reccmp.isledecomp.cvdump.symbols import SymbolsEntry
from reccmp.isledecomp.compare import Compare as IsleCompare
from reccmp.isledecomp.compare import Compare
from reccmp.isledecomp.compare.db import ReccmpMatch

logger = logging.getLogger(__file__)
Expand Down Expand Up @@ -54,7 +54,7 @@ class PdbFunctionExtractor:
and prepares the data for the import in Ghidra.
"""

def __init__(self, compare: Compare):
    """Keep a reference to the given `Compare` instance as `self.compare`."""
    self.compare = compare

scalar_type_regex = re.compile(r"t_(?P<typename>\w+)(?:\((?P<type_id>\d+)\))?")
Expand Down
32 changes: 25 additions & 7 deletions reccmp/ghidra_scripts/lego_util/type_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from reccmp.isledecomp.cvdump.types import VirtualBasePointer

from .exceptions import (
MultipleTypesFoundInGhidraError,
TypeNotFoundError,
TypeNotFoundInGhidraError,
TypeNotImplementedError,
Expand Down Expand Up @@ -496,6 +497,9 @@ def _get_or_create_data_type(
Note that the return value of `addDataType()` is not the same instance as the input
even if there is no name collision.
"""

data_type_manager = self.api.getCurrentProgram().getDataTypeManager()

try:
data_type = get_ghidra_type(self.api, type_name)
logger.debug(
Expand All @@ -505,16 +509,30 @@ def _get_or_create_data_type(
data_type.getCategoryPath(),
)
except TypeNotFoundInGhidraError:
data_type = (
self.api.getCurrentProgram()
.getDataTypeManager()
.addDataType(
new_instance_callback(), DataTypeConflictHandler.KEEP_HANDLER
)
logger.info(
"Creating new %s data type %s",
readable_name_of_type_category,
type_name,
)
data_type = data_type_manager.addDataType(
new_instance_callback(), DataTypeConflictHandler.KEEP_HANDLER
)
except MultipleTypesFoundInGhidraError as e:
logger.error(
"Found multiple existing types matching '%s'. Deleting all of them and trying to recreate..."
)
for result in e.results:
logger.info("Deleting data type '%s'", result.getPathName())
data_type_manager.remove(result, ConsoleTaskMonitor())
logger.info(
"Created new %s data type %s", readable_name_of_type_category, type_name
"(Re)creating new %s data type '%s'",
readable_name_of_type_category,
type_name,
)
data_type = data_type_manager.addDataType(
new_instance_callback(), DataTypeConflictHandler.KEEP_HANDLER
)

assert isinstance(
data_type, expected_type
), f"Found existing type named {type_name} that is not a {readable_name_of_type_category}"
Expand Down
Loading

0 comments on commit 9d9901c

Please sign in to comment.