Skip to content

Commit

Permalink
Merge pull request #19 from DICL/up-3.1.2
Browse files Browse the repository at this point in the history
Update the dfs components to DFS v3.1.2 (latest version)
  • Loading branch information
vicentebolea committed May 15, 2016
2 parents 2db6374 + de48182 commit c58a1e3
Show file tree
Hide file tree
Showing 20 changed files with 2,578 additions and 59 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,6 @@ m4/ltsugar.m4
m4/ltversion.m4
m4/lt~obsolete.m4
m4/pkg.m4

#Docs
doc/html
4 changes: 4 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
[submodule "doc/manual"]
path = doc/manual
url = [email protected]:DICL/EclipseDFS.wiki.git
[submodule "gh-pages"]
path = doc/html
url = [email protected]:DICL/EclipseDFS
branch = gh-pages
1 change: 1 addition & 0 deletions .vimrc
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ set makeprg=cd\ ~/build;\ make\ -j8\ check
let NERDTreeIgnore = ['Makefile.in', '^configure$', 'aclocal.m4', 'autom4te.cache', 'm4', 'config.h.in']

let g:clang_user_options = '-std=c++14'
let g:ctrlp_custom_ignore = '\v[\/](git|html)$'
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[![Build Status](https://travis-ci.com/DICL/EclipseDFS.svg?token=MaWCP2sHsbC2FaU6ztsx&branch=master)](https://magnum.travis-ci.com/DICL/EclipseDFS)
[![Build Status](https://travis-ci.org/DICL/EclipseDFS.svg?branch=master)](https://travis-ci.org/DICL/EclipseDFS)
[![Slack room](https://img.shields.io/badge/slack-join-pink.svg)](https://dicl.slack.com/messages/general/)
BRIEFING
========
Expand Down
2,353 changes: 2,353 additions & 0 deletions doc/Doxyfile

Large diffs are not rendered by default.

Binary file added doc/deploy_key.enc
Binary file not shown.
1 change: 1 addition & 0 deletions doc/html
Submodule html added at a57728
12 changes: 6 additions & 6 deletions src/common/histogram.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void Histogram::initialize()

// initialize the boundary
for (int i = 0; i < numserver - 1; i++) //boundaries[i] = (inf/10) * i
boundaries[i] = (int) ( ( (double) max / (double) numserver) * ( (double) (i + 1)));
boundaries[i] = (uint32_t) ( ( (double) max / (double) numserver) * ( (double) (i + 1)));

boundaries[numserver - 1] = max;

Expand Down Expand Up @@ -246,17 +246,17 @@ int Histogram::get_index (unsigned query) // return the server index range o
}

// random_within_boundaries {{{
uint32_t Histogram::random_within_boundaries (unsigned int index) {
unsigned int which_server = index;
int lower_boundary ;
uint32_t Histogram::random_within_boundaries (uint32_t index) {
uint32_t which_server = index;
uint32_t lower_boundary ;

if (which_server != 0)
lower_boundary = boundaries[which_server-1];
else
lower_boundary = boundaries[numserver-1];

int upper_boundary = boundaries[which_server];
int range = upper_boundary - lower_boundary;
uint32_t upper_boundary = boundaries[which_server];
uint32_t range = upper_boundary - lower_boundary;
uint32_t result = rand() % range + lower_boundary;

return result;
Expand Down
2 changes: 1 addition & 1 deletion src/common/histogram.hh
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class Histogram
int numbin; // number of bin -> number of histogram bin
// int digit; // number of digits to represent the problem space
double* querycount; // the data access count to each
unsigned* boundaries; // the index of end point of each node
uint32_t* boundaries; // the index of end point of each node

public:
Histogram(); // constructs an uninitialized object
Expand Down
2 changes: 1 addition & 1 deletion src/common/settings.hh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* \file settings.hh
/** \file settings.hh
* @author Vicente Adolfo Bolea Sanchez
* @brief This is the implementation file of Settings
*
Expand Down
92 changes: 81 additions & 11 deletions src/fs/dfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ namespace eclipse{
continue;
}

int which_server = rand()%NUM_SERVERS;
int which_server = file_hash_key % NUM_SERVERS;
ifstream myfile (file_name);
uint64_t start = 0;
uint64_t end = start + BLOCK_SIZE - 1;
Expand Down Expand Up @@ -121,8 +121,8 @@ namespace eclipse{

block_info.block_name = file_name + "_" + to_string(block_seq);
block_info.file_name = file_name;
block_info.block_seq = block_seq++;
block_info.block_hash_key = boundaries.random_within_boundaries(which_server);
block_info.block_seq = block_seq++;
block_info.block_size = block_size;
block_info.is_inter = 0;
block_info.node = nodes[which_server];
Expand Down Expand Up @@ -176,7 +176,17 @@ namespace eclipse{
for (int i=2; i<argc; i++) {
string file_name = argv[i];
uint32_t file_hash_key = h(file_name);
auto socket = connect (file_hash_key % NUM_SERVERS);
auto socket = connect (file_hash_key);
FileExist fe;
fe.file_name = file_name;
send_message(socket.get(), &fe);
auto rep = read_reply<Reply> (socket.get());

if (rep->message != "TRUE")
{
cerr << "[ERR] " << file_name << " doesn't exist." << endl;
continue;
}
FileRequest fr;
fr.file_name = file_name;

Expand Down Expand Up @@ -206,6 +216,55 @@ namespace eclipse{
return 0;
}

int DFS::cat(int argc, char* argv[])
{
if (argc < 3) {
cout << "[INFO] dfs cat file_1 file_2 ..." << endl;
return -1;

} else {
Histogram boundaries(NUM_SERVERS, 0);
boundaries.initialize();

for (int i=2; i<argc; i++) {
string file_name = argv[i];
uint32_t file_hash_key = h(file_name);
auto socket = connect (file_hash_key % NUM_SERVERS);
FileExist fe;
fe.file_name = file_name;
send_message(socket.get(), &fe);
auto rep = read_reply<Reply> (socket.get());

if (rep->message != "TRUE")
{
cerr << "[ERR] " << file_name << " doesn't exist." << endl;
continue;
}
FileRequest fr;
fr.file_name = file_name;

send_message (socket.get(), &fr);
auto fd = read_reply<FileDescription> (socket.get());

socket->close();
int block_seq = 0;
for (auto block_name : fd->blocks) {
uint32_t hash_key = fd->hash_keys[block_seq++];
auto tmp_socket = connect(boundaries.get_index(hash_key));
BlockRequest br;
br.block_name = block_name;
br.hash_key = hash_key;
send_message(tmp_socket.get(), &br);
auto msg = read_reply<BlockInfo>(tmp_socket.get());
cout << msg->content;
tmp_socket->close();
}
socket->close();
}
}
return 0;
}

int DFS::ls(int argc, char* argv[])
{
vector<FileInfo> total;
Expand Down Expand Up @@ -248,33 +307,38 @@ namespace eclipse{
uint32_t GB = 1024 * 1024 * 1024;
uint64_t TB = (uint64_t) 1024 * 1024 * 1024 * 1024;
uint64_t PB = (uint64_t) 1024 * 1024 * 1024 * 1024 * 1024;
uint32_t K = 1000;
uint32_t M = 1000 * 1000;
uint32_t G = 1000 * 1000 * 1000;
uint64_t T = (uint64_t) 1000 * 1000 * 1000 * 1000;
uint64_t P = (uint64_t) 1000 * 1000 * 1000 * 1000 * 1000;
float hsize = 0;
int tabsize = 12;
string unit;
cout.precision(1);
if(fl.file_size <= KB)
if(fl.file_size < K)
{
hsize = (float)fl.file_size;
unit = "B";
tabsize++;
cout.precision(0);
}
else if(fl.file_size <= MB)
else if(fl.file_size < M)
{
hsize = (float)fl.file_size / KB;
unit = "KB";
}
else if(fl.file_size <= GB)
else if(fl.file_size < G)
{
hsize = (float)fl.file_size / MB;
unit = "MB";
}
else if(fl.file_size <= TB)
else if(fl.file_size < T)
{
hsize = (float)fl.file_size / GB;
unit = "GB";
}
else if(fl.file_size <= PB)
else if(fl.file_size < P)
{
hsize = (float)fl.file_size / TB;
unit = "TB";
Expand Down Expand Up @@ -326,10 +390,12 @@ namespace eclipse{

unsigned int block_seq = 0;
for (auto block_name : fd->blocks) {
auto tmp_socket = connect(boundaries.get_index(fd->hash_keys[block_seq]));
uint32_t block_hash_key = fd->hash_keys[block_seq];
auto tmp_socket = connect(boundaries.get_index(block_hash_key));
BlockDel bd;
bd.block_name = block_name;
bd.file_name = file_name;
bd.block_hash_key = block_hash_key;
bd.block_seq = block_seq++;
send_message(tmp_socket.get(), &bd);
auto msg = read_reply<Reply>(tmp_socket.get());
Expand Down Expand Up @@ -393,9 +459,13 @@ namespace eclipse{
cout << file_name << endl;
int block_seq = 0;
for (auto block_name : fd->blocks) {
uint32_t hash_key = fd->hash_keys[block_seq++];
string node = nodes[boundaries.get_index(hash_key)];
uint32_t hash_key = fd->hash_keys[block_seq++];
string node = nodes[boundaries.get_index(hash_key)]; // When replication policy is changed, this line should be changed!
string r_node = nodes[(boundaries.get_index(hash_key)+1+NUM_SERVERS)%NUM_SERVERS]; // When replication policy is changed, this line should be changed!
string l_node = nodes[(boundaries.get_index(hash_key)-1+NUM_SERVERS)%NUM_SERVERS]; // When replication policy is changed, this line should be changed!
cout << "\t- " << setw(15) << block_name << " : " << setw(15) << node << endl;
cout << "\t- " << setw(15) << block_name << " : " << setw(15) << r_node << endl;
cout << "\t- " << setw(15) << block_name << " : " << setw(15) << l_node << endl;
}
socket->close();
}
Expand Down
1 change: 1 addition & 0 deletions src/fs/dfs.hh
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ namespace eclipse {
void load_settings ();
int put (int argc, char* argv[]);
int get (int argc, char* argv[]);
int cat (int argc, char* argv[]);
int ls (int argc, char* argv[]);
int rm (int argc, char* argv[]);
int format (int argc, char* argv[]);
Expand Down
8 changes: 6 additions & 2 deletions src/fs/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ int main(int argc, char* argv[])
{
if(argc < 2)
{
cout << "[INFO] dfs put|get|ls|rm|format" << endl;
cout << "[INFO] dfs put|get|cat|ls|rm|format" << endl;
return -1;
}
else
Expand All @@ -24,6 +24,10 @@ int main(int argc, char* argv[])
{
dfs.get(argc, argv);
}
else if(op.compare("cat") == 0)
{
dfs.cat(argc, argv);
}
else if(op.compare("ls") == 0)
{
dfs.ls(argc, argv);
Expand All @@ -43,7 +47,7 @@ int main(int argc, char* argv[])
else
{
cerr << "[ERR] Unknown operation" << endl;
cout << "[INFO] dfs put|get|ls|rm|format" << endl;
cout << "[INFO] dfs put|get|cat|ls|rm|format" << endl;
return -1;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/messages/blockdel.hh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ struct BlockDel: public Message {
std::string block_name;
std::string file_name;
unsigned int block_seq;
//uint32_t block_hash_key;
uint32_t block_hash_key;
//uint32_t block_size;
//unsigned int is_inter;
//std::string node;
Expand Down
1 change: 1 addition & 0 deletions src/messages/boost_impl.hh
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ template <typename Archive>
ar & BOOST_SERIALIZATION_NVP(c.file_name);
ar & BOOST_SERIALIZATION_NVP(c.block_seq);
ar & BOOST_SERIALIZATION_NVP(c.block_name);
ar & BOOST_SERIALIZATION_NVP(c.block_hash_key);
}

template <typename Archive>
Expand Down
53 changes: 37 additions & 16 deletions src/network/asyncchannel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include <iomanip>
#include <istream>
#include <boost/bind.hpp>
#include <unistd.h>
#include <stdexcept>

namespace ph = boost::asio::placeholders;
using namespace std;
Expand All @@ -19,7 +21,9 @@ AsyncChannel::AsyncChannel(tcp::socket* s, tcp::socket* r, NetObserver* node_, i
sender(s),
receiver(r),
id(i)
{ }
{
is_writing.store(false);
}
AsyncChannel::~AsyncChannel() {
if (receiver!= nullptr) {
receiver->close();
Expand All @@ -35,25 +39,37 @@ void AsyncChannel::do_write (Message* m) {

stringstream ss;
ss << setfill('0') << setw(header_size) << str.length() << str;
string* to_write = new string(ss.str());
do_write_impl(to_write);

// TODO messages_queue may not be thread-safe: Let's leave that task to PeerDFS
messages_queue.emplace (make_unique<string>(ss.str()));
if (!is_writing.exchange(true)) {
do_write_impl ();
}
}
// }}}
// do_write_impl {{{
void AsyncChannel::do_write_impl (string* to_write) {
async_write (*sender, buffer(*to_write), boost::bind (&AsyncChannel::on_write,
this, ph::error, ph::bytes_transferred, to_write));
void AsyncChannel::do_write_impl () {
auto& to_write = messages_queue.front();
async_write (*sender, buffer(*to_write), transfer_exactly(to_write->length()),
boost::bind (&AsyncChannel::on_write, this, ph::error, ph::bytes_transferred));
}
// }}}
// on_write {{{
void AsyncChannel::on_write (const boost::system::error_code& ec,
size_t s, string* str) {
delete str;
size_t s) {
if (ec) {
logger->info ("Message %s, size: %d, could not reach err=%s",
str->c_str(), s, ec.message().c_str());
logger->info ("Message size: %d, could not reach err=%s",
s, ec.message().c_str());

do_write_impl(str);
do_write_impl();
} else {
messages_queue.pop();

if (!messages_queue.empty()) {
do_write_impl ();
} else {
is_writing.exchange(false);
}
}
}
// }}}
Expand All @@ -73,11 +89,15 @@ void AsyncChannel::read_coroutine (yield_context yield) {
try {
while (true) {
size_t l = async_read (*receiver, buffer(header, header_size), yield[ec]);
if (l != (size_t)header_size or ec) throw 1;
if (l != (size_t)header_size or ec) {
throw std::runtime_error("header size");
}

size_t size = atoi(header);
l = read (*receiver, body.prepare(size));
//if (ec) throw 1;
if (l != size) {
throw std::runtime_error("body size");
}

body.commit (l);
string str ((istreambuf_iterator<char>(&body)),
Expand All @@ -90,12 +110,13 @@ void AsyncChannel::read_coroutine (yield_context yield) {
delete msg;
msg=nullptr;
}
} catch (...) {
} catch (std::exception& e) {
if (ec == boost::asio::error::eof)
logger->info ("AsyncChannel: Closing server socket to client");
else
logger->info ("AsyncChannel: Message arrived error=%s",
ec.message().c_str());
logger->info ("AsyncChannel: unformed header arrived from host %s, ex: %s",
receiver->remote_endpoint().address().to_string().c_str(), e.what());


node->on_disconnect(nullptr, id);
}
Expand Down
Loading

0 comments on commit c58a1e3

Please sign in to comment.