Skip to content

Commit

Permalink
WIP: Add URL transfer via HTTP
Browse files Browse the repository at this point in the history
  • Loading branch information
Schlevidon committed Jan 12, 2025
1 parent b501927 commit aa7de3d
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 15 deletions.
25 changes: 20 additions & 5 deletions hpc/LoadBalancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,26 @@ int main(int argc, char* argv[]) {
port = std::stoi(port_str);
}

// Transfer method for model URL (currently either via filesystem or HTTP)
// TODO: Should it be optional? If so, which default value?
std::string url_transfer = get_arg(args, "url-transfer");
if (url_transfer.empty()) {
std::cerr << "Missing required argument: --url-transfer=[filesystem | http]" << std::endl;
return EXIT_FAILURE;
}


// URL transfer method
std::unique_ptr<JobCommunicatorFactory> comm_factory;
if (url_transfer == "filesystem") {
comm_factory = std::make_unique<FilesystemCommunicatorFactory>(url_directory, std::chrono::milliseconds(500));
} else if (url_transfer == "http") {
comm_factory = std::make_unique<NetworkCommunicatorFactory>();
} else {
std::cerr << "Unrecognized value for argument --url-transfer: "
<< "Expected filesystem or http but got " << url_transfer << " instead." << std::endl;
std::exit(-1);
}

// Assemble job manager
std::unique_ptr<JobSubmitter> job_submitter;
Expand All @@ -95,11 +115,6 @@ int main(int argc, char* argv[]) {
std::exit(-1);
}

// Only filesystem communication is implemented. May implement network-based communication in the future.
// Directory which stores URL files and polling cycle currently hard-coded.
std::unique_ptr<JobCommunicatorFactory> comm_factory
= std::make_unique<FilesystemCommunicatorFactory>(url_directory, std::chrono::milliseconds(500));

// Location of job scripts and naming currently hard-corded.
JobScriptLocator locator {script_dir, "job.sh", "job_", ".sh"};

Expand Down
55 changes: 52 additions & 3 deletions hpc/LoadBalancer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <memory>
#include <string>
#include <vector>
#include <future>


// Run a shell command and get the result.
Expand Down Expand Up @@ -61,6 +62,16 @@ void remove_trailing_newline(std::string& s) {
}
}

// Getting the IP Address/Hostname in a portable way is difficult.
// For now we rely on the 'hostname' command being available on the cluster.
// Note that the same mechanism is used by the job script to determine the hostname of the compute node.
std::string get_hostname() {
std::string command = "hostname -I | awk '{print $1}'";
std::string hostname = get_command_output(command);
remove_trailing_newline(hostname);
return hostname;
}

