Skip to content

Commit

Permalink
Capture futures while running in background
Browse files Browse the repository at this point in the history
  • Loading branch information
kthui committed Jun 6, 2024
1 parent 27f04d1 commit 3508936
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 21 deletions.
59 changes: 38 additions & 21 deletions src/pb_stub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,27 +107,8 @@ PyDefaultArgumentToMutableType(const py::object& argument)
void
AsyncEventFutureDoneCallback(const py::object& py_future)
{
// TODO: Why using `py_future.result()` with error hangs on exit?
try {
py::object exception = py_future.attr("exception")();
if (!py::isinstance<py::none>(exception)) {
std::string err_msg = "";
py::object traceback = py::module_::import("traceback")
.attr("TracebackException")
.attr("from_exception")(exception)
.attr("format")();
for (py::handle line : traceback) {
err_msg += py::str(line);
}
LOG_ERROR << err_msg;
}
}
catch (const PythonBackendException& pb_exception) {
LOG_ERROR << pb_exception.what();
}
catch (const py::error_already_set& error) {
LOG_ERROR << error.what();
}
std::unique_ptr<Stub>& stub = Stub::GetOrCreateInstance();
stub->BackgroundFutureDone(py_future);
}

void
Expand Down Expand Up @@ -556,6 +537,7 @@ Stub::Initialize(bi::managed_external_buffer::handle_t map_handle)
c_python_backend_utils.attr("shared_memory") = py::cast(shm_pool_.get());

async_event_loop_ = py::none();
background_futures_ = py::module_::import("builtins").attr("set")();

py::object TritonPythonModel = sys.attr("TritonPythonModel");
deserialize_bytes_ = python_backend_utils.attr("deserialize_bytes_tensor");
Expand Down Expand Up @@ -838,11 +820,45 @@ Stub::RunCoroutine(py::object coroutine, bool in_background)
py_future.attr("add_done_callback")(
py::module_::import("c_python_backend_utils")
.attr("async_event_future_done_callback"));
background_futures_.attr("add")(py_future);
return py::none();
}
return py_future.attr("result")();
}

void
Stub::BackgroundFutureDone(const py::object& py_future)
{
// TODO: Why using `py_future.result()` with error hangs on exit?
try {
py::object exception = py_future.attr("exception")();
if (!py::isinstance<py::none>(exception)) {
std::string err_msg = "";
py::object traceback = py::module_::import("traceback")
.attr("TracebackException")
.attr("from_exception")(exception)
.attr("format")();
for (py::handle line : traceback) {
err_msg += py::str(line);
}
LOG_ERROR << err_msg;
}
}
catch (const PythonBackendException& pb_exception) {
LOG_ERROR << pb_exception.what();
}
catch (const py::error_already_set& error) {
LOG_ERROR << error.what();
}
// Remove future from background
try {
background_futures_.attr("remove")(py_future);
}
catch (const py::error_already_set& error) {
LOG_ERROR << "Cannot remove future from background; " << error.what();
}
}

void
Stub::UpdateHealth()
{
Expand Down Expand Up @@ -923,6 +939,7 @@ Stub::~Stub()
{
py::gil_scoped_acquire acquire;
async_event_loop_ = py::none();
background_futures_ = py::none();
model_instance_ = py::none();
}
stub_instance_.reset();
Expand Down
3 changes: 3 additions & 0 deletions src/pb_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,8 @@ class Stub {

py::object RunCoroutine(py::object coroutine, bool in_background);

void BackgroundFutureDone(const py::object& py_future);

/// Get the memory manager message queue
std::unique_ptr<MessageQueue<uint64_t>>& MemoryManagerQueue();

Expand Down Expand Up @@ -367,6 +369,7 @@ class Stub {
py::object deserialize_bytes_;
py::object serialize_bytes_;
py::object async_event_loop_;
py::object background_futures_;
std::unique_ptr<MessageQueue<bi::managed_external_buffer::handle_t>>
stub_message_queue_;
std::unique_ptr<MessageQueue<bi::managed_external_buffer::handle_t>>
Expand Down

0 comments on commit 3508936

Please sign in to comment.