Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

- Initialize member cluster ID only on connection to cluster. #14

Merged
merged 2 commits into from
Nov 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbsim/db_high_priority_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ int db::high_priority_service::rollback(const wsrep::ws_handle& ws_handle,

void db::high_priority_service::after_apply()
{
client_.client_state_.after_statement();
client_.client_state_.after_applying();
}

bool db::high_priority_service::is_replaying() const
Expand Down
3 changes: 1 addition & 2 deletions dbsim/db_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,14 @@

db::server::server(simulator& simulator,
const std::string& name,
const std::string& server_id,
const std::string& address)
: simulator_(simulator)
, storage_engine_(simulator_.params())
, mutex_()
, cond_()
, server_service_(*this)
, server_state_(*this, server_service_,
name, server_id, address, "dbsim_" + name + "_data")
name, address, "dbsim_" + name + "_data")
, last_client_id_(0)
, last_transaction_id_(0)
, appliers_()
Expand Down
1 change: 0 additions & 1 deletion dbsim/db_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ namespace db
public:
server(simulator& simulator,
const std::string& name,
const std::string& id,
const std::string& address);
void applier_thread();
void start_applier();
Expand Down
4 changes: 2 additions & 2 deletions dbsim/db_server_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ bool db::server_service::sst_before_init() const
std::string db::server_service::sst_request()
{
std::ostringstream os;
os << server_.server_state().id();
os << server_.server_state().name();
wsrep::log_info() << "SST request: "
<< server_.server_state().id();
<< server_.server_state().name();

return os.str();
}
Expand Down
2 changes: 0 additions & 2 deletions dbsim/db_server_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,13 @@ namespace db
server_state(db::server& server,
wsrep::server_service& server_service,
const std::string& name,
const std::string& server_id,
const std::string& address,
const std::string& working_dir)
: wsrep::server_state(
mutex_,
cond_,
server_service,
name,
server_id,
"",
address,
working_dir,
Expand Down
25 changes: 17 additions & 8 deletions dbsim/db_simulator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,29 @@ void db::simulator::sst(db::server& server,
const wsrep::gtid& gtid,
bool bypass)
{
wsrep::id id;
std::istringstream is(request);
is >> id;
// The request may contain extra trailing '\0' after it goes
// through the provider, strip it first.
std::string name(request);
name.erase(std::find(name.begin(), name.end(), '\0'), name.end());

wsrep::unique_lock<wsrep::mutex> lock(mutex_);
auto i(servers_.find(id));
wsrep::log_info() << "SST request";
auto i(servers_.find(name));
wsrep::log_info() << "SST request '" << name << "'";
if (i == servers_.end())
{
wsrep::log_error() << "Server " << request << " not found";
wsrep::log_info() << "servers:";
for (const auto& s : servers_)
{
wsrep::log_info() << "server: " << s.first;
}
throw wsrep::runtime_error("Server " + request + " not found");
}
if (bypass == false)
{
wsrep::log_info() << "SST " << server.server_state().id() << " -> " << id;
wsrep::log_info() << "SST "
<< server.server_state().name()
<< " -> " << request;
}
i->second->server_state().sst_received(gtid, 0);
server.server_state().sst_sent(gtid, 0);
Expand Down Expand Up @@ -105,11 +115,10 @@ void db::simulator::start()
wsrep::id server_id(id_os.str());
auto it(servers_.insert(
std::make_pair(
server_id,
name_os.str(),
std::make_unique<db::server>(
*this,
name_os.str(),
id_os.str(),
address_os.str()))));
if (it.second == false)
{
Expand Down
2 changes: 1 addition & 1 deletion dbsim/db_simulator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ namespace db

wsrep::default_mutex mutex_;
const db::params& params_;
std::map<wsrep::id, std::unique_ptr<db::server>> servers_;
std::map<std::string, std::unique_ptr<db::server>> servers_;
std::chrono::time_point<std::chrono::steady_clock> clients_start_;
std::chrono::time_point<std::chrono::steady_clock> clients_stop_;
public:
Expand Down
5 changes: 2 additions & 3 deletions include/wsrep/server_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ namespace wsrep
* A method which will be called when the server
* has been joined to the cluster
*/
void on_connect(const wsrep::gtid& gtid);
void on_connect(const wsrep::view& view);

/**
* A method which will be called when a view
Expand Down Expand Up @@ -540,7 +540,6 @@ namespace wsrep
wsrep::condition_variable& cond,
wsrep::server_service& server_service,
const std::string& name,
const std::string& id,
const std::string& incoming_address,
const std::string& address,
const std::string& working_dir,
Expand All @@ -565,7 +564,7 @@ namespace wsrep
, streaming_appliers_()
, provider_()
, name_(name)
, id_(id)
, id_(wsrep::id::undefined())
, incoming_address_(incoming_address)
, address_(address)
, working_dir_(working_dir)
Expand Down
9 changes: 9 additions & 0 deletions include/wsrep/view.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "seqno.hpp"
#include "gtid.hpp"
#include <vector>
#include <iostream>

namespace wsrep
{
Expand Down Expand Up @@ -111,6 +112,8 @@ namespace wsrep
return (members_.empty() && own_index_ == -1);
}

void print(std::ostream& os) const;

private:
wsrep::gtid state_id_;
wsrep::seqno view_seqno_;
Expand All @@ -120,6 +123,12 @@ namespace wsrep
int protocol_version_;
std::vector<wsrep::view::member> members_;
};

static inline
std::ostream& operator<<(std::ostream& os, const wsrep::view& v)
{
v.print(os); return os;
}
}

#endif // WSREP_VIEW
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ add_library(wsrep-lib
logger.cpp
provider.cpp
seqno.cpp
view.cpp
server_state.cpp
transaction.cpp
wsrep_provider_v26.cpp)
Expand Down
46 changes: 28 additions & 18 deletions src/server_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -565,14 +565,38 @@ wsrep::server_state::causal_read(int timeout) const
return provider_->causal_read(timeout);
}

void wsrep::server_state::on_connect(const wsrep::gtid& gtid)
void wsrep::server_state::on_connect(const wsrep::view& view)
{
// Sanity checks
if (id_.is_undefined() == false)
{
wsrep::log_warning() << "Unexpected connection in connected state. "
<< "Received view: " << view
<< "Previous ID: " << id_;
assert(0);
}

if (view.own_index() < 0 ||
size_t(view.own_index()) >= view.members().size())
{
std::ostringstream os;
os << "Invalid view on connect: own index out of range: " << view;
wsrep::log_error() << os.str();
assert(0);
throw wsrep::runtime_error(os.str());
}

id_ = view.members()[view.own_index()].id();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to have some input validation for view.own_index(). Assert/throw if it is negative or >= view.members().size().


wsrep::log_info() << "Server "
<< name_
<< " connected to cluster at position "
<< gtid;
<< view.state_id()
<< " with ID "
<< id_;

wsrep::unique_lock<wsrep::mutex> lock(mutex_);
connected_gtid_ = gtid;
connected_gtid_ = view.state_id();
state(lock, s_connected);
}

Expand All @@ -599,22 +623,7 @@ void wsrep::server_state::on_view(const wsrep::view& view,
if (view.status() == wsrep::view::primary)
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
if (view.own_index() >= 0)
{
if (id_.is_undefined())
{
// No identifier was passed during server state initialization
// and the ID was generated by the provider.
id_ = view.members()[view.own_index()].id();
}
else
{
// Own identifier must not change between views.
// assert(id_ == view.members()[view.own_index()].id());
}
}
assert(view.final() == false);

//
// Reached primary from connected state. This may mean the following
//
Expand Down Expand Up @@ -703,6 +712,7 @@ void wsrep::server_state::on_view(const wsrep::view& view,
{
close_transactions_at_disconnect(*high_priority_service);
}
id_ = id::undefined();
state(lock, s_disconnected);
}
else if (state_ != s_disconnecting)
Expand Down
38 changes: 38 additions & 0 deletions src/view.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (C) 2018 Codership Oy <[email protected]>
*
* This file is part of wsrep-lib.
*
* Wsrep-lib is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* Wsrep-lib is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
*/

#include "wsrep/view.hpp"

void wsrep::view::print(std::ostream& os) const
{
os << " id: " << state_id() << "\n"
<< " status: " << status() << "\n"
<< " prococol_version: " << protocol_version() << "\n"
<< " final: " << final() << "\n"
<< " own_index: " << own_index() << "\n"
<< " members(" << members().size() << "):\n";

for (std::vector<wsrep::view::member>::const_iterator i(members().begin());
i != members().end(); ++i)
{
os << "\t" << (i - members().begin()) /* ordinal index */
<< ") id: " << i->id()
<< ", name: " << i->name() << "\n";
}
}
32 changes: 27 additions & 5 deletions src/wsrep_provider_v26.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,8 @@ namespace
{
return capabilities;
}
wsrep::view view_from_native(const wsrep_view_info& view_info)
wsrep::view view_from_native(const wsrep_view_info& view_info,
const wsrep::id& own_id)
{
std::vector<wsrep::view::member> members;
for (int i(0); i < view_info.memb_num; ++i)
Expand All @@ -303,6 +304,25 @@ namespace
sizeof(view_info.members[i].incoming)));
members.push_back(wsrep::view::member(id, name, incoming));
}

int own_idx(-1);
if (own_id.is_undefined())
{
// If own ID is undefined, obtain it from the view. This is
// the case on the initial connect to cluster.
own_idx = view_info.my_idx;
}
else
{
// If the node has already obtained its ID from cluster,
// its position in the view (or lack thereof) must be determined
// by the ID.
for (size_t i(0); i < members.size(); ++i)
{
if (own_id == members[i].id()) { own_idx = i; break; }
}
}

return wsrep::view(
wsrep::gtid(
wsrep::id(view_info.state_id.uuid.data,
Expand All @@ -311,7 +331,7 @@ namespace
wsrep::seqno(view_info.view),
map_view_status_from_native(view_info.status),
map_capabilities_from_native(view_info.capabilities),
view_info.my_idx,
own_idx,
view_info.proto_ver,
members);
}
Expand All @@ -325,12 +345,14 @@ namespace
const wsrep_view_info_t* view_info)
{
assert(app_ctx);
wsrep::view view(view_from_native(*view_info));
wsrep::server_state& server_state(
*reinterpret_cast<wsrep::server_state*>(app_ctx));
assert(server_state.id().is_undefined());
wsrep::view view(view_from_native(*view_info, server_state.id()));
assert(view.own_index() >= 0);
try
{
server_state.on_connect(view.state_id());
server_state.on_connect(view);
return WSREP_CB_SUCCESS;
}
catch (const wsrep::runtime_error& e)
Expand All @@ -354,7 +376,7 @@ namespace
reinterpret_cast<wsrep::high_priority_service*>(recv_ctx));
try
{
wsrep::view view(view_from_native(*view_info));
wsrep::view view(view_from_native(*view_info, server_state.id()));
server_state.on_view(view, high_priority_service);
return WSREP_CB_SUCCESS;
}
Expand Down
Loading