Skip to content

Commit

Permalink
initial RuntimeInformation support for service queries
Browse files Browse the repository at this point in the history
  • Loading branch information
UNEXENU committed Feb 12, 2025
1 parent 2fad765 commit e4490ce
Show file tree
Hide file tree
Showing 13 changed files with 386 additions and 85 deletions.
20 changes: 13 additions & 7 deletions src/engine/Operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,10 +413,8 @@ void Operation::updateRuntimeInformationOnSuccess(
// Therefore, for each child of this operation the correct runtime is
// available.
_runtimeInfo->children_.clear();
for (auto* child : getChildren()) {
AD_CONTRACT_CHECK(child);
_runtimeInfo->children_.push_back(
child->getRootOperation()->getRuntimeInfoPointer());
for (auto child : getRuntimeInfoChildren()) {
_runtimeInfo->children_.push_back(child);
}
}
signalQueryUpdate();
Expand Down Expand Up @@ -470,8 +468,8 @@ void Operation::updateRuntimeInformationWhenOptimizedOut(
// _______________________________________________________________________
void Operation::updateRuntimeInformationOnFailure(Milliseconds duration) {
_runtimeInfo->children_.clear();
for (auto child : getChildren()) {
_runtimeInfo->children_.push_back(child->getRootOperation()->_runtimeInfo);
for (auto child : getRuntimeInfoChildren()) {
_runtimeInfo->children_.push_back(child);
}

_runtimeInfo->totalTime_ = duration;
Expand Down Expand Up @@ -598,7 +596,6 @@ const vector<ColumnIndex>& Operation::getResultSortedOn() const {
}

// _____________________________________________________________________________

void Operation::signalQueryUpdate() const {
if (_executionContext && _executionContext->areWebsocketUpdatesEnabled()) {
_executionContext->signalQueryUpdate(*_rootRuntimeInfo);
Expand All @@ -625,3 +622,12 @@ uint64_t Operation::getSizeEstimate() {
return getSizeEstimateBeforeLimit();
}
}

// _____________________________________________________________________________
cppcoro::generator<std::shared_ptr<RuntimeInformation>>
Operation::getRuntimeInfoChildren() {
for (auto child : getChildren()) {
AD_CONTRACT_CHECK(child);
co_yield child->getRootOperation()->getRuntimeInfoPointer();
}
}
5 changes: 5 additions & 0 deletions src/engine/Operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ class Operation {
return {interm.begin(), interm.end()};
}

// Get access to the children's RuntimeInfo. Required for the `Service`, as
// it's children can't be accessed using `getChildren()` above.
virtual cppcoro::generator<std::shared_ptr<RuntimeInformation>>
getRuntimeInfoChildren();

// recursively collect all Warnings generated by all descendants
vector<string> collectWarnings() const;

Expand Down
81 changes: 81 additions & 0 deletions src/engine/RuntimeInformation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,29 @@ std::string_view RuntimeInformation::toString(Status status) {
AD_FAIL();
}

// __________________________________________________________________________
RuntimeInformation::Status RuntimeInformation::fromString(
std::string_view str) {
if (str == "fully materialized") {
return fullyMaterialized;
} else if (str == "lazily materialized") {
return lazilyMaterialized;
} else if (str == "in progress") {
return inProgress;

Check warning on line 196 in src/engine/RuntimeInformation.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/RuntimeInformation.cpp#L196

Added line #L196 was not covered by tests
} else if (str == "not started") {
return notStarted;
} else if (str == "optimized out") {
return optimizedOut;
} else if (str == "failed") {
return failed;
} else if (str == "failed because child failed") {
return failedBecauseChildFailed;
} else if (str == "cancelled") {
return cancelled;
}

Check warning on line 207 in src/engine/RuntimeInformation.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/RuntimeInformation.cpp#L206-L207

Added lines #L206 - L207 were not covered by tests
AD_FAIL();
}

// ________________________________________________________________________________________________________________
void to_json(nlohmann::ordered_json& j,
const std::shared_ptr<RuntimeInformation>& rti) {
Expand Down Expand Up @@ -220,6 +243,64 @@ void to_json(nlohmann::ordered_json& j,
{"time_query_planning", rti.timeQueryPlanning.count()}};
}

// __________________________________________________________________________
void from_json(const nlohmann::json& j, RuntimeInformation& rti) {
// Helper lambdas to ignore missing key or invalid value.
auto tryGet = [&j]<typename T>(T& dst, std::string_view key) {
try {
j.at(key).get_to(dst);
} catch (const nlohmann::json::exception& e) {
}
};
using namespace std::chrono;
auto tryGetTime = [&j](microseconds& dst, std::string_view key) {
try {
dst =
duration_cast<microseconds>(milliseconds(j.at(key).get<uint64_t>()));
} catch (const nlohmann::json::exception& e) {
}
};

auto cacheStatusFromString = [](std::string_view str) {
using ad_utility::CacheStatus;
if (str == "cached_not_pinned") {
return CacheStatus::cachedNotPinned;

Check warning on line 267 in src/engine/RuntimeInformation.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/RuntimeInformation.cpp#L267

Added line #L267 was not covered by tests
} else if (str == "cached_pinned") {
return CacheStatus::cachedPinned;
} else if (str == "computed") {
return CacheStatus::computed;
} else if (str == "not_in_cache_not_computed") {
return CacheStatus::notInCacheAndNotComputed;
} else {
throw std::runtime_error(
"Unknown string value was encountered in `fromString(CacheStatus)`");
}

Check warning on line 277 in src/engine/RuntimeInformation.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/RuntimeInformation.cpp#L273-L277

Added lines #L273 - L277 were not covered by tests
};

tryGet(rti.descriptor_, "description");
tryGet(rti.numRows_, "result_rows");
tryGet(rti.numCols_, "result_cols");
tryGet(rti.columnNames_, "column_names");
tryGetTime(rti.totalTime_, "total_time");
tryGetTime(rti.originalTotalTime_, "original_total_time");
tryGetTime(rti.originalOperationTime_, "original_operation_time");
if (auto it = j.find("cache_status"); it != j.end()) {
rti.cacheStatus_ = cacheStatusFromString(it->get<std::string_view>());
}
tryGet(rti.details_, "details");
tryGet(rti.costEstimate_, "estimated_total_cost");
tryGet(rti.multiplicityEstimates_, "estimated_column_multiplicities");
tryGet(rti.sizeEstimate_, "estimated_size");
if (auto it = j.find("status"); it != j.end()) {
rti.status_ = RuntimeInformation::fromString(it->get<std::string>());
}
if (auto it = j.find("children"); it != j.end()) {
for (const auto& child : *it) {
rti.children_.push_back(std::make_shared<RuntimeInformation>(child));
}
}
}

// __________________________________________________________________________
void RuntimeInformation::addLimitOffsetRow(const LimitOffsetClause& l,
bool fullResultIsNotCached) {
Expand Down
5 changes: 5 additions & 0 deletions src/engine/RuntimeInformation.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ class RuntimeInformation {
/// library to allow for implicit conversion.
friend void to_json(nlohmann::ordered_json& j, const RuntimeInformation& rti);

// Import from json. Missing keys or invalid values are ignored.
friend void from_json(const nlohmann::json& j, RuntimeInformation& rti);

/// Set `columnNames_` from a `VariableToColumnMap`. The former is a vector
/// (convenient for this class), the latter is a hash map (appropriate for
/// the rest of the code).
Expand Down Expand Up @@ -138,6 +141,8 @@ class RuntimeInformation {

static std::string_view toString(Status status);

static Status fromString(std::string_view str);

// A helper function for printing the details as a string.
static void formatDetailValue(std::ostream& out, std::string_view key,
const nlohmann::json& value);
Expand Down
33 changes: 29 additions & 4 deletions src/engine/Service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
// ____________________________________________________________________________
Service::Service(QueryExecutionContext* qec,
parsedQuery::Service parsedServiceClause,
GetResultFunction getResultFunction)
NetworkFunctions networkFunctions)
: Operation{qec},
parsedServiceClause_{std::move(parsedServiceClause)},
getResultFunction_{std::move(getResultFunction)} {}
networkFunctions_{std::move(networkFunctions)} {}

// ____________________________________________________________________________
std::string Service::getCacheKeyImpl() const {
Expand Down Expand Up @@ -125,6 +125,31 @@ ProtoResult Service::computeResultImpl([[maybe_unused]] bool requestLaziness) {
ad_utility::httpUtils::Url serviceUrl{
asStringViewUnsafe(parsedServiceClause_.serviceIri_.getContent())};

// Receive updates about the RuntimeInformation from the service endpoint.
const std::string queryId = ad_utility::UuidGenerator()();
auto updateRuntimeInformation = [&]() {
try {
const std::string target = absl::StrCat("/watch/", queryId);
for (const auto& msg :
networkFunctions_.getRuntimeInfoFunction_(serviceUrl, target)) {
childRuntimeInformation_ =
std::make_shared<RuntimeInformation>(nlohmann::json::parse(msg));
}
} catch (const boost::beast::system_error& se) {
// If the endpoint closes the connection we have received all messages
// -> ignore the error.
if (se.code() != boost::beast::websocket::error::closed) {
LOG(ERROR) << "SERVICE Websocket client: " << se.what() << '\n';
}
} catch (std::exception& e) {
LOG(ERROR) << "SERVICE Websocket client: " << e.what() << '\n';
}

Check warning on line 146 in src/engine/Service.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Service.cpp#L142-L146

Added lines #L142 - L146 were not covered by tests
};
if (!runtimeInfoThread_) {
runtimeInfoThread_ =
std::make_unique<std::thread>(updateRuntimeInformation);
}

// Construct the query to be sent to the SPARQL endpoint.
std::string variablesForSelectClause = absl::StrJoin(
parsedServiceClause_.visibleVariables_, " ", Variable::AbslFormatter);
Expand All @@ -138,10 +163,10 @@ ProtoResult Service::computeResultImpl([[maybe_unused]] bool requestLaziness) {
<< ", target: " << serviceUrl.target() << ")" << std::endl
<< serviceQuery << std::endl;

HttpOrHttpsResponse response = getResultFunction_(
HttpOrHttpsResponse response = networkFunctions_.getResultFunction_(
serviceUrl, cancellationHandle_, boost::beast::http::verb::post,
serviceQuery, "application/sparql-query",
"application/sparql-results+json");
"application/sparql-results+json", {{"Query-Id"sv, queryId}});

auto throwErrorWithContext = [this, &response](std::string_view sv) {
std::string ctx;
Expand Down
44 changes: 38 additions & 6 deletions src/engine/Service.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,14 @@ class Service : public Operation {
public:
// The type of the function used to obtain the results, see below.
using GetResultFunction = std::function<HttpOrHttpsResponse(
const ad_utility::httpUtils::Url&,
ad_utility::SharedCancellationHandle handle,
const ad_utility::httpUtils::Url&, ad_utility::SharedCancellationHandle,
const boost::beast::http::verb&, std::string_view, std::string_view,
std::string_view)>;
std::string_view,
const std::unordered_map<std::string_view, std::string_view>&)>;

// The type of the function used to obtain the RuntimeInformation.
using GetRuntimeInfoFunction = std::function<cppcoro::generator<std::string>(
const ad_utility::httpUtils::Url&, std::string_view)>;

// Information on a Sibling operation.
struct SiblingInfo {
Expand All @@ -44,16 +48,29 @@ class Service : public Operation {
std::string cacheKey_;
};

struct NetworkFunctions {
GetResultFunction getResultFunction_;
GetRuntimeInfoFunction getRuntimeInfoFunction_;
};

private:
// The parsed SERVICE clause.
parsedQuery::Service parsedServiceClause_;

// The function used to obtain the result from the remote endpoint.
GetResultFunction getResultFunction_;
// The functions used to obtain the result and runtime information from the
// remote endpoint.
NetworkFunctions networkFunctions_;

// Optional sibling information to be used in `getSiblingValuesClause`.
std::optional<SiblingInfo> siblingInfo_;

// RuntimeInformation of the service-query computation on the endpoint.
std::shared_ptr<RuntimeInformation> childRuntimeInformation_;

// Thread for the websocket-client retrieving `childRuntimeInformation_` from
// the endpoint.
std::unique_ptr<std::thread> runtimeInfoThread_;

public:
// Construct from parsed Service clause.
//
Expand All @@ -62,7 +79,15 @@ class Service : public Operation {
// but in our tests (`ServiceTest`) we use a mock function that does not
// require a running `HttpServer`.
Service(QueryExecutionContext* qec, parsedQuery::Service parsedServiceClause,
GetResultFunction getResultFunction = sendHttpOrHttpsRequest);
NetworkFunctions networkFunctions = {
.getResultFunction_ = sendHttpOrHttpsRequest,
.getRuntimeInfoFunction_ = readHttpOrHttpsWebsocketStream});

~Service() {
if (runtimeInfoThread_) {
runtimeInfoThread_->join();
}
}

// Methods inherited from base class `Operation`.
std::string getDescriptor() const override;
Expand All @@ -83,6 +108,13 @@ class Service : public Operation {
// A SERVICE clause has no children.
vector<QueryExecutionTree*> getChildren() override { return {}; }

cppcoro::generator<std::shared_ptr<RuntimeInformation>>
getRuntimeInfoChildren() final {
if (childRuntimeInformation_) {
co_yield childRuntimeInformation_;
}
}

// Convert the given binding to TripleComponent.
TripleComponent bindingToTripleComponent(
const nlohmann::json& binding,
Expand Down
2 changes: 1 addition & 1 deletion src/util/LazyJsonParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class LazyJsonParser {

// Context for the 3 parsing sections.
struct BeforeArrayPath {
// Indices of the latest parsed literal, used to add keys to the curPath_.
// Indices of the latest parsed literal, used to add keys to the `curPath_`.
struct LiteralView {
size_t start_{0};
size_t length_{0};
Expand Down
Loading

0 comments on commit e4490ce

Please sign in to comment.