Skip to content

Commit

Permalink
Merge pull request #15 from GreyDGL/log
Browse files Browse the repository at this point in the history
feat: 🎸 Add logging function
  • Loading branch information
GreyDGL authored Apr 22, 2023
2 parents 2fcf2e7 + 4d72fea commit 6310cbb
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 16 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# PentestGPT

**We're testing PentestGPT on HackTheBox**. Follow

## Introduction
**PentestGPT** is a penetration testing tool empowered by **ChatGPT**. It is designed to automate the penetration testing process. It is built on top of ChatGPT and operate in an interactive mode to guide penetration testers in both overall progress and specific operations.
A sample testing process of **PentestGPT** on a target VulnHub machine (Hackable II) is available at [here](./resources/PentestGPT_Hackable2.pdf).
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ colorama
rich
prompt-toolkit
google
pytest
pytest
openai
8 changes: 5 additions & 3 deletions utils/chatgpt.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ def __init__(self, config: ChatGPTConfig):
self.conversation_dict: Dict[str, Conversation] = {}
self.headers = dict(
{
"cookie": f"cf_clearance={self.cf_clearance}; _puid={self._puid}; __Secure-next-auth.session-token={self.session_token}",
# "cookie": self.config.cookie,
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36",
# "cookie": f"cf_clearance={self.cf_clearance}; _puid={self._puid}; __Secure-next-auth.session-token={self.session_token}",
"cookie": self.config.cookie,
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36",
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7"
# 'Content-Type': 'text/event-stream; charset=utf-8',
}
Expand All @@ -79,6 +79,8 @@ def __init__(self, config: ChatGPTConfig):

def get_authorization(self):
url = "https://chat.openai.com/api/auth/session"
if "cookie" in vars(self.config):
self.headers["cookie"] = self.config.cookie
r = requests.get(url, headers=self.headers)
authorization = r.json()["accessToken"]
# authorization = self.config.accessToken
Expand Down
22 changes: 16 additions & 6 deletions utils/chatgpt_browser.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
# This file is deprecated. It is not used in the project.
# -*- coding: utf-8 -*-

import json
import re
import time
from uuid import uuid1
import datetime
from chatgpt_wrapper import OpenAIAPI

import loguru
import requests

from chatgpt_wrapper import ChatGPT
from chatgpt_wrapper.config import Config
from config.chatgpt_config import ChatGPTConfig

logger = loguru.logger

Expand All @@ -22,7 +24,7 @@ class ChatGPTBrowser:
"""

def __init__(self, model=None):
config = Config()
config = ChatGPTConfig()
if model is not None:
config.set("chat.model", model)
self.bot = ChatGPT(config)
Expand All @@ -48,6 +50,7 @@ def send_new_message(self, message):
def send_message(self, message, conversation_id):
# 发送会话窗口消息
# TODO: send message from browser
# check here: https://github.com/mmabrouk/chatgpt-wrapper/blob/bafd0be7fb3355ea4a4b0276ade9f0fc6e8571fd/chatgpt_wrapper/backends/openai/repl.py#L101
return

def extract_code_fragments(self, text):
Expand All @@ -61,7 +64,14 @@ def delete_conversation(self, conversation_id=None):


if __name__ == "__main__":
chatgptBrowser_session = ChatGPTBrowser()
text, conversation_id = chatgptBrowser_session.send_new_message(
"I am a new tester for RESTful APIs."
)
# chatgptBrowser_session = ChatGPTBrowser()
# text, conversation_id = chatgptBrowser_session.send_new_message(
# "I am a new tester for RESTful APIs."
# )

bot = OpenAIAPI()
success, response, message = bot.ask("Hello, world!")
if success:
print(response)
else:
raise RuntimeError(message)
95 changes: 89 additions & 6 deletions utils/pentest_gpt.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
from prompt_toolkit.formatted_text import HTML
from utils.task_handler import main_task_entry, mainTaskCompleter
from utils.web_parser import google_search, parse_web
import time
import datetime as dt

import loguru
import time, os, textwrap
import time, os, textwrap, json

logger = loguru.logger
logger.add(sink="logs/pentest_gpt.log")
Expand Down Expand Up @@ -53,6 +55,32 @@ def __init__(self):
self.step_reasoning = (
None # the response from the reasoning session for the current step
)
self.history = {
"user": [],
"pentestGPT": [],
"reasoning": [],
"input_parsing": [],
"generation": [],
"exception": [],
} # the history of the current conversation

def log_conversation(self, source, text):
"""
append the conversation into the history
Parameters:
----------
source: str
the source of the conversation
text: str
the content of the conversation
"""
# append the conversation into the history
timestamp = time.time()
if source not in self.history.keys():
# an exception
source = "exception"
self.history[source].append((timestamp, text))

def initialize(self):
# initialize the backbone sessions and test the connection to chatGPT
Expand Down Expand Up @@ -89,6 +117,8 @@ def reasoning_handler(self, text) -> str:
response = self.chatGPT4Agent.send_message(
self.prompts.process_results + text, self.test_reasoning_session_id
)
# log the conversation
self.log_conversation("reasoning", response)
return response

def input_parsing_handler(self, text, source=None) -> str:
Expand All @@ -110,12 +140,15 @@ def input_parsing_handler(self, text, source=None) -> str:
summarized_content += self.chatGPTAgent.send_message(
prefix + word_limit + wrapped_input, self.input_parsing_session_id
)
# log the conversation
self.log_conversation("input_parsing", summarized_content)
return summarized_content

def test_generation_handler(self, text):
# send the contents to chatGPT test_generation_session and obtain the results
response = self.chatGPTAgent.send_message(text, self.test_generation_session_id)
# print the results
# log the conversation
self.log_conversation("generation", response)
return response

def input_handler(self) -> str:
Expand All @@ -131,6 +164,7 @@ def input_handler(self) -> str:
self.chat_count += 1

request_option = main_task_entry()
self.log_conversation("user", request_option)
# request_option = prompt_select(
# title=f"({self.chat_count}) > Please select your options with cursor: ",
# values=[
Expand All @@ -141,6 +175,7 @@ def input_handler(self) -> str:
# ],
# )
# pass output

if request_option == "help":
print(mainTaskCompleter().task_details)

Expand All @@ -159,6 +194,9 @@ def input_handler(self) -> str:
"Your input: (End with <shift + right-arrow>)", style="bold green"
)
user_input = prompt_ask("> ", multiline=True)
self.log_conversation(
"user_input", "Source: " + options[int(source)] + "\n" + user_input
)
with self.console.status("[bold green] PentestGPT Thinking...") as status:
parsed_input = self.input_parsing_handler(
user_input, source=options[int(source)]
Expand All @@ -173,6 +211,11 @@ def input_handler(self) -> str:
style="bold green",
)
self.console.print(reasoning_response + "\n")
self.log_conversation(
"pentestGPT",
"Based on the analysis, the following tasks are recommended:"
+ reasoning_response,
)
response = reasoning_response

# generate more test details (beginner mode)
Expand All @@ -183,7 +226,9 @@ def input_handler(self) -> str:
"You have not initialized the task yet. Please perform the basic testing following `next` option.",
style="bold red",
)
return
response = "You have not initialized the task yet. Please perform the basic testing following `next` option."
self.log_conversation("pentestGPT", response)
return response
with self.console.status("[bold green] PentestGPT Thinking...") as status:
generation_response = self.test_generation_handler(
self.step_reasoning_response
Expand All @@ -195,6 +240,7 @@ def input_handler(self) -> str:
)
self.console.print(generation_response + "\n")
response = generation_response
self.log_conversation("pentestGPT", response)

# ask for task list (to-do list)
elif request_option == "todo":
Expand All @@ -216,20 +262,33 @@ def input_handler(self) -> str:
)
self.console.print(generation_response + "\n")
response = reasoning_response
self.log_conversation(
"pentestGPT",
"Based on the analysis, the following tasks are recommended:"
+ reasoning_response
+ "\n"
+ "You can follow the instructions below to complete the tasks."
+ generation_response,
)

# pass other information, such as questions or some observations.
elif request_option == "discuss":
## (1) Request for user multi-line input
self.console.print("Please share your thoughts/questions with PentestGPT.")
self.log_conversation(
"pentestGPT", "Please share your thoughts/questions with PentestGPT."
)
user_input = prompt_ask(
"(End with <shift + right-arrow>) Your input: ", multiline=True
)
self.log_conversation("user_input", user_input)
## (2) pass the information to the reasoning session.
with self.console.status("[bold green] PentestGPT Thinking...") as status:
response = self.reasoning_handler(self.prompts.discussion + user_input)
## (3) print the results
self.console.print("PentestGPT:\n", style="bold green")
self.console.print(response + "\n", style="yellow")
self.log_conversation("pentestGPT", response)

# Google
elif request_option == "google":
Expand All @@ -238,22 +297,34 @@ def input_handler(self) -> str:
"Please enter your search query. PentestGPT will summarize the info from google.",
style="bold green",
)
self.log_conversation(
"pentestGPT",
"Please enter your search query. PentestGPT will summarize the info from google.",
)
user_input = prompt_ask(
"(End with <shift + right-arrow>) Your input: ", multiline=False
)
self.log_conversation("user_input", user_input)
with self.console.status("[bold green] PentestGPT Thinking...") as status:
# query the question
result = self.google_search(user_input, 5) # 5 results by default
result: dict = google_search(user_input, 5) # 5 results by default
# summarize the results
# TODO
response = "Google search results:\n" + "still under development."
self.console.print(response + "\n", style="yellow")
self.log_conversation("pentestGPT", response)
return response

# end
elif request_option == "quit":
response = False
self.console.print("Thank you for using PentestGPT!", style="bold green")
self.log_conversation("pentestGPT", "Thank you for using PentestGPT!")

else:
self.console.print("Please key in the correct options.", style="bold red")
response = self.input_handler()

self.log_conversation("pentestGPT", "Please key in the correct options.")
response = "Please key in the correct options."
return response

def main(self):
Expand All @@ -268,6 +339,7 @@ def main(self):
"Please describe the penetration testing task in one line, including the target IP, task type, etc.\n> ",
multiline=False,
)
self.log_conversation("user", init_description)
## Provide the information to the reasoning session for the task initialization.
prefixed_init_description = self.prompts.task_description + init_description
with self.console.status(
Expand All @@ -285,8 +357,14 @@ def main(self):
"PentestGPT suggests you to do the following: ", style="bold green"
)
self.console.print(_response)
self.log_conversation(
"PentestGPT", "PentestGPT suggests you to do the following: \n" + _response
)
self.console.print("You may start with:", style="bold green")
self.console.print(first_generation_response)
self.log_conversation(
"PentestGPT", "You may start with: \n" + first_generation_response
)

# 4. enter the main loop.
while True:
Expand All @@ -299,6 +377,11 @@ def main(self):

# Summarize the session and end
# TODO.
# log the session.
## save self.history into a txt file based on timestamp
log_name = "pentestGPT_log_" + dt.now().strftime("%Y%m%d_%H%M%S") + ".json"
with open(log_name, "w") as f:
json.dump(self.history, f)

# clear the sessions
# TODO.
1 change: 1 addition & 0 deletions utils/web_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def google_search(keyword, num_results=5) -> dict:
for url in search(keyword, tld="com", num=num_results, stop=num_results, pause=2):
search_result[url] = parse_web(url)
result = {"keyword": keyword, "search_result": search_result}
return result


if __name__ == "__main__":
Expand Down

0 comments on commit 6310cbb

Please sign in to comment.