k/server: added ability to limit concurrent produce requests
Signed-off-by: Michał Maślanka <[email protected]>
mmaslankaprv committed Jan 22, 2025
1 parent 198a96a commit e0b15ae
Showing 5 changed files with 45 additions and 9 deletions.
7 changes: 7 additions & 0 deletions src/v/config/configuration.cc
@@ -2893,6 +2893,13 @@ configuration::configuration()
"List of superuser usernames.",
{.needs_restart = needs_restart::no, .visibility = visibility::user},
{})
+ , max_concurrent_produce_requests(
+ *this,
+ "max_concurrent_produce_requests",
+ "Maximum number of requests resulting in Raft replication that may be "
+ "pending in the Kafka server",
+ {.needs_restart = needs_restart::no, .visibility = visibility::tunable},
+ std::nullopt)
, kafka_qdc_latency_alpha(
*this,
"kafka_qdc_latency_alpha",
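
The new property is optional and takes effect without a restart (needs_restart::no); when it is left unset, server.cc below falls back to ss::semaphore::max_counter(), i.e. an effectively unlimited capacity. A minimal sketch of that mapping in plain standard C++ (illustrative only, the helper name is made up, not Redpanda source):

#include <cstddef>
#include <limits>
#include <optional>

// Unset limit -> largest possible capacity, i.e. effectively no limit; this
// mirrors the value_or(ss::semaphore::max_counter()) pattern in server.cc.
constexpr std::size_t
effective_produce_capacity(std::optional<std::size_t> limit) {
    return limit.value_or(std::numeric_limits<std::size_t>::max());
}

static_assert(
  effective_produce_capacity(std::nullopt)
  == std::numeric_limits<std::size_t>::max());
static_assert(effective_produce_capacity(1024) == 1024);
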
3 changes: 2 additions & 1 deletion src/v/config/configuration.h
@@ -545,7 +545,8 @@ struct configuration final : public config_store {
property<uint64_t> cloud_storage_inventory_max_hash_size_during_parse;

one_or_many_property<ss::sstring> superusers;
-
+ // kafka
+ property<std::optional<size_t>> max_concurrent_produce_requests;
// kafka queue depth control: latency ewma
property<double> kafka_qdc_latency_alpha;
property<std::chrono::milliseconds> kafka_qdc_window_size_ms;
2 changes: 1 addition & 1 deletion src/v/kafka/server/connection_context.cc
@@ -632,7 +632,7 @@ ss::future<session_resources> connection_context::throttle_request(
auto mem_units = co_await reserve_request_units(
r_data.request_key, request_size);

- auto qd_units = co_await server().get_request_unit();
+ auto qd_units = co_await server().get_request_unit(r_data.request_key);

auto& h_probe = _server.handler_probe(r_data.request_key);
auto tracker = std::make_unique<request_tracker>(_server.probe(), h_probe);
31 changes: 31 additions & 0 deletions src/v/kafka/server/server.cc
@@ -20,6 +20,7 @@
#include "features/enterprise_feature_messages.h"
#include "features/feature_table.h"
#include "kafka/protocol/errors.h"
#include "kafka/protocol/produce.h"
#include "kafka/protocol/schemata/list_groups_response.h"
#include "kafka/server/connection_context.h"
#include "kafka/server/coordinator_ntp_mapper.h"
@@ -182,6 +183,11 @@
cfg->local().max_service_memory_per_core
* config::shard_local_cfg().kafka_memory_share_for_fetch()),
"kafka/server-mem-fetch")
+ , _max_concurrent_produce_requests(
+ config::shard_local_cfg().max_concurrent_produce_requests.bind())
+ , _produce_requests_sem(
+ _max_concurrent_produce_requests().value_or(ss::semaphore::max_counter()),
+ "kafka/max-produce-requests")
, _probe(std::make_unique<class kafka_probe>())
, _sasl_probe(std::make_unique<class sasl_probe>())
, _read_dist_probe(std::make_unique<read_distribution_probe>())
@@ -202,6 +208,13 @@

_sasl_probe->setup_metrics(cfg->local().name);
_read_dist_probe->setup_metrics();
+ _max_concurrent_produce_requests.watch([this] {
+ // when max concurrent produce requests is not set we use the max
+ // capacity
+ _produce_requests_sem.set_capacity(
+ _max_concurrent_produce_requests().value_or(
+ ss::semaphore::max_counter()));
+ });
}

void server::setup_metrics() {
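
The constructor changes above bind max_concurrent_produce_requests and install a watch, so updating the cluster property resizes _produce_requests_sem while the broker keeps running. The following sketch uses only the standard library (std::mutex and std::condition_variable stand in for Redpanda's adjustable_semaphore and Seastar's primitives; every name here is an illustrative assumption) to show the general shape of a semaphore whose capacity can change mid-flight:

#include <condition_variable>
#include <cstddef>
#include <mutex>

// Illustrative adjustable semaphore: capacity may change at runtime, as a
// config watch would do. Not the Redpanda implementation.
class adjustable_capacity_semaphore {
public:
    explicit adjustable_capacity_semaphore(std::size_t capacity)
      : _capacity(capacity) {}

    // Block until a unit is available under the current capacity.
    void acquire() {
        std::unique_lock lk(_m);
        _cv.wait(lk, [this] { return _in_use < _capacity; });
        ++_in_use;
    }

    void release() {
        std::lock_guard lk(_m);
        --_in_use;
        _cv.notify_one();
    }

    // Called from the config watch: growing capacity wakes waiters,
    // shrinking lets in-flight holders drain naturally.
    void set_capacity(std::size_t capacity) {
        std::lock_guard lk(_m);
        _capacity = capacity;
        _cv.notify_all();
    }

private:
    std::mutex _m;
    std::condition_variable _cv;
    std::size_t _capacity;
    std::size_t _in_use = 0;
};

Growing the capacity wakes blocked waiters immediately; shrinking it does not evict current holders, it only stops new acquisitions until enough in-flight requests complete, which fits the intent of a live-tunable limit.
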
@@ -221,12 +234,30 @@
});
}

+ ss::future<ssx::semaphore_units> server::get_request_unit(api_key key) {
+ if (_qdc_mon) [[unlikely]] {
+ return _qdc_mon->qdc.get_unit();
+ }
+ if (
+ key == produce_api::key || key == offset_commit_api::key
+ || key == txn_offset_commit_api::key) {
+ return _produce_requests_sem.get_units(1);
+ }
+ return ss::make_ready_future<ssx::semaphore_units>(ssx::semaphore_units());
+ }
+
ss::scheduling_group server::fetch_scheduling_group() const {
return config::shard_local_cfg().use_fetch_scheduler_group()
? _fetch_scheduling_group
: ss::default_scheduling_group();
}

+ ss::scheduling_group server::produce_scheduling_group() const {
+ return config::shard_local_cfg().use_produce_scheduler_group()
+ ? _produce_scheduling_group
+ : ss::default_scheduling_group();
+ }
+
coordinator_ntp_mapper& server::coordinator_mapper() {
return _group_router.local().coordinator_mapper().local();
}
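
get_request_unit now receives the api key: when queue-depth control is enabled the QDC monitor is still consulted first, otherwise only produce, offset_commit and txn_offset_commit requests, the ones that result in Raft replication, take a unit from _produce_requests_sem, and every other request passes through with empty units. A rough usage sketch of that gating, reusing the illustrative adjustable_capacity_semaphore from the previous sketch (the enum values and the RAII wrapper are assumptions, not the real Kafka protocol constants or ssx types):

#include <utility>

// Hypothetical stand-ins for the Kafka api keys named in the diff above.
enum class api_key { produce, offset_commit, txn_offset_commit, fetch, metadata };

// Minimal RAII unit playing the role of ssx::semaphore_units: the slot is
// released when the request (and with it this object) goes away.
class request_unit {
public:
    request_unit() = default;
    explicit request_unit(adjustable_capacity_semaphore& sem) : _sem(&sem) {
        _sem->acquire();
    }
    request_unit(request_unit&& other) noexcept
      : _sem(std::exchange(other._sem, nullptr)) {}
    ~request_unit() {
        if (_sem != nullptr) {
            _sem->release();
        }
    }

private:
    adjustable_capacity_semaphore* _sem = nullptr;
};

// Only request types that end in Raft replication compete for a unit; any
// other api key gets an empty unit and is not throttled by this semaphore.
inline request_unit get_produce_request_unit(
  adjustable_capacity_semaphore& sem, api_key key) {
    if (key == api_key::produce || key == api_key::offset_commit
        || key == api_key::txn_offset_commit) {
        return request_unit(sem);
    }
    return request_unit{};
}
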
11 changes: 4 additions & 7 deletions src/v/kafka/server/server.h
@@ -35,6 +35,7 @@
#include "security/gssapi_principal_mapper.h"
#include "security/krb5_configurator.h"
#include "security/mtls.h"
#include "utils/adjustable_semaphore.h"
#include "utils/ema.h"

#include <seastar/core/future.hh>
@@ -162,13 +163,7 @@
}
}

- ss::future<ssx::semaphore_units> get_request_unit() {
- if (_qdc_mon) {
- return _qdc_mon->qdc.get_unit();
- }
- return ss::make_ready_future<ssx::semaphore_units>(
- ssx::semaphore_units());
- }
+ ss::future<ssx::semaphore_units> get_request_unit(api_key);

cluster::controller_api& controller_api() {
return _controller_api.local();
@@ -259,6 +254,8 @@
security::gssapi_principal_mapper _gssapi_principal_mapper;
security::krb5::configurator _krb_configurator;
ssx::semaphore _memory_fetch_sem;
+ config::binding<std::optional<size_t>> _max_concurrent_produce_requests;
+ adjustable_semaphore _produce_requests_sem;

handler_probe_manager _handler_probes;
metrics::internal_metric_groups _metrics;
