Skip to content

Commit

Permalink
chat: move lock on sending to openai file
Browse files Browse the repository at this point in the history
  • Loading branch information
rmackay9 committed Dec 27, 2023
1 parent 578bc55 commit 3313e63
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 92 deletions.
130 changes: 68 additions & 62 deletions MAVProxy/modules/mavproxy_chat/chat_openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from pymavlink import mavutil
import time, re
from datetime import datetime
from threading import Thread
from threading import Thread, Lock
import json
import math

Expand All @@ -31,6 +31,9 @@ def __init__(self, mpstate, status_cb=None, wait_for_command_ack_fn=None):
# keep reference to wait_for_command_ack_fn
self.wait_for_command_ack_fn = wait_for_command_ack_fn

# lock to prevent multiple threads sending text to the assistant at the same time
self.send_lock = Lock()

# wakeup timer array
self.wakeup_schedule = []
self.thread = Thread(target=self.check_wakeup_timers)
Expand Down Expand Up @@ -93,71 +96,74 @@ def set_api_key(self, api_key_str):

# send text to assistant
def send_to_assistant(self, text):
# check connection
if not self.check_connection():
return "chat: failed to connect to OpenAI"

# create a new message
input_message = self.client.beta.threads.messages.create(
thread_id=self.assistant_thread.id,
role="user",
content=text
)
if input_message is None:
return "chat: failed to create input message"

# create a run
self.run = self.client.beta.threads.runs.create(
thread_id=self.assistant_thread.id,
assistant_id=self.assistant.id
)
if self.run is None:
return "chat: failed to create run"

# wait for run to complete
run_done = False
while not run_done:
# wait for one second
time.sleep(0.1)
# get lock
with self.send_lock:

# check connection
if not self.check_connection():
return "chat: failed to connect to OpenAI"

