Skip to content

Commit

Permalink
[ALLUXIO-3327] Initial implementation of job management service (Allu…
Browse files Browse the repository at this point in the history
…xio#7910)

* [ALLUXIO-3327] Initial implementation of job management service

* [ALLUXIO-3327] Update alluxio-config.sh

* Fix compilation

* client changes + minor versioning changes
  • Loading branch information
apc999 authored Oct 2, 2018
1 parent 775e1c3 commit 89f6eba
Show file tree
Hide file tree
Showing 218 changed files with 39,154 additions and 208 deletions.
10 changes: 10 additions & 0 deletions assembly/server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@
<artifactId>alluxio-core-server-worker</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.alluxio</groupId>
<artifactId>alluxio-job-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.alluxio</groupId>
<artifactId>alluxio-job-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.alluxio</groupId>
<artifactId>alluxio-keyvalue-server</artifactId>
Expand Down
6 changes: 6 additions & 0 deletions bin/alluxio
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ function printUsage {
echo -e " fs \t Command line tool for interacting with the Alluxio filesystem."
echo -e " fsadmin \t Command line tool for use by Alluxio filesystem admins."
echo -e " getConf [key] \t Look up a configuration key, or print all configuration."
echo -e " job \t Command line tool for interacting with the job service."
echo -e " loadufs \t Load existing files in underlayer filesystem into Alluxio."
echo -e " logLevel \t Set or get log level of Alluxio servers."
echo -e " runClass \t Run the main method of an Alluxio class."
Expand Down Expand Up @@ -294,6 +295,11 @@ function main {
ALLUXIO_SHELL_JAVA_OPTS+=" -Dalluxio.conf.validation.enabled=false"
runJavaClass "$@"
;;
"job")
CLASS="alluxio.cli.job.JobShell"
CLASSPATH=${ALLUXIO_CLIENT_CLASSPATH}
runJavaClass "$@"
;;
"loadufs")
echo "The \"alluxio loadufs <AlluxioURI> <UfsURI>\" command is deprecated since version 1.1." >&2
echo "Use the \"alluxio fs mount <AlluxioURI> <UfsURI>\" command followed by the \"alluxio fs ls -R <AlluxioURI>\" command instead." >&2
Expand Down
70 changes: 69 additions & 1 deletion bin/alluxio-start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ BIN=$(cd "$( dirname "$( readlink "$0" || echo "$0" )" )"; pwd)
USAGE="Usage: alluxio-start.sh [-hNwm] [-i backup] ACTION [MOPT] [-f]
Where ACTION is one of:
all [MOPT] \tStart all masters, proxies, and workers.
job_master \tStart the job master on this node.
job_masters \tStart job masters on master nodes.
job_worker \tStart a job worker on this node.
job_workers \tStart job workers on worker nodes.
local [MOPT] \tStart all processes locally.
master \tStart the local master on this node.
secondary_master \tStart the local secondary master on this node.
Expand Down Expand Up @@ -49,7 +53,11 @@ MOPT (Mount Option) is one of:
hdfs://mycluster/alluxio_backups/alluxio-journal-YYYY-MM-DD-timestamp.gz.
-m launch monitor process to ensure the target processes come up.
-N do not try to kill previous running processes before starting new ones.
-w wait for processes to end before returning."
-w wait for processes to end before returning.
Supported environment variables:
ALLUXIO_JOB_WORKER_COUNT - identifies how many job workers to start per node (default = 1)"

ensure_dirs() {
if [[ ! -d "${ALLUXIO_LOGS_DIR}" ]]; then
Expand Down Expand Up @@ -160,6 +168,50 @@ stop() {
${BIN}/alluxio-stop.sh $1
}

#######################################
# Starts the Alluxio job master in the background on this node.
# Globals:
#   LAUNCHER, BIN                 (read)  used to run "alluxio format" for -f
#   ALLUXIO_MASTER_SECONDARY      (read)  when "true" no job master is started
#   ALLUXIO_JOB_MASTER_JAVA_OPTS  (read/written) falls back to ALLUXIO_JAVA_OPTS
#   JAVA, CLASSPATH, ALLUXIO_LOGS_DIR (read)
# Arguments:
#   $1 - optional "-f": run "alluxio format" before starting
# Outputs:
#   A status line on stdout; the process's stdout/stderr are written to
#   ${ALLUXIO_LOGS_DIR}/job_master.out
#######################################
start_job_master() {
  if [[ "$1" == "-f" ]]; then
    # LAUNCHER is deliberately unquoted: it may be empty or hold several words.
    ${LAUNCHER} "${BIN}/alluxio" format
  fi

  if [[ "${ALLUXIO_MASTER_SECONDARY}" != "true" ]]; then
    if [[ -z "${ALLUXIO_JOB_MASTER_JAVA_OPTS}" ]]; then
      ALLUXIO_JOB_MASTER_JAVA_OPTS=${ALLUXIO_JAVA_OPTS}
    fi

    echo "Starting job master @ $(hostname -f). Logging to ${ALLUXIO_LOGS_DIR}"
    # The JVM options stay unquoted so they split into individual arguments;
    # the classpath and the redirect target are quoted so that paths with
    # whitespace neither split nor trigger an "ambiguous redirect".
    (nohup ${JAVA} -cp "${CLASSPATH}" \
      ${ALLUXIO_JOB_MASTER_JAVA_OPTS} \
      alluxio.master.AlluxioJobMaster > "${ALLUXIO_LOGS_DIR}/job_master.out" 2>&1) &
  fi
}

# Runs alluxio-masters.sh (optionally through LAUNCHER) so that it invokes
# "alluxio-start.sh job_master".
start_job_masters() {
  # LAUNCHER is left unquoted on purpose: it may be empty or contain several words.
  local -a launch_cmd=(${LAUNCHER} "${BIN}/alluxio-masters.sh")
  launch_cmd+=("${BIN}/alluxio-start.sh" "job_master")
  "${launch_cmd[@]}"
}

#######################################
# Starts one or more Alluxio job workers in the background on this node.
# The first worker uses the configured ports; every additional worker is
# started with -Dalluxio.job.worker.rpc.port=0 and
# -Dalluxio.job.worker.web.port=0 appended to the JVM options.
# Globals:
#   ALLUXIO_JOB_WORKER_JAVA_OPTS (read/written) falls back to ALLUXIO_JAVA_OPTS;
#                                the port overrides are appended to it
#   ALLUXIO_JOB_WORKER_COUNT     (read) number of workers, default 1
#   JAVA, CLASSPATH, ALLUXIO_LOGS_DIR (read)
# Outputs:
#   One status line per worker on stdout; all workers' stdout/stderr are
#   appended to ${ALLUXIO_LOGS_DIR}/job_worker.out
#######################################
start_job_worker() {
  if [[ -z "${ALLUXIO_JOB_WORKER_JAVA_OPTS}" ]]; then
    ALLUXIO_JOB_WORKER_JAVA_OPTS=${ALLUXIO_JAVA_OPTS}
  fi

  local out_file="${ALLUXIO_LOGS_DIR}/job_worker.out"
  local nworkers=${ALLUXIO_JOB_WORKER_COUNT:-1}
  local c

  # Start from an empty file once, then let every worker append. Re-opening the
  # file with ">" for each worker would truncate what earlier workers wrote.
  : > "${out_file}"

  echo "Starting job worker @ $(hostname -f). Logging to ${ALLUXIO_LOGS_DIR}"
  # JVM options stay unquoted so they split into individual arguments.
  (nohup ${JAVA} -cp "${CLASSPATH}" \
    ${ALLUXIO_JOB_WORKER_JAVA_OPTS} \
    alluxio.worker.AlluxioJobWorker >> "${out_file}" 2>&1) &

  # Additional workers on the same node get both ports set to 0.
  ALLUXIO_JOB_WORKER_JAVA_OPTS+=" -Dalluxio.job.worker.rpc.port=0 -Dalluxio.job.worker.web.port=0"
  for (( c = 1; c < nworkers; c++ )); do
    echo "Starting job worker #$((c+1)) @ $(hostname -f). Logging to ${ALLUXIO_LOGS_DIR}"
    (nohup ${JAVA} -cp "${CLASSPATH}" \
      ${ALLUXIO_JOB_WORKER_JAVA_OPTS} \
      alluxio.worker.AlluxioJobWorker >> "${out_file}" 2>&1) &
  done
}

# Runs alluxio-workers.sh (optionally through LAUNCHER) so that it invokes
# "alluxio-start.sh job_worker".
start_job_workers() {
  # LAUNCHER is left unquoted on purpose: it may be empty or contain several words.
  local -a launch_cmd=(${LAUNCHER} "${BIN}/alluxio-workers.sh")
  launch_cmd+=("${BIN}/alluxio-start.sh" "job_worker")
  "${launch_cmd[@]}"
}

start_logserver() {
if [[ ! -d "${ALLUXIO_LOGSERVER_LOGS_DIR}" ]]; then
echo "ALLUXIO_LOGSERVER_LOGS_DIR: ${ALLUXIO_LOGSERVER_LOGS_DIR}"
Expand Down Expand Up @@ -455,19 +507,35 @@ main() {
case "${ACTION}" in
all)
start_masters "${FORMAT}"
start_job_masters
sleep 2
start_workers "${MOPT}"
start_job_workers
start_proxies
;;
local)
start_master "${FORMAT}"
ALLUXIO_MASTER_SECONDARY=true
start_master
ALLUXIO_MASTER_SECONDARY=false
start_job_master
sleep 2
start_worker "${MOPT}"
start_job_worker
start_proxy
;;
job_master)
start_job_master
;;
job_masters)
start_job_masters
;;
job_worker)
start_job_worker
;;
job_workers)
start_job_workers
;;
master)
start_master "${FORMAT}"
;;
Expand Down
36 changes: 36 additions & 0 deletions bin/alluxio-stop.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ BIN=$(cd "$( dirname "$( readlink "$0" || echo "$0" )" )"; pwd)
USAGE="Usage: alluxio-stop.sh [-h] [component]
Where component is one of:
all \tStop all masters, proxies, and workers.
job_master \tStop local job master.
job_masters \tStop job masters on master nodes.
job_worker \tStop local job worker.
job_workers \tStop job workers on worker nodes.
local \tStop all processes locally.
master \tStop local primary master.
secondary_master \tStop local secondary master.
Expand All @@ -31,6 +35,22 @@ Where component is one of:
-h display this help."

