Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Explicitly named query #1739

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion benchmark/GroupByHashMapBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
#include <random>

#include "../benchmark/infrastructure/Benchmark.h"
#include "../test/engine/ValuesForTesting.h"
#include "../test/util/IdTableHelpers.h"
#include "../test/util/IndexTestHelpers.h"
#include "engine/GroupBy.h"
#include "engine/Sort.h"
#include "engine/Values.h"
#include "engine/ValuesForTesting.h"
#include "engine/sparqlExpressions/AggregateExpression.h"
#include "engine/sparqlExpressions/GroupConcatExpression.h"
#include "engine/sparqlExpressions/LiteralExpression.h"
Expand Down
4 changes: 3 additions & 1 deletion src/engine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,7 @@ add_library(engine
CartesianProductJoin.cpp TextIndexScanForWord.cpp TextIndexScanForEntity.cpp
TextLimit.cpp LazyGroupBy.cpp GroupByHashMapOptimization.cpp SpatialJoin.cpp
CountConnectedSubgraphs.cpp SpatialJoinAlgorithms.cpp PathSearch.cpp ExecuteUpdate.cpp
Describe.cpp GraphStoreProtocol.cpp)
Describe.cpp GraphStoreProtocol.cpp
NamedQueryCache.cpp
QueryExecutionContext.cpp)
qlever_target_link_libraries(engine util index parser sparqlExpressions http SortPerformanceEstimator Boost::iostreams s2)
8 changes: 5 additions & 3 deletions src/engine/CheckUsePatternTrick.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,11 @@
} else if constexpr (std::is_same_v<T, p::Service>) {
return ad_utility::contains(arg.visibleVariables_, variable);
} else {
static_assert(
std::is_same_v<T, p::TransPath> || std::is_same_v<T, p::PathQuery> ||
std::is_same_v<T, p::Describe> || std::is_same_v<T, p::SpatialQuery>);
static_assert(std::is_same_v<T, p::TransPath> ||
std::is_same_v<T, p::PathQuery> ||
std::is_same_v<T, p::Describe> ||
std::is_same_v<T, p::SpatialQuery> ||
std::is_same_v<T, p::NamedCachedQuery>);

Check warning on line 79 in src/engine/CheckUsePatternTrick.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/CheckUsePatternTrick.cpp#L75-L79

Added lines #L75 - L79 were not covered by tests
// The `TransPath` is set up later in the query planning, when this
// function should not be called anymore.
AD_FAIL();
Expand Down
2 changes: 1 addition & 1 deletion src/engine/Describe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

#include "engine/Describe.h"

#include "../../test/engine/ValuesForTesting.h"
#include "engine/IndexScan.h"
#include "engine/Join.h"
#include "engine/ValuesForTesting.h"

// _____________________________________________________________________________
Describe::Describe(QueryExecutionContext* qec,
Expand Down
34 changes: 34 additions & 0 deletions src/engine/NamedQueryCache.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2025, University of Freiburg,
// Chair of Algorithms and Data Structures.
// Author: Johannes Kalmbach <[email protected]>

#include "engine/NamedQueryCache.h"

// _____________________________________________________________________________
std::shared_ptr<ValuesForTesting> NamedQueryCache ::getOperation(
const Key& key, QueryExecutionContext* ctx) const {
const auto& [table, map, sortedOn] = get(key);

Check warning on line 10 in src/engine/NamedQueryCache.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/NamedQueryCache.cpp#L9-L10

Added lines #L9 - L10 were not covered by tests
// TODO<joka921> we should get rid of the copies for the IdTable (and
// probably the other members) especially for larger results).
return std::make_shared<ValuesForTesting>(ctx, table.clone(), map);
}

Check warning on line 14 in src/engine/NamedQueryCache.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/NamedQueryCache.cpp#L13-L14

Added lines #L13 - L14 were not covered by tests

// _____________________________________________________________________________
auto NamedQueryCache::get(const Key& key) const -> const Value& {
auto l = cache_.wlock();
auto it = l->find(key);

Check warning on line 19 in src/engine/NamedQueryCache.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/NamedQueryCache.cpp#L17-L19

Added lines #L17 - L19 were not covered by tests
if (it == l->end()) {
throw std::runtime_error{
absl::StrCat("The named query with the name \"", key,
"\" was not pinned to the named query cache")};
}
return it->second;
}

Check warning on line 26 in src/engine/NamedQueryCache.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/NamedQueryCache.cpp#L21-L26

Added lines #L21 - L26 were not covered by tests

// _____________________________________________________________________________
void NamedQueryCache::store(const Key& key, Value value) {
(*cache_.wlock()).insert_or_assign(key, std::move(value));
}

Check warning on line 31 in src/engine/NamedQueryCache.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/NamedQueryCache.cpp#L29-L31

Added lines #L29 - L31 were not covered by tests

// _____________________________________________________________________________
void NamedQueryCache::clear() { cache_.wlock()->clear(); }

Check warning on line 34 in src/engine/NamedQueryCache.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/NamedQueryCache.cpp#L34

Added line #L34 was not covered by tests
46 changes: 46 additions & 0 deletions src/engine/NamedQueryCache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2025, University of Freiburg,
// Chair of Algorithms and Data Structures.
// Author: Johannes Kalmbach <[email protected]>
#pragma once

#include "engine/ValuesForTesting.h"
#include "util/Synchronized.h"

// A simple threadsafe cache that associates query results with an explicit
// name.
class NamedQueryCache {
public:
// The cache value. It stores all the information required to construct a
// proper `QueryExecutionTree` later on.
struct Value {
IdTable result_;
VariableToColumnMap varToColMap_;
std::vector<ColumnIndex> resultSortedOn_;
};
using Key = std::string;
using Cache = ad_utility::HashMap<std::string, Value>;

private:
ad_utility::Synchronized<Cache> cache_;

public:
// Store an explicit query result with a given `key`. Previously stored
// `value`s with the same `key` are overwritten.
void store(const Key& key, Value value);

// Clear the cache.
void clear();

// Retrieve the query result that is associated with the `key`.
// Throw an exception if the `key` doesn't exist.
const Value& get(const Key& key) const;

// Retrieve the query result with the given `key` and convert it into an
// explicit `ValuesForTesting` operation that can be used as part of a
// `QueryExecutionTree`.
// TODO<joka921> This can be done more efficiently if we implement a dedicated
// operation for this use case, `ValuesForTesting` currently incurs one
// (unneeded) copy per query execution.
std::shared_ptr<ValuesForTesting> getOperation(
const Key& key, QueryExecutionContext* ctx) const;
};
22 changes: 22 additions & 0 deletions src/engine/QueryExecutionContext.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2025, University of Freiburg,
// Chair of Algorithms and Data Structures.
// Author: Johannes Kalmbach <[email protected]>

#include "engine/QueryExecutionContext.h"

// _____________________________________________________________________________
QueryExecutionContext::QueryExecutionContext(
const Index& index, QueryResultCache* const cache,
ad_utility::AllocatorWithLimit<Id> allocator,
SortPerformanceEstimator sortPerformanceEstimator,
NamedQueryCache* namedCache,
std::function<void(std::string)> updateCallback, const bool pinSubtrees,
const bool pinResult)
: _pinSubtrees(pinSubtrees),
_pinResult(pinResult),
_index(index),
_subtreeCache(cache),
_allocator(std::move(allocator)),
_sortPerformanceEstimator(sortPerformanceEstimator),
updateCallback_(std::move(updateCallback)),
namedQueryCache_{namedCache} {}
24 changes: 15 additions & 9 deletions src/engine/QueryExecutionContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@
};
};

