diff --git a/examples/client/main.cpp b/examples/client/main.cpp index c35d60a..bd12645 100644 --- a/examples/client/main.cpp +++ b/examples/client/main.cpp @@ -217,7 +217,7 @@ void test_echo() { } { - auto result = client.call("async_echo", "test"); + auto result = client.call("delay_echo", "test"); std::cout << result << std::endl; } } @@ -349,11 +349,11 @@ void test_callback() { rpc_client client; bool r = client.connect("127.0.0.1", 9000); - for (size_t i = 0; i < 100; i++) { + for (size_t i = 0; i < 10; i++) { std::string test = "test" + std::to_string(i + 1); // set timeout 100ms - client.async_call<100>( - "async_echo", + client.async_call<10000>( + "delay_echo", [](const asio::error_code &ec, string_view data) { if (ec) { std::cout << ec.value() << " timeout" << std::endl; @@ -361,7 +361,7 @@ void test_callback() { } auto str = as(data); - std::cout << "echo " << str << '\n'; + std::cout << "delay echo " << str << '\n'; }, test); diff --git a/examples/server/main.cpp b/examples/server/main.cpp index 3305c90..47da07f 100644 --- a/examples/server/main.cpp +++ b/examples/server/main.cpp @@ -75,11 +75,11 @@ std::string get_name(rpc_conn conn, const person &p) { // if you want to response later, you can use async model, you can control when // to response -void async_echo(rpc_conn conn, const std::string &src) { - auto req_id = - conn.lock()->request_id(); // note: you need keep the request id at that - // time, and pass it into the async thread - +void delay_echo(rpc_conn conn, const std::string &src) { + auto sp = conn.lock(); + sp->set_delay(true); + auto req_id = sp->request_id(); // note: you need keep the request id at that + // time, and pass it into the async thread std::thread thd([conn, req_id, src] { std::this_thread::sleep_for(std::chrono::seconds(1)); auto conn_sp = conn.lock(); @@ -121,7 +121,7 @@ dummy1 get_dummy(rpc_conn conn, dummy1 d) { return d; } int main() { // benchmark_test(); - rpc_server server(9000, std::thread::hardware_concurrency()); + rpc_server server(9000, std::thread::hardware_concurrency(), 3600); dummy d; server.register_handler("add", &dummy::add, &d); @@ -135,7 +135,7 @@ int main() { server.register_handler("upload", upload); server.register_handler("download", download); server.register_handler("get_name", get_name); - server.register_handler("async_echo", async_echo); + server.register_handler("delay_echo", delay_echo); server.register_handler("echo", echo); server.register_handler("get_int", get_int); diff --git a/include/rest_rpc/connection.h b/include/rest_rpc/connection.h index 1690aa3..7219e74 100644 --- a/include/rest_rpc/connection.h +++ b/include/rest_rpc/connection.h @@ -101,6 +101,8 @@ class connection : public std::enable_shared_from_this, callback_ = std::move(callback); } + void set_delay(bool delay) { delay_ = delay; } + void on_network_error(std::function, std::string)> &on_net_err) { on_net_err_ = &on_net_err; @@ -208,8 +210,14 @@ class connection : public std::enable_shared_from_this, if (!ec) { read_head(); if (req_type_ == request_type::req_res) { - router_.route(func_id, body_.data(), length, - this->shared_from_this()); + route_result_t ret = router_.route( + func_id, nonstd::string_view{body_.data(), length}, + this->shared_from_this()); + if (delay_) { + delay_ = false; + } else { + response(req_id_, std::move(ret.result)); + } } else if (req_type_ == request_type::sub_pub) { try { msgpack_codec codec; @@ -420,6 +428,7 @@ class connection : public std::enable_shared_from_this, nullptr; router &router_; nonstd::any user_data_; + bool delay_ = false; }; } // namespace rpc_service } // namespace rest_rpc diff --git a/include/rest_rpc/cplusplus_14.h b/include/rest_rpc/cplusplus_14.h index ac95c98..cda912a 100644 --- a/include/rest_rpc/cplusplus_14.h +++ b/include/rest_rpc/cplusplus_14.h @@ -5,13 +5,13 @@ #include #include -#if __cplusplus == 201103L - -namespace std { -template struct unique_if { typedef unique_ptr single_object; }; +namespace nonstd { +template struct unique_if { + typedef std::unique_ptr single_object; +}; template struct unique_if { - typedef unique_ptr unknown_bound; + typedef std::unique_ptr unknown_bound; }; template struct unique_if { @@ -20,12 +20,12 @@ template struct unique_if { template typename unique_if::single_object make_unique(Args &&...args) { - return unique_ptr(new T(forward(args)...)); + return std::unique_ptr(new T(std::forward(args)...)); } template typename unique_if::unknown_bound make_unique(size_t n) { - typedef typename remove_extent::type U; - return unique_ptr(new U[n]()); + typedef typename std::remove_extent::type U; + return std::unique_ptr(new U[n]()); } template @@ -59,20 +59,21 @@ template using index_sequence_for = make_index_sequence; template -using enable_if_t = typename enable_if::type; +using enable_if_t = typename std::enable_if::type; -template using remove_const_t = typename remove_const::type; +template +using remove_const_t = typename std::remove_const::type; template -using remove_reference_t = typename remove_reference::type; +using remove_reference_t = typename std::remove_reference::type; template -using tuple_element_t = typename tuple_element::type; +using tuple_element_t = typename std::tuple_element::type; -template using decay_t = typename decay::type; +template using decay_t = typename std::decay::type; template -auto apply_helper(F &&f, Tuple &&tp, std::index_sequence) +auto apply_helper(F &&f, Tuple &&tp, nonstd::index_sequence) -> decltype(std::forward(f)(std::get(std::forward(tp))...)) { return std::forward(f)(std::get(std::forward(tp))...); } @@ -80,10 +81,10 @@ auto apply_helper(F &&f, Tuple &&tp, std::index_sequence) template auto apply(F &&f, Tuple &&tp) -> decltype(apply_helper( std::forward(f), std::forward(tp), - std::make_index_sequence>::value>{})) { + make_index_sequence>::value>{})) { return apply_helper( std::forward(f), std::forward(tp), - std::make_index_sequence>::value>{}); + make_index_sequence>::value>{}); } template @@ -92,8 +93,6 @@ auto invoke(F &&f, Args &&...args) return std::forward(f)(std::forward(args)...); } -} // namespace std - -#endif +} // namespace nonstd #endif // REST_RPC_CPLUSPLUS_14_H_ diff --git a/include/rest_rpc/meta_util.hpp b/include/rest_rpc/meta_util.hpp index 8fb6a98..a2fe80e 100644 --- a/include/rest_rpc/meta_util.hpp +++ b/include/rest_rpc/meta_util.hpp @@ -8,13 +8,13 @@ namespace rest_rpc { template void for_each(const std::tuple &t, Func &&f, - std::index_sequence) { + nonstd::index_sequence) { (void)std::initializer_list{(f(std::get(t)), void(), 0)...}; } template void for_each_i(const std::tuple &t, Func &&f, - std::index_sequence) { + nonstd::index_sequence) { (void)std::initializer_list{ (f(std::get(t), std::integral_constant{}), void(), 0)...}; @@ -32,14 +32,15 @@ struct function_traits { typedef Ret (*pointer)(Arg, Args...); typedef std::tuple tuple_type; - typedef std::tuple>...> + typedef std::tuple< + nonstd::remove_const_t>...> bare_tuple_type; using args_tuple = std::tuple>...>; + nonstd::remove_const_t>...>; using args_tuple_2nd = std::tuple>...>; + nonstd::remove_const_t>...>; }; template struct function_traits { @@ -76,24 +77,24 @@ struct function_traits : function_traits {}; template using remove_const_reference_t = - std::remove_const_t>; + nonstd::remove_const_t>; template -auto make_tuple_from_sequence(std::index_sequence) +auto make_tuple_from_sequence(nonstd::index_sequence) -> decltype(std::make_tuple(Is...)) { std::make_tuple(Is...); } template constexpr auto make_tuple_from_sequence() - -> decltype(make_tuple_from_sequence(std::make_index_sequence{})) { - return make_tuple_from_sequence(std::make_index_sequence{}); + -> decltype(make_tuple_from_sequence(nonstd::make_index_sequence{})) { + return make_tuple_from_sequence(nonstd::make_index_sequence{}); } namespace detail { template void tuple_switch(const std::size_t i, Tuple &&t, F &&f, - std::index_sequence) { + nonstd::index_sequence) { (void)std::initializer_list{ (i == Is && ((void)std::forward(f)(std::integral_constant{}), 0))...}; @@ -102,14 +103,14 @@ void tuple_switch(const std::size_t i, Tuple &&t, F &&f, template inline void tuple_switch(const std::size_t i, Tuple &&t, F &&f) { - constexpr auto N = std::tuple_size>::value; + constexpr auto N = std::tuple_size>::value; detail::tuple_switch(i, std::forward(t), std::forward(f), - std::make_index_sequence{}); + nonstd::make_index_sequence{}); } template -using nth_type_of = std::tuple_element_t>; +using nth_type_of = nonstd::tuple_element_t>; template using last_type_of = nth_type_of; diff --git a/include/rest_rpc/router.h b/include/rest_rpc/router.h index 1cde1dc..d9c4d06 100644 --- a/include/rest_rpc/router.h +++ b/include/rest_rpc/router.h @@ -4,33 +4,62 @@ #include "codec.h" #include "md5.hpp" #include "meta_util.hpp" +#include "string_view.hpp" #include "use_asio.hpp" #include #include #include namespace rest_rpc { -enum class ExecMode { sync, async }; -const constexpr ExecMode Async = ExecMode::async; - namespace rpc_service { class connection; +enum class router_error { ok, no_such_function, has_exception, unkonw }; + +struct route_result_t { + router_error ec = router_error::unkonw; + std::string result; +}; + +template class helper_t { +public: + helper_t(Tuple &tp) : tp_(tp) {} + + void operator()() {} + +private: + Tuple &tp_; +}; + +template class helper_t { +public: + helper_t(Tuple &tp) : tp_(tp) {} + + void operator()() { + auto &arg = std::get::value - 1>(tp_); + msgpack_codec codec; + arg = codec.unpack(arg.data(), arg.size()); + } + +private: + Tuple &tp_; +}; + class router : asio::noncopyable { public: - template - void register_handler(std::string const &name, Function f) { + template + void register_handler(std::string const &name, Function f, bool pub = false) { uint32_t key = MD5::MD5Hash32(name.data()); key2func_name_.emplace(key, name); - return register_nonmember_func(key, std::move(f)); + return register_nonmember_func(key, std::move(f)); } - template + template void register_handler(std::string const &name, const Function &f, Self *self) { uint32_t key = MD5::MD5Hash32(name.data()); key2func_name_.emplace(key, name); - return register_member_func(key, f, self); + return register_member_func(key, f, self); } void remove_handler(std::string const &name) { @@ -48,14 +77,9 @@ class router : asio::noncopyable { } template - void route(uint32_t key, const char *data, std::size_t size, - std::weak_ptr conn) { - auto conn_sp = conn.lock(); - if (!conn_sp) { - return; - } - - auto req_id = conn_sp->request_id(); + route_result_t route(uint32_t key, nonstd::string_view data, + std::weak_ptr conn) { + route_result_t route_result{}; std::string result; try { msgpack_codec codec; @@ -63,26 +87,28 @@ class router : asio::noncopyable { if (it == map_invokers_.end()) { result = codec.pack_args_str( result_code::FAIL, "unknown function: " + get_name_by_key(key)); - conn_sp->response(req_id, std::move(result)); - return; - } - - ExecMode model; - it->second(conn, data, size, result, model); - if (model == ExecMode::sync) { - if (result.size() >= MAX_BUF_LEN) { - result = codec.pack_args_str( - result_code::FAIL, - "the response result is out of range: more than 10M " + - get_name_by_key(key)); - } - conn_sp->response(req_id, std::move(result)); + route_result.ec = router_error::no_such_function; + } else { + it->second(conn, data, result); + route_result.ec = router_error::ok; } } catch (const std::exception &ex) { msgpack_codec codec; - result = codec.pack_args_str(result_code::FAIL, ex.what()); - conn_sp->response(req_id, std::move(result)); + result = codec.pack_args_str( + result_code::FAIL, + std::string("exception occur when call").append(ex.what())); + route_result.ec = router_error::has_exception; + } catch (...) { + msgpack_codec codec; + result = codec.pack_args_str( + result_code::FAIL, std::string("unknown exception occur when call ") + .append(get_name_by_key(key))); + route_result.ec = router_error::no_such_function; } + + route_result.result = std::move(result); + + return route_result; } router() = default; @@ -93,7 +119,7 @@ class router : asio::noncopyable { template static typename std::result_of, Args...)>::type - call_helper(const F &f, const std::index_sequence &, + call_helper(const F &f, const nonstd::index_sequence &, std::tuple tup, std::weak_ptr ptr) { return f(ptr, std::move(std::get(tup))...); } @@ -103,8 +129,8 @@ class router : asio::noncopyable { F(std::weak_ptr, Args...)>::type>::value>::type call(const F &f, std::weak_ptr ptr, std::string &result, std::tuple tp) { - call_helper(f, std::make_index_sequence{}, std::move(tp), - ptr); + call_helper(f, nonstd::make_index_sequence{}, + std::move(tp), ptr); result = msgpack_codec::pack_args_str(result_code::OK); } @@ -113,7 +139,7 @@ class router : asio::noncopyable { F(std::weak_ptr, Args...)>::type>::value>::type call(const F &f, std::weak_ptr ptr, std::string &result, std::tuple tp) { - auto r = call_helper(f, std::make_index_sequence{}, + auto r = call_helper(f, nonstd::make_index_sequence{}, std::move(tp), ptr); msgpack_codec codec; result = msgpack_codec::pack_args_str(result_code::OK, r); @@ -123,7 +149,7 @@ class router : asio::noncopyable { static typename std::result_of, Args...)>::type call_member_helper(const F &f, Self *self, - const std::index_sequence &, + const nonstd::index_sequence &, std::tuple tup, std::weak_ptr ptr = std::shared_ptr{nullptr}) { @@ -136,7 +162,7 @@ class router : asio::noncopyable { call_member(const F &f, Self *self, std::weak_ptr ptr, std::string &result, std::tuple tp) { call_member_helper(f, self, - typename std::make_index_sequence{}, + typename nonstd::make_index_sequence{}, std::move(tp), ptr); result = msgpack_codec::pack_args_str(result_code::OK); } @@ -147,70 +173,52 @@ class router : asio::noncopyable { call_member(const F &f, Self *self, std::weak_ptr ptr, std::string &result, std::tuple tp) { auto r = call_member_helper( - f, self, typename std::make_index_sequence{}, + f, self, typename nonstd::make_index_sequence{}, std::move(tp), ptr); result = msgpack_codec::pack_args_str(result_code::OK, r); } - template struct invoker { - template - static inline void apply(const Function &func, - std::weak_ptr conn, const char *data, - size_t size, std::string &result, - ExecMode &exe_model) { + template + void register_nonmember_func(uint32_t key, Function f) { + this->map_invokers_[key] = [f](std::weak_ptr conn, + nonstd::string_view str, + std::string &result) { using args_tuple = typename function_traits::bare_tuple_type; - exe_model = ExecMode::sync; msgpack_codec codec; try { - auto tp = codec.unpack(data, size); - call(func, conn, result, std::move(tp)); - exe_model = model; + auto tp = codec.unpack(str.data(), str.size()); + helper_t{tp}(); + call(f, conn, result, std::move(tp)); } catch (std::invalid_argument &e) { result = codec.pack_args_str(result_code::FAIL, e.what()); } catch (const std::exception &e) { result = codec.pack_args_str(result_code::FAIL, e.what()); } - } + }; + } - template - static inline void apply_member(const Function &func, Self *self, - std::weak_ptr conn, - const char *data, size_t size, - std::string &result, ExecMode &exe_model) { + template + void register_member_func(uint32_t key, const Function &f, Self *self) { + this->map_invokers_[key] = [f, self](std::weak_ptr conn, + nonstd::string_view str, + std::string &result) { using args_tuple = typename function_traits::bare_tuple_type; - exe_model = ExecMode::sync; msgpack_codec codec; try { - auto tp = codec.unpack(data, size); - call_member(func, self, conn, result, std::move(tp)); - exe_model = model; + auto tp = codec.unpack(str.data(), str.size()); + helper_t{tp}(); + call_member(f, self, conn, result, std::move(tp)); } catch (std::invalid_argument &e) { result = codec.pack_args_str(result_code::FAIL, e.what()); } catch (const std::exception &e) { result = codec.pack_args_str(result_code::FAIL, e.what()); } - } - }; - - template - void register_nonmember_func(uint32_t key, Function f) { - this->map_invokers_[key] = {std::bind( - &invoker::template apply, std::move(f), - std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, - std::placeholders::_4, std::placeholders::_5)}; - } - - template - void register_member_func(uint32_t key, const Function &f, Self *self) { - this->map_invokers_[key] = {std::bind( - &invoker::template apply_member, f, self, - std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, - std::placeholders::_4, std::placeholders::_5)}; + }; } - std::unordered_map< - uint32_t, std::function, const char *, - size_t, std::string &, ExecMode &model)>> + std::unordered_map, + nonstd::string_view, std::string &)>> map_invokers_; std::unordered_map key2func_name_; }; diff --git a/include/rest_rpc/rpc_server.h b/include/rest_rpc/rpc_server.h index 9aab7f2..11fcfa0 100644 --- a/include/rest_rpc/rpc_server.h +++ b/include/rest_rpc/rpc_server.h @@ -52,15 +52,15 @@ class rpc_server : private asio::noncopyable { void run() { io_service_pool_.run(); } - template + template void register_handler(std::string const &name, const Function &f) { - router_.register_handler(name, f); + router_.register_handler(name, f); } - template + template void register_handler(std::string const &name, const Function &f, Self *self) { - router_.register_handler(name, f, self); + router_.register_handler(name, f, self); } void set_conn_timeout_callback(std::function callback) { diff --git a/tests/test_rest_rpc.cpp b/tests/test_rest_rpc.cpp index d0d8086..76839bf 100644 --- a/tests/test_rest_rpc.cpp +++ b/tests/test_rest_rpc.cpp @@ -210,11 +210,12 @@ TEST_CASE("test_client_async_call_with_timeout") { TEST_CASE("test_client_subscribe") { rpc_server server(9000, std::thread::hardware_concurrency()); - server.register_handler("publish", - [&server](rpc_conn conn, std::string key, - std::string token, std::string val) { - server.publish(std::move(key), std::move(val)); - }); + server.register_handler( + "publish", [&server](rpc_conn conn, std::string key, std::string token, + std::string val) { + CHECK(val == "hello subscriber"); + server.publish(std::move(key), std::move(val)); + }); bool stop = false; std::thread thd([&server, &stop] { while (!stop) { @@ -227,6 +228,8 @@ TEST_CASE("test_client_subscribe") { rpc_client client; bool r = client.connect("127.0.0.1", 9000); CHECK(r); + client.publish("key", "hello subscriber"); + client.subscribe("key", [&stop](string_view data) { std::cout << data << "\n"; CHECK_EQ(data, "hello subscriber");