Skip to content

Commit

Permalink
WIP: Add SLURM scheduler and CLI
Browse files Browse the repository at this point in the history
  • Loading branch information
Schlevidon committed Oct 10, 2024
1 parent ed198a5 commit 16ff5f9
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 36 deletions.
102 changes: 73 additions & 29 deletions hpc/LoadBalancer.cpp
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
#include "LoadBalancer.hpp"
#include <iostream>
#include <string>
#include "../lib/umbridge.h"

#include <algorithm>
#include <chrono>
#include <thread>
#include <filesystem>
#include <algorithm>

#include <unistd.h>
#include <limits.h>
#include <iostream>
#include <string>

#include "../lib/umbridge.h"

void clear_url(std::string directory) {
void clear_url(const std::string& directory) {
for (auto& file : std::filesystem::directory_iterator(directory)) {
if (std::regex_match(file.path().filename().string(), std::regex("url-\\d+\\.txt"))) {
std::filesystem::remove(file);
Expand All @@ -30,43 +27,91 @@ void launch_hq_with_alloc_queue() {
std::system("hq_scripts/allocation_queue.sh");
}

// Look up the value of a command line argument given in the form --<arg_name>=<value>.
//
// args:     list of command line arguments to search
// arg_name: name of the argument without the leading "--" and the trailing "="
//
// Returns the text following "--<arg_name>=" of the first argument that starts with this
// prefix and has a non-empty value, or an empty string if no such argument exists.
std::string get_arg(const std::vector<std::string>& args, const std::string& arg_name) {
    const std::string prefix = "--" + arg_name + "=";

    // An argument matches if it starts with the prefix and carries at least one more character.
    const auto has_value_for_arg = [&prefix](const std::string& candidate) {
        return candidate.length() > prefix.length()
            && candidate.compare(0, prefix.length(), prefix) == 0;
    };

    const auto match = std::find_if(args.begin(), args.end(), has_value_for_arg);
    if (match == args.end()) {
        // Argument not present (or present without a value)
        return "";
    }

    return match->substr(prefix.length());
}


int main(int argc, char* argv[]) {
clear_url("urls");

launch_hq_with_alloc_queue();
// Process command line args
std::vector<std::string> args(argv + 1, argv + argc);

// Read environment variables for configuration
char const *port_cstr = std::getenv("PORT");
int port = 0;
if (port_cstr == NULL)
{
std::cout << "Environment variable PORT not set! Using port 4242 as default." << std::endl;
port = 4242;
// Scheduler used by the load balancer (currently either SLURM or HyperQueue)
std::string scheduler = get_arg(args, "scheduler");
// Specifying a scheduler is mandatory since this should be a conscious choice by the user
if (scheduler.empty()) {
std::cerr << "Missing required argument: --scheduler=[hyperqueue | slurm]" << std::endl;
std::exit(-1);
}
else
{
port = atoi(port_cstr);

// Delay for job submissions in milliseconds
std::string delay_str = get_arg(args, "delay-ms");
std::chrono::milliseconds delay = std::chrono::milliseconds::zero();
if (!delay_str.empty()) {
delay = std::chrono::milliseconds(std::stoi(delay_str));
}

// Load balancer port
std::string port_str = get_arg(args, "port");
int port = 4242;
if (port_str.empty()) {
std::cout << "Argument --port not set! Using port 4242 as default." << std::endl;
} else {
port = std::stoi(port_str);
}


// Assemble job manager
std::unique_ptr<JobSubmitter> job_submitter;
std::string script_dir;
if (scheduler == "hyperqueue") {
launch_hq_with_alloc_queue();
job_submitter = std::make_unique<HyperQueueSubmitter>(delay);
script_dir = "hq_scripts";
} else if (scheduler == "slurm") {
job_submitter = std::make_unique<SlurmSubmitter>(delay);
script_dir = "slurm_scripts";
} else {
std::cerr << "Unrecognized value for argument --scheduler: "
<< "Expected hyperqueue or slurm but got " << scheduler << " instead." << std::endl;
std::exit(-1);
}

JobScriptLocator locator {"hq_scripts", "job.sh", "job_", ".sh"};
// Only filesystem communication is implemented. May implement network-based communication in the future.
// Directory which stores URL files and polling cycle currently hard-coded.
std::unique_ptr<JobCommunicatorFactory> comm_factory
= std::make_unique<FilesystemCommunicatorFactory>("urls", std::chrono::milliseconds(500));

// Location of job scripts and naming currently hard-coded.
JobScriptLocator locator {script_dir, "job.sh", "job_", ".sh"};

std::shared_ptr<JobManager> job_manager = std::make_shared<CommandJobManager>(
std::make_unique<HyperQueueSubmitter>(std::chrono::milliseconds(100)),
std::make_unique<FilesystemCommunicatorFactory>("urls", std::chrono::milliseconds(100)),
locator
);
std::move(job_submitter), std::move(comm_factory), locator);


// Initialize load balancer for each available model on the model server.
std::vector<std::string> model_names = job_manager->getModelNames();

// Inform the user about the available models and the job scripts that will be used.
locator.printModelJobScripts(model_names);

// Prepare models and serve via network
std::vector<LoadBalancer> LB_vector;
for (auto model_name : model_names)
{
// Set up and serve model
LB_vector.emplace_back(model_name, job_manager);
}

Expand All @@ -75,6 +120,5 @@ int main(int argc, char *argv[])
std::transform(LB_vector.begin(), LB_vector.end(), LB_ptr_vector.begin(),
[](LoadBalancer& obj) { return &obj; });

std::cout << "Load balancer running port" << port << std::endl;
umbridge::serveModels(LB_ptr_vector, "0.0.0.0", port, true, false);
}
36 changes: 34 additions & 2 deletions hpc/LoadBalancer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ class HyperQueueJob : public Job

// Makes HQ output "<job id>\n"
command.addOption("--output-mode=quiet");
std::cout << "Running command: " << command.toString() << std::endl;
id = get_command_output(command.toString());

remove_trailing_newline(id);
Expand Down Expand Up @@ -196,6 +195,7 @@ class HyperQueueSubmitter : public JobSubmitter
return job;
}
private:
// HyperQueue environment variables: --env=KEY1=VAL1 --env=KEY2=VAL2 ...
std::vector<std::string> env_to_options(const std::map<std::string, std::string>& env) const
{
std::vector<std::string> options;
Expand All @@ -216,7 +216,39 @@ class HyperQueueSubmitter : public JobSubmitter

// JobSubmitter implementation that creates SlurmJob objects for job scripts.
class SlurmSubmitter : public JobSubmitter
{
public:
    // submission_delay: pause applied before every submission; a value of zero (or less) disables it.
    SlurmSubmitter(std::chrono::milliseconds submission_delay)
    : submission_delay(submission_delay) {}

    // Creates a SlurmJob for job_script; the entries of env are passed on as one --export option.
    std::unique_ptr<Job> submit(const std::string& job_script, const std::map<std::string, std::string>& env) override
    {
        // Optional delay in front of the submission to prevent issues in some cases.
        wait_before_submission();

        // Submit job
        std::vector<std::string> slurm_options = env_to_options(env);
        std::unique_ptr<Job> submitted_job = std::make_unique<SlurmJob>(slurm_options, job_script);
        return submitted_job;
    }
private:
    // Sleeps for the configured delay while holding the mutex, so concurrent callers
    // go through the delay one after another. Does nothing if no positive delay is set.
    void wait_before_submission()
    {
        if (submission_delay <= std::chrono::milliseconds::zero()) {
            return;
        }

        std::lock_guard<std::mutex> guard(submission_mutex);
        std::this_thread::sleep_for(submission_delay);
    }

    // SLURM environment variables: --export=ALL,KEY1=VAL1,KEY2=VAL2,...
    std::vector<std::string> env_to_options(const std::map<std::string, std::string>& env) const
    {
        // The list always starts with ALL; presumably this keeps the submitting environment
        // available to the job in addition to the listed variables (see sbatch --export) - TODO confirm.
        std::string export_option = "--export=ALL";

        for (const auto& entry : env) {
            export_option.append(",").append(entry.first).append("=").append(entry.second);
        }

        return std::vector<std::string>(1, export_option);
    }

    std::chrono::milliseconds submission_delay = std::chrono::milliseconds::zero();
    std::mutex submission_mutex;
};

class JobCommunicator
Expand Down Expand Up @@ -341,7 +373,7 @@ struct JobScriptLocator
std::cout << "Available models and corresponding job-scripts:\n";
for (const std::string& model_name : model_names) {
std::filesystem::path used_job_script = selectJobScript(model_name);
std::cout << "* Model '" << model_name << "' --> '" << used_job_script << std::endl;
std::cout << "* Model '" << model_name << "' --> '" << used_job_script.string() << "'" << std::endl;
}
std::cout << std::endl;

Expand Down
9 changes: 4 additions & 5 deletions hpc/hq_scripts/job.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# Launch model server, send back server URL
# and wait to ensure that HQ won't schedule any more jobs to this allocation.

function get_avaliable_port {
function get_available_port {
# Define the range of ports to select from
MIN_PORT=1024
MAX_PORT=65535
Expand All @@ -26,7 +26,7 @@ function get_avaliable_port {
echo $port
}

port=$(get_avaliable_port)
port=$(get_available_port)
export PORT=$port

# Assume that server sets the port according to the environment variable 'PORT'.
Expand All @@ -43,8 +43,7 @@ done
echo "Model server responded"

# Write server URL to file identified by HQ job ID.
load_balancer_dir="."
mkdir -p "$load_balancer_dir/urls"
echo "http://$host:$port" > "$load_balancer_dir/urls/url-$HQ_JOB_ID.txt"
mkdir -p $UMBRIDGE_LOADBALANCER_COMM_FILEDIR
echo "http://$host:$port" > "$UMBRIDGE_LOADBALANCER_COMM_FILEDIR/url-$HQ_JOB_ID.txt"

sleep infinity # keep the job occupied
48 changes: 48 additions & 0 deletions hpc/slurm_scripts/job.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#! /bin/bash

#SBATCH --partition=devel
#SBATCH --ntasks=1
#SBATCH --time=00:05:00


# Launch model server, send back server URL and wait so that SLURM does not cancel the allocation.

# Print a randomly chosen TCP port in the range 1024-65535 that lsof does not report as listening.
# All variables are local, so calling the function never overwrites variables of the caller.
function get_available_port {
    # Define the range of ports to select from
    local min_port=1024
    local max_port=65535
    local candidate

    # Generate a random port number
    candidate=$(shuf -i "$min_port-$max_port" -n 1)

    # As long as lsof reports a listener on the port, draw a new random port number
    while lsof -Pi ":$candidate" -sTCP:LISTEN -t >/dev/null; do
        candidate=$(shuf -i "$min_port-$max_port" -n 1)
    done

    echo "$candidate"
}

# Pick a free port and hand it to the model server through the environment.
port=$(get_available_port)
export PORT="$port"

# Assume that server sets the port according to the environment variable 'PORT'.
# Otherwise the job script will be stuck waiting for model server's response.
./testmodel & # CHANGE ME!


# First address reported by 'hostname -I' is used as the address of this node.
host=$(hostname -I | awk '{print $1}')

# Poll the server's /Info endpoint once per second until it answers.
echo "Waiting for model server to respond at $host:$port..."
while ! curl -s "http://$host:$port/Info" > /dev/null; do
    sleep 1
done
echo "Model server responded"

# Write server URL to file identified by SLURM job ID.
# The target directory is read from UMBRIDGE_LOADBALANCER_COMM_FILEDIR; it is quoted so
# that a path containing spaces is treated as a single directory.
mkdir -p "$UMBRIDGE_LOADBALANCER_COMM_FILEDIR"
echo "http://$host:$port" > "$UMBRIDGE_LOADBALANCER_COMM_FILEDIR/url-$SLURM_JOB_ID.txt"

sleep infinity # keep the job occupied

0 comments on commit 16ff5f9

Please sign in to comment.