# retrieve the run
latest_run = self.client.beta.threads.runs.retrieve(
# create a new message
input_message = self.client.beta.threads.messages.create(
thread_id=self.assistant_thread.id,
run_id=self.run.id
role="user",
content=text
)
if input_message is None:
return "chat: failed to create input message"

# check run status
if latest_run.status in ["queued", "in_progress", "cancelling"]:
run_done = False
elif latest_run.status in ["cancelled", "failed", "completed", "expired"]:
run_done = True
elif latest_run.status in ["requires_action"]:
self.handle_function_call(latest_run)
run_done = False
else:
print("chat: unrecognised run status" + latest_run.status)
run_done = True

# send status to status callback
self.send_status(latest_run.status)

# retrieve messages on the thread
reply_messages = self.client.beta.threads.messages.list(self.assistant_thread.id, order = "asc", after=input_message.id)
if reply_messages is None:
return "chat: failed to retrieve messages"

# concatenate all messages into a single reply skipping the first which is our question
reply = ""
need_newline = False
for message in reply_messages.data:
reply = reply + message.content[0].text.value
if need_newline:
reply = reply + "\n"
need_newline = True

if reply is None or reply == "":
return "chat: failed to retrieve latest reply"
return reply
# create a run
self.run = self.client.beta.threads.runs.create(
thread_id=self.assistant_thread.id,
assistant_id=self.assistant.id
)
if self.run is None:
return "chat: failed to create run"

# wait for run to complete
run_done = False
while not run_done:
# wait for one second
time.sleep(0.1)

# retrieve the run
latest_run = self.client.beta.threads.runs.retrieve(
thread_id=self.assistant_thread.id,
run_id=self.run.id
)

# check run status
if latest_run.status in ["queued", "in_progress", "cancelling"]:
run_done = False
elif latest_run.status in ["cancelled", "failed", "completed", "expired"]:
run_done = True
elif latest_run.status in ["requires_action"]:
self.handle_function_call(latest_run)
run_done = False
else:
print("chat: unrecognised run status" + latest_run.status)
run_done = True

# send status to status callback
self.send_status(latest_run.status)

# retrieve messages on the thread
reply_messages = self.client.beta.threads.messages.list(self.assistant_thread.id, order = "asc", after=input_message.id)
if reply_messages is None:
return "chat: failed to retrieve messages"

# concatenate all messages into a single reply skipping the first which is our question
reply = ""
need_newline = False
for message in reply_messages.data:
reply = reply + message.content[0].text.value
if need_newline:
reply = reply + "\n"
need_newline = True

if reply is None or reply == "":
return "chat: failed to retrieve latest reply"
return reply

# handle function call request from assistant
# on success this returns the text response that should be sent to the assistant, returns None on failure
Expand Down
55 changes: 25 additions & 30 deletions MAVProxy/modules/mavproxy_chat/chat_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,13 @@

from MAVProxy.modules.lib.wx_loader import wx
from MAVProxy.modules.mavproxy_chat import chat_openai, chat_voice_to_text
from threading import Thread, Lock
from threading import Thread

class chat_window():
def __init__(self, mpstate, wait_for_command_ack_fn):
# keep reference to mpstate
self.mpstate = mpstate

# lock to prevent multiple threads sending text to the assistant at the same time
self.send_lock = Lock()

# create chat_openai object
self.chat_openai = chat_openai.chat_openai(self.mpstate, self.set_status_text, wait_for_command_ack_fn)

Expand Down Expand Up @@ -150,32 +147,30 @@ def text_input_change(self, event):

# send text to assistant. should be called from a separate thread to avoid blocking
def send_text_to_assistant(self):
# get lock
with self.send_lock:
# disable buttons and text input to stop multiple inputs (can't be done from a thread or must use CallAfter)
wx.CallAfter(self.record_button.Disable)
wx.CallAfter(self.text_input.Disable)
wx.CallAfter(self.send_button.Disable)

# get text from text input and clear text input
send_text = self.text_input.GetValue()
wx.CallAfter(self.text_input.Clear)

# copy user input text to reply box
orig_text_attr = self.text_reply.GetDefaultStyle()
wx.CallAfter(self.text_reply.SetDefaultStyle, wx.TextAttr(wx.RED))
wx.CallAfter(self.text_reply.AppendText, send_text + "\n")

# send text to assistant and place reply in reply box
reply = self.chat_openai.send_to_assistant(send_text)
if reply:
wx.CallAfter(self.text_reply.SetDefaultStyle, orig_text_attr)
wx.CallAfter(self.text_reply.AppendText, reply + "\n\n")

# reenable buttons and text input (can't be done from a thread or must use CallAfter)
wx.CallAfter(self.record_button.Enable)
wx.CallAfter(self.text_input.Enable)
wx.CallAfter(self.send_button.Enable)
# disable buttons and text input to stop multiple inputs (can't be done from a thread or must use CallAfter)
wx.CallAfter(self.record_button.Disable)
wx.CallAfter(self.text_input.Disable)
wx.CallAfter(self.send_button.Disable)

# get text from text input and clear text input
send_text = self.text_input.GetValue()
wx.CallAfter(self.text_input.Clear)

# copy user input text to reply box
orig_text_attr = self.text_reply.GetDefaultStyle()
wx.CallAfter(self.text_reply.SetDefaultStyle, wx.TextAttr(wx.RED))
wx.CallAfter(self.text_reply.AppendText, send_text + "\n")

# send text to assistant and place reply in reply box
reply = self.chat_openai.send_to_assistant(send_text)
if reply:
wx.CallAfter(self.text_reply.SetDefaultStyle, orig_text_attr)
wx.CallAfter(self.text_reply.AppendText, reply + "\n\n")

# reenable buttons and text input (can't be done from a thread or must use CallAfter)
wx.CallAfter(self.record_button.Enable)
wx.CallAfter(self.text_input.Enable)
wx.CallAfter(self.send_button.Enable)

# set status text
def set_status_text(self, text):
Expand Down

0 comments on commit 3313e63

Please sign in to comment.