diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDataset.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDataset.java index f79786f4991..5f68edfe5f6 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDataset.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDataset.java @@ -111,7 +111,7 @@ protected QueryExecDataset(Query query, String queryString, DatasetGraph dataset // See also query substitution handled in QueryExecBuilder this.initialBinding = initialToEngine; - // Cancel signal may originate from an e.c. an update execution. + // Cancel signal may originate from an e.g. an update execution. this.cancelSignal = Context.getOrSetCancelSignal(context); init(); diff --git a/jena-geosparql/src/main/java/org/apache/jena/geosparql/geo/topological/GenericPropertyFunction.java b/jena-geosparql/src/main/java/org/apache/jena/geosparql/geo/topological/GenericPropertyFunction.java index 2c2d4e7f65a..819465f9f94 100644 --- a/jena-geosparql/src/main/java/org/apache/jena/geosparql/geo/topological/GenericPropertyFunction.java +++ b/jena-geosparql/src/main/java/org/apache/jena/geosparql/geo/topological/GenericPropertyFunction.java @@ -17,10 +17,9 @@ */ package org.apache.jena.geosparql.geo.topological; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; +import java.util.*; + +import org.apache.jena.atlas.iterator.Iter; import org.apache.jena.geosparql.geof.topological.GenericFilterFunction; import org.apache.jena.geosparql.implementation.GeometryWrapper; import org.apache.jena.geosparql.implementation.index.QueryRewriteIndex; @@ -37,9 +36,7 @@ import org.apache.jena.sparql.engine.QueryIterator; import org.apache.jena.sparql.engine.binding.Binding; import org.apache.jena.sparql.engine.binding.BindingFactory; -import org.apache.jena.sparql.engine.iterator.QueryIterConcat; -import org.apache.jena.sparql.engine.iterator.QueryIterNullIterator; -import org.apache.jena.sparql.engine.iterator.QueryIterSingleton; +import org.apache.jena.sparql.engine.iterator.*; import org.apache.jena.sparql.expr.ExprEvalException; import org.apache.jena.sparql.pfunction.PFuncSimple; import org.apache.jena.sparql.util.FmtUtils; @@ -87,10 +84,8 @@ public QueryIterator execEvaluated(Binding binding, Node subject, Node predicate } private QueryIterator bothBound(Binding binding, Node subject, Node predicate, Node object, ExecutionContext execCxt) { - Graph graph = execCxt.getActiveGraph(); QueryRewriteIndex queryRewriteIndex = QueryRewriteIndex.retrieve(execCxt); - Boolean isPositiveResult = queryRewrite(graph, subject, predicate, object, queryRewriteIndex); if (isPositiveResult) { //Filter function test succeded so retain binding. @@ -99,39 +94,29 @@ private QueryIterator bothBound(Binding binding, Node subject, Node predicate, N //Filter function test failed so null result. return QueryIterNullIterator.create(execCxt); } - } private QueryIterator bothUnbound(Binding binding, Node subject, Node predicate, Node object, ExecutionContext execCxt) { - - QueryIterConcat queryIterConcat = new QueryIterConcat(execCxt); Var subjectVar = Var.alloc(subject.getName()); Graph graph = execCxt.getActiveGraph(); //Search for both Features and Geometry in the Graph. Reliant upon consistent usage of SpatialObject (which is base class of Feature and Geometry) if present. - ExtendedIterator subjectTriples; - if (graph.contains(null, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE)) { - subjectTriples = graph.find(null, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE); - } else if (graph.contains(null, RDF.type.asNode(), Geo.FEATURE_NODE) || graph.contains(null, RDF.type.asNode(), Geo.GEOMETRY_NODE)) { - ExtendedIterator featureTriples = graph.find(null, RDF.type.asNode(), Geo.FEATURE_NODE); - ExtendedIterator geometryTriples = graph.find(null, RDF.type.asNode(), Geo.GEOMETRY_NODE); - subjectTriples = featureTriples.andThen(geometryTriples); - } else { - //Check for Geo Predicate Features in the Graph if no GeometryLiterals found. - subjectTriples = graph.find(null, SpatialExtension.GEO_LAT_NODE, null); - } - - //Bind all the Spatial Objects or Geo Predicates once as the subject and search for corresponding Objects. - while (subjectTriples.hasNext()) { - Triple subjectTriple = subjectTriples.next(); - Node boundSubject = subjectTriple.getSubject(); - Binding subjectBind = BindingFactory.binding(binding, subjectVar, boundSubject); - QueryIterator queryIter = oneBound(subjectBind, boundSubject, predicate, object, execCxt); - queryIterConcat.add(queryIter); - } + ExtendedIterator spatialTriples = findSpatialTriples(graph); + ExtendedIterator iterator = spatialTriples + .mapWith(Triple::getSubject) + .mapWith(node -> BindingFactory.binding(binding, subjectVar, node)); + + QueryIter queryIter = QueryIter.flatMap( + QueryIterPlainWrapper.create(iterator, execCxt), + b -> oneBound(b, b.get(subjectVar), predicate, object, execCxt), + execCxt + ); + return queryIter; + } - return queryIterConcat; + private static boolean isCancelled(ExecutionContext execCxt) { + return execCxt.getCancelSignal() != null && execCxt.getCancelSignal().get(); } private QueryIterator oneBound(Binding binding, Node subject, Node predicate, Node object, ExecutionContext execCxt) { @@ -139,7 +124,7 @@ private QueryIterator oneBound(Binding binding, Node subject, Node predicate, No Graph graph = execCxt.getActiveGraph(); Node boundNode; Node unboundNode; - Boolean isSubjectBound; + boolean isSubjectBound; if (subject.isConcrete()) { //Subject is bound, object is unbound. boundNode = subject; @@ -152,7 +137,10 @@ private QueryIterator oneBound(Binding binding, Node subject, Node predicate, No isSubjectBound = false; } - if (!(boundNode.isLiteral() || graph.contains(boundNode, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE) || graph.contains(boundNode, RDF.type.asNode(), Geo.FEATURE_NODE) || graph.contains(boundNode, RDF.type.asNode(), Geo.GEOMETRY_NODE))) { + if (!(boundNode.isLiteral() || + graph.contains(boundNode, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE) || + graph.contains(boundNode, RDF.type.asNode(), Geo.FEATURE_NODE) || + graph.contains(boundNode, RDF.type.asNode(), Geo.GEOMETRY_NODE))) { if (!graph.contains(boundNode, SpatialExtension.GEO_LAT_NODE, null)) { //Bound node is not a Feature or a Geometry or has Geo predicates so exit. return QueryIterNullIterator.create(execCxt); @@ -160,7 +148,7 @@ private QueryIterator oneBound(Binding binding, Node subject, Node predicate, No } boolean isSpatialIndex = SpatialIndex.isDefined(execCxt); - QueryIterConcat queryIterConcat; + QueryIter queryIterConcat; if (!isSpatialIndex || filterFunction.isDisjoint() || filterFunction.isDisconnected()) { //Disjointed so retrieve all cases. queryIterConcat = findAll(graph, boundNode, unboundNode, binding, isSubjectBound, predicate, execCxt); @@ -172,13 +160,33 @@ private QueryIterator oneBound(Binding binding, Node subject, Node predicate, No return queryIterConcat; } - private QueryIterConcat findAll(Graph graph, Node boundNode, Node unboundNode, Binding binding, boolean isSubjectBound, Node predicate, ExecutionContext execCxt) { + private QueryIter findAll(Graph graph, Node boundNode, Node unboundNode, Binding binding, boolean isSubjectBound, Node predicate, ExecutionContext execCxt) { //Prepare the results. Var unboundVar = Var.alloc(unboundNode.getName()); - QueryIterConcat queryIterConcat = new QueryIterConcat(execCxt); //Search for both Features and Geometry in the Graph. Reliant upon consistent usage of SpatialObject (which is base class of Feature and Geometry) if present. + ExtendedIterator spatialTriples = findSpatialTriples(graph); + + ExtendedIterator iterator = spatialTriples. + mapWith(Triple::getSubject). + mapWith(node -> BindingFactory.binding(binding, unboundVar, node)); + return QueryIter.flatMap( + QueryIterPlainWrapper.create(iterator, execCxt), + b -> { + Node spatialNode = b.get(unboundVar); + QueryIterator iter; + if (isSubjectBound) { + iter = bothBound(b, boundNode, predicate, spatialNode, execCxt); + } else { + iter = bothBound(b, spatialNode, predicate, boundNode, execCxt); + } + return iter; + }, + execCxt); + } + + private static ExtendedIterator findSpatialTriples(Graph graph) { ExtendedIterator spatialTriples; if (graph.contains(null, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE)) { spatialTriples = graph.find(null, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE); @@ -190,21 +198,7 @@ private QueryIterConcat findAll(Graph graph, Node boundNode, Node unboundNode, B //Check for Geo Predicate Features in the Graph if no GeometryLiterals found. spatialTriples = graph.find(null, SpatialExtension.GEO_LAT_NODE, null); } - - while (spatialTriples.hasNext()) { - Triple spatialTriple = spatialTriples.next(); - Node spatialNode = spatialTriple.getSubject(); - Binding newBind = BindingFactory.binding(binding, unboundVar, spatialNode); - QueryIterator queryIter; - if (isSubjectBound) { - queryIter = bothBound(newBind, boundNode, predicate, spatialNode, execCxt); - } else { - queryIter = bothBound(newBind, spatialNode, predicate, boundNode, execCxt); - } - queryIterConcat.add(queryIter); - } - - return queryIterConcat; + return spatialTriples; } private QueryIterConcat findIndex(Graph graph, Node boundNode, Node unboundNode, Binding binding, boolean isSubjectBound, Node predicate, ExecutionContext execCxt) throws ExprEvalException { @@ -238,39 +232,54 @@ private QueryIterConcat findIndex(Graph graph, Node boundNode, Node unboundNode, Envelope searchEnvelope = transformedGeom.getEnvelope(); HashSet features = spatialIndex.query(searchEnvelope); - //Check each of the Features that match the search. - for (Resource feature : features) { - Node featureNode = feature.asNode(); - - //Ensure not already an asserted node. - if (!assertedNodes.contains(featureNode)) { - - Binding newBind = BindingFactory.binding(binding, unboundVar, featureNode); - QueryIterator queryIter; - if (isSubjectBound) { - queryIter = bothBound(newBind, boundNode, predicate, featureNode, execCxt); - } else { - queryIter = bothBound(newBind, featureNode, predicate, boundNode, execCxt); - } - queryIterConcat.add(queryIter); - } - - //Also test all Geometry of the Features. All, some or one Geometry may have matched. - List featureGeometryTriples = G.listSP(graph, feature.asNode(), Geo.HAS_GEOMETRY_NODE); - for ( Node geomNode : featureGeometryTriples) { - //Ensure not already an asserted node. - if (!assertedNodes.contains(geomNode)) { - Binding newBind = BindingFactory.binding(binding, unboundVar, geomNode); - QueryIterator queryIter; - if (isSubjectBound) { - queryIter = bothBound(newBind, boundNode, predicate, geomNode, execCxt); - } else { - queryIter = bothBound(newBind, geomNode, predicate, boundNode, execCxt); + // Check each of the Features that match the search. + QueryIterator queryIterator = QueryIterPlainWrapper.create( + Iter.map(features.iterator(), + feature -> BindingFactory.binding(binding, unboundVar, feature.asNode())), + execCxt); + + queryIterator = QueryIter.flatMap(queryIterator, b -> { + Node featureNode = b.get(unboundVar); + QueryIterConcat featureIterConcat = new QueryIterConcat(execCxt); + + // Check Features directly if not already asserted + if (!assertedNodes.contains(featureNode)) { + QueryIterator tmpIter; + if (isSubjectBound) { + tmpIter = bothBound(b, boundNode, predicate, featureNode, execCxt); + } else { + tmpIter = bothBound(b, featureNode, predicate, boundNode, execCxt); + } + featureIterConcat.add(tmpIter); } - queryIterConcat.add(queryIter); - } - } - } + + // Also test all Geometry of the Features. All, some or one Geometry may have matched. + ExtendedIterator featureGeometries = G.iterSP(graph, featureNode, Geo.HAS_GEOMETRY_NODE); + QueryIterator geometriesQueryIterator = QueryIterPlainWrapper.create( + Iter.map( + Iter.filter( // omit asserted + featureGeometries, + geometry -> !assertedNodes.contains(geometry) + ), + geometryNode -> BindingFactory.binding(binding, unboundVar, geometryNode)), + execCxt); + geometriesQueryIterator = QueryIter.flatMap( + geometriesQueryIterator, + b2 -> { + Node geomNode = b2.get(unboundVar); + + if (isSubjectBound) { + return bothBound(b2, boundNode, predicate, geomNode, execCxt); + } else { + return bothBound(b2, geomNode, predicate, boundNode, execCxt); + } + }, + execCxt); + featureIterConcat.add(geometriesQueryIterator); + return featureIterConcat; + }, + execCxt); + queryIterConcat.add(queryIterator); return queryIterConcat; } catch (MismatchedDimensionException | TransformException | FactoryException | SpatialIndexException ex) { diff --git a/jena-geosparql/src/test/java/org/apache/jena/geosparql/geo/topological/CancelQueryTest.java b/jena-geosparql/src/test/java/org/apache/jena/geosparql/geo/topological/CancelQueryTest.java new file mode 100644 index 00000000000..8fe892d0a1e --- /dev/null +++ b/jena-geosparql/src/test/java/org/apache/jena/geosparql/geo/topological/CancelQueryTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.geosparql.geo.topological; + +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.function.Function; +import java.util.stream.IntStream; + +import org.apache.jena.geosparql.configuration.GeoSPARQLConfig; +import org.apache.jena.geosparql.implementation.datatype.WKTDatatype; +import org.apache.jena.geosparql.implementation.index.IndexConfiguration; +import org.apache.jena.geosparql.implementation.vocabulary.Geo; +import org.apache.jena.geosparql.spatial.SpatialIndex; +import org.apache.jena.geosparql.spatial.SpatialIndexException; +import org.apache.jena.graph.Graph; +import org.apache.jena.graph.NodeFactory; +import org.apache.jena.query.Dataset; +import org.apache.jena.query.DatasetFactory; +import org.apache.jena.query.Query; +import org.apache.jena.query.QueryCancelledException; +import org.apache.jena.query.QueryExecution; +import org.apache.jena.query.QueryFactory; +import org.apache.jena.query.ResultSet; +import org.apache.jena.rdf.model.Model; +import org.apache.jena.rdf.model.ModelFactory; +import org.apache.jena.sparql.graph.GraphFactory; +import org.apache.jena.vocabulary.RDF; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class CancelQueryTest { + + private final int numGeometries; + + public CancelQueryTest(int numGeometries) { + this.numGeometries = numGeometries; + } + + @Parameterized.Parameters(name = "number of geometries: {0}") + public static List sizes() { + return List.of( + 31_623 // The square root of 1 billion approximated as an integer. + ); + } + + public static Graph createSpatialGraph(int numGeometries) { + Graph graph = GraphFactory.createDefaultGraph(); + + IntStream.range(0, numGeometries).forEach(i -> { + // features + graph.add(NodeFactory.createURI("http://www.example.org/r" + i), RDF.type.asNode(), Geo.FEATURE_NODE); + // geometries + graph.add(NodeFactory.createURI("http://www.example.org/r" + i), + Geo.HAS_GEOMETRY_PROP.asNode(), + NodeFactory.createURI("http://www.example.org/r" + i + "/geometry")); + // geo:Geometry type triples + graph.add(NodeFactory.createURI("http://www.example.org/r" + i + "/geometry"), + RDF.type.asNode(), + Geo.GEOMETRY_NODE); + // geometry WKT literals + graph.add(NodeFactory.createURI("http://www.example.org/r" + i + "/geometry"), + Geo.AS_WKT_PROP.asNode(), + NodeFactory.createLiteralDT("POINT(2 2)", WKTDatatype.INSTANCE)); + }); + // System.out.printf("created graph with %d triples and %d geometries\n", graph.size(), numGeometries); + + return graph; + } + + @Test(timeout = 10000) + public void test_cancel_spatial_property_function1() { + GeoSPARQLConfig.setup(IndexConfiguration.IndexOption.MEMORY, Boolean.TRUE); + + long cancelDelayMillis = 1000; + boolean useIndex = true; + + // Create a dataset with spatial triples + Graph graph = createSpatialGraph(numGeometries); + Model model = ModelFactory.createModelForGraph(graph); + Dataset ds = DatasetFactory.create(model); + + // create spatial index + if (useIndex){ + try { + SpatialIndex index = SpatialIndex.buildSpatialIndex(ds); + SpatialIndex.setSpatialIndex(ds, index); + } catch (SpatialIndexException e) { + throw new RuntimeException(e); + } + } + + // Create a query that queries for spatial relation with both sides being unbound, thus, pairwise comparison would be needed + Query query = QueryFactory.create("PREFIX geo: SELECT * { ?a geo:sfIntersects ?b . }"); + Callable qeFactory = () -> QueryExecution.dataset(ds).query(query).build();//.timeout(2000, TimeUnit.MILLISECONDS).build(); + + runAsyncAbort(cancelDelayMillis, qeFactory, CancelQueryTest::doCount); + } + + private static long doCount(QueryExecution qe) { + // System.out.println("Executing query ..."); + long counter = 0; + try (QueryExecution qe2 = qe) { + ResultSet rs = qe2.execSelect(); + while (rs.hasNext()) { + rs.next(); + ++counter; + } + } finally { + // System.out.println("Aborted after seeing " + counter + " bindings"); + } + return counter; + } + + public static void runAsyncAbort(long cancelDelayMillis, Callable qeFactory, Function processor) { + ExecutorService executorService = Executors.newSingleThreadExecutor(); + + try (QueryExecution qe = qeFactory.call()){ + Future future = executorService.submit(() -> processor.apply(qe)); + try { + Thread.sleep(cancelDelayMillis); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + // System.out.println("Aborting query execution: " + qe); + qe.abort(); + try { + future.get(); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (!(cause instanceof QueryCancelledException)) { + e.printStackTrace(); + } + Assert.assertEquals(QueryCancelledException.class, cause.getClass()); + } + } catch (Exception e) { + throw new RuntimeException("Failed to build a query execution", e); + } finally { + // System.out.println("Completed: " + qe); + executorService.shutdownNow(); + } + } +} +