Skip to content

Commit

Permalink
automatic garbage clean
Browse files Browse the repository at this point in the history
  • Loading branch information
fxsjy committed Aug 19, 2015
1 parent a1c48d3 commit fb8e347
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 3 deletions.
2 changes: 1 addition & 1 deletion server/flags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ DEFINE_int32(elect_timeout_min, 150, "mininum timeout to make a new election");
DEFINE_int32(elect_timeout_max, 300, "maximum timeout to make a new election");
DEFINE_int64(session_expire_timeout, 6000000, "timeout for session expiration, 6 seconds in default");
DEFINE_bool(ins_data_compress, true, "enable snappy compression on leveldb storage");

DEFINE_int32(ins_gc_interval, 60, "binlog clean interval (seconds)");
//ins_cli only
DEFINE_string(ins_cmd, "", "the command of inc shell");
DEFINE_string(ins_key, "key", "");
Expand Down
73 changes: 71 additions & 2 deletions server/ins_node_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <boost/function.hpp>
#include <boost/scoped_ptr.hpp>
#include <gflags/gflags.h>
#include <limits>
#include "common/this_thread.h"
#include "common/timer.h"
#include "storage/meta.h"
Expand All @@ -21,6 +22,7 @@ DECLARE_int32(elect_timeout_min);
DECLARE_int32(elect_timeout_max);
DECLARE_int64(session_expire_timeout);
DECLARE_bool(ins_data_compress);
DECLARE_int32(ins_gc_interval);

const std::string tag_last_applied_index = "#TAG_LAST_APPLIED_INDEX#";

Expand All @@ -42,7 +44,8 @@ InsNodeImpl::InsNodeImpl (std::string& server_id,
server_start_timestamp_(0),
commit_index_(-1),
last_applied_index_(-1),
single_node_mode_(false){
single_node_mode_(false),
last_safe_clean_index_(-1) {
srand(time(NULL));
replication_cond_ = new CondVar(&mu_);
commit_cond_ = new CondVar(&mu_);
Expand Down Expand Up @@ -104,6 +107,9 @@ InsNodeImpl::InsNodeImpl (std::string& server_id,
session_checker_.AddTask(
boost::bind(&InsNodeImpl::RemoveExpiredSessions, this)
);
binlog_cleaner_.AddTask(
boost::bind(&InsNodeImpl::GarbageClean, this)
);
}

InsNodeImpl::~InsNodeImpl() {
Expand Down Expand Up @@ -1615,7 +1621,7 @@ void InsNodeImpl::UnLock(::google::protobuf::RpcController* /*controller*/,


void InsNodeImpl::DelBinlog(int64_t index) {
LOG(INFO, "delete binlog [%ld]", index);
LOG(DEBUG, "delete binlog [%ld]", index);
bool ret = binlogger_->RemoveSlot(index);
if (ret) {
binlog_cleaner_.DelayTask(10, //10ms delay
Expand Down Expand Up @@ -1650,6 +1656,69 @@ void InsNodeImpl::CleanBinlog(::google::protobuf::RpcController* controller,
done->Run();
}

void InsNodeImpl::GarbageClean() {
std::vector<std::string> all_members;
bool is_leader = false;
{
MutexLock lock(&mu_);
if (status_ == kLeader) {
is_leader = true;
}
std::copy(members_.begin(), members_.end(), std::back_inserter(all_members));
}
if (is_leader) {
int64_t min_applied_index = std::numeric_limits<int64_t>::max();
bool ret_all = true;
std::vector<std::string>::iterator it;
for(it = all_members.begin(); it != all_members.end(); it++) {
galaxy::ins::InsNode_Stub* stub;
std::string server_id = *it;
rpc_client_.GetStub(server_id, &stub);
boost::scoped_ptr<galaxy::ins::InsNode_Stub> stub_guard(stub);
::galaxy::ins::ShowStatusRequest request;
::galaxy::ins::ShowStatusResponse response;
bool ok = rpc_client_.SendRequest(stub, &InsNode_Stub::ShowStatus,
&request, &response, 2, 1);
if (!ok) {
LOG(INFO, "faild to get last_applied_index from %s", server_id.c_str());
ret_all = false;
break;
} else {
min_applied_index = std::min(min_applied_index, response.last_applied());
}
}
if (ret_all) {
int64_t safe_clean_index = min_applied_index;
int64_t old_index;
{
MutexLock lock(&mu_);
old_index = last_safe_clean_index_;
last_safe_clean_index_ = safe_clean_index;
}
if (old_index != safe_clean_index) {
LOG(INFO, "[gc] safe clean index is : %ld", safe_clean_index);
for(it = all_members.begin(); it != all_members.end(); it++) {
galaxy::ins::InsNode_Stub* stub;
std::string server_id = *it;
rpc_client_.GetStub(server_id, &stub);
boost::scoped_ptr<galaxy::ins::InsNode_Stub> stub_guard(stub);
::galaxy::ins::CleanBinlogRequest request;
::galaxy::ins::CleanBinlogResponse response;
request.set_end_index(safe_clean_index);
bool ok = rpc_client_.SendRequest(stub, &InsNode_Stub::CleanBinlog,
&request, &response, 2, 1);
if (!ok) {
LOG(INFO, "failed to clean binlog request to %s", server_id.c_str());
}
}
}
} // end-if, got min_applied_index
}// end-if, this node is leader

binlog_cleaner_.DelayTask(FLAGS_ins_gc_interval * 1000,
boost::bind(&InsNodeImpl::GarbageClean, this));
}

} //namespace ins
} //namespace galaxy

2 changes: 2 additions & 0 deletions server/ins_node_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ class InsNodeImpl : public InsNode {
const std::string& session_id);
void ForwardKeepAlive(const ::galaxy::ins::KeepAliveRequest * request,
::galaxy::ins::KeepAliveResponse * response);
void GarbageClean();
public:
std::vector<std::string> members_;
private:
Expand Down Expand Up @@ -270,6 +271,7 @@ class InsNodeImpl : public InsNode {
Mutex session_locks_mu_;
ThreadPool binlog_cleaner_;
bool single_node_mode_;
int64_t last_safe_clean_index_;
};

} //namespace ins
Expand Down

0 comments on commit fb8e347

Please sign in to comment.