From 4e199900f512b56334a1dbc406e96d7220fac092 Mon Sep 17 00:00:00 2001 From: Vicente Adolfo Bolea Sanchez Date: Mon, 30 May 2016 00:08:21 +0900 Subject: [PATCH] Solved strange bug in map, now its working perfectly --- Makefile.am | 1 + src/common/dl_loader.cc | 2 +- src/mapreduce/dataset.cc | 8 +- src/mapreduce/executor.cc | 21 ++-- src/mapreduce/nodes/peermr.cc | 169 ++++++++++++++++++++------------ src/mapreduce/nodes/peermr.h | 16 ++- src/mapreduce/nodes/remotemr.cc | 14 +-- src/mapreduce/nodes/remotemr.h | 1 + src/messages/boost_impl.cc | 1 + src/messages/boost_impl.hh | 16 ++- src/messages/task.cc | 2 - src/messages/task.hh | 12 +-- src/messages/taskstatus.cc | 6 ++ src/messages/taskstatus.hh | 15 +++ src/network/asyncchannel.cc | 2 +- src/targets/wc.cc | 36 ++----- 16 files changed, 197 insertions(+), 125 deletions(-) create mode 100644 src/messages/taskstatus.cc create mode 100644 src/messages/taskstatus.hh diff --git a/Makefile.am b/Makefile.am index 057342f..04fbf59 100644 --- a/Makefile.am +++ b/Makefile.am @@ -35,6 +35,7 @@ messages_files = src/messages/boundaries.cc \ src/mapreduce/messages/iblockinfo.cc \ src/mapreduce/messages/key_value_shuffle.cc \ src/mapreduce/messages/finish_shuffle.cc \ + src/messages/taskstatus.cc \ src/messages/fileexist.cc # libs ----- diff --git a/src/common/dl_loader.cc b/src/common/dl_loader.cc index 4c54e36..3fdd71b 100644 --- a/src/common/dl_loader.cc +++ b/src/common/dl_loader.cc @@ -25,7 +25,7 @@ maptype DL_loader::load_function (std::string fun) { reinterpret_cast(dlsym(lib, fun.c_str())); char* err = dlerror(); - if (err) throw std::runtime_error("Symbol not found"); + if (err) throw std::runtime_error("DL_LOADER: Symbol not found"); return func_; } // }}} diff --git a/src/mapreduce/dataset.cc b/src/mapreduce/dataset.cc index 6b4924d..385d608 100644 --- a/src/mapreduce/dataset.cc +++ b/src/mapreduce/dataset.cc @@ -48,9 +48,11 @@ DataSet::DataSet (uint32_t id_) : { find_local_master(); auto ep = find_local_master(); - std::random_device rd; - std::uniform_int_distribution dist(std::numeric_limits::max()); - job_id = dist(rd); + std::mt19937 rng; + rng.seed(std::random_device()()); + std::uniform_int_distribution dist(1, + std::numeric_limits::max()); + job_id = dist(rng); cout << "[CLIENT] submitting Job id: " << job_id << endl; socket.connect(*ep); diff --git a/src/mapreduce/executor.cc b/src/mapreduce/executor.cc index 4ffec18..32e09ac 100644 --- a/src/mapreduce/executor.cc +++ b/src/mapreduce/executor.cc @@ -37,7 +37,8 @@ bool Executor::run_map (messages::Task* m, std::string input) { stringstream ss (input); while (!ss.eof()) { - char next_line [256] = {0}; //! :TODO: change to DFS line limit + char* next_line = new char[256]; //! :TODO: change to DFS line limit + bzero(next_line, 256); ss.getline (next_line, 256); if (strnlen(next_line, 256) == 0) continue; @@ -47,6 +48,7 @@ bool Executor::run_map (messages::Task* m, std::string input) { auto key = key_value.first; auto hash_key = h(key.c_str()); auto& value = key_value.second; + context.logger->info ("Generated value: %s -> %s", next_line, value.c_str()); KeyValueShuffle kv; kv.job_id_ = m->job_id; @@ -54,8 +56,8 @@ bool Executor::run_map (messages::Task* m, std::string input) { kv.key_ = key; kv.value_ = value; peer->process(&kv); + delete next_line; } - peer->finish_map(m->job_id); return true; } @@ -78,7 +80,6 @@ bool Executor::run_reduce (messages::Task* task) { try { IReader ireader; - //ireader.set_net_id(0); ireader.set_job_id(task->job_id); ireader.set_map_id(task->map_id); ireader.set_reducer_id(0); @@ -88,20 +89,20 @@ bool Executor::run_reduce (messages::Task* task) { string key; ireader.get_next_key(key); - bool is_first_iteration = true; + int total_iterations = 0; string last_output; + if (ireader.is_next_value()) + ireader.get_next_value(last_output); + while (ireader.is_next_value()) { string value; ireader.get_next_value(value); - if (is_first_iteration) - ireader.get_next_value(last_output); - - else - last_output = _reducer_ (value, last_output); + last_output = _reducer_ (value, last_output); - is_first_iteration = false; + total_iterations++; } + context.logger->info ("Key %s #iterations: %i", key.c_str(), total_iterations); FileInfo fi; fi.file_name = key; diff --git a/src/mapreduce/nodes/peermr.cc b/src/mapreduce/nodes/peermr.cc index 765fe07..f4cffb7 100644 --- a/src/mapreduce/nodes/peermr.cc +++ b/src/mapreduce/nodes/peermr.cc @@ -27,56 +27,20 @@ void PeerMR::process_map_block (string ignoreme, string block, Task* task) { else reply.message = "MAPFAILED"; - if (leader_node == id) { - - } else { - network->send (leader_node, &reply); - } + remaining_maps--; + if (remaining_maps == 0) + notify_map_leader(task); } // }}} // process_map_file {{{ -bool PeerMR::process_map_file (messages::Task* m) { - auto file = m->input_path; - //m->job_id = job_ids++; - FileInfo fi; - fi.num_block = 0; - - directory.select_file_metadata(file, &fi); - - int num_blocks = fi.num_block; - if (num_blocks == 0) return false; +bool PeerMR::process_map_file (messages::Task* m, std::function f) { + task_callbacks[m->job_id] = f; - int maps_to_exec = 0; - for (int i = 0; i< num_blocks; i++) { - BlockInfo bi; - directory.select_block_metadata (file, i, &bi); - auto block_node = boundaries->get_index(bi.block_hash_key); - if (block_node == id) - maps_to_exec++; - } - current_maps = maps_to_exec; - - for (int i = 0; i< num_blocks; i++) { - BlockInfo bi; - directory.select_block_metadata (file, i, &bi); - auto block_name = bi.block_name; - auto hash_key = bi.block_hash_key; - m->block_name = bi.block_name; - m->block_hash_key = hash_key; + if (is_leader(m->input_path)) + map_leader(m); + else + map_follower(m); - auto block_node = boundaries->get_index(hash_key); - - if (block_node == id) { - request(hash_key, bi.block_name, std::bind( - &PeerMR::process_map_block, this, - std::placeholders::_1, - std::placeholders::_2, m)); - - } else { - logger->info ("Forwaring Map task to %d", block_node); - network->send (block_node, m); - } - } return true; } // }}} @@ -105,25 +69,26 @@ template<> void PeerMR::process(KeyValueShuffle *kv_shuffle) { // process FinishShuffle {{{ template<> void PeerMR::process(FinishShuffle *msg) { logger->info (" I got Finish shuffle"); - try { - const uint32_t job_id = msg->job_id_; - auto it = iwriters_.find(job_id); - if (it != iwriters_.end()) { - it->second->finalize(); - iwriters_.erase(it); + + //Make sure all the nodes have finished shuffling + try { + const uint32_t job_id = msg->job_id_; + auto it = iwriters_.find(job_id); + if (it != iwriters_.end()) { + it->second->finalize(); + iwriters_.erase(it); + } + } catch (std::exception& e) { + logger->error ("Iwriter exception"); } - } catch (std::exception& e) { - logger->error ("Iwriter exception"); - } + if (task_callbacks.find(msg->job_id_) != task_callbacks.end()) + task_callbacks[msg->job_id_](); } // }}} // process Task {{{ template<> void PeerMR::process(Task* m) { if (m->get_type_task() == "MAP") { - request(m->block_hash_key, m->block_name, std::bind( - &PeerMR::process_map_block, this, - std::placeholders::_1, - std::placeholders::_2, m)); + map_follower(m); } else { auto map_id = m->map_id; @@ -148,6 +113,18 @@ template<> void PeerMR::process(Task* m) { } } // }}} +// process TaskStatus {{{ +template<> void PeerMR::process(messages::TaskStatus* m) { + logger->info ("I got a Task status: %d jobid: %u", m->is_success, m->job_id); + if (m->is_success) { + remaining_follower_map_nodes--; + } + + if (remaining_follower_map_nodes == 0) { + finish_map(m->job_id); + } +} +// }}} // on_read {{{ void PeerMR::on_read(messages::Message *msg, int) { std::string type = msg->get_type(); @@ -162,6 +139,10 @@ void PeerMR::on_read(messages::Message *msg, int) { auto task_ = dynamic_cast(msg); process(task_); + } else if (type == "TaskStatus") { + auto task_ = dynamic_cast(msg); + process(task_); + } else { PeerDFS::on_read(msg, 0); } @@ -239,8 +220,6 @@ void PeerMR::receive_kv(messages::KeyValueShuffle *kv_shuffle) { // }}} // finish_map {{{ void PeerMR::finish_map (int job_id_) { - current_maps--; - if (current_maps == 0) { FinishShuffle fs; fs.job_id_ = job_id_; @@ -250,7 +229,6 @@ void PeerMR::finish_map (int job_id_) { } } process(&fs); - } } // }}} // process_reduce {{{ @@ -263,10 +241,79 @@ bool PeerMR::process_reduce (messages::Task* m) { process(m); } // }}} +// map_leader {{{ +void PeerMR::map_leader (messages::Task* m) { + auto file = m->input_path; + FileInfo fi; + fi.num_block = 0; + + directory.select_file_metadata(file, &fi); + + int num_blocks = fi.num_block; + if (num_blocks == 0) return; + + map tasks; + for (int i = 0; i< num_blocks; i++) { + BlockInfo bi; + directory.select_block_metadata (file, i, &bi); + auto block_name = bi.block_name; + auto hash_key = bi.block_hash_key; + + auto block_node = boundaries->get_index(hash_key); + tasks.insert({block_node, *m}); + tasks[block_node].blocks.push_back({hash_key, block_name}); + } + remaining_follower_map_nodes = tasks.size(); + logger->info ("%d nodes will run maps", remaining_follower_map_nodes); + + for (auto& task : tasks) { + if (task.first == id) { + map_follower(&task.second); + + } else { + logger->info ("Forwaring Map task to %d jobid:%" PRIu32, task.first, m->job_id); + network->send (task.first, &task.second); + } + } +} +// }}} +// map_follower {{{ +void PeerMR::map_follower (messages::Task* m) { + logger->info ("Executing map jobid:%d", m->job_id); + remaining_maps = m->blocks.size(); + for (auto& block : m->blocks) + request(block.first, block.second, std::bind( + &PeerMR::process_map_block, this, + std::placeholders::_1, + std::placeholders::_2, m)); + +} +// }}} // format {{{ bool PeerMR::format () { PeerDFS::format(); directory.init_db(); } // }}} +// is_leader {{{ +bool PeerMR::is_leader(std::string f) { + return (id == (h(f) % nodes.size())); +} +// }}} +// notify_map_leader {{{ +void PeerMR::notify_map_leader (messages::Task* m) { + auto leader_node = h(m->input_path) % nodes.size(); + + TaskStatus ts; + ts.is_success = true; + ts.job_id = m->job_id; + + if (leader_node == id) { + process(&ts); + + } else { + network->send(leader_node, &ts); + } +} +// }}} } // namespace eclipse diff --git a/src/mapreduce/nodes/peermr.h b/src/mapreduce/nodes/peermr.h index 9620b80..22c7ef5 100644 --- a/src/mapreduce/nodes/peermr.h +++ b/src/mapreduce/nodes/peermr.h @@ -35,19 +35,27 @@ class PeerMR: public PeerDFS { IBlockInfo request_iblock(messages::IBlockInfoRequest *iblock_info_request); void write_key_value(messages::KeyValueShuffle *key_value); void receive_kv(messages::KeyValueShuffle *kv_shuffle); + template void process(T); + bool format (); + void process_map_block (std::string, std::string, messages::Task*); - bool process_map_file (messages::Task*); + bool process_map_file (messages::Task*, std::function); + void map_leader (messages::Task*); + void map_follower (messages::Task*); bool process_reduce (messages::Task*); - template void process(T); void finish_map (int); - bool format (); protected: + bool is_leader(std::string); + void notify_map_leader (messages::Task*); + uint32_t net_size_; uint32_t job_ids = 0; - uint32_t current_maps = 0; + uint32_t remaining_follower_map_nodes = 0; + uint32_t remaining_maps = 0; DirectoryMR directory; + std::unordered_map> task_callbacks; std::unordered_map> iwriters_; }; diff --git a/src/mapreduce/nodes/remotemr.cc b/src/mapreduce/nodes/remotemr.cc index 182fbd3..469ae34 100644 --- a/src/mapreduce/nodes/remotemr.cc +++ b/src/mapreduce/nodes/remotemr.cc @@ -44,21 +44,23 @@ void RemoteMR::map (messages::Message* _m) { bool ret; if (m->get_type_task() == "MAP") { - ret = peer->process_map_file(m); + ret = peer->process_map_file(m, std::bind(&RemoteMR::reply_map, this, m)); } else if (m->get_type_task() == "REDUCE") { ret = peer->process_reduce(m); + Reply reply; + reply.message = "OK"; + network->send(0, &reply); } else if (m->get_type_task() == "COUNT") { //ret = peer->process_count(m); } +} + +void RemoteMR::reply_map (messages::Message* _m) { Reply reply; - if (ret) { - reply.message = "OK"; - } else { - reply.message = "FAIL"; - } + reply.message = "OK"; network->send(0, &reply); } // }}} diff --git a/src/mapreduce/nodes/remotemr.h b/src/mapreduce/nodes/remotemr.h index bcddd4d..fa54118 100644 --- a/src/mapreduce/nodes/remotemr.h +++ b/src/mapreduce/nodes/remotemr.h @@ -18,6 +18,7 @@ class RemoteMR: public RemoteDFS { void request_iblock(messages::Message*); void shuffle(messages::Message*); void map (messages::Message*); + void reply_map (messages::Message*); protected: PeerMR* peer; diff --git a/src/messages/boost_impl.cc b/src/messages/boost_impl.cc index c44cf18..b48937f 100644 --- a/src/messages/boost_impl.cc +++ b/src/messages/boost_impl.cc @@ -29,3 +29,4 @@ BOOST_CLASS_EXPORT(eclipse::messages::IBlockInfoRequest); BOOST_CLASS_EXPORT(eclipse::messages::FileExist); BOOST_CLASS_EXPORT(eclipse::messages::KeyValueShuffle); BOOST_CLASS_EXPORT(eclipse::messages::FinishShuffle); +BOOST_CLASS_EXPORT(eclipse::messages::TaskStatus); diff --git a/src/messages/boost_impl.hh b/src/messages/boost_impl.hh index 97fa3c9..3a9f50a 100644 --- a/src/messages/boost_impl.hh +++ b/src/messages/boost_impl.hh @@ -32,6 +32,7 @@ #include "../mapreduce/messages/key_value_shuffle.h" #include "../mapreduce/messages/finish_shuffle.h" #include "fileexist.hh" +#include "taskstatus.hh" #include #include @@ -39,6 +40,7 @@ #include #include #include +#include #ifndef BASE_OBJECT #define BASE_OBJECT(X,Y) \ @@ -112,16 +114,13 @@ template template void serialize (Archive& ar, eclipse::messages::Task& c, unsigned int) { ar & BASE_OBJECT(Message, c); - ar & BOOST_SERIALIZATION_NVP(c.id); ar & BOOST_SERIALIZATION_NVP(c.type); ar & BOOST_SERIALIZATION_NVP(c.library); ar & BOOST_SERIALIZATION_NVP(c.input_path); ar & BOOST_SERIALIZATION_NVP(c.func_name); ar & BOOST_SERIALIZATION_NVP(c.job_id); ar & BOOST_SERIALIZATION_NVP(c.map_id); - ar & BOOST_SERIALIZATION_NVP(c.output); - ar & BOOST_SERIALIZATION_NVP(c.block_name); - ar & BOOST_SERIALIZATION_NVP(c.block_hash_key); + ar & BOOST_SERIALIZATION_NVP(c.blocks); } template void serialize (Archive& ar, eclipse::messages::FileList& c, unsigned int) { @@ -260,6 +259,14 @@ template ar & BOOST_SERIALIZATION_NVP(c.map_id_); } +template + void serialize(Archive& ar, eclipse::messages::TaskStatus& c, + unsigned int) { + ar & BASE_OBJECT(Message, c); + ar & BOOST_SERIALIZATION_NVP(c.is_success); + ar & BOOST_SERIALIZATION_NVP(c.job_id); + } + } // namespace messages } // namespace eclipse @@ -294,6 +301,7 @@ BOOST_CLASS_TRACKING(ECNS::IBlockInfoRequest, TRACK_NEVER); BOOST_CLASS_TRACKING(ECNS::FileExist, TRACK_NEVER); BOOST_CLASS_TRACKING(ECNS::KeyValueShuffle, TRACK_NEVER); BOOST_CLASS_TRACKING(ECNS::FinishShuffle, TRACK_NEVER); +BOOST_CLASS_TRACKING(ECNS::TaskStatus, TRACK_NEVER); #undef ECNS #undef TRACK_NEVER diff --git a/src/messages/task.cc b/src/messages/task.cc index c6e53f7..beee1aa 100644 --- a/src/messages/task.cc +++ b/src/messages/task.cc @@ -5,14 +5,12 @@ namespace messages { std::string Task::get_type() const { return "Task"; } -Task& Task::set_id(int i) { id = i; return *this; } Task& Task::set_type(int i) { type = i; return *this; } Task& Task::set_input_path(std::string i) { input_path = i; return *this; } -int Task::get_id() { return id; } std::string Task::get_type_task() { return type;} std::string Task::get_input_path() { return input_path; } diff --git a/src/messages/task.hh b/src/messages/task.hh index 156e6ce..e575c7c 100644 --- a/src/messages/task.hh +++ b/src/messages/task.hh @@ -1,6 +1,8 @@ #pragma once #include "message.hh" +#include +#include //#include "mapreduce/task.hh" namespace eclipse { @@ -9,21 +11,17 @@ namespace messages { struct Task: public Message { std::string get_type() const override; - Task& set_id(int); Task& set_type(int); Task& set_input_path(std::string); - int get_id(); std::string get_type_task(); std::string get_input_path(); - int id, map_id; - uint32_t job_id; + int map_id = 0; + uint32_t job_id = 0; std::string type; std::string library, func_name, input_path; - std::string output; - std::string block_name; - uint32_t block_hash_key; + std::vector> blocks; }; } diff --git a/src/messages/taskstatus.cc b/src/messages/taskstatus.cc new file mode 100644 index 0000000..0e95b77 --- /dev/null +++ b/src/messages/taskstatus.cc @@ -0,0 +1,6 @@ +#include "taskstatus.hh" +namespace eclipse { +namespace messages { +std::string TaskStatus::get_type() const { return "TaskStatus"; } +} +} diff --git a/src/messages/taskstatus.hh b/src/messages/taskstatus.hh new file mode 100644 index 0000000..bbc3e21 --- /dev/null +++ b/src/messages/taskstatus.hh @@ -0,0 +1,15 @@ +#pragma once +#include "message.hh" + +namespace eclipse { +namespace messages { + +struct TaskStatus: public Message { + std::string get_type() const override; + + uint32_t job_id = 0; + bool is_success = false; +}; + +} /* messages */ +} diff --git a/src/network/asyncchannel.cc b/src/network/asyncchannel.cc index 90a3277..b0828ea 100644 --- a/src/network/asyncchannel.cc +++ b/src/network/asyncchannel.cc @@ -76,7 +76,7 @@ void AsyncChannel::on_write (const boost::system::error_code& ec, // do_read {{{ void AsyncChannel::do_read () { logger->info("Connection established, starting to read"); - spawn(iosvc, bind(&AsyncChannel::read_coroutine, this, _1)); + spawn(iosvc, boost::bind(&AsyncChannel::read_coroutine, this, _1)); } // }}} // read_coroutine {{{ diff --git a/src/targets/wc.cc b/src/targets/wc.cc index 82012c1..f781558 100644 --- a/src/targets/wc.cc +++ b/src/targets/wc.cc @@ -2,49 +2,33 @@ #include #include +#include using namespace eclipse; using namespace std; extern "C" { - pair myfunc (string); + pair mymapper(string); string myreducer (string, string); } -pair myfunc (string a) { +pair mymapper(string a) { - int total = 0; - char *p = new char[a.length()]; - strncpy (p, a.c_str(), a.length()); - p = strtok(p, " "); - while (p) { - if (p[0] != '\n' or strlen(p) == 0) - total++; - p = strtok(NULL, " "); - } + std::stringstream stream(a); + std::string oneWord; + unsigned int count = 0; - delete p; + while(stream >> oneWord) { ++count;} - auto output = to_string(total); - return {"Total", output}; + return {"Total", to_string(count)}; } string myreducer (string a, string b) { - auto a_ = atoi (a.c_str()); - auto b_ = atoi (b.c_str()); - - auto out = to_string(a_ + b_); - - return out; + return to_string(stoi(a) + stoi(b)); } int main (int argc, char** argv) { DataSet& A = DataSet::open(argv[1]); -// DataSet& B = DataSet::open("FileB"); - -// cout << B.count() << endl; -// cout << B.filter("^Total\s").count() << endl; - - A.map("myfunc"); + A.map("mymapper"); A.reduce("myreducer"); }