Skip to content

Commit

Permalink
First
Browse files Browse the repository at this point in the history
  • Loading branch information
vicentebolea committed May 23, 2016
1 parent 9cd19a5 commit 675d6a5
Show file tree
Hide file tree
Showing 12 changed files with 129 additions and 166 deletions.
19 changes: 15 additions & 4 deletions src/mapreduce/dataset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@
#include "../common/hash.hh"
#include <vector>
#include <iomanip>
#include <random>

using namespace eclipse;
using namespace std;
using namespace eclipse::messages;
using vec_str = std::vector<std::string>;

eclipse::messages::Reply* read_reply(tcp::socket* socket) {

template <typename T>
T* read_reply(tcp::socket* socket) {
char header[17] = {0};
header[16] = '\0';
socket->receive(boost::asio::buffer(header, 16));
Expand All @@ -21,7 +24,7 @@ eclipse::messages::Reply* read_reply(tcp::socket* socket) {
string recv_msg(body, size_of_msg);
eclipse::messages::Message* m = load_message(recv_msg);
delete[] body;
return dynamic_cast<eclipse::messages::Reply*>(m);
return dynamic_cast<T*>(m);
}

void send_message (tcp::socket* socket, eclipse::messages::Message* msg) {
Expand All @@ -45,6 +48,11 @@ DataSet::DataSet (uint32_t id_) :
{
find_local_master();
auto ep = find_local_master();
std::random_device rd;
std::uniform_int_distribution<uint32_t> dist(std::numeric_limits<uint32_t>::max());
job_id = dist(rd);
cout << "[CLIENT] submitting Job id: " << job_id << endl;

socket.connect(*ep);
}

Expand All @@ -70,9 +78,11 @@ DataSet& DataSet::map (std::string func) {
map_task.func_name = func;
map_task.input_path = file;
map_task.type = "MAP";
map_task.job_id = job_id;
map_task.map_id = 0;

send_message(&socket, &map_task);
auto reply = read_reply (&socket);
auto reply = read_reply<Reply> (&socket);
return *(new DataSet(2131231));
}

Expand All @@ -82,8 +92,9 @@ DataSet& DataSet::reduce (std::string func) {
task.func_name = func;
task.input_path = file;
task.type = "REDUCE";
task.job_id = job_id;

send_message(&socket, &task);
auto reply = read_reply (&socket);
auto reply = read_reply<Reply> (&socket);
return *(new DataSet(2131231));
}
3 changes: 2 additions & 1 deletion src/mapreduce/dataset.hh
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ class DataSet {

private:
DataSet (uint32_t);
int id;
int id = 0;
uint32_t job_id = 0;
std::string file;
tcp::endpoint* find_local_master();
boost::asio::io_service iosvc;
Expand Down
53 changes: 27 additions & 26 deletions src/mapreduce/executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
using namespace eclipse;
using namespace std;

uint32_t eclipse::Executor::map_ids(0);

namespace eclipse {
// Constructor {{{
Executor::Executor(PeerMR* p) : peer(p) { }
Expand Down Expand Up @@ -47,13 +49,13 @@ bool Executor::run_map (messages::Task* m, std::string input) {
auto& value = key_value.second;

KeyValueShuffle kv;
kv.job_id_ = 0;
kv.job_id_ = m->job_id;
kv.map_id_ = 0;
kv.key_ = key;
kv.value_ = value;
peer->process(&kv);
}
peer->finish_map(0);
peer->finish_map(m->job_id);

return true;
}
Expand All @@ -73,8 +75,10 @@ bool Executor::run_reduce (messages::Task* task) {
function<string(string,string)> _reducer_ =
loader.load_function_reduce(task->func_name);

try {

IReader ireader;
ireader.set_net_id(0);
//ireader.set_net_id(0);
ireader.set_job_id(task->job_id);
ireader.set_map_id(task->map_id);
ireader.set_reducer_id(0);
Expand All @@ -99,29 +103,26 @@ bool Executor::run_reduce (messages::Task* task) {
is_first_iteration = false;
}

FileInfo fi;
fi.file_name = key;
fi.num_block = 1;
fi.file_size = last_output.length();
fi.file_hash_key = h(key.c_str());

BlockInfo bi;
bi.file_name = key;
bi.block_name = key + "_0";
bi.block_seq = 0;
bi.block_hash_key = h(bi.block_name);
bi.block_size = last_output.length();
bi.content = last_output;

dynamic_cast<PeerDFS*>(peer)->process(&fi);
dynamic_cast<PeerDFS*>(peer)->insert_block(&bi);


// KeyValue kv;
// kv.key = h("output");
// kv.name = "output";
// kv.value = last_output;
dynamic_cast<PeerDFS*>(peer)->insert(h("output"), "output", last_output);
FileInfo fi;
fi.file_name = key;
fi.num_block = 1;
fi.file_size = last_output.length();
fi.file_hash_key = h(key.c_str());

BlockInfo bi;
bi.file_name = key;
bi.block_name = key + "_0";
bi.block_seq = 0;
bi.block_hash_key = h(bi.block_name);
bi.block_size = last_output.length();
bi.content = last_output;

dynamic_cast<PeerDFS*>(peer)->process(&fi);
dynamic_cast<PeerDFS*>(peer)->insert_block(&bi);
}
} catch (std::exception& e) {
context.logger->error ("Error in the executer: %s", e.what());
exit(1);
}
}
// }}}
Expand Down
1 change: 1 addition & 0 deletions src/mapreduce/executor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class Executor {

protected:
PeerMR* peer;
static uint32_t map_ids;

};

Expand Down
50 changes: 0 additions & 50 deletions src/mapreduce/isender.cc

This file was deleted.

32 changes: 0 additions & 32 deletions src/mapreduce/isender.h

This file was deleted.

Loading

0 comments on commit 675d6a5

Please sign in to comment.