// Forward declaration because of cyclic dependencies
class NamedQueryCache;

// The key for the `QueryResultCache` below. It consists of a `string` (the
// actual cache key of a `QueryExecutionTree` and the index of the
// `LocatedTriplesSnapshot` that was used to create the corresponding value.
Expand All @@ -89,6 +92,9 @@
using QueryResultCache = ad_utility::ConcurrentCache<
ad_utility::LRUCache<QueryCacheKey, CacheValue, CacheValue::SizeGetter>>;

// Forward declaration because of cyclic dependency
class NamedQueryCache;

// Execution context for queries.
// Holds references to index and engine, implements caching.
class QueryExecutionContext {
Expand All @@ -97,17 +103,10 @@
const Index& index, QueryResultCache* const cache,
ad_utility::AllocatorWithLimit<Id> allocator,
SortPerformanceEstimator sortPerformanceEstimator,
NamedQueryCache* namedCache,
std::function<void(std::string)> updateCallback =
[](std::string) { /* No-op by default for testing */ },
const bool pinSubtrees = false, const bool pinResult = false)
: _pinSubtrees(pinSubtrees),
_pinResult(pinResult),
_index(index),
_subtreeCache(cache),
_allocator(std::move(allocator)),
_costFactors(),
_sortPerformanceEstimator(sortPerformanceEstimator),
updateCallback_(std::move(updateCallback)) {}
bool pinSubtrees = false, bool pinResult = false);