struct Command {
std::string exec;
std::vector<std::string> options;
Expand Down Expand Up @@ -321,14 +332,53 @@ class FilesystemCommunicatorFactory : public JobCommunicatorFactory {
class NetworkCommunicator : public JobCommunicator {
public:
NetworkCommunicator() {
// Open TCP port for listening
svr.Post(endpoint, [this](const httplib::Request& req, httplib::Response& res) {
this->model_url_promise.set_value(req.body);
svr.stop();
});

port = svr.bind_to_any_port("0.0.0.0");
std::cout << "Running server on port " << port << std::endl;

model_url_future = model_url_promise.get_future();
server_thread = std::thread([this]{ svr.listen_after_bind(); });
}

~NetworkCommunicator() override {
if (server_thread.joinable()) {
server_thread.join();
}
}

std::map<std::string, std::string> getInitMessage() override {
// Getting the IP Address/Hostname in a portable way is difficult.
// For now we rely on the 'hostname' command being available on the cluster.
// Note that the same mechanism is used by the job script to determine the hostname of the compute node.
std::string hostname = get_hostname();
std::string url = "http://" + hostname + ":" + std::to_string(port);
std::map<std::string, std::string> msg {
{"UMBRIDGE_LOADBALANCER_COMM_URL", url},
{"UMBRIDGE_LOADBALANCER_COMM_ENDPOINT", endpoint}
};
return msg;
}

std::string getModelUrl(const std::string& job_id) override {
std::string model_url = model_url_future.get();

server_thread.join();
return model_url;
}

private:
std::string host;
std::promise<std::string> model_url_promise;
std::future<std::string> model_url_future;
std::thread server_thread;

httplib::Server svr;
int port;

const std::string endpoint = "/url";
};

class NetworkCommunicatorFactory : public JobCommunicatorFactory {
Expand All @@ -338,7 +388,6 @@ class NetworkCommunicatorFactory : public JobCommunicatorFactory {
std::unique_ptr<JobCommunicator> create() override {
return std::make_unique<NetworkCommunicator>();
}
private:
};


Expand Down
5 changes: 4 additions & 1 deletion hpc/Makefile
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
all: build-load-balancer build-is-port-free build-testmodel
all: build-load-balancer build-is-port-free build-http-post build-testmodel

build-load-balancer:
- g++ -O3 -Wno-unused-result -std=c++17 -I../lib/ LoadBalancer.cpp -o load-balancer -pthread

build-is-port-free:
- g++ -O3 -std=c++17 is_port_free.cpp -o is_port_free

build-http-post:
- g++ -O3 -std=c++17 http_post.cpp -o http_post

build-testmodel:
- g++ -O3 -Wno-unused-result -std=c++17 -I../lib/ ../models/testmodel/minimal-server.cpp -o testmodel -pthread
16 changes: 13 additions & 3 deletions hpc/hq_scripts/job.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,18 @@ while ! curl -s "http://$host:$port/Info" > /dev/null; do
done
echo "Model server responded"

# Write server URL to file identified by HQ job ID.
mkdir -p $UMBRIDGE_LOADBALANCER_COMM_FILEDIR
echo "http://$host:$port" > "$UMBRIDGE_LOADBALANCER_COMM_FILEDIR/url-$HQ_JOB_ID.txt"
# Send back the model server URL to the loadbalancer.
if [ -n "$UMBRIDGE_LOADBALANCER_COMM_FILEDIR" ]; then
# Using the shared filesystem
mkdir -p $UMBRIDGE_LOADBALANCER_COMM_FILEDIR
echo "http://$host:$port" > "$UMBRIDGE_LOADBALANCER_COMM_FILEDIR/url-$HQ_JOB_ID.txt"
elif [ -n "$UMBRIDGE_LOADBALANCER_COMM_URL" ] && [ -n "$UMBRIDGE_LOADBALANCER_COMM_ENDPOINT" ]; then
# Using HTTP
./http_post "$UMBRIDGE_LOADBALANCER_COMM_URL" "$UMBRIDGE_LOADBALANCER_COMM_ENDPOINT" "http://$host:$port"
else
echo "Error: Environment variable required to send model server URL back to load balancer not set!"
exit 1
fi


sleep infinity # keep the job occupied
38 changes: 38 additions & 0 deletions hpc/http_post.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#include "../lib/httplib.h"

#include <iostream>

// Usage: http_post <server url> <endpoint> <string message>
int main(int argc, char* argv[])
{
// Get command line args
if (argc < 2) {
std::cerr << "Missing required positional argument: server url" << std::endl;
return EXIT_FAILURE;
}
const std::string url(argv[1]);

if (argc < 3) {
std::cerr << "Missing required positional argument: endpoint" << std::endl;
return EXIT_FAILURE;
}
const std::string endpoint(argv[2]);

if (argc < 4) {
std::cerr << "Missing required positional argument: string message" << std::endl;
return EXIT_FAILURE;
}
const std::string message(argv[3]);

httplib::Client cli(url);
auto res = cli.Post(endpoint.c_str(), message, "text/plain");

if (!res) {
std::cerr << "Client failed to POST: " << httplib::to_string(res.error()) << std::endl;
std::cerr << "Failed connection attempt on URL: " << url << endpoint << std::endl;
return EXIT_FAILURE;
} else if (res->status != 200) {
std::cerr << "Server returned error code: " << res->status << std::endl;
return EXIT_FAILURE;
}
}
16 changes: 13 additions & 3 deletions hpc/slurm_scripts/job.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,18 @@ while ! curl -s "http://$host:$port/Info" > /dev/null; do
done
echo "Model server responded"

# Write server URL to file identified by HQ job ID.
mkdir -p $UMBRIDGE_LOADBALANCER_COMM_FILEDIR
echo "http://$host:$port" > "$UMBRIDGE_LOADBALANCER_COMM_FILEDIR/url-$SLURM_JOB_ID.txt"
# Send back the model server URL to the loadbalancer.
if [ -n "$UMBRIDGE_LOADBALANCER_COMM_FILEDIR" ]; then
# Using the shared filesystem
mkdir -p $UMBRIDGE_LOADBALANCER_COMM_FILEDIR
echo "http://$host:$port" > "$UMBRIDGE_LOADBALANCER_COMM_FILEDIR/url-$HQ_JOB_ID.txt"
elif [ -n "$UMBRIDGE_LOADBALANCER_COMM_URL" ] && [ -n "$UMBRIDGE_LOADBALANCER_COMM_ENDPOINT" ]; then
# Using HTTP
./http_post "$UMBRIDGE_LOADBALANCER_COMM_URL" "$UMBRIDGE_LOADBALANCER_COMM_ENDPOINT" "http://$host:$port"
else
echo "Error: Environment variable required to send model server URL back to load balancer not set!"
exit 1
fi


sleep infinity # keep the job occupied

0 comments on commit aa7de3d

Please sign in to comment.