diff --git a/benchmark/protocols/poe/BUILD b/benchmark/protocols/poe/BUILD new file mode 100644 index 000000000..e7bbde3b1 --- /dev/null +++ b/benchmark/protocols/poe/BUILD @@ -0,0 +1,16 @@ +package(default_visibility = ["//visibility:private"]) + +load("@bazel_skylib//rules:common_settings.bzl", "bool_flag") + +cc_binary( + name = "kv_server_performance", + srcs = ["kv_server_performance.cpp"], + deps = [ + "//chain/storage:memory_db", + "//executor/kv:kv_executor", + "//platform/config:resdb_config_utils", + "//platform/consensus/ordering/poe/framework:consensus", + "//service/utils:server_factory", + ], +) + diff --git a/benchmark/protocols/poe/kv_server_performance.cpp b/benchmark/protocols/poe/kv_server_performance.cpp new file mode 100644 index 000000000..fa13e9a94 --- /dev/null +++ b/benchmark/protocols/poe/kv_server_performance.cpp @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2019-2022 ExpoLab, UC Davis + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + */ + +#include + +#include "chain/storage/memory_db.h" +#include "executor/kv/kv_executor.h" +#include "platform/config/resdb_config_utils.h" +#include "platform/consensus/ordering/poe/framework/consensus.h" +#include "platform/networkstrate/service_network.h" +#include "platform/statistic/stats.h" +#include "proto/kv/kv.pb.h" + +using namespace resdb; +using namespace resdb::poe; +using namespace resdb::storage; + +void ShowUsage() { + printf(" [logging_dir]\n"); +} + +std::string GetRandomKey() { + int num1 = rand() % 10; + int num2 = rand() % 10; + return std::to_string(num1) + std::to_string(num2); +} + +int main(int argc, char** argv) { + if (argc < 3) { + ShowUsage(); + exit(0); + } + + // google::InitGoogleLogging(argv[0]); + // FLAGS_minloglevel = google::GLOG_WARNING; + + char* config_file = argv[1]; + char* private_key_file = argv[2]; + char* cert_file = argv[3]; + + if (argc >= 5) { + auto monitor_port = Stats::GetGlobalStats(5); + monitor_port->SetPrometheus(argv[4]); + } + + std::unique_ptr config = + GenerateResDBConfig(config_file, private_key_file, cert_file); + + config->RunningPerformance(true); + ResConfigData config_data = config->GetConfigData(); + + auto performance_consens = std::make_unique( + *config, std::make_unique(std::make_unique())); + performance_consens->SetupPerformanceDataFunc([]() { + KVRequest request; + request.set_cmd(KVRequest::SET); + request.set_key(GetRandomKey()); + request.set_value("helloword"); + std::string request_data; + request.SerializeToString(&request_data); + return request_data; + }); + + auto server = + std::make_unique(*config, std::move(performance_consens)); + server->Run(); +} diff --git a/platform/consensus/execution/transaction_executor.cpp b/platform/consensus/execution/transaction_executor.cpp index 59779b221..fd24da3a3 100644 --- a/platform/consensus/execution/transaction_executor.cpp +++ b/platform/consensus/execution/transaction_executor.cpp @@ -231,6 +231,7 @@ void TransactionExecutor::Execute(std::unique_ptr request, response = std::make_unique(); } + response->set_proxy_id(batch_request.proxy_id()); response->set_createtime(batch_request.createtime()); response->set_local_id(batch_request.local_id()); response->set_hash(batch_request.hash()); diff --git a/platform/consensus/ordering/common/algorithm/BUILD b/platform/consensus/ordering/common/algorithm/BUILD new file mode 100644 index 000000000..9abd8715e --- /dev/null +++ b/platform/consensus/ordering/common/algorithm/BUILD @@ -0,0 +1,12 @@ +package(default_visibility = ["//platform/consensus/ordering:__subpackages__"]) + +cc_library( + name = "protocol_base", + srcs = ["protocol_base.cpp"], + hdrs = ["protocol_base.h"], + deps = [ + "//common:comm", + "//common/crypto:signature_verifier", + ], +) + diff --git a/platform/consensus/ordering/common/algorithm/protocol_base.cpp b/platform/consensus/ordering/common/algorithm/protocol_base.cpp new file mode 100644 index 000000000..3c6c2fc3b --- /dev/null +++ b/platform/consensus/ordering/common/algorithm/protocol_base.cpp @@ -0,0 +1,53 @@ +#include "platform/consensus/ordering/common/algorithm/protocol_base.h" + +#include + +namespace resdb { +namespace common { + +ProtocolBase::ProtocolBase( + int id, + int f, + int total_num, + SingleCallFuncType single_call, + BroadcastCallFuncType broadcast_call, + CommitFuncType commit) : + id_(id), + f_(f), + total_num_(total_num), + single_call_(single_call), + broadcast_call_(broadcast_call), + commit_(commit) { + stop_ = false; +} + +ProtocolBase::ProtocolBase( int id, int f, int total_num) : ProtocolBase(id, f, total_num, nullptr, nullptr, nullptr){ + +} + +ProtocolBase::~ProtocolBase() { + Stop(); +} + +void ProtocolBase::Stop(){ + stop_ = true; +} + +bool ProtocolBase::IsStop(){ + return stop_; +} + +int ProtocolBase::SendMessage(int msg_type, const google::protobuf::Message& msg, int node_id) { + return single_call_(msg_type, msg, node_id); +} + +int ProtocolBase::Broadcast(int msg_type, const google::protobuf::Message& msg) { + return broadcast_call_(msg_type, msg); +} + +int ProtocolBase::Commit(const google::protobuf::Message& msg) { + return commit_(msg); +} + +} // namespace protocol +} // namespace resdb diff --git a/platform/consensus/ordering/common/algorithm/protocol_base.h b/platform/consensus/ordering/common/algorithm/protocol_base.h new file mode 100644 index 000000000..a93be8223 --- /dev/null +++ b/platform/consensus/ordering/common/algorithm/protocol_base.h @@ -0,0 +1,64 @@ +#pragma once + +#include +#include +#include "common/crypto/signature_verifier.h" + +namespace resdb { +namespace common { + +class ProtocolBase { + public: + typedef std::function SingleCallFuncType; + typedef std::function BroadcastCallFuncType; + typedef std::function CommitFuncType; + + ProtocolBase( + int id, + int f, + int total_num, + SingleCallFuncType single_call, + BroadcastCallFuncType broadcast_call, + CommitFuncType commit + ); + + ProtocolBase( int id, int f, int total_num); + + + virtual ~ProtocolBase(); + + void Stop(); + + inline + void SetSingleCallFunc(SingleCallFuncType single_call) { single_call_ = single_call; } + + inline + void SetBroadcastCallFunc(BroadcastCallFuncType broadcast_call) { broadcast_call_ = broadcast_call; } + + inline + void SetCommitFunc(CommitFuncType commit_func) { commit_ = commit_func; } + + inline + void SetSignatureVerifier(SignatureVerifier* verifier) { verifier_ = verifier;} + + protected: + int SendMessage(int msg_type, const google::protobuf::Message& msg, int node_id); + int Broadcast(int msg_type, const google::protobuf::Message& msg); + int Commit(const google::protobuf::Message& msg); + + bool IsStop(); + + protected: + int id_; + int f_; + int total_num_; + std::function single_call_; + std::function broadcast_call_; + std::function commit_; + std::atomic stop_; + + SignatureVerifier* verifier_; +}; + +} // namespace protocol +} // namespace resdb diff --git a/platform/consensus/ordering/common/framework/BUILD b/platform/consensus/ordering/common/framework/BUILD new file mode 100644 index 000000000..82e03a0fb --- /dev/null +++ b/platform/consensus/ordering/common/framework/BUILD @@ -0,0 +1,49 @@ +package(default_visibility = ["//platform/consensus/ordering:__subpackages__"]) + +cc_library( + name = "consensus", + srcs = ["consensus.cpp"], + hdrs = ["consensus.h"], + deps = [ + ":performance_manager", + ":response_manager", + "//common/utils", + "//executor/common:transaction_manager", + "//platform/consensus/execution:transaction_executor", + "//platform/consensus/ordering/common/algorithm:protocol_base", + "//platform/networkstrate:consensus_manager", + ], +) + +cc_library( + name = "performance_manager", + srcs = ["performance_manager.cpp"], + hdrs = ["performance_manager.h"], + deps = [ + ":transaction_utils", + "//platform/networkstrate:replica_communicator", + "//platform/networkstrate:server_comm", + ], +) + + +cc_library( + name = "response_manager", + srcs = ["response_manager.cpp"], + hdrs = ["response_manager.h"], + deps = [ + ":transaction_utils", + "//platform/networkstrate:replica_communicator", + "//platform/networkstrate:server_comm", + ], +) + +cc_library( + name = "transaction_utils", + srcs = ["transaction_utils.cpp"], + hdrs = ["transaction_utils.h"], + visibility = ["//visibility:public"], + deps = [ + "//platform/proto:resdb_cc_proto", + ], +) diff --git a/platform/consensus/ordering/common/framework/consensus.cpp b/platform/consensus/ordering/common/framework/consensus.cpp new file mode 100644 index 000000000..683bce29e --- /dev/null +++ b/platform/consensus/ordering/common/framework/consensus.cpp @@ -0,0 +1,181 @@ +/* + * Copyright (c) 2019-2022 ExpoLab, UC Davis + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + */ + +#include "platform/consensus/ordering/common/framework/consensus.h" + +#include +#include + +#include "common/utils/utils.h" + +namespace resdb { +namespace common { + +Consensus::Consensus(const ResDBConfig& config, + std::unique_ptr executor) + : ConsensusManager(config), + replica_communicator_(GetBroadCastClient()), + transaction_executor_(std::make_unique( + config, + [&](std::unique_ptr request, + std::unique_ptr resp_msg) { + ResponseMsg(*resp_msg); + }, + nullptr, std::move(executor))) { + LOG(INFO) << "is running is performance mode:" + << config_.IsPerformanceRunning(); + is_stop_ = false; + global_stats_ = Stats::GetGlobalStats(); +} + +void Consensus::Init(){ + if(performance_manager_ == nullptr){ + performance_manager_ = + config_.IsPerformanceRunning() + ? std::make_unique( + config_, GetBroadCastClient(), GetSignatureVerifier()) + : nullptr; + } + + if(response_manager_ == nullptr){ + response_manager_ = + !config_.IsPerformanceRunning() + ? std::make_unique(config_, GetBroadCastClient(), + GetSignatureVerifier()) + : nullptr; + } +} + +void Consensus::InitProtocol(ProtocolBase * protocol){ + protocol->SetSingleCallFunc( + [&](int type, const google::protobuf::Message& msg, int node_id) { + return SendMsg(type, msg, node_id); + }); + + protocol->SetBroadcastCallFunc( + [&](int type, const google::protobuf::Message& msg) { + return Broadcast(type, msg); + }); + + protocol->SetCommitFunc( + [&](const google::protobuf::Message& msg) { + return CommitMsg(msg); + }); +} + +Consensus::~Consensus(){ + is_stop_ = true; +} + +void Consensus::SetPerformanceManager(std::unique_ptr performance_manager){ + performance_manager_ = std::move(performance_manager); +} + +bool Consensus::IsStop(){ + return is_stop_; +} + +void Consensus::SetupPerformanceDataFunc(std::function func) { + performance_manager_->SetDataFunc(func); +} + +void Consensus::SetCommunicator(ReplicaCommunicator* replica_communicator) { + replica_communicator_ = replica_communicator; +} + +int Consensus::Broadcast(int type, const google::protobuf::Message& msg) { + Request request; + msg.SerializeToString(request.mutable_data()); + request.set_type(Request::TYPE_CUSTOM_CONSENSUS); + request.set_user_type(type); + request.set_sender_id(config_.GetSelfInfo().id()); + + replica_communicator_->BroadCast(request); + return 0; +} + +int Consensus::SendMsg(int type, const google::protobuf::Message& msg, + int node_id) { + Request request; + msg.SerializeToString(request.mutable_data()); + request.set_type(Request::TYPE_CUSTOM_CONSENSUS); + request.set_user_type(type); + request.set_sender_id(config_.GetSelfInfo().id()); + replica_communicator_->SendMessage(request, node_id); + return 0; +} + +std::vector Consensus::GetReplicas() { + return config_.GetReplicaInfos(); +} + +int Consensus::CommitMsg(const google::protobuf::Message &txn) { + return 0; +} + +// The implementation of PBFT. +int Consensus::ConsensusCommit(std::unique_ptr context, + std::unique_ptr request) { + switch (request->type()) { + case Request::TYPE_CLIENT_REQUEST: + if (config_.IsPerformanceRunning()) { + return performance_manager_->StartEval(); + } + case Request::TYPE_RESPONSE: + if (config_.IsPerformanceRunning()) { + return performance_manager_->ProcessResponseMsg(std::move(context), + std::move(request)); + } + case Request::TYPE_NEW_TXNS: { + return ProcessNewTransaction(std::move(request)); + } + case Request::TYPE_CUSTOM_CONSENSUS: { + return ProcessCustomConsensus(std::move(request)); + } + } + return 0; +} + +int Consensus::ProcessCustomConsensus(std::unique_ptr request) { + return 0; +} + +int Consensus::ProcessNewTransaction(std::unique_ptr request) { + return 0; +} + +int Consensus::ResponseMsg(const BatchUserResponse& batch_resp) { + Request request; + request.set_seq(batch_resp.seq()); + request.set_type(Request::TYPE_RESPONSE); + request.set_sender_id(config_.GetSelfInfo().id()); + request.set_proxy_id(batch_resp.proxy_id()); + batch_resp.SerializeToString(request.mutable_data()); + replica_communicator_->SendMessage(request, request.proxy_id()); + return 0; +} + +} // namespace common +} // namespace resdb diff --git a/platform/consensus/ordering/common/framework/consensus.h b/platform/consensus/ordering/common/framework/consensus.h new file mode 100644 index 000000000..881dc72bb --- /dev/null +++ b/platform/consensus/ordering/common/framework/consensus.h @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2019-2022 ExpoLab, UC Davis + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + */ + +#pragma once + +#include "executor/common/transaction_manager.h" +#include "platform/consensus/execution/transaction_executor.h" +#include "platform/consensus/ordering/common/algorithm/protocol_base.h" +#include "platform/consensus/ordering/common/framework/performance_manager.h" +#include "platform/consensus/ordering/common/framework/response_manager.h" +#include "platform/networkstrate/consensus_manager.h" + +namespace resdb { +namespace common { + +class Consensus : public ConsensusManager { + public: + Consensus(const ResDBConfig& config, + std::unique_ptr transaction_manager); + virtual ~Consensus(); + + int ConsensusCommit(std::unique_ptr context, + std::unique_ptr request) override; + std::vector GetReplicas() override; + + void SetupPerformanceDataFunc(std::function func); + + void SetCommunicator(ReplicaCommunicator* replica_communicator); + + void InitProtocol(ProtocolBase * protocol); + + protected: + virtual int ProcessCustomConsensus(std::unique_ptr request); + virtual int ProcessNewTransaction(std::unique_ptr request); + virtual int CommitMsg(const google::protobuf::Message& msg); + + protected: + int SendMsg(int type, const google::protobuf::Message& msg, int node_id); + int Broadcast(int type, const google::protobuf::Message& msg); + int ResponseMsg(const BatchUserResponse& batch_resp); + void AsyncSend(); + bool IsStop(); + + protected: + void Init(); + void SetPerformanceManager(std::unique_ptr performance_manger); + + protected: + ReplicaCommunicator* replica_communicator_; + std::unique_ptr performance_manager_; + std::unique_ptr response_manager_; + std::unique_ptr transaction_executor_; + Stats* global_stats_; + + LockFreeQueue resp_queue_; + std::vector send_thread_; + bool is_stop_; +}; + +} // namespace common +} // namespace resdb diff --git a/platform/consensus/ordering/common/framework/performance_manager.cpp b/platform/consensus/ordering/common/framework/performance_manager.cpp new file mode 100644 index 000000000..089f3bd5e --- /dev/null +++ b/platform/consensus/ordering/common/framework/performance_manager.cpp @@ -0,0 +1,289 @@ +/* + * Copyright (c) 2019-2022 ExpoLab, UC Davis + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + */ + +#include "platform/consensus/ordering/common/framework/performance_manager.h" + +#include + +#include "common/utils/utils.h" + +namespace resdb { +namespace common { + +using comm::CollectorResultCode; + +PerformanceManager::PerformanceManager( + const ResDBConfig& config, ReplicaCommunicator* replica_communicator, + SignatureVerifier* verifier) + : config_(config), + replica_communicator_(replica_communicator), + batch_queue_("user request"), + verifier_(verifier) { + stop_ = false; + eval_started_ = false; + eval_ready_future_ = eval_ready_promise_.get_future(); + if (config_.GetPublicKeyCertificateInfo() + .public_key() + .public_key_info() + .type() == CertificateKeyInfo::CLIENT) { + for (int i = 0; i < 1; ++i) { + user_req_thread_[i] = + std::thread(&PerformanceManager::BatchProposeMsg, this); + } + } + global_stats_ = Stats::GetGlobalStats(); + send_num_ = 0; + total_num_ = 0; + replica_num_ = config_.GetReplicaNum(); + id_ = config_.GetSelfInfo().id(); + primary_ = id_ % replica_num_; + if (primary_ == 0) primary_ = replica_num_; + local_id_ = 1; + sum_ = 0; +} + +PerformanceManager::~PerformanceManager() { + stop_ = true; + for (int i = 0; i < 16; ++i) { + if (user_req_thread_[i].joinable()) { + user_req_thread_[i].join(); + } + } +} + +int PerformanceManager::GetPrimary() { return primary_; } + +std::unique_ptr PerformanceManager::GenerateUserRequest() { + std::unique_ptr request = std::make_unique(); + request->set_data(data_func_()); + return request; +} + +void PerformanceManager::SetDataFunc(std::function func) { + data_func_ = std::move(func); +} + +int PerformanceManager::StartEval() { + if (eval_started_) { + return 0; + } + eval_started_ = true; + for (int i = 0; i < 100000000; ++i) { + // for (int i = 0; i < 60000000000; ++i) { + std::unique_ptr queue_item = std::make_unique(); + queue_item->context = nullptr; + queue_item->user_request = GenerateUserRequest(); + batch_queue_.Push(std::move(queue_item)); + if (i == 2000000) { + eval_ready_promise_.set_value(true); + } + } + LOG(WARNING) << "start eval done"; + return 0; +} + +// =================== response ======================== +// handle the response message. If receive f+1 commit messages, send back to the +// user. +int PerformanceManager::ProcessResponseMsg(std::unique_ptr context, + std::unique_ptr request) { + std::unique_ptr response; + // Add the response message, and use the call back to collect the received + // messages. + // The callback will be triggered if it received f+1 messages. + if (request->ret() == -2) { + // LOG(INFO) << "get response fail:" << request->ret(); + send_num_--; + return 0; + } + + //LOG(INFO) << "get response:" << request->seq() << " sender:"<sender_id(); + std::unique_ptr batch_response = nullptr; + CollectorResultCode ret = + AddResponseMsg(std::move(request), [&](std::unique_ptr request) { + batch_response = std::move(request); + return; + }); + + if (ret == CollectorResultCode::STATE_CHANGED) { + assert(batch_response); + SendResponseToClient(*batch_response); + } + return ret == CollectorResultCode::INVALID ? -2 : 0; +} + +CollectorResultCode PerformanceManager::AddResponseMsg( + std::unique_ptr request, + std::function)> response_call_back) { + if (request == nullptr) { + return CollectorResultCode::INVALID; + } + + //uint64_t seq = request->seq(); + + std::unique_ptr batch_response = std::make_unique(); + if (!batch_response->ParseFromString(request->data())) { + LOG(ERROR) << "parse response fail:"<data().size() + <<" seq:"<seq(); return CollectorResultCode::INVALID; + } + + uint64_t seq = batch_response->local_id(); + //LOG(ERROR)<<"receive seq:"< lk(response_lock_[idx]); + if (response_[idx].find(seq) == response_[idx].end()) { + //LOG(ERROR)<<"has done local seq:"<seq(); + return CollectorResultCode::OK; + } + response_[idx][seq]++; + //LOG(ERROR)<<"get seq :"<seq()<<" local id:"<= config_.GetMinClientReceiveNum()) { + //LOG(ERROR)<<"get seq :"<seq()<<" local id:"< 0) { + uint64_t run_time = GetCurrentTime() - create_time; + LOG(ERROR)<<"receive current:"<AddLatency(run_time); + } else { + } + //send_num_-=10; + send_num_--; +} + +// =================== request ======================== +int PerformanceManager::BatchProposeMsg() { + LOG(WARNING) << "batch wait time:" << config_.ClientBatchWaitTimeMS() + << " batch num:" << config_.ClientBatchNum() + << " max txn:" << config_.GetMaxProcessTxn(); + std::vector> batch_req; + eval_ready_future_.get(); + bool start = false; + while (!stop_) { + if (send_num_ > config_.GetMaxProcessTxn()) { + // LOG(ERROR)<<"wait send num:"< item = + batch_queue_.Pop(config_.ClientBatchWaitTimeMS()); + if (item == nullptr) { + if(start){ + LOG(ERROR)<<"no data"; + } + continue; + } + batch_req.push_back(std::move(item)); + if (batch_req.size() < config_.ClientBatchNum()) { + continue; + } + } + start = true; + for(int i = 0; i < 1;++i){ + int ret = DoBatch(batch_req); + } + batch_req.clear(); + } + return 0; +} + +int PerformanceManager::DoBatch( + const std::vector>& batch_req) { + auto new_request = comm::NewRequest(Request::TYPE_NEW_TXNS, Request(), + config_.GetSelfInfo().id()); + if (new_request == nullptr) { + return -2; + } + + BatchUserRequest batch_request; + for (size_t i = 0; i < batch_req.size(); ++i) { + BatchUserRequest::UserRequest* req = batch_request.add_user_requests(); + *req->mutable_request() = *batch_req[i]->user_request.get(); + req->set_id(i); + } + + batch_request.set_local_id(local_id_++); + + { + int idx = batch_request.local_id() % response_set_size_; + std::unique_lock lk(response_lock_[idx]); + response_[idx][batch_request.local_id()]++; + } + + batch_request.set_proxy_id(config_.GetSelfInfo().id()); + batch_request.set_createtime(GetCurrentTime()); + batch_request.SerializeToString(new_request->mutable_data()); + if (verifier_) { + auto signature_or = verifier_->SignMessage(new_request->data()); + if (!signature_or.ok()) { + LOG(ERROR) << "Sign message fail"; + return -2; + } + *new_request->mutable_data_signature() = *signature_or; + } + + new_request->set_hash(SignatureVerifier::CalculateHash(new_request->data())); + new_request->set_proxy_id(config_.GetSelfInfo().id()); + new_request->set_user_seq(batch_request.local_id()); + + SendMessage(*new_request); + + global_stats_->BroadCastMsg(); + send_num_++; + sum_ += batch_req.size(); + //LOG(ERROR)<<"send num:"<IncClientCall(); + return 0; +} + +void PerformanceManager::SendMessage(const Request& request){ + replica_communicator_->SendMessage(request, GetPrimary()); +} + +} // namespace common +} // namespace resdb diff --git a/platform/consensus/ordering/common/framework/performance_manager.h b/platform/consensus/ordering/common/framework/performance_manager.h new file mode 100644 index 000000000..5a874baa9 --- /dev/null +++ b/platform/consensus/ordering/common/framework/performance_manager.h @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2019-2022 ExpoLab, UC Davis + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + */ + +#pragma once + +#include + +#include "platform/config/resdb_config.h" +#include "platform/consensus/ordering/common/framework/transaction_utils.h" +#include "platform/networkstrate/replica_communicator.h" +#include "platform/networkstrate/server_comm.h" +#include "platform/statistic/stats.h" + +namespace resdb { +namespace common { + +class PerformanceManager { + public: + PerformanceManager(const ResDBConfig& config, + ReplicaCommunicator* replica_communicator, + SignatureVerifier* verifier); + + virtual ~PerformanceManager(); + + int StartEval(); + + int ProcessResponseMsg(std::unique_ptr context, + std::unique_ptr request); + void SetDataFunc(std::function func); + + protected: + virtual void SendMessage(const Request& request); + + private: + // Add response messages which will be sent back to the caller + // if there are f+1 same messages. + comm::CollectorResultCode AddResponseMsg( + std::unique_ptr request, + std::function)> call_back); + void SendResponseToClient(const BatchUserResponse& batch_response); + + struct QueueItem { + std::unique_ptr context; + std::unique_ptr user_request; + }; + int DoBatch(const std::vector>& batch_req); + int BatchProposeMsg(); + int GetPrimary(); + std::unique_ptr GenerateUserRequest(); + + protected: + ResDBConfig config_; + ReplicaCommunicator* replica_communicator_; + + private: + LockFreeQueue batch_queue_; + std::thread user_req_thread_[16]; + std::atomic stop_; + Stats* global_stats_; + std::atomic send_num_; + std::mutex mutex_; + std::atomic total_num_; + SignatureVerifier* verifier_; + SignatureInfo sig_; + std::function data_func_; + std::future eval_ready_future_; + std::promise eval_ready_promise_; + std::atomic eval_started_; + std::atomic fail_num_; + static const int response_set_size_ = 6000000; + std::map response_[response_set_size_]; + std::mutex response_lock_[response_set_size_]; + int replica_num_; + int id_; + int primary_; + std::atomic local_id_; + std::atomic sum_; +}; + +} // namespace common +} // namespace resdb diff --git a/platform/consensus/ordering/common/framework/response_manager.cpp b/platform/consensus/ordering/common/framework/response_manager.cpp new file mode 100644 index 000000000..ae6b55cc0 --- /dev/null +++ b/platform/consensus/ordering/common/framework/response_manager.cpp @@ -0,0 +1,242 @@ +/* + * Copyright (c) 2019-2022 ExpoLab, UC Davis + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + */ + +#include "platform/consensus/ordering/common/framework/response_manager.h" + +#include + +#include "common/utils/utils.h" + +namespace resdb { +namespace common { + +using namespace resdb::comm; + +ResponseManager::ResponseManager(const ResDBConfig& config, + ReplicaCommunicator* replica_communicator, + SignatureVerifier* verifier) + : config_(config), + replica_communicator_(replica_communicator), + batch_queue_("user request"), + verifier_(verifier) { + stop_ = false; + local_id_ = 1; + + if (config_.GetPublicKeyCertificateInfo() + .public_key() + .public_key_info() + .type() == CertificateKeyInfo::CLIENT || + config_.IsTestMode()) { + user_req_thread_ = std::thread(&ResponseManager::BatchProposeMsg, this); + } + global_stats_ = Stats::GetGlobalStats(); + send_num_ = 0; +} + +ResponseManager::~ResponseManager() { + stop_ = true; + if (user_req_thread_.joinable()) { + user_req_thread_.join(); + } +} + +// use system info +int ResponseManager::GetPrimary() { return 1; } + +int ResponseManager::NewUserRequest(std::unique_ptr context, + std::unique_ptr user_request) { + context->client = nullptr; + + std::unique_ptr queue_item = std::make_unique(); + queue_item->context = std::move(context); + queue_item->user_request = std::move(user_request); + + batch_queue_.Push(std::move(queue_item)); + return 0; +} + +// =================== response ======================== +// handle the response message. If receive f+1 commit messages, send back to the +// caller. +int ResponseManager::ProcessResponseMsg(std::unique_ptr context, + std::unique_ptr request) { + std::unique_ptr response; + // Add the response message, and use the call back to collect the received + // messages. + // The callback will be triggered if it received f+1 messages. + if (request->ret() == -2) { + LOG(ERROR) << "get response fail:" << request->ret(); + send_num_--; + return 0; + } + CollectorResultCode ret = + AddResponseMsg(std::move(request), [&](const Request& request) { + response = std::make_unique(request); + return; + }); + + if (ret == CollectorResultCode::STATE_CHANGED) { + BatchUserResponse batch_response; + if (batch_response.ParseFromString(response->data())) { + SendResponseToClient(batch_response); + } else { + LOG(ERROR) << "parse response fail:"; + } + } + return ret == CollectorResultCode::INVALID ? -2 : 0; +} + +CollectorResultCode ResponseManager::AddResponseMsg( + std::unique_ptr request, + std::function response_call_back) { + if (request == nullptr) { + return CollectorResultCode::INVALID; + } + + int type = request->type(); + uint64_t seq = request->seq(); + bool done = false; + { + int idx = seq % response_set_size_; + std::unique_lock lk(response_lock_[idx]); + if (response_[idx][seq] == -1) { + return CollectorResultCode::OK; + } + response_[idx][seq]++; + if (response_[idx][seq] >= config_.GetMinClientReceiveNum()) { + response_[idx][seq] = -1; + done = true; + } + } + if (done) { + response_call_back(*request); + return CollectorResultCode::STATE_CHANGED; + } + return CollectorResultCode::OK; +} + +void ResponseManager::SendResponseToClient( + const BatchUserResponse& batch_response) { + uint64_t create_time = batch_response.createtime(); + uint64_t local_id = batch_response.local_id(); + if (create_time > 0) { + uint64_t run_time = GetCurrentTime() - create_time; + global_stats_->AddLatency(run_time); + } else { + LOG(ERROR) << "seq:" << local_id << " no resp"; + } + send_num_--; +} + +// =================== request ======================== +int ResponseManager::BatchProposeMsg() { + LOG(INFO) << "batch wait time:" << config_.ClientBatchWaitTimeMS() + << " batch num:" << config_.ClientBatchNum(); + std::vector> batch_req; + while (!stop_) { + if (send_num_ > config_.GetMaxProcessTxn()) { + LOG(ERROR) << "send num too high, wait:" << send_num_; + usleep(100); + continue; + } + if (batch_req.size() < config_.ClientBatchNum()) { + std::unique_ptr item = + batch_queue_.Pop(config_.ClientBatchWaitTimeMS()); + if (item != nullptr) { + batch_req.push_back(std::move(item)); + if (batch_req.size() < config_.ClientBatchNum()) { + continue; + } + } + } + if (batch_req.empty()) { + continue; + } + int ret = DoBatch(batch_req); + batch_req.clear(); + if (ret != 0) { + Response response; + response.set_result(Response::ERROR); + for (size_t i = 0; i < batch_req.size(); ++i) { + if (batch_req[i]->context && batch_req[i]->context->client) { + int ret = batch_req[i]->context->client->SendRawMessage(response); + if (ret) { + LOG(ERROR) << "send resp" << response.DebugString() + << " fail ret:" << ret; + } + } + } + } + } + return 0; +} + +int ResponseManager::DoBatch( + const std::vector>& batch_req) { + auto new_request = + NewRequest(Request::TYPE_NEW_TXNS, Request(), config_.GetSelfInfo().id()); + if (new_request == nullptr) { + return -2; + } + std::vector> context_list; + + BatchUserRequest batch_request; + for (size_t i = 0; i < batch_req.size(); ++i) { + BatchUserRequest::UserRequest* req = batch_request.add_user_requests(); + *req->mutable_request() = *batch_req[i]->user_request.get(); + *req->mutable_signature() = batch_req[i]->context->signature; + req->set_id(i); + context_list.push_back(std::move(batch_req[i]->context)); + } + + if (!config_.IsPerformanceRunning()) { + LOG(ERROR) << "add context list:" << new_request->seq() + << " list size:" << context_list.size(); + batch_request.set_local_id(local_id_); + } + batch_request.set_createtime(GetCurrentTime()); + std::string data; + batch_request.SerializeToString(&data); + if (verifier_) { + auto signature_or = verifier_->SignMessage(data); + if (!signature_or.ok()) { + LOG(ERROR) << "Sign message fail"; + return -2; + } + *new_request->mutable_data_signature() = *signature_or; + } + + batch_request.SerializeToString(new_request->mutable_data()); + new_request->set_hash(SignatureVerifier::CalculateHash(new_request->data())); + new_request->set_proxy_id(config_.GetSelfInfo().id()); + replica_communicator_->SendMessage(*new_request, GetPrimary()); + send_num_++; + LOG(INFO) << "send msg to primary:" << GetPrimary() + << " batch size:" << batch_req.size(); + return 0; +} + +} // namespace common +} // namespace resdb diff --git a/platform/consensus/ordering/common/framework/response_manager.h b/platform/consensus/ordering/common/framework/response_manager.h new file mode 100644 index 000000000..1c7043964 --- /dev/null +++ b/platform/consensus/ordering/common/framework/response_manager.h @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2019-2022 ExpoLab, UC Davis + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + */ + +#pragma once + +#include "platform/config/resdb_config.h" +#include "platform/consensus/ordering/common/framework/transaction_utils.h" +#include "platform/networkstrate/replica_communicator.h" +#include "platform/networkstrate/server_comm.h" +#include "platform/statistic/stats.h" + +namespace resdb { +namespace common { + +class ResponseManager { + public: + ResponseManager(const ResDBConfig& config, + ReplicaCommunicator* replica_communicator, + SignatureVerifier* verifier); + + ~ResponseManager(); + + std::vector> FetchContextList(uint64_t id); + + int NewUserRequest(std::unique_ptr context, + std::unique_ptr user_request); + + int ProcessResponseMsg(std::unique_ptr context, + std::unique_ptr request); + + private: + // Add response messages which will be sent back to the caller + // if there are f+1 same messages. + comm::CollectorResultCode AddResponseMsg( + std::unique_ptr request, + std::function call_back); + void SendResponseToClient(const BatchUserResponse& batch_response); + + struct QueueItem { + std::unique_ptr context; + std::unique_ptr user_request; + }; + int DoBatch(const std::vector>& batch_req); + int BatchProposeMsg(); + int GetPrimary(); + + private: + ResDBConfig config_; + ReplicaCommunicator* replica_communicator_; + LockFreeQueue batch_queue_; + std::thread user_req_thread_; + std::atomic stop_; + uint64_t local_id_ = 0; + Stats* global_stats_; + std::atomic send_num_; + SignatureVerifier* verifier_; + static const int response_set_size_ = 6000000; + std::map response_[response_set_size_]; + std::mutex response_lock_[response_set_size_]; +}; + +} // common +} // namespace resdb diff --git a/platform/consensus/ordering/common/framework/transaction_utils.cpp b/platform/consensus/ordering/common/framework/transaction_utils.cpp new file mode 100644 index 000000000..08a8e5444 --- /dev/null +++ b/platform/consensus/ordering/common/framework/transaction_utils.cpp @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2019-2022 ExpoLab, UC Davis + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + */ + +#include "platform/consensus/ordering/common/framework/transaction_utils.h" + +namespace resdb { +namespace comm { + +std::unique_ptr NewRequest(Request::Type type, const Request& request, + int sender_id) { + auto new_request = std::make_unique(request); + new_request->set_type(type); + new_request->set_sender_id(sender_id); + return new_request; +} + +std::unique_ptr NewRequest(Request::Type type, const Request& request, + int sender_id, int region_id) { + auto new_request = std::make_unique(request); + new_request->set_type(type); + new_request->set_sender_id(sender_id); + new_request->mutable_region_info()->set_region_id(region_id); + return new_request; +} + +} // namespace comm +} // namespace resdb diff --git a/platform/consensus/ordering/common/framework/transaction_utils.h b/platform/consensus/ordering/common/framework/transaction_utils.h new file mode 100644 index 000000000..3055cf44c --- /dev/null +++ b/platform/consensus/ordering/common/framework/transaction_utils.h @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2019-2022 ExpoLab, UC Davis + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + */ + +#pragma once +#include "platform/proto/replica_info.pb.h" +#include "platform/proto/resdb.pb.h" + +namespace resdb { +namespace comm { + +enum CollectorResultCode { + INVALID = -2, + OK = 0, + STATE_CHANGED = 1, +}; + +std::unique_ptr NewRequest(Request::Type type, const Request& request, + int sender_id); + +std::unique_ptr NewRequest(Request::Type type, const Request& request, + int sender_id, int region_info); +} // namespace comm +} // namespace resdb diff --git a/platform/consensus/ordering/poe/algorithm/BUILD b/platform/consensus/ordering/poe/algorithm/BUILD new file mode 100644 index 000000000..357f56d8b --- /dev/null +++ b/platform/consensus/ordering/poe/algorithm/BUILD @@ -0,0 +1,15 @@ +package(default_visibility = ["//platform/consensus/ordering/poe:__subpackages__"]) + +cc_library( + name = "poe", + srcs = ["poe.cpp"], + hdrs = ["poe.h"], + deps = [ + "//platform/statistic:stats", + "//common:comm", + "//platform/consensus/ordering/poe/proto:proposal_cc_proto", + "//common/crypto:signature_verifier", + "//platform/consensus/ordering/common/algorithm:protocol_base", + "//platform/common/queue:lock_free_queue", + ], +) diff --git a/platform/consensus/ordering/poe/algorithm/poe.cpp b/platform/consensus/ordering/poe/algorithm/poe.cpp new file mode 100644 index 000000000..8a9b3a317 --- /dev/null +++ b/platform/consensus/ordering/poe/algorithm/poe.cpp @@ -0,0 +1,75 @@ +#include "platform/consensus/ordering/poe/algorithm/poe.h" + +#include + +#include "common/crypto/signature_verifier.h" +#include "common/utils/utils.h" + +namespace resdb { +namespace poe { + +PoE::PoE(int id, int f, int total_num, SignatureVerifier* verifier) + : ProtocolBase(id, f, total_num), verifier_(verifier) { + + LOG(ERROR) << "get proposal graph"; + id_ = id; + total_num_ = total_num; + f_ = f; + is_stop_ = false; + seq_ = 0; +} + +PoE::~PoE() { + is_stop_ = true; +} + +bool PoE::IsStop() { return is_stop_; } + +bool PoE::ReceiveTransaction(std::unique_ptr txn) { + // LOG(ERROR)<<"recv txn:"; + txn->set_create_time(GetCurrentTime()); + txn->set_seq(seq_++); + txn->set_proposer(id_); + + Broadcast(MessageType::Propose, *txn); + return true; +} + +bool PoE::ReceivePropose(std::unique_ptr txn) { + std::string hash = txn->hash(); + int64_t seq = txn->seq(); + int proposer = txn->proposer(); + { + std::unique_lock lk(mutex_); + data_[txn->hash()]=std::move(txn); + } + + Proposal proposal; + proposal.set_hash(hash); + proposal.set_seq(seq); + proposal.set_proposer(id_); + Broadcast(MessageType::Prepare, proposal); + return true; +} + +bool PoE::ReceivePrepare(std::unique_ptr proposal) { + std::unique_ptr txn = nullptr; + { + std::unique_lock lk(mutex_); + received_[proposal->hash()].insert(proposal->proposer()); + auto it = data_.find(proposal->hash()); + if(it != data_.end()){ + if(received_[proposal->hash()].size()>=2*f_+1){ + txn = std::move(it->second); + data_.erase(it); + } + } + } + if(txn != nullptr){ + commit_(*txn); + } + return true; +} + +} // namespace poe +} // namespace resdb diff --git a/platform/consensus/ordering/poe/algorithm/poe.h b/platform/consensus/ordering/poe/algorithm/poe.h new file mode 100644 index 000000000..20cc71a93 --- /dev/null +++ b/platform/consensus/ordering/poe/algorithm/poe.h @@ -0,0 +1,40 @@ +#pragma once + +#include +#include +#include +#include + +#include "platform/common/queue/lock_free_queue.h" +#include "platform/consensus/ordering/common/algorithm/protocol_base.h" +#include "platform/consensus/ordering/poe/proto/proposal.pb.h" +#include "platform/statistic/stats.h" + +namespace resdb { +namespace poe { + +class PoE: public common::ProtocolBase { + public: + PoE(int id, int f, int total_num, SignatureVerifier* verifier); + ~PoE(); + + bool ReceiveTransaction(std::unique_ptr txn); + bool ReceivePropose(std::unique_ptr txn); + bool ReceivePrepare(std::unique_ptr proposal); + + private: + bool IsStop(); + + private: + std::mutex mutex_; + std::map > received_; + std::map > data_; + + int64_t seq_; + bool is_stop_; + SignatureVerifier* verifier_; + Stats* global_stats_; +}; + +} // namespace cassandra +} // namespace resdb diff --git a/platform/consensus/ordering/poe/framework/BUILD b/platform/consensus/ordering/poe/framework/BUILD new file mode 100644 index 000000000..7030d2a0d --- /dev/null +++ b/platform/consensus/ordering/poe/framework/BUILD @@ -0,0 +1,16 @@ +package(default_visibility = ["//visibility:private"]) + +cc_library( + name = "consensus", + srcs = ["consensus.cpp"], + hdrs = ["consensus.h"], + visibility = [ + "//visibility:public", + ], + deps = [ + "//common/utils", + "//platform/consensus/ordering/common/framework:consensus", + "//platform/consensus/ordering/poe/algorithm:poe", + ], +) + diff --git a/platform/consensus/ordering/poe/framework/consensus.cpp b/platform/consensus/ordering/poe/framework/consensus.cpp new file mode 100644 index 000000000..b401adaf6 --- /dev/null +++ b/platform/consensus/ordering/poe/framework/consensus.cpp @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2019-2022 ExpoLab, UC Davis + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + */ + +#include "platform/consensus/ordering/poe/framework/consensus.h" + +#include +#include + +#include "common/utils/utils.h" + +namespace resdb { +namespace poe { + +Consensus::Consensus(const ResDBConfig& config, + std::unique_ptr executor) + : common::Consensus(config, std::move(executor)){ + int total_replicas = config_.GetReplicaNum(); + int f = (total_replicas - 1) / 3; + + Init(); + + start_ = 0; + + if (config_.GetPublicKeyCertificateInfo() + .public_key() + .public_key_info() + .type() != CertificateKeyInfo::CLIENT) { + poe_ = std::make_unique( + config_.GetSelfInfo().id(), f, + total_replicas, GetSignatureVerifier()); + InitProtocol(poe_.get()); + } +} + +int Consensus::ProcessCustomConsensus(std::unique_ptr request) { + if (request->user_type() == MessageType::Propose) { + std::unique_ptr txn = std::make_unique(); + if (!txn->ParseFromString(request->data())) { + assert(1 == 0); + LOG(ERROR) << "parse proposal fail"; + return -1; + } + poe_->ReceivePropose(std::move(txn)); + return 0; + } else if (request->user_type() == MessageType::Prepare) { + std::unique_ptr proposal = std::make_unique(); + if (!proposal->ParseFromString(request->data())) { + LOG(ERROR) << "parse proposal fail"; + assert(1 == 0); + return -1; + } + poe_->ReceivePrepare(std::move(proposal)); + return 0; + } + return 0; +} + +int Consensus::ProcessNewTransaction(std::unique_ptr request) { + std::unique_ptr txn = std::make_unique(); + txn->set_data(request->data()); + txn->set_hash(request->hash()); + txn->set_proxy_id(request->proxy_id()); + txn->set_uid(request->uid()); + return poe_->ReceiveTransaction(std::move(txn)); +} + +int Consensus::CommitMsg(const google::protobuf::Message& msg) { + return CommitMsgInternal(dynamic_cast(msg)); +} + +int Consensus::CommitMsgInternal(const Transaction& txn) { + std::unique_ptr request = std::make_unique(); + request->set_data(txn.data()); + request->set_seq(txn.seq()); + request->set_uid(txn.uid()); + request->set_proxy_id(txn.proxy_id()); + + transaction_executor_->Commit(std::move(request)); + return 0; +} + +} // namespace poe +} // namespace resdb diff --git a/platform/consensus/ordering/poe/framework/consensus.h b/platform/consensus/ordering/poe/framework/consensus.h new file mode 100644 index 000000000..72e56e181 --- /dev/null +++ b/platform/consensus/ordering/poe/framework/consensus.h @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2019-2022 ExpoLab, UC Davis + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + */ + +#pragma once + +#include "executor/common/transaction_manager.h" +#include "platform/consensus/ordering/common/framework/consensus.h" +#include "platform/consensus/ordering/poe/algorithm/poe.h" +#include "platform/networkstrate/consensus_manager.h" + +namespace resdb { +namespace poe { + +class Consensus : public common::Consensus { + public: + Consensus(const ResDBConfig& config, + std::unique_ptr transaction_manager); + virtual ~Consensus() = default; + + private: + int ProcessCustomConsensus(std::unique_ptr request) override; + int ProcessNewTransaction(std::unique_ptr request) override; + int CommitMsg(const google::protobuf::Message& msg) override; + int CommitMsgInternal(const Transaction& txn); + + int Prepare(const Transaction& txn); + + protected: + std::unique_ptr poe_; + Stats* global_stats_; + int64_t start_; + std::mutex mutex_; + int send_num_[200]; +}; + +} // namespace cassandra +} // namespace resdb diff --git a/platform/consensus/ordering/poe/framework/consensus_test.cpp b/platform/consensus/ordering/poe/framework/consensus_test.cpp new file mode 100644 index 000000000..2c8834a8b --- /dev/null +++ b/platform/consensus/ordering/poe/framework/consensus_test.cpp @@ -0,0 +1,179 @@ +/* + * Copyright (c) 2019-2022 ExpoLab, UC Davis + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + */ + +#include "platform/consensus/ordering/cassandra/framework/consensus.h" + +#include +#include +#include + +#include + +#include "common/test/test_macros.h" +#include "executor/common/mock_transaction_manager.h" +#include "platform/config/resdb_config_utils.h" +#include "platform/networkstrate/mock_replica_communicator.h" + +namespace resdb { +namespace cassandra { +namespace { + +using ::resdb::testing::EqualsProto; +using ::testing::_; +using ::testing::Invoke; +using ::testing::Test; + +ResDBConfig GetConfig() { + ResDBConfig config({GenerateReplicaInfo(1, "127.0.0.1", 1234), + GenerateReplicaInfo(2, "127.0.0.1", 1235), + GenerateReplicaInfo(3, "127.0.0.1", 1236), + GenerateReplicaInfo(4, "127.0.0.1", 1237)}, + GenerateReplicaInfo(1, "127.0.0.1", 1234)); + return config; +} + +class ConsensusTest : public Test { + public: + ConsensusTest() : config_(GetConfig()) { + auto transaction_manager = + std::make_unique(); + mock_transaction_manager_ = transaction_manager.get(); + consensus_ = + std::make_unique(config_, std::move(transaction_manager)); + consensus_->SetCommunicator(&replica_communicator_); + } + + void AddTransaction(const std::string& data) { + auto request = std::make_unique(); + request->set_type(Request::TYPE_NEW_TXNS); + + Transaction txn; + + BatchUserRequest batch_request; + auto req = batch_request.add_user_requests(); + req->mutable_request()->set_data(data); + + batch_request.set_local_id(1); + batch_request.SerializeToString(txn.mutable_data()); + + txn.SerializeToString(request->mutable_data()); + + EXPECT_EQ(consensus_->ConsensusCommit(nullptr, std::move(request)), 0); + } + + protected: + ResDBConfig config_; + MockTransactionExecutorDataImpl* mock_transaction_manager_; + MockReplicaCommunicator replica_communicator_; + std::unique_ptr transaction_manager_; + std::unique_ptr consensus_; +}; + +TEST_F(ConsensusTest, NormalCase) { + std::promise commit_done; + std::future commit_done_future = commit_done.get_future(); + + EXPECT_CALL(replica_communicator_, BroadCast) + .WillRepeatedly(Invoke([&](const google::protobuf::Message& msg) { + Request request = *dynamic_cast(&msg); + + if (request.user_type() == MessageType::NewProposal) { + LOG(ERROR) << "bc new proposal"; + consensus_->ConsensusCommit(nullptr, + std::make_unique(request)); + LOG(ERROR) << "recv proposal done"; + } + if (request.user_type() == MessageType::Vote) { + LOG(ERROR) << "bc vote"; + + VoteMessage ack_msg; + assert(ack_msg.ParseFromString(request.data())); + for (int i = 1; i <= 3; ++i) { + ack_msg.set_proposer_id(i); + auto new_req = std::make_unique(request); + ack_msg.SerializeToString(new_req->mutable_data()); + + consensus_->ConsensusCommit(nullptr, std::move(new_req)); + } + } + // LOG(ERROR)<<"bc type:"<type()<<" user + // type:"<user_type(); + if (request.user_type() == MessageType::Prepare) { + LOG(ERROR) << "bc prepare"; + + VoteMessage ack_msg; + assert(ack_msg.ParseFromString(request.data())); + for (int i = 1; i <= 3; ++i) { + ack_msg.set_proposer_id(i); + auto new_req = std::make_unique(request); + ack_msg.SerializeToString(new_req->mutable_data()); + + consensus_->ConsensusCommit(nullptr, std::move(new_req)); + } + } + if (request.user_type() == MessageType::Voteprep) { + LOG(ERROR) << "bc voterep:"; + + VoteMessage ack_msg; + assert(ack_msg.ParseFromString(request.data())); + for (int i = 1; i <= 3; ++i) { + ack_msg.set_proposer_id(i); + auto new_req = std::make_unique(request); + ack_msg.SerializeToString(new_req->mutable_data()); + LOG(ERROR) << "new request type:" << new_req->user_type(); + + consensus_->ConsensusCommit(nullptr, std::move(new_req)); + } + } + LOG(ERROR) << "done"; + return 0; + })); + + EXPECT_CALL(*mock_transaction_manager_, ExecuteData) + .WillOnce(Invoke([&](const std::string& msg) { + LOG(ERROR) << "execute txn:" << msg; + EXPECT_EQ(msg, "transaction1"); + return nullptr; + })); + + EXPECT_CALL(replica_communicator_, SendMessage(_, 0)) + .WillRepeatedly( + Invoke([&](const google::protobuf::Message& msg, int64_t) { + Request request = *dynamic_cast(&msg); + if (request.type() == Request::TYPE_RESPONSE) { + LOG(ERROR) << "get response"; + commit_done.set_value(true); + } + return; + })); + + AddTransaction("transaction1"); + + commit_done_future.get(); +} + +} // namespace +} // namespace cassandra +} // namespace resdb diff --git a/platform/consensus/ordering/poe/proto/BUILD b/platform/consensus/ordering/poe/proto/BUILD new file mode 100644 index 000000000..8088db092 --- /dev/null +++ b/platform/consensus/ordering/poe/proto/BUILD @@ -0,0 +1,16 @@ +package(default_visibility = ["//platform/consensus/ordering/poe:__subpackages__"]) + +load("@rules_cc//cc:defs.bzl", "cc_proto_library") +load("@rules_proto//proto:defs.bzl", "proto_library") +load("@rules_proto_grpc//python:defs.bzl", "python_proto_library") + +proto_library( + name = "proposal_proto", + srcs = ["proposal.proto"], + #visibility = ["//visibility:public"], +) + +cc_proto_library( + name = "proposal_cc_proto", + deps = [":proposal_proto"], +) diff --git a/platform/consensus/ordering/poe/proto/proposal.proto b/platform/consensus/ordering/poe/proto/proposal.proto new file mode 100644 index 000000000..8302752ad --- /dev/null +++ b/platform/consensus/ordering/poe/proto/proposal.proto @@ -0,0 +1,28 @@ + +syntax = "proto3"; + +package resdb.poe; + +message Transaction{ + int32 id = 1; + bytes data = 2; + bytes hash = 3; + int32 proxy_id = 4; + int32 proposer = 5; + int64 uid = 6; + int64 create_time = 7; + int64 seq = 9; +} + +message Proposal { + bytes hash = 1; + int32 proposer = 2; + int64 seq =3 ; +} + +enum MessageType { + None = 0; + Propose = 1; + Prepare = 2; +} + diff --git a/platform/networkstrate/replica_communicator.cpp b/platform/networkstrate/replica_communicator.cpp index 4057e27d9..7083fe6ec 100644 --- a/platform/networkstrate/replica_communicator.cpp +++ b/platform/networkstrate/replica_communicator.cpp @@ -257,7 +257,7 @@ void ReplicaCommunicator::SendMessage(const google::protobuf::Message& message, } if (target_replica.ip().empty()) { - LOG(ERROR) << "no replica info"; + LOG(ERROR) << "no replica info node:"<