# Stops the local job master by running
# "alluxio killAll alluxio.master.AlluxioJobMaster" (optionally through LAUNCHER).
stop_job_master() {
  local target_class="alluxio.master.AlluxioJobMaster"
  # LAUNCHER is left unquoted on purpose: it may be empty or contain several words.
  local -a kill_cmd=(${LAUNCHER} "${BIN}/alluxio" "killAll" "${target_class}")
  "${kill_cmd[@]}"
}

# Runs alluxio-masters.sh (optionally through LAUNCHER) so that it invokes
# "alluxio-stop.sh job_master".
stop_job_masters() {
  # LAUNCHER is left unquoted on purpose: it may be empty or contain several words.
  local -a launch_cmd=(${LAUNCHER} "${BIN}/alluxio-masters.sh")
  launch_cmd+=("${BIN}/alluxio-stop.sh" "job_master")
  "${launch_cmd[@]}"
}

# Stops the local job worker(s) by running
# "alluxio killAll alluxio.worker.AlluxioJobWorker" (optionally through LAUNCHER).
stop_job_worker() {
  local target_class="alluxio.worker.AlluxioJobWorker"
  # LAUNCHER is left unquoted on purpose: it may be empty or contain several words.
  local -a kill_cmd=(${LAUNCHER} "${BIN}/alluxio" "killAll" "${target_class}")
  "${kill_cmd[@]}"
}

