Skip to content

Commit

Permalink
Merge pull request #20 from DICL/feature-shuffling
Browse files Browse the repository at this point in the history
MapReduce is working!
  • Loading branch information
vicentebolea committed May 16, 2016
2 parents c58a1e3 + a744a41 commit 9cd19a5
Show file tree
Hide file tree
Showing 25 changed files with 270 additions and 437 deletions.
6 changes: 3 additions & 3 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ include tests/Makefile.am
AM_CPPFLAGS = -I@srcdir@/src/common -I@srcdir@/src -include ./config.h
AM_CXXFLAGS = -Wall -std=c++14 -ggdb3

bin_PROGRAMS = eclipse_node dfs eclipse_app
bin_PROGRAMS = eclipse_node dfs eclipse_wc

messages_files = src/messages/boundaries.cc \
src/messages/message.cc \
Expand Down Expand Up @@ -81,10 +81,10 @@ dfs_SOURCES = src/fs/dfs.cc \

dfs_LDADD = $(LDADD) -lsqlite3

eclipse_app_SOURCES = src/targets/sample_app.cc \
eclipse_wc_SOURCES = src/targets/wc.cc \
$(messages_files)

eclipse_app_LDADD = $(LDADD) -lsqlite3
eclipse_wc_LDADD = $(LDADD) -lsqlite3

pkginclude_HEADERS = src/common/ecfs.hh src/common/settings.hh

Expand Down
10 changes: 10 additions & 0 deletions src/common/dl_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ maptype DL_loader::load_function (std::string fun) {
return func_;
}
// }}}
// load_function {{{
reducetype DL_loader::load_function_reduce (std::string fun) {
reducetype func_ =
reinterpret_cast<reducetype>(dlsym(lib, fun.c_str()));
char* err = dlerror();

if (err) throw std::runtime_error("Symbol not found");
return func_;
}
// }}}
void DL_loader::close() {
dlclose(lib);
}
2 changes: 2 additions & 0 deletions src/common/dl_loader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
#include <utility>

