Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chat: add wakeup timer support #1290

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"type": "function",
"function": {
"name": "delete_wakeup_timers",
"description": "Delete all active wakeup timers. You can optionally provide a message parameter to filter which timers will be deleted based on their message. When specifying the message parameter, you can use regular expressions (regex) to match patterns within the timer messages. This is useful when you want to delete timers with specific keywords or patterns in their message. For example, to delete all timers containing the word 'hello', you can use the regex '.*hello.*', where the dot-star (.*) pattern matches any character sequence.",
"parameters": {
"type": "object",
"properties": {
"message": {"type": "string", "description": "wakeup message of timers to be deleted. regex values are accepted."}
},
"required": []
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"type": "function",
"function": {
"name": "get_wakeup_timers",
"description": "Retrieves a list of all active wakeup timers. You can optionally provide a message parameter to filter timers by their associated messages. When specifying the message parameter, you can use regular expressions (regex) to match patterns within the timer messages. This is useful when you want to find timers with specific keywords or patterns in their messages. For example, to retrieve all timers containing the word 'hello', you can use the regex '.*hello.*', where the dot-star (.*) pattern matches any character sequence.",
"parameters": {
"type": "object",
"properties": {
"message": {"type": "string", "description": "wakeup message of timers to be retrieved. regex values are accepted."}
},
"required": []
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"type": "function",
"function": {
"name": "set_wakeup_timer",
"description": "Set a timer to wake you up in a specified number of seconds in the future. This allows taking actions in the future. The wakeup message will appear with the user role but will look something like WAKEUP:<message>. Multiple wakeup messages are supported",
"parameters": {
"type": "object",
"properties": {
"seconds": {"type": "number", "description": "number of seconds in the future that the timer will wake you up"},
"message": {"type": "string", "description": "wakeup message that will be sent to you"}
},
"required": ["seconds", "message"]
}
}
}
263 changes: 202 additions & 61 deletions MAVProxy/modules/mavproxy_chat/chat_openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from pymavlink import mavutil
import time, re
from datetime import datetime
from threading import Thread, Lock
import json
import math

Expand All @@ -30,6 +31,14 @@ def __init__(self, mpstate, status_cb=None, wait_for_command_ack_fn=None):
# keep reference to wait_for_command_ack_fn
self.wait_for_command_ack_fn = wait_for_command_ack_fn

# lock to prevent multiple threads sending text to the assistant at the same time
self.send_lock = Lock()

# wakeup timer array
self.wakeup_schedule = []
self.thread = Thread(target=self.check_wakeup_timers)
self.thread.start()

# initialise OpenAI connection
self.client = None
self.assistant = None
Expand Down Expand Up @@ -87,71 +96,74 @@ def set_api_key(self, api_key_str):

# send text to assistant
def send_to_assistant(self, text):
# check connection
if not self.check_connection():
return "chat: failed to connect to OpenAI"

# create a new message
input_message = self.client.beta.threads.messages.create(
thread_id=self.assistant_thread.id,
role="user",
content=text
)
if input_message is None:
return "chat: failed to create input message"

# create a run
self.run = self.client.beta.threads.runs.create(
thread_id=self.assistant_thread.id,
assistant_id=self.assistant.id
)
if self.run is None:
return "chat: failed to create run"

# wait for run to complete
run_done = False
while not run_done:
# wait for one second
time.sleep(0.1)
# get lock
with self.send_lock:

# check connection
if not self.check_connection():
return "chat: failed to connect to OpenAI"

# retrieve the run
latest_run = self.client.beta.threads.runs.retrieve(
# create a new message
input_message = self.client.beta.threads.messages.create(
thread_id=self.assistant_thread.id,
run_id=self.run.id
role="user",
content=text
)
if input_message is None:
return "chat: failed to create input message"

# check run status
if latest_run.status in ["queued", "in_progress", "cancelling"]:
run_done = False
elif latest_run.status in ["cancelled", "failed", "completed", "expired"]:
run_done = True
elif latest_run.status in ["requires_action"]:
self.handle_function_call(latest_run)
run_done = False
else:
print("chat: unrecognised run status" + latest_run.status)
run_done = True

# send status to status callback
self.send_status(latest_run.status)

# retrieve messages on the thread
reply_messages = self.client.beta.threads.messages.list(self.assistant_thread.id, order = "asc", after=input_message.id)
if reply_messages is None:
return "chat: failed to retrieve messages"

# concatenate all messages into a single reply skipping the first which is our question
reply = ""
need_newline = False
for message in reply_messages.data:
reply = reply + message.content[0].text.value
if need_newline:
reply = reply + "\n"
need_newline = True

if reply is None or reply == "":
return "chat: failed to retrieve latest reply"
return reply
# create a run
self.run = self.client.beta.threads.runs.create(
thread_id=self.assistant_thread.id,
assistant_id=self.assistant.id
)
if self.run is None:
return "chat: failed to create run"

# wait for run to complete
run_done = False
while not run_done:
# wait for one second
time.sleep(0.1)

# retrieve the run
latest_run = self.client.beta.threads.runs.retrieve(
thread_id=self.assistant_thread.id,
run_id=self.run.id
)

# check run status
if latest_run.status in ["queued", "in_progress", "cancelling"]:
run_done = False
elif latest_run.status in ["cancelled", "failed", "completed", "expired"]:
run_done = True
elif latest_run.status in ["requires_action"]:
self.handle_function_call(latest_run)
run_done = False
else:
print("chat: unrecognised run status" + latest_run.status)
run_done = True

# send status to status callback
self.send_status(latest_run.status)

# retrieve messages on the thread
reply_messages = self.client.beta.threads.messages.list(self.assistant_thread.id, order = "asc", after=input_message.id)
if reply_messages is None:
return "chat: failed to retrieve messages"

# concatenate all messages into a single reply skipping the first which is our question
reply = ""
need_newline = False
for message in reply_messages.data:
reply = reply + message.content[0].text.value
if need_newline:
reply = reply + "\n"
need_newline = True

if reply is None or reply == "":
return "chat: failed to retrieve latest reply"
return reply

# handle function call request from assistant
# on success this returns the text response that should be sent to the assistant, returns None on failure
Expand Down Expand Up @@ -271,6 +283,36 @@ def handle_function_call(self, run):
output = "set_parameter: failed to set parameter value"
print("chat: set_parameter: failed to set parameter value")

# set a wakeup timer
if tool_call.function.name == "set_wakeup_timer":
recognised_function = True
try:
arguments = json.loads(tool_call.function.arguments)
output = self.set_wakeup_timer(arguments)
except:
output = tool_call.function.name + ": failed"
print("chat: " + output)

# get wakeup timers
if tool_call.function.name == "get_wakeup_timers":
recognised_function = True
try:
arguments = json.loads(tool_call.function.arguments)
output = self.get_wakeup_timers(arguments)
except:
output = tool_call.function.name + ": failed"
print("chat: " + output)

# delete wakeup timers
if tool_call.function.name == "delete_wakeup_timers":
recognised_function = True
try:
arguments = json.loads(tool_call.function.arguments)
output = self.delete_wakeup_timers(arguments)
except:
output = tool_call.function.name + ": failed"
print("chat: " + output)

if not recognised_function:
print("chat: handle_function_call: unrecognised function call: " + tool_call.function.name)
output = "unrecognised function call: " + tool_call.function.name
Expand Down Expand Up @@ -538,6 +580,105 @@ def set_parameter(self, arguments):
self.mpstate.functions.param_set(param_name, param_value, retries=3)
return "set_parameter: parameter value set"

# set a wakeup timer
def set_wakeup_timer(self, arguments):
# check required arguments are specified
seconds = arguments.get("seconds", -1)
if seconds < 0:
return "set_wakeup_timer: seconds not specified"
message = arguments.get("message", None)
if message is None:
return "set_wakeup_timer: message not specified"

# add timer to wakeup schedule
self.wakeup_schedule.append({"time": time.time() + seconds, "message": message})
return "set_wakeup_timer: wakeup timer set"

# get wake timers
def get_wakeup_timers(self, arguments):
# check message argument, default to None meaning all
message = arguments.get("message", None)

# prepare list of matching timers
matching_timers = []

# handle simple case of all timers
if message is None:
matching_timers = self.wakeup_schedule

# handle regex in message
elif self.contains_regex(message):
message_pattern = re.compile(message, re.IGNORECASE)
for wakeup_timer in self.wakeup_schedule:
if message_pattern.match(wakeup_timer["message"]) is not None:
matching_timers.append(wakeup_timer)

# handle case of a specific message
else:
for wakeup_timer in self.wakeup_schedule:
if wakeup_timer["message"] == message:
matching_timers.append(wakeup_timer)

# return matching timers
try:
return json.dumps(matching_timers)
except:
return "get_wakeup_timers: failed to convert wakeup timer list to json"

# delete wake timers
def delete_wakeup_timers(self, arguments):
# check message argument, default to all
message = arguments.get("message", None)

# find matching timers
num_timers_deleted = 0

# handle simple case of deleting all timers
if message is None:
num_timers_deleted = len(self.wakeup_schedule)
self.wakeup_schedule.clear()

# handle regex in message
elif self.contains_regex(message):
message_pattern = re.compile(message, re.IGNORECASE)
for wakeup_timer in self.wakeup_schedule:
if message_pattern.match(wakeup_timer["message"]) is not None:
num_timers_deleted = num_timers_deleted + 1
self.wakeup_schedule.remove(wakeup_timer)
else:
# handle simple case of a single message
for wakeup_timer in self.wakeup_schedule:
if wakeup_timer["message"] == message:
num_timers_deleted = num_timers_deleted + 1
self.wakeup_schedule.remove(wakeup_timer)

# return number deleted and remaining
return "delete_wakeup_timers: deleted " + str(num_timers_deleted) + " timers, " + str(len(self.wakeup_schedule)) + " remaining"

# check if any wakeup timers have expired and send messages if they have
# this function never returns so it should be called from a new thread
def check_wakeup_timers(self):
while True:
# wait for one second
time.sleep(1)

# check if any timers are set
if len(self.wakeup_schedule) == 0:
continue

# get current time
now = time.time()

# check if any timers have expired
for wakeup_timer in self.wakeup_schedule:
if now >= wakeup_timer["time"]:
# send message to assistant
message = "WAKEUP:" + wakeup_timer["message"]
self.send_to_assistant(message)

# remove from wakeup schedule
self.wakeup_schedule.remove(wakeup_timer)

# wrap latitude to range -90 to 90
def wrap_latitude(self, latitude_deg):
if latitude_deg > 90:
Expand Down
Loading