QueryResultCache& getQueryTreeCache() { return *_subtreeCache; }

Expand Down Expand Up @@ -151,6 +150,11 @@
return areWebsocketUpdatesEnabled_;
}

NamedQueryCache& namedQueryCache() {
AD_CORRECTNESS_CHECK(namedQueryCache_ != nullptr);
return *namedQueryCache_;
}

Check warning on line 156 in src/engine/QueryExecutionContext.h

View check run for this annotation

Codecov / codecov/patch

src/engine/QueryExecutionContext.h#L153-L156

Added lines #L153 - L156 were not covered by tests

private:
const Index& _index;

Expand All @@ -170,4 +174,6 @@
// mutex.
bool areWebsocketUpdatesEnabled_ =
RuntimeParameters().get<"websocket-updates-enabled">();

NamedQueryCache* namedQueryCache_ = nullptr;
};
12 changes: 12 additions & 0 deletions src/engine/QueryPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <type_traits>
#include <variant>

#include "NamedQueryCache.h"
#include "backports/algorithm.h"
#include "engine/Bind.h"
#include "engine/CartesianProductJoin.h"
Expand Down Expand Up @@ -2408,6 +2409,8 @@
visitDescribe(arg);
} else if constexpr (std::is_same_v<T, p::SpatialQuery>) {
visitSpatialSearch(arg);
} else if constexpr (std::is_same_v<T, p::NamedCachedQuery>) {
visitNamedCachedQuery(arg);
} else {
static_assert(std::is_same_v<T, p::BasicGraphPattern>);
visitBasicGraphPattern(arg);
Expand Down Expand Up @@ -2581,6 +2584,15 @@
visitGroupOptionalOrMinus(std::move(candidatesOut));
}

// _____________________________________________________________________________
void QueryPlanner::GraphPatternPlanner::visitNamedCachedQuery(
parsedQuery::NamedCachedQuery& arg) {
auto candidate = SubtreePlan{
planner_._qec, planner_._qec->namedQueryCache().getOperation(
arg.validateAndGetIdentifier(), planner_._qec)};
visitGroupOptionalOrMinus(std::vector{std::move(candidate)});
}

Check warning on line 2594 in src/engine/QueryPlanner.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/QueryPlanner.cpp#L2589-L2594

Added lines #L2589 - L2594 were not covered by tests

