Skip to content

Commit

Permalink
simplify router
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos committed Jan 27, 2024
1 parent 0445168 commit ae6c005
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 74 deletions.
10 changes: 5 additions & 5 deletions examples/client/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ void test_echo() {
}

{
auto result = client.call<std::string>("async_echo", "test");
auto result = client.call<std::string>("delay_echo", "test");
std::cout << result << std::endl;
}
}
Expand Down Expand Up @@ -349,19 +349,19 @@ void test_callback() {
rpc_client client;
bool r = client.connect("127.0.0.1", 9000);

for (size_t i = 0; i < 100; i++) {
for (size_t i = 0; i < 10; i++) {
std::string test = "test" + std::to_string(i + 1);
// set timeout 100ms
client.async_call<100>(
"async_echo",
client.async_call<10000>(
"delay_echo",
[](const asio::error_code &ec, string_view data) {
if (ec) {
std::cout << ec.value() << " timeout" << std::endl;
return;
}

auto str = as<std::string>(data);
std::cout << "echo " << str << '\n';
std::cout << "delay echo " << str << '\n';
},
test);

Expand Down
14 changes: 7 additions & 7 deletions examples/server/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ std::string get_name(rpc_conn conn, const person &p) {

// if you want to response later, you can use async model, you can control when
// to response
void async_echo(rpc_conn conn, const std::string &src) {
auto req_id =
conn.lock()->request_id(); // note: you need keep the request id at that
// time, and pass it into the async thread

void delay_echo(rpc_conn conn, const std::string &src) {
auto sp = conn.lock();
sp->set_delay(true);
auto req_id = sp->request_id(); // note: you need keep the request id at that
// time, and pass it into the async thread
std::thread thd([conn, req_id, src] {
std::this_thread::sleep_for(std::chrono::seconds(1));
auto conn_sp = conn.lock();
Expand Down Expand Up @@ -121,7 +121,7 @@ dummy1 get_dummy(rpc_conn conn, dummy1 d) { return d; }

int main() {
// benchmark_test();
rpc_server server(9000, std::thread::hardware_concurrency());
rpc_server server(9000, std::thread::hardware_concurrency(), 3600);

dummy d;
server.register_handler("add", &dummy::add, &d);
Expand All @@ -135,7 +135,7 @@ int main() {
server.register_handler("upload", upload);
server.register_handler("download", download);
server.register_handler("get_name", get_name);
server.register_handler<Async>("async_echo", async_echo);
server.register_handler("delay_echo", delay_echo);
server.register_handler("echo", echo);
server.register_handler("get_int", get_int);

Expand Down
13 changes: 11 additions & 2 deletions include/rest_rpc/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ class connection : public std::enable_shared_from_this<connection>,
callback_ = std::move(callback);
}

void set_delay(bool delay) { delay_ = delay; }

void on_network_error(std::function<void(std::shared_ptr<connection>,
std::string)> &on_net_err) {
on_net_err_ = &on_net_err;
Expand Down Expand Up @@ -208,8 +210,14 @@ class connection : public std::enable_shared_from_this<connection>,
if (!ec) {
read_head();
if (req_type_ == request_type::req_res) {
router_.route<connection>(func_id, body_.data(), length,
this->shared_from_this());
route_result_t ret = router_.route<connection>(
func_id, nonstd::string_view{body_.data(), length},
this->shared_from_this());
if (delay_) {
delay_ = false;
} else {
response(req_id_, std::move(ret.result));
}
} else if (req_type_ == request_type::sub_pub) {
try {
msgpack_codec codec;
Expand Down Expand Up @@ -420,6 +428,7 @@ class connection : public std::enable_shared_from_this<connection>,
nullptr;
router &router_;
nonstd::any user_data_;
bool delay_ = false;
};
} // namespace rpc_service
} // namespace rest_rpc
Expand Down
106 changes: 50 additions & 56 deletions include/rest_rpc/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,38 @@
#include "codec.h"
#include "md5.hpp"
#include "meta_util.hpp"
#include "string_view.hpp"
#include "use_asio.hpp"
#include <functional>
#include <string>
#include <unordered_map>

namespace rest_rpc {
enum class ExecMode { sync, async };
const constexpr ExecMode Async = ExecMode::async;

namespace rpc_service {
class connection;

enum class router_error { ok, no_such_function, has_exception, unkonw };

struct route_result_t {
router_error ec = router_error::unkonw;
std::string result;
};

class router : asio::noncopyable {
public:
template <ExecMode model, typename Function>
template <typename Function>
void register_handler(std::string const &name, Function f) {
uint32_t key = MD5::MD5Hash32(name.data());
key2func_name_.emplace(key, name);
return register_nonmember_func<model>(key, std::move(f));
return register_nonmember_func(key, std::move(f));
}

template <ExecMode model, typename Function, typename Self>
template <typename Function, typename Self>
void register_handler(std::string const &name, const Function &f,
Self *self) {
uint32_t key = MD5::MD5Hash32(name.data());
key2func_name_.emplace(key, name);
return register_member_func<model>(key, f, self);
return register_member_func(key, f, self);
}

void remove_handler(std::string const &name) {
Expand All @@ -48,41 +53,38 @@ class router : asio::noncopyable {
}

template <typename T>
void route(uint32_t key, const char *data, std::size_t size,
std::weak_ptr<T> conn) {
auto conn_sp = conn.lock();
if (!conn_sp) {
return;
}

auto req_id = conn_sp->request_id();
route_result_t route(uint32_t key, nonstd::string_view data,
std::weak_ptr<T> conn) {
route_result_t route_result{};
std::string result;
try {
msgpack_codec codec;
auto it = map_invokers_.find(key);
if (it == map_invokers_.end()) {
result = codec.pack_args_str(
result_code::FAIL, "unknown function: " + get_name_by_key(key));
conn_sp->response(req_id, std::move(result));
return;
}

ExecMode model;
it->second(conn, data, size, result, model);
if (model == ExecMode::sync) {
if (result.size() >= MAX_BUF_LEN) {
result = codec.pack_args_str(
result_code::FAIL,
"the response result is out of range: more than 10M " +
get_name_by_key(key));
}
conn_sp->response(req_id, std::move(result));
route_result.ec = router_error::no_such_function;
} else {
it->second(conn, data, result);
route_result.ec = router_error::ok;
}
} catch (const std::exception &ex) {
msgpack_codec codec;
result = codec.pack_args_str(result_code::FAIL, ex.what());
conn_sp->response(req_id, std::move(result));
result = codec.pack_args_str(
result_code::FAIL,
std::string("exception occur when call").append(ex.what()));
route_result.ec = router_error::has_exception;
} catch (...) {
msgpack_codec codec;
result = codec.pack_args_str(
result_code::FAIL, std::string("unknown exception occur when call ")
.append(get_name_by_key(key)));
route_result.ec = router_error::no_such_function;
}

route_result.result = std::move(result);

return route_result;
}

router() = default;
Expand Down Expand Up @@ -152,38 +154,32 @@ class router : asio::noncopyable {
result = msgpack_codec::pack_args_str(result_code::OK, r);
}

template <typename Function, ExecMode mode = ExecMode::sync> struct invoker {
template <ExecMode model>
template <typename Function> struct invoker {
static inline void apply(const Function &func,
std::weak_ptr<connection> conn, const char *data,
size_t size, std::string &result,
ExecMode &exe_model) {
std::weak_ptr<connection> conn,
nonstd::string_view str, std::string &result) {
using args_tuple = typename function_traits<Function>::bare_tuple_type;
exe_model = ExecMode::sync;
msgpack_codec codec;
try {
auto tp = codec.unpack<args_tuple>(data, size);
auto tp = codec.unpack<args_tuple>(str.data(), str.size());
call(func, conn, result, std::move(tp));
exe_model = model;
} catch (std::invalid_argument &e) {
result = codec.pack_args_str(result_code::FAIL, e.what());
} catch (const std::exception &e) {
result = codec.pack_args_str(result_code::FAIL, e.what());
}
}

template <ExecMode model, typename Self>
template <typename Self>
static inline void apply_member(const Function &func, Self *self,
std::weak_ptr<connection> conn,
const char *data, size_t size,
std::string &result, ExecMode &exe_model) {
nonstd::string_view str,
std::string &result) {
using args_tuple = typename function_traits<Function>::bare_tuple_type;
exe_model = ExecMode::sync;
msgpack_codec codec;
try {
auto tp = codec.unpack<args_tuple>(data, size);
auto tp = codec.unpack<args_tuple>(str.data(), str.size());
call_member(func, self, conn, result, std::move(tp));
exe_model = model;
} catch (std::invalid_argument &e) {
result = codec.pack_args_str(result_code::FAIL, e.what());
} catch (const std::exception &e) {
Expand All @@ -192,25 +188,23 @@ class router : asio::noncopyable {
}
};

template <ExecMode model, typename Function>
template <typename Function>
void register_nonmember_func(uint32_t key, Function f) {
this->map_invokers_[key] = {std::bind(
&invoker<Function>::template apply<model>, std::move(f),
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3,
std::placeholders::_4, std::placeholders::_5)};
&invoker<Function>::apply, std::move(f), std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3)};
}

template <ExecMode model, typename Function, typename Self>
template <typename Function, typename Self>
void register_member_func(uint32_t key, const Function &f, Self *self) {
this->map_invokers_[key] = {std::bind(
&invoker<Function>::template apply_member<model, Self>, f, self,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3,
std::placeholders::_4, std::placeholders::_5)};
&invoker<Function>::template apply_member<Self>, f, self,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)};
}

std::unordered_map<
uint32_t, std::function<void(std::weak_ptr<connection>, const char *,
size_t, std::string &, ExecMode &model)>>
std::unordered_map<uint32_t,
std::function<void(std::weak_ptr<connection>,
nonstd::string_view, std::string &)>>
map_invokers_;
std::unordered_map<uint32_t, std::string> key2func_name_;
};
Expand Down
8 changes: 4 additions & 4 deletions include/rest_rpc/rpc_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ class rpc_server : private asio::noncopyable {

void run() { io_service_pool_.run(); }

template <ExecMode model = ExecMode::sync, typename Function>
template <typename Function>
void register_handler(std::string const &name, const Function &f) {
router_.register_handler<model>(name, f);
router_.register_handler(name, f);
}

template <ExecMode model = ExecMode::sync, typename Function, typename Self>
template <typename Function, typename Self>
void register_handler(std::string const &name, const Function &f,
Self *self) {
router_.register_handler<model>(name, f, self);
router_.register_handler(name, f, self);
}

void set_conn_timeout_callback(std::function<void(int64_t)> callback) {
Expand Down

0 comments on commit ae6c005

Please sign in to comment.