diff --git a/scanners/path_detector.py b/scanners/path_detector.py index b72d086..7ff516d 100644 --- a/scanners/path_detector.py +++ b/scanners/path_detector.py @@ -20,6 +20,7 @@ import ssl from urllib3.exceptions import InsecureRequestWarning import warnings +import hashlib # 禁用urllib3中的不安全请求警告 warnings.simplefilter('ignore', InsecureRequestWarning) @@ -49,11 +50,14 @@ class PathDetector: SSE_MAX_SIZE = 5120 # 5KB MAX_RESPONSE_LENGTH = 102400 # 100KB PATH_THREAD_COUNT = 3 # 使用独立的3个线程池进行路径探测 + HASH_THRESHOLD = 5 # 哈希值重复次数阈值 def __init__(self, paths, proxy_manager): self.paths = paths self.proxy = proxy_manager.get_proxy() self.thread_local = threading.local() # 创建线程本地存储 + self.hash_counter = {} # 哈希值计数器 + self.lock = threading.Lock() # 用于线程安全的锁 def detect(self, url): """检测指定URL的敏感路径""" @@ -61,6 +65,9 @@ def detect(self, url): path_success_count = 0 detected_paths = [] + # 重置哈希值计数器 + self.hash_counter = {} + # 使用独立的线程池进行路径探测,并指定最大线程数为3 with ThreadPoolExecutor(max_workers=self.PATH_THREAD_COUNT) as executor: futures = {executor.submit(self._detect_path, url, path, signature): path for path, signature in self.paths.items()} @@ -108,15 +115,35 @@ def _make_request(self, url): content += chunk if len(content) > self.SSE_MAX_SIZE: break - return content.decode("utf-8", errors="ignore") + response_content = content.decode("utf-8", errors="ignore") elif res.status_code == 200: # ANSI 控制字符实现闪动效果 blinking_effect = "\033[5m" # 修改logger.info调用,输出红色闪动的成功消息 logger.info(f"{blinking_effect}{Fore.RED} [{res.status_code}] [Content-Length: {res.headers.get('Content-Length', 0)}] {Fore.CYAN}<-- [Success] {Fore.RESET}", extra={"target": url}) # 返回前 MAX_RESPONSE_LENGTH 的内容 - return res.text[:self.MAX_RESPONSE_LENGTH] - logger.info(f"[{res.status_code}] [Content-Length: {res.headers.get('Content-Length', 0)}]", extra={"target": url}) + response_content = res.text[:self.MAX_RESPONSE_LENGTH] + else: + logger.info(f"[{res.status_code}] [Content-Length: {res.headers.get('Content-Length', 0)}]", extra={"target": url}) + return None + + # 计算响应内容的哈希值 + response_hash = hashlib.md5(response_content.encode()).hexdigest() + + # 更新哈希值计数器 + with self.lock: + if response_hash in self.hash_counter: + self.hash_counter[response_hash] += 1 + else: + self.hash_counter[response_hash] = 1 + + # 如果哈希值重复次数达到阈值,丢弃该路径 + if self.hash_counter[response_hash] >= self.HASH_THRESHOLD: + logger.info(f"Hash {response_hash} repeated {self.HASH_THRESHOLD} times, discarding path: {url}") + return None + + return response_content + except requests.exceptions.SSLError as ssl_error: logger.error(f"SSL error occurred for {url}: {ssl_error}", extra={"target": url}) return self._retry_with_different_ssl_version(session, url) # 使用不同的 SSL/TLS 版本重新连接 @@ -188,4 +215,4 @@ def close_sessions(detector_instance): paths = {"actuator": "_links", "actuator/beans": "beans"} path_d = PathDetector(paths, proxy_manager) print(path_d.detect("http://192.168.1.13:8080/")) - print(path_d.detect("http://192.168.1.13:8083/")) \ No newline at end of file + print(path_d.detect("http://192.168.1.13:8083/"))