Skip to content

Commit

Permalink
GH-2930: Support for query cancelation for spatial property functions…
Browse files Browse the repository at this point in the history
… by creating iterators lazily.
  • Loading branch information
Lorenz Buehmann authored and Aklakan committed Jan 8, 2025
1 parent 5114670 commit 9a5c75f
Show file tree
Hide file tree
Showing 3 changed files with 262 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ protected QueryExecDataset(Query query, String queryString, DatasetGraph dataset
// See also query substitution handled in QueryExecBuilder
this.initialBinding = initialToEngine;

// Cancel signal may originate from an e.c. an update execution.
// Cancel signal may originate from an e.g. an update execution.
this.cancelSignal = Context.getOrSetCancelSignal(context);

init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@
*/
package org.apache.jena.geosparql.geo.topological;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.*;

import org.apache.jena.atlas.iterator.Iter;
import org.apache.jena.geosparql.geof.topological.GenericFilterFunction;
import org.apache.jena.geosparql.implementation.GeometryWrapper;
import org.apache.jena.geosparql.implementation.index.QueryRewriteIndex;
Expand All @@ -37,9 +36,7 @@
import org.apache.jena.sparql.engine.QueryIterator;
import org.apache.jena.sparql.engine.binding.Binding;
import org.apache.jena.sparql.engine.binding.BindingFactory;
import org.apache.jena.sparql.engine.iterator.QueryIterConcat;
import org.apache.jena.sparql.engine.iterator.QueryIterNullIterator;
import org.apache.jena.sparql.engine.iterator.QueryIterSingleton;
import org.apache.jena.sparql.engine.iterator.*;
import org.apache.jena.sparql.expr.ExprEvalException;
import org.apache.jena.sparql.pfunction.PFuncSimple;
import org.apache.jena.sparql.util.FmtUtils;
Expand Down Expand Up @@ -87,10 +84,8 @@ public QueryIterator execEvaluated(Binding binding, Node subject, Node predicate
}

private QueryIterator bothBound(Binding binding, Node subject, Node predicate, Node object, ExecutionContext execCxt) {

Graph graph = execCxt.getActiveGraph();
QueryRewriteIndex queryRewriteIndex = QueryRewriteIndex.retrieve(execCxt);

Boolean isPositiveResult = queryRewrite(graph, subject, predicate, object, queryRewriteIndex);
if (isPositiveResult) {
//Filter function test succeded so retain binding.
Expand All @@ -99,47 +94,37 @@ private QueryIterator bothBound(Binding binding, Node subject, Node predicate, N
//Filter function test failed so null result.
return QueryIterNullIterator.create(execCxt);
}

}

private QueryIterator bothUnbound(Binding binding, Node subject, Node predicate, Node object, ExecutionContext execCxt) {

QueryIterConcat queryIterConcat = new QueryIterConcat(execCxt);
Var subjectVar = Var.alloc(subject.getName());

Graph graph = execCxt.getActiveGraph();

//Search for both Features and Geometry in the Graph. Reliant upon consistent usage of SpatialObject (which is base class of Feature and Geometry) if present.
ExtendedIterator<Triple> subjectTriples;
if (graph.contains(null, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE)) {
subjectTriples = graph.find(null, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE);
} else if (graph.contains(null, RDF.type.asNode(), Geo.FEATURE_NODE) || graph.contains(null, RDF.type.asNode(), Geo.GEOMETRY_NODE)) {
ExtendedIterator<Triple> featureTriples = graph.find(null, RDF.type.asNode(), Geo.FEATURE_NODE);
ExtendedIterator<Triple> geometryTriples = graph.find(null, RDF.type.asNode(), Geo.GEOMETRY_NODE);
subjectTriples = featureTriples.andThen(geometryTriples);
} else {
//Check for Geo Predicate Features in the Graph if no GeometryLiterals found.
subjectTriples = graph.find(null, SpatialExtension.GEO_LAT_NODE, null);
}

//Bind all the Spatial Objects or Geo Predicates once as the subject and search for corresponding Objects.
while (subjectTriples.hasNext()) {
Triple subjectTriple = subjectTriples.next();
Node boundSubject = subjectTriple.getSubject();
Binding subjectBind = BindingFactory.binding(binding, subjectVar, boundSubject);
QueryIterator queryIter = oneBound(subjectBind, boundSubject, predicate, object, execCxt);
queryIterConcat.add(queryIter);
}
ExtendedIterator<Triple> spatialTriples = findSpatialTriples(graph);
ExtendedIterator<Binding> iterator = spatialTriples
.mapWith(Triple::getSubject)
.mapWith(node -> BindingFactory.binding(binding, subjectVar, node));

QueryIter queryIter = QueryIter.flatMap(
QueryIterPlainWrapper.create(iterator, execCxt),
b -> oneBound(b, b.get(subjectVar), predicate, object, execCxt),
execCxt
);
return queryIter;
}

return queryIterConcat;
private static boolean isCancelled(ExecutionContext execCxt) {
return execCxt.getCancelSignal() != null && execCxt.getCancelSignal().get();
}

private QueryIterator oneBound(Binding binding, Node subject, Node predicate, Node object, ExecutionContext execCxt) {

Graph graph = execCxt.getActiveGraph();
Node boundNode;
Node unboundNode;
Boolean isSubjectBound;
boolean isSubjectBound;
if (subject.isConcrete()) {
//Subject is bound, object is unbound.
boundNode = subject;
Expand All @@ -152,15 +137,18 @@ private QueryIterator oneBound(Binding binding, Node subject, Node predicate, No
isSubjectBound = false;
}

if (!(boundNode.isLiteral() || graph.contains(boundNode, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE) || graph.contains(boundNode, RDF.type.asNode(), Geo.FEATURE_NODE) || graph.contains(boundNode, RDF.type.asNode(), Geo.GEOMETRY_NODE))) {
if (!(boundNode.isLiteral() ||
graph.contains(boundNode, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE) ||
graph.contains(boundNode, RDF.type.asNode(), Geo.FEATURE_NODE) ||
graph.contains(boundNode, RDF.type.asNode(), Geo.GEOMETRY_NODE))) {
if (!graph.contains(boundNode, SpatialExtension.GEO_LAT_NODE, null)) {
//Bound node is not a Feature or a Geometry or has Geo predicates so exit.
return QueryIterNullIterator.create(execCxt);
}
}

boolean isSpatialIndex = SpatialIndex.isDefined(execCxt);
QueryIterConcat queryIterConcat;
QueryIter queryIterConcat;
if (!isSpatialIndex || filterFunction.isDisjoint() || filterFunction.isDisconnected()) {
//Disjointed so retrieve all cases.
queryIterConcat = findAll(graph, boundNode, unboundNode, binding, isSubjectBound, predicate, execCxt);
Expand All @@ -172,13 +160,33 @@ private QueryIterator oneBound(Binding binding, Node subject, Node predicate, No
return queryIterConcat;
}

private QueryIterConcat findAll(Graph graph, Node boundNode, Node unboundNode, Binding binding, boolean isSubjectBound, Node predicate, ExecutionContext execCxt) {
private QueryIter findAll(Graph graph, Node boundNode, Node unboundNode, Binding binding, boolean isSubjectBound, Node predicate, ExecutionContext execCxt) {

//Prepare the results.
Var unboundVar = Var.alloc(unboundNode.getName());
QueryIterConcat queryIterConcat = new QueryIterConcat(execCxt);

//Search for both Features and Geometry in the Graph. Reliant upon consistent usage of SpatialObject (which is base class of Feature and Geometry) if present.
ExtendedIterator<Triple> spatialTriples = findSpatialTriples(graph);

ExtendedIterator<Binding> iterator = spatialTriples.
mapWith(Triple::getSubject).
mapWith(node -> BindingFactory.binding(binding, unboundVar, node));
return QueryIter.flatMap(
QueryIterPlainWrapper.create(iterator, execCxt),
b -> {
Node spatialNode = b.get(unboundVar);
QueryIterator iter;
if (isSubjectBound) {
iter = bothBound(b, boundNode, predicate, spatialNode, execCxt);
} else {
iter = bothBound(b, spatialNode, predicate, boundNode, execCxt);
}
return iter;
},
execCxt);
}

private static ExtendedIterator<Triple> findSpatialTriples(Graph graph) {
ExtendedIterator<Triple> spatialTriples;
if (graph.contains(null, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE)) {
spatialTriples = graph.find(null, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE);
Expand All @@ -190,21 +198,7 @@ private QueryIterConcat findAll(Graph graph, Node boundNode, Node unboundNode, B
//Check for Geo Predicate Features in the Graph if no GeometryLiterals found.
spatialTriples = graph.find(null, SpatialExtension.GEO_LAT_NODE, null);
}

while (spatialTriples.hasNext()) {
Triple spatialTriple = spatialTriples.next();
Node spatialNode = spatialTriple.getSubject();
Binding newBind = BindingFactory.binding(binding, unboundVar, spatialNode);
QueryIterator queryIter;
if (isSubjectBound) {
queryIter = bothBound(newBind, boundNode, predicate, spatialNode, execCxt);
} else {
queryIter = bothBound(newBind, spatialNode, predicate, boundNode, execCxt);
}
queryIterConcat.add(queryIter);
}

return queryIterConcat;
return spatialTriples;
}

private QueryIterConcat findIndex(Graph graph, Node boundNode, Node unboundNode, Binding binding, boolean isSubjectBound, Node predicate, ExecutionContext execCxt) throws ExprEvalException {
Expand Down Expand Up @@ -238,39 +232,54 @@ private QueryIterConcat findIndex(Graph graph, Node boundNode, Node unboundNode,
Envelope searchEnvelope = transformedGeom.getEnvelope();
HashSet<Resource> features = spatialIndex.query(searchEnvelope);

//Check each of the Features that match the search.
for (Resource feature : features) {
Node featureNode = feature.asNode();

//Ensure not already an asserted node.
if (!assertedNodes.contains(featureNode)) {

Binding newBind = BindingFactory.binding(binding, unboundVar, featureNode);
QueryIterator queryIter;
if (isSubjectBound) {
queryIter = bothBound(newBind, boundNode, predicate, featureNode, execCxt);
} else {
queryIter = bothBound(newBind, featureNode, predicate, boundNode, execCxt);
}
queryIterConcat.add(queryIter);
}

//Also test all Geometry of the Features. All, some or one Geometry may have matched.
List<Node> featureGeometryTriples = G.listSP(graph, feature.asNode(), Geo.HAS_GEOMETRY_NODE);
for ( Node geomNode : featureGeometryTriples) {
//Ensure not already an asserted node.
if (!assertedNodes.contains(geomNode)) {
Binding newBind = BindingFactory.binding(binding, unboundVar, geomNode);
QueryIterator queryIter;
if (isSubjectBound) {
queryIter = bothBound(newBind, boundNode, predicate, geomNode, execCxt);
} else {
queryIter = bothBound(newBind, geomNode, predicate, boundNode, execCxt);
// Check each of the Features that match the search.
QueryIterator queryIterator = QueryIterPlainWrapper.create(
Iter.map(features.iterator(),
feature -> BindingFactory.binding(binding, unboundVar, feature.asNode())),
execCxt);

queryIterator = QueryIter.flatMap(queryIterator, b -> {
Node featureNode = b.get(unboundVar);
QueryIterConcat featureIterConcat = new QueryIterConcat(execCxt);

// Check Features directly if not already asserted
if (!assertedNodes.contains(featureNode)) {
QueryIterator tmpIter;
if (isSubjectBound) {
tmpIter = bothBound(b, boundNode, predicate, featureNode, execCxt);
} else {
tmpIter = bothBound(b, featureNode, predicate, boundNode, execCxt);
}
featureIterConcat.add(tmpIter);
}
queryIterConcat.add(queryIter);
}
}
}

// Also test all Geometry of the Features. All, some or one Geometry may have matched.
ExtendedIterator<Node> featureGeometries = G.iterSP(graph, featureNode, Geo.HAS_GEOMETRY_NODE);
QueryIterator geometriesQueryIterator = QueryIterPlainWrapper.create(
Iter.map(
Iter.filter( // omit asserted
featureGeometries,
geometry -> !assertedNodes.contains(geometry)
),
geometryNode -> BindingFactory.binding(binding, unboundVar, geometryNode)),
execCxt);
geometriesQueryIterator = QueryIter.flatMap(
geometriesQueryIterator,
b2 -> {
Node geomNode = b2.get(unboundVar);

if (isSubjectBound) {
return bothBound(b2, boundNode, predicate, geomNode, execCxt);
} else {
return bothBound(b2, geomNode, predicate, boundNode, execCxt);
}
},
execCxt);
featureIterConcat.add(geometriesQueryIterator);
return featureIterConcat;
},
execCxt);
queryIterConcat.add(queryIterator);

return queryIterConcat;
} catch (MismatchedDimensionException | TransformException | FactoryException | SpatialIndexException ex) {
Expand Down
Loading

0 comments on commit 9a5c75f

Please sign in to comment.