typedef std::pair<std::string, std::string>(*maptype)(std::string);
typedef std::string(*reducetype)(std::string, std::string);
class DL_loader {
public:
DL_loader(std::string);
~DL_loader();

bool init_lib ();
maptype load_function (std::string);
reducetype load_function_reduce (std::string);
void close();


Expand Down
13 changes: 13 additions & 0 deletions src/mapreduce/dataset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,21 @@ DataSet& DataSet::map (std::string func) {
map_task.library = "libfoo.so";
map_task.func_name = func;
map_task.input_path = file;
map_task.type = "MAP";

send_message(&socket, &map_task);
auto reply = read_reply (&socket);
return *(new DataSet(2131231));
}

DataSet& DataSet::reduce (std::string func) {
Task task;
task.library = "libfoo.so";
task.func_name = func;
task.input_path = file;
task.type = "REDUCE";

send_message(&socket, &task);
auto reply = read_reply (&socket);
return *(new DataSet(2131231));
}
1 change: 1 addition & 0 deletions src/mapreduce/dataset.hh
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ using boost::asio::ip::tcp;
class DataSet {
public:
DataSet& map(std::string);
DataSet& reduce(std::string);
static DataSet& open (std::string);

private:
Expand Down
75 changes: 74 additions & 1 deletion src/mapreduce/executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#include "../common/dl_loader.hh"
#include "../common/hash.hh"
#include "../mapreduce/messages/key_value_shuffle.h"
#include "../mapreduce/fs/ireader.h"
#include "../messages/keyvalue.hh"

#include <string>
#include <sstream>
Expand Down Expand Up @@ -35,6 +37,9 @@ bool Executor::run_map (messages::Task* m, std::string input) {
while (!ss.eof()) {
char next_line [256] = {0}; //! :TODO: change to DFS line limit
ss.getline (next_line, 256);
if (strnlen(next_line, 256) == 0)
continue;

pair<string, string> key_value = _map_ (string(next_line));

auto key = key_value.first;
Expand All @@ -43,13 +48,81 @@ bool Executor::run_map (messages::Task* m, std::string input) {

KeyValueShuffle kv;
kv.job_id_ = 0;
kv.map_id_ = 0;
kv.key_ = key;
kv.value_ = value;
peer->process(&kv);
//peer->insert (hash_key, key, value);
}
peer->finish_map(0);

return true;
}
// }}}
// run_reduce {{{
bool Executor::run_reduce (messages::Task* task) {
auto path_lib = context.settings.get<string>("path.applications");
path_lib += ("/" + task->library);
DL_loader loader (path_lib);

try {
loader.init_lib();
} catch (std::exception& e) {
context.logger->error ("Not found library path[%s]", path_lib.c_str());
}

function<string(string,string)> _reducer_ =
loader.load_function_reduce(task->func_name);

IReader ireader;
ireader.set_net_id(0);
ireader.set_job_id(task->job_id);
ireader.set_map_id(task->map_id);
ireader.set_reducer_id(0);
ireader.init();

while (ireader.is_next_key()) {
string key;
ireader.get_next_key(key);

bool is_first_iteration = true;
string last_output;
while (ireader.is_next_value()) {
string value;
ireader.get_next_value(value);

if (is_first_iteration)
ireader.get_next_value(last_output);

else
last_output = _reducer_ (value, last_output);

is_first_iteration = false;
}

FileInfo fi;
fi.file_name = key;
fi.num_block = 1;
fi.file_size = last_output.length();
fi.file_hash_key = h(key.c_str());

BlockInfo bi;
bi.file_name = key;
bi.block_name = key + "_0";
bi.block_seq = 0;
bi.block_hash_key = h(bi.block_name);
bi.block_size = last_output.length();
bi.content = last_output;

dynamic_cast<PeerDFS*>(peer)->process(&fi);
dynamic_cast<PeerDFS*>(peer)->insert_block(&bi);


// KeyValue kv;
// kv.key = h("output");
// kv.name = "output";
// kv.value = last_output;
dynamic_cast<PeerDFS*>(peer)->insert(h("output"), "output", last_output);
}
}
// }}}
} /* eclipse */
2 changes: 2 additions & 0 deletions src/mapreduce/executor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
namespace eclipse {
class Executor {
typedef std::pair<string, string> (*maptype)(std::string);
typedef std::string (*reducetype)(std::string, std::string);
public:
Executor (PeerMR*);
~Executor ();

bool run_map (messages::Task*, std::string);
bool run_reduce (messages::Task*);

protected:
PeerMR* peer;
Expand Down
2 changes: 1 addition & 1 deletion src/mapreduce/fs/ireader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ using boost::asio::ip::tcp;
namespace eclipse {

IReader::IReader() {
scratch_path_ = context.settings.get<string>("path.scratch");
scratch_path_ = context.settings.get<string>("path.idata");
num_finished_ = 0;
is_next_key_ = true;
is_next_value_ = true;
Expand Down
2 changes: 1 addition & 1 deletion src/mapreduce/fs/iwriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ namespace eclipse {
IWriter::IWriter() {
reduce_slot_ = context.settings.get<int>("mapreduce.reduce_slot");
iblock_size_ = context.settings.get<int>("mapreduce.iblock_size");
scratch_path_ = context.settings.get<string>("path.scratch");
scratch_path_ = context.settings.get<string>("path.idata");
is_write_start_ = false;
is_write_finish_ = false;
index_counter_ = 0;
Expand Down
10 changes: 0 additions & 10 deletions src/mapreduce/messages/boost_impl_mr.cc

This file was deleted.

102 changes: 0 additions & 102 deletions src/mapreduce/messages/boost_impl_mr.hh

This file was deleted.

Loading

0 comments on commit 9cd19a5

Please sign in to comment.