From a28b5649be3a23e8648f3d283b75afcf3376320e Mon Sep 17 00:00:00 2001 From: Vicente Adolfo Bolea Sanchez Date: Wed, 11 May 2016 20:18:08 +0900 Subject: [PATCH 1/4] First commit --- src/mapreduce/executor.cc | 1 + src/mapreduce/messages/boost_impl_mr.cc | 10 -- src/mapreduce/messages/boost_impl_mr.hh | 102 ------------------- src/mapreduce/nodes/peermr.cc | 14 +++ src/mapreduce/nodes/peermr.h | 2 + src/mapreduce/peermr.cc | 126 ------------------------ src/mapreduce/peermr.hh | 40 -------- src/mapreduce/remotemr.cc | 110 --------------------- src/mapreduce/remotemr.hh | 25 ----- src/network/asyncchannel.cc | 8 +- 10 files changed, 22 insertions(+), 416 deletions(-) delete mode 100644 src/mapreduce/messages/boost_impl_mr.cc delete mode 100644 src/mapreduce/messages/boost_impl_mr.hh delete mode 100644 src/mapreduce/peermr.cc delete mode 100644 src/mapreduce/peermr.hh delete mode 100644 src/mapreduce/remotemr.cc delete mode 100644 src/mapreduce/remotemr.hh diff --git a/src/mapreduce/executor.cc b/src/mapreduce/executor.cc index edb405d..f962b2a 100644 --- a/src/mapreduce/executor.cc +++ b/src/mapreduce/executor.cc @@ -48,6 +48,7 @@ bool Executor::run_map (messages::Task* m, std::string input) { peer->process(&kv); //peer->insert (hash_key, key, value); } + peer->finish_map(0); return true; } diff --git a/src/mapreduce/messages/boost_impl_mr.cc b/src/mapreduce/messages/boost_impl_mr.cc deleted file mode 100644 index ca61999..0000000 --- a/src/mapreduce/messages/boost_impl_mr.cc +++ /dev/null @@ -1,10 +0,0 @@ -#include "boost_impl_mr.hh" - -BOOST_CLASS_EXPORT(eclipse::messages::IDataInsert); -BOOST_CLASS_EXPORT(eclipse::messages::IGroupInsert); -BOOST_CLASS_EXPORT(eclipse::messages::IBlockInsert); -BOOST_CLASS_EXPORT(eclipse::messages::IDataInfoRequest); -BOOST_CLASS_EXPORT(eclipse::messages::IGroupInfoRequest); -BOOST_CLASS_EXPORT(eclipse::messages::IBlockInfoRequest); -BOOST_CLASS_EXPORT(eclipse::messages::KeyValueShuffle); -BOOST_CLASS_EXPORT(eclipse::messages::FinishShuffle); diff --git a/src/mapreduce/messages/boost_impl_mr.hh b/src/mapreduce/messages/boost_impl_mr.hh deleted file mode 100644 index 05effea..0000000 --- a/src/mapreduce/messages/boost_impl_mr.hh +++ /dev/null @@ -1,102 +0,0 @@ -#pragma once -#include "../../messages/boost_impl.hh" -#include "idatainsert.hh" -#include "igroupinsert.hh" -#include "iblockinsert.hh" -#include "idatainforequest.hh" -#include "igroupinforequest.hh" -#include "iblockinforequest.hh" - -namespace eclipse { -namespace messages { - -template - void serialize(Archive& ar, eclipse::messages::IDataInsert& c, unsigned int) { - ar & BASE_OBJECT(Message, c); - ar & BOOST_SERIALIZATION_NVP(c.job_id); - ar & BOOST_SERIALIZATION_NVP(c.map_id); - ar & BOOST_SERIALIZATION_NVP(c.num_reducer); - } - -template - void serialize(Archive& ar, eclipse::messages::IGroupInsert& c, unsigned int) { - ar & BASE_OBJECT(Message, c); - ar & BOOST_SERIALIZATION_NVP(c.job_id); - ar & BOOST_SERIALIZATION_NVP(c.map_id); - ar & BOOST_SERIALIZATION_NVP(c.reducer_id); - ar & BOOST_SERIALIZATION_NVP(c.num_block); - } - -template - void serialize(Archive& ar, eclipse::messages::IBlockInsert& c, unsigned int) { - ar & BASE_OBJECT(Message, c); - ar & BOOST_SERIALIZATION_NVP(c.job_id); - ar & BOOST_SERIALIZATION_NVP(c.map_id); - ar & BOOST_SERIALIZATION_NVP(c.reducer_id); - ar & BOOST_SERIALIZATION_NVP(c.block_seq); - } - -template - void serialize(Archive& ar, eclipse::messages::IDataInfoRequest& c, - unsigned int) { - ar & BASE_OBJECT(Message, c); - ar & BOOST_SERIALIZATION_NVP(c.job_id); - ar & BOOST_SERIALIZATION_NVP(c.map_id); - } - -template - void serialize(Archive& ar, eclipse::messages::IGroupInfoRequest& c, - unsigned int) { - ar & BASE_OBJECT(Message, c); - ar & BOOST_SERIALIZATION_NVP(c.job_id); - ar & BOOST_SERIALIZATION_NVP(c.map_id); - ar & BOOST_SERIALIZATION_NVP(c.reducer_id); - } - -template - void serialize(Archive& ar, eclipse::messages::IBlockInfoRequest& c, - unsigned int) { - ar & BASE_OBJECT(Message, c); - ar & BOOST_SERIALIZATION_NVP(c.job_id); - ar & BOOST_SERIALIZATION_NVP(c.map_id); - ar & BOOST_SERIALIZATION_NVP(c.reducer_id); - ar & BOOST_SERIALIZATION_NVP(c.block_seq); - } - -template - void serialize(Archive& ar, eclipse::messages::KeyValueShuffle& c, - unsigned int) { - ar & BASE_OBJECT(Message, c); - ar & BOOST_SERIALIZATION_NVP(c.job_id_); - ar & BOOST_SERIALIZATION_NVP(c.map_id_); - ar & BOOST_SERIALIZATION_NVP(c.key_); - ar & BOOST_SERIALIZATION_NVP(c.value_); - } - -template - void serialize(Archive& ar, eclipse::messages::FinishShuffle& c, - unsigned int) { - ar & BASE_OBJECT(Message, c); - ar & BOOST_SERIALIZATION_NVP(c.job_id); - ar & BOOST_SERIALIZATION_NVP(c.map_id); - } - -} // namespace messages -} // namespace eclipse - -BOOST_CLASS_TRACKING(eclipse::messages::IDataInfo, - boost::serialization::track_never); -BOOST_CLASS_TRACKING(eclipse::messages::IGroupInfo, - boost::serialization::track_never); -BOOST_CLASS_TRACKING(eclipse::messages::IBlockInfo, - boost::serialization::track_never); -BOOST_CLASS_TRACKING(eclipse::messages::IDataInfoRequest, - boost::serialization::track_never); -BOOST_CLASS_TRACKING(eclipse::messages::IGroupInfoRequest, - boost::serialization::track_never); -BOOST_CLASS_TRACKING(eclipse::messages::IBlockInfoRequest, - boost::serialization::track_never); -BOOST_CLASS_TRACKING(eclipse::messages::KeyValueShuffle, - boost::serialization::track_never); -BOOST_CLASS_TRACKING(eclipse::messages::FinishShuffle, - boost::serialization::track_never); diff --git a/src/mapreduce/nodes/peermr.cc b/src/mapreduce/nodes/peermr.cc index 6def3c5..2280d14 100644 --- a/src/mapreduce/nodes/peermr.cc +++ b/src/mapreduce/nodes/peermr.cc @@ -105,6 +105,7 @@ template<> void PeerMR::process(KeyValueShuffle *kv_shuffle) { // }}} // process FinishShuffle {{{ template<> void PeerMR::process(FinishShuffle *msg) { + logger->info (" I got Finish shuffle"); const uint32_t job_id = msg->job_id_; auto it = iwriters_.find(job_id); if (it != iwriters_.end()) { @@ -197,4 +198,17 @@ void PeerMR::receive_kv(messages::KeyValueShuffle *kv_shuffle) { process(kv_shuffle); } // }}} +// finish_map +void PeerMR::finish_map (int job_id_) { + FinishShuffle fs; + fs.job_id_ = job_id_; + fs.map_id_ = 0; + + for (uint8_t i = 0; i < net_size_; i++) { + if (i != id) { + network->send(i, &fs); + } + } +} + } // namespace eclipse diff --git a/src/mapreduce/nodes/peermr.h b/src/mapreduce/nodes/peermr.h index 0cbe7bd..319f2e7 100644 --- a/src/mapreduce/nodes/peermr.h +++ b/src/mapreduce/nodes/peermr.h @@ -39,6 +39,8 @@ class PeerMR: public PeerDFS { bool process_map_file (messages::Task*); template void process(T); + void finish_map (int); + protected: uint32_t net_size_; DirectoryMR directory; diff --git a/src/mapreduce/peermr.cc b/src/mapreduce/peermr.cc deleted file mode 100644 index d2d0d5b..0000000 --- a/src/mapreduce/peermr.cc +++ /dev/null @@ -1,126 +0,0 @@ -#include "peermr.hh" -#include "../messages/message.hh" -#include "../messages/boost_impl.hh" -#include "../messages/idatainsert.hh" -#include "../messages/igroupinsert.hh" -#include "../messages/iblockinsert.hh" -#include "../messages/idatainforequest.hh" -#include "../messages/igroupinforequest.hh" -#include "../messages/iblockinforequest.hh" -#include "../messages/idatainfo.hh" -#include "../messages/igroupinfo.hh" -#include "../messages/iblockinfo.hh" -#include "executor.hh" - -namespace eclipse { - -PeerMR::PeerMR(Context &context): PeerDFS(context) { - directory.init_db(); -} -PeerMR::~PeerMR() { -} - -// process_map_block {{{ -void PeerMR::process_map_block (string ignoreme, string block, Task* task) { - auto leader_node = h(task->input_path) % 3; - Reply reply; - - logger->info ("Executing map"); -// //Executor exec(this); -// -// if (exec.run_map(task, block)) -// reply.message = "MAPDONE"; -// else -// reply.message = "MAPFAILED"; - - network->send (leader_node, &reply); -} -// }}} -// process_map_file {{{ -bool PeerMR::process_map_file (messages::Task* m) { - auto file = m->input_path; - FileInfo fi; - fi.num_block = 0; - - directory.select_file_metadata(file, &fi); - - int num_blocks = fi.num_block; - if (num_blocks == 0) return false; - - for (int i = 0; i< num_blocks; i++) { - BlockInfo bi; - directory.select_block_metadata (file, i, &bi); - auto block_name = bi.block_name; - auto hash_key = bi.block_hash_key; - - auto block_node = boundaries->get_index(hash_key); - - if (block_node == id) { - request(hash_key, block_name, std::bind( - &PeerMR::process_map_block, this, - std::placeholders::_1, - std::placeholders::_2, m)); - - } else { - network->send (block_node, m); - } - } - return true; -} -// }}} -//// process_map_dataset {{{ -//void PeerMR::process_map_dataset (messages::Task* m) { -// vec_str dataset = task.dataset; -// -// for (auto& file : dataset) { -// which_node = disk_boundaries->get_index(h(file)); -// if (which_node == id) { -// process_map_file(m); -// -// } else { -// network->send (which_node, m); -// } -// } -//} -// }}} -// {{{ -bool PeerMR::insert_idata(messages::IDataInsert *msg) { - directory.insert_idata_metadata(*msg); - logger->info("Saving to SQLite db"); - return true; -} -bool PeerMR::insert_igroup(messages::IGroupInsert *msg) { - directory.insert_igroup_metadata(*msg); - logger->info("Saving to SQLite db"); - return true; -} -bool PeerMR::insert_iblock(messages::IBlockInsert *msg) { - directory.insert_iblock_metadata(*msg); - logger->info("Saving to SQLite db"); - return true; -} -IDataInfo PeerMR::request_idata(messages::IDataInfoRequest - *idata_info_request) { - IDataInfo idata_info; - directory.select_idata_metadata(idata_info_request->job_id, - idata_info_request->map_id, &idata_info); - return idata_info; -} -IGroupInfo PeerMR::request_igroup(messages::IGroupInfoRequest - *igroup_info_request) { - IGroupInfo igroup_info; - directory.select_igroup_metadata(igroup_info_request->job_id, - igroup_info_request->map_id, igroup_info_request->reducer_id, - &igroup_info); - return igroup_info; -} -IBlockInfo PeerMR::request_iblock(messages::IBlockInfoRequest - *iblock_info_request) { - IBlockInfo iblock_info; - directory.select_iblock_metadata(iblock_info_request->job_id, - iblock_info_request->map_id, iblock_info_request->reducer_id, - iblock_info_request->block_seq, &iblock_info); - return iblock_info; -} -// }}} -} // namespace eclipse diff --git a/src/mapreduce/peermr.hh b/src/mapreduce/peermr.hh deleted file mode 100644 index 8fe845c..0000000 --- a/src/mapreduce/peermr.hh +++ /dev/null @@ -1,40 +0,0 @@ -#ifndef ECLIPSEMR_NODES_PEERMR_H_ -#define ECLIPSEMR_NODES_PEERMR_H_ -#include "../nodes/peerdfs.hh" -#include "fs/directorymr.hh" -#include "../messages/message.hh" -#include "../messages/idatainsert.hh" -#include "../messages/igroupinsert.hh" -#include "../messages/iblockinsert.hh" -#include "../messages/idatainforequest.hh" -#include "../messages/igroupinforequest.hh" -#include "../messages/iblockinforequest.hh" -#include "../messages/idatainfo.hh" -#include "../messages/igroupinfo.hh" -#include "../messages/iblockinfo.hh" -#include "../messages/task.hh" - -namespace eclipse { - -class PeerMR: public PeerDFS { - public: - PeerMR(Context &context); - ~PeerMR(); - - bool insert_idata(messages::IDataInsert *msg); - bool insert_igroup(messages::IGroupInsert *msg); - bool insert_iblock(messages::IBlockInsert *msg); - IDataInfo request_idata(messages::IDataInfoRequest *idata_info_request); - IGroupInfo request_igroup(messages::IGroupInfoRequest *igroup_info_request); - IBlockInfo request_iblock(messages::IBlockInfoRequest *iblock_info_request); - - void process_map_block (std::string, std::string, messages::Task*); - bool process_map_file (messages::Task*); - //void process_map_dataset (messages::Task*); - - protected: - DirectoryMR directory; -}; - -} // namespace eclipse -#endif // ECLIPSEMR_NODES_PEERMR_H_ diff --git a/src/mapreduce/remotemr.cc b/src/mapreduce/remotemr.cc deleted file mode 100644 index 577ded7..0000000 --- a/src/mapreduce/remotemr.cc +++ /dev/null @@ -1,110 +0,0 @@ -#include "remotemr.hh" -#include "../messages/boost_impl.hh" -#include - -namespace eclipse { -using namespace messages; -namespace ph = std::placeholders; -using std::bind; - -// Constructor {{{ -RemoteMR::RemoteMR(Context &c): RemoteDFS(c) { - auto& rt = routing_table; - rt.insert({"IBlockInsert", bind(&RemoteMR::insert_idata, this, ph::_1)}); - rt.insert({"IGroupInsert", bind(&RemoteMR::insert_igroup, this, ph::_1)}); - rt.insert({"IBlockInsert", bind(&RemoteMR::insert_iblock, this, ph::_1)}); - rt.insert({"IBlockInfoRequest", bind(&RemoteMR::request_idata, this, ph::_1)}); - rt.insert({"IGroupInfoRequest", bind(&RemoteMR::request_igroup, this, ph::_1)}); - rt.insert({"IBlockInfoRequest", bind(&RemoteMR::request_iblock, this, ph::_1)}); - rt.insert({"Task", bind(&RemoteMR::map, this, ph::_1)}); -} -// }}} -// establish {{{ -bool RemoteMR::establish() { - peer = make_unique (context); - peer_mr = dynamic_cast (peer.get()); - peer_mr->establish(); - Router::establish(); - return true; -} -// }}} -// map {{{ -void RemoteMR::map (messages::Message* _m) { - auto m = dynamic_cast(_m); - logger->info("Task received."); - bool ret = peer_mr->process_map_file(m); - - Reply reply; - if (ret) { - reply.message = "OK"; - } else { - reply.message = "FAIL"; - } - network->send(0, &reply); -} -// }}} -// insert_idata {{{ -void RemoteMR::insert_idata(Message *msg) { - auto idata_insert = dynamic_cast(msg); - logger->info("IDataInsert received."); - bool ret = peer_mr->insert_idata(idata_insert); - Reply reply; - if (ret) { - reply.message = "OK"; - } else { - reply.message = "FAIL"; - } - network->send(0, &reply); -} -// }}} -// insert_iblock {{{ -void RemoteMR::insert_igroup(Message *msg) { - auto igroup_insert = dynamic_cast(msg); - logger->info("IGroupInsert received."); - bool ret = peer_mr->insert_igroup(igroup_insert); - Reply reply; - if (ret) { - reply.message = "OK"; - } else { - reply.message = "FAIL"; - } - network->send(0, &reply); -} -// }}} -// insert_iblock {{{ -void RemoteMR::insert_iblock(Message *msg) { - auto iblock_insert = dynamic_cast(msg); - logger->info("IBlockInsert received."); - bool ret = peer_mr->insert_iblock(iblock_insert); - Reply reply; - if (ret) { - reply.message = "OK"; - } else { - reply.message = "FAIL"; - } - network->send(0, &reply); -} -// }}} -// request_idata {{{ -void RemoteMR::request_idata(Message *msg) { - auto idata_info_request = dynamic_cast(msg); - auto idata_info = peer_mr->request_idata(idata_info_request); - network->send(0, &idata_info); -} -// }}} -// request_igroup {{{ -void RemoteMR::request_igroup(Message *msg) { - auto igroup_info_request = dynamic_cast(msg); - auto igroup_info = peer_mr->request_igroup(igroup_info_request); - network->send(0, &igroup_info); -} -// }}} -// request_iblock {{{ -void RemoteMR::request_iblock(Message *msg) { - auto iblock_info_request = dynamic_cast(msg); - auto iblock_info = peer_mr->request_iblock(iblock_info_request); - network->send(0, &iblock_info); -} -// }}} - -} // namespace eclipse diff --git a/src/mapreduce/remotemr.hh b/src/mapreduce/remotemr.hh deleted file mode 100644 index d451e0d..0000000 --- a/src/mapreduce/remotemr.hh +++ /dev/null @@ -1,25 +0,0 @@ -#pragma once -#include "peermr.hh" -#include "../nodes/remotedfs.hh" -#include "../messages/message.hh" - -namespace eclipse { - -class RemoteMR: public RemoteDFS { - public: - RemoteMR(Context &c); - - bool establish(); - void map (messages::Message*); - void insert_idata(messages::Message*); - void insert_igroup(messages::Message*); - void insert_iblock(messages::Message*); - void request_idata(messages::Message*); - void request_igroup(messages::Message*); - void request_iblock(messages::Message*); - - protected: - PeerMR* peer_mr; -}; - -} // namespace eclipse diff --git a/src/network/asyncchannel.cc b/src/network/asyncchannel.cc index 6c301af..25580ea 100644 --- a/src/network/asyncchannel.cc +++ b/src/network/asyncchannel.cc @@ -48,12 +48,14 @@ void AsyncChannel::do_write_impl (string* to_write) { // on_write {{{ void AsyncChannel::on_write (const boost::system::error_code& ec, size_t s, string* str) { - delete str; if (ec) { - logger->info ("Message %s, size: %d, could not reach err=%s", - str->c_str(), s, ec.message().c_str()); + logger->info ("Message , size: %d, could not reach err=%s", + s, ec.message().c_str()); do_write_impl(str); + + } else { + delete str; } } // }}} From 2b53ced68a8f0ef2fb79b02c3a7fc0ecac46b4dc Mon Sep 17 00:00:00 2001 From: Vicente Adolfo Bolea Sanchez Date: Sun, 15 May 2016 19:47:30 +0900 Subject: [PATCH 2/4] MapReduce is working, I tested with an wordcount application --- Makefile.am | 6 +-- src/common/dl_loader.cc | 10 +++++ src/common/dl_loader.hh | 2 + src/mapreduce/dataset.cc | 13 ++++++ src/mapreduce/dataset.hh | 1 + src/mapreduce/executor.cc | 74 ++++++++++++++++++++++++++++++++- src/mapreduce/executor.hh | 2 + src/mapreduce/fs/ireader.cc | 2 +- src/mapreduce/fs/iwriter.cc | 2 +- src/mapreduce/nodes/peermr.cc | 47 +++++++++++++++++++-- src/mapreduce/nodes/peermr.h | 1 + src/mapreduce/nodes/remotemr.cc | 8 +++- src/messages/boost_impl.hh | 3 ++ src/messages/task.cc | 2 +- src/messages/task.hh | 13 ++---- src/nodes/peerdfs.cc | 16 +++++++ src/nodes/peerdfs.hh | 2 +- src/targets/sample_app.cc | 31 ++++++++++++-- 18 files changed, 210 insertions(+), 25 deletions(-) diff --git a/Makefile.am b/Makefile.am index 4711e2b..057342f 100644 --- a/Makefile.am +++ b/Makefile.am @@ -3,7 +3,7 @@ include tests/Makefile.am AM_CPPFLAGS = -I@srcdir@/src/common -I@srcdir@/src -include ./config.h AM_CXXFLAGS = -Wall -std=c++14 -ggdb3 -bin_PROGRAMS = eclipse_node dfs eclipse_app +bin_PROGRAMS = eclipse_node dfs eclipse_wc messages_files = src/messages/boundaries.cc \ src/messages/message.cc \ @@ -81,10 +81,10 @@ dfs_SOURCES = src/fs/dfs.cc \ dfs_LDADD = $(LDADD) -lsqlite3 -eclipse_app_SOURCES = src/targets/sample_app.cc \ +eclipse_wc_SOURCES = src/targets/wc.cc \ $(messages_files) -eclipse_app_LDADD = $(LDADD) -lsqlite3 +eclipse_wc_LDADD = $(LDADD) -lsqlite3 pkginclude_HEADERS = src/common/ecfs.hh src/common/settings.hh diff --git a/src/common/dl_loader.cc b/src/common/dl_loader.cc index 68931d2..4c54e36 100644 --- a/src/common/dl_loader.cc +++ b/src/common/dl_loader.cc @@ -29,6 +29,16 @@ maptype DL_loader::load_function (std::string fun) { return func_; } // }}} +// load_function {{{ +reducetype DL_loader::load_function_reduce (std::string fun) { + reducetype func_ = + reinterpret_cast(dlsym(lib, fun.c_str())); + char* err = dlerror(); + + if (err) throw std::runtime_error("Symbol not found"); + return func_; +} +// }}} void DL_loader::close() { dlclose(lib); } diff --git a/src/common/dl_loader.hh b/src/common/dl_loader.hh index b05af6d..34cf84e 100644 --- a/src/common/dl_loader.hh +++ b/src/common/dl_loader.hh @@ -4,6 +4,7 @@ #include typedef std::pair(*maptype)(std::string); +typedef std::string(*reducetype)(std::string, std::string); class DL_loader { public: DL_loader(std::string); @@ -11,6 +12,7 @@ class DL_loader { bool init_lib (); maptype load_function (std::string); + reducetype load_function_reduce (std::string); void close(); diff --git a/src/mapreduce/dataset.cc b/src/mapreduce/dataset.cc index dc9f6fe..bd17b6a 100644 --- a/src/mapreduce/dataset.cc +++ b/src/mapreduce/dataset.cc @@ -69,8 +69,21 @@ DataSet& DataSet::map (std::string func) { map_task.library = "libfoo.so"; map_task.func_name = func; map_task.input_path = file; + map_task.type = "MAP"; send_message(&socket, &map_task); auto reply = read_reply (&socket); return *(new DataSet(2131231)); } + +DataSet& DataSet::reduce (std::string func) { + Task task; + task.library = "libfoo.so"; + task.func_name = func; + task.input_path = file; + task.type = "REDUCE"; + + send_message(&socket, &task); + auto reply = read_reply (&socket); + return *(new DataSet(2131231)); +} diff --git a/src/mapreduce/dataset.hh b/src/mapreduce/dataset.hh index 6138d90..ae0d4ac 100644 --- a/src/mapreduce/dataset.hh +++ b/src/mapreduce/dataset.hh @@ -9,6 +9,7 @@ using boost::asio::ip::tcp; class DataSet { public: DataSet& map(std::string); + DataSet& reduce(std::string); static DataSet& open (std::string); private: diff --git a/src/mapreduce/executor.cc b/src/mapreduce/executor.cc index f962b2a..f98978f 100644 --- a/src/mapreduce/executor.cc +++ b/src/mapreduce/executor.cc @@ -2,6 +2,8 @@ #include "../common/dl_loader.hh" #include "../common/hash.hh" #include "../mapreduce/messages/key_value_shuffle.h" +#include "../mapreduce/fs/ireader.h" +#include "../messages/keyvalue.hh" #include #include @@ -35,6 +37,9 @@ bool Executor::run_map (messages::Task* m, std::string input) { while (!ss.eof()) { char next_line [256] = {0}; //! :TODO: change to DFS line limit ss.getline (next_line, 256); + if (strnlen(next_line, 256) == 0) + continue; + pair key_value = _map_ (string(next_line)); auto key = key_value.first; @@ -43,14 +48,81 @@ bool Executor::run_map (messages::Task* m, std::string input) { KeyValueShuffle kv; kv.job_id_ = 0; + kv.map_id_ = 0; kv.key_ = key; kv.value_ = value; peer->process(&kv); - //peer->insert (hash_key, key, value); } peer->finish_map(0); return true; } // }}} +// run_reduce {{{ +bool Executor::run_reduce (messages::Task* task) { + auto path_lib = context.settings.get("path.applications"); + path_lib += ("/" + task->library); + DL_loader loader (path_lib); + + try { + loader.init_lib(); + } catch (std::exception& e) { + context.logger->error ("Not found library path[%s]", path_lib.c_str()); + } + + function _reducer_ = + loader.load_function_reduce(task->func_name); + + IReader ireader; + ireader.set_net_id(0); + ireader.set_job_id(task->job_id); + ireader.set_map_id(task->map_id); + ireader.set_reducer_id(0); + ireader.init(); + + while (ireader.is_next_key()) { + string key; + ireader.get_next_key(key); + + bool is_first_iteration = true; + string last_output; + while (ireader.is_next_value()) { + string value; + ireader.get_next_value(value); + + if (is_first_iteration) + ireader.get_next_value(last_output); + + else + last_output = _reducer_ (value, last_output); + + is_first_iteration = false; + } + + FileInfo fi; + fi.file_name = key; + fi.num_block = 1; + fi.file_size = last_output.length(); + fi.file_hash_key = h(key.c_str()); + + BlockInfo bi; + bi.file_name = key; + bi.block_name = key + "_0"; + bi.block_seq = 0; + bi.block_hash_key = h(bi.block_name); + bi.block_size = last_output.length(); + bi.content = last_output; + + dynamic_cast(peer)->process(&fi); + dynamic_cast(peer)->insert_block(&bi); + + +// KeyValue kv; + // kv.key = h("output"); + // kv.name = "output"; + // kv.value = last_output; + dynamic_cast(peer)->insert(h("output"), "output", last_output); + } +} +// }}} } /* eclipse */ diff --git a/src/mapreduce/executor.hh b/src/mapreduce/executor.hh index e176fd1..f2f0d5e 100644 --- a/src/mapreduce/executor.hh +++ b/src/mapreduce/executor.hh @@ -6,11 +6,13 @@ namespace eclipse { class Executor { typedef std::pair (*maptype)(std::string); + typedef std::string (*reducetype)(std::string, std::string); public: Executor (PeerMR*); ~Executor (); bool run_map (messages::Task*, std::string); + bool run_reduce (messages::Task*); protected: PeerMR* peer; diff --git a/src/mapreduce/fs/ireader.cc b/src/mapreduce/fs/ireader.cc index 36f1f34..65cf5e1 100644 --- a/src/mapreduce/fs/ireader.cc +++ b/src/mapreduce/fs/ireader.cc @@ -21,7 +21,7 @@ using boost::asio::ip::tcp; namespace eclipse { IReader::IReader() { - scratch_path_ = context.settings.get("path.scratch"); + scratch_path_ = context.settings.get("path.idata"); num_finished_ = 0; is_next_key_ = true; is_next_value_ = true; diff --git a/src/mapreduce/fs/iwriter.cc b/src/mapreduce/fs/iwriter.cc index a0df256..0ca18dd 100644 --- a/src/mapreduce/fs/iwriter.cc +++ b/src/mapreduce/fs/iwriter.cc @@ -40,7 +40,7 @@ namespace eclipse { IWriter::IWriter() { reduce_slot_ = context.settings.get("mapreduce.reduce_slot"); iblock_size_ = context.settings.get("mapreduce.iblock_size"); - scratch_path_ = context.settings.get("path.scratch"); + scratch_path_ = context.settings.get("path.idata"); is_write_start_ = false; is_write_finish_ = false; index_counter_ = 0; diff --git a/src/mapreduce/nodes/peermr.cc b/src/mapreduce/nodes/peermr.cc index 2280d14..4ae125f 100644 --- a/src/mapreduce/nodes/peermr.cc +++ b/src/mapreduce/nodes/peermr.cc @@ -16,7 +16,7 @@ PeerMR::PeerMR() { // }}} // process_map_block {{{ void PeerMR::process_map_block (string ignoreme, string block, Task* task) { - auto leader_node = h(task->input_path) % 3; + auto leader_node = h(task->input_path) % nodes.size(); Reply reply; logger->info ("Executing map"); @@ -114,6 +114,29 @@ template<> void PeerMR::process(FinishShuffle *msg) { } } // }}} +// process Task {{{ +template<> void PeerMR::process(Task* m) { + auto map_id = m->map_id; + auto job_id = m->job_id; + + IDataInfo di; + di.map_id = map_id; + di.job_id = job_id; + di.num_reducer = 0; + directory.select_idata_metadata(job_id, map_id, &di); + + if (di.num_reducer > 0) { //! Perform reduce operation + logger->info("Performing reduce operation"); + Executor exec(this); + Reply reply; + + if (exec.run_reduce(m)) + reply.message = "MAPDONE"; + else + reply.message = "MAPFAILED"; + } +} +// }}} // on_read {{{ void PeerMR::on_read(messages::Message *msg, int) { std::string type = msg->get_type(); @@ -123,6 +146,11 @@ void PeerMR::on_read(messages::Message *msg, int) { } else if (type == "FinishShuffle") { auto finish_shuffle = dynamic_cast(msg); process(finish_shuffle); + + } else if (type == "Task") { + auto task_ = dynamic_cast(msg); + process(task_); + } else { PeerDFS::on_read(msg, 0); } @@ -198,7 +226,7 @@ void PeerMR::receive_kv(messages::KeyValueShuffle *kv_shuffle) { process(kv_shuffle); } // }}} -// finish_map +// finish_map {{{ void PeerMR::finish_map (int job_id_) { FinishShuffle fs; fs.job_id_ = job_id_; @@ -209,6 +237,19 @@ void PeerMR::finish_map (int job_id_) { network->send(i, &fs); } } + process(&fs); } - +// }}} +// process_reduce {{{ +bool PeerMR::process_reduce (messages::Task* m) { + m->job_id = 0; + m->map_id = 0; + for (uint8_t i = 0; i < net_size_; i++) { + if (i != id) { + network->send(i, m); + } + } + process(m); +} +// }}} } // namespace eclipse diff --git a/src/mapreduce/nodes/peermr.h b/src/mapreduce/nodes/peermr.h index 319f2e7..4caadb5 100644 --- a/src/mapreduce/nodes/peermr.h +++ b/src/mapreduce/nodes/peermr.h @@ -37,6 +37,7 @@ class PeerMR: public PeerDFS { void receive_kv(messages::KeyValueShuffle *kv_shuffle); void process_map_block (std::string, std::string, messages::Task*); bool process_map_file (messages::Task*); + bool process_reduce (messages::Task*); template void process(T); void finish_map (int); diff --git a/src/mapreduce/nodes/remotemr.cc b/src/mapreduce/nodes/remotemr.cc index 0541760..c622801 100644 --- a/src/mapreduce/nodes/remotemr.cc +++ b/src/mapreduce/nodes/remotemr.cc @@ -41,7 +41,13 @@ bool RemoteMR::establish() { void RemoteMR::map (messages::Message* _m) { auto m = dynamic_cast(_m); logger->info("Task received."); - bool ret = peer->process_map_file(m); + bool ret; + + if (m->get_type_task() == "MAP") { + ret = peer->process_map_file(m); + } else { + ret = peer->process_reduce(m); + } Reply reply; if (ret) { diff --git a/src/messages/boost_impl.hh b/src/messages/boost_impl.hh index 604faba..80428f5 100644 --- a/src/messages/boost_impl.hh +++ b/src/messages/boost_impl.hh @@ -117,6 +117,9 @@ template ar & BOOST_SERIALIZATION_NVP(c.library); ar & BOOST_SERIALIZATION_NVP(c.input_path); ar & BOOST_SERIALIZATION_NVP(c.func_name); + ar & BOOST_SERIALIZATION_NVP(c.job_id); + ar & BOOST_SERIALIZATION_NVP(c.map_id); + ar & BOOST_SERIALIZATION_NVP(c.output); } template void serialize (Archive& ar, eclipse::messages::FileList& c, unsigned int) { diff --git a/src/messages/task.cc b/src/messages/task.cc index 41da9ec..c6e53f7 100644 --- a/src/messages/task.cc +++ b/src/messages/task.cc @@ -13,7 +13,7 @@ Task& Task::set_input_path(std::string i) { } int Task::get_id() { return id; } -int Task::get_type() { return type;} +std::string Task::get_type_task() { return type;} std::string Task::get_input_path() { return input_path; } //// serialize {{{ diff --git a/src/messages/task.hh b/src/messages/task.hh index f90100a..eae5363 100644 --- a/src/messages/task.hh +++ b/src/messages/task.hh @@ -6,13 +6,6 @@ namespace eclipse { namespace messages { -enum TASKTYPE { - OPEN = 0, - FLATMAP = 1, - MAP = 2, - REDUCE = 4 -}; - struct Task: public Message { std::string get_type() const override; @@ -21,11 +14,13 @@ struct Task: public Message { Task& set_input_path(std::string); int get_id(); - int get_type(); + std::string get_type_task(); std::string get_input_path(); - int id, type; + int id, job_id, map_id; + std::string type; std::string library, func_name, input_path; + std::string output; }; //eclipse::messages::Task* serialize (eclipse::Task*); diff --git a/src/nodes/peerdfs.cc b/src/nodes/peerdfs.cc index d87a939..12ff20c 100644 --- a/src/nodes/peerdfs.cc +++ b/src/nodes/peerdfs.cc @@ -156,6 +156,18 @@ template<> void PeerDFS::process (BlockDel* m) { remove(block_path.c_str()); } // }}} +// process (FileInfo* m) {{{ +template<> void PeerDFS::process (FileInfo* m) { + int which_node = m->file_hash_key % nodes.size(); + + if (which_node != id){ + network->send(which_node, m); + + } else { + insert_file(m); + } +} +// }}} // on_read (Message*) {{{ void PeerDFS::on_read (Message* m, int) { string type = m->get_type(); @@ -178,6 +190,10 @@ void PeerDFS::on_read (Message* m, int) { } else if (type == "BlockDel") { auto m_ = dynamic_cast(m); process(m_); + + } else if (type == "FileInfo") { + auto m_ = dynamic_cast(m); + process(m_); } } // }}} diff --git a/src/nodes/peerdfs.hh b/src/nodes/peerdfs.hh index e6ba6dc..3a1a9ae 100644 --- a/src/nodes/peerdfs.hh +++ b/src/nodes/peerdfs.hh @@ -45,6 +45,7 @@ class PeerDFS: public Node, public AsyncNode { bool format (); FileDescription request_file (messages::FileRequest*); bool file_exist (std::string); + template void process (T); protected: Directory directory; @@ -56,7 +57,6 @@ class PeerDFS: public Node, public AsyncNode { int replica; std::vector nodes; - template void process (T); }; } diff --git a/src/targets/sample_app.cc b/src/targets/sample_app.cc index 6c29178..4c36ba8 100644 --- a/src/targets/sample_app.cc +++ b/src/targets/sample_app.cc @@ -1,22 +1,45 @@ #include "../mapreduce/dataset.hh" #include +#include using namespace eclipse; using namespace std; extern "C" { pair myfunc (string); + string myreducer (string, string); } - pair myfunc (string a) { - auto len = a.length(); - auto len_str = to_string(len); + + int total = 0; + char *p = new char[a.length()]; + strncpy (p, a.c_str(), a.length()); + p = strtok(p, " "); + while (p) { + if (p[0] != '\n' or strlen(p) == 0) + total++; + p = strtok(NULL, " "); + } + + delete p; + + auto output = to_string(total); + return {"Total", output}; +} + +string myreducer (string a, string b) { + auto a_ = atoi (a.c_str()); + auto b_ = atoi (b.c_str()); + + auto out = to_string(a_ + b_); - return {"First", len_str }; + return out; } + int main (int argc, char** argv) { DataSet& A = DataSet::open(argv[1]); A.map("myfunc"); + A.reduce("myreducer"); } From 33db5ab52b74d0f5cdbfc2f1f8efca82dfbfdf6d Mon Sep 17 00:00:00 2001 From: Vicente Adolfo Bolea Sanchez Date: Mon, 16 May 2016 15:35:26 +0900 Subject: [PATCH 3/4] DFS is now fast again --- src/nodes/peerdfs.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/src/nodes/peerdfs.cc b/src/nodes/peerdfs.cc index 12ff20c..3eba216 100644 --- a/src/nodes/peerdfs.cc +++ b/src/nodes/peerdfs.cc @@ -239,7 +239,6 @@ bool PeerDFS::insert_block (messages::BlockInfo* m) { } uint32_t tmp_hash_key = boundaries->random_within_boundaries(tmp_node); insert(tmp_hash_key, m->block_name, m->content); - sleep(1); } return true; } From a744a41eacf5e09b44d38011e8aa9dad72fcc294 Mon Sep 17 00:00:00 2001 From: Vicente Adolfo Bolea Sanchez Date: Mon, 16 May 2016 15:38:09 +0900 Subject: [PATCH 4/4] Added wc --- src/targets/wc.cc | 45 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 src/targets/wc.cc diff --git a/src/targets/wc.cc b/src/targets/wc.cc new file mode 100644 index 0000000..4c36ba8 --- /dev/null +++ b/src/targets/wc.cc @@ -0,0 +1,45 @@ +#include "../mapreduce/dataset.hh" + +#include +#include + +using namespace eclipse; +using namespace std; + +extern "C" { + pair myfunc (string); + string myreducer (string, string); +} + +pair myfunc (string a) { + + int total = 0; + char *p = new char[a.length()]; + strncpy (p, a.c_str(), a.length()); + p = strtok(p, " "); + while (p) { + if (p[0] != '\n' or strlen(p) == 0) + total++; + p = strtok(NULL, " "); + } + + delete p; + + auto output = to_string(total); + return {"Total", output}; +} + +string myreducer (string a, string b) { + auto a_ = atoi (a.c_str()); + auto b_ = atoi (b.c_str()); + + auto out = to_string(a_ + b_); + + return out; +} + +int main (int argc, char** argv) { + DataSet& A = DataSet::open(argv[1]); + A.map("myfunc"); + A.reduce("myreducer"); +}