Skip to content

Commit

Permalink
Cherry-pick some bug fix from 2.0 (#2465)
Browse files Browse the repository at this point in the history
* cherry pick #431 from 2.0

* cherry pick #424 from 2.0

Co-authored-by: Yee <[email protected]>
  • Loading branch information
critical27 and yixinglu authored Apr 7, 2021
1 parent 02bf8aa commit f71d502
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 132 deletions.
1 change: 0 additions & 1 deletion src/common/thrift/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
nebula_add_library(
thrift_obj OBJECT
ReconnectingRequestChannel.cpp
ThriftClientManager.cpp
)
92 changes: 0 additions & 92 deletions src/common/thrift/ReconnectingRequestChannel.cpp

This file was deleted.

80 changes: 43 additions & 37 deletions src/common/thrift/ThriftClientManager.inl
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include <thrift/lib/cpp2/async/ReconnectingRequestChannel.h>
#include <thrift/lib/cpp2/async/HeaderClientChannel.h>
#include <thrift/lib/cpp/async/TAsyncSocket.h>
#include <folly/io/async/AsyncSocket.h>
#include <folly/system/ThreadName.h>
#include "network/NetworkUtils.h"

Expand All @@ -18,51 +18,57 @@ namespace thrift {
template<class ClientType>
std::shared_ptr<ClientType> ThriftClientManager<ClientType>::client(
const HostAddr& host, folly::EventBase* evb, bool compatibility, uint32_t timeout) {
VLOG(2) << "Getting a client to "
<< network::NetworkUtils::intToIPv4(host.first)
<< ":" << host.second;

if (evb == nullptr) {
evb = folly::EventBaseManager::get()->getEventBase();
}

auto it = clientMap_->find(std::make_pair(host, evb));
if (it != clientMap_->end()) {
return it->second;
}

// Need to create a new client
auto ipAddr = network::NetworkUtils::intToIPv4(host.first);
auto port = host.second;
VLOG(2) << "There is no existing client to "
<< ipAddr << ":" << port
<< ", trying to create one";
auto channel = apache::thrift::ReconnectingRequestChannel::newChannel(
*evb, [compatibility, ipAddr, port, timeout] (folly::EventBase& eb) mutable {
static thread_local int connectionCount = 0;
VLOG(2) << "Connecting to " << ipAddr << ":" << port
<< " for " << ++connectionCount << " times";
std::shared_ptr<apache::thrift::async::TAsyncSocket> socket;
eb.runImmediatelyOrRunInEventBaseThreadAndWait(
[&socket, &eb, ipAddr, port]() {
socket = apache::thrift::async::TAsyncSocket::newSocket(
&eb, ipAddr, port, FLAGS_conn_timeout_ms);
});
auto headerClientChannel = apache::thrift::HeaderClientChannel::newChannel(socket);
if (timeout > 0) {
headerClientChannel->setTimeout(timeout);
// Get client from client manager if it is ok.
auto it = clientMap_->find(std::make_pair(host, evb));
if (it != clientMap_->end()) {
do {
auto channel =
dynamic_cast<apache::thrift::HeaderClientChannel*>(it->second->getChannel());
if (channel == nullptr || !channel->good()) {
// Remove bad connection to create a new one.
clientMap_->erase(it);
VLOG(2) << "Invalid Channel: " << channel << " for host " << ipAddr << ":" << port;
break;
}
if (compatibility) {
headerClientChannel->setProtocolId(apache::thrift::protocol::T_BINARY_PROTOCOL);
headerClientChannel->setClientType(THRIFT_UNFRAMED_DEPRECATED);
auto transport = dynamic_cast<folly::AsyncSocket*>(channel->getTransport());
if (transport == nullptr || transport->hangup()) {
clientMap_->erase(it);
VLOG(2) << "Transport is closed by peers " << transport << " for host "
<< ipAddr << ":" << port;
break;
}
return headerClientChannel;
});
std::shared_ptr<ClientType> client(new ClientType(std::move(channel)), [evb](auto* p) {
evb->runImmediatelyOrRunInEventBaseThreadAndWait([p] {
delete p;
});
VLOG(2) << "Getting a client to " << ipAddr << ":" << port;
return it->second;
} while (false);
}

// Need to create a new client and insert it to client map.
VLOG(2) << "There is no existing client to " << host << ", trying to create one";
static thread_local int connectionCount = 0;

VLOG(2) << "Connecting to " << host << " for " << ++connectionCount << " times";
std::shared_ptr<apache::thrift::async::TAsyncSocket> socket;
evb->runImmediatelyOrRunInEventBaseThreadAndWait([&socket, evb, ipAddr, port]() {
socket = apache::thrift::async::TAsyncSocket::newSocket(
evb, ipAddr, port, FLAGS_conn_timeout_ms);
});
auto headerClientChannel = apache::thrift::HeaderClientChannel::newChannel(socket);
if (timeout > 0) {
headerClientChannel->setTimeout(timeout);
}
if (compatibility) {
headerClientChannel->setProtocolId(apache::thrift::protocol::T_BINARY_PROTOCOL);
headerClientChannel->setClientType(THRIFT_UNFRAMED_DEPRECATED);
}
std::shared_ptr<ClientType> client(
new ClientType(std::move(headerClientChannel)),
[evb](auto* p) { evb->runImmediatelyOrRunInEventBaseThreadAndWait([p] { delete p; }); });
clientMap_->emplace(std::make_pair(host, evb), client);
return client;
}
Expand Down
4 changes: 2 additions & 2 deletions src/meta/client/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#include <folly/ScopeGuard.h>


DEFINE_int32(heartbeat_interval_secs, 3, "Heartbeat interval");
DEFINE_int32(heartbeat_interval_secs, 10, "Heartbeat interval");
DEFINE_int32(meta_client_retry_times, 3, "meta client retry times, 0 means no retry");
DEFINE_int32(meta_client_retry_interval_secs, 1, "meta client sleep interval between retry");
DEFINE_int32(meta_client_timeout_ms, 60 * 1000, "meta client timeout");
Expand Down Expand Up @@ -1828,8 +1828,8 @@ folly::Future<StatusOr<bool>> MetaClient::heartbeat() {
leaderIds_.clear();
leaderIds_ = leaderIds;
}
req.set_leader_partIds(std::move(leaderIds));
}
req.set_leader_partIds(std::move(leaderIds));
} else {
req.set_leader_partIds(std::move(leaderIds));
}
Expand Down

0 comments on commit f71d502

Please sign in to comment.