Skip to content

Commit

Permalink
add
Browse files Browse the repository at this point in the history
Signed-off-by: gengjun-git <[email protected]>
  • Loading branch information
gengjun-git committed Feb 19, 2025
1 parent ebe413f commit ace0455
Show file tree
Hide file tree
Showing 14 changed files with 289 additions and 93 deletions.
46 changes: 28 additions & 18 deletions bin/stop_be.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,17 @@ export_shared_envvars

pidfile=$PID_DIR/be.pid

sig=9
SIG=9
TIME_OUT=60

OPTS=$(getopt \
-n $0 \
-o 'g' \
-l 'graceful' \
-l 'timeout' \
-- "$@")

eval set -- "$OPTS"

usage() {
echo "
Expand All @@ -57,15 +67,13 @@ Options:
exit 0
}

for arg in "$@"
do
case $arg in
--help|-h)
usage
;;
--graceful|-g)
sig=15
;;
while true; do
case "$1" in
--timeout) TIME_OUT=$2 ; shift 2 ;;
--help|-h) usage ; shift ;;
--graceful|-g) SIG=15 ; shift ;;
--) shift ; break ;;
*) echo "Internal error" ; exit 1 ;;
esac
done

Expand All @@ -87,14 +95,16 @@ if [ -f $pidfile ]; then
exit 1
fi

if kill -0 $pid >/dev/null 2>&1; then
kill -${sig} $pid > /dev/null 2>&1
if [ $? -ne 0 ]; then
exit 1
fi
while kill -0 $pid >/dev/null 2>&1; do
start_ts=$(date +%s)
while kill -0 $pid >/dev/null 2>&1; do
if [ $(($(date +%s) - $start_ts)) -gt $TIME_OUT ]; then
kill -9 $pid
echo "graceful exit timeout, forced termination of the process"
break
else
sleep 1
done
fi
fi
done

rm $pidfile
fi
62 changes: 39 additions & 23 deletions bin/stop_fe.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,23 @@
curdir=`dirname "$0"`
curdir=`cd "$curdir"; pwd`

OPTS=$(getopt \
-n $0 \
-o '' \
-l 'timeout' \
-- "$@")

eval set -- "$OPTS"

TIME_OUT=60
while true; do
case "$1" in
--timeout) TIME_OUT=$2 ; shift 2;;
--) shift ; break ;;
*) echo "Internal error" ; exit 1 ;;
esac
done

export STARROCKS_HOME=`cd "$curdir/.."; pwd`
# compatible with DORIS_HOME: DORIS_HOME still be using in config on the user side, so set DORIS_HOME to the meaningful value in case of wrong envs.
export DORIS_HOME="$STARROCKS_HOME"
Expand All @@ -31,29 +48,28 @@ export_env_from_conf $STARROCKS_HOME/conf/fe.conf
pidfile=$PID_DIR/fe.pid

if [ -f $pidfile ]; then
pid=`cat $pidfile`
pidcomm=`ps -p $pid -o comm=`
pid=`cat $pidfile`
pidcomm=`ps -p $pid -o comm=`

if [ "java" != "$pidcomm" ]; then
echo "ERROR: pid process may not be fe. "
fi

if kill $pid > /dev/null 2>&1; then
while true
do
# check if fe proc is still alive
if ps -p $pid > /dev/null; then
echo "waiting fe to stop, pid: $pid"
sleep 2
else
echo "stop $pidcomm, and remove pid file. "
# This file will also be deleted within the process
if [ -f $pidfile ]; then
rm $pidfile
fi
break
fi
done
fi
if [ "java" != "$pidcomm" ]; then
echo "ERROR: pid process may not be fe. "
fi

kill $pid > /dev/null 2>&1
if [ $? -ne 0 ]; then
exit 1
fi

start_ts=$(date +%s)
while kill -0 $pid >/dev/null 2>&1; do
if [ $(($(date +%s) - $start_ts)) -gt $TIME_OUT ]; then
kill -9 $pid
echo "graceful exit timeout, forced termination of the process"
break
else
sleep 1
fi
done
rm $pidfile
fi

108 changes: 72 additions & 36 deletions fe/fe-core/src/main/java/com/starrocks/StarRocksFE.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,12 @@
import com.starrocks.common.Log4jConfig;
import com.starrocks.common.ThreadPoolManager;
import com.starrocks.common.Version;
import com.starrocks.ha.BDBHA;
import com.starrocks.ha.FrontendNodeType;
import com.starrocks.ha.StateChangeExecutor;
import com.starrocks.http.HttpServer;
import com.starrocks.journal.Journal;
import com.starrocks.journal.JournalWriter;
import com.starrocks.journal.bdbje.BDBEnvironment;
import com.starrocks.journal.bdbje.BDBJEJournal;
import com.starrocks.journal.bdbje.BDBTool;
Expand All @@ -53,25 +56,30 @@
import com.starrocks.qe.CoordinatorMonitor;
import com.starrocks.qe.QeService;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.GracefulExitFlag;
import com.starrocks.server.RunMode;
import com.starrocks.service.ExecuteEnv;
import com.starrocks.service.FrontendOptions;
import com.starrocks.service.FrontendThriftServer;
import com.starrocks.service.arrow.flight.sql.ArrowFlightSqlService;
import com.starrocks.staros.StarMgrServer;
import com.starrocks.system.Frontend;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import sun.misc.Signal;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.nio.channels.FileLock;
import java.util.List;