# Runs alluxio-workers.sh (optionally through LAUNCHER) so that it invokes
# "alluxio-stop.sh job_worker".
stop_job_workers() {
  # LAUNCHER is left unquoted on purpose: it may be empty or contain several words.
  local -a launch_cmd=(${LAUNCHER} "${BIN}/alluxio-workers.sh")
  launch_cmd+=("${BIN}/alluxio-stop.sh" "job_worker")
  "${launch_cmd[@]}"
}

stop_master() {
if [[ ${ALLUXIO_MASTER_SECONDARY} == "true" ]]; then
${LAUNCHER} "${BIN}/alluxio" "killAll" "alluxio.master.AlluxioSecondaryMaster"
Expand Down Expand Up @@ -70,17 +90,33 @@ WHAT=${1:--h}
case "${WHAT}" in
all)
stop_proxies
stop_job_workers
stop_workers
stop_job_masters
stop_masters
;;
local)
stop_proxy
stop_job_worker
stop_job_master
stop_worker
ALLUXIO_MASTER_SECONDARY=true
stop_master
ALLUXIO_MASTER_SECONDARY=false
stop_master
;;
job_master)
stop_job_master
;;
job_masters)
stop_job_masters
;;
job_worker)
stop_job_worker
;;
job_workers)
stop_job_workers
;;
master)
stop_master
;;
Expand Down
33 changes: 33 additions & 0 deletions conf/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,23 @@ log4j.appender.Console.Target=System.out
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
log4j.appender.Console.layout.ConversionPattern=%d{ISO8601} %-5p %c{1} - %m%n


