Skip to content

Commit

Permalink
Merge branch 'master-new'
Browse files Browse the repository at this point in the history
  • Loading branch information
ivancich committed Jan 9, 2017
2 parents 2c1ad4a + 498a1d5 commit dc5cf1a
Show file tree
Hide file tree
Showing 4 changed files with 421 additions and 136 deletions.
206 changes: 141 additions & 65 deletions src/dmclock_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include <condition_variable>
#include <thread>
#include <iostream>
#include <sstream>
#include <limits>

#include <boost/variant.hpp>
Expand All @@ -58,8 +59,13 @@ namespace crimson {

namespace c = crimson;

constexpr double max_tag = std::numeric_limits<double>::max();
constexpr double min_tag = std::numeric_limits<double>::lowest();
constexpr double max_tag = std::numeric_limits<double>::is_iec559 ?
std::numeric_limits<double>::infinity() :
std::numeric_limits<double>::max();
constexpr double min_tag = std::numeric_limits<double>::is_iec559 ?
-1 * std::numeric_limits<double>::infinity() :
std::numeric_limits<double>::lowest();
constexpr uint tag_modulo = 1000000;

struct ClientInfo {
const double reservation; // minimum
Expand Down Expand Up @@ -88,21 +94,18 @@ namespace crimson {
friend std::ostream& operator<<(std::ostream& out,
const ClientInfo& client) {
out <<
"{ r:" << client.reservation <<
" w:" << client.weight <<
" l:" << client.limit <<
" 1/r:" << client.reservation_inv <<
" 1/w:" << client.weight_inv <<
" 1/l:" << client.limit_inv <<
"{ ClientInfo:: r:" << client.reservation <<
" w:" << std::fixed << client.weight <<
" l:" << std::fixed << client.limit <<
" 1/r:" << std::fixed << client.reservation_inv <<
" 1/w:" << std::fixed << client.weight_inv <<
" 1/l:" << std::fixed << client.limit_inv <<
" }";
return out;
}
}; // class ClientInfo


std::ostream& operator<<(std::ostream& out,
const crimson::dmclock::ClientInfo& client);

struct RequestTag {
double reservation;
double proportion;
Expand Down Expand Up @@ -164,6 +167,26 @@ namespace crimson {
// empty
}

static std::string format_tag_change(double before, double after) {
if (before == after) {
return std::string("same");
} else {
std::stringstream ss;
ss << format_tag(before) << "=>" << format_tag(after);
return ss.str();
}
}

static std::string format_tag(double value) {
if (max_tag == value) {
return std::string("max");
} else if (min_tag == value) {
return std::string("min");
} else {
return format_time(value, tag_modulo);
}
}

private:

static double tag_calc(const Time& time,
Expand All @@ -184,18 +207,21 @@ namespace crimson {
friend std::ostream& operator<<(std::ostream& out,
const RequestTag& tag) {
out <<
"{ r:" << format_time(tag.reservation) <<
" p:" << format_time(tag.proportion) <<
" l:" << format_time(tag.limit) << " }";
"{ RequestTag:: ready:" << (tag.ready ? "true" : "false") <<
" r:" << format_tag(tag.reservation) <<
" p:" << format_tag(tag.proportion) <<
" l:" << format_tag(tag.limit) <<
#if 0 // try to resolve this to make sure Time is operator<<'able.
#ifndef DO_NOT_DELAY_TAG_CALC
" arrival:" << tag.arrival <<
#endif
#endif
" }";
return out;
}
}; // class RequestTag


std::ostream& operator<<(std::ostream& out,
const crimson::dmclock::RequestTag& tag);


// C is client identifier type, R is request type, B is heap
// branching factor
template<typename C, typename R, uint B>
Expand Down Expand Up @@ -238,7 +264,8 @@ namespace crimson {
}

friend std::ostream& operator<<(std::ostream& out, const ClientReq& c) {
out << c.tag;
out << "{ ClientReq:: tag:" << c.tag << " client:" <<
c.client_id << " }";
return out;
}
}; // class ClientReq
Expand Down Expand Up @@ -288,21 +315,20 @@ namespace crimson {
return prev_tag;
}

static inline void assign_unpinned_tag(double& lhs, const double rhs) {
if (rhs != max_tag && rhs != min_tag) {
lhs = rhs;
}
}

inline void update_req_tag(const RequestTag& _prev,
const Counter& _tick) {
prev_tag = _prev;
assign_unpinned_tag(prev_tag.reservation, _prev.reservation);
assign_unpinned_tag(prev_tag.limit, _prev.limit);
assign_unpinned_tag(prev_tag.proportion, _prev.proportion);
last_tick = _tick;
}

inline double get_prev_prop_tag() const {
return prev_tag.proportion;
}

inline void set_prev_prop_tag(double value,
bool adjust_by_inc = false) {
prev_tag.proportion = value - (adjust_by_inc ? info.weight_inv : 0.0);
}

inline void add_request(const RequestTag& tag,
const C& client_id,
RequestRef&& request) {
Expand Down Expand Up @@ -372,12 +398,21 @@ namespace crimson {
return remove_by_req_filter_fw(filter_accum);
}
}

friend std::ostream&
operator<<(std::ostream& out,
const typename PriorityQueueBase<C,R,B>::ClientRec& e) {
out << "{ client:" << e.client << " top req: " <<
(e.has_request() ? e.next_request() : "none") << " }";
out << "{ ClientRec::" <<
" client:" << e.client <<
" prev_tag:" << e.prev_tag <<
" top_req:";
if (e.has_request()) {
out << e.next_request();
} else {
out << "none";
}
out << " }";

return out;
}
}; // class ClientRec
Expand Down Expand Up @@ -498,6 +533,55 @@ namespace crimson {
}


friend std::ostream& operator<<(std::ostream& out,
const PriorityQueueBase& q) {
std::lock_guard<decltype(q.data_mtx)> guard(q.data_mtx);

out << "{ PriorityQueue:: " << std::endl;
for (const auto& c : q.client_map) {
out << " { client:" << c.first << ", record:" << *c.second <<
" }";
}
if (!q.resv_heap.empty()) {
const auto& resv = q.resv_heap.top();
out << std::endl << " { reservation_top:" << resv << " }" << std::endl;
const auto& ready = q.ready_heap.top();
out << " { ready_top:" << ready << " }" << std::endl;
const auto& limit = q.limit_heap.top();
out << " { limit_top:" << limit << " }" << std::endl;
} else {
out << " HEAPS-EMPTY";
}
out << " }" << std::endl;

return out;
}

// for debugging
void display_queues(std::ostream& out,
bool show_res = true,
bool show_lim = true,
bool show_ready = true,
bool show_prop = true) const {
auto filter = [](const ClientRec& e)->bool { return true; };
DataGuard g(data_mtx);
if (show_res) {
resv_heap.display_sorted(out << "RESER:", filter) << std::endl;
}
if (show_lim) {
limit_heap.display_sorted(out << "LIMIT:", filter) << std::endl;
}
if (show_ready) {
ready_heap.display_sorted(out << "READY:", filter) << std::endl;
}
#if USE_PROP_HEAP
if (show_prop) {
prop_heap.display_sorted(out << "PROPO:", filter) << std::endl;
}
#endif
} // display_queues


protected:

// The ClientCompare functor is essentially doing a precedes?
Expand All @@ -518,7 +602,7 @@ namespace crimson {
//
// ready_opt determines how the ready flag influences the sort
//
// use_prop_delta determines whether the proportial delta is
// use_prop_delta determines whether the proportional delta is
// added in for comparison
template<double RequestTag::*tag_field,
ReadyOption ready_opt,
Expand Down Expand Up @@ -661,7 +745,7 @@ namespace crimson {
// this pointer will help us create a reference to a shared
// pointer, no matter which of two codepaths we take
ClientRec* temp_client;

auto client_it = client_map.find(client_id);
if (client_map.end() != client_it) {
temp_client = &(*client_it->second); // address of obj of shared_ptr
Expand Down Expand Up @@ -725,7 +809,7 @@ namespace crimson {
}
}

// if this conditional does not fire, it
// if this conditional does not fire, it
if (lowest_prop_tag < lowest_prop_tag_trigger) {
client.prop_delta = lowest_prop_tag - time;
}
Expand All @@ -744,18 +828,26 @@ namespace crimson {
}
#else
RequestTag tag(client.get_req_tag(), client.info, req_params, time, cost);
// copy tag to previous tag for client
client.update_req_tag(tag, tick);
#endif

client.add_request(tag, client.client, std::move(request));
if (1 == client.requests.size()) {
// NB: can the following 4 calls to adjust be changed
// promote? Can adding a request ever demote a client in the
// heaps?
resv_heap.adjust(client);
limit_heap.adjust(client);
ready_heap.adjust(client);
#if USE_PROP_HEAP
prop_heap.adjust(client);
#endif
}

client.cur_rho = req_params.rho;
client.cur_delta = req_params.delta;

#ifdef DO_NOT_DELAY_TAG_CALC
// copy tag to previous tag for client
client.update_req_tag(tag, tick);
#endif

resv_heap.adjust(client);
limit_heap.adjust(client);
ready_heap.adjust(client);
Expand All @@ -776,6 +868,14 @@ namespace crimson {
ClientReq& first = top.next_request();
RequestRef request = std::move(first.request);

#if DEBUG
std::cout << "[ OUT client:" << top.info <<
" res:" << RequestTag::format_tag(first.tag.reservation) <<
" wgt:" << RequestTag::format_tag(first.tag.proportion) <<
" lim:" << RequestTag::format_tag(first.tag.limit) <<
" ]" << std::endl;
#endif

// pop request and adjust heaps
top.pop_request();

Expand Down Expand Up @@ -803,30 +903,6 @@ namespace crimson {
} // pop_process_request


// for debugging
void display_queues(bool show_res = true,
bool show_lim = true,
bool show_ready = true,
bool show_prop = true) {
auto filter = [](const ClientRecRef& e)->bool { return !e->handled; };
if (show_res) {
resv_heap.display_sorted(std::cout << "RESER:", filter) << std::endl;
}
if (show_lim) {
limit_heap.display_sorted(std::cout << "LIMIT:", filter) << std::endl;
}
if (show_ready) {
ready_heap.display_sorted(std::cout << "READY:", filter) << std::endl;
}
#if USE_PROP_HEAP
if (show_prop) {
prop_heap.display_sorted(std::cout << "PROPO:", filter) << std::endl;
}
#endif
std::cout << "{ HEAP_BRANCHING:" << B << "}" << std::endl;
} // display_queues


// data_mtx should be held when called
void reduce_reservation_tags(ClientRec& client) {
for (auto& r : client.requests) {
Expand Down Expand Up @@ -857,7 +933,7 @@ namespace crimson {
// data_mtx should be held when called
NextReq do_next_request(Time now) {
NextReq result;

// if reservation queue is empty, all are empty (i.e., no active clients)
if(resv_heap.empty()) {
result.type = NextReqType::none;
Expand Down
2 changes: 1 addition & 1 deletion src/dmclock_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
std::string crimson::dmclock::format_time(const Time& time, uint modulo) {
long subtract = long(time / modulo) * modulo;
std::stringstream ss;
ss << std::fixed << std::setprecision(4) << time - subtract;
ss << std::fixed << std::setprecision(4) << (time - subtract);
return ss.str();
}

Expand Down
Loading

0 comments on commit dc5cf1a

Please sign in to comment.