Skip to content

Commit

Permalink
Modify dmclock code to generalize filtering functions, so they don't
Browse files Browse the repository at this point in the history
assume lists but instead get an accumulator function. Update tests
accordingly.

Signed-off-by: J. Eric Ivancich <[email protected]>
  • Loading branch information
ivancich committed Sep 27, 2016
1 parent 2baf442 commit 1335fda
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 58 deletions.
84 changes: 35 additions & 49 deletions src/dmclock_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -331,16 +331,13 @@ namespace crimson {

// NB: because a deque is the underlying structure, this
// operation might be expensive
template<typename Collect>
bool remove_by_req_filter_forwards(std::function<bool(const R&)> filter,
Collect& out) {
bool remove_by_req_filter_fw(std::function<bool(const R&)> filter_accum) {
bool any_removed = false;
for (auto i = requests.begin();
i != requests.end();
/* no inc */) {
if (filter(*i->request)) {
if (filter_accum(*i->request)) {
any_removed = true;
out.push_back(*i->request);
i = requests.erase(i);
} else {
++i;
Expand All @@ -351,31 +348,28 @@ namespace crimson {

// NB: because a deque is the underlying structure, this
// operation might be expensive
template<typename Collect>
bool remove_by_req_filter_backwards(std::function<bool(const R&)> filter,
Collect& out) {
bool remove_by_req_filter_bw(std::function<bool(const R&)> filter_accum) {
bool any_removed = false;
for (auto i = --requests.end();
/* no cond */;
--i) {
if (filter(*i->request)) {
for (auto i = requests.rbegin();
i != requests.rend();
/* no inc */) {
if (filter_accum(*i->request)) {
any_removed = true;
out.push_back(*i->request);
i = requests.erase(i);
i = decltype(i){ requests.erase(std::next(i).base()) };
} else {
++i;
}
if (requests.begin() == i) break;
}
return any_removed;
}

template<typename Collect>
inline bool remove_by_req_filter(std::function<bool(const R&)> filter,
Collect& out,
bool visit_backwards) {
inline bool
remove_by_req_filter(std::function<bool(const R&)> filter_accum,
bool visit_backwards) {
if (visit_backwards) {
return remove_by_req_filter_backwards(filter, out);
return remove_by_req_filter_bw(filter_accum);
} else {
return remove_by_req_filter_forwards(filter, out);
return remove_by_req_filter_fw(filter_accum);
}
}

Expand Down Expand Up @@ -438,25 +432,13 @@ namespace crimson {
}


bool remove_by_req_filter(std::function<bool(const R&)> filter,
bool visit_backwards = false) {
struct Sink {
void push_back(const R& v) {} // do nothing
};
static Sink my_sink;
return remove_by_req_filter(filter, my_sink, visit_backwards);
}


template<typename Collect>
bool remove_by_req_filter(std::function<bool(const R&)> filter,
Collect& out,
bool remove_by_req_filter(std::function<bool(const R&)> filter_accum,
bool visit_backwards = false) {
bool any_removed = false;
DataGuard g(data_mtx);
for (auto i : client_map) {
bool modified =
i.second->remove_by_req_filter(filter, out, visit_backwards);
i.second->remove_by_req_filter(filter_accum, visit_backwards);
if (modified) {
resv_heap.adjust(*i.second);
limit_heap.adjust(*i.second);
Expand All @@ -471,29 +453,33 @@ namespace crimson {
}


void remove_by_client(const C& client) {
struct Sink {
void push_back(const R& v) {}
};
static Sink my_sink;
remove_by_client(client, my_sink);
// use as a default value when no accumulator is provide
static void request_sink(const R& req) {
// do nothing
}


// Collect must support calls to push_back(R), such as
// std::list<R>.
template<typename Collect>
void remove_by_client(const C& client, Collect& out) {
void remove_by_client(const C& client,
bool reverse = false,
std::function<void (const R&)> accum = request_sink) {
DataGuard g(data_mtx);

auto i = client_map.find(client);

if (i == client_map.end()) return;

for (auto j = i->second->requests.begin();
j != i->second->requests.end();
++j) {
out.push_back(*j->request);
if (reverse) {
for (auto j = i->second->requests.rbegin();
j != i->second->requests.rend();
++j) {
accum(*j->request);
}
} else {
for (auto j = i->second->requests.begin();
j != i->second->requests.end();
++j) {
accum(*j->request);
}
}

i->second->requests.clear();
Expand Down
128 changes: 119 additions & 9 deletions test/test_dmclock_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,16 @@ namespace crimson {
EXPECT_EQ(5, pq.request_count());

std::list<MyReq> capture;
pq.remove_by_req_filter([](const MyReq& r) -> bool {return 0 == r.id % 2;},
capture);
pq.remove_by_req_filter(
[&capture] (const MyReq& r) -> bool {
if (0 == r.id % 2) {
capture.push_front(r);
return true;
} else {
return false;
}
},
true);

EXPECT_EQ(0, pq.request_count());
EXPECT_EQ(5, capture.size());
Expand All @@ -276,7 +284,89 @@ namespace crimson {
} // TEST


TEST(dmclock_server, remove_by_req_filter_ordering) {
TEST(dmclock_server, remove_by_req_filter_ordering_forwards_visit) {
struct MyReq {
int id;

MyReq(int _id) :
id(_id)
{
// empty
}
}; // MyReq

using ClientId = int;
using Queue = dmc::PullPriorityQueue<ClientId,MyReq>;

ClientId client1 = 17;

dmc::ClientInfo info1(0.0, 1.0, 0.0);

auto client_info_f = [&] (ClientId c) -> dmc::ClientInfo {
return info1;
};

Queue pq(client_info_f, true);

EXPECT_EQ(0, pq.client_count());
EXPECT_EQ(0, pq.request_count());

ReqParams req_params(1,1);

pq.add_request(MyReq(1), client1, req_params);
pq.add_request(MyReq(2), client1, req_params);
pq.add_request(MyReq(3), client1, req_params);
pq.add_request(MyReq(4), client1, req_params);
pq.add_request(MyReq(5), client1, req_params);
pq.add_request(MyReq(6), client1, req_params);

EXPECT_EQ(1, pq.client_count());
EXPECT_EQ(6, pq.request_count());

// remove odd ids in forward order and append to end

std::vector<MyReq> capture;
pq.remove_by_req_filter(
[&capture] (const MyReq& r) -> bool {
if (1 == r.id % 2) {
capture.push_back(r);
return true;
} else {
return false;
}
},
false);

EXPECT_EQ(3, pq.request_count());
EXPECT_EQ(3, capture.size());
EXPECT_EQ(1, capture[0].id) << "items should come out in forward order";
EXPECT_EQ(3, capture[1].id) << "items should come out in forward order";
EXPECT_EQ(5, capture[2].id) << "items should come out in forward order";

// remove even ids in reverse order but insert at front so comes
// out forwards

std::vector<MyReq> capture2;
pq.remove_by_req_filter(
[&capture2] (const MyReq& r) -> bool {
if (0 == r.id % 2) {
capture2.insert(capture2.begin(), r);
return true;
} else {
return false;
}
},
false);

EXPECT_EQ(0, pq.request_count());
EXPECT_EQ(3, capture2.size());
EXPECT_EQ(6, capture2[0].id) << "items should come out in reverse order";
EXPECT_EQ(4, capture2[1].id) << "items should come out in reverse order";
EXPECT_EQ(2, capture2[2].id) << "items should come out in reverse order";
} // TEST


TEST(dmclock_server, remove_by_req_filter_ordering_backwards_visit) {
struct MyReq {
int id;

Expand Down Expand Up @@ -318,8 +408,16 @@ namespace crimson {
// now remove odd ids in forward order

std::vector<MyReq> capture;
pq.remove_by_req_filter([](const MyReq& r) -> bool {return 1 == r.id % 2;},
capture);
pq.remove_by_req_filter(
[&capture] (const MyReq& r) -> bool {
if (1 == r.id % 2) {
capture.insert(capture.begin(), r);
return true;
} else {
return false;
}
},
true);

EXPECT_EQ(3, pq.request_count());
EXPECT_EQ(3, capture.size());
Expand All @@ -330,9 +428,17 @@ namespace crimson {
// now remove even ids in reverse order

std::vector<MyReq> capture2;
pq.remove_by_req_filter([](const MyReq& r) -> bool {return 0 == r.id % 2;},
capture2,
true);
pq.remove_by_req_filter(
[&capture2] (const MyReq& r) -> bool {
if (0 == r.id % 2) {
capture2.push_back(r);
return true;
} else {
return false;
}
},
true);

EXPECT_EQ(0, pq.request_count());
EXPECT_EQ(3, capture2.size());
EXPECT_EQ(6, capture2[0].id) << "items should come out in reverse order";
Expand Down Expand Up @@ -386,7 +492,11 @@ namespace crimson {

std::list<MyReq> removed;

pq.remove_by_client(client1, removed);
pq.remove_by_client(client1,
true,
[&removed] (const MyReq& r) {
removed.push_front(r);
});

EXPECT_EQ(3, removed.size());
EXPECT_EQ(1, removed.front().id);
Expand Down

0 comments on commit 1335fda

Please sign in to comment.