Skip to content

Commit

Permalink
Add application defined sequential consistency for certification
Browse files Browse the repository at this point in the history
The before_prepare() takes now as argument a callback which
is called by the provider after it can guarantee sequential
consistency. This allows application threads which wish to
maintain sequential consistency to enter before_prepare()
calls concurrently without waiting the prior call to finish.
  • Loading branch information
temeo committed Nov 26, 2024
1 parent 1c61b80 commit 3b27be5
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 21 deletions.
20 changes: 19 additions & 1 deletion include/wsrep/client_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,25 @@ namespace wsrep

/** @name Commit ordering interface */
/** @{ */
int before_prepare();

/**
* This method should be called before the transaction
* is prepared. This call certifies the transaction and
* assigns write set meta data.
*
* @param seq_cb Callback which is passed to underlying
* certify() call. See wsrep::provider::certify().
*
* @return Zero on success, non-zero on failure.
*/
int before_prepare(const wsrep::provider::seq_cb_t* seq_cb);

/** Same as before_prepare() above, but nullptr is passed
* to seq_cb. */
int before_prepare()
{
return before_prepare(nullptr);
}

int after_prepare();

Expand Down
33 changes: 30 additions & 3 deletions include/wsrep/provider.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -332,10 +332,37 @@ namespace wsrep
virtual int append_key(wsrep::ws_handle&, const wsrep::key&) = 0;
virtual enum status append_data(
wsrep::ws_handle&, const wsrep::const_buffer&) = 0;

/**
* Callback for application defined sequential consistency.
* The provider will call
* the callback once it can guarantee sequential consistency. */
typedef struct seq_cb {
/** Opaque caller context */
void *ctx;
/** Function to be called by the provider when sequential
* consistency is guaranteed. */
void (*fn)(void *ctx);
} seq_cb_t;

