forked from dperetin/Nessus-XMLRPC
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathNessusXMLRPC.py
executable file
·401 lines (330 loc) · 14.3 KB
/
NessusXMLRPC.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
#!/usr/bin/python
# coding=utf-8
"""
Copyright (c) 2010 HomeAway, Inc.
All rights reserved. http://www.homeaway.com
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import sys
import xml.etree.ElementTree
from httplib import HTTPSConnection, CannotSendRequest, ImproperConnectionState
from urllib import urlencode
from random import randint
from time import sleep
from exceptions import Exception
from Logger import get_logger
# Arbitrary lower and upper bounds (both inclusive, as passed to randint) for
# the random sequence number sent as the 'seq' request parameter.
SEQMIN = 10000
SEQMAX = 99999
# Simple exceptions for error handling
class NessusError(Exception):
    """
    Base exception for all errors raised by this module.

    @type info: string
    @param info: Short description of what went wrong.
    @type contents: object
    @param contents: Additional detail (for example the parsed server reply or the request target).
    """

    def __init__(self, info, contents):
        # Forward both values to Exception so that ``args`` is populated; without
        # this, repr() shows no detail and copy/pickle cannot rebuild the instance.
        Exception.__init__(self, info, contents)
        self.info = info
        self.contents = contents

    def __str__(self):
        return "%s: %s" % (self.info, self.contents)
class RequestError(NessusError):
    """Error for general requests to the server (non-200 HTTP responses)."""
class LoginError(NessusError):
    """Error for login failures or missing login credentials."""
class PolicyError(NessusError):
    """Error for policy listing and policy lookup failures."""
class ScanError(NessusError):
    """Error for failures while starting a scan."""
class ReportError(NessusError):
    """Error for failures while listing reports or fetching report errors."""
class ParseError(NessusError):
    """Error for server responses that cannot be parsed as XML."""
class Scanner(object):
    """
    Client for the XMLRPC interface of a Nessus server.

    Keeps one HTTPS connection to the server, logs in with the supplied credentials and
    exposes helpers for listing policies, starting scans and fetching reports. XML replies
    are converted into nested dictionaries/lists by parse().
    """

    def __init__(self, host, port, login=None, password=None, timeout=60, debug=False):
        """
        Initialize the scanner instance by setting up a connection and authenticating
        if credentials are provided.

        @type host: string
        @param host: The hostname of the running Nessus server.
        @type port: number
        @param port: The port number for the XMLRPC interface on the Nessus server.
        @type login: string
        @param login: The username for logging in to Nessus.
        @type password: string
        @param password: The password for logging in to Nessus.
        @type timeout: number
        @param timeout: Timeout handed to the HTTPS connection.
        @type debug: bool
        @param debug: turn on debugging.
        """
        self.token = None
        self.isadmin = None
        self.host = host
        self.port = port
        self.timeout = timeout
        self.debug = debug
        self.logger = get_logger('Scanner')
        self.connection = None
        self.headers = {"Content-type": "application/x-www-form-urlencoded", "Accept": "text/plain"}
        self.username = login
        self.password = password
        self._connect()
        self.login()

    def _connect(self):
        """
        Internal method for connecting to the target Nessus server.
        """
        self.connection = HTTPSConnection(self.host, self.port, timeout=self.timeout)

    def _next_seq(self, seq=None):
        """
        Internal method returning the sequence number to send with a request.

        @type seq: number
        @param seq: Caller-supplied sequence number, or None to draw a fresh random one.
        """
        # Drawn per call: a randint() default argument would be evaluated only once, when
        # the method is defined, and then reused for every request.
        if seq is None:
            return randint(SEQMIN, SEQMAX)
        return seq

    def _request(self, method, target, params, _relogin=True):
        """
        Internal method for submitting requests to the target Nessus server, rebuilding
        the connection if needed.

        @type method: string
        @param method: The HTTP verb/method used in the request (almost always POST).
        @type target: string
        @param target: The target path (or function) of the request.
        @type params: string
        @param params: The URL encoded parameters used in the request.
        @type _relogin: bool
        @param _relogin: Internal flag. False on the retry that follows a re-login, so that a
                         persistent 403 cannot recurse without bound.
        @raise LoginError: On a 403 response when no credentials are configured.
        @raise RequestError: On any other non-200 response.
        """
        def _log_headers(headers):
            # Accepts either a dict or a sequence of (name, value) pairs.
            if isinstance(headers, dict):
                for (name, value) in headers.items():
                    self.logger.debug(" %s: %s" % (name, value))
            elif isinstance(headers, (tuple, list)):
                for tup in headers:
                    self.logger.debug(" %s: %s" % (tup[0], tup[1]))

        try:
            if self.connection is None:
                self._connect()
            if self.debug is True:
                # NOTE: params are logged verbatim; for /login they include the password.
                self.logger.debug("Sending request: %s %s" % (method, target))
                self.logger.debug("Params: %s" % params)
                self.logger.debug("Headers:")
                _log_headers(self.headers)
            self.connection.request(method, target, params, self.headers)
        except (CannotSendRequest, ImproperConnectionState):
            # The connection cannot send in its current state: rebuild it, log in again
            # and resend the request on the fresh connection.
            self._connect()
            self.login()
            self.connection.request(method, target, params, self.headers)

        response = self.connection.getresponse()
        response_page = response.read()
        if self.debug is True:
            self.logger.debug("Response: %s %s" % (response.status, response.reason))
            self.logger.debug("Response headers:")
            _log_headers(response.getheaders())
            self.logger.debug(response_page)

        status = int(response.status)
        if status != 200:
            # Session times out? Re-login and retry exactly once. A 403 on the login request
            # itself, or on the retry, falls through to RequestError: login() issues its own
            # _request, so retrying there would recurse forever.
            if status == 403 and _relogin and target != "/login":
                if self.login():
                    return self._request(method, target, params, _relogin=False)
                raise LoginError("Login credentials needed to access: ", target)
            raise RequestError("Error sending request:", response)
        return response_page

    def _rparse(self, parsed):
        """
        Recursively parse XML and generate an iterable hybrid dictionary/list with all data.

        Childless elements map tag -> text. Elements with children map tag -> nested result.
        When a tag with children repeats among siblings, the result for that level becomes a
        list holding the nested results (outer tags are dropped).

        @type parsed: xml.etree.ElementTree.Element
        @param parsed: An ElementTree Element object of the parsed XML.
        """
        result = dict()
        # Iterating an Element yields its children (getchildren() is deprecated).
        for element in list(parsed):
            if len(element) > 0:
                # The element has children of its own
                if isinstance(result, list):
                    # Already collecting repeated siblings
                    result.append(self._rparse(element))
                elif element.tag in result:
                    # Second hit for this tag: switch from dict() to list(), keeping only the
                    # values gathered so far (no outer tags) ...
                    result = list(result.values())
                    # ... and keep the element that triggered the switch as well.
                    result.append(self._rparse(element))
                else:
                    result[element.tag] = self._rparse(element)
            else:
                # NOTE: if result has already become a list, this assignment raises
                # TypeError, which parse() reports as ParseError.
                result[element.tag] = element.text
        return result

    def parse(self, response):
        """
        Parse the XML response from the server.

        @type response: string
        @param response: Response XML from the server following a request.
        @raise ParseError: If the response cannot be parsed.
        """
        # Okay, for some reason there's a bug with how expat handles newlines
        try:
            return self._rparse(xml.etree.ElementTree.fromstring(response.replace("\n", "")))
        except Exception:
            raise ParseError("Error parsing XML", response)

    def login(self, seq=None):
        """
        Log in to the Nessus server and preserve the token value for subsequent requests.
        Returns True for successful login, False when credentials weren't set. Login failure
        throws an exception.

        @type seq: number
        @param seq: A sequence number that will be echoed back for unique identification
                    (optional; a random one is generated when omitted).
        @raise LoginError: If the server does not answer with status "OK".
        """
        if self.username is None or self.password is None:
            return False
        params = urlencode({'login': self.username, 'password': self.password,
                            'seq': self._next_seq(seq)})
        response = self._request("POST", "/login", params)
        parsed = self.parse(response)
        contents = parsed['contents']
        if parsed['status'] != "OK":
            raise LoginError("Unable to login", contents)
        self.token = contents['token']  # Actual token value
        user = contents['user']  # User dict (admin status, user name)
        self.isadmin = user['admin']  # Is the logged in user an admin?
        self.headers["Cookie"] = "token=%s" % self.token  # Persist token for subsequent requests
        return True

    def logout(self, seq=None):
        """
        Log out of the Nessus server, invalidating the current token value. Returns True if
        successful, False if not.

        @type seq: number
        @param seq: A sequence number that will be echoed back for unique identification
                    (optional; a random one is generated when omitted).
        """
        params = urlencode({'seq': self._next_seq(seq)})
        response = self._request("POST", "/logout", params)
        parsed = self.parse(response)
        return parsed['status'] == "OK" and parsed['contents'] == "OK"

    def policyList(self, seq=None):
        """
        List the current policies configured on the server and return a dict with the info.

        @type seq: number
        @param seq: A sequence number that will be echoed back for unique identification
                    (optional; a random one is generated when omitted).
        @raise PolicyError: If the server does not answer with status "OK".
        """
        params = urlencode({'seq': self._next_seq(seq)})
        response = self._request("POST", "/policy/list", params)
        parsed = self.parse(response)
        contents = parsed['contents']
        if parsed['status'] != "OK":
            raise PolicyError("Unable to get policy list", contents)
        return contents['policies']  # Should be an iterable list of policies

    def getErrors(self, scan, seq=None):
        """
        Fetch the errors collected for a scan job.

        @type scan: dict
        @param scan: Scan description; only its 'uuid' entry is used.
        @type seq: number
        @param seq: A sequence number that will be echoed back for unique identification
                    (optional; a random one is generated when omitted).
        @raise ReportError: If the server does not answer with status "OK".
        """
        params = urlencode({'report': scan['uuid'], 'seq': self._next_seq(seq)})
        response = self._request("POST", "/report/errors", params)
        parsed = self.parse(response)
        contents = parsed['contents']
        if parsed['status'] == "OK":
            return contents['errors']  # Return the collected errors.
        # A failed reply may not carry an 'errors' entry; fall back to the raw contents so
        # the ReportError is not masked by a KeyError/TypeError.
        errors = contents.get('errors') if isinstance(contents, dict) else contents
        raise ReportError("Unable to get error status for scan job: ", (scan['uuid'], errors))

    def scanNew(self, scan_name, target, policy_id, seq=None):
        """
        Start up a new scan on the Nessus server immediately.

        @type scan_name: string
        @param scan_name: The desired name of the scan.
        @type target: string
        @param target: A Nessus-compatible target string (comma separation, CIDR notation, etc.)
        @type policy_id: number
        @param policy_id: The unique ID of the policy to be used in the scan.
        @type seq: number
        @param seq: A sequence number that will be echoed back for unique identification
                    (optional; a random one is generated when omitted).
        @raise ScanError: If the server does not answer with status "OK".
        """
        params = urlencode({'target': target, 'policy_id': policy_id, 'scan_name': scan_name,
                            'seq': self._next_seq(seq)})
        response = self._request("POST", "/scan/new", params)
        parsed = self.parse(response)
        contents = parsed['contents']
        if parsed['status'] != "OK":
            raise ScanError("Unable to start scan", contents)
        return contents['scan']  # Return what you can about the scan

    def quickScan(self, scan_name, target, policy_name, seq=None):
        """
        Configure a new scan using a canonical name for the policy. Perform a lookup for the
        policy ID and configure the scan, starting it immediately.

        @type scan_name: string
        @param scan_name: The desired name of the scan.
        @type target: string
        @param target: A Nessus-compatible target string (comma separation, CIDR notation, etc.)
        @type policy_name: string
        @param policy_name: The name of the policy to be used in the scan.
        @type seq: number
        @param seq: A sequence number that will be echoed back for unique identification
                    (optional; a random one is generated when omitted).
        @raise PolicyError: If no policy with the given name is found.
        """
        policies = self.policyList()
        if type(policies) is dict:
            # There appears to be only one configured policy
            policy = policies['policy']
            if policy['policyName'] != policy_name:
                raise PolicyError("Unable to parse policies from policyList()",
                                  (scan_name, target, policy_name))
            policy_id = policy['policyID']
        else:
            # We have multiple policies configured; the last policy with a matching name wins
            policy_id = None
            for policy in policies:
                if policy['policyName'] == policy_name:
                    policy_id = policy['policyID']
            if policy_id is None:
                raise PolicyError("Unable to find policy", (scan_name, target, policy_name))
        return self.scanNew(scan_name, target, policy_id, seq=seq)

    def reportList(self, seq=None):
        """
        Generate a list of reports available on the Nessus server.

        @type seq: number
        @param seq: A sequence number that will be echoed back for unique identification
                    (optional; a random one is generated when omitted).
        @raise ReportError: If the server does not answer with status "OK".
        """
        params = urlencode({'seq': self._next_seq(seq)})
        response = self._request("POST", "/report/list", params)
        parsed = self.parse(response)
        contents = parsed['contents']
        if parsed['status'] != "OK":
            raise ReportError("Unable to get reports.", contents)
        reports = contents['reports']
        if type(reports) is dict:
            # We've only got one report, put it into a list
            reports = [reports['report']]
        return reports  # Return an iterable list of reports

    def reportDownload(self, report, version="v2"):
        """
        Download a report (XML) for a completed scan.

        @type report: string
        @param report: The UUID of the report or completed scan.
        @type version: string
        @param version: The version of the .nessus XML file you wish to download.
        """
        if version == "v1":
            params = urlencode({'report': report, 'v1': version})
        else:
            params = urlencode({'report': report})
        return self._request("POST", "/file/report/download", params)
# vim: expandtab sw=4 ts=4 ai