Skip to content

Commit

Permalink
improve
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos committed Jan 27, 2024
1 parent bd30ca1 commit 11a51c6
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 16 deletions.
40 changes: 33 additions & 7 deletions include/rest_rpc/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,45 @@ struct route_result_t {
std::string result;
};

template <typename Tuple, bool is_pub> class helper_t {
public:
helper_t(Tuple &tp) : tp_(tp) {}

void operator()() {}

private:
Tuple &tp_;
};

template <typename Tuple> class helper_t<Tuple, true> {
public:
helper_t(Tuple &tp) : tp_(tp) {}

void operator()() {
auto &arg = std::get<std::tuple_size<Tuple>::value - 1>(tp_);
msgpack_codec codec;
arg = codec.unpack<std::string>(arg.data(), arg.size());
}

private:
Tuple &tp_;
};

class router : asio::noncopyable {
public:
template <typename Function>
void register_handler(std::string const &name, Function f) {
template <bool is_pub = false, typename Function>
void register_handler(std::string const &name, Function f, bool pub = false) {
uint32_t key = MD5::MD5Hash32(name.data());
key2func_name_.emplace(key, name);
return register_nonmember_func(key, std::move(f));
return register_nonmember_func<is_pub>(key, std::move(f));
}

template <typename Function, typename Self>
template <bool is_pub = false, typename Function, typename Self>
void register_handler(std::string const &name, const Function &f,
Self *self) {
uint32_t key = MD5::MD5Hash32(name.data());
key2func_name_.emplace(key, name);
return register_member_func(key, f, self);
return register_member_func<is_pub>(key, f, self);
}

void remove_handler(std::string const &name) {
Expand Down Expand Up @@ -154,7 +178,7 @@ class router : asio::noncopyable {
result = msgpack_codec::pack_args_str(result_code::OK, r);
}

template <typename Function>
template <bool is_pub, typename Function>
void register_nonmember_func(uint32_t key, Function f) {
this->map_invokers_[key] = [f](std::weak_ptr<connection> conn,
nonstd::string_view str,
Expand All @@ -163,6 +187,7 @@ class router : asio::noncopyable {
msgpack_codec codec;
try {
auto tp = codec.unpack<args_tuple>(str.data(), str.size());
helper_t<args_tuple, is_pub>{tp}();
call(f, conn, result, std::move(tp));
} catch (std::invalid_argument &e) {
result = codec.pack_args_str(result_code::FAIL, e.what());
Expand All @@ -172,7 +197,7 @@ class router : asio::noncopyable {
};
}

template <typename Function, typename Self>
template <bool is_pub, typename Function, typename Self>
void register_member_func(uint32_t key, const Function &f, Self *self) {
this->map_invokers_[key] = [f, self](std::weak_ptr<connection> conn,
nonstd::string_view str,
Expand All @@ -181,6 +206,7 @@ class router : asio::noncopyable {
msgpack_codec codec;
try {
auto tp = codec.unpack<args_tuple>(str.data(), str.size());
helper_t<args_tuple, is_pub>{tp}();
call_member(f, self, conn, result, std::move(tp));
} catch (std::invalid_argument &e) {
result = codec.pack_args_str(result_code::FAIL, e.what());
Expand Down
8 changes: 4 additions & 4 deletions include/rest_rpc/rpc_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ class rpc_server : private asio::noncopyable {

void run() { io_service_pool_.run(); }

template <typename Function>
template <bool is_pub = false, typename Function>
void register_handler(std::string const &name, const Function &f) {
router_.register_handler(name, f);
router_.register_handler<is_pub>(name, f);
}

template <typename Function, typename Self>
template <bool is_pub = false, typename Function, typename Self>
void register_handler(std::string const &name, const Function &f,
Self *self) {
router_.register_handler(name, f, self);
router_.register_handler<is_pub>(name, f, self);
}

void set_conn_timeout_callback(std::function<void(int64_t)> callback) {
Expand Down
13 changes: 8 additions & 5 deletions tests/test_rest_rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,12 @@ TEST_CASE("test_client_async_call_with_timeout") {

TEST_CASE("test_client_subscribe") {
rpc_server server(9000, std::thread::hardware_concurrency());
server.register_handler("publish",
[&server](rpc_conn conn, std::string key,
std::string token, std::string val) {
server.publish(std::move(key), std::move(val));
});
server.register_handler<true>(
"publish", [&server](rpc_conn conn, std::string key, std::string token,
std::string val) {
CHECK(val == "hello subscriber");
server.publish(std::move(key), std::move(val));
});
bool stop = false;
std::thread thd([&server, &stop] {
while (!stop) {
Expand All @@ -221,6 +222,8 @@ TEST_CASE("test_client_subscribe") {
rpc_client client;
bool r = client.connect("127.0.0.1", 9000);
CHECK(r);
client.publish("key", "hello subscriber");

client.subscribe("key", [&stop](string_view data) {
std::cout << data << "\n";
CHECK_EQ(data, "hello subscriber");
Expand Down

0 comments on commit 11a51c6

Please sign in to comment.