diff --git a/CMakeLists.txt b/CMakeLists.txt
index 241b110..b7573ea 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -93,7 +93,6 @@ set_target_properties(common PROPERTIES POSITION_INDEPENDENT_CODE ON)
 ###### scheduler ######
 add_executable(scheduler
     src/nexus/scheduler/backend_delegate.cpp
-    src/nexus/scheduler/complex_query.cpp
     src/nexus/scheduler/frontend_delegate.cpp
     src/nexus/scheduler/sch_info.cpp
     src/nexus/scheduler/scheduler.cpp
diff --git a/apps/traffic_app_simple/Dockerfile b/apps/traffic_app_simple/Dockerfile
deleted file mode 100644
index 141b7e6..0000000
--- a/apps/traffic_app_simple/Dockerfile
+++ /dev/null
@@ -1,4 +0,0 @@
-FROM nexus/applib
-
-ADD . /app
-RUN cd /app && make clean && make all
diff --git a/apps/traffic_app_simple/Makefile b/apps/traffic_app_simple/Makefile
deleted file mode 100644
index e77e626..0000000
--- a/apps/traffic_app_simple/Makefile
+++ /dev/null
@@ -1,25 +0,0 @@
-# c++ configs
-CXX = g++
-WARNING = -Wall -Wfatal-errors -Wno-unused -Wno-unused-result
-CXXFLAGS = -std=c++11 -O3 -fPIC $(WARNING) -I../../src -I../../build/gen \
-	-I/usr/local/cuda/include
-LD_FLAGS = -lm -pthread -lglog -lgflags -lboost_system -lboost_thread \
-	-lboost_filesystem -lyaml-cpp `pkg-config --libs protobuf` \
-	`pkg-config --libs grpc++ grpc` `pkg-config --libs opencv` \
-	-L/usr/local/cuda/lib64 -lcuda -lcudart \
-	-L../../build/lib -lnexus -Wl,-rpath,../../build/lib
-
-all: bin/traffic_app
-
-bin/traffic_app : obj/main.o
-	@mkdir -p $(@D)
-	$(CXX) $(CXXFLAGS) $^ $(LD_FLAGS) -o $@
-
-obj/%.o : src/%.cpp
-	@mkdir -p $(@D)
-	$(CXX) $(CXXFLAGS) -c $< -o $@
-
-clean:
-	rm -rf bin obj
-
-.phony: clean
diff --git a/apps/traffic_app_simple/src/main.cpp b/apps/traffic_app_simple/src/main.cpp
deleted file mode 100644
index 2ddaeac..0000000
--- a/apps/traffic_app_simple/src/main.cpp
+++ /dev/null
@@ -1,105 +0,0 @@
-#include <gflags/gflags.h>
-
-#include "nexus/app/complex_query_app.h"
-
-using namespace nexus;
-using namespace nexus::app;
-
-class TrafficApp : public ComplexQueryApp {
- public:
-  TrafficApp(std::string port, std::string rpc_port, std::string sch_addr,
-             size_t nthreads, int latency_slo) :
-      ComplexQueryApp(port, rpc_port, sch_addr, nthreads) {}
-  void Setup() final {
-    Initialize(3, 2, latency_slo);
-    //Get model handlers
-    //200 is estimate_latency
-    ssd_model_ = AddModel("tensorflow", "ssd_mobilenet", 1,
-                          200, 1, {});
-    car_model_ = AddModel("caffe2", "googlenet_cars", 1, 200, 1);
-    face_model_ = AddModel("caffe2", "vgg_face_0", 1, 200, 1, {});
-    //Build dataflow graph
-    AddEdge(ssd_model_, car_model_);
-    AddEdge(ssd_model_, face_model_);
-    //LoadDependency to scheduler
-    LoadDependency();
-    //Build exec blocks
-    auto func1 = [&](std::shared_ptr<RequestContext> ctx) {
-      auto ssd_output = ssd_model_->Execute(ctx, ctx->const_request().input());
-      return std::vector<VariablePtr>{
-          std::make_shared<Variable>("ssd_output", ssd_output)};
-    };
-    AddExecBlock(func1, {});
-
-    auto func2 = [&](std::shared_ptr<RequestContext> ctx) {
-      auto ssd_output = ctx->GetVariable("ssd_output")->result();
-      std::vector<std::shared_ptr<QueryResult> > results;
-      std::vector<RectProto> car_boxes;
-      std::vector<RectProto> face_boxes;
-      for (int i = 0; i < ssd_output->num_records(); ++i) {
-        auto& rec = (*ssd_output)[i];
-        auto name = rec["class_name"].as<std::string>();
-        if (name == "car" || name == "truck") {
-          car_boxes.push_back(rec["rect"].as<RectProto>());
-        } else if (name == "person") {
-          face_boxes.push_back(rec["rect"].as<RectProto>());
-        }
-      }
-      if (!car_boxes.empty()) {
-        results.push_back(
-            car_model_->Execute(ctx, ctx->const_request().input(), {}, 1,
-                                car_boxes));
-      }
-      if (!face_boxes.empty()) {
-        results.push_back(
-            face_model_->Execute(ctx, ctx->const_request().input(), {}, 1,
-                                 face_boxes));
-      }
-      return std::vector<VariablePtr>{
-          std::make_shared<Variable>("rec_output", results)};
-    };
-    AddExecBlock(func2, {"ssd_output"});
-
-    auto func3 = [&](std::shared_ptr<RequestContext> ctx) {
-      auto rec_output = ctx->GetVariable("rec_output");
-      if (rec_output->count() > 0) {
-        rec_output->result()->ToProto(ctx->reply());
-      }
-      return std::vector<VariablePtr>{};
-    };
-    AddExecBlock(func3, {"rec_output"});
-    //Build query processor
-    BuildQueryProcessor();
-
- private:
-  int ssd_latency_ms_;
-  int rec_latency_ms_;
-  std::shared_ptr<ModelHandler> ssd_model_;
-  std::shared_ptr<ModelHandler> car_model_;
-  std::shared_ptr<ModelHandler> face_model_;
-};
-
-DEFINE_string(port, "9001", "Server port");
-DEFINE_string(rpc_port, "9002", "RPC port");
-DEFINE_string(sch_addr, "127.0.0.1", "Scheduler address");
-DEFINE_int32(nthread, 4, "Number of threads processing requests");
-DEFINE_int32(latency, 400, "Latency SLO for query in ms");
-
-int main(int argc, char** argv) {
-  // log to stderr
-  FLAGS_logtostderr = 1;
-  // Init glog
-  google::InitGoogleLogging(argv[0]);
-  // Parse command line flags
-  google::ParseCommandLineFlags(&argc, &argv, true);
-  // Setup backtrace on segfault
-  google::InstallFailureSignalHandler();
-
-  LOG(INFO) << "App port " << FLAGS_port << ", rpc port " << FLAGS_rpc_port;
-  // Create the frontend server
-  TrafficApp app(FLAGS_port, FLAGS_rpc_port, FLAGS_sch_addr, FLAGS_nthread,
-                 FLAGS_latency);
-  LaunchApp(&app);
-
-  return 0;
-}
diff --git a/src/nexus/app/complex_query_app.cpp b/src/nexus/app/complex_query_app.cpp
deleted file mode 100644
index 4d96f63..0000000
--- a/src/nexus/app/complex_query_app.cpp
+++ /dev/null
@@ -1,67 +0,0 @@
-#include "nexus/app/complex_query_app.h"
-
-namespace nexus{
-namespace app {
-
-ComplexQueryApp::ComplexQueryApp(std::string port, std::string rpc_port,
-                                 std::string sch_addr, size_t nthreads) :
-    AppBase(port, rpc_port, sch_addr, nthreads) {
-  SetComplexQuery();
-}
-
-ComplexQueryApp::Initialize(size_t n, size_t m, int latency_sla) {
-  n_ = n;
-  m_ = m;
-  latency_sla_ = latency_sla;
-  proto_ = request_.mutable_dependency();
-  proto_->set_n(2);
-  proto_->set_m(1);
-  proto_->set_latency(latency_slo_);
-
-}
-
-std::shared_ptr<ModelHandler> ComplexQueryApp::AddModel(std::string &framework,
-    std::string &model, uint version,
-    uint estimate_latency, uint estimate_workload,
-    std::vector<uint32_t> image_size) {
-  auto model_handler = GetModelHandler(true, framework, model, version,
-                                       estimate_latency, estimate_workload,
-                                       image_size);
-  model_handlers_[model_handler.VirtualModelSessionId()] = model_handler;
-  auto model_sess = model_handler.GetModelSession();
-  model_sess.set_estimate_latency(estimate_latency);
-  proto_->add_models()->CopyFrom(model_sess);
-}
-
- void ComplexQueryApp::AddEdge(std::shared_ptr<ModelHandler> model1,
-                               std::shared_ptr<ModelHandler> model2); {
-  auto* edge = proto->add_edges();
-  edge->mutable_v1()->CopyFrom(model1.GetModelSession());
-  edge->mutable_v2()->CopyFrom(model2.GetModelSession());
-}
-
-void ComplexQueryApp::LoadDependency() {
-  LoadDependency(request_);
-}
-
-void ComplexQueryApp::BuildQueryProcessor() {
-  qp_ = new QueryProcessor(exec_blocks_);
-}
-
-void ComplexQueryApp::AddExecBlock(ExecFunc func, std::vector<std::string> variables) {
-  int id = exec_blocks_.size();
-  ExecBlock* exec_block = new ExecBlock(id, func, variables);
-  exec_blocks_.push_back(exec_block);
-}
-
-RectProto ComplexQueryApp::GetRect(int left, int right, int top, int bottom) {
-  RectProto rect;
-  rect.set_left(left);
-  rect.set_right(right);
-  rect.set_top(top);
-  rect.set_bottom(bottom);
-  return rect;
-} - -} //namespace app -} //namespace nexus diff --git a/src/nexus/app/app_base.cpp b/src/nexus/app/app_base.cpp index 48911ea..c911366 100644 --- a/src/nexus/app/app_base.cpp +++ b/src/nexus/app/app_base.cpp @@ -21,26 +21,17 @@ void AppBase::Start() { Run(qp_, nthreads_); } -std::shared_ptr AppBase::GetModelHandler(bool complex_query, +std::shared_ptr AppBase::GetModelHandler( const std::string& framework, const std::string& model_name, uint32_t version, uint64_t latency_sla, float estimate_workload, std::vector image_size, LoadBalancePolicy lb_policy) { LoadModelRequest req; req.set_node_id(node_id()); - req.set_complex_query(complex_query); auto model_sess = req.mutable_model_session(); model_sess->set_framework(framework); model_sess->set_model_name(model_name); model_sess->set_version(version); - if(complex_query) { - model_sess->set_latency_sla(0); - } else { - model_sess->set_latency_sla(latency_sla); - } - model_sess->set_estimate_latency(latency_sla); - LOG(INFO) << "[---LoadModelRequest---latency]" << latency_sla; - LOG(INFO) << "[---LoadModelRequest---latency & est_lat]" <latency_sla()<< model_sess->estimate_latency(); - + model_sess->set_latency_sla(latency_sla); if (image_size.size() > 0) { if (image_size.size() != 2) { LOG(ERROR) << "Image size is not 2"; diff --git a/src/nexus/app/app_base.h b/src/nexus/app/app_base.h index 0ef55dd..ca6779f 100644 --- a/src/nexus/app/app_base.h +++ b/src/nexus/app/app_base.h @@ -23,7 +23,7 @@ class AppBase : public Frontend { //virtual void Process(const RequestProto& request, ReplyProto* reply) {} protected: - std::shared_ptr GetModelHandler(const bool complex_query, + std::shared_ptr GetModelHandler( const std::string& framework, const std::string& model_name, uint32_t version, uint64_t latency_sla, float estimate_workload=0., std::vector image_size={}, diff --git a/src/nexus/app/complex_query_app.h b/src/nexus/app/complex_query_app.h deleted file mode 100644 index 581fe0c..0000000 --- a/src/nexus/app/complex_query_app.h +++ /dev/null @@ -1,81 +0,0 @@ -#ifndef NEXUS_APP_COMPLEX_QUERY_APP_H_ -#define NEXUS_APP_COMPLEX_QUERY_APP_H_ - -#include - -#include "nexus/app/app_base.h" - -namespace nexus{ -namespace app { - -class ComplexQueryApp : public AppBase { - public: - /*! - * \brief Create App for a complex query. - * \param port Port of socket. - * \param rpc_port Port of rpc call. - * \param sch_addr Address of scheduler. - * \param nthreads Number of threads in frontend. - */ - ComplexQueryApp(std::string port, std::string rpc_port, std::string sch_addr, - size_t nthreads); - /*! - * \brief Iinitialize complex query app. - * \param n Number of models in the complex query. - * \param m Number of edges in dataflow graph. - * \param latency_sla Latency sla requirement of the app. - */ - void Initialize(uint n, uint m, uint latency_sla); - /*! - * \brief Get model handler and add model to LoadDependency Request. - * \param n Number of models in the complex query. - * \param m Number of edges in dataflow graph. - * \param latency_sla Latency sla requirement of the app. - */ - std::shared_ptr AddModel(std::string &framework, std::string &model, uint version, - uint estimate_latency, uint estimate_workload, - std::vector image_size); - /*! - * \brief Add edge to LoadDependency Request. - * \params There is a edge in the dataflow graph from model1 to model2. - */ - void AddEdge(std::shared_ptr model1, - std::shared_ptr model2); - /*! - * \brief LoadDependencyRequest construction finished. - */ - void LoadDependency(); - /*! 
- * \brief Construct an ExecBlock and add it to query processor construction. - * \param func Function in the ExecBlock. - * \params variables variables needed (must have gotten value) in the Function. - */ - void AddExecBlock(ExecFunc func, std::vector variables); - /*! - * \brief Construct queryProcessor according to Exec Blocks added. - */ - void BuildQueryProcessor(); - /*! \brief Construct RectProto */ - RectProto GetRect(int left, int right, int top, int bottom); - - private: - /*! \brief Number of models */ - size_t n_; - /*! \brief Number of edges */ - size_t m_; - /*! \brief Latency required of the complex query*/ - int latency_sla_; - /*! \brief Map from model session id (lat = 0) to model handler */ - std::unordered_map > models_; - /*! \brief request for LoadDependency */ - LoadDependencyRequest request_; - /*! \brief ptoro for LoadDependencyRequest */ - LoadDependencyProto* proto_; - /*! \brief exec blocks of query processor */ - std::vector exec_blocks_; -}; - -} //namespace app -} //namespace nexus - -#endif // NEXUS_APP_COMPLEX_QUERY_APP_H_ diff --git a/src/nexus/app/frontend.cpp b/src/nexus/app/frontend.cpp index b29e383..034ea0b 100644 --- a/src/nexus/app/frontend.cpp +++ b/src/nexus/app/frontend.cpp @@ -26,9 +26,6 @@ Frontend::Frontend(std::string port, std::string rpc_port, sch_stub_ = SchedulerCtrl::NewStub(channel); // Init Node ID and register frontend to scheduler Register(); - interval_ = 20000; - std::thread t(&Frontend::report, this, interval_); - t.detach(); } Frontend::~Frontend() { @@ -37,35 +34,6 @@ Frontend::~Frontend() { } } -void Frontend::report(uint32_t interval_) { - while(true) { - auto begin = std::chrono::high_resolution_clock::now(); - std::this_thread::sleep_for(std::chrono::milliseconds(interval_)); - if(!complex_query_) continue; - CurRpsRequest request; - auto proto = request.mutable_cur_rps(); - proto->set_node_id(node_id()); - auto end = std::chrono::high_resolution_clock::now(); - auto int_ms = std::chrono::duration_cast(end - begin); - proto->set_interval(int_ms.count()); - proto->set_n(model_pool_.size()); - for (auto it = model_pool_.begin(); it != model_pool_.end(); ++it) { - std::string name = it->first; - auto modelHandler = it->second; - uint32_t count = modelHandler->count(); - ModelRps* modelRps = proto->add_model_rps(); - modelRps->set_model(name); - modelRps->set_rps(count); - } - RpcReply reply; - // Inovke RPC CheckAlive - grpc::ClientContext context; - grpc::Status status = sch_stub_->CurRps(&context, request, &reply); - if (reply.status() != CTRL_OK) { - LOG(ERROR) << status.error_code() << ": " << status.error_message(); - } - } -} void Frontend::Run(QueryProcessor* qp, size_t nthreads) { for (size_t i = 0; i < nthreads; ++i) { std::unique_ptr worker(new Worker(qp, request_pool_)); @@ -144,13 +112,7 @@ void Frontend::HandleMessage(std::shared_ptr conn, case kBackendReply: { QueryResultProto result; message->DecodeBody(&result); - std::string real_model_session_id = result.model_session_id(); - std::string model_session_id = real_model_session_id; - if(complex_query_) { - LOG(INFO) << "[---real_model_session_id---]"< Frontend::LoadModel(const LoadModelRequest& req, LOG(ERROR) << "Load model error: " << CtrlStatus_Name(reply.status()); return nullptr; } - auto model_session_id = reply.model_route().model_session_id(); - if(req.complex_query()) { - model_session_id = ModelSessionDelLatency(model_session_id); - } auto model_handler = std::make_shared( reply.model_route().model_session_id(), backend_pool_, lb_policy); // Only 
happens at Setup stage, so no concurrent modification to model_pool_ @@ -249,25 +207,6 @@ std::shared_ptr Frontend::LoadModel(const LoadModelRequest& req, return model_handler; } -void Frontend::LoadDependency(LoadDependencyRequest& req) { - LOG(INFO) << "[---Load model dependency---]"; - complex_query_ = true; - RpcReply reply; - req.set_node_id(node_id_); - grpc::ClientContext context; - grpc::Status status = sch_stub_->LoadDependency(&context, req, &reply); - LOG(INFO) << "[---Model dependency Loaded---]"; - if (!status.ok()) { - LOG(ERROR) << "Failed to connect to scheduler: " << - status.error_message() << "(" << status.error_code() << ")"; - return ; - } - if (reply.status() != CTRL_OK) { - LOG(ERROR) << "Load dependency error: " << CtrlStatus_Name(reply.status()); - return ; - } -} - void Frontend::Register() { // Init node id std::uniform_int_distribution dis( @@ -342,35 +281,15 @@ void Frontend::KeepAlive() { } bool Frontend::UpdateBackendPoolAndModelRoute(const ModelRouteProto& route) { - auto& real_model_session_id = route.model_session_id(); - LOG(INFO) << "Update model route for " << real_model_session_id; + auto& model_session_id = route.model_session_id(); + LOG(INFO) << "Update model route for " << model_session_id; // LOG(INFO) << route.DebugString(); - std::string model_session_id = real_model_session_id; - if(complex_query_) { - model_session_id = ModelSessionDelLatency(real_model_session_id); - } - LOG(INFO) << "[---Update model route for---] " << model_session_id; - uint32_t latency = LatencyOfModelSession(real_model_session_id); auto iter = model_pool_.find(model_session_id); if (iter == model_pool_.end()) { LOG(ERROR) << "Cannot find model handler for " << model_session_id; return false; } - if(complex_query_) { - latency_pool_[model_session_id] = latency; - model_session_pool_[model_session_id] = real_model_session_id; - } - LOG(INFO) << "[---updated model session pool and latency pool---]"; auto model_handler = iter->second; - LOG(INFO) << "[---updated model session pool and latency pool---1---]"; - if(complex_query_) { - LOG(INFO) << "[---updated model session pool and latency pool---2---]"; - if (model_handler == nullptr) { - LOG(INFO) << "[---model_handler is null---]"; - } - model_handler -> SetRealModelSessionId(real_model_session_id); - } - LOG(INFO) << "[---updated model session id---]"; // Update backend pool first { std::lock_guard lock(backend_sessions_mu_); diff --git a/src/nexus/app/frontend.h b/src/nexus/app/frontend.h index 1bbde91..cd44afd 100644 --- a/src/nexus/app/frontend.h +++ b/src/nexus/app/frontend.h @@ -7,7 +7,6 @@ #include #include #include -#include #include "nexus/app/model_handler.h" #include "nexus/app/query_processor.h" @@ -35,9 +34,6 @@ class Frontend : public ServerBase, public MessageHandler { //virtual void Process(const RequestProto& request, ReplyProto* reply) = 0; - //Report rps - void report(uint32_t interval_); - uint32_t node_id() const { return node_id_; } std::string rpc_port() const { return rpc_service_.port(); } @@ -63,16 +59,10 @@ class Frontend : public ServerBase, public MessageHandler { boost::system::error_code ec) final; void UpdateModelRoutes(const ModelRouteUpdates& request, RpcReply* reply); - - //void UpdateModelLatencies(const ModelRouteLatencies& request, RpcReply* reply); std::shared_ptr GetUserSession(uint32_t uid); std::shared_ptr LoadModel(const LoadModelRequest& req); - - void LoadDependency(LoadDependencyRequest& req); - - void SetComplexQuery() { complex_query_ = true; } std::shared_ptr LoadModel(const 
LoadModelRequest& req, LoadBalancePolicy lb_policy); @@ -126,17 +116,6 @@ class Frontend : public ServerBase, public MessageHandler { std::unordered_map > model_pool_; std::thread daemon_thread_; - - /*! - * \brief Map from model session ID to model latency (complex query) - * Guarded by model_pool_mu_. - */ - std::unordered_map latency_pool_; - /*! - * \brief Map from model session ID to real model session ID (contains latency) - * Guarded by model_pool_mu. - */ - std::unordered_map model_session_pool_; /*! \brief Mutex for connection_pool_ and user_sessions_ */ std::mutex user_mutex_; @@ -144,8 +123,6 @@ class Frontend : public ServerBase, public MessageHandler { /*! \brief Random number generator */ std::random_device rd_; std::mt19937 rand_gen_; - uint32_t interval_; - bool complex_query_; }; } // namespace app diff --git a/src/nexus/app/model_handler.cpp b/src/nexus/app/model_handler.cpp index e87e0ce..2ea1dc8 100644 --- a/src/nexus/app/model_handler.cpp +++ b/src/nexus/app/model_handler.cpp @@ -90,7 +90,6 @@ ModelHandler::ModelHandler(const std::string& model_session_id, running_ = true; deficit_thread_ = std::thread(&ModelHandler::DeficitDaemon, this); } - real_model_session_id_ = ""; } ModelHandler::~ModelHandler() { @@ -101,12 +100,6 @@ ModelHandler::~ModelHandler() { } } -uint32_t ModelHandler::count() { - uint32_t ret = num_; - num_ = 0; - return ret; -} - std::shared_ptr ModelHandler::Execute( std::shared_ptr ctx, const ValueProto& input, std::vector output_fields, uint32_t topk, @@ -121,7 +114,7 @@ std::shared_ptr ModelHandler::Execute( } QueryProto query; query.set_query_id(qid); - query.set_model_session_id(model_session_id()); + query.set_model_session_id(model_session_id_); query.mutable_input()->CopyFrom(input); for (auto field : output_fields) { query.add_output_field(field); diff --git a/src/nexus/app/model_handler.h b/src/nexus/app/model_handler.h index 79b747f..df34e32 100644 --- a/src/nexus/app/model_handler.h +++ b/src/nexus/app/model_handler.h @@ -29,7 +29,6 @@ class QueryResult { */ QueryResult(uint64_t qid); - uint32_t count(); bool ready() const { return ready_; } uint64_t query_id() const { return qid_; } @@ -84,18 +83,7 @@ class ModelHandler { ~ModelHandler(); - uint32_t count(); - - std::string model_session_id() const { - if(real_model_session_id_ == "") { - return model_session_id_; - } - return real_model_session_id_; - } - - void SetRealModelSessionId(std::string real_model_session_id) { - real_model_session_id_ = real_model_session_id; - } + std::string model_session_id() const { return model_session_id_; } std::shared_ptr counter() const { return counter_; } @@ -109,10 +97,6 @@ class ModelHandler { void UpdateRoute(const ModelRouteProto& route); std::vector BackendList(); - - ModelSession GetModelSession() { - return model_session_; - } private: std::shared_ptr GetBackend(); @@ -124,7 +108,6 @@ class ModelHandler { void DeficitDaemon(); ModelSession model_session_; - std::string real_model_session_id_; std::string model_session_id_; BackendPool& backend_pool_; LoadBalancePolicy lb_policy_; @@ -155,7 +138,6 @@ class ModelHandler { std::atomic running_; std::thread deficit_thread_; - uint32_t num_; }; } // namespace app diff --git a/src/nexus/app/request_context.h b/src/nexus/app/request_context.h index 59f7844..d3b51aa 100644 --- a/src/nexus/app/request_context.h +++ b/src/nexus/app/request_context.h @@ -36,8 +36,6 @@ class Variable { } bool ready() const { return pending_results_.empty(); } - - size_t count() const { return data_.size(); } std::string 
name() const { return name_; }
diff --git a/src/nexus/common/backend_pool.h b/src/nexus/common/backend_pool.h
index cc727e6..07cd96d 100644
--- a/src/nexus/common/backend_pool.h
+++ b/src/nexus/common/backend_pool.h
@@ -68,7 +68,6 @@ class BackendPool {
   void StopAll();
 
  protected:
-  // brief from backend.node_id() to BackendSession
   std::unordered_map<uint32_t, std::shared_ptr<BackendSession> > backends_;
   std::mutex mu_;
 };
diff --git a/src/nexus/common/model_def.h b/src/nexus/common/model_def.h
index a15122f..e018bac 100644
--- a/src/nexus/common/model_def.h
+++ b/src/nexus/common/model_def.h
@@ -28,40 +28,6 @@ inline void ParseModelID(const std::string model_id,
   model_session->set_version(std::stoi(tokens[2]));
 }
 
-inline std::string ModelSessionDelLatency(const std::string model_id) {
-  std::vector<std::string> tokens;
-  SplitString(model_id, ':', &tokens);
-  std::stringstream ss;
-  ss << tokens[0] << ":" <<
-      tokens[1] << ":" << tokens[2];
-  if (tokens.size() > 4) {
-    ss << ":" << tokens[3];
-  }
-  ss << ":" << "0";
-  return ss.str();
-}
-
-inline std::string ModelSessionAddLatency(const std::string model_id, const uint32_t latency) {
-  std::vector<std::string> tokens;
-  SplitString(model_id, ':', &tokens);
-  std::stringstream ss;
-  ss << tokens[0] << ":" <<
-      tokens[1] << ":" << tokens[2];
-  if (tokens.size() > 4) {
-    ss << ":" << tokens[3];
-  }
-  ss << ":" << std::to_string(latency);
-  return ss.str();
-}
-
-inline uint32_t LatencyOfModelSession(const std::string model_id) {
-  std::vector<std::string> tokens;
-  SplitString(model_id, ':', &tokens);
-  int id = tokens.size() > 4 ? 4 : 3;
-  return std::stoi(tokens[id]);
-
-}
-
 inline std::string ModelSessionToModelID(const ModelSession& model_session) {
   std::stringstream ss;
   ss << model_session.framework() << ":" << model_session.model_name() << ":"
diff --git a/src/nexus/proto/control.proto b/src/nexus/proto/control.proto
index ec3e020..24f0385 100644
--- a/src/nexus/proto/control.proto
+++ b/src/nexus/proto/control.proto
@@ -10,14 +10,11 @@ service SchedulerCtrl {
   rpc LoadModel(LoadModelRequest) returns (LoadModelReply) {}
   rpc ReportWorkload(WorkloadStatsProto) returns (RpcReply) {}
   rpc KeepAlive(KeepAliveRequest) returns (RpcReply) {}
-  rpc LoadDependency(LoadDependencyRequest) returns (RpcReply) {}
-  rpc CurRps(CurRpsRequest) returns (RpcReply) {}
 }
 
 service FrontendCtrl {
   rpc UpdateModelRoutes(ModelRouteUpdates) returns (RpcReply) {}
   rpc CheckAlive(CheckAliveRequest) returns (RpcReply) {}
-  //rpc UpdateModelLatencies(ModelLatencyUpdates) returns (RpcReply) {}
 }
 
 service BackendCtrl {
@@ -39,8 +36,6 @@ enum CtrlStatus {
   SERVICE_UNAVAILABLE = 1;
   // Model not found
   MODEL_NOT_FOUND = 2;
-  //Frontend not found
-  FRONTEND_NOT_FOUND = 8;
   // No available backends
   NOT_ENOUGH_BACKENDS = 3;
   // Model session not loaded
@@ -62,8 +57,6 @@ enum CtrlStatus {
 
   CTRL_FRONTEND_NODE_ID_CONFLICT = 300;
   CTRL_INVALID_LOAD_MODEL_REQUEST = 301;
-
-  CTRL_PROFILER_UNAVALIABLE = 302;
 }
 
 message RpcReply {
@@ -120,8 +113,7 @@ message LoadModelRequest {
   uint32 node_id = 1;
   ModelSession model_session = 2;
   double estimate_workload = 3;
-  bool complex_query = 4;
-  uint32 estimate_latency = 5;
+  //uint32 num_gpus = 4;
 }
 
 message LoadModelReply {
@@ -129,14 +121,6 @@
   CtrlStatus status = 1;
   ModelRouteProto model_route = 2;
 }
-message LoadDependencyRequest {
-  uint32 node_id = 1;
-  LoadDependencyProto dependency = 2;
-}
-
-message CurRpsRequest {
-  CurRpsProto cur_rps = 1;
-}
 message ModelInstanceConfig {
   repeated ModelSession model_session = 1;
   uint32 batch = 2;
@@ -185,4 +169,4 @@ message UtilizationReply {
   uint32 node_id = 1;
   double utilization = 2;
   int32 valid_ms = 3;
-}
+}
\ No newline at end of file
diff --git a/src/nexus/proto/nnquery.proto b/src/nexus/proto/nnquery.proto
index b113af4..3768ac1 100644
--- a/src/nexus/proto/nnquery.proto
+++ b/src/nexus/proto/nnquery.proto
@@ -108,32 +108,6 @@ message ModelSession {
   // otherwise ignored
   uint32 image_height = 10;
   uint32 image_width = 11;
-
-  uint32 estimate_latency = 12;
-}
-message Edge {
-  ModelSession v1 = 1;
-  ModelSession v2 = 2;
-}
-
-message LoadDependencyProto {
-
-  uint32 n = 2;
-  uint32 m = 3;
-  double latency = 4;
-  repeated ModelSession models = 10;
-  repeated Edge edges = 11;
-}
-
-message ModelRps {
-  double rps = 1;
-  string model = 2;
-}
-message CurRpsProto {
-  uint32 node_id = 1;
-  uint32 n = 2;
-  double interval = 3;
-  repeated ModelRps model_rps = 10;
 }
 
 message QueryProto {
diff --git a/src/nexus/scheduler/backend_delegate.cpp b/src/nexus/scheduler/backend_delegate.cpp
index 5e286e9..6f5321e 100644
--- a/src/nexus/scheduler/backend_delegate.cpp
+++ b/src/nexus/scheduler/backend_delegate.cpp
@@ -125,18 +125,14 @@ bool BackendDelegate::Assign(const BackendDelegate& other) {
 bool BackendDelegate::PrepareLoadModel(
     const ModelSession& model_sess, double workload, InstanceInfo* inst_info,
     double* occupancy) const {
-  LOG(INFO) << "[---PrepareLoadModel---]";
-  LOG(INFO) << "[---model sla---]" << model_sess.latency_sla();
-  if (workload_id_ >= 0 || Occupancy() == 1.0) {
+  if (workload_id_ >= 0 || Occupancy() >= 1.0) {
     // Static configured backend or fully occupied backend cannot load a new
     // model
-    LOG(INFO) << "[---workload id >= 0---]";
     return false;
   }
   std::string model_sess_id = ModelSessionToString(model_sess);
   if (session_model_map_.count(model_sess_id) > 0) {
     // Already load this model session
-    LOG(INFO) << "[---model_sess exist---]";
     return false;
   }
   std::string profile_id = ModelSessionToProfileID(model_sess);
@@ -144,7 +140,6 @@ bool BackendDelegate::PrepareLoadModel(
       profile_id);
   if (profile == nullptr) {
     // Cannot find model profile for this GPU
-    LOG(INFO) << "[---profile not exist---]";
     return false;
   }
   inst_info->model_sessions.push_back(model_sess);
@@ -152,7 +147,6 @@ bool BackendDelegate::PrepareLoadModel(
   // Compute the best batch size for the workload
   ComputeBatchSize(inst_info, workload);
   if (inst_info->batch == 0) {
-    LOG(INFO) << "[---batch == 0---]";
     return false;
   }
   // Compute new duty cycle and new exec cycle if we load this model
@@ -169,7 +163,6 @@ bool BackendDelegate::PrepareLoadModel(
 
   if (res.exec_cycle_us > res.duty_cycle_us) {
     // Doesn't have enough spare cycles to load this workload
-    LOG(INFO) << "[---new_exec_cycle > new_duty_cycle---]";
     return false;
   }
   *occupancy = res.exec_cycle_us / res.duty_cycle_us;
@@ -520,14 +513,12 @@ bool BackendDelegate::IsIdle() const {
 void BackendDelegate::ComputeBatchSize(InstanceInfo* inst_info,
                                        double workload) const {
   // 1.
Compute the max batch and throughput to saturate an empty GPU - LOG(INFO) << "[---ComputeBatchSize---]"; uint32_t batch, max_batch; double max_throughput; max_batch = inst_info->profile->GetMaxBatch( inst_info->model_sessions[0].latency_sla()); max_throughput = max_batch * 1e6 / inst_info->profile->GetForwardLatency( max_batch); - LOG(INFO) << "[---max_batch & max_thput---]"<profile->GetMaxThroughput( // inst_info->model_session.latency_sla()); if (workload == 0 || workload >= max_throughput) { @@ -553,20 +544,15 @@ void BackendDelegate::ComputeBatchSize(InstanceInfo* inst_info, // because batch = ceil(workload * duty_cycle), // duty_cycle >= (batch - 1) / workload double min_duty_cycle = (batch - 1) * 1e6 / workload; - LOG(INFO) << "[---latency_sla---]" << latency_sla_us; - LOG(INFO) << "[---data---]" < latency_sla_us) { break; } - } --batch; if (batch == 0) { // This GPU is too slow so that exec latency of batch 1 is too large to // satisfy latency_sla inst_info->batch = 0; - LOG(INFO) << "[---GPU is too slow---]"; return; } inst_info->batch = batch; diff --git a/src/nexus/scheduler/complex_query.cpp b/src/nexus/scheduler/complex_query.cpp deleted file mode 100644 index 0e9ac6d..0000000 --- a/src/nexus/scheduler/complex_query.cpp +++ /dev/null @@ -1,328 +0,0 @@ -#include "nexus/scheduler/complex_query.h" -#include -namespace nexus { -namespace scheduler { -void RpsRecord::init(std::map models_id) { - models_id_ = models_id; - max_size_ = 30; - begin_ = end_ = 0; - uint n = models_id_.size(); - models_rps_.resize(n + 1); - len_ = 1; - for (int i = 0; i <= n; i++) { - models_rps_[i].resize(max_size_ * 2 + 1); - if(i == 0) { - models_rps_[i][0] = 100; //ms - } - else { - models_rps_[i][0] = 0; - } - } -} -void RpsRecord::add(const CurRpsProto& request) { - float interval = request.interval(); - uint32_t n = request.n(); - models_rps_[0].push_back(interval); - end_ += 1; - for (uint i = 0; i < n; i++) { - uint32_t id = models_id_[request.model_rps(i).model()]; - models_rps_[id][end_] = request.model_rps(i).rps(); - } - len_++; - if(len_ > max_size_) { - begin_ ++; - } - if(begin_ >= max_size_) { - int n = models_id_.size(); - for (uint i = 0; i <= n; i++) { - for (uint j = 0; j < len_; j++) { - models_rps_[i][j] = models_rps_[i][j + begin_]; - models_rps_[i][j + begin_] = 0; - } - } - begin_ = 0; - end_ = len_ - 1; - } - //return true; -} -std::vector RpsRecord::getRecord() { - LOG(INFO) << "[---GetRecord start---]"; - std::vector ret; - float total = 0.0; - for (uint i = begin_; i <= end_; i++) { - total += models_rps_[0][i]; - } - LOG(INFO) << "[---got total---]" << total; - uint n = models_id_.size(); - for (uint i = 1; i <= n; i++) { - float mean = 0.0, std = 0.0; - for (uint j = begin_; j <= end_; j++) { - mean += models_rps_[i][j] * models_rps_[0][j]; - } - mean /= total; - for (uint j = begin_; j <= end_; j++) { - std += models_rps_[0][i] * sqr(models_rps_[i][j] / models_rps_[0][i] - mean); - } - std /= total; - std = std::sqrt(std); - ret.push_back(mean + std); - } - return ret; -} - -void QuerySplit::init(int n) { - workloads_.resize(n); -} -void QuerySplit::addModel(ModelSession model, float lat) { - models_.push_back(model); - latencys_.push_back(lat); -} -void QuerySplit::updateLatencys(std::vector latencys) { - for (int i = 0; i < models_.size(); i++) { - last_latencys_[i] = latencys_[i]; - latencys_[i] = latencys[i]; - } -} - -void QuerySplit::updateWorkloads(std::vector workloads) { - for (int i = 0; i < models_.size(); i++) { - workloads_[i] = workloads[i]; - } -} 
-std::vector QuerySplit::constructSplit(std::vector latencys) { - std::vector ret; - for (int i = 0; i < models_.size(); i++) { - ModelSession tmp = models_[i]; - tmp.set_latency_sla(latencys[i]); - ret.push_back(tmp); - } - return ret; -} -std::vector QuerySplit::last_subscribe_models() { - std::vector ret; - for (int i = 0; i < models_.size(); i++) { - ModelSession tmp = models_[i]; - tmp.set_latency_sla(last_latencys_[i]); - ret.push_back(tmp); - } - return ret; -} - -std::vector QuerySplit::cur_subscribe_models() { - std::vector ret; - for (int i = 0; i < models_.size(); i++) { - ModelSession tmp = models_[i]; - tmp.set_latency_sla(latencys_[i]); - ret.push_back(tmp); - } - return ret; -} -std::vector QuerySplit::cur_workloads() { - return workloads_; -} - -float ComplexQuery::structure(int n) { - LOG(INFO) << "[---Structure---]"; - for (int i = 0; i <= n; i++) { - depth_.push_back(0); - if(i != 0 && degrees_[i] == 0) { - redges_[i].push_back(0); - edges_[0].push_back(i); - degrees_[i] ++; - } - } - LOG(INFO) << "[---Data---degrees_---]"; - for (int i = 0; i <= n; i++) { - LOG(INFO) << "[---deg["< deg(degrees_); - node_.resize(n + 1); - node_[0] = 0; - uint32_t maxn = 0; - while(l != r) { - uint32_t x = node_[++l]; - LOG(INFO) << "[---bfs node_id---]" << x; - maxn = std::max(depth_[x], maxn); - for (int i = 0; i < edges_[x].size(); i++) { - uint32_t y = edges_[x][i]; - deg[y] --; - if(deg[y] == 0) { - r++; - depth_[y] = std::max(depth_[y], depth_[x] + 1); - node_[r] = y; - } - } - } - LOG(INFO) << "[---construct layers_---]"; - for (uint i = 0; i <= maxn; i++) { - layers_.push_back(std::vector()); - } - for (uint i = 0; i <= n; i++) { - layers_[depth_[i]].push_back(i); - } - diameter_ = maxn; - LOG(INFO) << "[---diameter_---]" << diameter_; -} -float ComplexQuery::gpu_need(std::vector sess, std::vector rps) { - float ret = 0.0, min_float = 1e-7, max_float = 1e7; - for (int i = 0; i < sess.size(); i++) { - float throughput = max_throughputs_[i + 1][sess[i].latency_sla()].first; - if(throughput <= min_float) { - ret += max_float; - } - else { - ret += rps[i] / throughput; - } - } - return ret; -} -QuerySplit* ComplexQuery::split() { - LOG(INFO) << "[---complex query split---]"; - float max_float = 10000000.0; - std::vector cur_rps = rps_record_.getRecord(); - LOG(INFO) << "[---Got rpsRecord---]"; - uint m = latency_ / step_; - LOG(INFO) << "[---m = lat / step---]" << m; - std::vector > layer_gpus, dp; - std::vector > last_layer_lats; - LOG(INFO) << "[---diameter_---]" << diameter_; - for (uint i = 0; i <= diameter_; i++) { - dp.push_back(std::vector()); - last_layer_lats.push_back(std::vector()); - layer_gpus.push_back(std::vector()); - LOG(INFO) << "[---i---]" << i; - for (uint j = 0; j <= m; j++) { - LOG(INFO) << "[---j---]" << i; - last_layer_lats[i].push_back(0); - if(i == 0) { - dp[i].push_back(0.0); - } - else { - dp[i].push_back(max_float); - } - if(j == 0) { - layer_gpus[i].push_back(max_float); - continue; - } - if(i == 0) { - layer_gpus[i].push_back(0.0); - continue; - } - layer_gpus[i].push_back(0.0); - for (uint k = 0; k < layers_[i].size(); k++) { - uint node = layers_[i][k]; - LOG(INFO) << "[---node---]" << node; - LOG(INFO) << "[---throughput---]" << max_throughputs_[node][j].first; - float gpu = cur_rps[node - 1] / max_throughputs_[node][j].first; - layer_gpus[i][j] += gpu; - } - } - } - LOG(INFO) << "[---initialized---]"; - std::vector split; - for (uint i = 0; i <= n_; i++) { - split.push_back(0); - } - for (uint i = 1; i <= diameter_; i++) { - for(uint j = 1; j <= m; 
j++) { - for (uint k = 0; k < j; k++) { - float tmp = dp[i - 1][k] + layer_gpus[i][j - k]; - if(dp[i][j] > tmp) { - dp[i][j] = tmp; - last_layer_lats[i][j] = j - k; - } - } - if(dp[i][j] > dp[i][j - 1]) { - dp[i][j] = dp[i][j - 1]; - last_layer_lats[i][j] = last_layer_lats[i][j - 1]; - } - } - } - LOG(INFO) << "[---dp finished---]"; - uint32_t last_lat = m, cur_lat; - for (uint i = diameter_; i > 0; i--) { - cur_lat = last_layer_lats[i][last_lat]; - for (uint j = 0; j < layers_[i].size(); j++) { - split[layers_[i][j] - 1] = cur_lat; - } - last_lat -= cur_lat; - } - LOG(INFO) << "[---get latencies finished---]"; - //check: if current split is 10% better than last split - float n1 = gpu_need(query_split_.cur_subscribe_models(), cur_rps); - float n2 = gpu_need(query_split_.constructSplit(split), cur_rps); - LOG(INFO) << "[---get gpu need finished---]"; - query_split_.updateWorkloads(rps_record_.getRecord()); - if(n1 > n2 * 1.1) { - query_split_.setState(true); - query_split_.updateLatencys(split); - } - else { - query_split_.setState(false); - } - return &query_split_; -} - -CtrlStatus ComplexQuery::init(const LoadDependencyProto& request, std::string common_gpu) { - LOG(INFO) << "complex query init"; - common_gpu_ = common_gpu; - models_.push_back(""); - n_ = request.n(); - - int m = request.m(); - ori_latency_ = request.latency(); - for (uint i = 0; i < n_; i++) { - std::string model = ModelSessionToString(request.models(i)); - model_sessions_.push_back(request.models(i)); - models_id_[model] = i + 1; - models_.push_back(model); - } - edges_.resize(n_ + 1); - redges_.resize(n_ + 1); - - degrees_.resize(n_ + 1); - for (uint i = 0; i <= n_; i++) { - degrees_[i] = 0; - } - for (uint i = 0; i < m; i++) { - std::string model1 = ModelSessionToString(request.edges(i).v1()); - std::string model2 = ModelSessionToString(request.edges(i).v2()); - uint32_t v1 = models_id_[model1], v2 = models_id_[model2]; - edges_[v1].push_back(v2); - redges_[v2].push_back(v1); - degrees_[v2] ++; - } - step_ = std::max(std::ceil(std::sqrt(1.0 * n_ / 100000000.0) * ori_latency_), 1.0); - LOG(INFO) << "[---complex query step_ = step_---]" << step_; - m = ori_latency_ / step_; - latency_ = step_ * m; - max_throughputs_.push_back(std::vector>()); - for (uint i = 0; i < n_; i++) { - std::string profile_id = ModelSessionToProfileID(model_sessions_[i]); - auto profile = ModelDatabase::Singleton().GetModelProfile(common_gpu_, profile_id); - if (profile == nullptr) { - // Cannot find model profile - return CTRL_PROFILER_UNAVALIABLE; - } - max_throughputs_.push_back(std::vector>()); - max_throughputs_[i + 1].push_back(std::make_pair(0.0, 0)); - for (uint j = 1; j <= m; j++) { - uint32_t lat = j * step_; - max_throughputs_[i + 1].push_back(profile->GetMaxThroughput(lat)); - } - } - structure(n_); - query_split_.init(n_); - for (uint i = 0; i < n_; i++) { - query_split_.addModel(request.models(i), request.models(i).estimate_latency()); - } - rps_record_.init(models_id_); - return CTRL_OK; -} -void ComplexQuery::addRecord(const CurRpsProto& request) { - rps_record_.add(request); -} -} -} diff --git a/src/nexus/scheduler/complex_query.h b/src/nexus/scheduler/complex_query.h deleted file mode 100644 index 277327b..0000000 --- a/src/nexus/scheduler/complex_query.h +++ /dev/null @@ -1,107 +0,0 @@ -#ifndef NEXUS_SCHEDULER_COMPLEXQUERY_H_ -#define NEXUS_SCHEDULER_COMPLEXQUERY_H_ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include 
"nexus/common/rpc_call.h" -#include "nexus/common/rpc_service_base.h" -#include "nexus/common/config.h" -#include "nexus/common/model_db.h" -#include "nexus/common/model_def.h" -#include "nexus/common/util.h" -#include "nexus/proto/control.grpc.pb.h" -#include "nexus/proto/nnquery.pb.h" - -namespace nexus { -namespace scheduler { - -class RpsRecord { - public: - RpsRecord() {} - void init(std::map models_id); - void add(const CurRpsProto& request); - float sqr(float x) { - return x * x; - } - std::vector getRecord(); - private: - std::map models_id_; - std::vector > models_rps_; - uint32_t max_size_, begin_, end_, len_; -}; -class QuerySplit { - - public: - QuerySplit() {} - void init(int n); - void addModel(ModelSession model, float lat); - void updateLatencys(std::vector latencys); - - void updateWorkloads(std::vector workloads); - std::vector constructSplit(std::vector latencys); - - std::vector last_subscribe_models(); - - std::vector cur_subscribe_models(); - - std::vector cur_workloads(); - - void setState(bool state) { - state_ = state; - } - bool getState() { - return state_; - } - private: - std::vector models_; - std::vector latencys_; - std::vector last_latencys_; - std::vector workloads_; - bool state_; - -}; - -class ComplexQuery { - public: - ComplexQuery() {} - float structure(int n); - float gpu_need(std::vector sess, std::vector rps); - QuerySplit* split(); - CtrlStatus init(const LoadDependencyProto& request, std::string common_gpu); - void addRecord(const CurRpsProto& request); - private: - uint32_t ori_latency_; - uint32_t n_; - std::vector model_sessions_; - std::map models_id_; - std::vector > edges_; - std::vector > redges_; - std::vector > layers_; - std::vector models_; - std::vector intervals_; - std::vector degrees_; - std::vector depth_; - std::vector node_; - std::string common_gpu_; - uint32_t latency_; - uint32_t step_; - uint32_t diameter_; - std::vector> > max_throughputs_; - RpsRecord rps_record_; - QuerySplit query_split_; -}; -} -} -#endif diff --git a/src/nexus/scheduler/frontend_delegate.cpp b/src/nexus/scheduler/frontend_delegate.cpp index 7dce517..63ed2ab 100644 --- a/src/nexus/scheduler/frontend_delegate.cpp +++ b/src/nexus/scheduler/frontend_delegate.cpp @@ -9,7 +9,7 @@ namespace scheduler { FrontendDelegate::FrontendDelegate(uint32_t node_id, const std::string& ip, const std::string& server_port, const std::string& rpc_port, - int beacon_sec, std::string common_gpu): + int beacon_sec): node_id_(node_id), ip_(ip), server_port_(server_port), @@ -22,8 +22,6 @@ FrontendDelegate::FrontendDelegate(uint32_t node_id, const std::string& ip, grpc::InsecureChannelCredentials()); stub_ = FrontendCtrl::NewStub(channel); last_time_ = std::chrono::system_clock::now(); - common_gpu_ = common_gpu; - complexQuery_ = false; } std::time_t FrontendDelegate::LastAliveTime() { @@ -59,16 +57,7 @@ bool FrontendDelegate::IsAlive() { void FrontendDelegate::SubscribeModel(const std::string& model_session_id) { subscribe_models_.insert(model_session_id); } -CtrlStatus FrontendDelegate::LoadDependency(const LoadDependencyProto& request) { - complexQuery_ = true; - return query_.init(request, common_gpu_); -} -void FrontendDelegate::CurrentRps(const CurRpsProto& request){ - if(complexQuery_) { - query_.addRecord(request); - } -} CtrlStatus FrontendDelegate::UpdateModelRoutesRpc( const ModelRouteUpdates& request) { RpcReply reply; @@ -86,8 +75,6 @@ CtrlStatus FrontendDelegate::UpdateModelRoutesRpc( } return reply.status(); } -QuerySplit* FrontendDelegate::split() { - return 
query_.split(); -} + } // namespace scheduler } // namespace nexus diff --git a/src/nexus/scheduler/frontend_delegate.h b/src/nexus/scheduler/frontend_delegate.h index a2cafac..e4bea7e 100644 --- a/src/nexus/scheduler/frontend_delegate.h +++ b/src/nexus/scheduler/frontend_delegate.h @@ -7,10 +7,7 @@ #include #include - #include "nexus/proto/control.grpc.pb.h" -#include "nexus/proto/nnquery.pb.h" -#include "nexus/scheduler/complex_query.h" namespace nexus { namespace scheduler { @@ -21,7 +18,7 @@ class FrontendDelegate { public: FrontendDelegate(uint32_t node_id, const std::string& ip, const std::string& server_port, const std::string& rpc_addr, - int beacon_sec, std::string common_gpu); + int beacon_sec); uint32_t node_id() const { return node_id_; } @@ -38,14 +35,7 @@ class FrontendDelegate { } CtrlStatus UpdateModelRoutesRpc(const ModelRouteUpdates& request); - - CtrlStatus LoadDependency(const LoadDependencyProto& request); - - void CurrentRps(const CurRpsProto& request); - - bool containComplexQuery() {return complexQuery_;} - - QuerySplit* split(); + private: uint32_t node_id_; std::string ip_; @@ -53,13 +43,9 @@ class FrontendDelegate { std::string rpc_port_; int beacon_sec_; long timeout_ms_; - bool complexQuery_; - ComplexQuery query_; std::unique_ptr stub_; std::chrono::time_point last_time_; std::unordered_set subscribe_models_; - RpsRecord rpsRecord_; - std::string common_gpu_; }; } // namespace scheduler diff --git a/src/nexus/scheduler/rps_record.h b/src/nexus/scheduler/rps_record.h deleted file mode 100644 index c6c0abb..0000000 --- a/src/nexus/scheduler/rps_record.h +++ /dev/null @@ -1,99 +0,0 @@ -#ifndef NEXUS_SCHEDULER_RPSRECORD_H_ -#define NEXUS_SCHEDULER_RPSRECORD_H_ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "nexus/common/rpc_call.h" -#include "nexus/common/rpc_service_base.h" -#include "nexus/proto/control.grpc.pb.h" -#include "nexus/scheduler/backend_delegate.h" -#include "nexus/scheduler/frontend_delegate.h" -#include "nexus/scheduler/sch_info.h" -class RpsRecord { - public: - RpsRecord() {} - void init(std::map models_id, double split) { - this.models_id = models_id; - max_size = 30; - begin = end = 0; - int n = models_id.size(); - models_rps.resize(n + 1); - len = 1; - for (int i = 0; i <= n; i++) { - models_rps[i].resize(max_size * 2 + 1); - if(i == 0) { - models_rps[i][0] = 100; //ms - } - else { - models_rps[i][0] = split; - } - } - } - void add(const CurRpsRequest& request) { - double interval = request.interval; - uint32_t n = request.n; - models_rps[0].push(interval); - end += 1; - for (int i = 0; i < n; i++) { - uint32_t id = models_id[request.model_rps(i).model]; - models_rps[id][end] = request.model_rps(i).rps; - } - len++; - if(len > max_size) { - begin ++; - } - if(begin >= max_size) { - int n = models_id.size(); - for (int i = 0; i <= n; i++) { - for (int j = 0; j < len; j++) { - models_rps[i][j] = models_rps[i][j + begin]; - models_rps[i][j + begin] = 0; - } - } - begin = 0; - end = len - 1; - } - //return true; - } - double sqr(double x) { - return x * x; - } - std::vector getRecord() { - std::vector ret; - double total = 0.0; - for (int i = begin; i <= end; i++) { - total += models_rps[0][i]; - } - int n = models_id.size(); - for (int i = 1; i <= n; i++) { - double mean = 0.0, std = 0.0; - for (int j = begin; j <= end; j++) { - mean += models_rps[i][j] * models[0][j]; - } - mean /= total; - for (int j = begin; j <= end; j++) { - std += models_rps[0][i] * 
sqr(models_rps[i][j] / models_rps[0][i] - mean); - } - std /= total; - std = std::sqrt(std); - ret.push_back(mean + std); - } - return ret; - } - private: - std::map models_id; - std::vector > models_rps; - //std::queue intervals; - uint32_t max_size, begin, end, len; -} diff --git a/src/nexus/scheduler/sch_info.h b/src/nexus/scheduler/sch_info.h index 12d7f00..b578a6f 100644 --- a/src/nexus/scheduler/sch_info.h +++ b/src/nexus/scheduler/sch_info.h @@ -15,7 +15,6 @@ DECLARE_int32(avg_interval); - namespace nexus { namespace scheduler { diff --git a/src/nexus/scheduler/scheduler.cpp b/src/nexus/scheduler/scheduler.cpp index df2ad4b..32b85d4 100644 --- a/src/nexus/scheduler/scheduler.cpp +++ b/src/nexus/scheduler/scheduler.cpp @@ -25,8 +25,6 @@ INSTANTIATE_RPC_CALL(AsyncService, Unregister, UnregisterRequest, RpcReply); INSTANTIATE_RPC_CALL(AsyncService, LoadModel, LoadModelRequest, LoadModelReply); INSTANTIATE_RPC_CALL(AsyncService, ReportWorkload, WorkloadStatsProto, RpcReply); INSTANTIATE_RPC_CALL(AsyncService, KeepAlive, KeepAliveRequest, RpcReply); -INSTANTIATE_RPC_CALL(AsyncService, LoadDependency, LoadDependencyRequest, RpcReply); -INSTANTIATE_RPC_CALL(AsyncService, CurRps, CurRpsRequest, RpcReply); Scheduler::Scheduler(std::string port, size_t nthreads) : AsyncRpcServiceBase(port, nthreads), @@ -101,12 +99,11 @@ void Scheduler::Register(const grpc::ServerContext& ctx, request.node_id(), ip, request.server_port(), request.rpc_port(), request.gpu_device_name(), request.gpu_available_memory(), beacon_interval_sec_); - common_gpu_ = request.gpu_device_name(); RegisterBackend(std::move(backend), reply); } else { // FRONTEND_NODE auto frontend = std::make_shared( request.node_id(), ip, request.server_port(), request.rpc_port(), - beacon_interval_sec_, common_gpu_); + beacon_interval_sec_); RegisterFrontend(std::move(frontend), reply); } } @@ -122,35 +119,12 @@ void Scheduler::Unregister(const grpc::ServerContext& ctx, } reply->set_status(CTRL_OK); } -void Scheduler::LoadDependency(const grpc::ServerContext& ctx, - const LoadDependencyRequest& request, - RpcReply* reply) { - uint32_t node_id = request.node_id(); - auto frontend = GetFrontend(node_id); - if(frontend == nullptr) { - reply->set_status(FRONTEND_NOT_FOUND); - return; - } - reply->set_status(frontend->LoadDependency(request.dependency())); -} -void Scheduler::CurRps(const grpc::ServerContext& ctx, - const CurRpsRequest& request, - RpcReply* reply) { - uint32_t node_id = request.cur_rps().node_id(); - auto frontend = GetFrontend(node_id); - if(frontend == nullptr) { - reply->set_status(FRONTEND_NOT_FOUND); - return; - } - frontend->CurrentRps(request.cur_rps()); -} void Scheduler::LoadModel(const grpc::ServerContext& ctx, const LoadModelRequest& request, LoadModelReply* reply) { ModelSession model_sess(request.model_session()); { - LOG(INFO) << "[---latency & esti_latency---]" << request.model_session().latency_sla() << " " << request.model_session().estimate_latency(); auto info = ModelDatabase::Singleton().GetModelInfo(ModelSessionToModelID(model_sess)); if (info == nullptr) { reply->set_status(MODEL_NOT_FOUND); @@ -280,40 +254,22 @@ void Scheduler::LoadModel(const grpc::ServerContext& ctx, } while (false); // Find best-fit backends to serve the workload - ModelSession real_model_sess = model_sess; - std::string real_model_sess_id = ModelSessionToString(model_sess); - if(request.complex_query()) { - real_model_sess.set_latency_sla(request.model_session().estimate_latency()); - real_model_sess_id = 
ModelSessionToString(real_model_sess); - } - LOG(INFO) << "[---latency & esti_latency---]" << real_model_sess.latency_sla() << " " << real_model_sess.latency_sla(); - LOG(INFO) << "[---Scheduler Load Model ing--- find best backend ---]"; std::vector > assign_backends; std::unordered_set used; if (workload == 0) { BackendDelegatePtr backend; InstanceInfo inst_info; - if(request.complex_query()){ - FindBestBackend(real_model_sess, workload, used, &backend, &inst_info); - } else { - FindBestBackend(real_model_sess, workload, used, &backend, &inst_info); - } + FindBestBackend(model_sess, workload, used, &backend, &inst_info); if (backend == nullptr) { - LOG(INFO) << "[---backend is null---]"; reply->set_status(NOT_ENOUGH_BACKENDS); return; } assign_backends.emplace_back(backend, inst_info); } else { while (workload > 1e-3) { - LOG(INFO) << "[---Find backend remind workload---]" << workload; BackendDelegatePtr backend; InstanceInfo inst_info; - if(request.complex_query()) { - FindBestBackend(real_model_sess, workload, used, &backend, &inst_info); - } else { - FindBestBackend(real_model_sess, workload, used, &backend, &inst_info); - } + FindBestBackend(model_sess, workload, used, &backend, &inst_info); if (backend == nullptr) { reply->set_status(NOT_ENOUGH_BACKENDS); return; @@ -334,14 +290,14 @@ void Scheduler::LoadModel(const grpc::ServerContext& ctx, session_info->backend_weights.emplace(backend->node_id(), inst_info.GetWeight()); } - session_info->model_sessions.push_back(real_model_sess); - session_table_.emplace(real_model_sess_id, session_info); + session_info->model_sessions.push_back(model_sess); + session_table_.emplace(model_sess_id, session_info); frontend->SubscribeModel(model_sess_id); session_info->SubscribeModelSession(frontend->node_id(), model_sess_id); // Fill route table in the reply reply->set_status(CTRL_OK); - GetModelRoute(real_model_sess_id, reply->mutable_model_route()); + GetModelRoute(model_sess_id, reply->mutable_model_route()); } void Scheduler::ReportWorkload(const grpc::ServerContext& ctx, @@ -395,10 +351,6 @@ void Scheduler::HandleRpcs() { std::bind(&Scheduler::ReportWorkload, this, _1, _2, _3)); new KeepAlive_Call(&service_, cq_.get(), std::bind(&Scheduler::KeepAlive, this, _1, _2, _3)); - new LoadDependency_Call(&service_, cq_.get(), - std::bind(&Scheduler::LoadDependency, this, _1, _2, _3)); - new CurRps_Call(&service_, cq_.get(), - std::bind(&Scheduler::CurRps, this, _1, _2, _3)); void* tag; bool ok; while (running_) { @@ -708,23 +660,18 @@ void Scheduler::FindBestBackend( const ModelSession& model_sess, double request_rate, const std::unordered_set& skips, BackendDelegatePtr* best_backend, InstanceInfo* inst_info) { - LOG(INFO) << "[---FindBestBackend---]"; using ModelLoad = std::tuple; ModelLoad max_tp_load; ModelLoad max_occ_load; for (auto iter : backends_) { - LOG(INFO) << "[---each backend---]"; auto backend = iter.second; if (skips.find(backend->node_id()) != skips.end()) { - LOG(INFO) << "[---skip backend]"; continue; } if (!backend->IsAlive() || backend->workload_id() >= 0) { - LOG(INFO) << "[---backend !alive OR workload_id >= 0]"; continue; } if (std::fabs(request_rate) < 1e-3 && !backend->IsIdle()) { - LOG(INFO) << "[---request rate =0 OR isIdle]"; continue; } InstanceInfo tmp_info; @@ -734,7 +681,6 @@ void Scheduler::FindBestBackend( if (!ret) { continue; } - LOG(INFO) << "[---There is a backend---]"; if (std::get<0>(max_tp_load) == nullptr || tmp_info.throughput > std::get<1>(max_tp_load).throughput) { max_tp_load = std::make_tuple(backend, 
tmp_info, occupancy); @@ -778,7 +724,7 @@ bool Scheduler::BeaconCheck() { ", last alive time: " << std::ctime(&last_time); RemoveFrontend(frontend); } - + // 2. Aggregate model session rps for (auto iter : session_table_) { const auto& model_sess_id = iter.first; @@ -836,6 +782,7 @@ bool Scheduler::BeaconCheck() { } return trigger; } + void Scheduler::EpochSchedule() { std::lock_guard lock(mutex_); std::unordered_set > visited; @@ -843,111 +790,6 @@ void Scheduler::EpochSchedule() { std::vector overload_backends; VLOG(1) << "Epoch schedule"; - // 0.5. Complex query - std::vector frontends; - for (auto it : frontends_) { - auto frontend = it.second; - if(frontend->containComplexQuery()) { - auto query_split = frontend->split(); - if(query_split->getState() == false) { - //Efficience optimization is less than 10% - continue; - } - //delete expired sessions - for (auto model_sess: query_split->last_subscribe_models()) { - auto model_sess_id = ModelSessionToString(model_sess); - ServerList& subs = session_subscribers_.at(model_sess_id); - subs.erase(frontend->node_id()); - auto session_info = session_table_.at(model_sess_id); - if (subs.empty()) { - if (session_info->has_static_workload) { - continue; - } - LOG(INFO) << "Remove model session: " << model_sess_id; - RemoveFromSessionGroup(&session_info->model_sessions, model_sess_id); - for (auto iter : session_info->backend_throughputs) { - auto backend = GetBackend(iter.first); - backend->UnloadModel(model_sess_id); - } - session_table_.erase(model_sess_id); - } - else { - changed_sessions.insert(session_info); - } - } - //add new session - std::vector workloads = query_split->cur_workloads(); - uint id = -1; - for (auto model_sess: query_split->cur_subscribe_models()) { - id += 1; - auto info = ModelDatabase::Singleton().GetModelInfo( - ModelSessionToModelID(model_sess)); - if ((*info)["resizable"] && (*info)["resizable"].as()) { - if (model_sess.image_height() == 0) { - // Set default image size for resizable CNN - model_sess.set_image_height((*info)["image_height"].as()); - model_sess.set_image_width((*info)["image_width"].as()); - } - } - std::string model_sess_id = ModelSessionToString(model_sess); - float workload = workloads[id]; - - std::lock_guard lock(mutex_); - if (session_table_.find(model_sess_id) != session_table_.end()) { - // TODO: For now, if model session is already loaded, don't allocate - // new backends, just rely on epoch scheduling - if (session_subscribers_.count(model_sess_id) == 0) { - session_subscribers_.emplace(model_sess_id, ServerList{frontend->node_id()}); - } else { - session_subscribers_.at(model_sess_id).insert(frontend->node_id()); - } - } - else { - std::vector > assign_backends; - std::unordered_set used; - if (workload == 0) { - BackendDelegatePtr backend; - InstanceInfo inst_info; - FindBestBackend(model_sess, workload, used, &backend, &inst_info); - if (backend == nullptr) { - LOG(INFO) << "Not enough backends for model session: " << model_sess_id; - } - else { - assign_backends.emplace_back(backend, inst_info); - } - } else { - while (workload > 0) { - BackendDelegatePtr backend; - InstanceInfo inst_info; - FindBestBackend(model_sess, workload, used, &backend, &inst_info); - if (backend == nullptr) { - LOG(INFO) << "Not enough backends for model session: " << model_sess_id; - break; - } - assign_backends.emplace_back(backend, inst_info); - used.insert(backend->node_id()); - workload -= inst_info.throughput; - } - } - - // Load models - auto session_info = std::make_shared(); - for (auto iter : 
assign_backends) { - auto backend = iter.first; - auto const& inst_info = iter.second; - backend->LoadModel(inst_info); - backend->UpdateModelTableRpc(); - session_info->backend_throughputs.emplace(backend->node_id(), - inst_info.throughput); - } - session_info->model_sessions.push_back(model_sess); - session_table_.emplace(model_sess_id, session_info); - changed_sessions.insert(session_info); - session_subscribers_.emplace(model_sess_id, ServerList{frontend->node_id()}); - } - } - } - } // 1. Adjust the GPU allocation based on the workload for (auto iter : session_table_) { auto const& model_sess_id = iter.first; diff --git a/src/nexus/scheduler/scheduler.h b/src/nexus/scheduler/scheduler.h index 0a993d1..aafc629 100644 --- a/src/nexus/scheduler/scheduler.h +++ b/src/nexus/scheduler/scheduler.h @@ -69,12 +69,6 @@ class Scheduler : public AsyncRpcServiceBase { */ void Unregister(const grpc::ServerContext& ctx, const UnregisterRequest& request, RpcReply* reply); - - void CurRps(const grpc::ServerContext& ctx, - const CurRpsRequest& request, RpcReply* reply); - - void LoadDependency(const grpc::ServerContext& ctx, - const LoadDependencyRequest& request, RpcReply* reply); /*! * \brief Handles LoadModel RPC. * @@ -282,8 +276,6 @@ class Scheduler : public AsyncRpcServiceBase { std::unordered_map session_table_; /*! \brief Mutex for accessing internal data */ std::mutex mutex_; - - std::string common_gpu_; }; } // namespace scheduler diff --git a/tests/python/generator.py b/tests/python/generator.py deleted file mode 100644 index 50ee461..0000000 --- a/tests/python/generator.py +++ /dev/null @@ -1,118 +0,0 @@ -import os -import glob -import time -import random -import logging -from threading import Thread -from multiprocessing import Queue -from datetime import datetime -import numpy as np -from grpc.beta import implementations -import tensorflow as tf -from tensorflow_serving.apis import predict_pb2 -from tensorflow_serving.apis import prediction_service_pb2 - - -class Dataset(object): - def __init__(self, data_dir, max_count=1000): - self.images = [] - for fn in os.listdir(data_dir): - with open(os.path.join(data_dir, fn), 'rb') as f: - self.images.append(f.read()) - if max_count > 0 and len(self.images) >= max_count: - break - - def rand_idx(self): - return random.randint(0, len(self.images) - 1) - - -class Worker(Thread): - def __init__(self, idx, dataset, queue, output, app_id): - super(Worker, self).__init__() - self.daemon = True - self.index = idx - self.dataset = dataset - self.queue = queue - self.lats = [] - self.img_idx = random.randint(0, len(self.dataset.images) - 1) - self.output= output - self.app_id = app_id - - - def run(self): - host = "localhost" - port = "9000" - channel = implementations.insecure_channel(host, int(port)) - stub = prediction_service_pb2.beta_create_PredictionService_stub(channel) - request = predict_pb2.PredictRequest() - request.model_spec.name = 'vgg'+str(self.app_id) - request.model_spec.signature_name = 'predict_images' - while True: - idx = self.queue.get() - if idx == -1: - break - img = self.dataset.images[self.img_idx] - request.inputs['images'].CopyFrom( - tf.contrib.util.make_tensor_proto(img, shape=[1])) - start = datetime.now() - try: - result = stub.Predict(request, 5.0) - except: - self.lats.append((self.img_idx, 100000.0)) - else: - end = datetime.now() - lat = (end - start).total_seconds() * 1000.0 - self.lats.append((self.img_idx, lat)) - - self.img_idx = (self.img_idx + 1) % len(self.dataset.images) - -class Generator(object): - def 
-        self.dataset = dataset
-        self.queue = Queue()
-        self.workers = []
-        self.beg = None
-        for i in range(0, 500):
-            worker = Worker(i, dataset, self.queue, output, app_id)
-            worker.start()
-            self.workers.append(worker)
-
-    def run(self, rate, duration):
-        count = 0
-        gap = 1. / rate
-        total = duration * rate
-        beg = time.time()
-        self.beg = time.time()
-        logging.info('Start sending request at {} req/s'.format(rate))
-        while True:
-            now = time.time()
-            while count * gap <= now - beg:
-                self.queue.put(1)
-                count += 1
-                now = time.time()
-                if count >= total:
-                    break
-            if count >= total or now - beg >= duration:
-                break
-            to_sleep = beg + count * gap - now
-            if to_sleep > 0:
-                time.sleep(to_sleep)
-        elapse = time.time() - self.beg
-        logging.info('Generate {} requests in {} sec, rate: {} req/s'.format(
-            count, elapse, float(count) / elapse))
-
-    def stop_all(self):
-        for _ in range(len(self.workers)):
-            self.queue.put(-1)
-        for t in self.workers:
-            t.join()
-        elapse = time.time() - self.beg
-        logging.info('Finished all requests in {} sec'.format(elapse))
-
-    def output_latencies(self, output):
-        self.stop_all()
-        with open(output, 'a') as fout:
-            for worker in self.workers:
-                for img_idx, lat in worker.lats:
-                    fout.write('%s\n' % (lat))
-        logging.info('Output latencies to %s' % output)
diff --git a/tests/python/traffic_client.py b/tests/python/traffic_client.py
deleted file mode 100644
index 247010c..0000000
--- a/tests/python/traffic_client.py
+++ /dev/null
@@ -1,141 +0,0 @@
-import os
-import sys
-import numpy as np
-import argparse
-import random
-from threading import Thread
-import nexus
-
-from generator import *
-class NexusClient(ClientBase):
-    def __init__(self, service_addr, dataset):
-        user_id = random.randint(1, 10000000)
-        self.client = nexus.Client(service_addr, user_id)
-        self.dataset = dataset
-        self.total_images = len(dataset.images)
-        self.img_idx = random.randint(0, self.total_images-1)
-
-    def query(self):
-        img = self.dataset.images[self.img_idx]
-        self.img_idx = (self.img_idx + 1) % self.total_images
-        reply = self.client.request(img)
-        if reply is None:
-            return (0, [])
-        if reply.status != 0:
-            return (-reply.status, reply.query_latency)
-        return (reply.latency_us, reply.query_latency)
-
-
-def run_test(server_addr, dataset, rate, duration, output, nthreads):
-    clients = []
-    for _ in range(nthreads):
-        client = NexusClient(server_addr, dataset)
-        clients.append(client)
-    gen = Generator(clients)
-    gen.run(rate, duration)
-    gen.output_latencies(output)
-def run(sla, rps, duration, dataset):
-    print('Test rps %s' % rps)
-    nthreads = int(rps * float(sla) / 1e3 * 1.1)
-    print('Number of threads: %s' % nthreads)
-
-    output = 'traffic_sla%s_rps%s.txt' % (sla, rps)
-    thread = Thread(target=run_test,
-                    args=('127.0.0.1:9001', dataset, rps, duration, output, nthreads))
-    thread.daemon = True
-    thread.start()
-    thread.join()
-    total, good, drop, noresp, lat_50, lat_99, lat_max = parse_result(output, sla)
-    percent = float(good) / total
-    print('Traffic: %.2f%% (total/good/drop/noresp: %s/%s/%s/%s), 50th/99th/max latency: %s/%s/%s ms' % (
-        percent*100, total, good, drop, noresp, lat_50, lat_99, lat_max))
-    ret = True
-    if percent < .99:
-        ret = False
-    retry = True
-    if percent < .98:
-        retry = False
-    return ret, retry
-
-def run_with_retry(sla, rps, duration, dataset):
-    retry = 0
-    while retry < 3:
-        good, flag = run(sla, rps, duration, dataset)
-        if good:
-            return True
-        if not flag:
-            return False
-        retry += 1
-    return False
-
-def eval_traffic(sla, base_rps):
-    duration = 60
-    #datapath = os.path.join(os.path.expanduser("~"), 'datasets/traffic/jackson_day')
-    datapath = os.path.join(os.path.expanduser("~"), 'datasets/traffic/jackson_night')
-    print('Dataset: %s' % datapath)
-    dataset = Dataset(datapath)
-    print('Latency sla: %s ms' % sla)
-
-    min_rps = base_rps
-    max_rps = base_rps
-    while True:
-        good = run_with_retry(sla, max_rps, duration, dataset)
-        if not good:
-            break
-        min_rps = max_rps
-        max_rps += 40
-    while max_rps - min_rps > 1:
-        rps = (max_rps + min_rps) / 2
-        good = run_with_retry(sla, rps, duration, dataset)
-        if good:
-            min_rps = rps
-        else:
-            max_rps = rps
-    print('Max throughput: %s' % min_rps)
-
-
-def automatic():
-    if len(sys.argv) < 3:
-        print('%s sla base_rps' % sys.argv[0])
-        exit()
-    sla = int(sys.argv[1])
-    base_rps = eval(sys.argv[2])
-    # print('sla: %s' % sla)
-    # print('num of models: %s' % n)
-    # print('share prefix: %s' % share_prefix)
-    eval_traffic(sla, base_rps)
-
-
-def manual():
-    if len(sys.argv) < 4:
-        print('%s sla n rate [share_prefix]' % sys.argv[0])
-        exit()
-    sla = int(sys.argv[1])
-    n = int(sys.argv[2])
-    rate = eval(sys.argv[3])
-    if len(sys.argv) > 4:
-        share_prefix = (int(sys.argv[4]) == 1)
-    else:
-        share_prefix = False
-    print('sla: %s' % sla)
-    print('num of models: %s' % n)
-    print('rate: %s' % rate)
-    print('share prefix: %s' % share_prefix)
-
-    duration = 60
-    datapath = os.path.join(os.path.expanduser("~"), 'datasets/vgg_face')
-    dataset = Dataset(datapath)
-
-    run(sla, n, rate, share_prefix, duration, dataset)
-
-
-if __name__ == "__main__":
-
-    FORMAT = "[%(asctime)-15s %(levelname)s] %(message)s"
-    logging.basicConfig(format=FORMAT)
-    logging.getLogger().setLevel(logging.INFO)
-
-    #manual()
-automatic()