Merge pull request #299 from madgik/bug/stability_issues

Bug/stability issues

ThanKarab authored Jan 18, 2021
2 parents 98c7f60 + 27b72fc commit 6b67910
Showing 41 changed files with 234 additions and 144 deletions.
2 changes: 1 addition & 1 deletion Exareme-Docker/Dockerfile
@@ -14,7 +14,7 @@ RUN apt install -y --no-install-recommends python2
RUN ln -s /usr/bin/python2 /usr/bin/python

# Installing Exareme requirements
-RUN apt install -y openjdk-8-jdk curl jq
+RUN apt install -y openjdk-8-jdk curl jq iputils-ping

# Installing pip
RUN curl -O https://raw.githubusercontent.com/pypa/get-pip/master/get-pip.py
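The Dockerfile change adds iputils-ping, which the reworked bootstrap.sh below relies on for its master-reachability probe. A quick way to confirm the utility behaves as that probe expects (the image tag is a placeholder, not the project's actual tag):

    # Hypothetical smoke test: 5 echo requests, 2-second reply timeout,
    # mirroring the flags used by periodicReachableMasterNodeCheck below.
    docker run --rm <exareme-image> ping -c 5 -W 2 127.0.0.1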
104 changes: 61 additions & 43 deletions Exareme-Docker/files/root/exareme/bootstrap.sh
@@ -12,9 +12,11 @@ export CONSUL_ACTIVE_WORKERS_PATH="active_workers"
CONSUL_CONNECTION_MAX_ATTEMPTS=20
CONSUL_WAIT_FOR_MASTER_IP_MAX_ATTEMPTS=20
EXAREME_NODE_STARTUP_HEALTH_CHECK_MAX_ATTEMPTS=10
-EXAREME_NODE_HEALTH_CHECK_TIMEOUT=30
-PERIODIC_EXAREME_NODES_HEALTH_CHECK_MAX_RETRIES=10
+EXAREME_NODE_HEALTH_CHECK_TIMEOUT=60
+MASTER_NODE_REACHABLE_TIMEOUT=5
+PERIODIC_EXAREME_NODES_HEALTH_CHECK_MAX_RETRIES=3
PERIODIC_EXAREME_NODES_HEALTH_CHECK_INTERVAL=120
+EXAREME_HEALTH_CHECK_AWAIT_TIME=20
PERIODIC_TEMP_FILES_REMOVAL=300

if [[ -z ${CONSULURL} ]]; then
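The retuned constants trade slower failure detection for fewer false positives: the per-request timeout doubles while the retry budget shrinks. A rough worst-case estimate of how long a node now takes to give up, assuming every failed curl burns its full --max-time (a sketch, not part of the script):

    # Hypothetical worst case for the periodic check's retry loop:
    RETRIES=3; TIMEOUT=60; AWAIT=20
    echo "$(( (RETRIES + 1) * TIMEOUT + RETRIES * AWAIT ))s"   # 300s
    # before this change: (10+1)*30s + 10*5s = 380s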
@@ -87,7 +89,7 @@ convertCSVsToDB() {
# Skip convertion if flag is false
if [[ ${CONVERT_CSVS} == "FALSE" ]]; then
echo "$(timestamp) CSV convertion turned off. "
-        return 0
+        return 0
fi

# Removing all previous .db files from the DOCKER_DATA_FOLDER
@@ -116,9 +118,9 @@ exaremeNodesHealthCheck() {
echo "$(timestamp) HEALTH CHECK for node with IP ${NODE_IP} and name ${NODE_NAME} ."

if [[ "${FEDERATION_ROLE}" == "master" ]]; then
-        check=$(curl -s -X POST --max-time ${EXAREME_NODE_HEALTH_CHECK_TIMEOUT} ${NODE_IP}:9090/mining/query/HEALTH_CHECK)
+        check=$(curl -s --max-time ${EXAREME_NODE_HEALTH_CHECK_TIMEOUT} "${NODE_IP}:9092/check/worker?NODE_IP=${NODE_IP}&NODE_NAME=${NODE_NAME}")
else
-        check=$(curl -s --max-time ${EXAREME_NODE_HEALTH_CHECK_TIMEOUT} ${MASTER_IP}:9092/check/worker?NODE_IP=${NODE_IP})
+        check=$(curl -s --max-time ${EXAREME_NODE_HEALTH_CHECK_TIMEOUT} "${MASTER_IP}:9092/check/worker?NODE_IP=${NODE_IP}&NODE_NAME=${NODE_NAME}")
fi

if [[ -z ${check} ]]; then
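Both branches now hit the same lightweight /check/worker endpoint on port 9092 (the master previously POSTed to the mining endpoint on 9090). The check can be reproduced by hand when debugging; a sketch with placeholder addresses:

    # Hypothetical manual probe; the IPs and node name are examples only.
    MASTER_IP=10.0.0.4
    curl -s --max-time 60 \
        "${MASTER_IP}:9092/check/worker?NODE_IP=10.0.0.5&NODE_NAME=worker1"
    # bootstrap.sh treats an empty response as a failed check.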
@@ -144,51 +146,68 @@ exaremeNodesHealthCheck() {
# Exareme health check on startup
startupExaremeNodesHealthCheck() {
# If health check fails then try again until it succeeds or close the container.
-    if ! exaremeNodesHealthCheck; then
-        attempts=0
-        while ! exaremeNodesHealthCheck; do
-            if [[ $attempts -ge $EXAREME_NODE_STARTUP_HEALTH_CHECK_MAX_ATTEMPTS ]]; then
-                echo -e "\n$(timestamp) HEALTH CHECK FAILED. Closing the container."
-                return 1 # Exiting
-            fi
-            echo "$(timestamp) HEALTH CHECK failed. Trying again..."
-            attempts=$(($attempts + 1))
-            sleep 5
-        done
-    fi
+    attempts=0
+    while ! exaremeNodesHealthCheck; do
+        if [[ $attempts -ge $EXAREME_NODE_STARTUP_HEALTH_CHECK_MAX_ATTEMPTS ]]; then
+            echo -e "\n$(timestamp) HEALTH CHECK FAILED. Closing the container."
+            return 1 # Exiting
+        fi
+        echo "$(timestamp) HEALTH CHECK failed. Trying again..."
+        attempts=$(($attempts + 1))
+        sleep $EXAREME_HEALTH_CHECK_AWAIT_TIME
+    done
echo "$(timestamp) HEALTH CHECK successful on NODE_IP: $NODE_IP"
return 0
}
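The startup and periodic checks now share the same shape: a bounded retry loop with a fixed pause, differing only in the attempt cap and the action on exhaustion. Factored out, the pattern would look roughly like this (the helper name is illustrative, not part of the script):

    # Hypothetical helper capturing the shared retry shape.
    retry_with_cap() { # usage: retry_with_cap <max_attempts> <pause> <cmd...>
        local max=$1 pause=$2 attempts=0
        shift 2
        while ! "$@"; do
            [[ $attempts -ge $max ]] && return 1
            attempts=$((attempts + 1))
            sleep "$pause"
        done
    }
    # e.g. retry_with_cap "$EXAREME_NODE_STARTUP_HEALTH_CHECK_MAX_ATTEMPTS" \
    #          "$EXAREME_HEALTH_CHECK_AWAIT_TIME" exaremeNodesHealthCheck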

# Periodic check for exareme's health.
# If it fails shutdown the container
periodicExaremeNodesHealthCheck() {
# If consul doesn't have master node's IP it means that it restarted. The nodes should restart.
if ! getMasterIPFromConsul; then
pkill -f 1 # Closing main bootstrap.sh process to stop the container.
fi

# Make a health check every 5 minutes.
while true; do
sleep $PERIODIC_EXAREME_NODES_HEALTH_CHECK_INTERVAL

# If consul doesn't have master node's IP it means that it restarted. The nodes should restart.
if ! getMasterIPFromConsul; then
pkill -f 1 # Closing main bootstrap.sh process to stop the container.
fi

# If health check fails then try again until it succeeds or close the container.
-        if ! exaremeNodesHealthCheck; then
-            attempts=0
-            while ! exaremeNodesHealthCheck; do
-                if [[ $attempts -ge $PERIODIC_EXAREME_NODES_HEALTH_CHECK_MAX_RETRIES ]]; then
-                    echo -e "\n$(timestamp) HEALTH CHECK FAILED. Closing the container."
-                    pkill -f 1 # Closing main bootstrap.sh process to stop the container.
-                fi
-                echo "$(timestamp) HEALTH CHECK failed. Trying again..."
-                attempts=$(($attempts + 1))
-                sleep 5
-            done
-        fi
+        attempts=0
+        while ! exaremeNodesHealthCheck; do
+            if [[ $attempts -ge $PERIODIC_EXAREME_NODES_HEALTH_CHECK_MAX_RETRIES ]]; then
+                echo -e "\n$(timestamp) HEALTH CHECK FAILED. Closing the container."
+                pkill -f 1 # Closing main bootstrap.sh process to stop the container.
+            fi
+            echo "$(timestamp) HEALTH CHECK failed. Trying again..."
+            attempts=$(($attempts + 1))
+            sleep $EXAREME_HEALTH_CHECK_AWAIT_TIME
+        done
+        echo "$(timestamp) HEALTH CHECK successful on NODE_IP: $NODE_IP"
done
}

+# Periodic check that the master node is reachable
+# If it fails shutdown the container
+periodicReachableMasterNodeCheck() {
+    # If consul doesn't have master node's IP it means that it restarted. The nodes should restart.
+    if ! getMasterIPFromConsul; then
+        pkill -f 1 # Closing main bootstrap.sh process to stop the container.
+    fi
+
+    # Check that master is reachable every 5 seconds.
+    while true; do
+        sleep 5
+
+        if ! ping -c 5 -W 2 $MASTER_IP &>/dev/null ; then
+            echo -e "\n$(timestamp) MASTER NODE IS UNREACHABLE. Closing the container."
+            pkill -f 1 # Closing main bootstrap.sh process to stop the container.
+        fi
+
+        #echo "$(timestamp) HEALTH CHECK successful from NODE_IP: $NODE_IP, MASTER NODE IS REACHABLE"
+    done
+}
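The new reachability task is intentionally cheaper than the full health check: an ICMP probe every 5 seconds rather than an HTTP round-trip every 2 minutes. The ping flags bound each probe: -c 5 sends five echo requests and -W 2 waits at most two seconds per reply, so a fully failed round finishes in roughly ten seconds before the worker kills itself. The same probe in isolation:

    # MASTER_IP is whatever getMasterIPFromConsul resolved.
    ping -c 5 -W 2 "$MASTER_IP" &>/dev/null && echo reachable || echo unreachable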

# Periodic deletion of temp files
startTempFilesDeletionTask() {
while true; do
@@ -218,6 +237,13 @@ echo "Madis Server started."

waitForConsulToStart

+# Prepare datasets from CSVs to SQLite db files
+convertCSVsToDB
+
+# Updating consul with node's datasets.
+echo "$(timestamp) Updating consul with node's datasets."
+./set-local-datasets.sh
+
# Running bootstrap on a master node
if [[ "${FEDERATION_ROLE}" == "master" ]]; then

@@ -236,9 +262,6 @@ if [[ "${FEDERATION_ROLE}" == "master" ]]; then

periodicExaremeNodesHealthCheck &

-    # Prepare datasets from CSVs to SQLite db files
-    convertCSVsToDB
-
else ##### Running bootstrap on a worker node #####

if ! getMasterIPFromConsul; then
@@ -260,15 +283,10 @@ else ##### Running bootstrap on a worker node #####

periodicExaremeNodesHealthCheck &

-    # Prepare datasets from CSVs to SQLite db files
-    convertCSVsToDB
+    periodicReachableMasterNodeCheck &

fi

-# Updating consul with node's datasets.
-echo "$(timestamp) Updating consul with node's datasets."
-./set-local-datasets.sh

startTempFilesDeletionTask &

# Creating the python log file
@@ -63,7 +63,7 @@ public boolean hasFinished() throws RemoteException {

try {
String algorithmResult = IOUtils.toString(getResult(DataSerialization.summary), StandardCharsets.UTF_8);
log.info("Algorithm with queryId" + getQueryID() + " terminated. Result: \n " + algorithmResult);
log.info("Algorithm with queryId " + getQueryID().getQueryID() + " terminated. Result: \n " + algorithmResult);
} catch (IOException e) {
log.error("Could not read the algorithm result table." + getQueryID());
}
@@ -59,18 +59,12 @@ public void run() {
statusManager.getStatistics(status.getId()).setTotalOperators(stats.getTotalProc());
statusManager.getStatistics(status.getId()).setTotalDataTransfers(stats.getTotalData());

-            while (sessionManager.hasFinished() == false && sessionManager.hasError() == false) {
-
-                Thread.sleep(100 * statsUpdateSecs);
+            while (!sessionManager.hasFinished() && !sessionManager.hasError()) {
+
                boolean updateProgressStatistics = updateProgressStatistics();
-                sessionManager = sessionPlan.getPlanSessionStatusManagerProxy();
-                statsManager = sessionPlan.getPlanSessionStatisticsManagerProxy();
-                if (sessionManager == null || statsManager == null) {
-                    log.info("--+ error");
-                }
                if (updateProgressStatistics) {
-                    log.info("Session is running...");
+                    log.info("Session is running... ID: " + sessionPlan.getSessionID().getLongId()
+                            + " , QueryID: " + queryID.getQueryID());
log.debug("Update listeners ...");
synchronized (listeners) {
for (AdpDBQueryListener l : listeners) {
@@ -80,21 +74,35 @@ public void run() {
}
}

+                Thread.sleep(100 * statsUpdateSecs);
+
+                // Reload the managers
+                sessionManager = sessionPlan.getPlanSessionStatusManagerProxy();
+                statsManager = sessionPlan.getPlanSessionStatisticsManagerProxy();
+                if (sessionManager == null || statsManager == null) {
+                    log.error("Session Manager or stats Manager null! " + sessionManager + ", " + statsManager);
+                }
}

updateProgressStatistics();
statusManager.getStatistics(status.getId())
.setAdpEngineStatistics(statsManager.getStatistics());

if (sessionManager != null && sessionManager.hasError() == false) {
if (sessionManager != null && !sessionManager.hasError()) {
log.debug("Session finished, closing! ID: " + sessionPlan.getSessionID().getLongId()
+ " , QueryID: " + queryID.getQueryID());
statusManager.setFinished(status.getId());
} else {
statusManager.setError(status.getId(), sessionManager.getErrorList().get(0));
}
log.debug("Session closing! ID: "+ sessionPlan.getSessionID().getLongId()
+ " , QueryID: " + queryID.getQueryID());
sessionPlan.close();

} catch (Exception e) {
statusManager.setError(status.getId(), e);
log.error("Cannot monitor job!", e);
log.error("Cannot monitor job, sessionID: " + sessionPlan.getSessionID().getLongId());
log.error("Cannot monitor job, queryID: " + status.getQueryID().getQueryID(), e);
} finally {
log.debug("Terminate listeners ( " + listeners.size() + ")...");
synchronized (listeners) {
@@ -99,11 +99,15 @@ public void produceContent(ContentEncoder encoder, IOControl ioctrl) throws IOException {
this.buffer.compact();
if (i < 1 && !buffering) {
encoder.complete();
+                        closeQuery();
+                        close();
}
} else {
encoder.write(ByteBuffer.wrap(
finalizeQueryStatus.getError().getBytes()));
encoder.complete();
+                    closeQuery();
+                    close();
}
} else {
// Algorithm execution failed, notify the client.
@@ -164,15 +168,16 @@ public void produceContent(ContentEncoder encoder, IOControl ioctrl) throws IOException {
encoder.write(buffer);
this.buffer.compact();
encoder.complete();
+                closeQuery();
+                close();
}
} finally {
if (iterativeAlgorithmState != null)
iterativeAlgorithmState.releaseLock();
}
}

@Override
-    public void close() throws IOException {
+    public void closeQuery() throws IOException {
if (finalizeQueryStatus != null) {
// Case in which algorithm execution failed
finalizeQueryStatus.close();
@@ -181,6 +186,11 @@ public void close() throws IOException {
iterativeAlgorithmState = null;
}

+    @Override
+    public void close() throws IOException {
+
+    }
+
@Override
public boolean isRepeatable() {
return false;
@@ -11,6 +11,7 @@
import org.apache.log4j.Logger;

import java.rmi.RemoteException;
+import java.util.Arrays;
import java.util.Map;

/**
@@ -159,8 +159,9 @@ private void handleHBPAlgorithmExecution(HttpRequest request, HttpResponse response
AdpDBClientFactory.createDBClient(manager, clientProperties);
queryStatus = dbClient.query(algorithmKey, dfl);

log.debug("Algorithm " + algorithmKey + " with queryID "
+ queryStatus.getQueryID() + " execution started. DFL Script: \n " + dfl);
log.info("Algorithm " + algorithmKey + " with queryID "
+ queryStatus.getQueryID() + " execution started.");
log.debug("DFL Script: \n " + dfl);

BasicHttpEntity entity = new NQueryResultEntity(queryStatus, ds,
ExaremeGatewayUtils.RESPONSE_BUFFER_SIZE);
@@ -63,23 +63,21 @@ private void handleInternal(
String target = request.getRequestLine().getUri();
final File file = new File(this.docRoot, URLDecoder.decode(target, "UTF-8"));
if (!file.exists()) {

response.setStatusCode(HttpStatus.SC_NOT_FOUND);
NStringEntity entity = new NStringEntity(
"<html><body><h1>File" +
" not found</h1></body></html>",
ContentType.create("text/html", "UTF-8"));
response.setEntity(entity);
System.out.println("File " + file.getPath() + " not found");

} else if (!file.canRead() || file.isDirectory() || !file.getCanonicalPath().startsWith(this.docRoot.getCanonicalPath())) {

response.setStatusCode(HttpStatus.SC_FORBIDDEN);
NStringEntity entity = new NStringEntity(
"<html><body><h1>Access denied</h1></body></html>",
ContentType.create("text/html", "UTF-8"));
response.setEntity(entity);
System.out.println("Cannot read file " + file.getPath());
log.debug("Cannot read file " + file.getPath());

} else if (file.getPath().endsWith(".ser")) {
HttpCoreContext coreContext = HttpCoreContext.adapt(context);
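Beyond swapping System.out.println for log.debug, note the pre-existing guard on this branch: a file is served only when its canonical path stays inside the document root, which rejects ../ traversal. The same containment idea sketched in shell (paths are placeholders):

    # Hypothetical illustration of the canonical-path containment test.
    DOCROOT=$(realpath /srv/docroot)
    REQUESTED=$(realpath -m "$DOCROOT/../etc/passwd")
    case "$REQUESTED" in
        "$DOCROOT"/*) echo "serve file" ;;
        *) echo "403 Forbidden" ;;   # traversal rejected
    esac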