diff --git a/tests/test_rest_rpc.cpp b/tests/test_rest_rpc.cpp index 27c2957..9b7f291 100644 --- a/tests/test_rest_rpc.cpp +++ b/tests/test_rest_rpc.cpp @@ -23,6 +23,16 @@ struct person { }; person get_person(rpc_conn conn) { return {1, "tom", 20}; } +void server_user_data(rpc_conn conn) { + auto shared_conn = conn.lock(); + if (shared_conn) { + shared_conn->set_user_data(std::string("aa")); + auto s = conn.lock()->get_user_data(); + CHECK_EQ(s, "aa"); + } + return; +} + void hello(rpc_conn conn, const std::string &str) { std::cout << "hello " << str << std::endl; } @@ -32,8 +42,7 @@ TEST_CASE("test_client_reconnect") { client.enable_auto_reconnect(); // automatic reconnect client.enable_auto_heartbeat(); // automatic heartbeat client.set_error_callback([](asio::error_code ec) { - std::cout << "line: " << __LINE__ << ", 11msg: " << ec.message() - << std::endl; + std::cout << "line: " << __LINE__ << ", msg: " << ec.message() << std::endl; }); client.connect("127.0.0.1", 9000); @@ -53,8 +62,7 @@ TEST_CASE("test_client_reconnect") { std::cout << ex.what() << std::endl; } } else { - // count++; - std::cout << "connected failed: " << count++ << "\n"; + count++; } std::this_thread::sleep_for(std::chrono::seconds(1)); } @@ -65,9 +73,10 @@ TEST_CASE("test_client_default_constructor") { dummy d; server.register_handler("add", &dummy::add, &d); server.async_run(); - std::this_thread::sleep_for(std::chrono::milliseconds(300)); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); rpc_client client; + client.update_addr("127.0.0.1", 9000); bool r = client.connect("127.0.0.1", 9000); CHECK(r); auto result = client.call("add", 1, 2); @@ -79,7 +88,7 @@ TEST_CASE("test_constructor_with_language") { dummy d; server.register_handler("add", &dummy::add, &d); server.async_run(); - std::this_thread::sleep_for(std::chrono::milliseconds(300)); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); rpc_client client(client_language_t::CPP, nullptr); bool r = client.connect("127.0.0.1", 9000); @@ -93,7 +102,7 @@ TEST_CASE("test_client_async_connect") { dummy d; server.register_handler("add", &dummy::add, &d); server.async_run(); - std::this_thread::sleep_for(std::chrono::milliseconds(300)); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); rpc_client client; client.async_connect("127.0.0.1", 9000); @@ -111,7 +120,7 @@ TEST_CASE("test_client_sync_call") { dummy d; server.register_handler("add", &dummy::add, &d); server.async_run(); - std::this_thread::sleep_for(std::chrono::milliseconds(300)); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); rpc_client client("127.0.0.1", 9000); bool r = client.connect(); @@ -120,12 +129,24 @@ TEST_CASE("test_client_sync_call") { CHECK_EQ(result, 3); } +TEST_CASE("test_client_sync_call_return_void") { + rpc_server server(9000, std::thread::hardware_concurrency()); + server.register_handler("echo", echo); + server.async_run(); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + + rpc_client client("127.0.0.1", 9000); + bool r = client.connect(); + CHECK(r); + client.call<>("echo"); +} + TEST_CASE("test_client_async_call") { rpc_server server(9000, std::thread::hardware_concurrency()); server.register_handler("get_person", get_person); server.register_handler("hello", hello); server.async_run(); - std::this_thread::sleep_for(std::chrono::milliseconds(300)); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); rpc_client client("127.0.0.1", 9000); bool r = client.connect(); @@ -160,7 +181,7 @@ TEST_CASE("test_client_async_call_with_timeout") { rpc_server server(9000, std::thread::hardware_concurrency()); server.register_handler("echo", echo); server.async_run(); - std::this_thread::sleep_for(std::chrono::milliseconds(300)); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); rpc_client client; bool r = client.connect("127.0.0.1", 9000); @@ -170,8 +191,135 @@ TEST_CASE("test_client_async_call_with_timeout") { client.async_call<0>( "echo", [](const asio::error_code &ec, string_view data) { - auto str = as(data); - std::cout << "echo " << str << '\n'; + std::cout << "error code " << ec << ", err msg: " << data << '\n'; + }, + test); + client.async_call<>( + "echo", + [](const asio::error_code &ec, string_view data) { + std::cout << "error code " << ec << ", err msg: " << data << '\n'; }, test); } + +TEST_CASE("test_client_subscribe") { + rpc_server server(9000, std::thread::hardware_concurrency()); + server.register_handler("publish", + [&server](rpc_conn conn, std::string key, + std::string token, std::string val) { + server.publish(std::move(key), std::move(val)); + }); + bool stop = false; + std::thread thd([&server, &stop] { + while (!stop) { + server.publish("key", "hello subscriber"); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + }); + server.async_run(); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + rpc_client client; + bool r = client.connect("127.0.0.1", 9000); + CHECK(r); + client.subscribe("key", [&stop](string_view data) { + std::cout << data << "\n"; + CHECK_EQ(data, "hello subscriber"); + stop = true; + }); + thd.join(); +} + +TEST_CASE("test_server_publish_encode_msg") { + rpc_server server(9000, std::thread::hardware_concurrency()); + server.register_handler("publish", + [&server](rpc_conn conn, std::string key, + std::string token, std::string val) { + server.publish(std::move(key), std::move(val)); + }); + bool stop = false; + std::thread thd([&server, &stop] { + person pp{1, "tom", 20}; + while (!stop) { + server.publish("person", pp); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + }); + server.async_run(); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + rpc_client client; + bool r = client.connect("127.0.0.1", 9000); + CHECK(r); + client.subscribe("person", [&stop](string_view data) { + try { + msgpack_codec codec; + person p = codec.unpack(data.data(), data.size()); + CHECK_EQ(p.id, 1); + CHECK_EQ(p.age, 20); + CHECK_EQ(p.name, "tom"); + stop = true; + } catch (const std::exception &ex) { + std::cout << ex.what() << std::endl; + } + }); + thd.join(); +} + +TEST_CASE("test_client_subscribe_by_token") { + rpc_server server(9000, std::thread::hardware_concurrency()); + bool stop = false; + std::thread thd([&server, &stop] { + while (!stop) { + auto list = server.get_token_list(); + for (auto &token : list) { + server.publish_by_token("key", token, "hello token subscriber"); + } + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + }); + server.async_run(); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + rpc_client client; + bool r = client.connect("127.0.0.1", 9000); + CHECK(r); + client.subscribe( + "key", "048a796c8a3c6a6b7bd1223bf2c8cee05232e927b521984ba417cb2fca6df9d1", + [&stop](string_view data) { + std::cout << data << "\n"; + CHECK_EQ(data, "hello token subscriber"); + stop = true; + }); + thd.join(); +} + +TEST_CASE("test_server_callback") { + rpc_server server(9000, std::thread::hardware_concurrency()); + dummy d; + server.register_handler("add", &dummy::add, &d); + server.set_network_err_callback( + [](std::shared_ptr conn, std::string reason) { + std::cout << "remote client address: " << conn->remote_address() + << " networking error, reason: " << reason << "\n"; + }); + server.set_conn_timeout_callback([](int64_t conn_id) { + std::cout << "connect id : " << conn_id << " timeout \n"; + }); + server.async_run(); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + + rpc_client client("127.0.0.1", 9000); + bool r = client.connect(); + CHECK(r); + auto result = client.call("add", 1, 2); + CHECK_EQ(result, 3); +} +TEST_CASE("test_server_user_data") { + rpc_server server(9000, std::thread::hardware_concurrency()); + server.register_handler("server_user_data", server_user_data); + server.async_run(); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + + rpc_client client("127.0.0.1", 9000); + bool r = client.connect(); + CHECK(r); + client.call<>("server_user_data"); +} \ No newline at end of file