Skip to content

Commit

Permalink
This commit contains a bunch of minor changes.
Browse files Browse the repository at this point in the history
First it allows a ClientProfile to be updated with new reservation,
weight, and limit values by adding an update function.

Second it adds an ability to invoke callbacks when a ClientInfo object
is removed due to the idle timeout. Testing this functionality has
been added to the unit tests.

Third we add the ability to clear a heap to help with a more
controlled tear-down.

Finally, dmclock-servers "cleaning" has been renamed "idle erase" to
better indicate the role. Types and variable names have been adjusted
accordingly.

Signed-off-by: J. Eric Ivancich <[email protected]>
  • Loading branch information
ivancich committed May 10, 2018
1 parent a994998 commit abacfd0
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 27 deletions.
94 changes: 67 additions & 27 deletions src/dmclock_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

#include <cmath>
#include <memory>
#include <set>
#include <map>
#include <deque>
#include <queue>
Expand Down Expand Up @@ -90,17 +91,18 @@ namespace crimson {
double limit_inv;

// order parameters -- min, "normal", max
ClientInfo(double _reservation, double _weight, double _limit) :
reservation(_reservation),
weight(_weight),
limit(_limit),
reservation_inv(0.0 == reservation ? 0.0 : 1.0 / reservation),
weight_inv( 0.0 == weight ? 0.0 : 1.0 / weight),
limit_inv( 0.0 == limit ? 0.0 : 1.0 / limit)
{
// empty
ClientInfo(double _reservation, double _weight, double _limit) {
update(_reservation, _weight, _limit);
}

inline void update(double _reservation, double _weight, double _limit) {
reservation = _reservation;
weight = _weight;
limit = _limit;
reservation_inv = (0.0 == reservation) ? 0.0 : 1.0 / reservation;
weight_inv = (0.0 == weight) ? 0.0 : 1.0 / weight;
limit_inv = (0.0 == limit) ? 0.0 : 1.0 / limit;
}

friend std::ostream& operator<<(std::ostream& out,
const ClientInfo& client) {
Expand Down Expand Up @@ -771,12 +773,14 @@ namespace crimson {
Duration idle_age;
Duration erase_age;
Duration check_time;
std::deque<MarkPoint> clean_mark_points;
std::deque<MarkPoint> idle_erase_mark_points;

// NB: All threads declared at end, so they're destructed first!
using IdleEraseListener = std::function<void(const C&)>;
std::set<const IdleEraseListener*> idle_erase_listeners;

std::unique_ptr<RunEvery> cleaning_job;
// NB: All threads declared at end, so they're destructed first!

std::unique_ptr<RunEvery> idle_erase_job;

// COMMON constructor that others feed into; we can accept three
// different variations of durations
Expand All @@ -797,15 +801,34 @@ namespace crimson {
{
assert(_erase_age >= _idle_age);
assert(_check_time < _idle_age);
cleaning_job =
idle_erase_job =
std::unique_ptr<RunEvery>(
new RunEvery(check_time,
std::bind(&PriorityQueueBase::do_clean, this)));
std::bind(&PriorityQueueBase::do_idle_erase, this)));
}


~PriorityQueueBase() {
finishing = true;

DataGuard g(data_mtx);

ready_heap.clear();
limit_heap.clear();
#if USE_PROP_HEAP
prop_heap.clear();
#endif
resv_heap.clear();

for (auto c = client_map.begin(); client_map.end() != c; /* empty */) {
auto current = c++;
for (auto l : idle_erase_listeners) {
(*l)(current->second->client);
}
client_map.erase(current);
}

idle_erase_listeners.clear();
}


Expand Down Expand Up @@ -1021,8 +1044,8 @@ namespace crimson {
void reduce_reservation_tags(const C& client_id) {
auto client_it = client_map.find(client_id);

// means the client was cleaned from map; should never happen
// as long as cleaning times are long enough
// means the client was idle-erased from map; should never
// happen as long as idle erase times are long enough
assert(client_map.end() != client_it);
reduce_reservation_tags(*client_it->second);
}
Expand Down Expand Up @@ -1116,27 +1139,27 @@ namespace crimson {
* This is being called regularly by RunEvery. Every time it's
* called it notes the time and delta counter (mark point) in a
* deque. It also looks at the deque to find the most recent
* mark point that is older than clean_age. It then walks the
* map and delete all server entries that were last used before
* that mark point.
* mark point that is older than idle-erase age. It then walks
* the map and delete all server entries that were last used
* before that mark point.
*/
void do_clean() {
void do_idle_erase() {
TimePoint now = std::chrono::steady_clock::now();
DataGuard g(data_mtx);
clean_mark_points.emplace_back(MarkPoint(now, tick));
idle_erase_mark_points.emplace_back(MarkPoint(now, tick));

// first erase the super-old client records

Counter erase_point = 0;
auto point = clean_mark_points.front();
auto point = idle_erase_mark_points.front();
while (point.first <= now - erase_age) {
erase_point = point.second;
clean_mark_points.pop_front();
point = clean_mark_points.front();
idle_erase_mark_points.pop_front();
point = idle_erase_mark_points.front();
}

Counter idle_point = 0;
for (auto i : clean_mark_points) {
for (auto i : idle_erase_mark_points) {
if (i.first <= now - idle_age) {
idle_point = i.second;
} else {
Expand All @@ -1148,14 +1171,17 @@ namespace crimson {
for (auto i = client_map.begin(); i != client_map.end(); /* empty */) {
auto i2 = i++;
if (erase_point && i2->second->last_tick <= erase_point) {
for (auto l : idle_erase_listeners) {
(*l)(i2->second->client);
}
delete_from_heaps(i2->second);
client_map.erase(i2);
} else if (idle_point && i2->second->last_tick <= idle_point) {
i2->second->idle = true;
}
} // for
} // if
} // do_clean
} // do_idle_erase


// data_mtx must be held by caller
Expand All @@ -1176,6 +1202,21 @@ namespace crimson {
delete_from_heap(client, limit_heap);
delete_from_heap(client, ready_heap);
}

public:

void add_idle_erase_listener(const IdleEraseListener& listener) {
DataGuard g(data_mtx);
if (!finishing) {
idle_erase_listeners.insert(&listener);
}
}


void remove_idle_erase_listener(const IdleEraseListener& listener) {
DataGuard g(data_mtx);
idle_erase_listeners.erase(&listener);
}
}; // class PriorityQueueBase


Expand Down Expand Up @@ -1487,7 +1528,6 @@ namespace crimson {
sched_ahead_thd.join();
}

public:

inline void add_request(R&& request,
const C& client_id,
Expand Down
2 changes: 2 additions & 0 deletions support/src/indirect_intrusive_heap.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ namespace crimson {

bool empty() const { return 0 == count; }

void clear() { data.clear(); count = 0; }

size_t size() const { return (size_t) count; }

T& top() { return *data[0]; }
Expand Down
13 changes: 13 additions & 0 deletions test/test_dmclock_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <iostream>
#include <list>
#include <vector>
#include <atomic>


#include "dmclock_server.h"
Expand Down Expand Up @@ -97,6 +98,7 @@ namespace crimson {
using Queue = dmc::PushPriorityQueue<ClientId,Request>;
int client = 17;
double reservation = 100.0;
std::atomic<std::uint32_t> idle_erase_counter(0u);

dmc::ClientInfo ci(reservation, 1.0, 0.0);
auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
Expand All @@ -109,6 +111,9 @@ namespace crimson {
uint64_t req_cost) {
// empty; do nothing
};
auto idle_erase_listener_f = [&idle_erase_counter](const ClientId& c) {
idle_erase_counter++;
};

Queue pq(client_info_f,
server_ready_f,
Expand All @@ -117,6 +122,7 @@ namespace crimson {
std::chrono::seconds(5),
std::chrono::seconds(2),
false);
pq.add_idle_erase_listener(idle_erase_listener_f);

auto lock_pq = [&](std::function<void()> code) {
test_locked(pq.data_mtx, code);
Expand Down Expand Up @@ -171,12 +177,19 @@ namespace crimson {
"after idle age client map entry shows idle.";
});

EXPECT_EQ(0u, idle_erase_counter) <<
"idle erase counter is still 0 since client has not yet been "
"idle-erased";

std::this_thread::sleep_for(std::chrono::seconds(2));

lock_pq([&] () {
EXPECT_EQ(0u, pq.client_map.size()) <<
"client map loses its entry after erase age";
});

EXPECT_EQ(1u, idle_erase_counter) <<
"idle erase counter is now 1 since client has been idle-erased";
} // TEST


Expand Down

0 comments on commit abacfd0

Please sign in to comment.