Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set of functionality additions and small adjustments. #56

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 54 additions & 35 deletions src/dmclock_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,18 @@ namespace crimson {
double limit_inv;

// order parameters -- min, "normal", max
ClientInfo(double _reservation, double _weight, double _limit) :
reservation(_reservation),
weight(_weight),
limit(_limit),
reservation_inv(0.0 == reservation ? 0.0 : 1.0 / reservation),
weight_inv( 0.0 == weight ? 0.0 : 1.0 / weight),
limit_inv( 0.0 == limit ? 0.0 : 1.0 / limit)
{
// empty
ClientInfo(double _reservation, double _weight, double _limit) {
update(_reservation, _weight, _limit);
}

inline void update(double _reservation, double _weight, double _limit) {
reservation = _reservation;
weight = _weight;
limit = _limit;
reservation_inv = (0.0 == reservation) ? 0.0 : 1.0 / reservation;
weight_inv = (0.0 == weight) ? 0.0 : 1.0 / weight;
limit_inv = (0.0 == limit) ? 0.0 : 1.0 / limit;
}

friend std::ostream& operator<<(std::ostream& out,
const ClientInfo& client) {
Expand Down Expand Up @@ -532,6 +533,12 @@ namespace crimson {
// a function that can be called to look up client information
using ClientInfoFunc = std::function<const ClientInfo*(const C&)>;

// a function that can be called when dmclock decides to idle a
// client and remove its information; if the owner's
// ClientInfoFunc is maintaining information about the client,
// that information can be removed
using IdleEraseListener = std::function<void(const C&)>;


bool empty() const {
DataGuard g(data_mtx);
Expand Down Expand Up @@ -748,6 +755,7 @@ namespace crimson {
};

ClientInfoFunc client_info_f;
IdleEraseListener idle_erase_f;
static constexpr bool is_dynamic_cli_info_f = U1;

mutable std::mutex data_mtx;
Expand Down Expand Up @@ -805,11 +813,11 @@ namespace crimson {
Duration idle_age;
Duration erase_age;
Duration check_time;
std::deque<MarkPoint> clean_mark_points;
std::deque<MarkPoint> idle_erase_mark_points;

// NB: All threads declared at end, so they're destructed first!

std::unique_ptr<RunEvery> cleaning_job;
std::unique_ptr<RunEvery> idle_erase_job;

// helper function to return the value of a variant if it matches the
// given type T, or a default value of T otherwise
Expand All @@ -827,8 +835,10 @@ namespace crimson {
std::chrono::duration<Rep,Per> _erase_age,
std::chrono::duration<Rep,Per> _check_time,
AtLimitParam at_limit_param,
double _anticipation_timeout) :
double _anticipation_timeout,
IdleEraseListener _idle_erase_f) :
client_info_f(_client_info_f),
idle_erase_f(_idle_erase_f),
at_limit(get_or_default<AtLimit>(at_limit_param, AtLimit::Reject)),
reject_threshold(get_or_default<RejectThreshold>(at_limit_param, 0)),
anticipation_timeout(_anticipation_timeout),
Expand All @@ -841,10 +851,10 @@ namespace crimson {
assert(_check_time < _idle_age);
// AtLimit::Reject depends on ImmediateTagCalc
assert(at_limit != AtLimit::Reject || !IsDelayed);
cleaning_job =
idle_erase_job =
std::unique_ptr<RunEvery>(
new RunEvery(check_time,
std::bind(&PriorityQueueBase::do_clean, this)));
std::bind(&PriorityQueueBase::do_idle_erase, this)));
}


Expand Down Expand Up @@ -1084,8 +1094,8 @@ namespace crimson {
void reduce_reservation_tags(const C& client_id, const RequestTag& tag) {
auto client_it = client_map.find(client_id);

// means the client was cleaned from map; should never happen
// as long as cleaning times are long enough
// means the client was idle-erased from map; should never
// happen as long as idle erase times are long enough
assert(client_map.end() != client_it);
ClientRec& client = *client_it->second;
reduce_reservation_tags(TagCalc{}, client, tag);
Expand Down Expand Up @@ -1185,27 +1195,27 @@ namespace crimson {
* This is being called regularly by RunEvery. Every time it's
* called it notes the time and delta counter (mark point) in a
* deque. It also looks at the deque to find the most recent
* mark point that is older than clean_age. It then walks the
* map and delete all server entries that were last used before
* that mark point.
* mark point that is older than idle-erase age. It then walks
* the map and delete all server entries that were last used
* before that mark point.
*/
void do_clean() {
void do_idle_erase() {
TimePoint now = std::chrono::steady_clock::now();
DataGuard g(data_mtx);
clean_mark_points.emplace_back(MarkPoint(now, tick));
idle_erase_mark_points.emplace_back(MarkPoint(now, tick));

// first erase the super-old client records

Counter erase_point = 0;
auto point = clean_mark_points.front();
auto point = idle_erase_mark_points.front();
while (point.first <= now - erase_age) {
erase_point = point.second;
clean_mark_points.pop_front();
point = clean_mark_points.front();
idle_erase_mark_points.pop_front();
point = idle_erase_mark_points.front();
}

Counter idle_point = 0;
for (auto i : clean_mark_points) {
for (auto i : idle_erase_mark_points) {
if (i.first <= now - idle_age) {
idle_point = i.second;
} else {
Expand All @@ -1217,14 +1227,17 @@ namespace crimson {
for (auto i = client_map.begin(); i != client_map.end(); /* empty */) {
auto i2 = i++;
if (erase_point && i2->second->last_tick <= erase_point) {
if (nullptr != idle_erase_f) {
idle_erase_f(i2->second->client);
}
delete_from_heaps(i2->second);
client_map.erase(i2);
} else if (idle_point && i2->second->last_tick <= idle_point) {
i2->second->idle = true;
}
} // for
} // if
} // do_clean
} // do_idle_erase


// data_mtx must be held by caller
Expand Down Expand Up @@ -1289,10 +1302,12 @@ namespace crimson {
std::chrono::duration<Rep,Per> _erase_age,
std::chrono::duration<Rep,Per> _check_time,
AtLimitParam at_limit_param = AtLimit::Wait,
double _anticipation_timeout = 0.0) :
double _anticipation_timeout = 0.0,
typename super::IdleEraseListener _idle_erase_f = nullptr) :
super(_client_info_f,
_idle_age, _erase_age, _check_time,
at_limit_param, _anticipation_timeout)
at_limit_param, _anticipation_timeout,
_idle_erase_f)
{
// empty
}
Expand All @@ -1301,13 +1316,15 @@ namespace crimson {
// pull convenience constructor
PullPriorityQueue(typename super::ClientInfoFunc _client_info_f,
AtLimitParam at_limit_param = AtLimit::Wait,
double _anticipation_timeout = 0.0) :
double _anticipation_timeout = 0.0,
typename super::IdleEraseListener _idle_erase_f = nullptr) :
PullPriorityQueue(_client_info_f,
std::chrono::minutes(10),
std::chrono::minutes(15),
std::chrono::minutes(6),
at_limit_param,
_anticipation_timeout)
_anticipation_timeout,
_idle_erase_f)
{
// empty
}
Expand Down Expand Up @@ -1522,10 +1539,11 @@ namespace crimson {
std::chrono::duration<Rep,Per> _erase_age,
std::chrono::duration<Rep,Per> _check_time,
AtLimitParam at_limit_param = AtLimit::Wait,
double anticipation_timeout = 0.0) :
double anticipation_timeout = 0.0,
typename super::IdleEraseListener _idle_erase_f = nullptr) :
super(_client_info_f,
_idle_age, _erase_age, _check_time,
at_limit_param, anticipation_timeout)
at_limit_param, anticipation_timeout, _idle_erase_f)
{
can_handle_f = _can_handle_f;
handle_f = _handle_f;
Expand All @@ -1538,15 +1556,17 @@ namespace crimson {
CanHandleRequestFunc _can_handle_f,
HandleRequestFunc _handle_f,
AtLimitParam at_limit_param = AtLimit::Wait,
double _anticipation_timeout = 0.0) :
double _anticipation_timeout = 0.0,
typename super::IdleEraseListener _idle_erase_f = nullptr) :
PushPriorityQueue(_client_info_f,
_can_handle_f,
_handle_f,
std::chrono::minutes(10),
std::chrono::minutes(15),
std::chrono::minutes(6),
at_limit_param,
_anticipation_timeout)
_anticipation_timeout,
_idle_erase_f)
{
// empty
}
Expand All @@ -1558,7 +1578,6 @@ namespace crimson {
sched_ahead_thd.join();
}

public:

int add_request(R&& request,
const C& client_id,
Expand Down
2 changes: 2 additions & 0 deletions support/src/indirect_intrusive_heap.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ namespace crimson {

bool empty() const { return 0 == count; }

void clear() { data.clear(); count = 0; }

size_t size() const { return (size_t) count; }

T& top() { return *data[0]; }
Expand Down
16 changes: 15 additions & 1 deletion test/test_dmclock_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <iostream>
#include <list>
#include <vector>
#include <atomic>


#include "dmclock_server.h"
Expand Down Expand Up @@ -102,6 +103,7 @@ namespace crimson {
using Queue = dmc::PushPriorityQueue<ClientId,Request>;
int client = 17;
double reservation = 100.0;
std::atomic<std::uint32_t> idle_erase_counter(0u);

dmc::ClientInfo ci(reservation, 1.0, 0.0);
auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
Expand All @@ -114,14 +116,19 @@ namespace crimson {
uint64_t req_cost) {
// empty; do nothing
};
auto idle_erase_listener_f = [&idle_erase_counter](const ClientId& c) {
idle_erase_counter++;
};

Queue pq(client_info_f,
server_ready_f,
submit_req_f,
std::chrono::seconds(3),
std::chrono::seconds(5),
std::chrono::seconds(2),
AtLimit::Wait);
AtLimit::Wait,
0.0,
idle_erase_listener_f);

auto lock_pq = [&](std::function<void()> code) {
test_locked(pq.data_mtx, code);
Expand Down Expand Up @@ -176,12 +183,19 @@ namespace crimson {
"after idle age client map entry shows idle.";
});

EXPECT_EQ(0u, idle_erase_counter) <<
"idle erase counter is still 0 since client has not yet been "
"idle-erased";

std::this_thread::sleep_for(std::chrono::seconds(2));

lock_pq([&] () {
EXPECT_EQ(0u, pq.client_map.size()) <<
"client map loses its entry after erase age";
});

EXPECT_EQ(1u, idle_erase_counter) <<
"idle erase counter is now 1 since client has been idle-erased";
} // TEST


Expand Down