diff --git a/Exareme-Docker/files/root/exareme/bootstrap.sh b/Exareme-Docker/files/root/exareme/bootstrap.sh index 62c2c53cf..46ed522bd 100755 --- a/Exareme-Docker/files/root/exareme/bootstrap.sh +++ b/Exareme-Docker/files/root/exareme/bootstrap.sh @@ -14,7 +14,7 @@ CONSUL_WAIT_FOR_MASTER_IP_MAX_ATTEMPTS=20 EXAREME_NODE_STARTUP_HEALTH_CHECK_MAX_ATTEMPTS=10 EXAREME_NODE_HEALTH_CHECK_TIMEOUT=60 MASTER_NODE_REACHABLE_TIMEOUT=5 -PERIODIC_EXAREME_NODES_HEALTH_CHECK_MAX_RETRIES=3 +PERIODIC_EXAREME_NODES_HEALTH_CHECK_MAX_RETRIES=5 PERIODIC_EXAREME_NODES_HEALTH_CHECK_INTERVAL=120 EXAREME_HEALTH_CHECK_AWAIT_TIME=60 PERIODIC_TEMP_FILES_REMOVAL=300 @@ -118,7 +118,7 @@ exaremeNodesHealthCheck() { echo "$(timestamp) HEALTH CHECK for node with IP ${NODE_IP} and name ${NODE_NAME} ." if [[ "${FEDERATION_ROLE}" == "master" ]]; then - check=$(curl -s --max-time ${EXAREME_NODE_HEALTH_CHECK_TIMEOUT} "${NODE_IP}:9092/check/worker?NODE_IP=${NODE_IP}&NODE_NAME=${NODE_NAME}") + check=$(curl -s -X POST --max-time ${EXAREME_NODE_HEALTH_CHECK_TIMEOUT} ${NODE_IP}:9090/mining/query/HEALTH_CHECK) else check=$(curl -s --max-time ${EXAREME_NODE_HEALTH_CHECK_TIMEOUT} "${MASTER_IP}:9092/check/worker?NODE_IP=${NODE_IP}&NODE_NAME=${NODE_NAME}") fi @@ -255,11 +255,6 @@ if [[ "${FEDERATION_ROLE}" == "master" ]]; then curl -s -X PUT -d @- ${CONSULURL}/v1/kv/${CONSUL_MASTER_PATH}/${NODE_NAME} <<<${NODE_IP} curl -s -X PUT -d @- ${CONSULURL}/v1/kv/${CONSUL_ACTIVE_WORKERS_PATH}/${NODE_NAME} <<<${NODE_IP} - if ! startupExaremeNodesHealthCheck; then - echo "$(timestamp) HEALTH CHECK algorithm failed. Switch ENVIRONMENT_TYPE to 'DEV' to see error messages coming from EXAREME. Exiting..." - exit 1 - fi - periodicExaremeNodesHealthCheck & else ##### Running bootstrap on a worker node ##### @@ -271,16 +266,16 @@ else ##### Running bootstrap on a worker node ##### echo "$(timestamp) Starting Exareme on worker node with IP: ${NODE_IP} and nodeName: ${NODE_NAME}" . ./start-worker.sh - - # Updating consul with node IP - echo -e "\n$(timestamp) Updating consul with worker node IP." - curl -s -X PUT -d @- ${CONSULURL}/v1/kv/${CONSUL_ACTIVE_WORKERS_PATH}/${NODE_NAME} <<<${NODE_IP} - + if ! startupExaremeNodesHealthCheck; then echo "$(timestamp) HEALTH CHECK algorithm failed. Switch ENVIRONMENT_TYPE to 'DEV' to see error messages coming from EXAREME. Exiting..." exit 1 fi + # Updating consul with node IP + echo -e "\n$(timestamp) Updating consul with worker node IP." + curl -s -X PUT -d @- ${CONSULURL}/v1/kv/${CONSUL_ACTIVE_WORKERS_PATH}/${NODE_NAME} <<<${NODE_IP} + periodicExaremeNodesHealthCheck & periodicReachableMasterNodeCheck & diff --git a/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/client/AdpDBClientQueryStatus.java b/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/client/AdpDBClientQueryStatus.java index 14c9f4001..4010a06f1 100644 --- a/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/client/AdpDBClientQueryStatus.java +++ b/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/client/AdpDBClientQueryStatus.java @@ -28,8 +28,8 @@ public interface AdpDBClientQueryStatus { void registerListener(AdpDBQueryListener listener) throws RemoteException; - InputStream getResult() throws RemoteException; + String getResult() throws RemoteException; - InputStream getResult(DataSerialization ds) throws RemoteException; + String getResult(DataSerialization ds) throws RemoteException; } diff --git a/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/client/rmi/RmiAdpDBClientQueryStatus.java b/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/client/rmi/RmiAdpDBClientQueryStatus.java index a692c7a8d..2e627f04e 100644 --- a/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/client/rmi/RmiAdpDBClientQueryStatus.java +++ b/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/client/rmi/RmiAdpDBClientQueryStatus.java @@ -21,7 +21,9 @@ import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.rmi.RemoteException; +import java.util.Arrays; import java.util.HashMap; +import java.util.concurrent.FutureTask; /** * @author alex @@ -35,7 +37,8 @@ public class RmiAdpDBClientQueryStatus implements AdpDBClientQueryStatus { private String lastStatus; private TimeFormat timeF; private boolean finished; - private InputStream result; + private boolean error; + private String result; public RmiAdpDBClientQueryStatus(AdpDBQueryID queryId, AdpDBClientProperties properties, AdpDBQueryExecutionPlan plan, AdpDBStatus status) { @@ -45,6 +48,7 @@ public RmiAdpDBClientQueryStatus(AdpDBQueryID queryId, AdpDBClientProperties pro this.lastStatus = null; this.timeF = new TimeFormat(TimeUnit.min); this.finished = false; + this.error = false; result = null; } @@ -61,13 +65,6 @@ public boolean hasFinished() throws RemoteException { if (!status.hasFinished() && !status.hasError()) return false; - try { - String algorithmResult = IOUtils.toString(getResult(DataSerialization.summary), StandardCharsets.UTF_8); - log.info("Algorithm with queryId " + getQueryID().getQueryID() + " terminated. Result: \n " + algorithmResult); - } catch (IOException e) { - log.error("Could not read the algorithm result table." + getQueryID()); - } - finished = true; return true; } @@ -113,7 +110,7 @@ public void registerListener(AdpDBQueryListener listener) throws RemoteException } @Override - public InputStream getResult() throws RemoteException { + public String getResult() throws RemoteException { return getResult(DataSerialization.ldjson); } @@ -129,14 +126,27 @@ public InputStream getResult() throws RemoteException { * @throws RemoteException */ @Override - public InputStream getResult(DataSerialization ds) throws RemoteException { + public String getResult(DataSerialization ds) throws RemoteException { // The registry should be updated the 1st time we fetch a result stream. if (result == null) { updateRegistry(); } - result = new RmiAdpDBClient(AdpDBManagerLocator.getDBManager(), properties) + InputStream resultStream = new RmiAdpDBClient(AdpDBManagerLocator.getDBManager(), properties) .readTable(plan.getResultTables().get(0).getName(), ds); + + FutureTask getResultFromStream; + try { + getResultFromStream = new FutureTask<>(() -> + IOUtils.toString(resultStream, StandardCharsets.UTF_8)); + + new Thread(getResultFromStream).start(); + result = getResultFromStream.get(30, java.util.concurrent.TimeUnit.SECONDS); + } catch (Exception e) { + log.error("Error reading the result table! QueryID:" + status.getQueryID().getQueryID(), e); + throw new RemoteException("Could not read the result table!"); + } + return result; } diff --git a/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/engine/executor/remote/AdpDBArtJobMonitor.java b/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/engine/executor/remote/AdpDBArtJobMonitor.java index f08d2f57f..393f7ea87 100644 --- a/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/engine/executor/remote/AdpDBArtJobMonitor.java +++ b/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/engine/executor/remote/AdpDBArtJobMonitor.java @@ -60,10 +60,9 @@ public void run() { statusManager.getStatistics(status.getId()).setTotalDataTransfers(stats.getTotalData()); while (!sessionManager.hasFinished() && !sessionManager.hasError()) { - boolean updateProgressStatistics = updateProgressStatistics(); if (updateProgressStatistics) { - log.info("Session is running... ID: " + sessionPlan.getSessionID().getLongId() + log.info("Session is updating... ID: " + sessionPlan.getSessionID().getLongId() + " , QueryID: " + queryID.getQueryID()); log.debug("Update listeners ..."); synchronized (listeners) { @@ -89,13 +88,15 @@ public void run() { .setAdpEngineStatistics(statsManager.getStatistics()); if (sessionManager != null && !sessionManager.hasError()) { - log.debug("Session finished, closing! ID: " + sessionPlan.getSessionID().getLongId() + log.info("Session finished, closing! ID: " + sessionPlan.getSessionID().getLongId() + " , QueryID: " + queryID.getQueryID()); statusManager.setFinished(status.getId()); } else { + log.info("Session error! ID: " + sessionPlan.getSessionID().getLongId() + + " , QueryID: " + queryID.getQueryID()); statusManager.setError(status.getId(), sessionManager.getErrorList().get(0)); } - log.debug("Session closing! ID: "+ sessionPlan.getSessionID().getLongId() + log.debug("Session closing! ID: " + sessionPlan.getSessionID().getLongId() + " , QueryID: " + queryID.getQueryID()); sessionPlan.close(); @@ -104,7 +105,6 @@ public void run() { log.error("Cannot monitor job, sessionID: " + sessionPlan.getSessionID().getLongId()); log.error("Cannot monitor job, queryID: " + status.getQueryID().getQueryID(), e); } finally { - log.debug("Terminate listeners ( " + listeners.size() + ")..."); synchronized (listeners) { for (AdpDBQueryListener l : listeners) { l.terminated(queryID, status); diff --git a/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/engine/executor/remote/AdpDBExecutorRemote.java b/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/engine/executor/remote/AdpDBExecutorRemote.java index b0bd57fb4..513297fe2 100644 --- a/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/engine/executor/remote/AdpDBExecutorRemote.java +++ b/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/engine/executor/remote/AdpDBExecutorRemote.java @@ -149,7 +149,6 @@ public AdpDBStatus executeScript(AdpDBQueryExecutionPlan execPlan, AdpDBClientPr AdpDBArtJobMonitor monitor = new AdpDBArtJobMonitor(sessionPlan, status, statusManager, execPlan.getQueryID()); monitors.put(execPlan.getQueryID(), monitor); - executor.submit(monitor); statusArray.add(status); return status; diff --git a/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/engine/iterations/handler/NIterativeAlgorithmResultEntity.java b/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/engine/iterations/handler/NIterativeAlgorithmResultEntity.java index dde8e356e..04d23ade2 100644 --- a/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/engine/iterations/handler/NIterativeAlgorithmResultEntity.java +++ b/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/engine/iterations/handler/NIterativeAlgorithmResultEntity.java @@ -47,11 +47,6 @@ public NIterativeAlgorithmResultEntity(IterativeAlgorithmState iterativeAlgorith this.dataSerialization = dataSerialization; } - private final static String user_error = new String("text/plain+user_error"); - private final static String error = new String("text/plain+error"); - private final static String warning = new String("text/plain+warning"); - - /** * @param encoder is used to save the output * @param ioctrl will be used from the iterativeAlgorithmState, when the algorithm is complete, @@ -87,9 +82,11 @@ public void produceContent(ContentEncoder encoder, IOControl ioctrl) throws IOEx if (!finalizeQueryStatus.hasError() && finalizeQueryStatus.hasFinished()) { if (channel == null) { + String result = iterativeAlgorithmState.getAdpDBClientQueryStatus().getResult(dataSerialization); + log.info("Iterative algorithm with key " + iterativeAlgorithmState.getAlgorithmKey() + + " terminated. Result: \n " + result); channel = Channels.newChannel( - iterativeAlgorithmState.getAdpDBClientQueryStatus() - .getResult(dataSerialization)); + new ByteArrayInputStream(result.getBytes(StandardCharsets.UTF_8))); } // Reading from the channel to the buffer, flip is required by the API channel.read(buffer); @@ -183,11 +180,13 @@ public void closeQuery() throws IOException { finalizeQueryStatus.close(); finalizeQueryStatus = null; } + if (iterativeAlgorithmState != null) + iterativeAlgorithmState.releaseLock(); iterativeAlgorithmState = null; } @Override - public void close() throws IOException { + public void close() { } diff --git a/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/engine/iterations/scheduler/events/IterationsEventHandler.java b/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/engine/iterations/scheduler/events/IterationsEventHandler.java index 7c28a362b..0d9c009f0 100644 --- a/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/engine/iterations/scheduler/events/IterationsEventHandler.java +++ b/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/engine/iterations/scheduler/events/IterationsEventHandler.java @@ -3,7 +3,6 @@ import madgik.exareme.common.app.engine.AdpDBQueryID; import madgik.exareme.master.client.AdpDBClientQueryStatus; import madgik.exareme.master.engine.iterations.scheduler.IterationsDispatcher; -import madgik.exareme.master.engine.iterations.scheduler.events.phaseCompletion.PhaseCompletionEventHandler; import madgik.exareme.master.engine.iterations.state.IterationsStateManager; import madgik.exareme.master.engine.iterations.state.IterativeAlgorithmState; import madgik.exareme.utils.eventProcessor.EventHandler; @@ -56,8 +55,9 @@ protected AdpDBClientQueryStatus submitQueryAndUpdateExecutionPhase( ias.getAlgorithmKey(), dflScript); - log.info("New Iterative phase: " + currentPhase); - log.info("Executing Iterative DFL Script: \n" + dflScript); + log.info("New Iterative phase: " + currentPhase + " for algorithm: " + ias.getAlgorithmKey() + + " with queryID: " + queryStatus.getQueryID().getQueryID()); + log.debug("Executing Iterative DFL Script: \n" + dflScript); ias.setCurrentExecutionPhase(currentPhase); diff --git a/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/engine/iterations/scheduler/events/phaseCompletion/PhaseCompletionEventHandler.java b/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/engine/iterations/scheduler/events/phaseCompletion/PhaseCompletionEventHandler.java index 785bb9bb9..7127c098b 100644 --- a/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/engine/iterations/scheduler/events/phaseCompletion/PhaseCompletionEventHandler.java +++ b/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/engine/iterations/scheduler/events/phaseCompletion/PhaseCompletionEventHandler.java @@ -109,8 +109,7 @@ public void handle(PhaseCompletionEvent event, EventProcessor proc) throws Remot String terminationConditionResult; try { - InputStream previousResultStream = ias.getAdpDBClientQueryStatus().getResult(); - terminationConditionResult = IOUtils.toString(previousResultStream, StandardCharsets.UTF_8); + terminationConditionResult = ias.getAdpDBClientQueryStatus().getResult(); } catch (IOException e) { throw new IterationsStateFatalException( "Could not read the termination_condition result table.", ias.getAlgorithmKey()); diff --git a/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/engine/scheduler/AdpDBQueryScheduler.java b/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/engine/scheduler/AdpDBQueryScheduler.java index fd785d00c..15036ca9e 100644 --- a/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/engine/scheduler/AdpDBQueryScheduler.java +++ b/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/engine/scheduler/AdpDBQueryScheduler.java @@ -29,6 +29,7 @@ import madgik.exareme.utils.eventProcessor.EventProcessor; import java.rmi.RemoteException; +import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** diff --git a/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/engine/statusMgr/AdpDBJobSession.java b/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/engine/statusMgr/AdpDBJobSession.java index 990e4a499..9d8d3a67d 100644 --- a/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/engine/statusMgr/AdpDBJobSession.java +++ b/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/engine/statusMgr/AdpDBJobSession.java @@ -8,17 +8,14 @@ import madgik.exareme.common.app.engine.AdpDBStatistics; import madgik.exareme.master.engine.AdpDBManagerLocator; import madgik.exareme.worker.art.executionEngine.session.ExecutionEngineSessionPlan; -import org.apache.log4j.Logger; import java.rmi.RemoteException; -import java.util.Arrays; import java.util.Map; /** * @author herald */ public class AdpDBJobSession { - private static final Logger log = Logger.getLogger(AdpDBJobSession.class); private boolean finished = false; private boolean error = false; private Exception exception = null; diff --git a/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/gateway/async/handler/HBP/HBPQueryHandler.java b/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/gateway/async/handler/HBP/HBPQueryHandler.java index cb5281154..8778a1368 100644 --- a/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/gateway/async/handler/HBP/HBPQueryHandler.java +++ b/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/gateway/async/handler/HBP/HBPQueryHandler.java @@ -13,8 +13,8 @@ import madgik.exareme.master.engine.iterations.handler.NIterativeAlgorithmResultEntity; import madgik.exareme.master.engine.iterations.state.IterativeAlgorithmState; import madgik.exareme.master.gateway.ExaremeGatewayUtils; -import madgik.exareme.master.gateway.async.handler.HBP.Exceptions.RequestException; import madgik.exareme.master.gateway.async.handler.HBP.Exceptions.BadUserInputException; +import madgik.exareme.master.gateway.async.handler.HBP.Exceptions.RequestException; import madgik.exareme.master.gateway.async.handler.entity.NQueryResultEntity; import madgik.exareme.master.queryProcessor.HBP.AlgorithmProperties; import madgik.exareme.master.queryProcessor.HBP.Algorithms; @@ -160,7 +160,7 @@ private void handleHBPAlgorithmExecution(HttpRequest request, HttpResponse respo queryStatus = dbClient.query(algorithmKey, dfl); log.info("Algorithm " + algorithmKey + " with queryID " - + queryStatus.getQueryID() + " execution started."); + + queryStatus.getQueryID().getQueryID() + " execution started."); log.debug("DFL Script: \n " + dfl); BasicHttpEntity entity = new NQueryResultEntity(queryStatus, ds, diff --git a/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/gateway/async/handler/entity/NQueryResultEntity.java b/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/gateway/async/handler/entity/NQueryResultEntity.java index 649d6a632..a0dd23fde 100644 --- a/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/gateway/async/handler/entity/NQueryResultEntity.java +++ b/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/gateway/async/handler/entity/NQueryResultEntity.java @@ -9,14 +9,13 @@ import org.apache.http.nio.entity.HttpAsyncContentProducer; import org.apache.log4j.Logger; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; +import java.nio.charset.StandardCharsets; -/** - * TODO flush output before suspend - */ public class NQueryResultEntity extends BasicHttpEntity implements HttpAsyncContentProducer { private static final Logger log = Logger.getLogger(NQueryResultEntity.class); @@ -55,7 +54,11 @@ public void produceContent(ContentEncoder encoder, IOControl iocontrol) if (!queryStatus.hasError()) { if (channel == null) { - channel = Channels.newChannel(queryStatus.getResult(format)); + String result = queryStatus.getResult(format); + log.info("Algorithm with queryId " + queryStatus.getQueryID().getQueryID() + + " terminated. Result: \n " + result); + channel = Channels.newChannel( + new ByteArrayInputStream(result.getBytes(StandardCharsets.UTF_8))); } channel.read(buffer); buffer.flip(); @@ -68,32 +71,27 @@ public void produceContent(ContentEncoder encoder, IOControl iocontrol) close(); } } else { - log.trace("|" + queryStatus.getError() + "|"); if (queryStatus.getError().contains("ExaremeError:")) { String data = queryStatus.getError().substring(queryStatus.getError().lastIndexOf("ExaremeError:") + "ExaremeError:".length()).replaceAll("\\s", " "); - //type could be error, user_error, warning regarding the error occurred along the process String result = HBPQueryHelper.ErrorResponse.createErrorResponse(data, user_error); logErrorMessage(result); encoder.write(ByteBuffer.wrap(result.getBytes())); encoder.complete(); } else if (queryStatus.getError().contains("PrivacyError")) { String data = "The Experiment could not run with the input provided because there are insufficient data."; - //type could be error, user_error, warning regarding the error occurred along the process String result = HBPQueryHelper.ErrorResponse.createErrorResponse(data, warning); logErrorMessage(result); encoder.write(ByteBuffer.wrap(result.getBytes())); encoder.complete(); } else if (queryStatus.getError().contains("java.rmi.RemoteException")) { String data = "One or more containers are not responding. Please inform the system administrator."; - //type could be error, user_error, warning regarding the error occurred along the process String result = HBPQueryHelper.ErrorResponse.createErrorResponse(data, error); logErrorMessage(result); encoder.write(ByteBuffer.wrap(result.getBytes())); encoder.complete(); } else { - log.info("Exception from madis: " + queryStatus.getError()); + log.info("Exception when running the query: " + queryStatus.getError()); String data = "Something went wrong. Please inform the system administrator."; - //type could be error, user_error, warning regarding the error occurred along the process String result = HBPQueryHelper.ErrorResponse.createErrorResponse(data, error); logErrorMessage(result); encoder.write(ByteBuffer.wrap(result.getBytes())); @@ -110,12 +108,11 @@ public boolean isRepeatable() { } public void closeQuery() throws IOException { - log.info("Closing from Query Result : " + queryStatus.getQueryID().getQueryID()); queryStatus.close(); } @Override - public void close() throws IOException { + public void close() { } diff --git a/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/gateway/async/handler/entity/NQueryStatusEntity.java b/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/gateway/async/handler/entity/NQueryStatusEntity.java index 2986eccd6..0db185cf9 100644 --- a/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/gateway/async/handler/entity/NQueryStatusEntity.java +++ b/Exareme-Docker/src/exareme/exareme-master/src/main/java/madgik/exareme/master/gateway/async/handler/entity/NQueryStatusEntity.java @@ -69,7 +69,7 @@ public void produceContent(ContentEncoder encoder, IOControl ioctrl) } @Override - public void close() throws IOException { + public void close() { } diff --git a/Exareme-Docker/src/exareme/exareme-worker/src/main/java/madgik/exareme/worker/art/executionEngine/containerStatusMgr/PeriodicContainersStatusCheck.java b/Exareme-Docker/src/exareme/exareme-worker/src/main/java/madgik/exareme/worker/art/executionEngine/containerStatusMgr/PeriodicContainersStatusCheck.java index 90f10e831..2d1b1775e 100644 --- a/Exareme-Docker/src/exareme/exareme-worker/src/main/java/madgik/exareme/worker/art/executionEngine/containerStatusMgr/PeriodicContainersStatusCheck.java +++ b/Exareme-Docker/src/exareme/exareme-worker/src/main/java/madgik/exareme/worker/art/executionEngine/containerStatusMgr/PeriodicContainersStatusCheck.java @@ -31,7 +31,7 @@ public PeriodicContainersStatusCheck(PlanEventScheduler planEventScheduler) { } - public void addConainerToCheck(EntityName container) { + public void addContainerToCheck(EntityName container) { log.debug("Adding container to check: " + container); containersToCheck.add(container); } diff --git a/Exareme-Docker/src/exareme/exareme-worker/src/main/java/madgik/exareme/worker/art/executionEngine/dynamicExecutionEngine/DynamicPlanManager.java b/Exareme-Docker/src/exareme/exareme-worker/src/main/java/madgik/exareme/worker/art/executionEngine/dynamicExecutionEngine/DynamicPlanManager.java index aa73a8e3c..fd2bf4c36 100644 --- a/Exareme-Docker/src/exareme/exareme-worker/src/main/java/madgik/exareme/worker/art/executionEngine/dynamicExecutionEngine/DynamicPlanManager.java +++ b/Exareme-Docker/src/exareme/exareme-worker/src/main/java/madgik/exareme/worker/art/executionEngine/dynamicExecutionEngine/DynamicPlanManager.java @@ -25,6 +25,8 @@ import java.util.Map; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * @author herald @@ -34,6 +36,7 @@ public class DynamicPlanManager implements PlanSessionManagerInterface { private final HashMap elasticTreeSessions = new HashMap<>(); private EventProcessor eventProcessor = null; private long sessionCount = 0; + ReentrantLock sessionCountLock = new ReentrantLock(); private long containerSessionCount = 0; /* ROOT sessions */ private Map schedulerMap = null; @@ -71,11 +74,13 @@ public void setExecutionEngine(ExecutionEngine executionEngine) { @Override public void createGlobalScheduler() throws RemoteException { + sessionCountLock.lock(); PlanSessionID sessionID = new PlanSessionID(sessionCount); PlanSessionReportID reportID = new PlanSessionReportID(sessionCount); - reportID.reportManagerProxy = executionEngine.getPlanSessionReportManagerProxy(reportID); sessionCount++; + sessionCountLock.unlock(); + reportID.reportManagerProxy = executionEngine.getPlanSessionReportManagerProxy(reportID); PlanEventScheduler eventScheduler = new PlanEventScheduler(sessionID, reportID, eventProcessor, this, resourceManager, registryProxy); @@ -86,12 +91,13 @@ public void createGlobalScheduler() throws RemoteException { @Override public PlanSessionID createNewSession() throws RemoteException { - // TODO Is this thread safe? + sessionCountLock.lock(); PlanSessionID sessionID = new PlanSessionID(sessionCount); PlanSessionReportID reportID = new PlanSessionReportID(sessionCount); - reportID.reportManagerProxy = executionEngine.getPlanSessionReportManagerProxy(reportID); sessionCount++; + sessionCountLock.unlock(); + reportID.reportManagerProxy = executionEngine.getPlanSessionReportManagerProxy(reportID); PlanEventScheduler eventScheduler = new PlanEventScheduler(sessionID, reportID, eventProcessor, this, resourceManager, registryProxy); @@ -105,8 +111,7 @@ public PlanSessionID createNewSession() throws RemoteException { } @Override - public ContainerSessionID createContainerSession(PlanSessionID planSessionID) - throws RemoteException { + public ContainerSessionID createContainerSession(PlanSessionID planSessionID) { ContainerSessionID containerSessionID = new ContainerSessionID(containerSessionCount); containerSessionCount++; LinkedList containerSessionIDs = containerSessionMap.get(planSessionID); diff --git a/Exareme-Docker/src/exareme/exareme-worker/src/main/java/madgik/exareme/worker/art/executionEngine/dynamicExecutionEngine/PlanEventScheduler.java b/Exareme-Docker/src/exareme/exareme-worker/src/main/java/madgik/exareme/worker/art/executionEngine/dynamicExecutionEngine/PlanEventScheduler.java index f3b014da5..1c2945a50 100644 --- a/Exareme-Docker/src/exareme/exareme-worker/src/main/java/madgik/exareme/worker/art/executionEngine/dynamicExecutionEngine/PlanEventScheduler.java +++ b/Exareme-Docker/src/exareme/exareme-worker/src/main/java/madgik/exareme/worker/art/executionEngine/dynamicExecutionEngine/PlanEventScheduler.java @@ -301,7 +301,6 @@ private void validatePlan(ExecutionPlan plan) throws RemoteException { public void execute(ExecutionPlan plan) throws RemoteException { lock.lock(); try { - log.debug("Plan submitted for execution ..."); EditableExecutionPlan newPlan = preprocessPlan(plan); validatePlan(newPlan); log.debug("PlanAfterPreprocessing: " + newPlan.toString()); diff --git a/Exareme-Docker/src/exareme/exareme-worker/src/main/java/madgik/exareme/worker/art/executionEngine/dynamicExecutionEngine/PlanEventSchedulerState.java b/Exareme-Docker/src/exareme/exareme-worker/src/main/java/madgik/exareme/worker/art/executionEngine/dynamicExecutionEngine/PlanEventSchedulerState.java index ebb019489..69a70e32f 100644 --- a/Exareme-Docker/src/exareme/exareme-worker/src/main/java/madgik/exareme/worker/art/executionEngine/dynamicExecutionEngine/PlanEventSchedulerState.java +++ b/Exareme-Docker/src/exareme/exareme-worker/src/main/java/madgik/exareme/worker/art/executionEngine/dynamicExecutionEngine/PlanEventSchedulerState.java @@ -308,7 +308,7 @@ public void addAllContainerCheck() { for (String containerName : plan.iterateContainers()) { try { if (!containerName.contains("any")) { - pcsc.addConainerToCheck(plan.getContainer(containerName)); + pcsc.addContainerToCheck(plan.getContainer(containerName)); } } catch (SemanticError semanticError) { semanticError.printStackTrace(); diff --git a/Exareme-Docker/src/exareme/exareme-worker/src/main/java/madgik/exareme/worker/art/executionEngine/dynamicExecutionEngine/event/planTermination/PlanTerminationEventHandler.java b/Exareme-Docker/src/exareme/exareme-worker/src/main/java/madgik/exareme/worker/art/executionEngine/dynamicExecutionEngine/event/planTermination/PlanTerminationEventHandler.java index 364e011fb..98cc934ff 100644 --- a/Exareme-Docker/src/exareme/exareme-worker/src/main/java/madgik/exareme/worker/art/executionEngine/dynamicExecutionEngine/event/planTermination/PlanTerminationEventHandler.java +++ b/Exareme-Docker/src/exareme/exareme-worker/src/main/java/madgik/exareme/worker/art/executionEngine/dynamicExecutionEngine/event/planTermination/PlanTerminationEventHandler.java @@ -33,8 +33,8 @@ public void preProcess(PlanTerminationEvent event, PlanEventSchedulerState state try { log.debug("closing session of container : " + proxy.getEntityName().getName()); proxy.destroySessions(state.getPlanSessionID()); - } catch (RemoteException e) { - // state.addException(e); + } catch (Exception e) { + log.error("Cannot close the sessions for proxy: " + proxy, e); // throw new ServerException("Cannot close all sessions", e); } }