Skip to content

Commit

Permalink
factories: renaming to mainThreadDispatcher (envoyproxy#18122)
Browse files Browse the repository at this point in the history
factories: renaming to mainThreadDispatcher
Also using the new factory API in one more place which I can back out if you prefer.

Risk Level: Low
Testing: n/a
Docs Changes: n/a
Release Notes: n/a

Signed-off-by: Alyssa Wilk <[email protected]>
  • Loading branch information
alyssawilk authored Sep 17, 2021
1 parent f2c69dc commit a3cc673
Show file tree
Hide file tree
Showing 95 changed files with 277 additions and 195 deletions.
4 changes: 2 additions & 2 deletions contrib/rocketmq_proxy/filters/network/source/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ Network::FilterFactoryCb RocketmqProxyFilterConfigFactory::createFilterFactoryFr
Server::Configuration::FactoryContext& context) {
std::shared_ptr<ConfigImpl> filter_config = std::make_shared<ConfigImpl>(proto_config, context);
return [filter_config, &context](Network::FilterManager& filter_manager) -> void {
filter_manager.addReadFilter(
std::make_shared<ConnectionManager>(*filter_config, context.dispatcher().timeSource()));
filter_manager.addReadFilter(std::make_shared<ConnectionManager>(
*filter_config, context.mainThreadDispatcher().timeSource()));
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class ActiveMessageTest : public testing::Test {
ActiveMessageTest()
: stats_(RocketmqFilterStats::generateStats("test.", store_)),
config_(rocketmq_proxy_config_, factory_context_),
connection_manager_(config_, factory_context_.dispatcher().timeSource()) {
connection_manager_(config_, factory_context_.mainThreadDispatcher().timeSource()) {
connection_manager_.initializeReadFilterCallbacks(filter_callbacks_);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ class RocketmqConnectionManagerTest : public Event::TestUsingSimulatedTime, publ
TestUtility::validate(proto_config_);
}
config_ = std::make_unique<TestConfigImpl>(proto_config_, factory_context_, stats_);
conn_manager_ =
std::make_unique<ConnectionManager>(*config_, factory_context_.dispatcher().timeSource());
conn_manager_ = std::make_unique<ConnectionManager>(
*config_, factory_context_.mainThreadDispatcher().timeSource());
conn_manager_->initializeReadFilterCallbacks(filter_callbacks_);
conn_manager_->onNewConnection();
current_ = factory_context_.dispatcher().timeSource().monotonicTime();
current_ = factory_context_.mainThreadDispatcher().timeSource().monotonicTime();
}

void initializeCluster() {
Expand Down
2 changes: 1 addition & 1 deletion contrib/rocketmq_proxy/filters/network/test/router_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class RocketmqRouterTestBase {
cluster_info_(std::make_shared<Upstream::MockClusterInfo>()) {
context_.cluster_manager_.initializeThreadLocalClusters({"fake_cluster"});
conn_manager_ =
std::make_unique<ConnectionManager>(config_, context_.dispatcher().timeSource());
std::make_unique<ConnectionManager>(config_, context_.mainThreadDispatcher().timeSource());
conn_manager_->initializeReadFilterCallbacks(filter_callbacks_);
}

Expand Down
2 changes: 1 addition & 1 deletion contrib/sxg/filters/http/test/filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ TEST_F(FilterTest, SdsDynamicGenericSecret) {
NiceMock<Event::MockDispatcher> dispatcher;
EXPECT_CALL(secret_context, localInfo()).WillRepeatedly(ReturnRef(local_info));
EXPECT_CALL(secret_context, api()).WillRepeatedly(ReturnRef(*api));
EXPECT_CALL(secret_context, dispatcher()).WillRepeatedly(ReturnRef(dispatcher));
EXPECT_CALL(secret_context, mainThreadDispatcher()).WillRepeatedly(ReturnRef(dispatcher));
EXPECT_CALL(secret_context, stats()).WillRepeatedly(ReturnRef(stats));
EXPECT_CALL(secret_context, initManager()).WillRepeatedly(ReturnRef(init_manager));
EXPECT_CALL(init_manager, add(_))
Expand Down
2 changes: 1 addition & 1 deletion envoy/server/factory_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class FactoryContextBase {
* @return Event::Dispatcher& the main thread's dispatcher. This dispatcher should be used
* for all singleton processing.
*/
virtual Event::Dispatcher& dispatcher() PURE;
virtual Event::Dispatcher& mainThreadDispatcher() PURE;

/**
* @return Api::Api& a reference to the api object.
Expand Down
2 changes: 1 addition & 1 deletion envoy/server/health_checker_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class HealthCheckerFactoryContext {
* @return Event::Dispatcher& the main thread's dispatcher. This dispatcher should be used
* for all singleton processing.
*/
virtual Event::Dispatcher& dispatcher() PURE;
virtual Event::Dispatcher& mainThreadDispatcher() PURE;

/*
* @return Upstream::HealthCheckEventLoggerPtr the health check event logger for the
Expand Down
2 changes: 1 addition & 1 deletion envoy/server/resource_monitor_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class ResourceMonitorFactoryContext {
* @return Event::Dispatcher& the main thread's dispatcher. This dispatcher should be used
* for all singleton processing.
*/
virtual Event::Dispatcher& dispatcher() PURE;
virtual Event::Dispatcher& mainThreadDispatcher() PURE;

/**
* @return Server::Options& the command-line options that Envoy was started with.
Expand Down
2 changes: 1 addition & 1 deletion envoy/server/transport_socket_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class TransportSocketFactoryContext {
/**
* @return Event::Dispatcher& the main thread's dispatcher.
*/
virtual Event::Dispatcher& dispatcher() PURE;
virtual Event::Dispatcher& mainThreadDispatcher() PURE;

/**
* @return Server::Options& the command-line options that Envoy was started with.
Expand Down
2 changes: 1 addition & 1 deletion source/common/router/config_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ RouteEntryImplBase::RouteEntryImplBase(const VirtualHostImpl& vhost,
vhost_.globalRouteConfig().maxDirectResponseBodySizeBytes())),
per_filter_configs_(route.typed_per_filter_config(), optional_http_filters, factory_context,
validator),
route_name_(route.name()), time_source_(factory_context.dispatcher().timeSource()) {
route_name_(route.name()), time_source_(factory_context.mainThreadDispatcher().timeSource()) {
if (route.route().has_metadata_match()) {
const auto filter_it = route.route().metadata_match().filter_metadata().find(
Envoy::Config::MetadataFilters::get().ENVOY_LB);
Expand Down
8 changes: 5 additions & 3 deletions source/common/router/rds_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -309,9 +309,11 @@ void RdsRouteConfigProviderImpl::requestVirtualHostsUpdate(
// execute the callback. still_alive shared_ptr will be deallocated when the current instance of
// the RdsRouteConfigProviderImpl is deallocated; we rely on a weak_ptr to still_alive flag to
// determine if the RdsRouteConfigProviderImpl instance is still valid.
factory_context_.dispatcher().post([this, maybe_still_alive = std::weak_ptr<bool>(still_alive_),
alias, &thread_local_dispatcher,
route_config_updated_cb]() -> void {
factory_context_.mainThreadDispatcher().post([this,
maybe_still_alive =
std::weak_ptr<bool>(still_alive_),
alias, &thread_local_dispatcher,
route_config_updated_cb]() -> void {
if (maybe_still_alive.lock()) {
subscription_->updateOnDemand(alias);
config_update_callbacks_.push_back({alias, thread_local_dispatcher, route_config_updated_cb});
Expand Down
4 changes: 2 additions & 2 deletions source/common/router/scoped_rds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,8 @@ void ScopedRdsConfigSubscription::onDemandRdsUpdate(
std::shared_ptr<Router::ScopeKey> scope_key, Event::Dispatcher& thread_local_dispatcher,
Http::RouteConfigUpdatedCallback&& route_config_updated_cb,
std::weak_ptr<Envoy::Config::ConfigSubscriptionCommonBase> weak_subscription) {
factory_context_.dispatcher().post([this, &thread_local_dispatcher, scope_key,
route_config_updated_cb, weak_subscription]() {
factory_context_.mainThreadDispatcher().post([this, &thread_local_dispatcher, scope_key,
route_config_updated_cb, weak_subscription]() {
// If the subscription has been destroyed, return immediately.
if (!weak_subscription.lock()) {
thread_local_dispatcher.post([route_config_updated_cb] { route_config_updated_cb(false); });
Expand Down
20 changes: 12 additions & 8 deletions source/common/secret/sds_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,10 @@ class TlsCertificateSdsApi : public SdsApi, public TlsCertificateConfigProvider
Config::Utility::checkLocalInfo("TlsCertificateSdsApi", secret_provider_context.localInfo());
return std::make_shared<TlsCertificateSdsApi>(
sds_config, sds_config_name, secret_provider_context.clusterManager().subscriptionFactory(),
secret_provider_context.dispatcher().timeSource(),
secret_provider_context.mainThreadDispatcher().timeSource(),
secret_provider_context.messageValidationVisitor(), secret_provider_context.stats(),
destructor_cb, secret_provider_context.dispatcher(), secret_provider_context.api());
destructor_cb, secret_provider_context.mainThreadDispatcher(),
secret_provider_context.api());
}

TlsCertificateSdsApi(const envoy::config::core::v3::ConfigSource& sds_config,
Expand Down Expand Up @@ -226,9 +227,10 @@ class CertificateValidationContextSdsApi : public SdsApi,
secret_provider_context.localInfo());
return std::make_shared<CertificateValidationContextSdsApi>(
sds_config, sds_config_name, secret_provider_context.clusterManager().subscriptionFactory(),
secret_provider_context.dispatcher().timeSource(),
secret_provider_context.mainThreadDispatcher().timeSource(),
secret_provider_context.messageValidationVisitor(), secret_provider_context.stats(),
destructor_cb, secret_provider_context.dispatcher(), secret_provider_context.api());
destructor_cb, secret_provider_context.mainThreadDispatcher(),
secret_provider_context.api());
}
CertificateValidationContextSdsApi(const envoy::config::core::v3::ConfigSource& sds_config,
const std::string& sds_config_name,
Expand Down Expand Up @@ -320,9 +322,10 @@ class TlsSessionTicketKeysSdsApi : public SdsApi, public TlsSessionTicketKeysCon
secret_provider_context.localInfo());
return std::make_shared<TlsSessionTicketKeysSdsApi>(
sds_config, sds_config_name, secret_provider_context.clusterManager().subscriptionFactory(),
secret_provider_context.dispatcher().timeSource(),
secret_provider_context.mainThreadDispatcher().timeSource(),
secret_provider_context.messageValidationVisitor(), secret_provider_context.stats(),
destructor_cb, secret_provider_context.dispatcher(), secret_provider_context.api());
destructor_cb, secret_provider_context.mainThreadDispatcher(),
secret_provider_context.api());
}

TlsSessionTicketKeysSdsApi(const envoy::config::core::v3::ConfigSource& sds_config,
Expand Down Expand Up @@ -392,9 +395,10 @@ class GenericSecretSdsApi : public SdsApi, public GenericSecretConfigProvider {
Config::Utility::checkLocalInfo("GenericSecretSdsApi", secret_provider_context.localInfo());
return std::make_shared<GenericSecretSdsApi>(
sds_config, sds_config_name, secret_provider_context.clusterManager().subscriptionFactory(),
secret_provider_context.dispatcher().timeSource(),
secret_provider_context.mainThreadDispatcher().timeSource(),
secret_provider_context.messageValidationVisitor(), secret_provider_context.stats(),
destructor_cb, secret_provider_context.dispatcher(), secret_provider_context.api());
destructor_cb, secret_provider_context.mainThreadDispatcher(),
secret_provider_context.api());
}

GenericSecretSdsApi(const envoy::config::core::v3::ConfigSource& sds_config,
Expand Down
1 change: 1 addition & 0 deletions source/common/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ envoy_cc_library(
"//source/common/upstream:priority_conn_pool_map_impl_lib",
"//source/common/upstream:upstream_lib",
"//source/common/quic:quic_stat_names_lib",
"//source/server:factory_context_base_impl_lib",
"@envoy_api//envoy/admin/v3:pkg_cc_proto",
"@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto",
"@envoy_api//envoy/config/cluster/v3:pkg_cc_proto",
Expand Down
8 changes: 4 additions & 4 deletions source/common/upstream/cluster_factory_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ ClusterFactoryImplBase::selectDnsResolver(const envoy::config::cluster::v3::Clus
resolvers.push_back(Network::Address::resolveProtoAddress(resolver_addr));
}
}
return context.dispatcher().createDnsResolver(resolvers, dns_resolver_options);
return context.mainThreadDispatcher().createDnsResolver(resolvers, dns_resolver_options);
}

return context.dnsResolver();
Expand All @@ -127,7 +127,7 @@ ClusterFactoryImplBase::create(const envoy::config::cluster::v3::Cluster& cluste
transport_factory_context =
std::make_unique<Server::Configuration::TransportSocketFactoryContextImpl>(
context.admin(), context.sslContextManager(), *stats_scope, context.clusterManager(),
context.localInfo(), context.dispatcher(), context.stats(),
context.localInfo(), context.mainThreadDispatcher(), context.stats(),
context.singletonManager(), context.threadLocal(), context.messageValidationVisitor(),
context.api(), context.options());

Expand All @@ -141,13 +141,13 @@ ClusterFactoryImplBase::create(const envoy::config::cluster::v3::Cluster& cluste
} else {
new_cluster_pair.first->setHealthChecker(HealthCheckerFactory::create(
cluster.health_checks()[0], *new_cluster_pair.first, context.runtime(),
context.dispatcher(), context.logManager(), context.messageValidationVisitor(),
context.mainThreadDispatcher(), context.logManager(), context.messageValidationVisitor(),
context.api()));
}
}

new_cluster_pair.first->setOutlierDetector(Outlier::DetectorImplFactory::createForCluster(
*new_cluster_pair.first, cluster, context.dispatcher(), context.runtime(),
*new_cluster_pair.first, cluster, context.mainThreadDispatcher(), context.runtime(),
context.outlierEventLogger()));

new_cluster_pair.first->setTransportFactoryContext(std::move(transport_factory_context));
Expand Down
2 changes: 1 addition & 1 deletion source/common/upstream/cluster_factory_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class ClusterFactoryContextImpl : public ClusterFactoryContext {
Network::DnsResolverSharedPtr dnsResolver() override { return dns_resolver_; }
Ssl::ContextManager& sslContextManager() override { return ssl_context_manager_; }
Runtime::Loader& runtime() override { return runtime_; }
Event::Dispatcher& dispatcher() override { return dispatcher_; }
Event::Dispatcher& mainThreadDispatcher() override { return dispatcher_; }
AccessLog::AccessLogManager& logManager() override { return log_manager_; }
const LocalInfo::LocalInfo& localInfo() const override { return local_info_; }
const Server::Options& options() override { return options_; }
Expand Down
44 changes: 23 additions & 21 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1643,8 +1643,9 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::tcpConnPoolIsIdle(
ClusterManagerPtr ProdClusterManagerFactory::clusterManagerFromProto(
const envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
return ClusterManagerPtr{new ClusterManagerImpl(
bootstrap, *this, stats_, tls_, runtime_, local_info_, log_manager_, main_thread_dispatcher_,
admin_, validation_context_, api_, http_context_, grpc_context_, router_context_)};
bootstrap, *this, stats_, tls_, context_.runtime(), context_.localInfo(), log_manager_,
context_.mainThreadDispatcher(), context_.admin(), validation_context_, context_.api(),
http_context_, grpc_context_, router_context_)};
}

Http::ConnectionPool::InstancePtr ProdClusterManagerFactory::allocateConnPool(
Expand All @@ -1655,7 +1656,8 @@ Http::ConnectionPool::InstancePtr ProdClusterManagerFactory::allocateConnPool(
const Network::ConnectionSocket::OptionsSharedPtr& options,
const Network::TransportSocketOptionsConstSharedPtr& transport_socket_options,
TimeSource& source, ClusterConnectivityState& state) {
if (protocols.size() == 3 && runtime_.snapshot().featureEnabled("upstream.use_http3", 100)) {
if (protocols.size() == 3 &&
context_.runtime().snapshot().featureEnabled("upstream.use_http3", 100)) {
ASSERT(contains(protocols,
{Http::Protocol::Http11, Http::Protocol::Http2, Http::Protocol::Http3}));
Http::AlternateProtocolsCacheSharedPtr alternate_protocols_cache;
Expand All @@ -1667,30 +1669,30 @@ Http::ConnectionPool::InstancePtr ProdClusterManagerFactory::allocateConnPool(
// TODO(RyanTheOptimist): Plumb an actual alternate protocols cache.
Envoy::Http::ConnectivityGrid::ConnectivityOptions coptions{protocols};
return std::make_unique<Http::ConnectivityGrid>(
dispatcher, api_.randomGenerator(), host, priority, options, transport_socket_options,
state, source, alternate_protocols_cache, std::chrono::milliseconds(300), coptions,
quic_stat_names_, stats_);
dispatcher, context_.api().randomGenerator(), host, priority, options,
transport_socket_options, state, source, alternate_protocols_cache,
std::chrono::milliseconds(300), coptions, quic_stat_names_, stats_);
#else
// Should be blocked by configuration checking at an earlier point.
NOT_REACHED_GCOVR_EXCL_LINE;
#endif
}
if (protocols.size() >= 2) {
ASSERT(contains(protocols, {Http::Protocol::Http11, Http::Protocol::Http2}));
return std::make_unique<Http::HttpConnPoolImplMixed>(dispatcher, api_.randomGenerator(), host,
priority, options,
transport_socket_options, state);
return std::make_unique<Http::HttpConnPoolImplMixed>(
dispatcher, context_.api().randomGenerator(), host, priority, options,
transport_socket_options, state);
}
if (protocols.size() == 1 && protocols[0] == Http::Protocol::Http2 &&
runtime_.snapshot().featureEnabled("upstream.use_http2", 100)) {
return Http::Http2::allocateConnPool(dispatcher, api_.randomGenerator(), host, priority,
options, transport_socket_options, state);
context_.runtime().snapshot().featureEnabled("upstream.use_http2", 100)) {
return Http::Http2::allocateConnPool(dispatcher, context_.api().randomGenerator(), host,
priority, options, transport_socket_options, state);
}
if (protocols.size() == 1 && protocols[0] == Http::Protocol::Http3 &&
runtime_.snapshot().featureEnabled("upstream.use_http3", 100)) {
context_.runtime().snapshot().featureEnabled("upstream.use_http3", 100)) {
#ifdef ENVOY_ENABLE_QUIC
return Http::Http3::allocateConnPool(dispatcher, api_.randomGenerator(), host, priority,
options, transport_socket_options, state, source,
return Http::Http3::allocateConnPool(dispatcher, context_.api().randomGenerator(), host,
priority, options, transport_socket_options, state, source,
quic_stat_names_, stats_);
#else
UNREFERENCED_PARAMETER(source);
Expand All @@ -1699,8 +1701,8 @@ Http::ConnectionPool::InstancePtr ProdClusterManagerFactory::allocateConnPool(
#endif
}
ASSERT(protocols.size() == 1 && protocols[0] == Http::Protocol::Http11);
return Http::Http1::allocateConnPool(dispatcher, api_.randomGenerator(), host, priority, options,
transport_socket_options, state);
return Http::Http1::allocateConnPool(dispatcher, context_.api().randomGenerator(), host, priority,
options, transport_socket_options, state);
}

Tcp::ConnectionPool::InstancePtr ProdClusterManagerFactory::allocateTcpConnPool(
Expand All @@ -1722,12 +1724,12 @@ std::pair<ClusterSharedPtr, ThreadAwareLoadBalancerPtr> ProdClusterManagerFactor
const envoy::config::cluster::v3::Cluster& cluster, ClusterManager& cm,
Outlier::EventLoggerSharedPtr outlier_event_logger, bool added_via_api) {
return ClusterFactoryImplBase::create(
cluster, cm, stats_, tls_, dns_resolver_, ssl_context_manager_, runtime_,
main_thread_dispatcher_, log_manager_, local_info_, admin_, singleton_manager_,
outlier_event_logger, added_via_api,
cluster, cm, stats_, tls_, dns_resolver_, ssl_context_manager_, context_.runtime(),
context_.mainThreadDispatcher(), log_manager_, context_.localInfo(), admin_,
singleton_manager_, outlier_event_logger, added_via_api,
added_via_api ? validation_context_.dynamicValidationVisitor()
: validation_context_.staticValidationVisitor(),
api_, options_);
context_.api(), context_.options());
}

CdsApiPtr
Expand Down
Loading

0 comments on commit a3cc673

Please sign in to comment.