public class StarRocksFE {
private static final Logger LOG = LogManager.getLogger(StarRocksFE.class);
Expand Down Expand Up @@ -177,10 +185,10 @@ public static void start(String starRocksDir, String pidDir, String[] args) {

ThreadPoolManager.registerAllThreadPoolMetric();

addShutdownHook();

RestoreClusterSnapshotMgr.finishRestoring();

handleGracefulExit(qeService);

LOG.info("FE started successfully");

while (!stopped) {
Expand All @@ -195,6 +203,68 @@ public static void start(String starRocksDir, String pidDir, String[] args) {
System.exit(0);
}

private static void handleGracefulExit(QeService qeService) {
Signal.handle(new Signal("USR1"), sig -> {
if (canGracefulExit()) {
LOG.info("start to handle graceful exit");
GracefulExitFlag.markGracefulExit();

// stop mysql server, new connection will be rejected
qeService.stop();

// transfer leader if current node is leader
try {
transferLeader();
} catch (InterruptedException e) {
LOG.warn("handle graceful exit failed");
System.exit(-1);
}

// Wait for the query to complete

LOG.info("handle graceful exit successfully");
}

System.exit(0);
});
}

private static void transferLeader() throws InterruptedException {
if (GlobalStateMgr.getCurrentState().isLeader()) {
JournalWriter journalWriter = GlobalStateMgr.getCurrentState().getJournalWriter();
// stop journal writer
journalWriter.stopAndWait();
Journal journal = GlobalStateMgr.getCurrentState().getJournal();

// transfer leader
if (journal instanceof BDBJEJournal) {
BDBEnvironment bdbEnvironment = ((BDBJEJournal) journal).getBdbEnvironment();
if (bdbEnvironment != null) {
// close bdb env, leader election will be triggered
bdbEnvironment.close();
// wait for new leader
while (true) {
try {
InetSocketAddress address = GlobalStateMgr.getCurrentState().getHaProtocol().getLeader();
LOG.info("leader is transferred to {}", address);
break;
} catch (Exception e) {
Thread.sleep(100L);
}
}
}
}

journalWriter.setLeaderTransferred();
}
}

private static boolean canGracefulExit() {
List<Frontend> frontends = GlobalStateMgr.getCurrentState().getNodeMgr().getFrontends(FrontendNodeType.FOLLOWER);
long aliveCnt = frontends.stream().filter(Frontend::isAlive).count();
return aliveCnt >= (frontends.size()) / 2 + 1;
}

/*
* -v --version
* Print the version of StarRocks Frontend
Expand Down Expand Up @@ -370,38 +440,4 @@ private static boolean createAndLockPidFile(String pidFilePath) {

return false;
}

// NOTE: To avoid dead lock
// 1. never call System.exit in shutdownHook
// 2. shutdownHook cannot have lock conflict with the function calling System.exit
private static void addShutdownHook() {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
LOG.info("start to execute shutdown hook");
try {
Thread t = new Thread(() -> {
try {
Journal journal = GlobalStateMgr.getCurrentState().getJournal();
if (journal instanceof BDBJEJournal) {
BDBEnvironment bdbEnvironment = ((BDBJEJournal) journal).getBdbEnvironment();
if (bdbEnvironment != null) {
bdbEnvironment.flushVLSNMapping();
}
}
} catch (Throwable e) {
LOG.warn("flush vlsn mapping failed", e);
}
});

t.start();

// it is necessary to set shutdown timeout,
// because in addition to kill by user, System.exit(-1) will trigger the shutdown hook too,
// if no timeout and shutdown hook blocked indefinitely, Fe will fall into a catastrophic state.
t.join(30000);
} catch (Throwable e) {
LOG.warn("shut down hook failed", e);
}
LOG.info("shutdown hook end");
}));
}
}
20 changes: 13 additions & 7 deletions fe/fe-core/src/main/java/com/starrocks/http/rest/HealthAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
import com.starrocks.http.BaseResponse;
import com.starrocks.http.IllegalArgException;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.GracefulExitFlag;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;

public class HealthAction extends RestBaseAction {
public HealthAction(ActionController controller) {
Expand All @@ -53,13 +55,17 @@ public static void registerAction(ActionController controller)

@Override
public void execute(BaseRequest request, BaseResponse response) {
response.setContentType("application/json");
if (GracefulExitFlag.isGracefulExit()) {
sendResult(request, response, HttpResponseStatus.INTERNAL_SERVER_ERROR);
} else {
response.setContentType("application/json");

RestResult result = new RestResult();
result.addResultEntry("total_backend_num",
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getTotalBackendNumber());
result.addResultEntry("online_backend_num",
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getAliveBackendNumber());
sendResult(request, response, result);
RestResult result = new RestResult();
result.addResultEntry("total_backend_num",
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getTotalBackendNumber());
result.addResultEntry("online_backend_num",
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getAliveBackendNumber());
sendResult(request, response, result);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.journal;

public class DrainingJournalTask extends JournalTask {
public DrainingJournalTask() {
super(System.nanoTime(), null, 0);
}
}
Loading

0 comments on commit ace0455

Please sign in to comment.