Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vmray: loosen file checks to enable processing of additional file types #2571

Merged
merged 9 commits into from
Jan 23, 2025
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
- vmray: load more analysis archives @mr-tz
- dynamic: only check file limitations for static file formats @mr-tz
- vmray: skip non-printable strings @mike-hunhoff
- strings: add type hints and fix uncovered bugs @williballenthin @2555
mike-hunhoff marked this conversation as resolved.
Show resolved Hide resolved
- strings: add type hints and fix uncovered bugs @williballenthin #2555
- elffile: handle symbols without a name @williballenthin #2553
- vmray: loosen file checks to enable processing more file types @mike-hunhoff #2571

### capa Explorer Web

Expand Down
126 changes: 76 additions & 50 deletions capa/features/extractors/vmray/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ def __init__(self, zipfile_path: Path):
self.sv2 = SummaryV2.model_validate_json(
self.zipfile.read("logs/summary_v2.json", pwd=DEFAULT_ARCHIVE_PASSWORD)
)
self.file_type: str = self.sv2.analysis_metadata.sample_type
self.submission_type: str = self.sv2.analysis_metadata.sample_type
self.submission_name: str = self.sv2.analysis_metadata.submission_filename

# flog.xml contains all of the call information that VMRay captured during execution
flog_xml = self.zipfile.read("logs/flog.xml", pwd=DEFAULT_ARCHIVE_PASSWORD)
Expand All @@ -80,36 +81,54 @@ def __init__(self, zipfile_path: Path):
# map function calls to their associated monitor thread ID mapped to its associated monitor process ID
self.monitor_process_calls: dict[int, dict[int, list[FunctionCall]]] = defaultdict(lambda: defaultdict(list))

self.base_address: int

self.sample_file_name: Optional[str] = None
self.sample_file_analysis: Optional[File] = None
self.sample_file_static_data: Optional[StaticData] = None
self.submission_base_address: Optional[int] = None
self.submission_sha256: Optional[str] = None
self.submission_meta: Optional[File] = None
self.submission_static: Optional[StaticData] = None

# order matters, call this before attempting the analysis that follows
self._find_sample_file()

# VMRay analysis archives come in various shapes and sizes and file type does not definitively tell us what data
# we can expect to find in the archive, so to be explicit we check for the various pieces that we need at
# minimum to run capa analysis
if self.sample_file_name is None or self.sample_file_analysis is None:
raise UnsupportedFormatError("VMRay archive does not contain sample file (file_type: %s)" % self.file_type)

if not self.sample_file_static_data:
raise UnsupportedFormatError("VMRay archive does not contain static data (file_type: %s)" % self.file_type)

if not self.sample_file_static_data.pe and not self.sample_file_static_data.elf:
# something bad must have happened if there is no submission analysis
if self.submission_meta is None:
raise UnsupportedFormatError(
"VMRay feature extractor only supports PE and ELF at this time (file_type: %s)" % self.file_type
"archive does not contain submission analysis (submission_name: %s, submission_type: %s)"
% (self.submission_name, self.submission_type)
)

if self.submission_static is not None:
if self.submission_static.pe is None and self.submission_static.elf is None:
# we only support static analysis for PE and ELF files for now
raise UnsupportedFormatError(
"archive does not contain a supported file format (submission_name: %s, submission_type: %s)"
% (self.submission_name, self.submission_type)
)
else:
# VMRay may not record static analysis for certain file types, e.g. MSI, but we'd still like to match dynamic
# execution so we continue without and accept that the results may be incomplete
logger.warning(
"archive does not contain submission static data analysis, results may be incomplete (submission_name: %s, submission_type: %s)",
self.submission_name,
self.submission_type,
)

# VMRay does not store static strings for the sample file so we must use the source file
# stored in the archive
sample_sha256: str = self.sample_file_analysis.hash_values.sha256.lower()
sample_file_path: str = f"internal/static_analyses/{sample_sha256}/objects/files/{sample_sha256}"
submission_path: str = (
f"internal/static_analyses/{self.submission_sha256}/objects/files/{self.submission_sha256}"
)