# Appender for Job Master
log4j.appender.JOB_MASTER_LOGGER=org.apache.log4j.RollingFileAppender
log4j.appender.JOB_MASTER_LOGGER.File=${alluxio.logs.dir}/job_master.log
log4j.appender.JOB_MASTER_LOGGER.MaxFileSize=10MB
log4j.appender.JOB_MASTER_LOGGER.MaxBackupIndex=100
log4j.appender.JOB_MASTER_LOGGER.layout=org.apache.log4j.PatternLayout
log4j.appender.JOB_MASTER_LOGGER.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M) - %m%n

# Appender for Job Workers
log4j.appender.JOB_WORKER_LOGGER=org.apache.log4j.RollingFileAppender
log4j.appender.JOB_WORKER_LOGGER.File=${alluxio.logs.dir}/job_worker.log
log4j.appender.JOB_WORKER_LOGGER.MaxFileSize=10MB
log4j.appender.JOB_WORKER_LOGGER.MaxBackupIndex=100
log4j.appender.JOB_WORKER_LOGGER.layout=org.apache.log4j.PatternLayout
log4j.appender.JOB_WORKER_LOGGER.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M) - %m%n

# Appender for Master
log4j.appender.MASTER_LOGGER=org.apache.log4j.RollingFileAppender
log4j.appender.MASTER_LOGGER.File=${alluxio.logs.dir}/master.log
Expand Down Expand Up @@ -69,6 +86,22 @@ log4j.appender.WORKER_LOGGER.MaxBackupIndex=100
log4j.appender.WORKER_LOGGER.layout=org.apache.log4j.PatternLayout
log4j.appender.WORKER_LOGGER.layout.ConversionPattern=%d{ISO8601} %-5p %c{1} - %m%n

# Remote appender for Job Master
log4j.appender.REMOTE_JOB_MASTER_LOGGER=org.apache.log4j.net.SocketAppender
log4j.appender.REMOTE_JOB_MASTER_LOGGER.Port=${alluxio.logserver.port}
log4j.appender.REMOTE_JOB_MASTER_LOGGER.RemoteHost=${alluxio.logserver.hostname}
log4j.appender.REMOTE_JOB_MASTER_LOGGER.ReconnectionDelay=10000
log4j.appender.REMOTE_JOB_MASTER_LOGGER.filter.ID=alluxio.AlluxioRemoteLogFilter
log4j.appender.REMOTE_JOB_MASTER_LOGGER.filter.ID.ProcessType=JOB_MASTER

