diff --git a/hpc/LoadBalancer.cpp b/hpc/LoadBalancer.cpp index 391380c..c75ee4f 100644 --- a/hpc/LoadBalancer.cpp +++ b/hpc/LoadBalancer.cpp @@ -1,17 +1,14 @@ #include "LoadBalancer.hpp" -#include -#include +#include "../lib/umbridge.h" + +#include #include -#include #include -#include - -#include -#include +#include +#include -#include "../lib/umbridge.h" -void clear_url(std::string directory) { +void clear_url(const std::string& directory) { for (auto& file : std::filesystem::directory_iterator(directory)) { if (std::regex_match(file.path().filename().string(), std::regex("url-\\d+\\.txt"))) { std::filesystem::remove(file); @@ -30,32 +27,80 @@ void launch_hq_with_alloc_queue() { std::system("hq_scripts/allocation_queue.sh"); } +std::string get_arg(const std::vector& args, const std::string& arg_name) { + // Check if a string matches the format --=... + const std::string search_string = "--" + arg_name + "="; + auto check_format = [&search_string](const std::string& s) { + return (s.length() > search_string.length()) && (s.rfind(search_string, 0) == 0); + }; -int main(int argc, char *argv[]) -{ + // Return value of the argument or empty string if not found + if (const auto it = std::find_if(args.begin(), args.end(), check_format); it != args.end()) { + return it->substr(search_string.length()); + } + + return ""; +} + + +int main(int argc, char* argv[]) { clear_url("urls"); - launch_hq_with_alloc_queue(); + // Process command line args + std::vector args(argv + 1, argv + argc); - // Read environment variables for configuration - char const *port_cstr = std::getenv("PORT"); - int port = 0; - if (port_cstr == NULL) - { - std::cout << "Environment variable PORT not set! Using port 4242 as default." << std::endl; - port = 4242; + // Scheduler used by the load balancer (currently either SLURM or HyperQueue) + std::string scheduler = get_arg(args, "scheduler"); + // Specifying a scheduler is mandatory since this should be a conscious choice by the user + if (scheduler.empty()) { + std::cerr << "Missing required argument: --scheduler=[hyperqueue | slurm]" << std::endl; + std::exit(-1); } - else - { - port = atoi(port_cstr); + + // Delay for job submissions in milliseconds + std::string delay_str = get_arg(args, "delay-ms"); + std::chrono::milliseconds delay = std::chrono::milliseconds::zero(); + if (!delay_str.empty()) { + delay = std::chrono::milliseconds(std::stoi(delay_str)); + } + + // Load balancer port + std::string port_str = get_arg(args, "port"); + int port = 4242; + if (port_str.empty()) { + std::cout << "Argument --port not set! Using port 4242 as default." << std::endl; + } else { + port = std::stoi(port_str); + } + + + // Assemble job manager + std::unique_ptr job_submitter; + std::string script_dir; + if (scheduler == "hyperqueue") { + launch_hq_with_alloc_queue(); + job_submitter = std::make_unique(delay); + script_dir = "hq_scripts"; + } else if (scheduler == "slurm") { + job_submitter = std::make_unique(delay); + script_dir = "slurm_scripts"; + } else { + std::cerr << "Unrecognized value for argument --scheduler: " + << "Expected hyperqueue or slurm but got " << scheduler << " instead." << std::endl; + std::exit(-1); } - JobScriptLocator locator {"hq_scripts", "job.sh", "job_", ".sh"}; + // Only filesystem communication is implemented. May implement network-based communication in the future. + // Directory which stores URL files and polling cycle currently hard-coded. + std::unique_ptr comm_factory + = std::make_unique("urls", std::chrono::milliseconds(500)); + + // Location of job scripts and naming currently hard-corded. + JobScriptLocator locator {script_dir, "job.sh", "job_", ".sh"}; + std::shared_ptr job_manager = std::make_shared( - std::make_unique(std::chrono::milliseconds(100)), - std::make_unique("urls", std::chrono::milliseconds(100)), - locator - ); + std::move(job_submitter), std::move(comm_factory), locator); + // Initialize load balancer for each available model on the model server. std::vector model_names = job_manager->getModelNames(); @@ -63,10 +108,10 @@ int main(int argc, char *argv[]) // Inform the user about the available models and the job scripts that will be used. locator.printModelJobScripts(model_names); + // Prepare models and serve via network std::vector LB_vector; for (auto model_name : model_names) { - // Set up and serve model LB_vector.emplace_back(model_name, job_manager); } @@ -75,6 +120,5 @@ int main(int argc, char *argv[]) std::transform(LB_vector.begin(), LB_vector.end(), LB_ptr_vector.begin(), [](LoadBalancer& obj) { return &obj; }); - std::cout << "Load balancer running port" << port << std::endl; umbridge::serveModels(LB_ptr_vector, "0.0.0.0", port, true, false); } diff --git a/hpc/LoadBalancer.hpp b/hpc/LoadBalancer.hpp index 0169280..861d221 100644 --- a/hpc/LoadBalancer.hpp +++ b/hpc/LoadBalancer.hpp @@ -117,7 +117,6 @@ class HyperQueueJob : public Job // Makes HQ output "\n" command.addOption("--output-mode=quiet"); - std::cout << "Running command: " << command.toString() << std::endl; id = get_command_output(command.toString()); remove_trailing_newline(id); @@ -196,6 +195,7 @@ class HyperQueueSubmitter : public JobSubmitter return job; } private: + // HyperQueue environment variables: --env=KEY1=VAL1 --env=KEY2=VAL2 ... std::vector env_to_options(const std::map& env) const { std::vector options; @@ -216,7 +216,39 @@ class HyperQueueSubmitter : public JobSubmitter class SlurmSubmitter : public JobSubmitter { +public: + SlurmSubmitter(std::chrono::milliseconds submission_delay) + : submission_delay(submission_delay) {} + + std::unique_ptr submit(const std::string& job_script, const std::map& env) override + { + // Add optional delay to job submissions to prevent issues in some cases. + if (submission_delay > std::chrono::milliseconds::zero()) { + std::lock_guard lock(submission_mutex); + std::this_thread::sleep_for(submission_delay); + } + // Submit job + std::vector options = env_to_options(env); + std::unique_ptr job = std::make_unique(options, job_script); + return job; + } +private: + // SLURM environment variables: --export=KEY1=VAL1,KEY2=VAL2,... + std::vector env_to_options(const std::map& env) const + { + // By default include all SLURM_* and SPANK option environment variables. + std::string env_option = "--export=ALL"; + + for (const auto& [key, val] : env) { + env_option += "," + key + "=" + val; + } + + return {env_option}; + } + + std::chrono::milliseconds submission_delay = std::chrono::milliseconds::zero(); + std::mutex submission_mutex; }; class JobCommunicator @@ -341,7 +373,7 @@ struct JobScriptLocator std::cout << "Available models and corresponding job-scripts:\n"; for (const std::string& model_name : model_names) { std::filesystem::path used_job_script = selectJobScript(model_name); - std::cout << "* Model '" << model_name << "' --> '" << used_job_script << std::endl; + std::cout << "* Model '" << model_name << "' --> '" << used_job_script.string() << "'" << std::endl; } std::cout << std::endl; diff --git a/hpc/hq_scripts/job.sh b/hpc/hq_scripts/job.sh index 03cb381..fcbb91a 100755 --- a/hpc/hq_scripts/job.sh +++ b/hpc/hq_scripts/job.sh @@ -9,7 +9,7 @@ # Launch model server, send back server URL # and wait to ensure that HQ won't schedule any more jobs to this allocation. -function get_avaliable_port { +function get_available_port { # Define the range of ports to select from MIN_PORT=1024 MAX_PORT=65535 @@ -26,7 +26,7 @@ function get_avaliable_port { echo $port } -port=$(get_avaliable_port) +port=$(get_available_port) export PORT=$port # Assume that server sets the port according to the environment variable 'PORT'. @@ -43,8 +43,7 @@ done echo "Model server responded" # Write server URL to file identified by HQ job ID. -load_balancer_dir="." -mkdir -p "$load_balancer_dir/urls" -echo "http://$host:$port" > "$load_balancer_dir/urls/url-$HQ_JOB_ID.txt" +mkdir -p $UMBRIDGE_LOADBALANCER_COMM_FILEDIR +echo "http://$host:$port" > "$UMBRIDGE_LOADBALANCER_COMM_FILEDIR/url-$HQ_JOB_ID.txt" sleep infinity # keep the job occupied diff --git a/hpc/slurm_scripts/job.sh b/hpc/slurm_scripts/job.sh new file mode 100755 index 0000000..449d5c4 --- /dev/null +++ b/hpc/slurm_scripts/job.sh @@ -0,0 +1,48 @@ +#! /bin/bash + +#SBATCH --partition=devel +#SBATCH --ntasks=1 +#SBATCH --time=00:05:00 + + +# Launch model server, send back server URL and wait so that SLURM does not cancel the allocation. + +function get_available_port { + # Define the range of ports to select from + MIN_PORT=1024 + MAX_PORT=65535 + + # Generate a random port number + port=$(shuf -i $MIN_PORT-$MAX_PORT -n 1) + + # Check if the port is in use + while lsof -Pi :$port -sTCP:LISTEN -t >/dev/null; do + # If the port is in use, generate a new random port number + port=$(shuf -i $MIN_PORT-$MAX_PORT -n 1) + done + + echo $port +} + +port=$(get_available_port) +export PORT=$port + +# Assume that server sets the port according to the environment variable 'PORT'. +# Otherwise the job script will be stuck waiting for model server's response. +./testmodel & # CHANGE ME! + + +host=$(hostname -I | awk '{print $1}') + +echo "Waiting for model server to respond at $host:$port..." +while ! curl -s "http://$host:$port/Info" > /dev/null; do + sleep 1 +done +echo "Model server responded" + +# Write server URL to file identified by HQ job ID. +load_balancer_dir="." +mkdir -p $UMBRIDGE_LOADBALANCER_COMM_FILEDIR +echo "http://$host:$port" > "$UMBRIDGE_LOADBALANCER_COMM_FILEDIR/url-$SLURM_JOB_ID.txt" + +sleep infinity # keep the job occupied