logger.debug(
"\nsubmission_name: %s\nsubmission_type: %s\nsubmission_sha256: %s\nsubmission_zip_path: %s",
self.submission_name,
self.submission_type,
self.submission_sha256,
submission_path,
)

logger.debug("file_type: %s, file_path: %s", self.file_type, sample_file_path)
self.submission_bytes: bytes = self.zipfile.read(submission_path, pwd=DEFAULT_ARCHIVE_PASSWORD)

self.sample_file_buf: bytes = self.zipfile.read(sample_file_path, pwd=DEFAULT_ARCHIVE_PASSWORD)
logger.debug("submission_bytes: %s", self.submission_bytes[:10])

# do not change order, it matters
self._compute_base_address()
Expand All @@ -121,45 +140,52 @@ def __init__(self, zipfile_path: Path):
self._compute_monitor_process_calls()

def _find_sample_file(self):
for file_name, file_analysis in self.sv2.files.items():
if file_analysis.is_sample:
# target the sample submitted for analysis
self.sample_file_name = file_name
self.sample_file_analysis = file_analysis
logger.debug("searching archive for submission")

# VMRay may mark more than one file as the submission, e.g., when a compound ZIP file is used
# both the ZIP file and embedded target file are marked as submissions. We have yet to find a
# guaranteed way to differentiate which is the actual submission, so we opt to choose the last
# file that is marked as the submission for now
for file_analysis in self.sv2.files.values():
if not file_analysis.is_sample:
continue

self.submission_meta = file_analysis
self.submission_sha256 = self.submission_meta.hash_values.sha256

if file_analysis.ref_static_data:
# like "path": ["static_data","static_data_0"] where "static_data_0" is the summary_v2 static data
# key for the file's static data
self.sample_file_static_data = self.sv2.static_data[file_analysis.ref_static_data.path[1]]
logger.debug("sha256: %s marked as submission", self.submission_sha256)

break
if file_analysis.ref_static_data is not None:
# like "path": ["static_data","static_data_0"] where "static_data_0" is the summary_v2 static data
# key for the file's static data
self.submission_static = self.sv2.static_data[file_analysis.ref_static_data.path[1]]

def _compute_base_address(self):
assert self.sample_file_static_data is not None
if self.sample_file_static_data.pe:
self.base_address = self.sample_file_static_data.pe.basic_info.image_base
if self.submission_static is not None:
if self.submission_static.pe:
self.submission_base_address = self.submission_static.pe.basic_info.image_base

def _compute_exports(self):
assert self.sample_file_static_data is not None
if self.sample_file_static_data.pe:
for export in self.sample_file_static_data.pe.exports:
self.exports[export.address] = export.api.name
if self.submission_static is not None:
if self.submission_static.pe:
for export in self.submission_static.pe.exports:
self.exports[export.address] = export.api.name

def _compute_imports(self):
assert self.sample_file_static_data is not None
if self.sample_file_static_data.pe:
for module in self.sample_file_static_data.pe.imports:
for api in module.apis:
self.imports[api.address] = (module.dll, api.api.name)
if self.submission_static is not None:
if self.submission_static.pe:
for module in self.submission_static.pe.imports:
for api in module.apis:
self.imports[api.address] = (module.dll, api.api.name)

def _compute_sections(self):
assert self.sample_file_static_data is not None
if self.sample_file_static_data.pe:
for pefile_section in self.sample_file_static_data.pe.sections:
self.sections[pefile_section.virtual_address] = pefile_section.name
elif self.sample_file_static_data.elf:
for elffile_section in self.sample_file_static_data.elf.sections:
self.sections[elffile_section.header.sh_addr] = elffile_section.header.sh_name
if self.submission_static is not None:
if self.submission_static.pe:
for pefile_section in self.submission_static.pe.sections:
self.sections[pefile_section.virtual_address] = pefile_section.name
elif self.submission_static.elf:
for elffile_section in self.submission_static.elf.sections:
self.sections[elffile_section.header.sh_addr] = elffile_section.header.sh_name

