-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexploitdb.py
87 lines (74 loc) · 3.3 KB
/
exploitdb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import json
import logging
import os
from functools import lru_cache
from typing import Dict, List, Optional
from urllib.parse import quote

import requests
from cachetools import TTLCache, cached
# Configure the root logger at import time so DEBUG-level records are emitted.
# NOTE(review): this runs as a side effect of importing the module and applies
# process-wide — confirm that is intended if this file is used as a library.
logging.basicConfig(level=logging.DEBUG)
# Module-level logger used by ExploitDB for its debug tracing.
logger = logging.getLogger(__name__)
class ExploitDB:
    """Client for Exploit-DB lookups with an in-memory and an on-disk cache.

    Processed search results are memoized per instance in a TTL cache
    (100 entries, 600 seconds).  Raw JSON responses are additionally stored
    as files inside ``cache_dir``; this class never expires those files.
    """

    # Seconds to wait for the server before abandoning a request; without a
    # timeout ``requests`` may block indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self, cache_dir: str = ".cache"):
        """Create the client and make sure ``cache_dir`` exists.

        Args:
            cache_dir: Directory that holds the JSON cache files. It is
                created if it does not exist yet.
        """
        logger.debug("Initializing ExploitDB with cache_dir: %s", cache_dir)
        self.base_url = "https://www.exploit-db.com/search"
        self.cache_dir = cache_dir
        os.makedirs(cache_dir, exist_ok=True)
        self.cache = TTLCache(maxsize=100, ttl=600)
        logger.debug("ExploitDB initialized")

    def search(self, query: str, vuln_type: Optional[str] = None) -> List[Dict]:
        """Search for exploits matching ``query``.

        Lookup order is: in-memory cache, cache file on disk, remote request.
        Every path returns the processed list, never the raw response.

        Args:
            query: Free-text search string.
            vuln_type: Optional value sent as the ``type`` request parameter.

        Returns:
            A list of dicts as produced by ``_process_results``.

        Raises:
            requests.RequestException: The request failed, timed out or
                returned an HTTP error status.
            ValueError: The response body is not valid JSON.
            KeyError: A returned record lacks one of the expected fields.
        """
        logger.debug("Searching for query: %s, vuln_type: %s", query, vuln_type)

        # Memoize on the instance's own cache. Keying on the argument values
        # makes positional and keyword calls share one entry.
        memo_key = (query, vuln_type)
        try:
            return self.cache[memo_key]
        except KeyError:
            pass

        cache_file = self._cache_path(f"{query}_{vuln_type}")
        results = self._read_cache(cache_file)
        if results is None:
            logger.debug("Cache miss: Fetching from API")
            params = {
                "text": query,
                "type": vuln_type,
                "order_by": "date_published",
                "order": "desc",
            }
            logger.debug("Making API request with params: %s", params)
            results = self._fetch_json(self.base_url, params=params)
            logger.debug("Caching results to %s", cache_file)
            self._write_cache(cache_file, results)

        # The file stores the raw response, so cached and freshly fetched
        # data go through the same processing step.
        processed_results = self._process_results(results)
        logger.debug("Processed %d results", len(processed_results))
        self.cache[memo_key] = processed_results
        return processed_results

    def _process_results(self, results: Dict) -> List[Dict]:
        """Reduce a raw search response to a list of flat exploit records.

        Args:
            results: Raw response mapping; its ``data`` entry is read as the
                list of exploit records. A missing or empty ``data`` yields
                an empty list.

        Returns:
            One dict per record with the keys ``id``, ``title``, ``type``,
            ``platform``, ``author``, ``date``, ``verified`` and
            ``description``.

        Raises:
            KeyError: A record lacks one of the fields read below.
        """
        logger.debug("Processing search results")
        processed = [
            {
                'id': exploit['id'],
                'title': exploit['title'],
                'type': exploit['type'],
                'platform': exploit['platform'],
                'author': exploit['author'],
                'date': exploit['date_published'],
                'verified': exploit['verified'],
                'description': exploit['description'],
            }
            for exploit in (results.get('data') or [])
        ]
        logger.debug("Processed %d exploits", len(processed))
        return processed

    def get_exploit_details(self, exploit_id: int) -> Dict:
        """Return the JSON document for one exploit, using the disk cache.

        Args:
            exploit_id: Identifier appended to the exploit URL.

        Returns:
            The decoded JSON body, either from the cache file or the server.

        Raises:
            requests.RequestException: The request failed, timed out or
                returned an HTTP error status.
            ValueError: The response body is not valid JSON.
        """
        logger.debug("Getting details for exploit ID: %s", exploit_id)
        cache_file = self._cache_path(f"exploit_{exploit_id}")
        details = self._read_cache(cache_file)
        if details is not None:
            return details

        logger.debug("Cache miss: Fetching from API")
        # Encode the id so an unexpected string cannot alter the URL path.
        url = f"https://www.exploit-db.com/exploits/{quote(str(exploit_id), safe='')}"
        logger.debug("Making API request to: %s", url)
        details = self._fetch_json(url)
        logger.debug("Caching details to %s", cache_file)
        self._write_cache(cache_file, details)
        logger.debug("Returning exploit details")
        return details

    def _cache_path(self, name: str) -> str:
        """Return the cache file path for ``name`` inside ``cache_dir``."""
        # Percent-encode everything except letters, digits and "_.-~" so that
        # path separators in user-supplied text can neither leave cache_dir
        # nor point into a subdirectory that does not exist.
        return os.path.join(self.cache_dir, quote(name, safe="") + ".json")

    def _read_cache(self, cache_file: str):
        """Return the JSON stored in ``cache_file``, or None on a cache miss."""
        try:
            with open(cache_file, 'r', encoding="utf-8") as f:
                data = json.load(f)
        except FileNotFoundError:
            return None
        except json.JSONDecodeError:
            # A truncated or corrupt file must not break this key forever;
            # treat it as a miss so it gets fetched and rewritten.
            logger.warning("Ignoring unreadable cache file %s", cache_file)
            return None
        logger.debug("Cache hit: Loading from %s", cache_file)
        return data

    def _write_cache(self, cache_file: str, data) -> None:
        """Store ``data`` as JSON in ``cache_file``."""
        with open(cache_file, 'w', encoding="utf-8") as f:
            json.dump(data, f)

    def _fetch_json(self, url: str, params: Optional[Dict] = None):
        """GET ``url`` and return the decoded JSON body."""
        response = requests.get(url, params=params, timeout=self.REQUEST_TIMEOUT)
        # Fail here rather than caching an error page as a valid result.
        response.raise_for_status()
        return response.json()