Skip to content

Commit

Permalink
GH-4328 fedx unclosed iteration (part 1) (#4343)
Browse files Browse the repository at this point in the history
  • Loading branch information
hmottestad authored Dec 27, 2022
2 parents fdceb5d + 7b38087 commit 63cbd23
Show file tree
Hide file tree
Showing 15 changed files with 182 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,15 @@ public E getNextElement() throws T {
checkException();
return take;
} catch (InterruptedException e) {
checkException();
close();
Thread.currentThread().interrupt();
try {
checkException();
} finally {
try {
close();
} finally {
Thread.currentThread().interrupt();
}
}
return null;
}
}
Expand Down Expand Up @@ -221,6 +227,7 @@ public void checkException() throws T {
}
}
}

}

private boolean isAfterLast(E take) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public GraphQueryResult evaluate() throws QueryEvaluationException {
try {
conn.flushTransactionState(Protocol.Action.QUERY);
return client.sendGraphQuery(queryLanguage, queryString, baseURI, dataset, getIncludeInferred(),
getMaxExecutionTime(), ((WeakReference) null), getBindingsArray());
getMaxExecutionTime(), ((WeakReference<?>) null), getBindingsArray());
} catch (IOException | RepositoryException | MalformedQueryException e) {
throw new HTTPQueryEvaluationException(e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public TupleQueryResult evaluate() throws QueryEvaluationException {
conn.flushTransactionState(Protocol.Action.QUERY);

return client.sendTupleQuery(queryLanguage, queryString, baseURI, dataset, getIncludeInferred(),
getMaxExecutionTime(), ((WeakReference) null), getBindingsArray());
getMaxExecutionTime(), ((WeakReference<?>) null), getBindingsArray());
} catch (IOException | RepositoryException | MalformedQueryException e) {
throw new HTTPQueryEvaluationException(e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public GraphQueryResult evaluate() throws QueryEvaluationException {
// TODO getQueryString() already inserts bindings, use emptybindingset
// as last argument?
return client.sendGraphQuery(queryLanguage, getQueryString(), baseURI, dataset, getIncludeInferred(),
getMaxExecutionTime(), ((WeakReference) null), getBindingsArray());
getMaxExecutionTime(), ((WeakReference<?>) null), getBindingsArray());
} catch (IOException | RepositoryException | MalformedQueryException e) {
throw new QueryEvaluationException(e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public TupleQueryResult evaluate() throws QueryEvaluationException {
SPARQLProtocolSession client = getHttpClient();
try {
return client.sendTupleQuery(QueryLanguage.SPARQL, getQueryString(), baseURI, dataset, getIncludeInferred(),
getMaxExecutionTime(), ((WeakReference) null), getBindingsArray());
getMaxExecutionTime(), ((WeakReference<?>) null), getBindingsArray());
} catch (IOException | RepositoryException | MalformedQueryException e) {
throw new QueryEvaluationException(e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ public class FedXConnection extends AbstractSailConnection {
*/
private WriteStrategy writeStrategy;

public FedXConnection(FedX federation, FederationContext federationContext)
throws SailException {
public FedXConnection(FedX federation, FederationContext federationContext) throws SailException {
super(federation);
this.federation = federation;
this.federationContext = federationContext;
Expand All @@ -100,9 +99,8 @@ public void setTransactionSettings(TransactionSetting... settings) {
}

@Override
protected CloseableIteration<? extends BindingSet, QueryEvaluationException> evaluateInternal(
TupleExpr query, Dataset dataset, BindingSet bindings,
boolean includeInferred) throws SailException {
protected CloseableIteration<? extends BindingSet, QueryEvaluationException> evaluateInternal(TupleExpr query,
Dataset dataset, BindingSet bindings, boolean includeInferred) throws SailException {

final TupleExpr originalQuery = query;

Expand Down Expand Up @@ -132,8 +130,8 @@ protected CloseableIteration<? extends BindingSet, QueryEvaluationException> eva
FederationEvaluationStatistics stats = new FederationEvaluationStatistics(queryInfo, dataset);
query = strategy.optimize(query, stats, bindings);
} catch (Exception e) {
log.warn("Exception occured during optimization (Query: " + queryInfo.getQueryID() + "): "
+ e.getMessage());
log.warn(
"Exception occured during optimization (Query: " + queryInfo.getQueryID() + "): " + e.getMessage());
log.debug("Details: ", e);
throw new SailException(e);
}
Expand Down Expand Up @@ -250,8 +248,7 @@ public CloseableIteration<Resource, QueryEvaluationException> performTask() thro
try (RepositoryConnection conn = e.getConnection()) {
// we need to materialize the contexts as they are only accessible
// while the connection is open
return new CollectionIteration<>(
Iterations.asList(conn.getContextIDs()));
return new CollectionIteration<>(Iterations.asList(conn.getContextIDs()));
}
}

Expand All @@ -269,13 +266,12 @@ public void cancel() {
// execute the union in a separate thread
federationContext.getManager().getExecutor().execute(union);

return new DistinctIteration<>(
new ExceptionConvertingIteration<>(union) {
@Override
protected SailException convert(Exception e) {
return new SailException(e);
}
});
return new DistinctIteration<>(new ExceptionConvertingIteration<>(union) {
@Override
protected SailException convert(Exception e) {
return new SailException(e);
}
});
}

@Override
Expand All @@ -286,8 +282,7 @@ protected String getNamespaceInternal(String prefix) throws SailException {
}

@Override
protected CloseableIteration<? extends Namespace, SailException> getNamespacesInternal()
throws SailException {
protected CloseableIteration<? extends Namespace, SailException> getNamespacesInternal() throws SailException {
// do not support this feature, but also do not throw an exception
// as this method is expected for the RDF4J workbench to work
return new EmptyIteration<>();
Expand Down Expand Up @@ -330,8 +325,7 @@ protected SailException convert(Exception e) {
}

@Override
protected void addStatementInternal(Resource subj, IRI pred, Value obj,
Resource... contexts) throws SailException {
protected void addStatementInternal(Resource subj, IRI pred, Value obj, Resource... contexts) throws SailException {
try {
getWriteStrategyInternal().addStatement(subj, pred, obj, contexts);
} catch (RepositoryException e) {
Expand All @@ -345,8 +339,8 @@ protected void removeNamespaceInternal(String prefix) throws SailException {
}

@Override
protected void removeStatementsInternal(Resource subj, IRI pred, Value obj,
Resource... contexts) throws SailException {
protected void removeStatementsInternal(Resource subj, IRI pred, Value obj, Resource... contexts)
throws SailException {
try {
getWriteStrategyInternal().removeStatement(subj, pred, obj, contexts);
} catch (RepositoryException e) {
Expand Down Expand Up @@ -383,8 +377,8 @@ protected long sizeInternal(Resource... contexts) throws SailException {
}
}
if (errorEndpoints.size() > 0) {
throw new SailException("Could not determine size for members " + errorEndpoints +
"(Supported for NativeStore and RemoteRepository only). Computed size: " + size);
throw new SailException("Could not determine size for members " + errorEndpoints
+ "(Supported for NativeStore and RemoteRepository only). Computed size: " + size);
}
return size;
}
Expand Down Expand Up @@ -500,8 +494,8 @@ public boolean pendingRemovals() {
}

@Override
public Explanation explain(Explanation.Level level, TupleExpr tupleExpr, Dataset dataset,
BindingSet bindings, boolean includeInferred, int timeoutSeconds) {
public Explanation explain(Explanation.Level level, TupleExpr tupleExpr, Dataset dataset, BindingSet bindings,
boolean includeInferred, int timeoutSeconds) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,18 +385,18 @@ protected static Set<String> findQueryPrefixes(String queryString) {

HashSet<String> res = new HashSet<>();

Scanner sc = new Scanner(queryString);
while (true) {
while (sc.findInLine(prefixPattern) != null) {
MatchResult m = sc.match();
res.add(m.group(1));
}
if (!sc.hasNextLine()) {
break;
try (Scanner sc = new Scanner(queryString)) {
while (true) {
while (sc.findInLine(prefixPattern) != null) {
MatchResult m = sc.match();
res.add(m.group(1));
}
if (!sc.hasNextLine()) {
break;
}
sc.nextLine();
}
sc.nextLine();
}
sc.close();
return res;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,16 +509,17 @@ protected QueryEvaluationStep prepareNJoin(NJoin join, QueryEvaluationContext co

return bindings -> {
boolean completed = false;
CloseableIteration<BindingSet, QueryEvaluationException> result = resultProvider.evaluate(bindings);
CloseableIteration<BindingSet, QueryEvaluationException> result = null;
try {
for (int i = 1, n = join.getNumberOfArguments(); i < n; i++) {
result = resultProvider.evaluate(bindings);

for (int i = 1, n = join.getNumberOfArguments(); i < n; i++) {
result = executeJoin(joinScheduler, result, join.getArg(i), join.getJoinVariables(i), bindings,
join.getQueryInfo());
}
completed = true;
} finally {
if (!completed) {
if (!completed && result != null) {
result.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,11 @@ public CloseableIteration<BindingSet, QueryEvaluationException> getStatements(
// TODO we need to fix this here: if the dataset contains FROM NAMED, we cannot use
// the API and require to write as query

RepositoryResult<Statement> repoResult = conn.getStatements((Resource) subjValue, (IRI) predValue, objValue,
queryInfo.getIncludeInferred(), FedXUtil.toContexts(stmt, queryInfo.getDataset()));

RepositoryResult<Statement> repoResult = null;
try {
repoResult = conn.getStatements((Resource) subjValue, (IRI) predValue, objValue,
queryInfo.getIncludeInferred(), FedXUtil.toContexts(stmt, queryInfo.getDataset()));

// XXX implementation remark and TODO taken from Sesame
// The same variable might have been used multiple times in this
// StatementPattern, verify value equality in those cases.
Expand All @@ -94,7 +95,9 @@ public CloseableIteration<BindingSet, QueryEvaluationException> getStatements(
resultHolder.set(filteredRes);
}
} catch (Throwable t) {
repoResult.close();
if (repoResult != null) {
repoResult.close();
}
throw t;
}

Expand All @@ -109,9 +112,11 @@ public CloseableIteration<Statement, QueryEvaluationException> getStatements(

return withConnection((conn, resultHolder) -> {

RepositoryResult<Statement> repoResult = conn.getStatements(subj, pred, obj,
queryInfo.getIncludeInferred(), contexts);
RepositoryResult<Statement> repoResult = null;
try {
repoResult = conn.getStatements(subj, pred, obj,
queryInfo.getIncludeInferred(), contexts);

// XXX implementation remark and TODO taken from Sesame
// The same variable might have been used multiple times in this
// StatementPattern, verify value equality in those cases.
Expand All @@ -123,7 +128,9 @@ protected QueryEvaluationException convert(Exception arg0) {
}
});
} catch (Throwable t) {
repoResult.close();
if (repoResult != null) {
repoResult.close();
}
throw t;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,14 @@
/**
* A triple source to be used for (remote) SPARQL endpoints.
* <p>
*
* <p>
* This triple source supports the {@link SparqlEndpointConfiguration} for defining whether ASK queries are to be used
* for source selection.
*
* <p>
* The query result of {@link #getStatements(String, BindingSet, FilterValueExpr, QueryInfo)} is wrapped in a
* {@link ConsumingIteration} to avoid blocking behavior..
*
* @author Andreas Schwarte
*
*/
public class SparqlTripleSource extends TripleSourceBase {

Expand Down Expand Up @@ -109,8 +108,7 @@ public boolean hasStatements(StatementPattern stmt,
applyMaxExecutionTimeUpperBound(query);

monitorRemoteRequest();
boolean hasStatements = query.evaluate();
return hasStatements;
return query.evaluate();
} catch (Throwable ex) {
// convert into QueryEvaluationException with additional info
throw ExceptionUtil.traceExceptionSourceAndRepair(endpoint, ex, "Subquery: " + queryString);
Expand All @@ -126,9 +124,7 @@ public boolean hasStatements(StatementPattern stmt,

monitorRemoteRequest();
try (TupleQueryResult qRes = query.evaluate()) {

boolean hasStatements = qRes.hasNext();
return hasStatements;
return qRes.hasNext();
} catch (Throwable ex) {
// convert into QueryEvaluationException with additional info
throw ExceptionUtil.traceExceptionSourceAndRepair(endpoint, ex, "Subquery: " + queryString);
Expand Down Expand Up @@ -157,8 +153,7 @@ public boolean hasStatements(ExclusiveTupleExpr expr,
monitorRemoteRequest();
try (TupleQueryResult qRes = query.evaluate()) {

boolean hasStatements = qRes.hasNext();
return hasStatements;
return qRes.hasNext();
} catch (Throwable ex) {
// convert into QueryEvaluationException with additional info
throw ExceptionUtil.traceExceptionSourceAndRepair(endpoint, ex, "Subquery: " + queryString);
Expand Down Expand Up @@ -192,9 +187,10 @@ public CloseableIteration<Statement, QueryEvaluationException> getStatements(

return withConnection((conn, resultHolder) -> {
monitorRemoteRequest();
RepositoryResult<Statement> repoResult = conn.getStatements(subj, pred, obj,
queryInfo.getIncludeInferred(), contexts);
RepositoryResult<Statement> repoResult = null;
try {
repoResult = conn.getStatements(subj, pred, obj,
queryInfo.getIncludeInferred(), contexts);
resultHolder.set(new ExceptionConvertingIteration<>(repoResult) {
@Override
protected QueryEvaluationException convert(Exception ex) {
Expand All @@ -205,7 +201,9 @@ protected QueryEvaluationException convert(Exception ex) {
}
});
} catch (Throwable t) {
repoResult.close();
if (repoResult != null) {
repoResult.close();
}
throw t;
}

Expand Down
Loading

0 comments on commit 63cbd23

Please sign in to comment.