From 78e36ac7635ba7519c36d24c4965dccddd8c9130 Mon Sep 17 00:00:00 2001 From: Anika Kumar Date: Thu, 31 Oct 2024 10:35:09 -0700 Subject: [PATCH] incorporate free storage metrics into autoscaler tuning --- .../elastic_graph/datastore_core/client.rbs | 1 + .../lib/elastic_graph/elasticsearch/client.rb | 4 + .../elasticsearch/client_spec.rb | 2 + ...ticgraph-indexer_autoscaler_lambda.gemspec | 1 + .../indexer_autoscaler_lambda.rb | 14 ++- .../concurrency_scaler.rb | 52 +++++++- .../details_logger.rb | 18 ++- .../lambda_function.rb | 1 + .../concurrency_scaler.rbs | 7 +- .../details_logger.rbs | 5 + .../indexer_autoscaler_lambda.rbs | 4 + .../spec/support/builds_indexer_autoscaler.rb | 2 + .../concurrency_scaler_spec.rb | 113 ++++++++++++++---- .../lambda_function_spec.rb | 1 + .../lib/elastic_graph/opensearch/client.rb | 4 + .../elastic_graph/opensearch/client_spec.rb | 2 + .../datastore_client_shared_examples.rb | 6 + 17 files changed, 204 insertions(+), 33 deletions(-) diff --git a/elasticgraph-datastore_core/sig/elastic_graph/datastore_core/client.rbs b/elasticgraph-datastore_core/sig/elastic_graph/datastore_core/client.rbs index e5b94e3e..3f8e68b9 100644 --- a/elasticgraph-datastore_core/sig/elastic_graph/datastore_core/client.rbs +++ b/elasticgraph-datastore_core/sig/elastic_graph/datastore_core/client.rbs @@ -9,6 +9,7 @@ module ElasticGraph def get_cluster_health: () -> ::Hash[::String, untyped] def get_node_os_stats: () -> ::Hash[::String, untyped] + def get_node_os_roles: () -> ::Hash[::String, untyped] def get_flat_cluster_settings: () -> ::Hash[::String, untyped] def put_persistent_cluster_settings: (::Hash[::Symbol | ::String, untyped]) -> void diff --git a/elasticgraph-elasticsearch/lib/elastic_graph/elasticsearch/client.rb b/elasticgraph-elasticsearch/lib/elastic_graph/elasticsearch/client.rb index 272cdea1..2b76a0e6 100644 --- a/elasticgraph-elasticsearch/lib/elastic_graph/elasticsearch/client.rb +++ b/elasticgraph-elasticsearch/lib/elastic_graph/elasticsearch/client.rb @@ -72,6 +72,10 @@ def get_cluster_health def get_node_os_stats transform_errors { |c| c.nodes.stats(metric: "os").body } end + + def get_node_roles + transform_errors { |c| c.nodes.stats(metric: "roles").body } + end def get_flat_cluster_settings transform_errors { |c| c.cluster.get_settings(flat_settings: true).body } diff --git a/elasticgraph-elasticsearch/spec/unit/elastic_graph/elasticsearch/client_spec.rb b/elasticgraph-elasticsearch/spec/unit/elastic_graph/elasticsearch/client_spec.rb index 5d4936fa..b65f36f5 100644 --- a/elasticgraph-elasticsearch/spec/unit/elastic_graph/elasticsearch/client_spec.rb +++ b/elasticgraph-elasticsearch/spec/unit/elastic_graph/elasticsearch/client_spec.rb @@ -24,6 +24,8 @@ def define_stubs(stub, requested_stubs) stub.get("/_cluster/health") { |env| response_for(body, env) } in :get_node_os_stats stub.get("/_nodes/stats/os") { |env| response_for(body, env) } + in :get_node_roles + stub.get("/_nodes/stats/roles") { |env| response_for(body, env) } in :get_flat_cluster_settings stub.get("/_cluster/settings?flat_settings=true") { |env| response_for(body, env) } in :put_persistent_cluster_settings diff --git a/elasticgraph-indexer_autoscaler_lambda/elasticgraph-indexer_autoscaler_lambda.gemspec b/elasticgraph-indexer_autoscaler_lambda/elasticgraph-indexer_autoscaler_lambda.gemspec index 2d19185f..3d18c925 100644 --- a/elasticgraph-indexer_autoscaler_lambda/elasticgraph-indexer_autoscaler_lambda.gemspec +++ b/elasticgraph-indexer_autoscaler_lambda/elasticgraph-indexer_autoscaler_lambda.gemspec @@ -16,6 +16,7 @@ ElasticGraphGemspecHelper.define_elasticgraph_gem(gemspec_file: __FILE__, catego spec.add_dependency "aws-sdk-lambda", "~> 1.125" spec.add_dependency "aws-sdk-sqs", "~> 1.80" + spec.add_dependency "aws-sdk-cloudwatch", "~> 1.10" # aws-sdk-sqs requires an XML library be available. On Ruby < 3 it'll use rexml from the standard library but on Ruby 3.0+ # we have to add an explicit dependency. It supports ox, oga, libxml, nokogiri or rexml, and of those, ox seems to be the diff --git a/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda.rb b/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda.rb index 50bc9a58..02bc3ff2 100644 --- a/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda.rb +++ b/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda.rb @@ -32,11 +32,13 @@ def self.from_parsed_yaml(parsed_yaml, &datastore_client_customization_block) def initialize( datastore_core:, sqs_client: nil, - lambda_client: nil + lambda_client: nil, + cloudwatch_client: nil ) @datastore_core = datastore_core @sqs_client = sqs_client @lambda_client = lambda_client + @cloudwatch_client = cloudwatch_client end def sqs_client @@ -53,13 +55,21 @@ def lambda_client end end + def cloudwatch_client + @cloudwatch_client ||= begin + require "aws-sdk-cloudwatch" + Aws::CloudWatch::Client.new + end + end + def concurrency_scaler @concurrency_scaler ||= begin require "elastic_graph/indexer_autoscaler_lambda/concurrency_scaler" ConcurrencyScaler.new( datastore_core: @datastore_core, sqs_client: sqs_client, - lambda_client: lambda_client + lambda_client: lambda_client, + cloudwatch_client: cloudwatch_client ) end end diff --git a/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda/concurrency_scaler.rb b/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda/concurrency_scaler.rb index ab90a8a5..bb7f31e9 100644 --- a/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda/concurrency_scaler.rb +++ b/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda/concurrency_scaler.rb @@ -12,16 +12,17 @@ module ElasticGraph class IndexerAutoscalerLambda # @private class ConcurrencyScaler - def initialize(datastore_core:, sqs_client:, lambda_client:) + def initialize(datastore_core:, sqs_client:, lambda_client:, cloudwatch_client:) @logger = datastore_core.logger @datastore_core = datastore_core @sqs_client = sqs_client @lambda_client = lambda_client + @cloudwatch_client = cloudwatch_client end MINIMUM_CONCURRENCY = 2 - def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, maximum_concurrency:, indexer_function_name:) + def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, maximum_concurrency:, minimum_free_storage:, indexer_function_name:) queue_attributes = get_queue_attributes(queue_urls) queue_arns = queue_attributes.fetch(:queue_arns) num_messages = queue_attributes.fetch(:total_messages) @@ -37,6 +38,8 @@ def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, maxi new_target_concurrency = if num_messages.positive? + free_storage = get_min_free_storage + cpu_utilization = get_max_cpu_utilization cpu_midpoint = (max_cpu_target + min_cpu_target) / 2.0 @@ -45,13 +48,17 @@ def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, maxi if current_concurrency.nil? details_logger.log_unset nil + elsif free_storage < minimum_free_storage + details_logger.log_pause(free_storage) + 0 elsif cpu_utilization < min_cpu_target increase_factor = (cpu_midpoint / cpu_utilization).clamp(0.0, 1.5) (current_concurrency * increase_factor).round.tap do |new_concurrency| details_logger.log_increase( cpu_utilization: cpu_utilization, + min_free_storage: free_storage, current_concurrency: current_concurrency, - new_concurrency: new_concurrency + new_concurrency: new_concurrency, ) end elsif cpu_utilization > max_cpu_target @@ -59,6 +66,7 @@ def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, maxi (current_concurrency - (current_concurrency * decrease_factor)).round.tap do |new_concurrency| details_logger.log_decrease( cpu_utilization: cpu_utilization, + min_free_storage: free_storage, current_concurrency: current_concurrency, new_concurrency: new_concurrency ) @@ -66,6 +74,7 @@ def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, maxi else details_logger.log_no_change( cpu_utilization: cpu_utilization, + min_free_storage: free_storage, current_concurrency: current_concurrency ) current_concurrency @@ -94,6 +103,43 @@ def get_max_cpu_utilization end.max.to_f end + def get_min_free_storage + metric_data_queries = get_data_node_ids_by_cluster_name.map(&:first).map do |cluster_name, node_id| + { + id: node_id, + metric_stat: { + metric: { + namespace: 'AWS/ES', + metric_name: 'FreeStorageSpace', + dimensions: [ + { name: 'DomainName', value: cluster_name }, + { name: 'NodeId', value: node_id } + ] + }, + period: 30, # seconds + stat: 'Minimum' + }, + return_data: true + } + end + + metric_response = @cloudwatch_client.get_metric_data({ + start_time: ::Time.now - 900, # past 15 minutes + end_time: ::Time.now, + metric_data_queries: metric_data_queries + }) + + metric_response.metric_data_results.map { |result| result.values.first }.min / (1024 * 1024) # result is in bytes + end + + def get_data_node_ids_by_cluster_name + @datastore_core.clients_by_name.flat_map do |name, client| + client.get_node_roles.map do |id, roles| + roles["roles"].include?("data") ? { name => id } : nil + end + end.compact + end + def get_queue_attributes(queue_urls) attributes_per_queue = queue_urls.map do |queue_url| @sqs_client.get_queue_attributes( diff --git a/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda/details_logger.rb b/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda/details_logger.rb index 94936583..3803ebf4 100644 --- a/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda/details_logger.rb +++ b/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda/details_logger.rb @@ -30,29 +30,39 @@ def initialize( } end - def log_increase(cpu_utilization:, current_concurrency:, new_concurrency:) + def log_increase(cpu_utilization:, min_free_storage:, current_concurrency:, new_concurrency:) log_result({ "action" => "increase", "cpu_utilization" => cpu_utilization, + "min_free_storage" => min_free_storage, "current_concurrency" => current_concurrency, "new_concurrency" => new_concurrency }) end - def log_decrease(cpu_utilization:, current_concurrency:, new_concurrency:) + def log_decrease(cpu_utilization:, min_free_storage:, current_concurrency:, new_concurrency:) log_result({ "action" => "decrease", "cpu_utilization" => cpu_utilization, + "min_free_storage" => min_free_storage, "current_concurrency" => current_concurrency, "new_concurrency" => new_concurrency }) end - def log_no_change(cpu_utilization:, current_concurrency:) + def log_no_change(cpu_utilization:, min_free_storage:, current_concurrency:) log_result({ "action" => "no_change", "cpu_utilization" => cpu_utilization, - "current_concurrency" => current_concurrency + "min_free_storage" => min_free_storage, + "current_concurrency" => current_concurrency, + }) + end + + def log_pause(min_free_storage) + log_result({ + "action" => "pause", + "min_free_storage" => min_free_storage }) end diff --git a/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda/lambda_function.rb b/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda/lambda_function.rb index f9d550f3..7a277347 100644 --- a/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda/lambda_function.rb +++ b/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda/lambda_function.rb @@ -26,6 +26,7 @@ def handle_request(event:, context:) min_cpu_target: event.fetch("min_cpu_target"), max_cpu_target: event.fetch("max_cpu_target"), maximum_concurrency: event.fetch("maximum_concurrency"), + minimum_free_storage: event.fetch("minimum_free_storage"), indexer_function_name: event.fetch("indexer_function_name") ) end diff --git a/elasticgraph-indexer_autoscaler_lambda/sig/elastic_graph/idexer_autoscaler_lambda/concurrency_scaler.rbs b/elasticgraph-indexer_autoscaler_lambda/sig/elastic_graph/idexer_autoscaler_lambda/concurrency_scaler.rbs index 29a25142..9a2e9eff 100644 --- a/elasticgraph-indexer_autoscaler_lambda/sig/elastic_graph/idexer_autoscaler_lambda/concurrency_scaler.rbs +++ b/elasticgraph-indexer_autoscaler_lambda/sig/elastic_graph/idexer_autoscaler_lambda/concurrency_scaler.rbs @@ -4,7 +4,8 @@ module ElasticGraph def initialize: ( datastore_core: DatastoreCore, sqs_client: Aws::SQS::Client, - lambda_client: Aws::Lambda::Client + lambda_client: Aws::Lambda::Client, + cloudwatch_client: Aws::CloudWatch::Client ) -> void MINIMUM_CONCURRENCY: ::Integer @@ -14,6 +15,7 @@ module ElasticGraph min_cpu_target: ::Integer, max_cpu_target: ::Integer, maximum_concurrency: ::Integer, + minimum_free_storage: ::Integer, indexer_function_name: ::String ) -> void @@ -23,8 +25,11 @@ module ElasticGraph @datastore_core: DatastoreCore @sqs_client: Aws::SQS::Client @lambda_client: Aws::Lambda::Client + @cloudwatch_client: Aws::CloudWatch::Client def get_max_cpu_utilization: () -> ::Float + def get_min_free_storage: () -> ::Float + def get_data_node_ids_by_cluster_name: () -> ::Hash[::String, ::String] def get_queue_attributes: (::Array[::String]) -> { total_messages: ::Integer, queue_arns: ::Array[::String] } def get_concurrency: (::String) -> ::Integer? diff --git a/elasticgraph-indexer_autoscaler_lambda/sig/elastic_graph/idexer_autoscaler_lambda/details_logger.rbs b/elasticgraph-indexer_autoscaler_lambda/sig/elastic_graph/idexer_autoscaler_lambda/details_logger.rbs index b14e081a..d7b6078c 100644 --- a/elasticgraph-indexer_autoscaler_lambda/sig/elastic_graph/idexer_autoscaler_lambda/details_logger.rbs +++ b/elasticgraph-indexer_autoscaler_lambda/sig/elastic_graph/idexer_autoscaler_lambda/details_logger.rbs @@ -12,21 +12,26 @@ module ElasticGraph def log_increase: ( cpu_utilization: ::Float, + min_free_storage: ::Float, current_concurrency: ::Integer, new_concurrency: ::Integer ) -> void def log_decrease: ( cpu_utilization: ::Float, + min_free_storage: ::Float, current_concurrency: ::Integer, new_concurrency: ::Integer ) -> void def log_no_change: ( cpu_utilization: ::Float, + min_free_storage: ::Float, current_concurrency: ::Integer ) -> void + def log_pause: (::String) -> void + def log_reset: () -> void def log_unset: () -> void diff --git a/elasticgraph-indexer_autoscaler_lambda/sig/elastic_graph/indexer_autoscaler_lambda.rbs b/elasticgraph-indexer_autoscaler_lambda/sig/elastic_graph/indexer_autoscaler_lambda.rbs index 0a5260ae..c484a597 100644 --- a/elasticgraph-indexer_autoscaler_lambda/sig/elastic_graph/indexer_autoscaler_lambda.rbs +++ b/elasticgraph-indexer_autoscaler_lambda/sig/elastic_graph/indexer_autoscaler_lambda.rbs @@ -11,6 +11,7 @@ module ElasticGraph datastore_core: DatastoreCore, ?sqs_client: Aws::SQS::Client?, ?lambda_client: Aws::Lambda::Client?, + ?cloudwatch_client: Aws::CloudWatch::Client?, ) -> void @sqs_client: Aws::SQS::Client? @@ -19,6 +20,9 @@ module ElasticGraph @lambda_client: Aws::Lambda::Client? def lambda_client: () -> Aws::Lambda::Client + @cloudwatch_client: Aws::CloudWatch::Client? + def cloudwatch_client: () -> Aws::CloudWatch::Client + @concurrency_scaler: ConcurrencyScaler? def concurrency_scaler: () -> ConcurrencyScaler end diff --git a/elasticgraph-indexer_autoscaler_lambda/spec/support/builds_indexer_autoscaler.rb b/elasticgraph-indexer_autoscaler_lambda/spec/support/builds_indexer_autoscaler.rb index 11cf7e90..da3657aa 100644 --- a/elasticgraph-indexer_autoscaler_lambda/spec/support/builds_indexer_autoscaler.rb +++ b/elasticgraph-indexer_autoscaler_lambda/spec/support/builds_indexer_autoscaler.rb @@ -16,6 +16,7 @@ module BuildsIndexerAutoscalerLambda def build_indexer_autoscaler( sqs_client: nil, lambda_client: nil, + cloudwatch_client: nil, **datastore_core_options, &customize_datastore_config ) @@ -28,6 +29,7 @@ def build_indexer_autoscaler( IndexerAutoscalerLambda.new( sqs_client: sqs_client, lambda_client: lambda_client, + cloudwatch_client: cloudwatch_client, datastore_core: datastore_core ) end diff --git a/elasticgraph-indexer_autoscaler_lambda/spec/unit/elastic_graph/indexer_autoscaler_lambda/concurrency_scaler_spec.rb b/elasticgraph-indexer_autoscaler_lambda/spec/unit/elastic_graph/indexer_autoscaler_lambda/concurrency_scaler_spec.rb index 1e949061..249c7e8a 100644 --- a/elasticgraph-indexer_autoscaler_lambda/spec/unit/elastic_graph/indexer_autoscaler_lambda/concurrency_scaler_spec.rb +++ b/elasticgraph-indexer_autoscaler_lambda/spec/unit/elastic_graph/indexer_autoscaler_lambda/concurrency_scaler_spec.rb @@ -8,6 +8,7 @@ require "aws-sdk-lambda" require "aws-sdk-sqs" +require "aws-sdk-cloudwatch" require "elastic_graph/indexer_autoscaler_lambda/concurrency_scaler" require "support/builds_indexer_autoscaler" @@ -22,13 +23,16 @@ class IndexerAutoscalerLambda let(:max_cpu_target) { 80 } let(:cpu_midpoint) { 75 } let(:maximum_concurrency) { 1000 } + let(:minimum_free_storage) { 10000 } it "1.5x the concurrency when the CPU usage is significantly below the minimum target" do lambda_client = lambda_client_with_concurrency(200) + cloudwatch_client = cloudwatch_client_with_storage_metrics(minimum_free_storage + 1) concurrency_scaler = build_concurrency_scaler( datastore_client: datastore_client_with_cpu_usage(10.0), sqs_client: sqs_client_with_number_of_messages(1), - lambda_client: lambda_client + lambda_client: lambda_client, + cloudwatch_client: cloudwatch_client ) tune_indexer_concurrency(concurrency_scaler) @@ -39,10 +43,12 @@ class IndexerAutoscalerLambda it "increases concurrency by a factor CPU usage when CPU is slightly below the minimum target" do # CPU is at 50% and our target range is 70-80. 75 / 50 = 1.5, so increase it by 50%. lambda_client = lambda_client_with_concurrency(200) + cloudwatch_client = cloudwatch_client_with_storage_metrics(minimum_free_storage + 1) concurrency_scaler = build_concurrency_scaler( datastore_client: datastore_client_with_cpu_usage(50.0), sqs_client: sqs_client_with_number_of_messages(1), - lambda_client: lambda_client + lambda_client: lambda_client, + cloudwatch_client: cloudwatch_client ) tune_indexer_concurrency(concurrency_scaler) @@ -53,10 +59,12 @@ class IndexerAutoscalerLambda it "sets concurrency to the max when it cannot be increased anymore when CPU usage is under the limit" do current_concurrency = maximum_concurrency - 1 lambda_client = lambda_client_with_concurrency(current_concurrency) + cloudwatch_client = cloudwatch_client_with_storage_metrics(minimum_free_storage + 1) concurrency_scaler = build_concurrency_scaler( datastore_client: datastore_client_with_cpu_usage(10), sqs_client: sqs_client_with_number_of_messages(1), - lambda_client: lambda_client + lambda_client: lambda_client, + cloudwatch_client: cloudwatch_client ) tune_indexer_concurrency(concurrency_scaler) @@ -67,10 +75,12 @@ class IndexerAutoscalerLambda it "decreases concurrency by a factor of the CPU when the CPU usage is over the limit" do # CPU is at 90% and our target range is 70-80. 90 / 75 = 1.2, so decrease it by 20%. lambda_client = lambda_client_with_concurrency(500) + cloudwatch_client = cloudwatch_client_with_storage_metrics(minimum_free_storage + 1) concurrency_scaler = build_concurrency_scaler( datastore_client: datastore_client_with_cpu_usage(90.0), sqs_client: sqs_client_with_number_of_messages(1), - lambda_client: lambda_client + lambda_client: lambda_client, + cloudwatch_client: cloudwatch_client ) tune_indexer_concurrency(concurrency_scaler) @@ -81,10 +91,12 @@ class IndexerAutoscalerLambda it "leaves concurrency unchanged when it cannot be decreased anymore when CPU utilization is over the limit" do current_concurrency = 0 lambda_client = lambda_client_with_concurrency(current_concurrency) + cloudwatch_client = cloudwatch_client_with_storage_metrics(minimum_free_storage + 1) concurrency_scaler = build_concurrency_scaler( datastore_client: datastore_client_with_cpu_usage(100), sqs_client: sqs_client_with_number_of_messages(1), - lambda_client: lambda_client + lambda_client: lambda_client, + cloudwatch_client: cloudwatch_client ) tune_indexer_concurrency(concurrency_scaler) @@ -94,11 +106,13 @@ class IndexerAutoscalerLambda it "does not adjust concurrency when the CPU is within the target range" do lambda_client = lambda_client_with_concurrency(500) + cloudwatch_client = cloudwatch_client_with_storage_metrics(minimum_free_storage + 1) [min_cpu_target, cpu_midpoint, max_cpu_target].each do |cpu_usage| concurrency_scaler = build_concurrency_scaler( datastore_client: datastore_client_with_cpu_usage(cpu_usage), sqs_client: sqs_client_with_number_of_messages(1), - lambda_client: lambda_client + lambda_client: lambda_client, + cloudwatch_client: cloudwatch_client ) tune_indexer_concurrency(concurrency_scaler) @@ -113,10 +127,12 @@ class IndexerAutoscalerLambda expect(high_cpu_usage).to be > max_cpu_target lambda_client = lambda_client_with_concurrency(current_concurrency) + cloudwatch_client = cloudwatch_client_with_storage_metrics(minimum_free_storage + 1) concurrency_scaler = build_concurrency_scaler( datastore_client: datastore_client_with_cpu_usage(min_cpu_target, high_cpu_usage), sqs_client: sqs_client_with_number_of_messages(1), - lambda_client: lambda_client + lambda_client: lambda_client, + cloudwatch_client: cloudwatch_client ) tune_indexer_concurrency(concurrency_scaler) @@ -125,13 +141,30 @@ class IndexerAutoscalerLambda expect(updated_concurrency_requested_from(lambda_client)).to eq [460] # 500 - 8% since 81/75 = 1.08 end + it "resets the concurrency when free storage space drops below the minimum regardless of cpu" do + lambda_client = lambda_client_with_concurrency(500) + cloudwatch_client = cloudwatch_client_with_storage_metrics(minimum_free_storage + 1, minimum_free_storage - 1) + concurrency_scaler = build_concurrency_scaler( + datastore_client: datastore_client_with_cpu_usage(min_cpu_target - 1), + sqs_client: sqs_client_with_number_of_messages(1), + lambda_client: lambda_client, + cloudwatch_client: cloudwatch_client + ) + + tune_indexer_concurrency(concurrency_scaler) + + expect(updated_concurrency_requested_from(lambda_client)).to eq [2] # 2 is the minimum + end + it "sets concurrency to the min when there are no messages in the queue" do current_concurrency = 500 lambda_client = lambda_client_with_concurrency(current_concurrency) + cloudwatch_client = cloudwatch_client_with_storage_metrics(minimum_free_storage + 1) concurrency_scaler = build_concurrency_scaler( datastore_client: datastore_client_with_cpu_usage(min_cpu_target - 1), sqs_client: sqs_client_with_number_of_messages(0), - lambda_client: lambda_client + lambda_client: lambda_client, + cloudwatch_client: cloudwatch_client ) tune_indexer_concurrency(concurrency_scaler) @@ -141,12 +174,14 @@ class IndexerAutoscalerLambda it "leaves concurrency unset if it is currently unset" do lambda_client = lambda_client_without_concurrency + cloudwatch_client = cloudwatch_client_with_storage_metrics(minimum_free_storage + 1) # CPU is at 50% and our target range is 70-80. concurrency_scaler = build_concurrency_scaler( datastore_client: datastore_client_with_cpu_usage(50), sqs_client: sqs_client_with_number_of_messages(1), - lambda_client: lambda_client + lambda_client: lambda_client, + cloudwatch_client: cloudwatch_client ) tune_indexer_concurrency(concurrency_scaler) @@ -165,24 +200,34 @@ def updated_concurrency_requested_from(lambda_client) end def datastore_client_with_cpu_usage(percent, percent2 = percent) - stubbed_datastore_client(get_node_os_stats: { - "nodes" => { - "node1" => { - "os" => { - "cpu" => { - "percent" => percent + stubbed_datastore_client( + get_node_os_stats: { + "nodes" => { + "node1" => { + "os" => { + "cpu" => { + "percent" => percent + } + } + }, + "node2" => { + "os" => { + "cpu" => { + "percent" => percent2 + } } } + } + }, + get_node_roles: { + "node1" => { + "roles" => ["data"] }, "node2" => { - "os" => { - "cpu" => { - "percent" => percent2 - } - } + "roles" => ["data"] } } - }) + ) end def sqs_client_with_number_of_messages(num_messages) @@ -204,6 +249,26 @@ def lambda_client_with_concurrency(concurrency) end end + def cloudwatch_client_with_storage_metrics(free_storage, free_storage2 = free_storage) + ::Aws::CloudWatch::Client.new(stub_responses: true).tap do |cloudwatch_client| + cloudwatch_client.stub_responses(:get_metric_data, { + # return values are in bytes + metric_data_results: [ + { + id: "node1", + values: [(free_storage * 1024 * 1024).to_f], + timestamps: [::Time.parse("2024-10-30T12:00:00Z")] + }, + { + id: "node2", + values: [(free_storage2 * 1024 * 1024).to_f], + timestamps: [::Time.parse("2024-10-30T12:00:00Z")] + } + ] + }) + end + end + # If the lambda is using unreserved concurrency, reserved_concurrent_executions on the Lambda client will be nil. def lambda_client_without_concurrency ::Aws::Lambda::Client.new(stub_responses: true).tap do |lambda_client| @@ -213,11 +278,12 @@ def lambda_client_without_concurrency end end - def build_concurrency_scaler(datastore_client:, sqs_client:, lambda_client:) + def build_concurrency_scaler(datastore_client:, sqs_client:, lambda_client:, cloudwatch_client:) build_indexer_autoscaler( clients_by_name: {"main" => datastore_client}, sqs_client: sqs_client, - lambda_client: lambda_client + lambda_client: lambda_client, + cloudwatch_client: cloudwatch_client ).concurrency_scaler end @@ -227,6 +293,7 @@ def tune_indexer_concurrency(concurrency_scaler) min_cpu_target: min_cpu_target, max_cpu_target: max_cpu_target, maximum_concurrency: maximum_concurrency, + minimum_free_storage: minimum_free_storage, indexer_function_name: indexer_function_name ) end diff --git a/elasticgraph-indexer_autoscaler_lambda/spec/unit/elastic_graph/indexer_autoscaler_lambda/lambda_function_spec.rb b/elasticgraph-indexer_autoscaler_lambda/spec/unit/elastic_graph/indexer_autoscaler_lambda/lambda_function_spec.rb index 971e911c..09fdd64f 100644 --- a/elasticgraph-indexer_autoscaler_lambda/spec/unit/elastic_graph/indexer_autoscaler_lambda/lambda_function_spec.rb +++ b/elasticgraph-indexer_autoscaler_lambda/spec/unit/elastic_graph/indexer_autoscaler_lambda/lambda_function_spec.rb @@ -37,6 +37,7 @@ "min_cpu_target" => 70, "max_cpu_target" => 80, "maximum_concurrency" => 1000, + "minimum_free_storage" => 100, "indexer_function_name" => "some-eg-app-indexer" } lambda_function.handle_request(event: event, context: {}) diff --git a/elasticgraph-opensearch/lib/elastic_graph/opensearch/client.rb b/elasticgraph-opensearch/lib/elastic_graph/opensearch/client.rb index b86fc1bb..452eaba7 100644 --- a/elasticgraph-opensearch/lib/elastic_graph/opensearch/client.rb +++ b/elasticgraph-opensearch/lib/elastic_graph/opensearch/client.rb @@ -75,6 +75,10 @@ def get_node_os_stats transform_errors { |c| c.nodes.stats(metric: "os") } end + def get_node_roles + transform_errors { |c| c.nodes.stats(metric: "roles") } + end + def get_flat_cluster_settings transform_errors { |c| c.cluster.get_settings(flat_settings: true) } end diff --git a/elasticgraph-opensearch/spec/unit/elastic_graph/opensearch/client_spec.rb b/elasticgraph-opensearch/spec/unit/elastic_graph/opensearch/client_spec.rb index f0e130c5..d85e8f3d 100644 --- a/elasticgraph-opensearch/spec/unit/elastic_graph/opensearch/client_spec.rb +++ b/elasticgraph-opensearch/spec/unit/elastic_graph/opensearch/client_spec.rb @@ -47,6 +47,8 @@ def define_stubs(stub, requested_stubs) stub.get("/_cluster/health") { |env| response_for(body, env) } in :get_node_os_stats stub.get("/_nodes/stats/os") { |env| response_for(body, env) } + in :get_node_roles + stub.get("/_nodes/stats/roles") { |env| response_for(body, env) } in :get_flat_cluster_settings stub.get("/_cluster/settings?flat_settings=true") { |env| response_for(body, env) } in :put_persistent_cluster_settings diff --git a/spec_support/lib/elastic_graph/spec_support/datastore_client_shared_examples.rb b/spec_support/lib/elastic_graph/spec_support/datastore_client_shared_examples.rb index e98508b1..2c4c22d7 100644 --- a/spec_support/lib/elastic_graph/spec_support/datastore_client_shared_examples.rb +++ b/spec_support/lib/elastic_graph/spec_support/datastore_client_shared_examples.rb @@ -41,6 +41,12 @@ module ElasticGraph expect(client.get_node_os_stats).to eq "Node stats" end + it "supports `get_node_roles`" do + client = build_client({get_node_roles: "Node roles"}) + + expect(client.get_node_roles).to eq "Node roles" + end + it "supports `get_flat_cluster_settings`" do client = build_client({get_flat_cluster_settings: "Flat cluster settings!"})