Skip to content

Commit

Permalink
Merge pull request #21 from DICL/refactor-wc
Browse files Browse the repository at this point in the history
Debugged MapReduce
  • Loading branch information
vicentebolea committed May 29, 2016
2 parents 9cd19a5 + 4e19990 commit 7be58e6
Show file tree
Hide file tree
Showing 20 changed files with 280 additions and 245 deletions.
1 change: 1 addition & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ messages_files = src/messages/boundaries.cc \
src/mapreduce/messages/iblockinfo.cc \
src/mapreduce/messages/key_value_shuffle.cc \
src/mapreduce/messages/finish_shuffle.cc \
src/messages/taskstatus.cc \
src/messages/fileexist.cc

# libs -----
Expand Down
2 changes: 1 addition & 1 deletion src/common/dl_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ maptype DL_loader::load_function (std::string fun) {
reinterpret_cast<maptype>(dlsym(lib, fun.c_str()));
char* err = dlerror();

if (err) throw std::runtime_error("Symbol not found");
if (err) throw std::runtime_error("DL_LOADER: Symbol not found");
return func_;
}
// }}}
Expand Down
21 changes: 17 additions & 4 deletions src/mapreduce/dataset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@
#include "../common/hash.hh"
#include <vector>
#include <iomanip>
#include <random>

using namespace eclipse;
using namespace std;
using namespace eclipse::messages;
using vec_str = std::vector<std::string>;

