Skip to content

Commit

Permalink
Add safety checks for accessing registry pointers
Browse files Browse the repository at this point in the history
Add a new function pair `checked` and `checked_deref` to the `internal`
namespace that check whether a nullable type (such as `std::shared_ptr`)
contains a value before forwarding or using it. Both functions throw an
exception with a customizable error message if the check fails.

We use the new functions to make sure member variables are initialized
using a non-null value or to make sure a registry pointer is valid
before dereferencing it.
  • Loading branch information
Neverlord authored and ckreibich committed Jul 11, 2024
1 parent 1160e9e commit 6050f42
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 17 deletions.
24 changes: 24 additions & 0 deletions libbroker/broker/internal/checked.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#pragma once

#include <stdexcept>

namespace broker::internal {

// Helper function to check a nullable value and throw an exception if it is
// `nullptr` / `nullopt`. This function is used to simplify error handling in
// the constructor when initializing member functions.
template <class Nullable>
auto checked(Nullable what, const char* msg) {
if (!what)
throw std::logic_error(msg);
return what;
}

template <class Nullable>
auto& checked_deref(Nullable& what, const char* msg) {
if (!what)
throw std::logic_error(msg);
return *what;
}

} // namespace broker::internal
4 changes: 2 additions & 2 deletions libbroker/broker/internal/clone_actor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ clone_state::clone_state(caf::event_based_actor* ptr,
caf::async::consumer_resource<command_message> in_res,
caf::async::producer_resource<command_message> out_res)
: super(ptr), input(this), max_sync_interval(master_timeout) {
super::init(reg, this_endpoint, ep_clock, std::move(nm), std::move(parent),
std::move(in_res), std::move(out_res));
super::init(std::move(reg), this_endpoint, ep_clock, std::move(nm),
std::move(parent), std::move(in_res), std::move(out_res));
master_topic = store_name / topic::master_suffix();
super::init(input);
max_get_delay = caf::get_or(ptr->config(), "broker.store.max-get-delay",
Expand Down
8 changes: 5 additions & 3 deletions libbroker/broker/internal/core_actor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "broker/domain_options.hh"
#include "broker/filter_type.hh"
#include "broker/format/bin.hh"
#include "broker/internal/checked.hh"
#include "broker/internal/clone_actor.hh"
#include "broker/internal/killswitch.hh"
#include "broker/internal/master_actor.hh"
Expand Down Expand Up @@ -143,10 +144,11 @@ core_actor_state::core_actor_state(caf::event_based_actor* self, //
id(this_peer),
filter(std::make_shared<shared_filter_type>(std::move(initial_filter))),
clock(clock),
metrics(*reg),
registry(checked(std::move(reg),
"cannot construct the core actor without registry")),
metrics(*registry),
unsafe_inputs(self),
flow_inputs(self),
registry(reg) {
flow_inputs(self) {
// Read config and check for extra configuration parameters.
ttl = caf::get_or(self->config(), "broker.ttl", defaults::ttl);
if (adaptation && adaptation->disable_forwarding) {
Expand Down
6 changes: 3 additions & 3 deletions libbroker/broker/internal/core_actor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,9 @@ public:
/// Enables manual time management by the user.
endpoint::clock* clock;

/// Stores a reference to the metrics registry.
prometheus_registry_ptr registry;

/// Caches pointers to the Broker metrics.
metrics_t metrics;

Expand Down Expand Up @@ -353,9 +356,6 @@ public:
/// Counts messages that were published directly via message, i.e., without
/// using the back-pressure of flows.
int64_t published_via_async_msg = 0;

/// Stores a reference to the metrics registry.
prometheus_registry_ptr registry;
};

using core_actor = caf::stateful_actor<core_actor_state>;
Expand Down
3 changes: 1 addition & 2 deletions libbroker/broker/internal/core_actor.test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ struct fixture : test_coordinator_fixture<config> {

std::vector<caf::actor> bridges;

prometheus_registry_ptr registry;
prometheus_registry_ptr registry = std::make_shared<prometheus::Registry>();

using data_message_list = std::vector<data_message>;

Expand Down Expand Up @@ -62,7 +62,6 @@ struct fixture : test_coordinator_fixture<config> {
}

fixture() {
registry = std::make_shared<prometheus::Registry>();
// We don't do networking, but our flares use the socket API.
ep1.id = endpoint_id::random(1);
ep2.id = endpoint_id::random(2);
Expand Down
11 changes: 8 additions & 3 deletions libbroker/broker/internal/master_actor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "broker/detail/abstract_backend.hh"
#include "broker/detail/assert.hh"
#include "broker/detail/die.hh"
#include "broker/internal/checked.hh"
#include "broker/internal/master_actor.hh"
#include "broker/internal/metric_factory.hh"
#include "broker/store.hh"
Expand Down Expand Up @@ -58,9 +59,13 @@ master_state::master_state(
caf::actor parent, endpoint::clock* ep_clock,
caf::async::consumer_resource<command_message> in_res,
caf::async::producer_resource<command_message> out_res)
: super(ptr), output(this), metrics(*reg, nm) {
super::init(reg, this_endpoint, ep_clock, std::move(nm), std::move(parent),
std::move(in_res), std::move(out_res));
: super(ptr),
output(this),
metrics(checked_deref(reg,
"cannot construct a master actor without registry"),
nm) {
super::init(std::move(reg), this_endpoint, ep_clock, std::move(nm),
std::move(parent), std::move(in_res), std::move(out_res));
super::init(output);
clones_topic = store_name / topic::clone_suffix();
backend = std::move(bp);
Expand Down
2 changes: 1 addition & 1 deletion libbroker/broker/internal/store_actor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ void store_actor_state::init(prometheus_registry_ptr reg,
consumer_resource<command_message> in_res,
producer_resource<command_message> out_res) {
BROKER_ASSERT(clock != nullptr);
this->registry = reg;
this->registry = std::move(reg);
this->clock = clock;
this->store_name = std::move(store_name);
this->id.endpoint = this_endpoint;
Expand Down
12 changes: 9 additions & 3 deletions libbroker/broker/internal/store_actor.hh
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#pragma once

#include <algorithm>
#include <memory>
#include <optional>
#include <string>
#include <unordered_map>
Expand All @@ -24,6 +23,7 @@
#include "broker/entity_id.hh"
#include "broker/fwd.hh"
#include "broker/internal/channel.hh"
#include "broker/internal/checked.hh"
#include "broker/internal/type_id.hh"
#include "broker/topic.hh"

Expand Down Expand Up @@ -82,7 +82,10 @@ public:
defaults::store::heartbeat_interval));
out.connection_timeout_factor(get_or(cfg, "broker.store.connection-timeout",
defaults::store::connection_timeout));
out.metrics().init(*registry, store_name);
out.metrics().init(
checked_deref(registry,
"cannot initialize a store actor without registry"),
store_name);
}

template <class Backend>
Expand All @@ -100,7 +103,10 @@ public:
in.heartbeat_interval(heartbeat_interval);
in.connection_timeout_factor(connection_timeout);
in.nack_timeout(nack_timeout);
in.metrics().init(*registry, store_name);
in.metrics().init(
checked_deref(registry,
"cannot initialize a store actor without registry"),
store_name);
}

template <class... Fs>
Expand Down

0 comments on commit 6050f42

Please sign in to comment.