Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPC: Multiple Backends #85

Merged
merged 15 commits into from
Oct 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 73 additions & 118 deletions hpc/LoadBalancer.cpp
Original file line number Diff line number Diff line change
@@ -1,23 +1,14 @@
#include "LoadBalancer.hpp"
#include <iostream>
#include <string>
#include "../lib/umbridge.h"

#include <algorithm>
#include <chrono>
#include <thread>
#include <filesystem>
#include <algorithm>

#include <unistd.h>
#include <limits.h>

#include "../lib/umbridge.h"
#include <iostream>
#include <string>

// Makes sure `directory` is present on disk, creating it when it is missing.
// Creation goes through std::filesystem::create_directory, so only the final
// path component is created; parent directories are not created by this call.
void create_directory_if_not_existing(std::string directory) {
    // Nothing to do when the path already refers to an existing directory.
    const bool already_present =
        std::filesystem::is_directory(directory) && std::filesystem::exists(directory);
    if (already_present) {
        return;
    }

    std::filesystem::create_directory(directory);
}

void clear_url(std::string directory) {
void clear_url(const std::string& directory) {
for (auto& file : std::filesystem::directory_iterator(directory)) {
if (std::regex_match(file.path().filename().string(), std::regex("url-\\d+\\.txt"))) {
std::filesystem::remove(file);
Expand All @@ -29,140 +20,104 @@ void launch_hq_with_alloc_queue() {
std::system("./hq server stop &> /dev/null");

std::system("./hq server start &");
sleep(1); // Workaround: give the HQ server enough time to start.
// Wait for the HQ server to start
std::system("until ./hq server info &> /dev/null; do sleep 1; done");

// Create HQ allocation queue
std::system("hq_scripts/allocation_queue.sh");
}

const std::vector<std::string> get_model_names() {
// Don't start a client, always use the default job submission script.
HyperQueueJob hq_job("", false, true);
// Look up a command line option of the form --<arg_name>=<value>.
//
// args:     the argument strings to search.
// arg_name: option name without the leading "--" and trailing "=".
//
// Returns the text following "--<arg_name>=" of the first matching element,
// or an empty string if no element matches. An element that consists of
// exactly "--<arg_name>=" (i.e. has no value) is not considered a match
// because of the strict length comparison below, so it also yields "".
std::string get_arg(const std::vector<std::string>& args, const std::string& arg_name) {
// Check if a string matches the format --<arg_name>=...
const std::string search_string = "--" + arg_name + "=";
auto check_format = [&search_string](const std::string& s) {
// rfind with start position 0 can only match at index 0, so this is a prefix test.
return (s.length() > search_string.length()) && (s.rfind(search_string, 0) == 0);
};

// NOTE(review): the next statement refers to hq_job, which is not declared in this
// function; it looks like residue of the removed get_model_names() in this diff view — confirm.
return umbridge::SupportedModels(hq_job.server_url);
// Return value of the argument or empty string if not found
if (const auto it = std::find_if(args.begin(), args.end(), check_format); it != args.end()) {
// Strip the "--<arg_name>=" prefix and keep only the value part.
return it->substr(search_string.length());
}

return "";
}

void print_model_and_job_script_information(const std::vector<std::string>& model_names) {
// Constants
const std::filesystem::path SUBMISSION_SCRIPT_DIR("./hq_scripts");
const std::filesystem::path SUBMISSION_SCRIPT_GENERIC("job.sh");

const std::string SECTION_START_DELIMITER = "==============================MODEL INFO==============================";
const std::string SECTION_END_DELIMITER = "======================================================================";

// Sort the model names in alphabetical order for cleaner output.
std::vector<std::string> model_names_sorted = model_names;
std::sort(model_names_sorted.begin(), model_names_sorted.end());

std::cout << SECTION_START_DELIMITER << "\n";
// Print list of available models and corresponding job-scripts.
std::cout << "Available models and corresponding job-scripts:\n";
for (const std::string& model_name : model_names_sorted) {
// Determine which job script will be used by checking if a model specific job script exists.
std::string used_job_script;
const std::filesystem::path submission_script_model_specific("job_" + model_name + ".sh");
if (std::filesystem::exists(SUBMISSION_SCRIPT_DIR / submission_script_model_specific)) {
used_job_script = submission_script_model_specific.string();
} else {
used_job_script = SUBMISSION_SCRIPT_GENERIC.string();
}
std::cout << "* Model '" << model_name << "' --> '" << used_job_script << "'\n";
}
std::cout << std::endl;


// Check if there are job scripts that are unused and print a warning.
std::vector<std::string> unused_job_scripts;

// Build a regex to parse job-script filenames and extract the model name.
// Format should be: job_<model_name>.sh
const std::string format_prefix = "^job_"; // Ensures that filename starts with 'job_'.
const std::string format_suffix = "\\.sh$"; // Ensures that filename ends with '.sh'.
const std::string format_model_name = "(.*)"; // Arbitrary sequence of characters as a marked subexpression.
const std::regex format_regex(format_prefix + format_model_name + format_suffix);

for (auto& file : std::filesystem::directory_iterator(SUBMISSION_SCRIPT_DIR)) {
const std::string filename = file.path().filename().string();
// Check if filename matches format of a model specific job script, i.e. 'job_<model_name>.sh'.
std::smatch match_result;
if (std::regex_search(filename, match_result, format_regex)) {
// Extract first matched subexpression, i.e. the model name.
const std::string model_name = match_result[1].str();
// Check if a corresponding model exists. If not, mark job script as unused.
if (!std::binary_search(model_names_sorted.begin(), model_names_sorted.end(), model_name)) {
unused_job_scripts.push_back(filename);
}
}
}
int main(int argc, char* argv[]) {
clear_url("urls");

// Print the warning message.
if(!unused_job_scripts.empty()) {
// Sort unused job scripts alphabetically for cleaner output.
std::sort(unused_job_scripts.begin(), unused_job_scripts.end());
// Process command line args
std::vector<std::string> args(argv + 1, argv + argc);

std::cout << "WARNING: The following model-specific job-scripts are not used by any of the available models:\n";
for (const std::string& job_script : unused_job_scripts) {
std::cout << "* '" << job_script << "'\n";
}
std::cout << std::endl;
// Scheduler used by the load balancer (currently either SLURM or HyperQueue)
std::string scheduler = get_arg(args, "scheduler");
// Specifying a scheduler is mandatory since this should be a conscious choice by the user
if (scheduler.empty()) {
std::cerr << "Missing required argument: --scheduler=[hyperqueue | slurm]" << std::endl;
std::exit(-1);
}

std::cout << "If this behavior is unintentional, then please verify that:\n"
<< "1. The filename of your model-specific job-script follows the format: 'job_<your_model_name>.sh' (e.g. 'job_mymodel.sh')\n"
<< "2. The spelling of your model name matches in the model definition and in the filename of your model-specific job-script.\n";
// Delay for job submissions in milliseconds
std::string delay_str = get_arg(args, "delay-ms");
std::chrono::milliseconds delay = std::chrono::milliseconds::zero();
if (!delay_str.empty()) {
delay = std::chrono::milliseconds(std::stoi(delay_str));
}

std::cout << SECTION_END_DELIMITER << std::endl;
}
// Load balancer port
std::string port_str = get_arg(args, "port");
int port = 4242;
if (port_str.empty()) {
std::cout << "Argument --port not set! Using port 4242 as default." << std::endl;
} else {
port = std::stoi(port_str);
}

std::atomic<int32_t> HyperQueueJob::job_count = 0;

// Assemble job manager
std::unique_ptr<JobSubmitter> job_submitter;
std::filesystem::path script_dir;
if (scheduler == "hyperqueue") {
launch_hq_with_alloc_queue();
job_submitter = std::make_unique<HyperQueueSubmitter>(delay);
script_dir = "hq_scripts";
} else if (scheduler == "slurm") {
job_submitter = std::make_unique<SlurmSubmitter>(delay);
script_dir = "slurm_scripts";
} else {
std::cerr << "Unrecognized value for argument --scheduler: "
<< "Expected hyperqueue or slurm but got " << scheduler << " instead." << std::endl;
std::exit(-1);
}

int main(int argc, char *argv[])
{
create_directory_if_not_existing("urls");
create_directory_if_not_existing("sub-jobs");
clear_url("urls");
// Only filesystem communication is implemented. May implement network-based communication in the future.
// Directory which stores URL files and polling cycle currently hard-coded.
std::unique_ptr<JobCommunicatorFactory> comm_factory
= std::make_unique<FilesystemCommunicatorFactory>("urls", std::chrono::milliseconds(500));

launch_hq_with_alloc_queue();
// Location of job scripts and naming currently hard-coded.
JobScriptLocator locator {script_dir, "job.sh", "job_", ".sh"};

// Read environment variables for configuration
char const *port_cstr = std::getenv("PORT");
int port = 0;
if (port_cstr == NULL)
{
std::cout << "Environment variable PORT not set! Using port 4242 as default." << std::endl;
port = 4242;
}
else
{
port = atoi(port_cstr);
}
std::shared_ptr<JobManager> job_manager = std::make_shared<CommandJobManager>(
std::move(job_submitter), std::move(comm_factory), locator);

char const *delay_cstr = std::getenv("HQ_SUBMIT_DELAY_MS");
if (delay_cstr != NULL)
{
hq_submit_delay_ms = atoi(delay_cstr);
}
std::cout << "HQ_SUBMIT_DELAY_MS set to " << hq_submit_delay_ms << std::endl;

// Initialize load balancer for each available model on the model server.
const std::vector<std::string> model_names = get_model_names();
std::vector<std::string> model_names = job_manager->getModelNames();

// Inform the user about the available models and the job scripts that will be used.
// Output a warning for unused model-specific job-scripts to prevent typos.
print_model_and_job_script_information(model_names);
locator.printModelJobScripts(model_names);

// Prepare models and serve via network
std::vector<LoadBalancer> LB_vector;
for (auto model_name : model_names)
{
// Set up and serve model
LB_vector.emplace_back(LoadBalancer{model_name});
for (auto model_name : model_names) {
LB_vector.emplace_back(model_name, job_manager);
}

// umbridge::serveModels currently only accepts raw pointers.
std::vector<umbridge::Model *> LB_ptr_vector(LB_vector.size());
std::transform(LB_vector.begin(), LB_vector.end(), LB_ptr_vector.begin(),
[](LoadBalancer& obj) { return &obj; });

std::cout << "Load balancer running port" << port << std::endl;
umbridge::serveModels(LB_ptr_vector, "0.0.0.0", port, true, false);
}
Loading
Loading