Age Custom Field on Vulnerabilities/age_custom_field.py
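"""Populate two Kenna Security custom fields -- vulnerability age in days and
an age-range bucket -- on every open, risk accepted, or false positive
vulnerability, using the data export and bulk update APIs.

Usage: set the API_KEY environment variable to your Kenna risk token, then
run `python age_custom_field.py`.
"""
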
import requests
import time
import json
import gzip
import io
import os
from collections import defaultdict
import sys
from datetime import datetime
from tqdm import tqdm
import logging
from dateutil import parser
import pytz

# Configuration
token_variable = os.environ.get('API_KEY')
base_url = "https://api.kennasecurity.com"
vuln_age = 37  # replace with the ID of the age custom field in your environment
vuln_age_range = 38  # replace with the ID of the age-range custom field in your environment
thresh_num = 25000  # Max number of IDs to send in each bulk request; the API docs allow up to 30k
batch_size = 25000  # Number of vulnerabilities to accumulate before flushing a round of updates

# Set up logging to a file
logging.basicConfig(filename='script_log.txt', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def request_data_export(token_variable):
    """Send a request to the Kenna API to start a data export and return the search ID."""
    url = f"{base_url}/data_exports"
    headers = {
        'X-Risk-Token': token_variable,
        'accept': 'application/json',
        'content-type': 'application/json'
    }
    data = {
        "export_settings": {
            "format": "json",
            "model": "vulnerability",
            "slim": False,
        },
        "status": [
            "open",
            "risk accepted",
            "false positive"
        ]
    }
    response = requests.post(url, headers=headers, json=data)
    if response.status_code == 200:
        return response.json()['search_id']
    else:
        logging.error(f"Failed to send POST request. Status Code: {response.status_code}. Response Text: {response.text}")
        return None

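# For reference, a successful POST to /data_exports is expected to return JSON
# containing the export's search_id; the value below is illustrative:
#
#   {"search_id": 123456}
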
def wait_for_data_export(search_id, token_variable, max_wait_time=7200, sleep_time=10):
    """Poll the Kenna API until the data export is ready, then download and return the data."""
    start_time = time.time()
    status_url = f"{base_url}/data_exports/status?search_id={search_id}"
    headers = {
        'X-Risk-Token': token_variable,
        'accept': 'application/json'
    }
    while True:
        status_response = requests.get(status_url, headers=headers)
        if status_response.status_code == 200 and status_response.json().get('message') == "Export ready for download":
            url = f"{base_url}/data_exports?search_id={search_id}"
            headers = {
                'X-Risk-Token': token_variable,
                'accept': 'application/gzip'
            }
            response = requests.get(url, headers=headers)
            if response.status_code == 200:
                # The export arrives gzip-compressed; decompress and parse it in memory
                decompressed_file = gzip.GzipFile(fileobj=io.BytesIO(response.content))
                data = json.load(decompressed_file)
                return data
            else:
                logging.error(f"Failed to fetch data. Status Code: {response.status_code}. Response Text: {response.text}")
                return None
        elif time.time() - start_time > max_wait_time:
            logging.error(f"Timed out after waiting for {max_wait_time} seconds.")
            return None
        else:
            logging.info(f"Data export is still in progress. Waiting for {sleep_time} seconds before trying again.")
            print(f"Data export is still in progress. Waiting for {sleep_time} seconds before trying again.")
            time.sleep(sleep_time)

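# For very large exports, holding the whole gzip response in memory may be
# impractical. A minimal alternative sketch (same endpoint and headers as
# above; a hypothetical helper, not used by main()) that streams the download
# to disk before decompressing:
def download_export_to_file(search_id, token_variable, path="export.json.gz"):
    """Stream the export to disk, then decompress and parse it."""
    url = f"{base_url}/data_exports?search_id={search_id}"
    headers = {'X-Risk-Token': token_variable, 'accept': 'application/gzip'}
    with requests.get(url, headers=headers, stream=True) as response:
        response.raise_for_status()
        with open(path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=1 << 20):
                f.write(chunk)
    # gzip.open in text mode yields the decompressed JSON for json.load
    with gzip.open(path, 'rt') as f:
        return json.load(f)
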
def send_bulk_updates(vulns, vuln_age, vuln_age_range, token_variable):
    """Send updates to the Kenna bulk API to set the custom fields on each vulnerability."""
    url = f"{base_url}/vulnerabilities/bulk"
    headers = {
        'X-Risk-Token': token_variable,
        'accept': 'application/json',
        'content-type': 'application/json'
    }
    with tqdm(total=len(vulns), desc="Sending bulk updates", unit="vuln") as pbar:
        for vuln in vulns:
            # Each vulnerability gets its own PUT because the custom field
            # values (age and range) differ per vulnerability.
            payload = {
                "vulnerability_ids": [vuln['id']],
                "vulnerability": {
                    "custom_fields": {
                        str(vuln_age): vuln['age_value'],
                        str(vuln_age_range): vuln['range_value']
                    }
                }
            }
            logging.info(f"Sending payload for vulnerability ID {vuln['id']}: {json.dumps(payload)}")
            response = requests.put(url, headers=headers, json=payload)
            if response.status_code == 200:
                logging.info(f"Successfully updated vulnerability ID {vuln['id']}")
            else:
                logging.error(f"Failed to update vulnerability ID {vuln['id']}. Response Status Code: {response.status_code}. Response Text: {response.text}")
            pbar.update(1)

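# The bulk endpoint accepts a list of vulnerability_ids (up to thresh_num per
# request; the API docs allow up to 30k), so a grouped variant can cut the
# request count: vulnerabilities sharing the same age and range values go out
# in one PUT. A minimal sketch, assuming the same payload shape as
# send_bulk_updates (hypothetical helper, not called by main()):
def send_grouped_updates(vulns, vuln_age, vuln_age_range, token_variable):
    """Group vulnerabilities by (age, range) and update each group in one PUT."""
    url = f"{base_url}/vulnerabilities/bulk"
    headers = {
        'X-Risk-Token': token_variable,
        'accept': 'application/json',
        'content-type': 'application/json'
    }
    groups = defaultdict(list)
    for vuln in vulns:
        groups[(vuln['age_value'], vuln['range_value'])].append(vuln['id'])
    for (age_value, range_value), ids in groups.items():
        # Chunk each group to respect the per-request ID cap
        for i in range(0, len(ids), thresh_num):
            payload = {
                "vulnerability_ids": ids[i:i + thresh_num],
                "vulnerability": {
                    "custom_fields": {
                        str(vuln_age): age_value,
                        str(vuln_age_range): range_value
                    }
                }
            }
            response = requests.put(url, headers=headers, json=payload)
            if response.status_code != 200:
                logging.error(f"Grouped update failed ({response.status_code}): {response.text}")
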
def calculate_age_in_days(first_found_on):
    """Calculate the age of a vulnerability in days based on the first found date."""
    try:
        # Parse the first found date (ISO 8601, timezone-aware)
        first_found_date = parser.isoparse(first_found_on)
        # Get the current date as a timezone-aware datetime in UTC
        today = datetime.now(pytz.UTC)
        # Calculate the age in days
        age_in_days = (today - first_found_date).days
        logging.info(f"First found date: {first_found_date}, Today: {today}, Age in days: {age_in_days}")
        return age_in_days
    except Exception as e:
        logging.error(f"Error calculating age in days: {e}")
        return None

def determine_range(age_in_days):
    """Determine the age range category for a vulnerability based on its age in days."""
    if age_in_days is None:
        return "Unknown"
    if age_in_days <= 30:
        return "<= 30 days"
    elif age_in_days <= 60:
        return "31 - 60 days"
    elif age_in_days <= 90:
        return "61 - 90 days"
    elif age_in_days <= 180:
        return "91 - 180 days"
    else:
        return "> 180 days"

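# Spot checks (derived directly from the thresholds above):
#   determine_range(12)   -> "<= 30 days"
#   determine_range(45)   -> "31 - 60 days"
#   determine_range(365)  -> "> 180 days"
#   determine_range(None) -> "Unknown"
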
def main():
    """Main function to orchestrate the entire process."""
    search_id = request_data_export(token_variable)
    if not search_id:
        sys.exit(1)

    vulns_data = wait_for_data_export(search_id, token_variable)
    if not vulns_data:
        sys.exit(1)

    # Process vulnerabilities and calculate age in days
    total_vulns = len(vulns_data['vulnerabilities'])
    vulns_to_update = []
    with tqdm(total=total_vulns, desc="Processing vulnerabilities", unit="vuln") as pbar:
        for vuln in vulns_data['vulnerabilities']:
            if 'first_found_on' in vuln:
                age_value = calculate_age_in_days(vuln['first_found_on'])
                range_value = determine_range(age_value)
                vulns_to_update.append({
                    'id': vuln['id'],
                    'age_value': age_value,
                    'range_value': range_value
                })
                # Flush a batch once enough updates have accumulated
                if len(vulns_to_update) >= batch_size:
                    send_bulk_updates(vulns_to_update, vuln_age, vuln_age_range, token_variable)
                    vulns_to_update = []
            pbar.update(1)
    # Send any remaining vulnerabilities
    if vulns_to_update:
        send_bulk_updates(vulns_to_update, vuln_age, vuln_age_range, token_variable)


if __name__ == "__main__":
    main()