// _______________________________________________________________
void QueryPlanner::GraphPatternPlanner::visitUnion(parsedQuery::Union& arg) {
// TODO<joka921> here we could keep all the candidates, and create a
Expand Down
1 change: 1 addition & 0 deletions src/engine/QueryPlanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ class QueryPlanner {
void visitTransitivePath(parsedQuery::TransPath& transitivePath);
void visitPathSearch(parsedQuery::PathQuery& config);
void visitSpatialSearch(parsedQuery::SpatialQuery& config);
void visitNamedCachedQuery(parsedQuery::NamedCachedQuery& config);
void visitUnion(parsedQuery::Union& un);
void visitSubquery(parsedQuery::Subquery& subquery);
void visitDescribe(parsedQuery::Describe& describe);
Expand Down
40 changes: 30 additions & 10 deletions src/engine/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -806,16 +806,21 @@
auto [cancellationHandle, cancelTimeoutOnDestruction] =
setupCancellationHandle(messageSender.getQueryId(), timeLimit);

// Do the query planning. This creates a `QueryExecutionTree`, which will
// then be used to process the query.
// Figure out, whether the query is to be pinned in the cache (either
// implicitly, or explicitly as a named query).
auto [pinSubtrees, pinResult] = determineResultPinning(params);
std::optional<std::string> pinNamed =
ad_utility::url_parser::checkParameter(params, "pin-named-query", {});

Check warning on line 813 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L812-L813

Added lines #L812 - L813 were not covered by tests
LOG(INFO) << "Processing the following SPARQL query:"
<< (pinResult ? " [pin result]" : "")
<< (pinSubtrees ? " [pin subresults]" : "") << "\n"
<< (pinNamed ? absl::StrCat(" [pin named as ]", pinNamed.value())
: "")
<< "\n"

Check warning on line 819 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L818-L819

Added lines #L818 - L819 were not covered by tests
<< query.query_ << std::endl;
QueryExecutionContext qec(index_, &cache_, allocator_,
sortPerformanceEstimator_, std::ref(messageSender),
pinSubtrees, pinResult);
sortPerformanceEstimator_, &namedQueryCache_,
std::ref(messageSender), pinSubtrees, pinResult);

Check warning on line 823 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L822-L823

Added lines #L822 - L823 were not covered by tests

// The usage of an `optional` here is required because of a limitation in
// Boost::Asio which forces us to use default-constructible result types with
Expand Down Expand Up @@ -866,10 +871,25 @@
qet.getRootOperation()->getLimit()._offset);
limitOffset._offset -= qet.getRootOperation()->getLimit()._offset;

// This actually processes the query and sends the result in the requested
// format.
co_await sendStreamableResponse(request, send, mediaType, plannedQuery, qet,
requestTimer, cancellationHandle);
if (pinNamed.has_value()) {
// The query is to be pinned in the named cache. In this case we don't
// return the result, but only pin it.
auto result = qet.getResult(false);
auto t =
NamedQueryCache::Value(result->idTable().clone(),
qet.getVariableColumns(), result->sortedBy());
qec.namedQueryCache().store(pinNamed.value(), std::move(t));

Check warning on line 881 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L877-L881

Added lines #L877 - L881 were not covered by tests

auto response = ad_utility::httpUtils::createOkResponse(
"Successfully pinned the query result", request,
ad_utility::MediaType::textPlain);
co_await send(response);
} else {

Check warning on line 887 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L883-L887

Added lines #L883 - L887 were not covered by tests
// This actually processes the query and sends the result in the requested
// format.
co_await sendStreamableResponse(request, send, mediaType, plannedQuery, qet,
requestTimer, cancellationHandle);
}

Check warning on line 892 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L890-L892

Added lines #L890 - L892 were not covered by tests

// Print the runtime info. This needs to be done after the query
// was computed.
Expand Down Expand Up @@ -957,8 +977,8 @@
<< (pinSubtrees ? " [pin subresults]" : "") << "\n"
<< update.update_ << std::endl;
QueryExecutionContext qec(index_, &cache_, allocator_,
sortPerformanceEstimator_, std::ref(messageSender),
pinSubtrees, pinResult);
sortPerformanceEstimator_, &namedQueryCache_,
std::ref(messageSender), pinSubtrees, pinResult);

Check warning on line 981 in src/engine/Server.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Server.cpp#L980-L981

Added lines #L980 - L981 were not covered by tests
auto plannedQuery =
setupPlannedQuery(update.datasetClauses_, update.update_, qec,
cancellationHandle, timeLimit, requestTimer);
Expand Down
2 changes: 2 additions & 0 deletions src/engine/Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include "ExecuteUpdate.h"
#include "engine/Engine.h"
#include "engine/NamedQueryCache.h"
#include "engine/QueryExecutionContext.h"
#include "engine/QueryExecutionTree.h"
#include "engine/SortPerformanceEstimator.h"
Expand Down Expand Up @@ -68,6 +69,7 @@ class Server {
unsigned short port_;
std::string accessToken_;
QueryResultCache cache_;
NamedQueryCache namedQueryCache_;
ad_utility::AllocatorWithLimit<Id> allocator_;
SortPerformanceEstimator sortPerformanceEstimator_;
Index index_;
Expand Down
Loading
Loading