Skip to content

Commit

Permalink
Allow the calculations of rho and delta to be handled by a "tracker"
Browse files Browse the repository at this point in the history
specified via template parameter (i.e., by static polymorphism). The
tracker follows a simple interface constisting of three functions and
one static function.

Add a BorrowingTracker. Make it the default tracker. Add unit tests
for it. Maintain the original calculations as the OrigTracker.

Signed-off-by: J. Eric Ivancich <[email protected]>
  • Loading branch information
ivancich committed Sep 22, 2017
1 parent b632cfd commit a94c4e0
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 57 deletions.
3 changes: 2 additions & 1 deletion sim/src/test_dmclock.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ namespace crimson {
};

using DmcQueue = dmc::PushPriorityQueue<ClientId,sim::TestRequest>;
using DmcServiceTracker = dmc::ServiceTracker<ServerId,dmc::BorrowingTracker>;

using DmcServer = sim::SimulatedServer<DmcQueue,
dmc::ReqParams,
dmc::PhaseType,
DmcAccum>;

using DmcClient = sim::SimulatedClient<dmc::ServiceTracker<ServerId>,
using DmcClient = sim::SimulatedClient<DmcServiceTracker,
dmc::ReqParams,
dmc::PhaseType,
DmcAccum>;
Expand Down
12 changes: 6 additions & 6 deletions sim/src/test_dmclock_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,9 @@ int main(int argc, char* argv[]) {


void test::client_data(std::ostream& out,
test::MySim* sim,
test::MySim::ClientFilter client_disp_filter,
int head_w, int data_w, int data_prec) {
test::MySim* sim,
test::MySim::ClientFilter client_disp_filter,
int head_w, int data_w, int data_prec) {
// report how many ops were done by reservation and proportion for
// each client

Expand Down Expand Up @@ -270,9 +270,9 @@ void test::client_data(std::ostream& out,


void test::server_data(std::ostream& out,
test::MySim* sim,
test::MySim::ServerFilter server_disp_filter,
int head_w, int data_w, int data_prec) {
test::MySim* sim,
test::MySim::ServerFilter server_disp_filter,
int head_w, int data_w, int data_prec) {
out << std::setw(head_w) << "res_ops:";
int total_r = 0;
for (uint i = 0; i < sim->get_server_count(); ++i) {
Expand Down
155 changes: 118 additions & 37 deletions src/dmclock_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,38 +22,132 @@

namespace crimson {
namespace dmclock {
struct ServerInfo {

// OrigTracker is a best-effort implementation of the the original
// dmClock calculations of delta and rho. It adheres to an
// interface, implemented via a template type, that allows it to
// be replaced with an alternative. The interface consists of the
// static create, prepare_req, resp_update, and get_last_delta
// functions.
class OrigTracker {
Counter delta_prev_req;
Counter rho_prev_req;
uint32_t my_delta;
uint32_t my_rho;

ServerInfo(Counter _delta_prev_req,
Counter _rho_prev_req) :
delta_prev_req(_delta_prev_req),
rho_prev_req(_rho_prev_req),
public:

OrigTracker(Counter global_delta,
Counter global_rho) :
delta_prev_req(global_delta),
rho_prev_req(global_rho),
my_delta(0),
my_rho(0)
{
// empty
{ /* empty */ }

static inline OrigTracker create(Counter the_delta, Counter the_rho) {
return OrigTracker(the_delta, the_rho);
}

inline void req_update(Counter delta, Counter rho) {
delta_prev_req = delta;
rho_prev_req = rho;
inline ReqParams prepare_req(Counter& the_delta, Counter& the_rho) {
Counter delta_out = 1 + the_delta - delta_prev_req - my_delta;
Counter rho_out = 1 + the_rho - rho_prev_req - my_rho;
delta_prev_req = the_delta;
rho_prev_req = the_rho;
my_delta = 0;
my_rho = 0;
return ReqParams(uint32_t(delta_out), uint32_t(rho_out));
}

inline void resp_update(PhaseType phase) {
inline void resp_update(PhaseType phase,
Counter& the_delta,
Counter& the_rho) {
++the_delta;
++my_delta;
if (phase == PhaseType::reservation) ++my_rho;
if (phase == PhaseType::reservation) {
++the_rho;
++my_rho;
}
}

inline Counter get_last_delta() const {
return delta_prev_req;
}
};
}; // struct OrigTracker


// BorrowingTracker always returns a positive delta and rho. If
// not enough responses have come in to allow that, we will borrow
// a future response and repay it later.
class BorrowingTracker {
Counter delta_prev_req;
Counter rho_prev_req;
Counter delta_borrow;
Counter rho_borrow;

public:

BorrowingTracker(Counter global_delta, Counter global_rho) :
delta_prev_req(global_delta),
rho_prev_req(global_rho),
delta_borrow(0),
rho_borrow(0)
{ /* empty */ }

static inline BorrowingTracker create(Counter the_delta,
Counter the_rho) {
return BorrowingTracker(the_delta, the_rho);
}

inline Counter calc_with_borrow(const Counter& global,
const Counter& previous,
Counter& borrow) {
Counter result = global - previous;
if (0 == result) {
// if no replies have come in, borrow one from the future
++borrow;
return 1;
} else if (result > borrow) {
// if we can give back all of what we borrowed, do so
result -= borrow;
borrow = 0;
return result;
} else {
// can only return part of what was borrowed in order to
// return positive
borrow = borrow - result + 1;
return 1;
}
}

inline ReqParams prepare_req(Counter& the_delta, Counter& the_rho) {
Counter delta_out =
calc_with_borrow(the_delta, delta_prev_req, delta_borrow);
Counter rho_out =
calc_with_borrow(the_rho, rho_prev_req, rho_borrow);
delta_prev_req = the_delta;
rho_prev_req = the_rho;
return ReqParams(uint32_t(delta_out), uint32_t(rho_out));
}

inline void resp_update(PhaseType phase,
Counter& the_delta,
Counter& the_rho) {
++the_delta;
if (phase == PhaseType::reservation) {
++the_rho;
}
}

inline Counter get_last_delta() const {
return delta_prev_req;
}
}; // struct BorrowingTracker


// S is server identifier type
template<typename S>
// T is the server info class that adheres to ServerTrackerIfc interface
template<typename S, typename T = BorrowingTracker>
class ServiceTracker {
// we don't want to include gtest.h just for FRIEND_TEST
friend class dmclock_client_server_erase_Test;
Expand All @@ -64,15 +158,15 @@ namespace crimson {

Counter delta_counter; // # reqs completed
Counter rho_counter; // # reqs completed via reservation
std::map<S,ServerInfo> server_map;
std::map<S,T> server_map;
mutable std::mutex data_mtx; // protects Counters and map

using DataGuard = std::lock_guard<decltype(data_mtx)>;

// clean config

std::deque<MarkPoint> clean_mark_points;
Duration clean_age; // age at which ServerInfo cleaned
Duration clean_age; // age at which server tracker cleaned

// NB: All threads declared at end, so they're destructed firs!

Expand Down Expand Up @@ -119,38 +213,25 @@ namespace crimson {
// this code can only run if a request did not precede the
// response or if the record was cleaned up b/w when
// the request was made and now
ServerInfo si(delta_counter, rho_counter);
si.resp_update(phase);
server_map.emplace(server_id, si);
} else {
it->second.resp_update(phase);
}

++delta_counter;
if (PhaseType::reservation == phase) {
++rho_counter;
auto i = server_map.emplace(server_id,
T::create(delta_counter, rho_counter));
it = i.first;
}
it->second.resp_update(phase, delta_counter, rho_counter);
}


/*
* Returns the ReqParams for the given server.
*/
ReqParams get_req_params(const S& server) {
DataGuard g(data_mtx);
auto it = server_map.find(server);
if (server_map.end() == it) {
server_map.emplace(server, ServerInfo(delta_counter, rho_counter));
server_map.emplace(server,
T::create(delta_counter, rho_counter));
return ReqParams(1, 1);
} else {
Counter delta =
1 + delta_counter - it->second.delta_prev_req - it->second.my_delta;
Counter rho =
1 + rho_counter - it->second.rho_prev_req - it->second.my_rho;

it->second.req_update(delta_counter, rho_counter);

return ReqParams(uint32_t(delta), uint32_t(rho));
return it->second.prepare_req(delta_counter, rho_counter);
}
}

Expand Down Expand Up @@ -182,7 +263,7 @@ namespace crimson {
i != server_map.end();
/* empty */) {
auto i2 = i++;
if (i2->second.delta_prev_req <= earliest) {
if (i2->second.get_last_delta() <= earliest) {
server_map.erase(i2);
}
}
Expand Down
Loading

0 comments on commit a94c4e0

Please sign in to comment.