From 2d525843b7c8b837ea3bb1dec1e0f04fd66573c8 Mon Sep 17 00:00:00 2001
From: guanw
Date: Sat, 2 Dec 2023 19:37:59 -0500
Subject: [PATCH 01/10] update supervisord config

---
 supervisord.conf | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/supervisord.conf b/supervisord.conf
index 8c2517a..59bf5b8 100644
--- a/supervisord.conf
+++ b/supervisord.conf
@@ -4,8 +4,8 @@ logfile=./log/supervisord.log
 pidfile=./log/supervisord.pid
 
 [program:gpt_server]
-command=python websocket_server.py
-directory=./server
+command=python ./server/websocket_server.py
+directory=./
 autostart=true
 autorestart=true
 stdout_logfile=./log/access.log

From 081b211ac1e311ad4759869cdde976deb9d98221 Mon Sep 17 00:00:00 2001
From: guanw
Date: Sun, 3 Dec 2023 14:18:31 -0500
Subject: [PATCH 02/10] rm client ping timeout

---
 client.py                  | 12 +++++++++---
 server/websocket_server.py | 11 +++--------
 2 files changed, 12 insertions(+), 11 deletions(-)

diff --git a/client.py b/client.py
index 95a113c..d46703e 100644
--- a/client.py
+++ b/client.py
@@ -3,8 +3,9 @@
 
 async def send_message():
     uri = "ws://localhost:8081" # Replace with the actual WebSocket server URI
+    # uri = "ws://129.213.151.7:8081"
 
-    async with websockets.connect(uri) as websocket:
+    async with websockets.connect(uri, ping_timeout=None) as websocket:
         # message = "@remindme to take notes!"
         message = "@gguf tell me about yourself"
         print(f"Sending message to server: {message}")
@@ -15,10 +16,15 @@ async def send_message():
             print(f"Received response from server: {response}")
             if not response:
                 break
+        except websockets.exceptions.ConnectionClosed as e:
+            print(f"Connection closed by the client. Reason: {e.reason}")
+            break
+        except asyncio.TimeoutError:
+            print("Timeout waiting for response. Closing connection.")
+            break
         except Exception as e:
             print(f"An exception occurred: {e}")
             break
 
 # Run the WebSocket client
-asyncio.run(send_message())
-# asyncio.get_event_loop().run_until_complete(send_message())
\ No newline at end of file
+asyncio.run(send_message())
\ No newline at end of file

diff --git a/server/websocket_server.py b/server/websocket_server.py
index 0b906c8..ce0ab0b 100644
--- a/server/websocket_server.py
+++ b/server/websocket_server.py
@@ -27,15 +27,10 @@ async def message(websocket, path):
     except websockets.exceptions.ConnectionClosed:
         print("Connection closed by the client.")
 
-
 async def main():
-    # Start the WebSocket server on localhost, port 8081
-    server = await websockets.serve(message, "localhost", PORT)
-
-    print(f"WebSocket server started on ws://localhost:{PORT}")
-
-    # Keep the server running until it's manually stopped
-    await server.wait_closed()
+    async with websockets.serve(message, "localhost", PORT):
+        print(f"WebSocket server started on ws://localhost:{PORT}")
+        await asyncio.Event().wait()
 
 # Run the WebSocket server
 asyncio.run(main())
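The server-side idiom patch 02 lands, `async with websockets.serve(...)` plus awaiting an event that is never set, is the standard way to keep a `websockets` server alive until the process is killed, while `ping_timeout=None` on the client keeps long model generations from tripping the keepalive. A minimal standalone sketch of the server pattern; the `echo` handler here is illustrative, not the repo's handler:

```python
# Minimal sketch of the serve-forever pattern adopted in patch 02.
# Assumes the `websockets` package; `echo` is an illustrative handler.
import asyncio
import websockets

async def echo(websocket, path):
    # Echo every frame back to the sender.
    async for message in websocket:
        await websocket.send(message)

async def main():
    # The context manager keeps the listener open for the lifetime of the
    # block; awaiting an Event that is never set blocks forever.
    async with websockets.serve(echo, "localhost", 8081):
        await asyncio.Event().wait()

if __name__ == "__main__":
    asyncio.run(main())
```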
"0.0.0.0" async def message(websocket, path): print(f"Client connected to path: {path}") @@ -27,10 +29,27 @@ async def message(websocket, path): except websockets.exceptions.ConnectionClosed: print("Connection closed by the client.") + +class HealthCheckHandler(SimpleHTTPRequestHandler): + def do_GET(self): + if self.path == '/healthcheck': + self.send_response(200) + self.send_header('Content-type', 'text/html') + self.end_headers() + self.wfile.write(b'OK') + else: + super().do_GET() + async def main(): - async with websockets.serve(message, "localhost", PORT): - print(f"WebSocket server started on ws://localhost:{PORT}") - await asyncio.Event().wait() + websocket_server = websockets.serve(message, ADDRESS, PORT) + print(f"WebSocket server started on ws://{ADDRESS}:{PORT}") + + # Start the HTTP server on the same port + http_server = HTTPServer((ADDRESS, PORT), HealthCheckHandler) + print(f"HTTP server started on http://{ADDRESS}:{PORT}") + + # Run both servers concurrently + await asyncio.gather(websocket_server, http_server.serve_forever()) # Run the WebSocket server asyncio.run(main()) From d8e28ac8997e5e629b8d5a3407bf2a6542c7d85a Mon Sep 17 00:00:00 2001 From: guanw Date: Sun, 3 Dec 2023 15:04:16 -0500 Subject: [PATCH 04/10] remove http server --- client.py | 7 ++++--- server/websocket_server.py | 22 +++------------------- 2 files changed, 7 insertions(+), 22 deletions(-) diff --git a/client.py b/client.py index d46703e..db8ace2 100644 --- a/client.py +++ b/client.py @@ -2,16 +2,17 @@ import websockets async def send_message(): - uri = "ws://localhost:8081" # Replace with the actual WebSocket server URI - # uri = "ws://129.213.151.7:8081" + uri = "ws://0.0.0.0:8081" # Replace with the actual WebSocket server URI async with websockets.connect(uri, ping_timeout=None) as websocket: - # message = "@remindme to take notes!" 
From d8e28ac8997e5e629b8d5a3407bf2a6542c7d85a Mon Sep 17 00:00:00 2001
From: guanw
Date: Sun, 3 Dec 2023 15:04:16 -0500
Subject: [PATCH 04/10] remove http server

---
 client.py                  |  7 ++++---
 server/websocket_server.py | 22 +++-------------------
 2 files changed, 7 insertions(+), 22 deletions(-)

diff --git a/client.py b/client.py
index d46703e..db8ace2 100644
--- a/client.py
+++ b/client.py
@@ -2,16 +2,17 @@ import websockets
 
 async def send_message():
-    uri = "ws://localhost:8081" # Replace with the actual WebSocket server URI
-    # uri = "ws://129.213.151.7:8081"
+    uri = "ws://0.0.0.0:8081" # Replace with the actual WebSocket server URI
 
     async with websockets.connect(uri, ping_timeout=None) as websocket:
-        # message = "@remindme to take notes!"
+        # Send your actual message
        message = "@gguf tell me about yourself"
         print(f"Sending message to server: {message}")
         await websocket.send(message)
+
         while True:
             try:
+                # Receive and print the response from the server
                 response = await websocket.recv()
                 print(f"Received response from server: {response}")
                 if not response:

diff --git a/server/websocket_server.py b/server/websocket_server.py
index 6f95bda..86a43e5 100644
--- a/server/websocket_server.py
+++ b/server/websocket_server.py
@@ -3,7 +3,6 @@
 from callbacks import websocket_callback_singleton
 from models.llama2_gguf_model import Llama2GGUFModel
 from remindme_parser import Remindme
-from http.server import SimpleHTTPRequestHandler, HTTPServer
 
 PORT = 8081
 ADDRESS = "0.0.0.0"
@@ -30,26 +29,11 @@ async def message(websocket, path):
     except websockets.exceptions.ConnectionClosed:
         print("Connection closed by the client.")
 
-class HealthCheckHandler(SimpleHTTPRequestHandler):
-    def do_GET(self):
-        if self.path == '/healthcheck':
-            self.send_response(200)
-            self.send_header('Content-type', 'text/html')
-            self.end_headers()
-            self.wfile.write(b'OK')
-        else:
-            super().do_GET()
-
 async def main():
-    websocket_server = websockets.serve(message, ADDRESS, PORT)
-    print(f"WebSocket server started on ws://{ADDRESS}:{PORT}")
-
-    # Start the HTTP server on the same port
-    http_server = HTTPServer((ADDRESS, PORT), HealthCheckHandler)
-    print(f"HTTP server started on http://{ADDRESS}:{PORT}")
-
-    # Run both servers concurrently
-    await asyncio.gather(websocket_server, http_server.serve_forever())
+    async with websockets.serve(message, ADDRESS, PORT):
+        print(f"WebSocket server started on ws://{ADDRESS}:{PORT}")
+        await asyncio.Event().wait()
 
 
 # Run the WebSocket server
 asyncio.run(main())

From 59f4777d0a17bcbade8e5c4413ec3af93756aba0 Mon Sep 17 00:00:00 2001
From: guanw
Date: Sun, 3 Dec 2023 15:09:38 -0500
Subject: [PATCH 05/10] add prod call with client.py

---
 client.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/client.py b/client.py
index db8ace2..2d79351 100644
--- a/client.py
+++ b/client.py
@@ -3,6 +3,7 @@
 
 async def send_message():
     uri = "ws://0.0.0.0:8081" # Replace with the actual WebSocket server URI
+    # uri = "ws://129.213.151.7:8081"
 
     async with websockets.connect(uri, ping_timeout=None) as websocket:
         # Send your actual message
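Patches 04 and 05 switch between the local and production URIs by commenting lines in and out. Reading the target from an environment variable is one comment-free alternative; a sketch, with `WS_URI` as an illustrative variable name:

```python
# Sketch: choose the WebSocket endpoint via an environment variable instead
# of commenting URIs in and out. WS_URI is an illustrative name.
import os

uri = os.environ.get("WS_URI", "ws://0.0.0.0:8081")
```

With that in place, `WS_URI=ws://129.213.151.7:8081 python client.py` would hit the production host without touching the source.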
From d7ed1e0cacb058e61fdc3b4c675f9817a9fab6b6 Mon Sep 17 00:00:00 2001
From: guanw
Date: Mon, 4 Dec 2023 19:24:57 -0500
Subject: [PATCH 06/10] handle websocket close when model finishes reply

---
 server/websocket_server.py | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/server/websocket_server.py b/server/websocket_server.py
index 86a43e5..0a4599f 100644
--- a/server/websocket_server.py
+++ b/server/websocket_server.py
@@ -7,6 +7,20 @@
 PORT = 8081
 ADDRESS = "0.0.0.0"
 
+async def handle_websocket_close(websocket):
+    try:
+        # Your WebSocket handling logic goes here
+
+        # To close the WebSocket connection, you can use the close method
+        await websocket.close()
+
+    except websockets.exceptions.ConnectionClosedOK:
+        # Handle the case where the connection is already closed
+        pass
+    except Exception as e:
+        # Handle other exceptions as needed
+        print(f"Error: {e}")
+
 async def message(websocket, path):
     print(f"Client connected to path: {path}")
     websocket_callback_singleton.set_websocket(websocket)
@@ -28,6 +42,8 @@ async def message(websocket, path):
     except websockets.exceptions.ConnectionClosed:
         print("Connection closed by the client.")
+    finally:
+        await handle_websocket_close(websocket)
 
 async def main():
     async with websockets.serve(message, ADDRESS, PORT):
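The `ConnectionClosedOK` guard matters because the client may hang up before the server-side close completes. If the server ever needs to tell clients why it is closing, `websockets` also accepts a status code and reason on `close()`; a short sketch under the same API:

```python
# Sketch: close with an explicit status code and reason; 1000 is the
# standard "normal closure" code in the WebSocket protocol.
import websockets

async def close_after_reply(websocket):
    try:
        await websocket.close(code=1000, reason="model reply complete")
    except websockets.exceptions.ConnectionClosedOK:
        pass  # the client hung up first; nothing left to do
```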
From 8bfe457fc3f954f2bc069ff81ae6df21a47e78a6 Mon Sep 17 00:00:00 2001
From: guanw
Date: Tue, 5 Dec 2023 21:22:26 -0500
Subject: [PATCH 07/10] reorganize folder

---
 TODO.md => experiment/TODO.md               |   0
 fine_tune.py => experiment/fine_tune.py     |   0
 llama2_test.py => experiment/llama2_test.py |   0
 regular_qa.py => experiment/regular_qa.py   |   0
 sample.pdf                                  | Bin 1806 -> 0 bytes
 5 files changed, 0 insertions(+), 0 deletions(-)
 rename TODO.md => experiment/TODO.md (100%)
 rename fine_tune.py => experiment/fine_tune.py (100%)
 rename llama2_test.py => experiment/llama2_test.py (100%)
 rename regular_qa.py => experiment/regular_qa.py (100%)
 delete mode 100644 sample.pdf

diff --git a/TODO.md b/experiment/TODO.md
similarity index 100%
rename from TODO.md
rename to experiment/TODO.md
diff --git a/fine_tune.py b/experiment/fine_tune.py
similarity index 100%
rename from fine_tune.py
rename to experiment/fine_tune.py
diff --git a/llama2_test.py b/experiment/llama2_test.py
similarity index 100%
rename from llama2_test.py
rename to experiment/llama2_test.py
diff --git a/regular_qa.py b/experiment/regular_qa.py
similarity index 100%
rename from regular_qa.py
rename to experiment/regular_qa.py
diff --git a/sample.pdf b/sample.pdf
deleted file mode 100644
index 6c031017c0aedc675c382fa6c6570373fdc1ffd5..0000000000000000000000000000000000000000
GIT binary patch
literal 0
HcmV?d00001

literal 1806
[base85-encoded binary payload for the deleted sample.pdf omitted]

From 214c8244133b367d3e30046a9784e1bf1f7eb4bd Mon Sep 17 00:00:00 2001
From: guanw
Date: Wed, 6 Dec 2023 20:33:12 -0500
Subject: [PATCH 08/10] minor adjustment on params per
 https://www.reddit.com/r/LocalLLaMA/comments/1343bgz/what_model_parameters_is_everyone_using/

---
 server/models/llama2_gguf_model.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/server/models/llama2_gguf_model.py b/server/models/llama2_gguf_model.py
index f0d4bfb..3807038 100644
--- a/server/models/llama2_gguf_model.py
+++ b/server/models/llama2_gguf_model.py
@@ -11,9 +11,12 @@
 llm = LlamaCpp(
     model_path="./server/models/llama-2-7b.gguf.q4_K_M.bin",
-    temperature=0.75,
+    temperature=0.7,
+    repeat_penalty=1.176,
+    top_p=0.1,
+    max_tokens=-1,
     callback_manager=CallbackManager([websocket_callback_singleton]),
-    verbose=True,
+    verbose=False,
 )
 text_chunks = load_content()
 embeddings=HuggingFaceEmbeddings(model_name='sentence-transformers/all-MiniLM-L6-v2', model_kwargs={'device':'cpu'})
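The new values in patch 08 track a preset popular in the linked thread: a lower temperature, a tight `top_p`, a repeat penalty of 1.176 (about 1/0.85), and `max_tokens=-1`, which llama-cpp treats as "generate until the model emits end-of-sequence". Collecting them in one mapping keeps future tuning in a single place; a sketch, with `GEN_PARAMS` as an illustrative name:

```python
# Sketch: centralize patch 08's sampling parameters so future tuning touches
# one dict. GEN_PARAMS is an illustrative name, not part of the repo.
from langchain.llms import LlamaCpp

GEN_PARAMS = {
    "temperature": 0.7,       # moderate randomness
    "top_p": 0.1,             # tight nucleus sampling
    "repeat_penalty": 1.176,  # ~1/0.85; discourages verbatim repetition
    "max_tokens": -1,         # -1: generate until EOS
}

llm = LlamaCpp(
    model_path="./server/models/llama-2-7b.gguf.q4_K_M.bin",
    verbose=False,
    **GEN_PARAMS,
)
```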
From c8997b1808e3f75364e05e3dbf2f9cac1ba6a7ca Mon Sep 17 00:00:00 2001
From: guanw
Date: Thu, 7 Dec 2023 19:44:49 -0500
Subject: [PATCH 09/10] refactor to enable concurrent requests (remove
 singleton callback to avoid leaked semaphore)

NOTE: this still needs further perf improvement because we now initialize
Llama2GGUFModel every time a new request comes in.
---
 server/callbacks.py                | 13 ++-------
 server/models/llama2_gguf_model.py | 44 ++++++++++++++++--------------
 server/websocket_server.py         | 19 ++++++-------
 3 files changed, 33 insertions(+), 43 deletions(-)

diff --git a/server/callbacks.py b/server/callbacks.py
index 1e24131..c4f977b 100644
--- a/server/callbacks.py
+++ b/server/callbacks.py
@@ -1,16 +1,7 @@
 from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
 class WebsocketCallbackHandler(StreamingStdOutCallbackHandler):
-    _instance = None
-    def __new__(cls):
-        if cls._instance is None:
-            cls._instance = super(WebsocketCallbackHandler, cls).__new__(cls)
-            # Initialize the singleton instance here
-        return cls._instance
-
-    def set_websocket(self, websocket):
+    def __init__(self, websocket):
         self.websocket = websocket
     async def on_llm_new_token(self, token, **kwargs):
         """Run on new LLM token. Only available when streaming is enabled."""
-        await self.websocket.send(token)
-
-websocket_callback_singleton = WebsocketCallbackHandler()
\ No newline at end of file
+        await self.websocket.send(token)
\ No newline at end of file

diff --git a/server/models/llama2_gguf_model.py b/server/models/llama2_gguf_model.py
index 3807038..9a26222 100644
--- a/server/models/llama2_gguf_model.py
+++ b/server/models/llama2_gguf_model.py
@@ -1,6 +1,5 @@
 from langchain.llms import LlamaCpp
 from langchain.callbacks.manager import CallbackManager
-from callbacks import websocket_callback_singleton
 from text_loader import load_content
 from langchain.embeddings import HuggingFaceEmbeddings
 from langchain.vectorstores import FAISS
@@ -9,32 +8,35 @@
 from text_color import TextColor
 from qa_template import TEMPLATE
 
-llm = LlamaCpp(
-    model_path="./server/models/llama-2-7b.gguf.q4_K_M.bin",
-    temperature=0.7,
-    repeat_penalty=1.176,
-    top_p=0.1,
-    max_tokens=-1,
-    callback_manager=CallbackManager([websocket_callback_singleton]),
-    verbose=False,
-)
-text_chunks = load_content()
-embeddings=HuggingFaceEmbeddings(model_name='sentence-transformers/all-MiniLM-L6-v2', model_kwargs={'device':'cpu'})
-vector_store=FAISS.from_documents(text_chunks, embeddings)
-print(TextColor.BLUE + "convert text chunks into embeddings!" + TextColor.RESET)
-
-# create chain
-qa_prompt=PromptTemplate(template=TEMPLATE, input_variables=['context', 'question'])
-print(TextColor.BLUE + "create q&a template!" + TextColor.RESET)
 class Llama2GGUFModel:
+    def __init__(self, callback):
+        # TODO refactor __init__ and swap out callbackManager to make init model faster
+        callback_manager = CallbackManager([callback])
+        self.llm = LlamaCpp(
+            model_path="./server/models/llama-2-7b.gguf.q4_K_M.bin",
+            temperature=0.7,
+            repeat_penalty=1.176,
+            top_p=0.1,
+            max_tokens=-1,
+            callback_manager=callback_manager,
+            verbose=False,
+        )
+        text_chunks = load_content()
+        embeddings=HuggingFaceEmbeddings(model_name='sentence-transformers/all-MiniLM-L6-v2', model_kwargs={'device':'cpu'})
+        self.vector_store=FAISS.from_documents(text_chunks, embeddings)
+        print(TextColor.BLUE + "convert text chunks into embeddings!" + TextColor.RESET)
+
+        # create chain
+        self.qa_prompt=PromptTemplate(template=TEMPLATE, input_variables=['context', 'question'])
+        print(TextColor.BLUE + "create q&a template!" + TextColor.RESET)
     def is_matched(self, text):
         return "@gguf" in text.lower()
     def execute_action(self, text):
         text = text.replace("@gguf", "")
-        llama2_chain = RetrievalQA.from_chain_type(llm=llm,
+        llama2_chain = RetrievalQA.from_chain_type(llm=self.llm,
                                chain_type='stuff',
-                               retriever=vector_store.as_retriever(search_kwargs={'k': 2}),
+                               retriever=self.vector_store.as_retriever(search_kwargs={'k': 2}),
                                return_source_documents=True,
-                               chain_type_kwargs={'prompt': qa_prompt})
+                               chain_type_kwargs={'prompt': self.qa_prompt})
         llama2_chain({'query': text})
\ No newline at end of file

diff --git a/server/websocket_server.py b/server/websocket_server.py
index 0a4599f..7078700 100644
--- a/server/websocket_server.py
+++ b/server/websocket_server.py
@@ -1,6 +1,6 @@
 import asyncio
 import websockets
-from callbacks import websocket_callback_singleton
+from callbacks import WebsocketCallbackHandler
 from models.llama2_gguf_model import Llama2GGUFModel
 from remindme_parser import Remindme
 
@@ -9,9 +9,6 @@
 
 async def handle_websocket_close(websocket):
     try:
-        # Your WebSocket handling logic goes here
-
-        # To close the WebSocket connection, you can use the close method
         await websocket.close()
 
     except websockets.exceptions.ConnectionClosedOK:
@@ -23,19 +20,19 @@ async def handle_websocket_close(websocket):
 
 async def message(websocket, path):
     print(f"Client connected to path: {path}")
-    websocket_callback_singleton.set_websocket(websocket)
+    callback = WebsocketCallbackHandler(websocket)
     try:
         async for message in websocket:
             print(f"Received message from client: {message}")
             r = Remindme()
             if r.is_matched(message):
-                resp = r.execute_action(message)
+                resp = await asyncio.to_thread(r.execute_action, message)
                 await websocket.send(f'remindme executed: {resp}')
                 return
-            gguf_model = Llama2GGUFModel()
+            gguf_model = Llama2GGUFModel(callback)
             if gguf_model.is_matched(message):
-                gguf_model.execute_action(message)
+                await asyncio.to_thread(gguf_model.execute_action, message)
                 return
             await websocket.send('voided conversation with no match')
@@ -47,9 +44,9 @@ async def handle_websocket_close(websocket):
 
 async def main():
     async with websockets.serve(message, ADDRESS, PORT):
         print(f"WebSocket server started on ws://{ADDRESS}:{PORT}")
-        await asyncio.Event().wait()
+        await asyncio.Future()
 
 # Run the WebSocket server
-asyncio.run(main())
+if __name__ == "__main__":
+    asyncio.run(main())
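The `asyncio.to_thread` calls are what actually buy concurrency here: the llama.cpp generation is synchronous, and without the thread hop one long reply would stall the event loop and every other connection. A self-contained illustration (Python 3.9+), where `blocking_inference` stands in for the model call:

```python
# Minimal illustration of why patch 09 wraps execute_action in
# asyncio.to_thread: blocking work runs in a worker thread while the
# event loop stays free to serve other connections.
import asyncio
import time

def blocking_inference(prompt: str) -> str:
    time.sleep(2)  # stand-in for a slow llama.cpp generation
    return f"reply to {prompt!r}"

async def main():
    # Both "requests" finish in about 2s total instead of 4s serialized.
    replies = await asyncio.gather(
        asyncio.to_thread(blocking_inference, "hello"),
        asyncio.to_thread(blocking_inference, "world"),
    )
    print(replies)

asyncio.run(main())
```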
From 1132a165f9689e83ab0da57b01fe5ab1bf1e7901 Mon Sep 17 00:00:00 2001
From: guanw
Date: Thu, 7 Dec 2023 19:58:17 -0500
Subject: [PATCH 10/10] enable model to swap callback_handler, improving model
 loading perf for each request

---
 server/models/llama2_gguf_model.py | 8 +++++---
 server/websocket_server.py         | 9 ++++++---
 2 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/server/models/llama2_gguf_model.py b/server/models/llama2_gguf_model.py
index 9a26222..b1dbc9c 100644
--- a/server/models/llama2_gguf_model.py
+++ b/server/models/llama2_gguf_model.py
@@ -10,16 +10,16 @@
 
 class Llama2GGUFModel:
-    def __init__(self, callback):
+    def __init__(self):
         # TODO refactor __init__ and swap out callbackManager to make init model faster
-        callback_manager = CallbackManager([callback])
+        self.callback_manager = CallbackManager([])
         self.llm = LlamaCpp(
             model_path="./server/models/llama-2-7b.gguf.q4_K_M.bin",
             temperature=0.7,
             repeat_penalty=1.176,
             top_p=0.1,
             max_tokens=-1,
-            callback_manager=callback_manager,
+            callback_manager=self.callback_manager,
             verbose=False,
         )
         text_chunks = load_content()
@@ -30,6 +30,8 @@ def __init__(self, callback):
         # create chain
         self.qa_prompt=PromptTemplate(template=TEMPLATE, input_variables=['context', 'question'])
         print(TextColor.BLUE + "create q&a template!" + TextColor.RESET)
+    def update_callback_handler(self, callback_handler):
+        self.callback_manager.set_handler(callback_handler)
     def is_matched(self, text):
         return "@gguf" in text.lower()
     def execute_action(self, text):

diff --git a/server/websocket_server.py b/server/websocket_server.py
index 7078700..3111f54 100644
--- a/server/websocket_server.py
+++ b/server/websocket_server.py
@@ -18,19 +18,22 @@ async def handle_websocket_close(websocket):
         # Handle other exceptions as needed
         print(f"Error: {e}")
 
+r = Remindme()
+gguf_model = Llama2GGUFModel()
 async def message(websocket, path):
     print(f"Client connected to path: {path}")
-    callback = WebsocketCallbackHandler(websocket)
+
     try:
         async for message in websocket:
             print(f"Received message from client: {message}")
-            r = Remindme()
+
             if r.is_matched(message):
                 resp = await asyncio.to_thread(r.execute_action, message)
                 await websocket.send(f'remindme executed: {resp}')
                 return
-            gguf_model = Llama2GGUFModel(callback)
+            callback_handler = WebsocketCallbackHandler(websocket)
+            gguf_model.update_callback_handler(callback_handler)
             if gguf_model.is_matched(message):
                 await asyncio.to_thread(gguf_model.execute_action, message)
                 return
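With one shared `Llama2GGUFModel` and per-connection handler swapping, two simultaneous clients can now race on `update_callback_handler` while generations run in worker threads. Serializing access with an `asyncio.Lock` is one way to close that gap; a sketch against the repo's names, where `model_lock` and `run_gguf` are illustrative additions, not part of the series:

```python
# Sketch: guard the shared model so concurrent connections cannot swap each
# other's callback handler mid-generation. model_lock and run_gguf are
# illustrative names, not part of the repo.
import asyncio
from callbacks import WebsocketCallbackHandler

model_lock = asyncio.Lock()

async def run_gguf(gguf_model, websocket, message):
    async with model_lock:
        gguf_model.update_callback_handler(WebsocketCallbackHandler(websocket))
        await asyncio.to_thread(gguf_model.execute_action, message)
```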