Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Models should filter outputs based on requested outputs #366

Merged
merged 3 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/infer_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,15 @@ InferRequest::InferRequest(
}
}

inputs_ = inputs;
requested_output_names_ = requested_output_names;
inputs_ = inputs; // TODO: do we need this?
Tabrizian marked this conversation as resolved.
Show resolved Hide resolved
requested_output_names_ = requested_output_names; // TODO: do we need this?
#ifdef TRITON_PB_STUB
pb_cancel_ =
std::make_shared<PbCancel>(response_factory_address_, request_address_);
response_sender_ = std::make_shared<ResponseSender>(
request_address_, response_factory_address_, nullptr /* is_decoupled */,
Stub::GetOrCreateInstance()->SharedMemory(), pb_cancel_);
RequestedOutputNames(), Stub::GetOrCreateInstance()->SharedMemory(),
pb_cancel_);
#endif
}

Expand Down Expand Up @@ -390,7 +391,8 @@ InferRequest::InferRequest(
std::make_shared<PbCancel>(response_factory_address_, request_address_);
response_sender_ = std::make_shared<ResponseSender>(
request_address_, response_factory_address_, is_model_decoupled,
Stub::GetOrCreateInstance()->SharedMemory(), pb_cancel_);
RequestedOutputNames(), Stub::GetOrCreateInstance()->SharedMemory(),
pb_cancel_);
#endif
}

Expand Down
34 changes: 27 additions & 7 deletions src/response_sender.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,15 @@ CheckResponseSenderArguments(

ResponseSender::ResponseSender(
intptr_t request_address, intptr_t response_factory_address,
bool const* is_decoupled, std::unique_ptr<SharedMemoryManager>& shm_pool,
bool const* is_decoupled,
const std::set<std::string>& requested_output_names,
std::unique_ptr<SharedMemoryManager>& shm_pool,
const std::shared_ptr<PbCancel>& pb_cancel)
: request_address_(request_address),
response_factory_address_(response_factory_address),
is_decoupled_(is_decoupled), shm_pool_(shm_pool), pb_cancel_(pb_cancel),
closed_(false), number_of_response_sent_(0)
is_decoupled_(is_decoupled),
requested_output_names_(requested_output_names), shm_pool_(shm_pool),
pb_cancel_(pb_cancel), closed_(false), number_of_response_sent_(0)
{
}

Expand All @@ -71,17 +74,23 @@ ResponseSender::~ResponseSender()
PYTHONSTUB_DecoupledResponseFactoryCleanup);
}

void
ResponseSender::UpdateStateAndCounters(
const std::shared_ptr<InferResponse>& response, const uint32_t flags)
bool
ResponseSender::IsDecoupled() const
{
if (is_decoupled_ == nullptr) {
// TODO: Can a model access the response sender on a BLS infer request?
throw PythonBackendException(
"Unable to send response. Response sender has no reference to the "
"decoupled state of the model.");
}
bool is_decoupled = *is_decoupled_;
return *is_decoupled_;
}

void
ResponseSender::UpdateStateAndCounters(
const std::shared_ptr<InferResponse>& response, const uint32_t flags)
{
bool is_decoupled = IsDecoupled();

std::lock_guard<std::mutex> lk(mu_);

Expand Down Expand Up @@ -110,6 +119,16 @@ ResponseSender::UpdateStateAndCounters(
number_of_response_sent_++;
}

void
ResponseSender::PruneNonRequestedOutputs(
const std::shared_ptr<InferResponse>& infer_response) const
{
// TODO: should this be limited to non decoupled only?
if (!IsDecoupled() && infer_response) {
kthui marked this conversation as resolved.
Show resolved Hide resolved
infer_response->PruneOutputTensors(requested_output_names_);
}
}

void
ResponseSender::Send(
std::shared_ptr<InferResponse> infer_response, const uint32_t flags)
Expand All @@ -123,6 +142,7 @@ ResponseSender::Send(

CheckResponseSenderArguments(infer_response, flags);
UpdateStateAndCounters(infer_response, flags);
PruneNonRequestedOutputs(infer_response);

std::unique_ptr<Stub>& stub = Stub::GetOrCreateInstance();

Expand Down
8 changes: 7 additions & 1 deletion src/response_sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ class ResponseSender {
public:
ResponseSender(
intptr_t request_address, intptr_t response_factory_address,
bool const* is_decoupled, std::unique_ptr<SharedMemoryManager>& shm_pool,
bool const* is_decoupled,
const std::set<std::string>& requested_output_names,
std::unique_ptr<SharedMemoryManager>& shm_pool,
const std::shared_ptr<PbCancel>& pb_cancel);
~ResponseSender();
void Send(std::shared_ptr<InferResponse> response, const uint32_t flags);
Expand All @@ -48,12 +50,16 @@ class ResponseSender {
void Close();

private:
bool IsDecoupled() const;
void UpdateStateAndCounters(
const std::shared_ptr<InferResponse>& response, const uint32_t flags);
void PruneNonRequestedOutputs(
const std::shared_ptr<InferResponse>& infer_response) const;

intptr_t request_address_;
intptr_t response_factory_address_;
bool const* is_decoupled_;
std::set<std::string> requested_output_names_;
std::unique_ptr<SharedMemoryManager>& shm_pool_;
std::shared_ptr<PbCancel> pb_cancel_;

Expand Down
Loading