-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdumpfilesNSRL.py
202 lines (170 loc) · 9.64 KB
/
dumpfilesNSRL.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# This file is Copyright 2020 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import logging
import ntpath
import hashlib
import os
import re
from typing import List, Tuple, Type, Optional, Generator
from volatility3.framework import interfaces, renderers, exceptions, constants
from volatility3.framework.configuration import requirements
from volatility3.framework.renderers import format_hints, UnreadableValue
from volatility3.plugins.windows import handles
from volatility3.plugins.windows import pslist
# Module-level logger, following the volatility3 plugin convention.
vollog = logging.getLogger(__name__)
# Windows device-type codes (from the DDK, _DEVICE_OBJECT.DeviceType).
# Only file objects backed by one of these device types are dumped.
FILE_DEVICE_DISK = 0x7
FILE_DEVICE_NETWORK_FILE_SYSTEM = 0x14
# Maps the dump-file extension used in output names to the name of the
# cache structure the data was extracted from.
EXTENSION_CACHE_MAP = {
    "dat": "DataSectionObject",
    "img": "ImageSectionObject",
    "vacb": "SharedCacheMap",
}
class DumpFilesNSRL(interfaces.plugins.PluginInterface):
    """Dumps cached file contents from Windows memory samples, suppressing any
    dump whose MD5 matches the NSRL known-file hash index."""

    _required_framework_version = (2, 0, 0)
    _version = (1, 0, 0)
    # Class-level cache of the parsed NSRL MD5 index so the (large) index file
    # is read at most once per Python process.
    _nsrl_hashes = None

    @classmethod
    def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
        """Declare the plugin's configuration options."""
        return [
            requirements.ModuleRequirement(
                name="kernel", description="Windows kernel", architectures=["Intel32", "Intel64"]
            ),
            requirements.IntRequirement(name="pid", description="Process ID to include", optional=True),
            requirements.IntRequirement(name="virtaddr", description="Dump _FILE_OBJECT at this virtual address", optional=True),
            requirements.IntRequirement(name="physaddr", description="Dump _FILE_OBJECT at this physical address", optional=True),
            requirements.StringRequirement(name="filter", description="Dump files matching regular expression FILTER", optional=True),
            requirements.BooleanRequirement(name="ignore-case", description="Ignore case in filter match", default=False, optional=True),
            requirements.VersionRequirement(name="pslist", component=pslist.PsList, version=(2, 0, 0)),
            requirements.VersionRequirement(name="handles", component=handles.Handles, version=(1, 0, 0)),
        ]

    @classmethod
    def load_nsrl_hashes(cls):
        """Parse the NSRL MD5 index (first comma-separated field per line) into
        a set of lowercase hex digests, cached on the class.

        Returns:
            A set of lowercase MD5 hex strings.

        Raises:
            FileNotFoundError: if the index file is not in the working directory.
        """
        if cls._nsrl_hashes is None:
            # NOTE(review): hard-coded relative path — resolved against the
            # current working directory; consider making this configurable.
            nsrl_file_path = "NSRLFile-2023.06.01m-computer.txt-md5.idx"
            if not os.path.exists(nsrl_file_path):
                raise FileNotFoundError(f"NSRL file not found: {nsrl_file_path}")
            # A set is the right structure for membership-only lookups
            # (the original stored a dict of dummy True values).
            nsrl_hashes = set()
            with open(nsrl_file_path, 'r', encoding='utf-8', errors='ignore') as nsrl_file:
                for line in nsrl_file:
                    line = line.strip()
                    if line:  # skip blank lines instead of indexing ""
                        nsrl_hashes.add(line.split(',', 1)[0].lower())
            cls._nsrl_hashes = nsrl_hashes
        return cls._nsrl_hashes

    @classmethod
    def is_positive_hit(cls, data, nsrl_hashes):
        """Return True when the MD5 of *data* appears in *nsrl_hashes*."""
        return hashlib.md5(data).hexdigest() in nsrl_hashes

    @classmethod
    def dump_file_producer(
        cls,
        file_object: interfaces.objects.ObjectInterface,
        memory_object: interfaces.objects.ObjectInterface,
        open_method: Type[interfaces.plugins.FileHandlerInterface],
        layer: interfaces.layers.DataLayerInterface,
        desired_file_name: str,
    ) -> Optional[interfaces.plugins.FileHandlerInterface]:
        """Reconstruct a cached file from *memory_object*'s available pages and
        write it via *open_method*, unless its MD5 is a known NSRL hash.

        Returns:
            The open file handle on success, or None when no data was cached,
            an address was unreadable, or the content is an NSRL positive hit.
        """
        filedata = open_method(desired_file_name)
        bytes_written = 0
        # BUGFIX: hash every page as it is read. The original code called
        # is_positive_hit(data, ...) after the loop, so only the *last* page's
        # bytes were hashed and the NSRL lookup almost never matched.
        # NOTE(review): pages are hashed in the order get_available_pages()
        # yields them; this assumes that matches file order — confirm if
        # exact equality with the on-disk file's MD5 is required.
        hasher = hashlib.md5()
        try:
            for memoffset, fileoffset, datasize in memory_object.get_available_pages():
                data = layer.read(memoffset, datasize, pad=True)
                if data:
                    bytes_written += len(data)
                    hasher.update(data)
                    filedata.seek(fileoffset)
                    filedata.write(data)
        except exceptions.InvalidAddressException:
            vollog.debug(f"Unable to dump file at {file_object.vol.offset:#x}")
            return None
        if not bytes_written:
            vollog.debug(f"No data cached for file at {file_object.vol.offset:#x}")
            return None
        if hasher.hexdigest() in cls.load_nsrl_hashes():
            vollog.debug(f"File at {file_object.vol.offset:#x} is a positive NSRL hit")
            return None
        vollog.debug(f"File dumped at {filedata.preferred_filename}")
        return filedata

    @classmethod
    def process_file_object(
        cls,
        context: interfaces.context.ContextInterface,
        primary_layer_name: str,
        open_method: Type[interfaces.plugins.FileHandlerInterface],
        file_obj: interfaces.objects.ObjectInterface,
    ) -> Generator[Tuple, None, None]:
        """Attempt to dump each available cache (data section, image section,
        shared cache map) of a _FILE_OBJECT, yielding one result row per cache.

        Yields:
            (cache name, file object offset, base file name, result string).
        """
        # Only dump files backed by disk or network-file-system devices.
        if file_obj.DeviceObject.DeviceType not in [FILE_DEVICE_DISK, FILE_DEVICE_NETWORK_FILE_SYSTEM]:
            vollog.log(constants.LOGLEVEL_VVV, f"File object at {file_obj.vol.offset:#x} is not a disk file")
            return
        # Section objects are read from the physical (memory) layer; the
        # shared cache map is read through the virtual (primary) layer.
        memory_layer_name = context.layers[primary_layer_name].config["memory_layer"]
        memory_layer = context.layers[memory_layer_name]
        primary_layer = context.layers[primary_layer_name]
        obj_name = file_obj.file_name_with_device()
        dump_parameters = []
        for member_name, extension in [("DataSectionObject", "dat"), ("ImageSectionObject", "img")]:
            try:
                section_obj = getattr(file_obj.SectionObjectPointer, member_name)
                control_area = section_obj.dereference().cast("_CONTROL_AREA")
                if control_area.is_valid():
                    dump_parameters.append((control_area, memory_layer, extension))
            except exceptions.InvalidAddressException:
                vollog.log(constants.LOGLEVEL_VVV, f"{member_name} unavailable for file at {file_obj.vol.offset:#x}")
        try:
            scm_pointer = file_obj.SectionObjectPointer.SharedCacheMap
            shared_cache_map = scm_pointer.dereference().cast("_SHARED_CACHE_MAP")
            if shared_cache_map.is_valid():
                dump_parameters.append((shared_cache_map, primary_layer, "vacb"))
        except exceptions.InvalidAddressException:
            vollog.log(constants.LOGLEVEL_VVV, f"SharedCacheMap unavailable for file at {file_obj.vol.offset:#x}")
        for memory_object, layer, extension in dump_parameters:
            cache_name = EXTENSION_CACHE_MAP[extension]
            desired_file_name = f"file.{file_obj.vol.offset:#x}.{memory_object.vol.offset:#x}.{cache_name}.{ntpath.basename(obj_name)}.{extension}"
            file_handle = cls.dump_file_producer(file_obj, memory_object, open_method, layer, desired_file_name)
            file_output = file_handle.preferred_filename if file_handle else "Error dumping file"
            yield (cache_name, format_hints.Hex(file_obj.vol.offset), ntpath.basename(obj_name), file_output)

    def _generator(self, procs: List, offsets: List):
        """Yield TreeGrid rows for files reachable from process handles.

        NOTE(review): the *offsets* parameter (virtaddr/physaddr mode) is
        accepted but not yet processed — only the handle-walking path is
        implemented below.
        """
        kernel = self.context.modules[self.config["kernel"]]
        # Optional filename filter; "filter" is an optional requirement with
        # no default, so use .get() rather than direct indexing.
        filter_expr = self.config.get("filter", None)
        file_re = re.compile(filter_expr, re.I if self.config.get("ignore-case", False) else 0) if filter_expr else None
        # Track dumped _FILE_OBJECT offsets so shared handles aren't dumped twice.
        dumped_files = set()
        # Process files from handles
        if procs:
            handles_plugin = handles.Handles(context=self.context, config_path=self._config_path)
            type_map = handles_plugin.get_type_map(context=self.context, layer_name=kernel.layer_name, symbol_table=kernel.symbol_table_name)
            cookie = handles_plugin.find_cookie(context=self.context, layer_name=kernel.layer_name, symbol_table=kernel.symbol_table_name)
            for proc in procs:
                try:
                    object_table = proc.ObjectTable
                except exceptions.InvalidAddressException:
                    vollog.log(constants.LOGLEVEL_VVV, f"Cannot access _EPROCESS.ObjectTable at {proc.vol.offset:#x}")
                    continue
                for entry in handles_plugin.handles(object_table):
                    try:
                        if entry.get_object_type(type_map, cookie) == "File":
                            file_obj = entry.Body.cast("_FILE_OBJECT")
                            if file_re and not file_re.search(file_obj.file_name_with_device()):
                                continue
                            if file_obj.vol.offset in dumped_files:
                                continue
                            dumped_files.add(file_obj.vol.offset)
                            for result in self.process_file_object(self.context, kernel.layer_name, self.open, file_obj):
                                yield (0, result)
                    except exceptions.InvalidAddressException:
                        vollog.log(constants.LOGLEVEL_VVV, f"Cannot extract file from _OBJECT_HEADER at {entry.vol.offset:#x}")
        # Process files by VADs
        # similar logic to handle files from VADs...

    def run(self):
        """Entry point: validate option combinations, build the process/offset
        work lists, and return the results TreeGrid."""
        kernel = self.context.modules[self.config["kernel"]]
        procs = []
        offsets = []
        # A filename filter makes no sense when dumping a single address.
        if self.config.get("filter", None) and (self.config.get("virtaddr") or self.config.get("physaddr")):
            raise ValueError("Cannot use filter flag with an address flag")
        if self.config.get("virtaddr"):
            offsets.append((self.config["virtaddr"], True))
        elif self.config.get("physaddr"):
            offsets.append((self.config["physaddr"], False))
        else:
            procs = pslist.PsList.list_processes(self.context, kernel.layer_name, kernel.symbol_table_name, pslist.PsList.create_pid_filter([self.config.get("pid", None)]))
        return renderers.TreeGrid([("Cache", str), ("FileObject", format_hints.Hex), ("FileName", str), ("Result", str)], self._generator(procs, offsets))