# Remote appender for Job Workers
log4j.appender.REMOTE_JOB_WORKER_LOGGER=org.apache.log4j.net.SocketAppender
log4j.appender.REMOTE_JOB_WORKER_LOGGER.Port=${alluxio.logserver.port}
log4j.appender.REMOTE_JOB_WORKER_LOGGER.RemoteHost=${alluxio.logserver.hostname}
log4j.appender.REMOTE_JOB_WORKER_LOGGER.ReconnectionDelay=10000
log4j.appender.REMOTE_JOB_WORKER_LOGGER.filter.ID=alluxio.AlluxioRemoteLogFilter
log4j.appender.REMOTE_JOB_WORKER_LOGGER.filter.ID.ProcessType=JOB_WORKER

# Remote appender for Master
log4j.appender.REMOTE_MASTER_LOGGER=org.apache.log4j.net.SocketAppender
log4j.appender.REMOTE_MASTER_LOGGER.Port=${alluxio.logserver.port}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,12 +285,52 @@ public BlockOutStream getOutStream(long blockId, long blockSize, OutStreamOption
WorkerNetAddress address;
FileWriteLocationPolicy locationPolicy = Preconditions.checkNotNull(options.getLocationPolicy(),
PreconditionMessage.FILE_WRITE_LOCATION_POLICY_UNSPECIFIED);
address = locationPolicy.getWorkerForNextBlock(getEligibleWorkers(), blockSize);
if (address == null) {
throw new UnavailableException(
ExceptionMessage.NO_SPACE_FOR_BLOCK_ON_WORKER.getMessage(blockSize));
java.util.Set<BlockWorkerInfo> blockWorkers;
blockWorkers = com.google.common.collect.Sets.newHashSet(getEligibleWorkers());
// The number of initial copies depends on the write type: if ASYNC_THROUGH, it is the property
// "alluxio.user.file.replication.durable" before data has been persisted; otherwise
// "alluxio.user.file.replication.min"
int initialReplicas = (options.getWriteType() == alluxio.client.WriteType.ASYNC_THROUGH
&& options.getReplicationDurable() > options.getReplicationMin())
? options.getReplicationDurable() : options.getReplicationMin();
if (initialReplicas <= 1) {
address = locationPolicy.getWorkerForNextBlock(blockWorkers, blockSize);
if (address == null) {
throw new UnavailableException(
ExceptionMessage.NO_SPACE_FOR_BLOCK_ON_WORKER.getMessage(blockSize));
}
return getOutStream(blockId, blockSize, address, options);
}

// Group different block workers by their hostnames
java.util.Map<String, java.util.Set<BlockWorkerInfo>> blockWorkersByHost =
new java.util.HashMap<>();
for (BlockWorkerInfo blockWorker : blockWorkers) {
String hostName = blockWorker.getNetAddress().getHost();
if (blockWorkersByHost.containsKey(hostName)) {
blockWorkersByHost.get(hostName).add(blockWorker);
} else {
blockWorkersByHost.put(hostName, com.google.common.collect.Sets.newHashSet(blockWorker));
}
}

// Select N workers on different hosts where N is the value of initialReplicas for this block
List<WorkerNetAddress> workerAddressList = new java.util.ArrayList<>();
for (int i = 0; i < initialReplicas; i++) {
address = locationPolicy.getWorkerForNextBlock(blockWorkers, blockSize);
if (address == null) {
break;
}
workerAddressList.add(address);
blockWorkers.removeAll(blockWorkersByHost.get(address.getHost()));
}
if (workerAddressList.size() < initialReplicas) {
throw new alluxio.exception.status.ResourceExhaustedException(String.format(
"Not enough workers for replications, %d workers selected but %d required",
workerAddressList.size(), initialReplicas));
}
return getOutStream(blockId, blockSize, address, options);
return BlockOutStream
.createReplicatedBlockOutStream(mContext, blockId, blockSize, workerAddressList, options);
}

/**
Expand Down
Loading

0 comments on commit 89f6eba

Please sign in to comment.