def _compute_monitor_processes(self):
for process in self.sv2.processes.values():
Expand Down
25 changes: 13 additions & 12 deletions capa/features/extractors/vmray/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import capa.features.extractors.vmray.call
import capa.features.extractors.vmray.file
import capa.features.extractors.vmray.global_
from capa.features.common import Feature, Characteristic
from capa.features.common import Feature
from capa.features.address import (
NO_ADDRESS,
Address,
Expand Down Expand Up @@ -56,13 +56,13 @@ def get_formatted_params(params: ParamList) -> list[str]:

class VMRayExtractor(DynamicFeatureExtractor):
def __init__(self, analysis: VMRayAnalysis):
assert analysis.sample_file_analysis is not None
assert analysis.submission_meta is not None

super().__init__(
hashes=SampleHashes(
md5=analysis.sample_file_analysis.hash_values.md5.lower(),
sha1=analysis.sample_file_analysis.hash_values.sha1.lower(),
sha256=analysis.sample_file_analysis.hash_values.sha256.lower(),
md5=analysis.submission_meta.hash_values.md5.lower(),
sha1=analysis.submission_meta.hash_values.sha1.lower(),
sha256=analysis.submission_meta.hash_values.sha256.lower(),
)
)

Expand All @@ -72,8 +72,12 @@ def __init__(self, analysis: VMRayAnalysis):
self.global_features = list(capa.features.extractors.vmray.global_.extract_features(self.analysis))

def get_base_address(self) -> Address:
# value according to the PE header, the actual trace may use a different imagebase
return AbsoluteVirtualAddress(self.analysis.base_address)
# value according to submission file header, the actual trace may use a different imagebase
# value may not exist for certain submission file types, e.g. PS1
if self.analysis.submission_base_address is None:
return NO_ADDRESS
else:
return AbsoluteVirtualAddress(self.analysis.submission_base_address)

def extract_file_features(self) -> Iterator[tuple[Feature, Address]]:
yield from capa.features.extractors.vmray.file.extract_features(self.analysis)
Expand Down Expand Up @@ -102,11 +106,8 @@ def get_threads(self, ph: ProcessHandle) -> Iterator[ThreadHandle]:
yield ThreadHandle(address=address, inner=monitor_thread)

def extract_thread_features(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[tuple[Feature, Address]]:
if False:
# force this routine to be a generator,
# but we don't actually have any elements to generate.
yield Characteristic("never"), NO_ADDRESS
return
# we have not identified thread-specific features for VMRay yet
yield from []
mike-hunhoff marked this conversation as resolved.
Show resolved Hide resolved

def get_calls(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[CallHandle]:
for function_call in self.analysis.monitor_process_calls[ph.inner.monitor_id][th.inner.monitor_id]:
Expand Down
3 changes: 2 additions & 1 deletion capa/features/extractors/vmray/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ def extract_referenced_registry_key_names(analysis: VMRayAnalysis) -> Iterator[t


def extract_file_strings(analysis: VMRayAnalysis) -> Iterator[tuple[Feature, Address]]:
yield from capa.features.extractors.common.extract_file_strings(analysis.sample_file_buf)
if analysis.submission_static is not None:
yield from capa.features.extractors.common.extract_file_strings(analysis.submission_bytes)


def extract_features(analysis: VMRayAnalysis) -> Iterator[tuple[Feature, Address]]:
Expand Down
49 changes: 33 additions & 16 deletions capa/features/extractors/vmray/global_.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

from capa.features.common import (
OS,
OS_ANY,
ARCH_ANY,
OS_LINUX,
ARCH_I386,
FORMAT_PE,
Expand All @@ -35,35 +37,50 @@


def extract_arch(analysis: VMRayAnalysis) -> Iterator[tuple[Feature, Address]]:
file_type: str = analysis.file_type

if "x86-32" in file_type:
if "x86-32" in analysis.submission_type:
yield Arch(ARCH_I386), NO_ADDRESS
elif "x86-64" in file_type:
elif "x86-64" in analysis.submission_type:
yield Arch(ARCH_AMD64), NO_ADDRESS
else:
raise ValueError("unrecognized arch from the VMRay report: %s" % file_type)
yield Arch(ARCH_ANY), NO_ADDRESS

logger.debug(
"unrecognized arch for submission (filename: %s, file_type: %s)",
analysis.submission_name,
analysis.submission_type,
)


def extract_format(analysis: VMRayAnalysis) -> Iterator[tuple[Feature, Address]]:
assert analysis.sample_file_static_data is not None
if analysis.sample_file_static_data.pe:
yield Format(FORMAT_PE), NO_ADDRESS
elif analysis.sample_file_static_data.elf:
yield Format(FORMAT_ELF), NO_ADDRESS
if analysis.submission_static is not None:
if analysis.submission_static.pe:
yield Format(FORMAT_PE), NO_ADDRESS
elif analysis.submission_static.elf:
yield Format(FORMAT_ELF), NO_ADDRESS
else:
raise ValueError("unrecognized file format from the VMRay report: %s" % analysis.file_type)
# there is no "FORMAT_ANY" to yield here, but few rules rely on the "format" feature
# so this should be fine for now

logger.debug(
"unrecognized format for submission (filename: %s, file_type: %s)",
analysis.submission_name,
analysis.submission_type,
)

def extract_os(analysis: VMRayAnalysis) -> Iterator[tuple[Feature, Address]]:
file_type: str = analysis.file_type

if "windows" in file_type.lower():
def extract_os(analysis: VMRayAnalysis) -> Iterator[tuple[Feature, Address]]:
if "windows" in analysis.submission_type.lower():
yield OS(OS_WINDOWS), NO_ADDRESS
elif "linux" in file_type.lower():
elif "linux" in analysis.submission_type.lower():
yield OS(OS_LINUX), NO_ADDRESS
else:
raise ValueError("unrecognized OS from the VMRay report: %s" % file_type)
yield OS(OS_ANY), NO_ADDRESS

logger.debug(
"unrecognized os for submission (filename: %s, file_type: %s)",
analysis.submission_name,
analysis.submission_type,
)


def extract_features(analysis: VMRayAnalysis) -> Iterator[tuple[Feature, Address]]:
Expand Down
6 changes: 3 additions & 3 deletions scripts/minimize_vmray_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ def main(argv=None):
vmra = VMRayAnalysis(analysis_archive)
sv2_json = vmra.zipfile.read("logs/summary_v2.json", pwd=DEFAULT_ARCHIVE_PASSWORD)
flog_xml = vmra.zipfile.read("logs/flog.xml", pwd=DEFAULT_ARCHIVE_PASSWORD)
sample_file_buf = vmra.sample_file_buf
assert vmra.sample_file_analysis is not None
sample_sha256: str = vmra.sample_file_analysis.hash_values.sha256.lower()
sample_file_buf = vmra.submission_bytes
assert vmra.submission_meta is not None
sample_sha256: str = vmra.submission_meta.hash_values.sha256.lower()

new_zip_name = f"{analysis_archive.parent / analysis_archive.stem}_min.zip"
with zipfile.ZipFile(new_zip_name, "w") as new_zip:
Expand Down
8 changes: 8 additions & 0 deletions tests/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,14 @@ def get_data_path_by_name(name) -> Path:
/ "vmray"
/ "2f8a79b12a7a989ac7e5f6ec65050036588a92e65aeb6841e08dc228ff0e21b4_min_archive.zip"
)
elif name.startswith("eb1287-vmray"):
return (
CD
/ "data"
/ "dynamic"
/ "vmray"
/ "eb12873c0ce3e9ea109c2a447956cbd10ca2c3e86936e526b2c6e28764999f21_min_archive.zip"
)
elif name.startswith("ea2876"):
return CD / "data" / "ea2876e9175410b6f6719f80ee44b9553960758c7d0f7bed73c0fe9a78d8e669.dll_"
elif name.startswith("1038a2"):
Expand Down
1 change: 1 addition & 0 deletions tests/test_vmray_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
("93b2d1-vmray", "process=(2176:0),thread=2420", capa.features.insn.API("DoesNotExist"), False),
# call/api
("93b2d1-vmray", "process=(2176:0),thread=2420,call=2361", capa.features.insn.API("GetAddrInfoW"), True),
("eb1287-vmray", "process=(4968:0),thread=5992,call=10981", capa.features.insn.API("CreateMutexW"), True),
# call/string argument
(
"93b2d1-vmray",
Expand Down
Loading