From aa7de3d2327f893232c4ffd44024bfbf3415da0c Mon Sep 17 00:00:00 2001 From: Lev Gromov Date: Sun, 12 Jan 2025 19:41:30 +0100 Subject: [PATCH] WIP: Add URL transfer via HTTP --- hpc/LoadBalancer.cpp | 25 ++++++++++++++---- hpc/LoadBalancer.hpp | 55 +++++++++++++++++++++++++++++++++++++--- hpc/Makefile | 5 +++- hpc/hq_scripts/job.sh | 16 +++++++++--- hpc/http_post.cpp | 38 +++++++++++++++++++++++++++ hpc/slurm_scripts/job.sh | 16 +++++++++--- 6 files changed, 140 insertions(+), 15 deletions(-) create mode 100644 hpc/http_post.cpp diff --git a/hpc/LoadBalancer.cpp b/hpc/LoadBalancer.cpp index 75242a5..cff148f 100644 --- a/hpc/LoadBalancer.cpp +++ b/hpc/LoadBalancer.cpp @@ -78,6 +78,26 @@ int main(int argc, char* argv[]) { port = std::stoi(port_str); } + // Transfer method for model URL (currently either via filesystem or HTTP) + // TODO: Should it be optional? If so, which default value? + std::string url_transfer = get_arg(args, "url-transfer"); + if (url_transfer.empty()) { + std::cerr << "Missing required argument: --url-transfer=[filesystem | http]" << std::endl; + return EXIT_FAILURE; + } + + + // URL transfer method + std::unique_ptr comm_factory; + if (url_transfer == "filesystem") { + comm_factory = std::make_unique(url_directory, std::chrono::milliseconds(500)); + } else if (url_transfer == "http") { + comm_factory = std::make_unique(); + } else { + std::cerr << "Unrecognized value for argument --url-transfer: " + << "Expected filesystem or http but got " << url_transfer << " instead." << std::endl; + std::exit(-1); + } // Assemble job manager std::unique_ptr job_submitter; @@ -95,11 +115,6 @@ int main(int argc, char* argv[]) { std::exit(-1); } - // Only filesystem communication is implemented. May implement network-based communication in the future. - // Directory which stores URL files and polling cycle currently hard-coded. - std::unique_ptr comm_factory - = std::make_unique(url_directory, std::chrono::milliseconds(500)); - // Location of job scripts and naming currently hard-corded. JobScriptLocator locator {script_dir, "job.sh", "job_", ".sh"}; diff --git a/hpc/LoadBalancer.hpp b/hpc/LoadBalancer.hpp index 56c57d7..7500c48 100644 --- a/hpc/LoadBalancer.hpp +++ b/hpc/LoadBalancer.hpp @@ -9,6 +9,7 @@ #include #include #include +#include // Run a shell command and get the result. @@ -61,6 +62,16 @@ void remove_trailing_newline(std::string& s) { } } +// Getting the IP Address/Hostname in a portable way is difficult. +// For now we rely on the 'hostname' command being available on the cluster. +// Note that the same mechanism is used by the job script to determine the hostname of the compute node. +std::string get_hostname() { + std::string command = "hostname -I | awk '{print $1}'"; + std::string hostname = get_command_output(command); + remove_trailing_newline(hostname); + return hostname; +} + struct Command { std::string exec; std::vector options; @@ -321,14 +332,53 @@ class FilesystemCommunicatorFactory : public JobCommunicatorFactory { class NetworkCommunicator : public JobCommunicator { public: NetworkCommunicator() { - // Open TCP port for listening + svr.Post(endpoint, [this](const httplib::Request& req, httplib::Response& res) { + this->model_url_promise.set_value(req.body); + svr.stop(); + }); + + port = svr.bind_to_any_port("0.0.0.0"); + std::cout << "Running server on port " << port << std::endl; + + model_url_future = model_url_promise.get_future(); + server_thread = std::thread([this]{ svr.listen_after_bind(); }); + } + + ~NetworkCommunicator() override { + if (server_thread.joinable()) { + server_thread.join(); + } } std::map getInitMessage() override { + // Getting the IP Address/Hostname in a portable way is difficult. + // For now we rely on the 'hostname' command being available on the cluster. + // Note that the same mechanism is used by the job script to determine the hostname of the compute node. + std::string hostname = get_hostname(); + std::string url = "http://" + hostname + ":" + std::to_string(port); + std::map msg { + {"UMBRIDGE_LOADBALANCER_COMM_URL", url}, + {"UMBRIDGE_LOADBALANCER_COMM_ENDPOINT", endpoint} + }; + return msg; } + + std::string getModelUrl(const std::string& job_id) override { + std::string model_url = model_url_future.get(); + + server_thread.join(); + return model_url; + } + private: - std::string host; + std::promise model_url_promise; + std::future model_url_future; + std::thread server_thread; + + httplib::Server svr; int port; + + const std::string endpoint = "/url"; }; class NetworkCommunicatorFactory : public JobCommunicatorFactory { @@ -338,7 +388,6 @@ class NetworkCommunicatorFactory : public JobCommunicatorFactory { std::unique_ptr create() override { return std::make_unique(); } -private: }; diff --git a/hpc/Makefile b/hpc/Makefile index 3a420d4..f192b7b 100644 --- a/hpc/Makefile +++ b/hpc/Makefile @@ -1,4 +1,4 @@ -all: build-load-balancer build-is-port-free build-testmodel +all: build-load-balancer build-is-port-free build-http-post build-testmodel build-load-balancer: - g++ -O3 -Wno-unused-result -std=c++17 -I../lib/ LoadBalancer.cpp -o load-balancer -pthread @@ -6,5 +6,8 @@ build-load-balancer: build-is-port-free: - g++ -O3 -std=c++17 is_port_free.cpp -o is_port_free +build-http-post: + - g++ -O3 -std=c++17 http_post.cpp -o http_post + build-testmodel: - g++ -O3 -Wno-unused-result -std=c++17 -I../lib/ ../models/testmodel/minimal-server.cpp -o testmodel -pthread diff --git a/hpc/hq_scripts/job.sh b/hpc/hq_scripts/job.sh index 8c4a38f..2db4221 100755 --- a/hpc/hq_scripts/job.sh +++ b/hpc/hq_scripts/job.sh @@ -42,8 +42,18 @@ while ! curl -s "http://$host:$port/Info" > /dev/null; do done echo "Model server responded" -# Write server URL to file identified by HQ job ID. -mkdir -p $UMBRIDGE_LOADBALANCER_COMM_FILEDIR -echo "http://$host:$port" > "$UMBRIDGE_LOADBALANCER_COMM_FILEDIR/url-$HQ_JOB_ID.txt" +# Send back the model server URL to the loadbalancer. +if [ -n "$UMBRIDGE_LOADBALANCER_COMM_FILEDIR" ]; then + # Using the shared filesystem + mkdir -p $UMBRIDGE_LOADBALANCER_COMM_FILEDIR + echo "http://$host:$port" > "$UMBRIDGE_LOADBALANCER_COMM_FILEDIR/url-$HQ_JOB_ID.txt" +elif [ -n "$UMBRIDGE_LOADBALANCER_COMM_URL" ] && [ -n "$UMBRIDGE_LOADBALANCER_COMM_ENDPOINT" ]; then + # Using HTTP + ./http_post "$UMBRIDGE_LOADBALANCER_COMM_URL" "$UMBRIDGE_LOADBALANCER_COMM_ENDPOINT" "http://$host:$port" +else + echo "Error: Environment variable required to send model server URL back to load balancer not set!" + exit 1 +fi + sleep infinity # keep the job occupied diff --git a/hpc/http_post.cpp b/hpc/http_post.cpp new file mode 100644 index 0000000..c613cc7 --- /dev/null +++ b/hpc/http_post.cpp @@ -0,0 +1,38 @@ +#include "../lib/httplib.h" + +#include + +// Usage: http_post +int main(int argc, char* argv[]) +{ + // Get command line args + if (argc < 2) { + std::cerr << "Missing required positional argument: server url" << std::endl; + return EXIT_FAILURE; + } + const std::string url(argv[1]); + + if (argc < 3) { + std::cerr << "Missing required positional argument: endpoint" << std::endl; + return EXIT_FAILURE; + } + const std::string endpoint(argv[2]); + + if (argc < 4) { + std::cerr << "Missing required positional argument: string message" << std::endl; + return EXIT_FAILURE; + } + const std::string message(argv[3]); + + httplib::Client cli(url); + auto res = cli.Post(endpoint.c_str(), message, "text/plain"); + + if (!res) { + std::cerr << "Client failed to POST: " << httplib::to_string(res.error()) << std::endl; + std::cerr << "Failed connection attempt on URL: " << url << endpoint << std::endl; + return EXIT_FAILURE; + } else if (res->status != 200) { + std::cerr << "Server returned error code: " << res->status << std::endl; + return EXIT_FAILURE; + } +} \ No newline at end of file diff --git a/hpc/slurm_scripts/job.sh b/hpc/slurm_scripts/job.sh index 23a5719..e1d1862 100755 --- a/hpc/slurm_scripts/job.sh +++ b/hpc/slurm_scripts/job.sh @@ -40,8 +40,18 @@ while ! curl -s "http://$host:$port/Info" > /dev/null; do done echo "Model server responded" -# Write server URL to file identified by HQ job ID. -mkdir -p $UMBRIDGE_LOADBALANCER_COMM_FILEDIR -echo "http://$host:$port" > "$UMBRIDGE_LOADBALANCER_COMM_FILEDIR/url-$SLURM_JOB_ID.txt" +# Send back the model server URL to the loadbalancer. +if [ -n "$UMBRIDGE_LOADBALANCER_COMM_FILEDIR" ]; then + # Using the shared filesystem + mkdir -p $UMBRIDGE_LOADBALANCER_COMM_FILEDIR + echo "http://$host:$port" > "$UMBRIDGE_LOADBALANCER_COMM_FILEDIR/url-$HQ_JOB_ID.txt" +elif [ -n "$UMBRIDGE_LOADBALANCER_COMM_URL" ] && [ -n "$UMBRIDGE_LOADBALANCER_COMM_ENDPOINT" ]; then + # Using HTTP + ./http_post "$UMBRIDGE_LOADBALANCER_COMM_URL" "$UMBRIDGE_LOADBALANCER_COMM_ENDPOINT" "http://$host:$port" +else + echo "Error: Environment variable required to send model server URL back to load balancer not set!" + exit 1 +fi + sleep infinity # keep the job occupied