From 9f6515588547580c505ea8c8425640a2d0c3be6b Mon Sep 17 00:00:00 2001 From: prha Date: Thu, 2 Jan 2025 15:26:16 -0800 Subject: [PATCH] add concurrency config schema --- .../dagster/_core/instance/__init__.py | 10 +- .../dagster/dagster/_core/instance/config.py | 191 +++++++++++++++--- .../run_coordinator/queued_run_coordinator.py | 18 ++ ..._run_coordinator_concurrency_mismatch.yaml | 8 + .../error_run_queue_concurrency_mismatch.yaml | 5 + .../merged_run_coordinator_concurrency.yaml | 15 ++ .../merged_run_queue_concurrency.yaml | 12 ++ .../instance_tests/test_instance_config.py | 43 +++- 8 files changed, 277 insertions(+), 25 deletions(-) create mode 100644 python_modules/dagster/dagster_tests/core_tests/instance_tests/test_config/error_run_coordinator_concurrency_mismatch.yaml create mode 100644 python_modules/dagster/dagster_tests/core_tests/instance_tests/test_config/error_run_queue_concurrency_mismatch.yaml create mode 100644 python_modules/dagster/dagster_tests/core_tests/instance_tests/test_config/merged_run_coordinator_concurrency.yaml create mode 100644 python_modules/dagster/dagster_tests/core_tests/instance_tests/test_config/merged_run_queue_concurrency.yaml diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py index 1d288f347f0b6..d94ed41e9c5da 100644 --- a/python_modules/dagster/dagster/_core/instance/__init__.py +++ b/python_modules/dagster/dagster/_core/instance/__init__.py @@ -789,7 +789,10 @@ def get_run_queue_config(self) -> Optional["RunQueueConfig"]: if not isinstance(self.run_coordinator, QueuedRunCoordinator): return None - return self.run_coordinator.get_run_queue_config() + run_coordinator_run_queue_config = self.run_coordinator.get_run_queue_config() + return run_coordinator_run_queue_config.with_concurrency_settings( + self.get_settings("concurrency") + ) @property def run_launcher(self) -> "RunLauncher": @@ -966,6 +969,11 @@ def auto_materialize_use_sensors(self) -> int: @property def global_op_concurrency_default_limit(self) -> Optional[int]: + default_limit = self.get_settings("concurrency").get("pools", {}).get("default_limit") + if default_limit is not None: + return default_limit + + # fallback to the old settings return self.get_settings("concurrency").get("default_op_concurrency_limit") # python logs diff --git a/python_modules/dagster/dagster/_core/instance/config.py b/python_modules/dagster/dagster/_core/instance/config.py index 8ebf060c3135a..61a5ce630f3a3 100644 --- a/python_modules/dagster/dagster/_core/instance/config.py +++ b/python_modules/dagster/dagster/_core/instance/config.py @@ -1,19 +1,22 @@ import logging import os -from collections.abc import Mapping +from collections.abc import Mapping, Sequence from typing import TYPE_CHECKING, Any, Optional, cast from dagster import ( Array, Bool, + String, _check as check, ) from dagster._config import ( Field, IntSource, + Noneable, Permissive, ScalarUnion, Selector, + Shape, StringSource, validate_config, ) @@ -55,7 +58,7 @@ def dagster_instance_config( f" desired behavior, create an empty {config_filename} file in {base_dir}." ) - dagster_config_dict = merge_dicts(load_yaml_from_globs(config_yaml_path) or {}, overrides) + dagster_config_dict = merge_dicts(load_yaml_from_globs(config_yaml_path) or {}, overrides or {}) if "instance_class" in dagster_config_dict: custom_instance_class_data = dagster_config_dict["instance_class"] @@ -120,18 +123,7 @@ def dagster_instance_config( # validate default op concurrency limits if "concurrency" in dagster_config_dict: - default_concurrency_limit = dagster_config_dict["concurrency"].get( - "default_op_concurrency_limit" - ) - if default_concurrency_limit is not None: - max_limit = get_max_concurrency_limit_value() - if default_concurrency_limit < 0 or default_concurrency_limit > max_limit: - raise DagsterInvalidConfigError( - f"Found value `{default_concurrency_limit}` for `default_op_concurrency_limit`, " - f"Expected value between 0-{max_limit}.", - [], - None, - ) + validate_concurrency_config(dagster_config_dict) dagster_config = validate_config(schema, dagster_config_dict) if not dagster_config.success: @@ -157,6 +149,90 @@ def run_queue_config_schema() -> Field: ) +def validate_concurrency_config(dagster_config_dict: Mapping[str, Any]): + concurrency_config = dagster_config_dict["concurrency"] + if "pools" in concurrency_config: + if concurrency_config.get("default_op_concurrency_limit") is not None: + raise DagsterInvalidConfigError( + "Found config for `default_op_concurrency_limit` which is incompatible with `pools` config. Use `pools > default_limit` instead.", + [], + None, + ) + + default_concurrency_limit = check.opt_inst( + pluck_config_value(concurrency_config, ["pools", "default_limit"]), int + ) + if default_concurrency_limit is not None: + max_limit = get_max_concurrency_limit_value() + if default_concurrency_limit < 0 or default_concurrency_limit > max_limit: + raise DagsterInvalidConfigError( + f"Found value `{default_concurrency_limit}` for `pools > default_limit`, " + f"Expected value between 0-{max_limit}.", + [], + None, + ) + elif "default_op_concurrency_limit" in concurrency_config: + default_concurrency_limit = check.opt_inst( + pluck_config_value(concurrency_config, ["default_op_concurrency_limit"]), int + ) + if default_concurrency_limit is not None: + max_limit = get_max_concurrency_limit_value() + if default_concurrency_limit < 0 or default_concurrency_limit > max_limit: + raise DagsterInvalidConfigError( + f"Found value `{default_concurrency_limit}` for `default_op_concurrency_limit`, " + f"Expected value between 0-{max_limit}.", + [], + None, + ) + + using_concurrency_config = "runs" in concurrency_config or "pools" in concurrency_config + if using_concurrency_config: + conflicting_run_queue_fields = [ + ["max_concurrent_runs"], + ["tag_concurrency_limits"], + ["block_op_concurrency_limited_runs", "op_concurrency_slot_buffer"], + ] + if "run_queue" in dagster_config_dict: + for field in conflicting_run_queue_fields: + if pluck_config_value(dagster_config_dict, ["run_queue", *field]) is not None: + raise DagsterInvalidConfigError( + f"Found config value for `{field}` in `run_queue` which is incompatible with the `concurrency > runs` config", + [], + None, + ) + + if "run_coordinator" in dagster_config_dict: + if ( + pluck_config_value(dagster_config_dict, ["run_coordinator", "class"]) + == "QueuedRunCoordinator" + ): + for field in conflicting_run_queue_fields: + if ( + pluck_config_value( + dagster_config_dict, ["run_coordinator", "config", *field] + ) + is not None + ): + raise DagsterInvalidConfigError( + f"Found config value for `{field}` in `run_coordinator` which is incompatible with the `concurrency > runs` config", + [], + None, + ) + + +def pluck_config_value(config: Mapping[str, Any], path: Sequence[str]): + value = config + for part in path: + if not isinstance(value, dict): + return None + + value = value.get(part) + if value is None: + return value + + return value + + def storage_config_schema() -> Field: return Field( Selector( @@ -345,6 +421,83 @@ def secrets_loader_config_schema() -> Field: ) +def get_concurrency_config() -> Field: + return Field( + { + "pools": Field( + { + "default_limit": Field( + int, + is_required=False, + description="The default maximum number of concurrent operations for an unconfigured pool", + ), + "granularity": Field( + str, + is_required=False, + description="The granularity of the concurrency enforcement of the pool. One of `run` or `op`.", + ), + "op_run_buffer": Field( + int, + is_required=False, + description=( + "When the pool scope is set to `op`, this determines the number of runs " + "that can be launched with all of its steps blocked waiting for pool slots " + "to be freed." + ), + ), + } + ), + "runs": Field( + { + "max_concurrent_runs": Field( + int, + is_required=False, + description=( + "The maximum number of runs that are allowed to be in progress at once." + " Defaults to 10. Set to -1 to disable the limit. Set to 0 to stop any runs" + " from launching. Any other negative values are disallowed." + ), + ), + "tag_concurrency_limits": Field( + config=Noneable( + Array( + Shape( + { + "key": String, + "value": Field( + ScalarUnion( + scalar_type=String, + non_scalar_schema=Shape( + {"applyLimitPerUniqueValue": Bool} + ), + ), + is_required=False, + ), + "limit": Field(int), + } + ) + ) + ), + is_required=False, + description=( + "A set of limits that are applied to runs with particular tags. If a value is" + " set, the limit is applied to only that key-value pair. If no value is set," + " the limit is applied across all values of that key. If the value is set to a" + " dict with `applyLimitPerUniqueValue: true`, the limit will apply to the" + " number of unique values for that key." + ), + ), + } + ), + "default_op_concurrency_limit": Field( + int, + is_required=False, + description="[Deprecated] The default maximum number of concurrent operations for an unconfigured concurrency key", + ), + } + ) + + def dagster_instance_config_schema() -> Mapping[str, Field]: return { "local_artifact_storage": config_field_for_configurable_class(), @@ -432,13 +585,5 @@ def dagster_instance_config_schema() -> Mapping[str, Field]: ), } ), - "concurrency": Field( - { - "default_op_concurrency_limit": Field( - int, - is_required=False, - description="The default maximum number of concurrent operations for an unconfigured concurrency key", - ), - } - ), + "concurrency": get_concurrency_config(), } diff --git a/python_modules/dagster/dagster/_core/run_coordinator/queued_run_coordinator.py b/python_modules/dagster/dagster/_core/run_coordinator/queued_run_coordinator.py index bfbd06278e74c..90ba66764e696 100644 --- a/python_modules/dagster/dagster/_core/run_coordinator/queued_run_coordinator.py +++ b/python_modules/dagster/dagster/_core/run_coordinator/queued_run_coordinator.py @@ -54,6 +54,24 @@ def __new__( check.int_param(op_concurrency_slot_buffer, "op_concurrency_slot_buffer"), ) + def with_concurrency_settings( + self, concurrency_settings: Mapping[str, Any] + ) -> "RunQueueConfig": + run_settings = concurrency_settings.get("runs", {}) + pool_settings = concurrency_settings.get("pools", {}) + return RunQueueConfig( + max_concurrent_runs=run_settings.get("max_concurrent_runs", self.max_concurrent_runs), + tag_concurrency_limits=run_settings.get( + "tag_concurrency_limits", self.tag_concurrency_limits + ), + max_user_code_failure_retries=self.max_user_code_failure_retries, + user_code_failure_retry_delay=self.user_code_failure_retry_delay, + should_block_op_concurrency_limited_runs=self.should_block_op_concurrency_limited_runs, + op_concurrency_slot_buffer=pool_settings.get( + "op_run_buffer", self.op_concurrency_slot_buffer + ), + ) + class QueuedRunCoordinator(RunCoordinator[T_DagsterInstance], ConfigurableClass): """Enqueues runs via the run storage, to be deqeueued by the Dagster Daemon process. Requires diff --git a/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_config/error_run_coordinator_concurrency_mismatch.yaml b/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_config/error_run_coordinator_concurrency_mismatch.yaml new file mode 100644 index 0000000000000..923fc6bda2106 --- /dev/null +++ b/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_config/error_run_coordinator_concurrency_mismatch.yaml @@ -0,0 +1,8 @@ +concurrency: + runs: + max_concurrent_runs: 5 +run_coordinator: + module: dagster.core.run_coordinator + class: QueuedRunCoordinator + config: + max_concurrent_runs: 6 diff --git a/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_config/error_run_queue_concurrency_mismatch.yaml b/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_config/error_run_queue_concurrency_mismatch.yaml new file mode 100644 index 0000000000000..8dd4de4556011 --- /dev/null +++ b/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_config/error_run_queue_concurrency_mismatch.yaml @@ -0,0 +1,5 @@ +concurrency: + runs: + max_concurrent_runs: 5 +run_queue: + max_concurrent_runs: 6 diff --git a/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_config/merged_run_coordinator_concurrency.yaml b/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_config/merged_run_coordinator_concurrency.yaml new file mode 100644 index 0000000000000..df80ed4be8fba --- /dev/null +++ b/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_config/merged_run_coordinator_concurrency.yaml @@ -0,0 +1,15 @@ +concurrency: + pools: + granularity: op + op_run_buffer: 1 + runs: + max_concurrent_runs: 5 + tag_concurrency_limits: + - key: "dagster/solid_selection" + limit: 2 +run_coordinator: + module: dagster.core.run_coordinator + class: QueuedRunCoordinator + config: + max_user_code_failure_retries: 3 + user_code_failure_retry_delay: 10 diff --git a/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_config/merged_run_queue_concurrency.yaml b/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_config/merged_run_queue_concurrency.yaml new file mode 100644 index 0000000000000..72f76cfcca2a9 --- /dev/null +++ b/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_config/merged_run_queue_concurrency.yaml @@ -0,0 +1,12 @@ +concurrency: + pools: + granularity: op + op_run_buffer: 1 + runs: + max_concurrent_runs: 5 + tag_concurrency_limits: + - key: "dagster/solid_selection" + limit: 2 +run_queue: + max_user_code_failure_retries: 3 + user_code_failure_retry_delay: 10 diff --git a/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_instance_config.py b/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_instance_config.py index 6865cc833533c..29d506902c986 100644 --- a/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_instance_config.py +++ b/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_instance_config.py @@ -1,7 +1,8 @@ import pytest from dagster import file_relative_path +from dagster._core.errors import DagsterInvalidConfigError from dagster._core.instance.config import dagster_instance_config -from dagster._core.test_utils import environ +from dagster._core.test_utils import environ, instance_for_test @pytest.mark.parametrize("config_filename", ("dagster.yaml", "something.yaml")) @@ -10,3 +11,43 @@ def test_instance_yaml_config_not_set(config_filename, caplog): with environ({"DAGSTER_HOME": base_dir}): dagster_instance_config(base_dir, config_filename) assert "No dagster instance configuration file" in caplog.text + + +@pytest.mark.parametrize( + "config_filename", + ( + "merged_run_coordinator_concurrency.yaml", + "merged_run_queue_concurrency.yaml", + ), +) +def test_concurrency_config(config_filename, caplog): + base_dir = file_relative_path(__file__, "./test_config") + with environ({"DAGSTER_HOME": base_dir}): + instance_config, _ = dagster_instance_config(base_dir, config_filename) + with instance_for_test(overrides=instance_config) as instance: + run_queue_config = instance.get_run_queue_config() + assert run_queue_config + assert run_queue_config.max_concurrent_runs == 5 + assert run_queue_config.tag_concurrency_limits == [ + { + "key": "dagster/solid_selection", + "limit": 2, + } + ] + assert run_queue_config.max_user_code_failure_retries == 3 + assert run_queue_config.user_code_failure_retry_delay == 10 + assert run_queue_config.op_concurrency_slot_buffer == 1 + + +@pytest.mark.parametrize( + "config_filename", + ( + "error_run_coordinator_concurrency_mismatch.yaml", + "error_run_queue_concurrency_mismatch.yaml", + ), +) +def test_concurrency_config_mismatch(config_filename, caplog): + base_dir = file_relative_path(__file__, "./test_config") + with environ({"DAGSTER_HOME": base_dir}): + with pytest.raises(DagsterInvalidConfigError, match="the `concurrency > "): + dagster_instance_config(base_dir, config_filename)