diff --git a/Age Custom Field on Vulnerabilities/age_custom_field.py b/Age Custom Field on Vulnerabilities/age_custom_field.py deleted file mode 100644 index 2e17861..0000000 --- a/Age Custom Field on Vulnerabilities/age_custom_field.py +++ /dev/null @@ -1,177 +0,0 @@ -import requests -import csv -import time -import json -import gzip -import io -import os -from collections import defaultdict -import sys -from datetime import datetime -from tqdm import tqdm -import logging -from dateutil import parser -import pytz - -# Configuration -token_variable = os.environ.get('API_KEY') -base_url = "https://api.kennasecurity.com" -vuln_age = 37 # replace 37 with the custom field for age from your environment -vuln_age_range = 38 # replace 38 with the custom field for range from your environment -thresh_num = 25000 # Threshold for how many IDs you want to send in each request. Max possible is 30k as per API docs -batch_size = 25000 # Number of vulnerabilities to process in each batch - -# Setup logging to a file -logging.basicConfig(filename='script_log.txt', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') - -def request_data_export(token_variable): - """Send a request to the Kenna API to start a data export and return the search ID.""" - url = f"{base_url}/data_exports" - headers = { - 'X-Risk-Token': token_variable, - 'accept': 'application/json', - 'content-type': 'application/json' - } - data = { - "export_settings": { - "format": "json", - "model": "vulnerability", - "slim": False, - }, - "status": [ - "open", - "risk accepted", - "false positive" - ] - } - response = requests.post(url, headers=headers, json=data) - if response.status_code == 200: - return response.json()['search_id'] - else: - logging.error(f"Failed to send POST request. Status Code: {response.status_code}. Response Text: {response.text}") - return None - -def wait_for_data_export(search_id, token_variable, max_wait_time=7200, sleep_time=10): - """Poll the Kenna API to check if the data export is ready and download the data once it is available.""" - start_time = time.time() - status_url = f"{base_url}/data_exports/status?search_id={search_id}" - headers = { - 'X-Risk-Token': token_variable, - 'accept': 'application/json' - } - while True: - status_response = requests.get(status_url, headers=headers) - if status_response.status_code == 200 and status_response.json().get('message') == "Export ready for download": - url = f"{base_url}/data_exports?search_id={search_id}" - headers = { - 'X-Risk-Token': token_variable, - 'accept': 'application/gzip' - } - response = requests.get(url, headers=headers) - if response.status_code == 200: - decompressed_file = gzip.GzipFile(fileobj=io.BytesIO(response.content)) - data = json.load(decompressed_file) - return data - else: - logging.error(f"Failed to fetch data. Status Code: {response.status_code}. Response Text: {response.text}") - return None - elif time.time() - start_time > max_wait_time: - logging.error(f"Timed out after waiting for {max_wait_time} seconds.") - return None - else: - logging.info(f"Data export is still in progress. Waiting for {sleep_time} seconds before trying again.") - print(f"Data export is still in progress. Waiting for {sleep_time} seconds before trying again.") - time.sleep(sleep_time) - -def send_bulk_updates(vulns, vuln_age, vuln_age_range, token_variable): - """Send bulk updates to the Kenna API to update vulnerabilities with custom fields.""" - url = f"{base_url}/vulnerabilities/bulk" - headers = { - 'X-Risk-Token': token_variable, - 'accept': 'application/json', - 'content-type': 'application/json' - } - with tqdm(total=len(vulns), desc="Sending bulk updates", unit="vuln") as pbar: - for vuln in vulns: - payload = { - "vulnerability_ids": [vuln['id']], - "vulnerability": { - "custom_fields": { - str(vuln_age): vuln['age_value'], - str(vuln_age_range): vuln['range_value'] - } - } - } - logging.info(f"Sending payload for vulnerability ID {vuln['id']}: {json.dumps(payload)}") - response = requests.put(url, headers=headers, json=payload) - if response.status_code == 200: - logging.info(f"Successfully updated vulnerability ID {vuln['id']}") - else: - logging.error(f"Failed to update vulnerability ID {vuln['id']}. Response Status Code: {response.status_code}. Response Text: {response.text}") - pbar.update(1) - -def calculate_age_in_days(first_found_on): - """Calculate the age of a vulnerability in days based on the first found date.""" - try: - # Parse the first found date - first_found_date = parser.isoparse(first_found_on) - logging.info(f"Parsed first found date: {first_found_date}") - # Get the current date in UTC and make it timezone-aware - today = datetime.utcnow().replace(tzinfo=pytz.UTC) - logging.info(f"Current date (UTC): {today}") - # Calculate the age in days - age_in_days = (today - first_found_date).days - logging.info(f"First found date: {first_found_date}, Today: {today}, Age in days: {age_in_days}") - return age_in_days - except Exception as e: - logging.error(f"Error calculating age in days: {e}") - return None - -def determine_range(age_in_days): - """Determine the age range category for a vulnerability based on its age in days.""" - if age_in_days is None: - return "Unknown" - if age_in_days <= 30: - return "<= 30 days" - elif 30 < age_in_days <= 60: - return "31 - 60 days" - elif 60 < age_in_days <= 90: - return "61 - 90 days" - elif 90 < age_in_days <= 180: - return "91 - 180 days" - else: - return "> 180 days" - -def main(): - """Main function to orchestrate the entire process.""" - search_id = request_data_export(token_variable) - if not search_id: - sys.exit(1) - - vulns_data = wait_for_data_export(search_id, token_variable) - if not vulns_data: - sys.exit(1) - - # Process vulnerabilities and calculate age in days - total_vulns = len(vulns_data['vulnerabilities']) - vulns_to_update = [] - with tqdm(total=total_vulns, desc="Processing vulnerabilities", unit="vuln") as pbar: - for vuln in vulns_data['vulnerabilities']: - if 'first_found_on' in vuln: - age_value = calculate_age_in_days(vuln['first_found_on']) - range_value = determine_range(age_value) - vulns_to_update.append({ - 'id': vuln['id'], - 'age_value': age_value, - 'range_value': range_value - }) - if len(vulns_to_update) >= batch_size: - send_bulk_updates(vulns_to_update, vuln_age, vuln_age_range, token_variable) - vulns_to_update = [] - pbar.update(1) - # Send remaining vulnerabilities - if vulns_to_update: - send_bulk_updates(vulns_to_update, vuln_age, vuln_age_range, token_variable) - -if __name__ == "__main__": - main()