Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for exareme hanging when containers fail. #304

Merged
merged 4 commits into from
Jan 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 7 additions & 12 deletions Exareme-Docker/files/root/exareme/bootstrap.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ CONSUL_WAIT_FOR_MASTER_IP_MAX_ATTEMPTS=20
EXAREME_NODE_STARTUP_HEALTH_CHECK_MAX_ATTEMPTS=10
EXAREME_NODE_HEALTH_CHECK_TIMEOUT=60
MASTER_NODE_REACHABLE_TIMEOUT=5
PERIODIC_EXAREME_NODES_HEALTH_CHECK_MAX_RETRIES=3
PERIODIC_EXAREME_NODES_HEALTH_CHECK_MAX_RETRIES=5
PERIODIC_EXAREME_NODES_HEALTH_CHECK_INTERVAL=120
EXAREME_HEALTH_CHECK_AWAIT_TIME=60
PERIODIC_TEMP_FILES_REMOVAL=300
Expand Down Expand Up @@ -118,7 +118,7 @@ exaremeNodesHealthCheck() {
echo "$(timestamp) HEALTH CHECK for node with IP ${NODE_IP} and name ${NODE_NAME} ."

if [[ "${FEDERATION_ROLE}" == "master" ]]; then
check=$(curl -s --max-time ${EXAREME_NODE_HEALTH_CHECK_TIMEOUT} "${NODE_IP}:9092/check/worker?NODE_IP=${NODE_IP}&NODE_NAME=${NODE_NAME}")
check=$(curl -s -X POST --max-time ${EXAREME_NODE_HEALTH_CHECK_TIMEOUT} ${NODE_IP}:9090/mining/query/HEALTH_CHECK)
else
check=$(curl -s --max-time ${EXAREME_NODE_HEALTH_CHECK_TIMEOUT} "${MASTER_IP}:9092/check/worker?NODE_IP=${NODE_IP}&NODE_NAME=${NODE_NAME}")
fi
Expand Down Expand Up @@ -255,11 +255,6 @@ if [[ "${FEDERATION_ROLE}" == "master" ]]; then
curl -s -X PUT -d @- ${CONSULURL}/v1/kv/${CONSUL_MASTER_PATH}/${NODE_NAME} <<<${NODE_IP}
curl -s -X PUT -d @- ${CONSULURL}/v1/kv/${CONSUL_ACTIVE_WORKERS_PATH}/${NODE_NAME} <<<${NODE_IP}

if ! startupExaremeNodesHealthCheck; then
echo "$(timestamp) HEALTH CHECK algorithm failed. Switch ENVIRONMENT_TYPE to 'DEV' to see error messages coming from EXAREME. Exiting..."
exit 1
fi

periodicExaremeNodesHealthCheck &

else ##### Running bootstrap on a worker node #####
Expand All @@ -271,16 +266,16 @@ else ##### Running bootstrap on a worker node #####

echo "$(timestamp) Starting Exareme on worker node with IP: ${NODE_IP} and nodeName: ${NODE_NAME}"
. ./start-worker.sh

# Updating consul with node IP
echo -e "\n$(timestamp) Updating consul with worker node IP."
curl -s -X PUT -d @- ${CONSULURL}/v1/kv/${CONSUL_ACTIVE_WORKERS_PATH}/${NODE_NAME} <<<${NODE_IP}


if ! startupExaremeNodesHealthCheck; then
echo "$(timestamp) HEALTH CHECK algorithm failed. Switch ENVIRONMENT_TYPE to 'DEV' to see error messages coming from EXAREME. Exiting..."
exit 1
fi

# Updating consul with node IP
echo -e "\n$(timestamp) Updating consul with worker node IP."
curl -s -X PUT -d @- ${CONSULURL}/v1/kv/${CONSUL_ACTIVE_WORKERS_PATH}/${NODE_NAME} <<<${NODE_IP}

periodicExaremeNodesHealthCheck &

periodicReachableMasterNodeCheck &
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ public interface AdpDBClientQueryStatus {

void registerListener(AdpDBQueryListener listener) throws RemoteException;

InputStream getResult() throws RemoteException;
String getResult() throws RemoteException;

InputStream getResult(DataSerialization ds) throws RemoteException;
String getResult(DataSerialization ds) throws RemoteException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.rmi.RemoteException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.FutureTask;

/**
* @author alex
Expand All @@ -35,7 +37,8 @@ public class RmiAdpDBClientQueryStatus implements AdpDBClientQueryStatus {
private String lastStatus;
private TimeFormat timeF;
private boolean finished;
private InputStream result;
private boolean error;
private String result;

public RmiAdpDBClientQueryStatus(AdpDBQueryID queryId, AdpDBClientProperties properties,
AdpDBQueryExecutionPlan plan, AdpDBStatus status) {
Expand All @@ -45,6 +48,7 @@ public RmiAdpDBClientQueryStatus(AdpDBQueryID queryId, AdpDBClientProperties pro
this.lastStatus = null;
this.timeF = new TimeFormat(TimeUnit.min);
this.finished = false;
this.error = false;
result = null;
}

Expand All @@ -61,13 +65,6 @@ public boolean hasFinished() throws RemoteException {
if (!status.hasFinished() && !status.hasError())
return false;

try {
String algorithmResult = IOUtils.toString(getResult(DataSerialization.summary), StandardCharsets.UTF_8);
log.info("Algorithm with queryId " + getQueryID().getQueryID() + " terminated. Result: \n " + algorithmResult);
} catch (IOException e) {
log.error("Could not read the algorithm result table." + getQueryID());
}

finished = true;
return true;
}
Expand Down Expand Up @@ -113,7 +110,7 @@ public void registerListener(AdpDBQueryListener listener) throws RemoteException
}

@Override
public InputStream getResult() throws RemoteException {
public String getResult() throws RemoteException {
return getResult(DataSerialization.ldjson);
}

Expand All @@ -129,14 +126,27 @@ public InputStream getResult() throws RemoteException {
* @throws RemoteException
*/
@Override
public InputStream getResult(DataSerialization ds) throws RemoteException {
public String getResult(DataSerialization ds) throws RemoteException {

// The registry should be updated the 1st time we fetch a result stream.
if (result == null) {
updateRegistry();
}
result = new RmiAdpDBClient(AdpDBManagerLocator.getDBManager(), properties)
InputStream resultStream = new RmiAdpDBClient(AdpDBManagerLocator.getDBManager(), properties)
.readTable(plan.getResultTables().get(0).getName(), ds);

FutureTask<String> getResultFromStream;
try {
getResultFromStream = new FutureTask<>(() ->
IOUtils.toString(resultStream, StandardCharsets.UTF_8));

new Thread(getResultFromStream).start();
result = getResultFromStream.get(30, java.util.concurrent.TimeUnit.SECONDS);
} catch (Exception e) {
log.error("Error reading the result table! QueryID:" + status.getQueryID().getQueryID(), e);
throw new RemoteException("Could not read the result table!");
}

return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,9 @@ public void run() {
statusManager.getStatistics(status.getId()).setTotalDataTransfers(stats.getTotalData());

while (!sessionManager.hasFinished() && !sessionManager.hasError()) {

boolean updateProgressStatistics = updateProgressStatistics();
if (updateProgressStatistics) {
log.info("Session is running... ID: " + sessionPlan.getSessionID().getLongId()
log.info("Session is updating... ID: " + sessionPlan.getSessionID().getLongId()
+ " , QueryID: " + queryID.getQueryID());
log.debug("Update listeners ...");
synchronized (listeners) {
Expand All @@ -89,13 +88,15 @@ public void run() {
.setAdpEngineStatistics(statsManager.getStatistics());

if (sessionManager != null && !sessionManager.hasError()) {
log.debug("Session finished, closing! ID: " + sessionPlan.getSessionID().getLongId()
log.info("Session finished, closing! ID: " + sessionPlan.getSessionID().getLongId()
+ " , QueryID: " + queryID.getQueryID());
statusManager.setFinished(status.getId());
} else {
log.info("Session error! ID: " + sessionPlan.getSessionID().getLongId()
+ " , QueryID: " + queryID.getQueryID());
statusManager.setError(status.getId(), sessionManager.getErrorList().get(0));
}
log.debug("Session closing! ID: "+ sessionPlan.getSessionID().getLongId()
log.debug("Session closing! ID: " + sessionPlan.getSessionID().getLongId()
+ " , QueryID: " + queryID.getQueryID());
sessionPlan.close();

Expand All @@ -104,7 +105,6 @@ public void run() {
log.error("Cannot monitor job, sessionID: " + sessionPlan.getSessionID().getLongId());
log.error("Cannot monitor job, queryID: " + status.getQueryID().getQueryID(), e);
} finally {
log.debug("Terminate listeners ( " + listeners.size() + ")...");
synchronized (listeners) {
for (AdpDBQueryListener l : listeners) {
l.terminated(queryID, status);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ public AdpDBStatus executeScript(AdpDBQueryExecutionPlan execPlan, AdpDBClientPr
AdpDBArtJobMonitor monitor =
new AdpDBArtJobMonitor(sessionPlan, status, statusManager, execPlan.getQueryID());
monitors.put(execPlan.getQueryID(), monitor);

executor.submit(monitor);
statusArray.add(status);
return status;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,6 @@ public NIterativeAlgorithmResultEntity(IterativeAlgorithmState iterativeAlgorith
this.dataSerialization = dataSerialization;
}

private final static String user_error = new String("text/plain+user_error");
private final static String error = new String("text/plain+error");
private final static String warning = new String("text/plain+warning");


/**
* @param encoder is used to save the output
* @param ioctrl will be used from the iterativeAlgorithmState, when the algorithm is complete,
Expand Down Expand Up @@ -87,9 +82,11 @@ public void produceContent(ContentEncoder encoder, IOControl ioctrl) throws IOEx
if (!finalizeQueryStatus.hasError() &&
finalizeQueryStatus.hasFinished()) {
if (channel == null) {
String result = iterativeAlgorithmState.getAdpDBClientQueryStatus().getResult(dataSerialization);
log.info("Iterative algorithm with key " + iterativeAlgorithmState.getAlgorithmKey()
+ " terminated. Result: \n " + result);
channel = Channels.newChannel(
iterativeAlgorithmState.getAdpDBClientQueryStatus()
.getResult(dataSerialization));
new ByteArrayInputStream(result.getBytes(StandardCharsets.UTF_8)));
}
// Reading from the channel to the buffer, flip is required by the API
channel.read(buffer);
Expand Down Expand Up @@ -183,11 +180,13 @@ public void closeQuery() throws IOException {
finalizeQueryStatus.close();
finalizeQueryStatus = null;
}
if (iterativeAlgorithmState != null)
iterativeAlgorithmState.releaseLock();
iterativeAlgorithmState = null;
}

@Override
public void close() throws IOException {
public void close() {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import madgik.exareme.common.app.engine.AdpDBQueryID;
import madgik.exareme.master.client.AdpDBClientQueryStatus;
import madgik.exareme.master.engine.iterations.scheduler.IterationsDispatcher;
import madgik.exareme.master.engine.iterations.scheduler.events.phaseCompletion.PhaseCompletionEventHandler;
import madgik.exareme.master.engine.iterations.state.IterationsStateManager;
import madgik.exareme.master.engine.iterations.state.IterativeAlgorithmState;
import madgik.exareme.utils.eventProcessor.EventHandler;
Expand Down Expand Up @@ -56,8 +55,9 @@ protected AdpDBClientQueryStatus submitQueryAndUpdateExecutionPhase(
ias.getAlgorithmKey(),
dflScript);

log.info("New Iterative phase: " + currentPhase);
log.info("Executing Iterative DFL Script: \n" + dflScript);
log.info("New Iterative phase: " + currentPhase + " for algorithm: " + ias.getAlgorithmKey() +
" with queryID: " + queryStatus.getQueryID().getQueryID());
log.debug("Executing Iterative DFL Script: \n" + dflScript);

ias.setCurrentExecutionPhase(currentPhase);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ public void handle(PhaseCompletionEvent event, EventProcessor proc) throws Remot

String terminationConditionResult;
try {
InputStream previousResultStream = ias.getAdpDBClientQueryStatus().getResult();
terminationConditionResult = IOUtils.toString(previousResultStream, StandardCharsets.UTF_8);
terminationConditionResult = ias.getAdpDBClientQueryStatus().getResult();
} catch (IOException e) {
throw new IterationsStateFatalException(
"Could not read the termination_condition result table.", ias.getAlgorithmKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import madgik.exareme.utils.eventProcessor.EventProcessor;

import java.rmi.RemoteException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,14 @@
import madgik.exareme.common.app.engine.AdpDBStatistics;
import madgik.exareme.master.engine.AdpDBManagerLocator;
import madgik.exareme.worker.art.executionEngine.session.ExecutionEngineSessionPlan;
import org.apache.log4j.Logger;

import java.rmi.RemoteException;
import java.util.Arrays;
import java.util.Map;

/**
* @author herald
*/
public class AdpDBJobSession {
private static final Logger log = Logger.getLogger(AdpDBJobSession.class);
private boolean finished = false;
private boolean error = false;
private Exception exception = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
import madgik.exareme.master.engine.iterations.handler.NIterativeAlgorithmResultEntity;
import madgik.exareme.master.engine.iterations.state.IterativeAlgorithmState;
import madgik.exareme.master.gateway.ExaremeGatewayUtils;
import madgik.exareme.master.gateway.async.handler.HBP.Exceptions.RequestException;
import madgik.exareme.master.gateway.async.handler.HBP.Exceptions.BadUserInputException;
import madgik.exareme.master.gateway.async.handler.HBP.Exceptions.RequestException;
import madgik.exareme.master.gateway.async.handler.entity.NQueryResultEntity;
import madgik.exareme.master.queryProcessor.HBP.AlgorithmProperties;
import madgik.exareme.master.queryProcessor.HBP.Algorithms;
Expand Down Expand Up @@ -160,7 +160,7 @@ private void handleHBPAlgorithmExecution(HttpRequest request, HttpResponse respo
queryStatus = dbClient.query(algorithmKey, dfl);

log.info("Algorithm " + algorithmKey + " with queryID "
+ queryStatus.getQueryID() + " execution started.");
+ queryStatus.getQueryID().getQueryID() + " execution started.");
log.debug("DFL Script: \n " + dfl);

BasicHttpEntity entity = new NQueryResultEntity(queryStatus, ds,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@
import org.apache.http.nio.entity.HttpAsyncContentProducer;
import org.apache.log4j.Logger;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

/**
* TODO flush output before suspend
*/
public class NQueryResultEntity extends BasicHttpEntity implements HttpAsyncContentProducer {

private static final Logger log = Logger.getLogger(NQueryResultEntity.class);
Expand Down Expand Up @@ -55,7 +54,11 @@ public void produceContent(ContentEncoder encoder, IOControl iocontrol)

if (!queryStatus.hasError()) {
if (channel == null) {
channel = Channels.newChannel(queryStatus.getResult(format));
String result = queryStatus.getResult(format);
log.info("Algorithm with queryId " + queryStatus.getQueryID().getQueryID()
+ " terminated. Result: \n " + result);
channel = Channels.newChannel(
new ByteArrayInputStream(result.getBytes(StandardCharsets.UTF_8)));
}
channel.read(buffer);
buffer.flip();
Expand All @@ -68,32 +71,27 @@ public void produceContent(ContentEncoder encoder, IOControl iocontrol)
close();
}
} else {
log.trace("|" + queryStatus.getError() + "|");
if (queryStatus.getError().contains("ExaremeError:")) {
String data = queryStatus.getError().substring(queryStatus.getError().lastIndexOf("ExaremeError:") + "ExaremeError:".length()).replaceAll("\\s", " ");
//type could be error, user_error, warning regarding the error occurred along the process
String result = HBPQueryHelper.ErrorResponse.createErrorResponse(data, user_error);
logErrorMessage(result);
encoder.write(ByteBuffer.wrap(result.getBytes()));
encoder.complete();
} else if (queryStatus.getError().contains("PrivacyError")) {
String data = "The Experiment could not run with the input provided because there are insufficient data.";
//type could be error, user_error, warning regarding the error occurred along the process
String result = HBPQueryHelper.ErrorResponse.createErrorResponse(data, warning);
logErrorMessage(result);
encoder.write(ByteBuffer.wrap(result.getBytes()));
encoder.complete();
} else if (queryStatus.getError().contains("java.rmi.RemoteException")) {
String data = "One or more containers are not responding. Please inform the system administrator.";
//type could be error, user_error, warning regarding the error occurred along the process
String result = HBPQueryHelper.ErrorResponse.createErrorResponse(data, error);
logErrorMessage(result);
encoder.write(ByteBuffer.wrap(result.getBytes()));
encoder.complete();
} else {
log.info("Exception from madis: " + queryStatus.getError());
log.info("Exception when running the query: " + queryStatus.getError());
String data = "Something went wrong. Please inform the system administrator.";
//type could be error, user_error, warning regarding the error occurred along the process
String result = HBPQueryHelper.ErrorResponse.createErrorResponse(data, error);
logErrorMessage(result);
encoder.write(ByteBuffer.wrap(result.getBytes()));
Expand All @@ -110,12 +108,11 @@ public boolean isRepeatable() {
}

public void closeQuery() throws IOException {
log.info("Closing from Query Result : " + queryStatus.getQueryID().getQueryID());
queryStatus.close();
}

@Override
public void close() throws IOException {
public void close() {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void produceContent(ContentEncoder encoder, IOControl ioctrl)
}

@Override
public void close() throws IOException {
public void close() {

}

Expand Down
Loading