eclipse::messages::Reply* read_reply(tcp::socket* socket) {

template <typename T>
T* read_reply(tcp::socket* socket) {
char header[17] = {0};
header[16] = '\0';
socket->receive(boost::asio::buffer(header, 16));
Expand All @@ -21,7 +24,7 @@ eclipse::messages::Reply* read_reply(tcp::socket* socket) {
string recv_msg(body, size_of_msg);
eclipse::messages::Message* m = load_message(recv_msg);
delete[] body;
return dynamic_cast<eclipse::messages::Reply*>(m);
return dynamic_cast<T*>(m);
}

void send_message (tcp::socket* socket, eclipse::messages::Message* msg) {
Expand All @@ -45,6 +48,13 @@ DataSet::DataSet (uint32_t id_) :
{
find_local_master();
auto ep = find_local_master();
std::mt19937 rng;
rng.seed(std::random_device()());
std::uniform_int_distribution<std::mt19937::result_type> dist(1,
std::numeric_limits<uint32_t>::max());
job_id = dist(rng);
cout << "[CLIENT] submitting Job id: " << job_id << endl;

socket.connect(*ep);
}

Expand All @@ -70,9 +80,11 @@ DataSet& DataSet::map (std::string func) {
map_task.func_name = func;
map_task.input_path = file;
map_task.type = "MAP";
map_task.job_id = job_id;
map_task.map_id = 0;

send_message(&socket, &map_task);
auto reply = read_reply (&socket);
auto reply = read_reply<Reply> (&socket);
return *(new DataSet(2131231));
}

Expand All @@ -82,8 +94,9 @@ DataSet& DataSet::reduce (std::string func) {
task.func_name = func;
task.input_path = file;
task.type = "REDUCE";
task.job_id = job_id;

send_message(&socket, &task);
auto reply = read_reply (&socket);
auto reply = read_reply<Reply> (&socket);
return *(new DataSet(2131231));
}
3 changes: 2 additions & 1 deletion src/mapreduce/dataset.hh
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ class DataSet {

private:
DataSet (uint32_t);
int id;
int id = 0;
uint32_t job_id = 0;
std::string file;
tcp::endpoint* find_local_master();
boost::asio::io_service iosvc;
Expand Down
72 changes: 37 additions & 35 deletions src/mapreduce/executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
using namespace eclipse;
using namespace std;

uint32_t eclipse::Executor::map_ids(0);

namespace eclipse {
// Constructor {{{
Executor::Executor(PeerMR* p) : peer(p) { }
Expand All @@ -35,7 +37,8 @@ bool Executor::run_map (messages::Task* m, std::string input) {
stringstream ss (input);

while (!ss.eof()) {
char next_line [256] = {0}; //! :TODO: change to DFS line limit
char* next_line = new char[256]; //! :TODO: change to DFS line limit
bzero(next_line, 256);
ss.getline (next_line, 256);
if (strnlen(next_line, 256) == 0)
continue;
Expand All @@ -45,15 +48,16 @@ bool Executor::run_map (messages::Task* m, std::string input) {
auto key = key_value.first;
auto hash_key = h(key.c_str());
auto& value = key_value.second;
context.logger->info ("Generated value: %s -> %s", next_line, value.c_str());

KeyValueShuffle kv;
kv.job_id_ = 0;
kv.job_id_ = m->job_id;
kv.map_id_ = 0;
kv.key_ = key;
kv.value_ = value;
peer->process(&kv);
delete next_line;
}
peer->finish_map(0);

return true;
}
Expand All @@ -73,8 +77,9 @@ bool Executor::run_reduce (messages::Task* task) {
function<string(string,string)> _reducer_ =
loader.load_function_reduce(task->func_name);

try {

IReader ireader;
ireader.set_net_id(0);
ireader.set_job_id(task->job_id);
ireader.set_map_id(task->map_id);
ireader.set_reducer_id(0);
Expand All @@ -84,44 +89,41 @@ bool Executor::run_reduce (messages::Task* task) {
string key;
ireader.get_next_key(key);

bool is_first_iteration = true;
int total_iterations = 0;
string last_output;
if (ireader.is_next_value())
ireader.get_next_value(last_output);

while (ireader.is_next_value()) {
string value;
ireader.get_next_value(value);

if (is_first_iteration)
ireader.get_next_value(last_output);

else
last_output = _reducer_ (value, last_output);
last_output = _reducer_ (value, last_output);

is_first_iteration = false;
total_iterations++;
}

FileInfo fi;
fi.file_name = key;
fi.num_block = 1;
fi.file_size = last_output.length();
fi.file_hash_key = h(key.c_str());

BlockInfo bi;
bi.file_name = key;
bi.block_name = key + "_0";
bi.block_seq = 0;
bi.block_hash_key = h(bi.block_name);
bi.block_size = last_output.length();
bi.content = last_output;

dynamic_cast<PeerDFS*>(peer)->process(&fi);
dynamic_cast<PeerDFS*>(peer)->insert_block(&bi);


// KeyValue kv;
// kv.key = h("output");
// kv.name = "output";
// kv.value = last_output;
dynamic_cast<PeerDFS*>(peer)->insert(h("output"), "output", last_output);
context.logger->info ("Key %s #iterations: %i", key.c_str(), total_iterations);

FileInfo fi;
fi.file_name = key;
fi.num_block = 1;
fi.file_size = last_output.length();
fi.file_hash_key = h(key.c_str());

BlockInfo bi;
bi.file_name = key;
bi.block_name = key + "_0";
bi.block_seq = 0;
bi.block_hash_key = h(bi.block_name);
bi.block_size = last_output.length();
bi.content = last_output;

dynamic_cast<PeerDFS*>(peer)->process(&fi);
dynamic_cast<PeerDFS*>(peer)->insert_block(&bi);
}
} catch (std::exception& e) {
context.logger->error ("Error in the executer: %s", e.what());
exit(1);
}
}
// }}}
Expand Down
1 change: 1 addition & 0 deletions src/mapreduce/executor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class Executor {

protected:
PeerMR* peer;
static uint32_t map_ids;

};

Expand Down
50 changes: 0 additions & 50 deletions src/mapreduce/isender.cc

This file was deleted.

32 changes: 0 additions & 32 deletions src/mapreduce/isender.h

This file was deleted.

Loading

0 comments on commit 7be58e6

Please sign in to comment.