diff --git a/libbroker/broker/internal/checked.hh b/libbroker/broker/internal/checked.hh new file mode 100644 index 00000000..26f408c6 --- /dev/null +++ b/libbroker/broker/internal/checked.hh @@ -0,0 +1,24 @@ +#pragma once + +#include + +namespace broker::internal { + +// Helper function to check a nullable value and throw an exception if it is +// `nullptr` / `nullopt`. This function is used to simplify error handling in +// the constructor when initializing member functions. +template +auto checked(Nullable what, const char* msg) { + if (!what) + throw std::logic_error(msg); + return what; +} + +template +auto& checked_deref(Nullable& what, const char* msg) { + if (!what) + throw std::logic_error(msg); + return *what; +} + +} // namespace broker::internal diff --git a/libbroker/broker/internal/clone_actor.cc b/libbroker/broker/internal/clone_actor.cc index 5cf4f5bc..8343b5d0 100644 --- a/libbroker/broker/internal/clone_actor.cc +++ b/libbroker/broker/internal/clone_actor.cc @@ -47,8 +47,8 @@ clone_state::clone_state(caf::event_based_actor* ptr, caf::async::consumer_resource in_res, caf::async::producer_resource out_res) : super(ptr), input(this), max_sync_interval(master_timeout) { - super::init(reg, this_endpoint, ep_clock, std::move(nm), std::move(parent), - std::move(in_res), std::move(out_res)); + super::init(std::move(reg), this_endpoint, ep_clock, std::move(nm), + std::move(parent), std::move(in_res), std::move(out_res)); master_topic = store_name / topic::master_suffix(); super::init(input); max_get_delay = caf::get_or(ptr->config(), "broker.store.max-get-delay", diff --git a/libbroker/broker/internal/core_actor.cc b/libbroker/broker/internal/core_actor.cc index a0761b6b..e71ebbef 100644 --- a/libbroker/broker/internal/core_actor.cc +++ b/libbroker/broker/internal/core_actor.cc @@ -26,6 +26,7 @@ #include "broker/domain_options.hh" #include "broker/filter_type.hh" #include "broker/format/bin.hh" +#include "broker/internal/checked.hh" #include "broker/internal/clone_actor.hh" #include "broker/internal/killswitch.hh" #include "broker/internal/master_actor.hh" @@ -143,10 +144,11 @@ core_actor_state::core_actor_state(caf::event_based_actor* self, // id(this_peer), filter(std::make_shared(std::move(initial_filter))), clock(clock), - metrics(*reg), + registry(checked(std::move(reg), + "cannot construct the core actor without registry")), + metrics(*registry), unsafe_inputs(self), - flow_inputs(self), - registry(reg) { + flow_inputs(self) { // Read config and check for extra configuration parameters. ttl = caf::get_or(self->config(), "broker.ttl", defaults::ttl); if (adaptation && adaptation->disable_forwarding) { diff --git a/libbroker/broker/internal/core_actor.hh b/libbroker/broker/internal/core_actor.hh index 5d855d07..636cdf61 100644 --- a/libbroker/broker/internal/core_actor.hh +++ b/libbroker/broker/internal/core_actor.hh @@ -243,6 +243,9 @@ public: /// Enables manual time management by the user. endpoint::clock* clock; + /// Stores a reference to the metrics registry. + prometheus_registry_ptr registry; + /// Caches pointers to the Broker metrics. metrics_t metrics; @@ -353,9 +356,6 @@ public: /// Counts messages that were published directly via message, i.e., without /// using the back-pressure of flows. int64_t published_via_async_msg = 0; - - /// Stores a reference to the metrics registry. - prometheus_registry_ptr registry; }; using core_actor = caf::stateful_actor; diff --git a/libbroker/broker/internal/core_actor.test.cc b/libbroker/broker/internal/core_actor.test.cc index ab6e8f69..eb799af0 100644 --- a/libbroker/broker/internal/core_actor.test.cc +++ b/libbroker/broker/internal/core_actor.test.cc @@ -30,7 +30,7 @@ struct fixture : test_coordinator_fixture { std::vector bridges; - prometheus_registry_ptr registry; + prometheus_registry_ptr registry = std::make_shared(); using data_message_list = std::vector; @@ -62,7 +62,6 @@ struct fixture : test_coordinator_fixture { } fixture() { - registry = std::make_shared(); // We don't do networking, but our flares use the socket API. ep1.id = endpoint_id::random(1); ep2.id = endpoint_id::random(2); diff --git a/libbroker/broker/internal/master_actor.cc b/libbroker/broker/internal/master_actor.cc index 40ddbcb0..c92c9fec 100644 --- a/libbroker/broker/internal/master_actor.cc +++ b/libbroker/broker/internal/master_actor.cc @@ -15,6 +15,7 @@ #include "broker/detail/abstract_backend.hh" #include "broker/detail/assert.hh" #include "broker/detail/die.hh" +#include "broker/internal/checked.hh" #include "broker/internal/master_actor.hh" #include "broker/internal/metric_factory.hh" #include "broker/store.hh" @@ -58,9 +59,13 @@ master_state::master_state( caf::actor parent, endpoint::clock* ep_clock, caf::async::consumer_resource in_res, caf::async::producer_resource out_res) - : super(ptr), output(this), metrics(*reg, nm) { - super::init(reg, this_endpoint, ep_clock, std::move(nm), std::move(parent), - std::move(in_res), std::move(out_res)); + : super(ptr), + output(this), + metrics(checked_deref(reg, + "cannot construct a master actor without registry"), + nm) { + super::init(std::move(reg), this_endpoint, ep_clock, std::move(nm), + std::move(parent), std::move(in_res), std::move(out_res)); super::init(output); clones_topic = store_name / topic::clone_suffix(); backend = std::move(bp); diff --git a/libbroker/broker/internal/store_actor.cc b/libbroker/broker/internal/store_actor.cc index 3e444f36..cab041d6 100644 --- a/libbroker/broker/internal/store_actor.cc +++ b/libbroker/broker/internal/store_actor.cc @@ -72,7 +72,7 @@ void store_actor_state::init(prometheus_registry_ptr reg, consumer_resource in_res, producer_resource out_res) { BROKER_ASSERT(clock != nullptr); - this->registry = reg; + this->registry = std::move(reg); this->clock = clock; this->store_name = std::move(store_name); this->id.endpoint = this_endpoint; diff --git a/libbroker/broker/internal/store_actor.hh b/libbroker/broker/internal/store_actor.hh index e849466a..c3612a9e 100644 --- a/libbroker/broker/internal/store_actor.hh +++ b/libbroker/broker/internal/store_actor.hh @@ -1,7 +1,6 @@ #pragma once #include -#include #include #include #include @@ -24,6 +23,7 @@ #include "broker/entity_id.hh" #include "broker/fwd.hh" #include "broker/internal/channel.hh" +#include "broker/internal/checked.hh" #include "broker/internal/type_id.hh" #include "broker/topic.hh" @@ -82,7 +82,10 @@ public: defaults::store::heartbeat_interval)); out.connection_timeout_factor(get_or(cfg, "broker.store.connection-timeout", defaults::store::connection_timeout)); - out.metrics().init(*registry, store_name); + out.metrics().init( + checked_deref(registry, + "cannot initialize a store actor without registry"), + store_name); } template @@ -100,7 +103,10 @@ public: in.heartbeat_interval(heartbeat_interval); in.connection_timeout_factor(connection_timeout); in.nack_timeout(nack_timeout); - in.metrics().init(*registry, store_name); + in.metrics().init( + checked_deref(registry, + "cannot initialize a store actor without registry"), + store_name); } template