Skip to content

Commit

Permalink
Solved a strange bug in map; now it's working perfectly
Browse files Browse the repository at this point in the history
  • Loading branch information
vicentebolea committed May 29, 2016
1 parent 675d6a5 commit 4e19990
Show file tree
Hide file tree
Showing 16 changed files with 197 additions and 125 deletions.
1 change: 1 addition & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ messages_files = src/messages/boundaries.cc \
src/mapreduce/messages/iblockinfo.cc \
src/mapreduce/messages/key_value_shuffle.cc \
src/mapreduce/messages/finish_shuffle.cc \
src/messages/taskstatus.cc \
src/messages/fileexist.cc

# libs -----
Expand Down
2 changes: 1 addition & 1 deletion src/common/dl_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ maptype DL_loader::load_function (std::string fun) {
reinterpret_cast<maptype>(dlsym(lib, fun.c_str()));
char* err = dlerror();

if (err) throw std::runtime_error("Symbol not found");
if (err) throw std::runtime_error("DL_LOADER: Symbol not found");
return func_;
}
// }}}
Expand Down
8 changes: 5 additions & 3 deletions src/mapreduce/dataset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@ DataSet::DataSet (uint32_t id_) :
{
find_local_master();
auto ep = find_local_master();
std::random_device rd;
std::uniform_int_distribution<uint32_t> dist(std::numeric_limits<uint32_t>::max());
job_id = dist(rd);
std::mt19937 rng;
rng.seed(std::random_device()());
std::uniform_int_distribution<std::mt19937::result_type> dist(1,
std::numeric_limits<uint32_t>::max());
job_id = dist(rng);
cout << "[CLIENT] submitting Job id: " << job_id << endl;

socket.connect(*ep);
Expand Down
21 changes: 11 additions & 10 deletions src/mapreduce/executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ bool Executor::run_map (messages::Task* m, std::string input) {
stringstream ss (input);

while (!ss.eof()) {
char next_line [256] = {0}; //! :TODO: change to DFS line limit
char* next_line = new char[256]; //! :TODO: change to DFS line limit
bzero(next_line, 256);
ss.getline (next_line, 256);
if (strnlen(next_line, 256) == 0)
continue;
Expand All @@ -47,15 +48,16 @@ bool Executor::run_map (messages::Task* m, std::string input) {
auto key = key_value.first;
auto hash_key = h(key.c_str());
auto& value = key_value.second;
context.logger->info ("Generated value: %s -> %s", next_line, value.c_str());

KeyValueShuffle kv;
kv.job_id_ = m->job_id;
kv.map_id_ = 0;
kv.key_ = key;
kv.value_ = value;
peer->process(&kv);
delete next_line;
}
peer->finish_map(m->job_id);

return true;
}
Expand All @@ -78,7 +80,6 @@ bool Executor::run_reduce (messages::Task* task) {
try {

IReader ireader;
//ireader.set_net_id(0);
ireader.set_job_id(task->job_id);
ireader.set_map_id(task->map_id);
ireader.set_reducer_id(0);
Expand All @@ -88,20 +89,20 @@ bool Executor::run_reduce (messages::Task* task) {
string key;
ireader.get_next_key(key);

bool is_first_iteration = true;
int total_iterations = 0;
string last_output;
if (ireader.is_next_value())
ireader.get_next_value(last_output);

while (ireader.is_next_value()) {
string value;
ireader.get_next_value(value);

if (is_first_iteration)
ireader.get_next_value(last_output);

else
last_output = _reducer_ (value, last_output);
last_output = _reducer_ (value, last_output);

is_first_iteration = false;
total_iterations++;
}
context.logger->info ("Key %s #iterations: %i", key.c_str(), total_iterations);

FileInfo fi;
fi.file_name = key;
Expand Down
169 changes: 108 additions & 61 deletions src/mapreduce/nodes/peermr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,56 +27,20 @@ void PeerMR::process_map_block (string ignoreme, string block, Task* task) {
else
reply.message = "MAPFAILED";

if (leader_node == id) {

} else {
network->send (leader_node, &reply);
}
remaining_maps--;
if (remaining_maps == 0)
notify_map_leader(task);
}
// }}}
// process_map_file {{{
bool PeerMR::process_map_file (messages::Task* m) {
auto file = m->input_path;
//m->job_id = job_ids++;
FileInfo fi;
fi.num_block = 0;

directory.select_file_metadata(file, &fi);

int num_blocks = fi.num_block;
if (num_blocks == 0) return false;
bool PeerMR::process_map_file (messages::Task* m, std::function<void(void)> f) {
task_callbacks[m->job_id] = f;

int maps_to_exec = 0;
for (int i = 0; i< num_blocks; i++) {
BlockInfo bi;
directory.select_block_metadata (file, i, &bi);
auto block_node = boundaries->get_index(bi.block_hash_key);
if (block_node == id)
maps_to_exec++;
}
current_maps = maps_to_exec;

for (int i = 0; i< num_blocks; i++) {
BlockInfo bi;
directory.select_block_metadata (file, i, &bi);
auto block_name = bi.block_name;
auto hash_key = bi.block_hash_key;
m->block_name = bi.block_name;
m->block_hash_key = hash_key;
if (is_leader(m->input_path))
map_leader(m);
else
map_follower(m);

auto block_node = boundaries->get_index(hash_key);

if (block_node == id) {
request(hash_key, bi.block_name, std::bind(
&PeerMR::process_map_block, this,
std::placeholders::_1,
std::placeholders::_2, m));

} else {
logger->info ("Forwaring Map task to %d", block_node);
network->send (block_node, m);
}
}
return true;
}
// }}}
Expand Down Expand Up @@ -105,25 +69,26 @@ template<> void PeerMR::process(KeyValueShuffle *kv_shuffle) {
// process FinishShuffle {{{
template<> void PeerMR::process(FinishShuffle *msg) {
logger->info (" I got Finish shuffle");
try {
const uint32_t job_id = msg->job_id_;
auto it = iwriters_.find(job_id);
if (it != iwriters_.end()) {
it->second->finalize();
iwriters_.erase(it);

//Make sure all the nodes have finished shuffling
try {
const uint32_t job_id = msg->job_id_;
auto it = iwriters_.find(job_id);
if (it != iwriters_.end()) {
it->second->finalize();
iwriters_.erase(it);
}
} catch (std::exception& e) {
logger->error ("Iwriter exception");
}
} catch (std::exception& e) {
logger->error ("Iwriter exception");
}
if (task_callbacks.find(msg->job_id_) != task_callbacks.end())
task_callbacks[msg->job_id_]();
}
// }}}
// process Task {{{
template<> void PeerMR::process(Task* m) {
if (m->get_type_task() == "MAP") {
request(m->block_hash_key, m->block_name, std::bind(
&PeerMR::process_map_block, this,
std::placeholders::_1,
std::placeholders::_2, m));
map_follower(m);

} else {
auto map_id = m->map_id;
Expand All @@ -148,6 +113,18 @@ template<> void PeerMR::process(Task* m) {
}
}
// }}}
// process TaskStatus {{{
// Handles a TaskStatus report for a map job.
//
// Every successful report decrements remaining_follower_map_nodes; once the
// counter reaches zero, finish_map() is called for the reported job id.
//
// @param m  status message; only is_success and job_id are read here.
//
// NOTE(review): a report with is_success == false is logged and otherwise
// ignored, so the counter never reaches zero for that job — confirm whether
// failures should be surfaced to the caller instead.
template<> void PeerMR::process(messages::TaskStatus* m) {
  logger->info ("I got a Task status: %d jobid: %u", m->is_success, m->job_id);

  // Only successful reports count towards completion. Returning here also
  // prevents a failed report from re-triggering finish_map() when the
  // counter already sits at zero.
  if (!m->is_success) return;

  // The counter is unsigned: a stale or duplicated report arriving when it
  // is already zero must not wrap it around to its maximum value.
  if (remaining_follower_map_nodes == 0) return;

  if (--remaining_follower_map_nodes == 0) {
    finish_map(m->job_id);
  }
}
// }}}
// on_read {{{
void PeerMR::on_read(messages::Message *msg, int) {
std::string type = msg->get_type();
Expand All @@ -162,6 +139,10 @@ void PeerMR::on_read(messages::Message *msg, int) {
auto task_ = dynamic_cast<Task*>(msg);
process(task_);

} else if (type == "TaskStatus") {
auto task_ = dynamic_cast<TaskStatus*>(msg);
process(task_);

} else {
PeerDFS::on_read(msg, 0);
}
Expand Down Expand Up @@ -239,8 +220,6 @@ void PeerMR::receive_kv(messages::KeyValueShuffle *kv_shuffle) {
// }}}
// finish_map {{{
void PeerMR::finish_map (int job_id_) {
current_maps--;
if (current_maps == 0) {
FinishShuffle fs;
fs.job_id_ = job_id_;

Expand All @@ -250,7 +229,6 @@ void PeerMR::finish_map (int job_id_) {
}
}
process(&fs);
}
}
// }}}
// process_reduce {{{
Expand All @@ -263,10 +241,79 @@ bool PeerMR::process_reduce (messages::Task* m) {
process(m);
}
// }}}
// map_leader {{{
// Leader side of a map job: splits the input file into per-node tasks and
// dispatches them.
//
// The file metadata is looked up in `directory`; each block is assigned to
// the node returned by boundaries->get_index() for its hash key. All blocks
// that map to the same node are grouped into one Task (a copy of *m with
// its `blocks` list extended). remaining_follower_map_nodes is set to the
// number of distinct nodes involved. The task for this node is run through
// map_follower(); the others are sent over the network.
//
// @param m  the submitted task; input_path and job_id are read, and *m is
//           used as the template for every per-node task.
//
// Returns without doing anything when the file has no blocks.
void PeerMR::map_leader (messages::Task* m) {
  auto file = m->input_path;
  FileInfo fi;
  fi.num_block = 0;  // stays 0 if the lookup below does not fill it in

  directory.select_file_metadata(file, &fi);

  int num_blocks = fi.num_block;
  if (num_blocks == 0) return;

  // Group the blocks by the node responsible for them.
  map<int, Task> tasks;
  for (int i = 0; i < num_blocks; i++) {
    BlockInfo bi;
    directory.select_block_metadata (file, i, &bi);
    auto block_name = bi.block_name;
    auto hash_key = bi.block_hash_key;

    auto block_node = boundaries->get_index(hash_key);

    // Copy the template task only the first time a node shows up; later
    // blocks for the same node reuse the existing entry (one lookup, no
    // throw-away Task copy per block).
    auto it = tasks.find(block_node);
    if (it == tasks.end())
      it = tasks.insert({block_node, *m}).first;
    it->second.blocks.push_back({hash_key, block_name});
  }
  remaining_follower_map_nodes = tasks.size();
  logger->info ("%d nodes will run maps", remaining_follower_map_nodes);

  for (auto& task : tasks) {
    if (task.first == id) {
      // NOTE(review): this passes a pointer into the local `tasks` map,
      // which is destroyed when map_leader returns. map_follower binds that
      // pointer into the request() callbacks — confirm the callbacks run
      // before this function returns, otherwise the pointer dangles.
      map_follower(&task.second);

    } else {
      logger->info ("Forwaring Map task to %d jobid:%" PRIu32, task.first, m->job_id);
      network->send (task.first, &task.second);
    }
  }
}
// }}}
// map_follower {{{
// Runs the map phase on this node for every block listed in the task.
//
// remaining_maps is set to the number of blocks, then one request() is
// issued per block with process_map_block as the completion callback. The
// task pointer is bound into each callback as its third argument.
//
// @param m  task whose `blocks` entries (first, second) are forwarded to
//           request(); the pointer itself is captured by the callbacks.
void PeerMR::map_follower (messages::Task* m) {
  logger->info ("Executing map jobid:%d", m->job_id);

  // One process_map_block invocation is expected per block.
  remaining_maps = m->blocks.size();

  for (auto& entry : m->blocks) {
    auto on_block_ready = std::bind(
        &PeerMR::process_map_block, this,
        std::placeholders::_1,
        std::placeholders::_2, m);

    request(entry.first, entry.second, on_block_ready);
  }
}
// }}}
// format {{{
// Formats this peer: runs the base-class format step and then
// (re)initialises the MapReduce directory database.
//
// @return true once both steps have been invoked. Neither call's result is
//         inspected here, so the value does not reflect their outcome.
bool PeerMR::format () {
  PeerDFS::format();
  directory.init_db();

  // The function is declared to return bool; falling off the end without a
  // return statement is undefined behaviour, so return explicitly.
  return true;
}
// }}}
// is_leader {{{
// Tells whether this node is the leader for the given file name.
//
// The leader index is the hash of the name modulo the number of nodes; this
// node is the leader when that index equals its own id.
//
// @param f  file name to hash.
// @return true if this node's id matches the computed leader index.
bool PeerMR::is_leader(std::string f) {
  const auto leader_index = h(f) % nodes.size();
  return id == leader_index;
}
// }}}
// notify_map_leader {{{
// Reports successful completion of this node's map work to the job leader.
//
// The leader is the node whose index equals hash(input_path) modulo the
// number of nodes. If that is this node, the status is processed locally;
// otherwise it is sent over the network.
//
// @param m  finished task; input_path selects the leader and job_id is
//           copied into the status message.
void PeerMR::notify_map_leader (messages::Task* m) {
  TaskStatus status;
  status.job_id = m->job_id;
  status.is_success = true;

  const auto leader = h(m->input_path) % nodes.size();

  // Remote leader: ship the status and stop here.
  if (leader != id) {
    network->send(leader, &status);
    return;
  }

  // This node is the leader: handle the status directly.
  process(&status);
}
// }}}
} // namespace eclipse
16 changes: 12 additions & 4 deletions src/mapreduce/nodes/peermr.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,27 @@ class PeerMR: public PeerDFS {
IBlockInfo request_iblock(messages::IBlockInfoRequest *iblock_info_request);
void write_key_value(messages::KeyValueShuffle *key_value);
void receive_kv(messages::KeyValueShuffle *kv_shuffle);
template<typename T> void process(T);
bool format ();

void process_map_block (std::string, std::string, messages::Task*);
bool process_map_file (messages::Task*);
bool process_map_file (messages::Task*, std::function<void(void)>);
void map_leader (messages::Task*);
void map_follower (messages::Task*);
bool process_reduce (messages::Task*);
template<typename T> void process(T);

void finish_map (int);
bool format ();

protected:
bool is_leader(std::string);
void notify_map_leader (messages::Task*);

uint32_t net_size_;
uint32_t job_ids = 0;
uint32_t current_maps = 0;
uint32_t remaining_follower_map_nodes = 0;
uint32_t remaining_maps = 0;
DirectoryMR directory;
std::unordered_map<uint32_t, std::function<void(void)>> task_callbacks;
std::unordered_map<uint32_t, std::shared_ptr<IWriter_interface>> iwriters_;
};

Expand Down
14 changes: 8 additions & 6 deletions src/mapreduce/nodes/remotemr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,23 @@ void RemoteMR::map (messages::Message* _m) {
bool ret;

if (m->get_type_task() == "MAP") {
ret = peer->process_map_file(m);
ret = peer->process_map_file(m, std::bind(&RemoteMR::reply_map, this, m));

} else if (m->get_type_task() == "REDUCE") {
ret = peer->process_reduce(m);
Reply reply;
reply.message = "OK";
network->send(0, &reply);

} else if (m->get_type_task() == "COUNT") {
//ret = peer->process_count(m);
}

}

void RemoteMR::reply_map (messages::Message* _m) {
Reply reply;
if (ret) {
reply.message = "OK";
} else {
reply.message = "FAIL";
}
reply.message = "OK";
network->send(0, &reply);
}
// }}}
Expand Down
1 change: 1 addition & 0 deletions src/mapreduce/nodes/remotemr.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class RemoteMR: public RemoteDFS {
void request_iblock(messages::Message*);
void shuffle(messages::Message*);
void map (messages::Message*);
void reply_map (messages::Message*);

protected:
PeerMR* peer;
Expand Down
Loading

0 comments on commit 4e19990

Please sign in to comment.