diff --git a/src/dmclock_server.h b/src/dmclock_server.h index c6dc43f..986c630 100644 --- a/src/dmclock_server.h +++ b/src/dmclock_server.h @@ -98,17 +98,18 @@ namespace crimson { double limit_inv; // order parameters -- min, "normal", max - ClientInfo(double _reservation, double _weight, double _limit) : - reservation(_reservation), - weight(_weight), - limit(_limit), - reservation_inv(0.0 == reservation ? 0.0 : 1.0 / reservation), - weight_inv( 0.0 == weight ? 0.0 : 1.0 / weight), - limit_inv( 0.0 == limit ? 0.0 : 1.0 / limit) - { - // empty + ClientInfo(double _reservation, double _weight, double _limit) { + update(_reservation, _weight, _limit); } + inline void update(double _reservation, double _weight, double _limit) { + reservation = _reservation; + weight = _weight; + limit = _limit; + reservation_inv = (0.0 == reservation) ? 0.0 : 1.0 / reservation; + weight_inv = (0.0 == weight) ? 0.0 : 1.0 / weight; + limit_inv = (0.0 == limit) ? 0.0 : 1.0 / limit; + } friend std::ostream& operator<<(std::ostream& out, const ClientInfo& client) { @@ -532,6 +533,12 @@ namespace crimson { // a function that can be called to look up client information using ClientInfoFunc = std::function; + // a function that can be called when dmclock decides to idle a + // client and remove its information; if the owner's + // ClientInfoFunc is maintaining information about the client, + // that information can be removed + using IdleEraseListener = std::function; + bool empty() const { DataGuard g(data_mtx); @@ -748,6 +755,7 @@ namespace crimson { }; ClientInfoFunc client_info_f; + IdleEraseListener idle_erase_f; static constexpr bool is_dynamic_cli_info_f = U1; mutable std::mutex data_mtx; @@ -805,11 +813,11 @@ namespace crimson { Duration idle_age; Duration erase_age; Duration check_time; - std::deque clean_mark_points; + std::deque idle_erase_mark_points; // NB: All threads declared at end, so they're destructed first! - std::unique_ptr cleaning_job; + std::unique_ptr idle_erase_job; // helper function to return the value of a variant if it matches the // given type T, or a default value of T otherwise @@ -827,8 +835,10 @@ namespace crimson { std::chrono::duration _erase_age, std::chrono::duration _check_time, AtLimitParam at_limit_param, - double _anticipation_timeout) : + double _anticipation_timeout, + IdleEraseListener _idle_erase_f) : client_info_f(_client_info_f), + idle_erase_f(_idle_erase_f), at_limit(get_or_default(at_limit_param, AtLimit::Reject)), reject_threshold(get_or_default(at_limit_param, 0)), anticipation_timeout(_anticipation_timeout), @@ -841,10 +851,10 @@ namespace crimson { assert(_check_time < _idle_age); // AtLimit::Reject depends on ImmediateTagCalc assert(at_limit != AtLimit::Reject || !IsDelayed); - cleaning_job = + idle_erase_job = std::unique_ptr( new RunEvery(check_time, - std::bind(&PriorityQueueBase::do_clean, this))); + std::bind(&PriorityQueueBase::do_idle_erase, this))); } @@ -1084,8 +1094,8 @@ namespace crimson { void reduce_reservation_tags(const C& client_id, const RequestTag& tag) { auto client_it = client_map.find(client_id); - // means the client was cleaned from map; should never happen - // as long as cleaning times are long enough + // means the client was idle-erased from map; should never + // happen as long as idle erase times are long enough assert(client_map.end() != client_it); ClientRec& client = *client_it->second; reduce_reservation_tags(TagCalc{}, client, tag); @@ -1185,27 +1195,27 @@ namespace crimson { * This is being called regularly by RunEvery. Every time it's * called it notes the time and delta counter (mark point) in a * deque. It also looks at the deque to find the most recent - * mark point that is older than clean_age. It then walks the - * map and delete all server entries that were last used before - * that mark point. + * mark point that is older than idle-erase age. It then walks + * the map and delete all server entries that were last used + * before that mark point. */ - void do_clean() { + void do_idle_erase() { TimePoint now = std::chrono::steady_clock::now(); DataGuard g(data_mtx); - clean_mark_points.emplace_back(MarkPoint(now, tick)); + idle_erase_mark_points.emplace_back(MarkPoint(now, tick)); // first erase the super-old client records Counter erase_point = 0; - auto point = clean_mark_points.front(); + auto point = idle_erase_mark_points.front(); while (point.first <= now - erase_age) { erase_point = point.second; - clean_mark_points.pop_front(); - point = clean_mark_points.front(); + idle_erase_mark_points.pop_front(); + point = idle_erase_mark_points.front(); } Counter idle_point = 0; - for (auto i : clean_mark_points) { + for (auto i : idle_erase_mark_points) { if (i.first <= now - idle_age) { idle_point = i.second; } else { @@ -1217,6 +1227,9 @@ namespace crimson { for (auto i = client_map.begin(); i != client_map.end(); /* empty */) { auto i2 = i++; if (erase_point && i2->second->last_tick <= erase_point) { + if (nullptr != idle_erase_f) { + idle_erase_f(i2->second->client); + } delete_from_heaps(i2->second); client_map.erase(i2); } else if (idle_point && i2->second->last_tick <= idle_point) { @@ -1224,7 +1237,7 @@ namespace crimson { } } // for } // if - } // do_clean + } // do_idle_erase // data_mtx must be held by caller @@ -1289,10 +1302,12 @@ namespace crimson { std::chrono::duration _erase_age, std::chrono::duration _check_time, AtLimitParam at_limit_param = AtLimit::Wait, - double _anticipation_timeout = 0.0) : + double _anticipation_timeout = 0.0, + typename super::IdleEraseListener _idle_erase_f = nullptr) : super(_client_info_f, _idle_age, _erase_age, _check_time, - at_limit_param, _anticipation_timeout) + at_limit_param, _anticipation_timeout, + _idle_erase_f) { // empty } @@ -1301,13 +1316,15 @@ namespace crimson { // pull convenience constructor PullPriorityQueue(typename super::ClientInfoFunc _client_info_f, AtLimitParam at_limit_param = AtLimit::Wait, - double _anticipation_timeout = 0.0) : + double _anticipation_timeout = 0.0, + typename super::IdleEraseListener _idle_erase_f = nullptr) : PullPriorityQueue(_client_info_f, std::chrono::minutes(10), std::chrono::minutes(15), std::chrono::minutes(6), at_limit_param, - _anticipation_timeout) + _anticipation_timeout, + _idle_erase_f) { // empty } @@ -1522,10 +1539,11 @@ namespace crimson { std::chrono::duration _erase_age, std::chrono::duration _check_time, AtLimitParam at_limit_param = AtLimit::Wait, - double anticipation_timeout = 0.0) : + double anticipation_timeout = 0.0, + typename super::IdleEraseListener _idle_erase_f = nullptr) : super(_client_info_f, _idle_age, _erase_age, _check_time, - at_limit_param, anticipation_timeout) + at_limit_param, anticipation_timeout, _idle_erase_f) { can_handle_f = _can_handle_f; handle_f = _handle_f; @@ -1538,7 +1556,8 @@ namespace crimson { CanHandleRequestFunc _can_handle_f, HandleRequestFunc _handle_f, AtLimitParam at_limit_param = AtLimit::Wait, - double _anticipation_timeout = 0.0) : + double _anticipation_timeout = 0.0, + typename super::IdleEraseListener _idle_erase_f = nullptr) : PushPriorityQueue(_client_info_f, _can_handle_f, _handle_f, @@ -1546,7 +1565,8 @@ namespace crimson { std::chrono::minutes(15), std::chrono::minutes(6), at_limit_param, - _anticipation_timeout) + _anticipation_timeout, + _idle_erase_f) { // empty } @@ -1558,7 +1578,6 @@ namespace crimson { sched_ahead_thd.join(); } - public: int add_request(R&& request, const C& client_id, diff --git a/support/src/indirect_intrusive_heap.h b/support/src/indirect_intrusive_heap.h index 5e2af90..f6d9882 100644 --- a/support/src/indirect_intrusive_heap.h +++ b/support/src/indirect_intrusive_heap.h @@ -227,6 +227,8 @@ namespace crimson { bool empty() const { return 0 == count; } + void clear() { data.clear(); count = 0; } + size_t size() const { return (size_t) count; } T& top() { return *data[0]; } diff --git a/test/test_dmclock_server.cc b/test/test_dmclock_server.cc index 81250a8..8266f89 100644 --- a/test/test_dmclock_server.cc +++ b/test/test_dmclock_server.cc @@ -18,6 +18,7 @@ #include #include #include +#include #include "dmclock_server.h" @@ -102,6 +103,7 @@ namespace crimson { using Queue = dmc::PushPriorityQueue; int client = 17; double reservation = 100.0; + std::atomic idle_erase_counter(0u); dmc::ClientInfo ci(reservation, 1.0, 0.0); auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* { @@ -114,6 +116,9 @@ namespace crimson { uint64_t req_cost) { // empty; do nothing }; + auto idle_erase_listener_f = [&idle_erase_counter](const ClientId& c) { + idle_erase_counter++; + }; Queue pq(client_info_f, server_ready_f, @@ -121,7 +126,9 @@ namespace crimson { std::chrono::seconds(3), std::chrono::seconds(5), std::chrono::seconds(2), - AtLimit::Wait); + AtLimit::Wait, + 0.0, + idle_erase_listener_f); auto lock_pq = [&](std::function code) { test_locked(pq.data_mtx, code); @@ -176,12 +183,19 @@ namespace crimson { "after idle age client map entry shows idle."; }); + EXPECT_EQ(0u, idle_erase_counter) << + "idle erase counter is still 0 since client has not yet been " + "idle-erased"; + std::this_thread::sleep_for(std::chrono::seconds(2)); lock_pq([&] () { EXPECT_EQ(0u, pq.client_map.size()) << "client map loses its entry after erase age"; }); + + EXPECT_EQ(1u, idle_erase_counter) << + "idle erase counter is now 1 since client has been idle-erased"; } // TEST