diff --git a/CHANGELOG.md b/CHANGELOG.md index e8fa68aeb..f83c595f3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,8 +17,9 @@ - vmray: load more analysis archives @mr-tz - dynamic: only check file limitations for static file formats @mr-tz - vmray: skip non-printable strings @mike-hunhoff -- strings: add type hints and fix uncovered bugs @williballenthin @2555 +- strings: add type hints and fix uncovered bugs @williballenthin #2555 - elffile: handle symbols without a name @williballenthin #2553 +- vmray: loosen file checks to enable processing more file types @mike-hunhoff #2571 ### capa Explorer Web diff --git a/capa/features/extractors/vmray/__init__.py b/capa/features/extractors/vmray/__init__.py index 45ad068af..cc67c34cb 100644 --- a/capa/features/extractors/vmray/__init__.py +++ b/capa/features/extractors/vmray/__init__.py @@ -56,7 +56,8 @@ def __init__(self, zipfile_path: Path): self.sv2 = SummaryV2.model_validate_json( self.zipfile.read("logs/summary_v2.json", pwd=DEFAULT_ARCHIVE_PASSWORD) ) - self.file_type: str = self.sv2.analysis_metadata.sample_type + self.submission_type: str = self.sv2.analysis_metadata.sample_type + self.submission_name: str = self.sv2.analysis_metadata.submission_filename # flog.xml contains all of the call information that VMRay captured during execution flog_xml = self.zipfile.read("logs/flog.xml", pwd=DEFAULT_ARCHIVE_PASSWORD) @@ -80,36 +81,54 @@ def __init__(self, zipfile_path: Path): # map function calls to their associated monitor thread ID mapped to its associated monitor process ID self.monitor_process_calls: dict[int, dict[int, list[FunctionCall]]] = defaultdict(lambda: defaultdict(list)) - self.base_address: int - - self.sample_file_name: Optional[str] = None - self.sample_file_analysis: Optional[File] = None - self.sample_file_static_data: Optional[StaticData] = None + self.submission_base_address: Optional[int] = None + self.submission_sha256: Optional[str] = None + self.submission_meta: Optional[File] = None + 
self.submission_static: Optional[StaticData] = None + # order matters, call this before attempting the analysis that follows self._find_sample_file() - # VMRay analysis archives in various shapes and sizes and file type does not definitively tell us what data - # we can expect to find in the archive, so to be explicit we check for the various pieces that we need at - # minimum to run capa analysis - if self.sample_file_name is None or self.sample_file_analysis is None: - raise UnsupportedFormatError("VMRay archive does not contain sample file (file_type: %s)" % self.file_type) - - if not self.sample_file_static_data: - raise UnsupportedFormatError("VMRay archive does not contain static data (file_type: %s)" % self.file_type) - - if not self.sample_file_static_data.pe and not self.sample_file_static_data.elf: + # something bad must have happened if there is no submission analysis + if self.submission_meta is None: raise UnsupportedFormatError( - "VMRay feature extractor only supports PE and ELF at this time (file_type: %s)" % self.file_type + "archive does not contain submission analysis (submission_name: %s, submission_type: %s)" + % (self.submission_name, self.submission_type) + ) + + if self.submission_static is not None: + if self.submission_static.pe is None and self.submission_static.elf is None: + # we only support static analysis for PE and ELF files for now + raise UnsupportedFormatError( + "archive does not contain a supported file format (submission_name: %s, submission_type: %s)" + % (self.submission_name, self.submission_type) + ) + else: + # VMRay may not record static analysis for certain file types, e.g. 
MSI, but we'd still like to match dynamic + # execution so we continue without and accept that the results may be incomplete + logger.warning( + "archive does not contain submission static data analysis, results may be incomplete (submission_name: %s, submission_type: %s)", + self.submission_name, + self.submission_type, ) # VMRay does not store static strings for the sample file so we must use the source file # stored in the archive - sample_sha256: str = self.sample_file_analysis.hash_values.sha256.lower() - sample_file_path: str = f"internal/static_analyses/{sample_sha256}/objects/files/{sample_sha256}" + submission_path: str = ( + f"internal/static_analyses/{self.submission_sha256}/objects/files/{self.submission_sha256}" + ) + + logger.debug( + "\nsubmission_name: %s\nsubmission_type: %s\nsubmission_sha256: %s\nsubmission_zip_path: %s", + self.submission_name, + self.submission_type, + self.submission_sha256, + submission_path, + ) - logger.debug("file_type: %s, file_path: %s", self.file_type, sample_file_path) + self.submission_bytes: bytes = self.zipfile.read(submission_path, pwd=DEFAULT_ARCHIVE_PASSWORD) - self.sample_file_buf: bytes = self.zipfile.read(sample_file_path, pwd=DEFAULT_ARCHIVE_PASSWORD) + logger.debug("submission_bytes: %s", self.submission_bytes[:10]) # do not change order, it matters self._compute_base_address() @@ -121,45 +140,52 @@ def __init__(self, zipfile_path: Path): self._compute_monitor_process_calls() def _find_sample_file(self): - for file_name, file_analysis in self.sv2.files.items(): - if file_analysis.is_sample: - # target the sample submitted for analysis - self.sample_file_name = file_name - self.sample_file_analysis = file_analysis + logger.debug("searching archive for submission") + + # VMRay may mark more than one file as the submission, e.g., when a compound ZIP file is used + # both the ZIP file and embedded target file are marked as submissions. 
We have yet to find a + # guaranteed way to differentiate which is the actual submission, so we opt to choose the last + # file that is marked as the submission for now + for file_analysis in self.sv2.files.values(): + if not file_analysis.is_sample: + continue + + self.submission_meta = file_analysis + self.submission_sha256 = self.submission_meta.hash_values.sha256 - if file_analysis.ref_static_data: - # like "path": ["static_data","static_data_0"] where "static_data_0" is the summary_v2 static data - # key for the file's static data - self.sample_file_static_data = self.sv2.static_data[file_analysis.ref_static_data.path[1]] + logger.debug("sha256: %s marked as submission", self.submission_sha256) - break + if file_analysis.ref_static_data is not None: + # like "path": ["static_data","static_data_0"] where "static_data_0" is the summary_v2 static data + # key for the file's static data + self.submission_static = self.sv2.static_data[file_analysis.ref_static_data.path[1]] def _compute_base_address(self): - assert self.sample_file_static_data is not None - if self.sample_file_static_data.pe: - self.base_address = self.sample_file_static_data.pe.basic_info.image_base + if self.submission_static is not None: + if self.submission_static.pe: + self.submission_base_address = self.submission_static.pe.basic_info.image_base def _compute_exports(self): - assert self.sample_file_static_data is not None - if self.sample_file_static_data.pe: - for export in self.sample_file_static_data.pe.exports: - self.exports[export.address] = export.api.name + if self.submission_static is not None: + if self.submission_static.pe: + for export in self.submission_static.pe.exports: + self.exports[export.address] = export.api.name def _compute_imports(self): - assert self.sample_file_static_data is not None - if self.sample_file_static_data.pe: - for module in self.sample_file_static_data.pe.imports: - for api in module.apis: - self.imports[api.address] = (module.dll, api.api.name) + if 
self.submission_static is not None: + if self.submission_static.pe: + for module in self.submission_static.pe.imports: + for api in module.apis: + self.imports[api.address] = (module.dll, api.api.name) def _compute_sections(self): - assert self.sample_file_static_data is not None - if self.sample_file_static_data.pe: - for pefile_section in self.sample_file_static_data.pe.sections: - self.sections[pefile_section.virtual_address] = pefile_section.name - elif self.sample_file_static_data.elf: - for elffile_section in self.sample_file_static_data.elf.sections: - self.sections[elffile_section.header.sh_addr] = elffile_section.header.sh_name + if self.submission_static is not None: + if self.submission_static.pe: + for pefile_section in self.submission_static.pe.sections: + self.sections[pefile_section.virtual_address] = pefile_section.name + elif self.submission_static.elf: + for elffile_section in self.submission_static.elf.sections: + self.sections[elffile_section.header.sh_addr] = elffile_section.header.sh_name def _compute_monitor_processes(self): for process in self.sv2.processes.values(): diff --git a/capa/features/extractors/vmray/extractor.py b/capa/features/extractors/vmray/extractor.py index b540d0658..e1dd197b9 100644 --- a/capa/features/extractors/vmray/extractor.py +++ b/capa/features/extractors/vmray/extractor.py @@ -20,7 +20,7 @@ import capa.features.extractors.vmray.call import capa.features.extractors.vmray.file import capa.features.extractors.vmray.global_ -from capa.features.common import Feature, Characteristic +from capa.features.common import Feature from capa.features.address import ( NO_ADDRESS, Address, @@ -56,13 +56,13 @@ def get_formatted_params(params: ParamList) -> list[str]: class VMRayExtractor(DynamicFeatureExtractor): def __init__(self, analysis: VMRayAnalysis): - assert analysis.sample_file_analysis is not None + assert analysis.submission_meta is not None super().__init__( hashes=SampleHashes( - 
md5=analysis.sample_file_analysis.hash_values.md5.lower(), - sha1=analysis.sample_file_analysis.hash_values.sha1.lower(), - sha256=analysis.sample_file_analysis.hash_values.sha256.lower(), + md5=analysis.submission_meta.hash_values.md5.lower(), + sha1=analysis.submission_meta.hash_values.sha1.lower(), + sha256=analysis.submission_meta.hash_values.sha256.lower(), ) ) @@ -72,8 +72,12 @@ def __init__(self, analysis: VMRayAnalysis): self.global_features = list(capa.features.extractors.vmray.global_.extract_features(self.analysis)) def get_base_address(self) -> Address: - # value according to the PE header, the actual trace may use a different imagebase - return AbsoluteVirtualAddress(self.analysis.base_address) + # value according to submission file header, the actual trace may use a different imagebase + # value may not exist for certain submission file types, e.g. PS1 + if self.analysis.submission_base_address is None: + return NO_ADDRESS + else: + return AbsoluteVirtualAddress(self.analysis.submission_base_address) def extract_file_features(self) -> Iterator[tuple[Feature, Address]]: yield from capa.features.extractors.vmray.file.extract_features(self.analysis) @@ -102,11 +106,8 @@ def get_threads(self, ph: ProcessHandle) -> Iterator[ThreadHandle]: yield ThreadHandle(address=address, inner=monitor_thread) def extract_thread_features(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[tuple[Feature, Address]]: - if False: - # force this routine to be a generator, - # but we don't actually have any elements to generate. 
- yield Characteristic("never"), NO_ADDRESS - return + # we have not identified thread-specific features for VMRay yet + yield from [] def get_calls(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[CallHandle]: for function_call in self.analysis.monitor_process_calls[ph.inner.monitor_id][th.inner.monitor_id]: diff --git a/capa/features/extractors/vmray/file.py b/capa/features/extractors/vmray/file.py index f6348a73c..7a8c91494 100644 --- a/capa/features/extractors/vmray/file.py +++ b/capa/features/extractors/vmray/file.py @@ -67,7 +67,8 @@ def extract_referenced_registry_key_names(analysis: VMRayAnalysis) -> Iterator[t def extract_file_strings(analysis: VMRayAnalysis) -> Iterator[tuple[Feature, Address]]: - yield from capa.features.extractors.common.extract_file_strings(analysis.sample_file_buf) + if analysis.submission_static is not None: + yield from capa.features.extractors.common.extract_file_strings(analysis.submission_bytes) def extract_features(analysis: VMRayAnalysis) -> Iterator[tuple[Feature, Address]]: diff --git a/capa/features/extractors/vmray/global_.py b/capa/features/extractors/vmray/global_.py index 2d6517b10..1c4a44b58 100644 --- a/capa/features/extractors/vmray/global_.py +++ b/capa/features/extractors/vmray/global_.py @@ -18,6 +18,8 @@ from capa.features.common import ( OS, + OS_ANY, + ARCH_ANY, OS_LINUX, ARCH_I386, FORMAT_PE, @@ -35,35 +37,50 @@ def extract_arch(analysis: VMRayAnalysis) -> Iterator[tuple[Feature, Address]]: - file_type: str = analysis.file_type - - if "x86-32" in file_type: + if "x86-32" in analysis.submission_type: yield Arch(ARCH_I386), NO_ADDRESS - elif "x86-64" in file_type: + elif "x86-64" in analysis.submission_type: yield Arch(ARCH_AMD64), NO_ADDRESS else: - raise ValueError("unrecognized arch from the VMRay report: %s" % file_type) + yield Arch(ARCH_ANY), NO_ADDRESS + + logger.debug( + "unrecognized arch for submission (filename: %s, file_type: %s)", + analysis.submission_name, + analysis.submission_type, + ) def 
extract_format(analysis: VMRayAnalysis) -> Iterator[tuple[Feature, Address]]: - assert analysis.sample_file_static_data is not None - if analysis.sample_file_static_data.pe: - yield Format(FORMAT_PE), NO_ADDRESS - elif analysis.sample_file_static_data.elf: - yield Format(FORMAT_ELF), NO_ADDRESS + if analysis.submission_static is not None: + if analysis.submission_static.pe: + yield Format(FORMAT_PE), NO_ADDRESS + elif analysis.submission_static.elf: + yield Format(FORMAT_ELF), NO_ADDRESS else: - raise ValueError("unrecognized file format from the VMRay report: %s" % analysis.file_type) + # there is no "FORMAT_ANY" to yield here, but few rules rely on the "format" feature + # so this should be fine for now + logger.debug( + "unrecognized format for submission (filename: %s, file_type: %s)", + analysis.submission_name, + analysis.submission_type, + ) -def extract_os(analysis: VMRayAnalysis) -> Iterator[tuple[Feature, Address]]: - file_type: str = analysis.file_type - if "windows" in file_type.lower(): +def extract_os(analysis: VMRayAnalysis) -> Iterator[tuple[Feature, Address]]: + if "windows" in analysis.submission_type.lower(): yield OS(OS_WINDOWS), NO_ADDRESS - elif "linux" in file_type.lower(): + elif "linux" in analysis.submission_type.lower(): yield OS(OS_LINUX), NO_ADDRESS else: - raise ValueError("unrecognized OS from the VMRay report: %s" % file_type) + yield OS(OS_ANY), NO_ADDRESS + + logger.debug( + "unrecognized os for submission (filename: %s, file_type: %s)", + analysis.submission_name, + analysis.submission_type, + ) def extract_features(analysis: VMRayAnalysis) -> Iterator[tuple[Feature, Address]]: diff --git a/scripts/minimize_vmray_results.py b/scripts/minimize_vmray_results.py index c88fb0f54..9ffb9139d 100644 --- a/scripts/minimize_vmray_results.py +++ b/scripts/minimize_vmray_results.py @@ -49,9 +49,9 @@ def main(argv=None): vmra = VMRayAnalysis(analysis_archive) sv2_json = vmra.zipfile.read("logs/summary_v2.json", pwd=DEFAULT_ARCHIVE_PASSWORD) 
flog_xml = vmra.zipfile.read("logs/flog.xml", pwd=DEFAULT_ARCHIVE_PASSWORD) - sample_file_buf = vmra.sample_file_buf - assert vmra.sample_file_analysis is not None - sample_sha256: str = vmra.sample_file_analysis.hash_values.sha256.lower() + sample_file_buf = vmra.submission_bytes + assert vmra.submission_meta is not None + sample_sha256: str = vmra.submission_meta.hash_values.sha256.lower() new_zip_name = f"{analysis_archive.parent / analysis_archive.stem}_min.zip" with zipfile.ZipFile(new_zip_name, "w") as new_zip: diff --git a/tests/fixtures.py b/tests/fixtures.py index 1c3ebd820..187a5f05f 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -453,6 +453,14 @@ def get_data_path_by_name(name) -> Path: / "vmray" / "2f8a79b12a7a989ac7e5f6ec65050036588a92e65aeb6841e08dc228ff0e21b4_min_archive.zip" ) + elif name.startswith("eb1287-vmray"): + return ( + CD + / "data" + / "dynamic" + / "vmray" + / "eb12873c0ce3e9ea109c2a447956cbd10ca2c3e86936e526b2c6e28764999f21_min_archive.zip" + ) elif name.startswith("ea2876"): return CD / "data" / "ea2876e9175410b6f6719f80ee44b9553960758c7d0f7bed73c0fe9a78d8e669.dll_" elif name.startswith("1038a2"): diff --git a/tests/test_vmray_features.py b/tests/test_vmray_features.py index 8eeb2408d..31e1df906 100644 --- a/tests/test_vmray_features.py +++ b/tests/test_vmray_features.py @@ -35,6 +35,7 @@ ("93b2d1-vmray", "process=(2176:0),thread=2420", capa.features.insn.API("DoesNotExist"), False), # call/api ("93b2d1-vmray", "process=(2176:0),thread=2420,call=2361", capa.features.insn.API("GetAddrInfoW"), True), + ("eb1287-vmray", "process=(4968:0),thread=5992,call=10981", capa.features.insn.API("CreateMutexW"), True), # call/string argument ( "93b2d1-vmray",