/**
* Certify the write set.
*
* @param client_id[in] Id of the client session.
* @param ws_handle[in,out] Write set handle associated to the current
* transaction.
* @param flags[in] Flags associated to the write set (see struct flag).
* @param ws_meta[out] Write set meta data associated to the
* replicated write set.
* @param seq_cb[in] Optional callback for application defined
* sequential consistency.
*
* @return Status code defined in struct status.
*/
virtual enum status
certify(wsrep::client_id, wsrep::ws_handle&,
int,
wsrep::ws_meta&) = 0;
certify(wsrep::client_id client_id, wsrep::ws_handle& ws_handle,
int flags, wsrep::ws_meta& ws_meta, const seq_cb_t* seq_cb)
= 0;
/**
* BF abort a transaction inside provider.
*
Expand Down
6 changes: 4 additions & 2 deletions include/wsrep/transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ namespace wsrep

int after_row();

int before_prepare(wsrep::unique_lock<wsrep::mutex>&);
int before_prepare(wsrep::unique_lock<wsrep::mutex>&,
const wsrep::provider::seq_cb_t*);

int after_prepare(wsrep::unique_lock<wsrep::mutex>&);

Expand Down Expand Up @@ -248,7 +249,8 @@ namespace wsrep
bool abort_or_interrupt(wsrep::unique_lock<wsrep::mutex>&);
int streaming_step(wsrep::unique_lock<wsrep::mutex>&, bool force = false);
int certify_fragment(wsrep::unique_lock<wsrep::mutex>&);
int certify_commit(wsrep::unique_lock<wsrep::mutex>&);
int certify_commit(wsrep::unique_lock<wsrep::mutex>&,
const wsrep::provider::seq_cb_t*);
int append_sr_keys_for_commit();
int release_commit_order(wsrep::unique_lock<wsrep::mutex>&);
void remove_fragments_in_storage_service_scope(
Expand Down
4 changes: 2 additions & 2 deletions src/client_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,12 +365,12 @@ int wsrep::client_state::next_fragment(const wsrep::ws_meta& meta)
return transaction_.next_fragment(meta);
}

int wsrep::client_state::before_prepare()
int wsrep::client_state::before_prepare(const wsrep::provider::seq_cb_t* seq_cb)
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
assert(owning_thread_id_ == wsrep::this_thread::get_id());
assert(state_ == s_exec);
return transaction_.before_prepare(lock);
return transaction_.before_prepare(lock, seq_cb);
}

int wsrep::client_state::after_prepare()
Expand Down
20 changes: 10 additions & 10 deletions src/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,8 @@ int wsrep::transaction::after_row()
return ret;
}

int wsrep::transaction::before_prepare(
wsrep::unique_lock<wsrep::mutex>& lock)
int wsrep::transaction::before_prepare(wsrep::unique_lock<wsrep::mutex>& lock,
const wsrep::provider::seq_cb_t* seq_cb)
{
assert(lock.owns_lock());
int ret(0);
Expand Down Expand Up @@ -349,7 +349,7 @@ int wsrep::transaction::before_prepare(
}
else
{
ret = certify_commit(lock);
ret = certify_commit(lock, seq_cb);
}

assert((ret == 0 && state() == s_preparing) ||
Expand Down Expand Up @@ -465,7 +465,7 @@ int wsrep::transaction::before_commit()
case wsrep::client_state::m_local:
if (state() == s_executing)
{
ret = before_prepare(lock) || after_prepare(lock);
ret = before_prepare(lock, nullptr) || after_prepare(lock);
assert((ret == 0 &&
(state() == s_committing || state() == s_prepared))
||
Expand Down Expand Up @@ -495,7 +495,7 @@ int wsrep::transaction::before_commit()

if (ret == 0 && state() == s_prepared)
{
ret = certify_commit(lock);
ret = certify_commit(lock, nullptr);
assert((ret == 0 && state() == s_committing) ||
(state() == s_must_abort ||
state() == s_must_replay ||
Expand Down Expand Up @@ -543,7 +543,7 @@ int wsrep::transaction::before_commit()
}
else if (state() == s_executing || state() == s_replaying)
{
ret = before_prepare(lock) || after_prepare(lock);
ret = before_prepare(lock, nullptr) || after_prepare(lock);
}
else
{
Expand Down Expand Up @@ -1195,7 +1195,7 @@ int wsrep::transaction::commit_or_rollback_by_xid(const wsrep::xid& xid,
provider().certify(client_state_.id(),
ws_handle_,
flags(),
meta));
meta, nullptr));

int ret;
if (cert_ret == wsrep::provider::success)
Expand Down Expand Up @@ -1622,7 +1622,7 @@ int wsrep::transaction::certify_fragment(
cert_ret = provider().certify(client_state_.id(),
ws_handle_,
flags(),
sr_ws_meta);
sr_ws_meta, nullptr);
client_service_.debug_crash(
"crash_replicate_fragment_after_certify");

Expand Down Expand Up @@ -1744,7 +1744,7 @@ int wsrep::transaction::certify_fragment(
}

int wsrep::transaction::certify_commit(
wsrep::unique_lock<wsrep::mutex>& lock)
wsrep::unique_lock<wsrep::mutex>& lock, const provider::seq_cb_t* seq_cb)
{
assert(lock.owns_lock());
assert(active());
Expand Down Expand Up @@ -1828,7 +1828,7 @@ int wsrep::transaction::certify_commit(
cert_ret(provider().certify(client_state_.id(),
ws_handle_,
flags(),
ws_meta_));
ws_meta_, seq_cb));
client_service_.debug_sync("wsrep_after_certification");

lock.lock();
Expand Down
3 changes: 2 additions & 1 deletion src/wsrep_provider_v26.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,8 @@ enum wsrep::provider::status
wsrep::wsrep_provider_v26::certify(wsrep::client_id client_id,
wsrep::ws_handle& ws_handle,
int flags,
wsrep::ws_meta& ws_meta)
wsrep::ws_meta& ws_meta,
const seq_cb_t* /* todo: use wsrep_cert_fn_v1 extension if provider has it */)
{
mutable_ws_handle mwsh(ws_handle);
mutable_ws_meta mmeta(ws_meta, flags);
Expand Down
2 changes: 1 addition & 1 deletion src/wsrep_provider_v26.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ namespace wsrep
enum wsrep::provider::status
certify(wsrep::client_id, wsrep::ws_handle&,
int,
wsrep::ws_meta&) WSREP_OVERRIDE;
wsrep::ws_meta&, const seq_cb_t*) WSREP_OVERRIDE;
enum wsrep::provider::status
bf_abort(wsrep::seqno,
wsrep::transaction_id,
Expand Down
3 changes: 2 additions & 1 deletion test/mock_provider.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ namespace wsrep
certify(wsrep::client_id client_id,
wsrep::ws_handle& ws_handle,
int flags,
wsrep::ws_meta& ws_meta)
wsrep::ws_meta& ws_meta,
const seq_cb* /* Ignored in unit tests. */)
WSREP_OVERRIDE
{
ws_handle = wsrep::ws_handle(ws_handle.transaction_id(), (void*)1);
Expand Down

0 comments on commit 3b27be5

Please sign in to comment.