Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add new concurrency config to consolidate concurrency settings #26931

Merged
merged 1 commit into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion python_modules/dagster/dagster/_core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,10 @@ def get_run_queue_config(self) -> Optional["RunQueueConfig"]:
if not isinstance(self.run_coordinator, QueuedRunCoordinator):
return None

return self.run_coordinator.get_run_queue_config()
run_coordinator_run_queue_config = self.run_coordinator.get_run_queue_config()
return run_coordinator_run_queue_config.with_concurrency_settings(
self.get_settings("concurrency")
)

@property
def run_launcher(self) -> "RunLauncher":
Expand Down Expand Up @@ -966,6 +969,11 @@ def auto_materialize_use_sensors(self) -> int:

@property
def global_op_concurrency_default_limit(self) -> Optional[int]:
    """Return the default op concurrency limit from the `concurrency` settings.

    The `pools > default_limit` value wins when it is set; otherwise the
    older top-level `default_op_concurrency_limit` setting is returned
    (which may itself be absent, giving `None`).
    """
    pool_settings = self.get_settings("concurrency").get("pools", {})
    pool_default_limit = pool_settings.get("default_limit")
    if pool_default_limit is None:
        # No pool-level default configured: use the legacy setting instead.
        return self.get_settings("concurrency").get("default_op_concurrency_limit")

    return pool_default_limit

# python logs
Expand Down
191 changes: 168 additions & 23 deletions python_modules/dagster/dagster/_core/instance/config.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
import logging
import os
from collections.abc import Mapping
from collections.abc import Mapping, Sequence
from typing import TYPE_CHECKING, Any, Optional, cast

from dagster import (
Array,
Bool,
String,
_check as check,
)
from dagster._config import (
Field,
IntSource,
Noneable,
Permissive,
ScalarUnion,
Selector,
Shape,
StringSource,
validate_config,
)
Expand Down Expand Up @@ -55,7 +58,7 @@ def dagster_instance_config(
f" desired behavior, create an empty {config_filename} file in {base_dir}."
)

dagster_config_dict = merge_dicts(load_yaml_from_globs(config_yaml_path) or {}, overrides)
dagster_config_dict = merge_dicts(load_yaml_from_globs(config_yaml_path) or {}, overrides or {})

if "instance_class" in dagster_config_dict:
custom_instance_class_data = dagster_config_dict["instance_class"]
Expand Down Expand Up @@ -120,18 +123,7 @@ def dagster_instance_config(

# validate default op concurrency limits
if "concurrency" in dagster_config_dict:
default_concurrency_limit = dagster_config_dict["concurrency"].get(
"default_op_concurrency_limit"
)
if default_concurrency_limit is not None:
max_limit = get_max_concurrency_limit_value()
if default_concurrency_limit < 0 or default_concurrency_limit > max_limit:
raise DagsterInvalidConfigError(
f"Found value `{default_concurrency_limit}` for `default_op_concurrency_limit`, "
f"Expected value between 0-{max_limit}.",
[],
None,
)
validate_concurrency_config(dagster_config_dict)

dagster_config = validate_config(schema, dagster_config_dict)
if not dagster_config.success:
Expand All @@ -157,6 +149,90 @@ def run_queue_config_schema() -> Field:
)


def _validate_default_concurrency_limit(
    concurrency_config: Mapping[str, Any], path: Sequence[str], label: str
) -> None:
    """Raise if the limit stored at `path` is out of range; `label` names it in the error."""
    limit = check.opt_inst(pluck_config_value(concurrency_config, path), int)
    if limit is None:
        return

    max_limit = get_max_concurrency_limit_value()
    if limit < 0 or limit > max_limit:
        raise DagsterInvalidConfigError(
            f"Found value `{limit}` for `{label}`, Expected value between 0-{max_limit}.",
            [],
            None,
        )


def validate_concurrency_config(dagster_config_dict: Mapping[str, Any]):
    """Validate the `concurrency` section of an instance config dict.

    Checks performed, in order:

    * `concurrency` itself must be a mapping.
    * `pools` may not be combined with a non-null `default_op_concurrency_limit`.
    * The default limit (`pools > default_limit`, or `default_op_concurrency_limit`
      when `pools` is absent) must lie between 0 and the value returned by
      `get_max_concurrency_limit_value()`.
    * When `runs` or `pools` is configured, the overlapping settings must not
      also be set under `run_queue`, nor under the `config` of a
      `run_coordinator` whose `class` is `QueuedRunCoordinator`.

    Args:
        dagster_config_dict: The full instance config. Must contain a
            `concurrency` key.

    Raises:
        DagsterInvalidConfigError: If any of the checks above fails.
    """
    concurrency_config = dagster_config_dict["concurrency"]
    if not isinstance(concurrency_config, Mapping):
        # e.g. an empty `concurrency:` key in YAML loads as None; report it as a
        # config error instead of failing on the membership tests below.
        raise DagsterInvalidConfigError(
            f"Found value `{concurrency_config}` for `concurrency`, expected a mapping of"
            " concurrency settings.",
            [],
            None,
        )

    if "pools" in concurrency_config:
        if concurrency_config.get("default_op_concurrency_limit") is not None:
            raise DagsterInvalidConfigError(
                "Found config for `default_op_concurrency_limit` which is incompatible with `pools` config. Use `pools > default_limit` instead.",
                [],
                None,
            )
        _validate_default_concurrency_limit(
            concurrency_config, ["pools", "default_limit"], "pools > default_limit"
        )
    elif "default_op_concurrency_limit" in concurrency_config:
        _validate_default_concurrency_limit(
            concurrency_config,
            ["default_op_concurrency_limit"],
            "default_op_concurrency_limit",
        )

    if "runs" not in concurrency_config and "pools" not in concurrency_config:
        return

    # Each entry is a key path, relative to the run queue settings, that the
    # `concurrency` section supersedes.
    conflicting_run_queue_fields = [
        ["max_concurrent_runs"],
        ["tag_concurrency_limits"],
        ["block_op_concurrency_limited_runs", "op_concurrency_slot_buffer"],
    ]

    # (name used in the error message, key path to the run queue settings)
    sections_to_check = []
    if "run_queue" in dagster_config_dict:
        sections_to_check.append(("run_queue", ["run_queue"]))
    if (
        "run_coordinator" in dagster_config_dict
        and pluck_config_value(dagster_config_dict, ["run_coordinator", "class"])
        == "QueuedRunCoordinator"
    ):
        sections_to_check.append(("run_coordinator", ["run_coordinator", "config"]))

    for section_name, section_path in sections_to_check:
        for field in conflicting_run_queue_fields:
            if pluck_config_value(dagster_config_dict, [*section_path, *field]) is None:
                continue

            # Render the key path the same way as `pools > default_limit`
            # rather than as a Python list repr.
            field_label = " > ".join(field)
            raise DagsterInvalidConfigError(
                f"Found config value for `{field_label}` in `{section_name}` which is incompatible with the `concurrency > runs` config",
                [],
                None,
            )


def pluck_config_value(config: Mapping[str, Any], path: Sequence[str]) -> Any:
    """Look up a nested value in `config` by following the keys in `path`.

    Args:
        config: The mapping to search.
        path: Keys to follow, outermost first. An empty path returns `config`.

    Returns:
        The value found at `path`, or `None` if any key along the way is
        missing, maps to `None`, or an intermediate value is not a mapping.
    """
    value: Any = config
    for part in path:
        # Accept any Mapping, not just dict, to match the declared parameter
        # type; otherwise non-dict mappings would silently look empty.
        if not isinstance(value, Mapping):
            return None

        value = value.get(part)
        if value is None:
            return None

    return value


def storage_config_schema() -> Field:
return Field(
Selector(
Expand Down Expand Up @@ -345,6 +421,83 @@ def secrets_loader_config_schema() -> Field:
)


def get_concurrency_config() -> Field:
    """Build the schema field for the instance-level `concurrency` section.

    The section has three keys: `pools` (pool limit defaults and enforcement
    settings), `runs` (run-level limits), and the deprecated top-level
    `default_op_concurrency_limit`.
    """
    pools_schema = {
        "default_limit": Field(
            int,
            is_required=False,
            description="The default maximum number of concurrent operations for an unconfigured pool",
        ),
        "granularity": Field(
            str,
            is_required=False,
            description="The granularity of the concurrency enforcement of the pool. One of `run` or `op`.",
        ),
        "op_run_buffer": Field(
            int,
            is_required=False,
            description=(
                "When the pool scope is set to `op`, this determines the number of runs "
                "that can be launched with all of its steps blocked waiting for pool slots "
                "to be freed."
            ),
        ),
    }

    # One entry of `tag_concurrency_limits`: a tag key, an optional value
    # (a plain string or an `applyLimitPerUniqueValue` shape), and the limit.
    tag_limit_value = ScalarUnion(
        scalar_type=String,
        non_scalar_schema=Shape({"applyLimitPerUniqueValue": Bool}),
    )
    tag_limit_entry = Shape(
        {
            "key": String,
            "value": Field(tag_limit_value, is_required=False),
            "limit": Field(int),
        }
    )

    runs_schema = {
        "max_concurrent_runs": Field(
            int,
            is_required=False,
            description=(
                "The maximum number of runs that are allowed to be in progress at once."
                " Defaults to 10. Set to -1 to disable the limit. Set to 0 to stop any runs"
                " from launching. Any other negative values are disallowed."
            ),
        ),
        "tag_concurrency_limits": Field(
            config=Noneable(Array(tag_limit_entry)),
            is_required=False,
            description=(
                "A set of limits that are applied to runs with particular tags. If a value is"
                " set, the limit is applied to only that key-value pair. If no value is set,"
                " the limit is applied across all values of that key. If the value is set to a"
                " dict with `applyLimitPerUniqueValue: true`, the limit will apply to the"
                " number of unique values for that key."
            ),
        ),
    }

    legacy_default_limit = Field(
        int,
        is_required=False,
        description="[Deprecated] The default maximum number of concurrent operations for an unconfigured concurrency key",
    )

    return Field(
        {
            "pools": Field(pools_schema),
            "runs": Field(runs_schema),
            "default_op_concurrency_limit": legacy_default_limit,
        }
    )


def dagster_instance_config_schema() -> Mapping[str, Field]:
return {
"local_artifact_storage": config_field_for_configurable_class(),
Expand Down Expand Up @@ -432,13 +585,5 @@ def dagster_instance_config_schema() -> Mapping[str, Field]:
),
}
),
"concurrency": Field(
{
"default_op_concurrency_limit": Field(
int,
is_required=False,
description="The default maximum number of concurrent operations for an unconfigured concurrency key",
),
}
),
"concurrency": get_concurrency_config(),
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,24 @@ def __new__(
check.int_param(op_concurrency_slot_buffer, "op_concurrency_slot_buffer"),
)

def with_concurrency_settings(
    self, concurrency_settings: Mapping[str, Any]
) -> "RunQueueConfig":
    """Return a copy of this config with instance `concurrency` settings applied.

    Values under `runs` (`max_concurrent_runs`, `tag_concurrency_limits`) and
    `pools` (`op_run_buffer`) replace the corresponding values of this config
    when present; every other field is carried over unchanged.

    Args:
        concurrency_settings: The instance's `concurrency` settings mapping.
    """
    runs = concurrency_settings.get("runs", {})
    pools = concurrency_settings.get("pools", {})

    # Resolve each overridable field, falling back to the current value.
    max_runs = runs.get("max_concurrent_runs", self.max_concurrent_runs)
    tag_limits = runs.get("tag_concurrency_limits", self.tag_concurrency_limits)
    slot_buffer = pools.get("op_run_buffer", self.op_concurrency_slot_buffer)

    return RunQueueConfig(
        max_concurrent_runs=max_runs,
        tag_concurrency_limits=tag_limits,
        max_user_code_failure_retries=self.max_user_code_failure_retries,
        user_code_failure_retry_delay=self.user_code_failure_retry_delay,
        should_block_op_concurrency_limited_runs=self.should_block_op_concurrency_limited_runs,
        op_concurrency_slot_buffer=slot_buffer,
    )


class QueuedRunCoordinator(RunCoordinator[T_DagsterInstance], ConfigurableClass):
"""Enqueues runs via the run storage, to be dequeued by the Dagster Daemon process. Requires
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
concurrency:
runs:
max_concurrent_runs: 5
run_coordinator:
module: dagster.core.run_coordinator
class: QueuedRunCoordinator
config:
max_concurrent_runs: 6
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
concurrency:
runs:
max_concurrent_runs: 5
run_queue:
max_concurrent_runs: 6
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
concurrency:
pools:
granularity: op
op_run_buffer: 1
runs:
max_concurrent_runs: 5
tag_concurrency_limits:
- key: "dagster/solid_selection"
limit: 2
run_coordinator:
module: dagster.core.run_coordinator
class: QueuedRunCoordinator
config:
max_user_code_failure_retries: 3
user_code_failure_retry_delay: 10
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
concurrency:
pools:
granularity: op
op_run_buffer: 1
runs:
max_concurrent_runs: 5
tag_concurrency_limits:
- key: "dagster/solid_selection"
limit: 2
run_queue:
max_user_code_failure_retries: 3
user_code_failure_retry_delay: 10
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import pytest
from dagster import file_relative_path
from dagster._core.errors import DagsterInvalidConfigError
from dagster._core.instance.config import dagster_instance_config
from dagster._core.test_utils import environ
from dagster._core.test_utils import environ, instance_for_test


@pytest.mark.parametrize("config_filename", ("dagster.yaml", "something.yaml"))
Expand All @@ -10,3 +11,43 @@ def test_instance_yaml_config_not_set(config_filename, caplog):
with environ({"DAGSTER_HOME": base_dir}):
dagster_instance_config(base_dir, config_filename)
assert "No dagster instance configuration file" in caplog.text


@pytest.mark.parametrize(
    "config_filename",
    (
        "merged_run_coordinator_concurrency.yaml",
        "merged_run_queue_concurrency.yaml",
    ),
)
def test_concurrency_config(config_filename, caplog):
    """Settings from `concurrency` and the run queue config merge into one RunQueueConfig."""
    config_dir = file_relative_path(__file__, "./test_config")
    expected_tag_limits = [
        {
            "key": "dagster/solid_selection",
            "limit": 2,
        }
    ]

    with environ({"DAGSTER_HOME": config_dir}):
        loaded_config, _ = dagster_instance_config(config_dir, config_filename)
        with instance_for_test(overrides=loaded_config) as instance:
            queue_config = instance.get_run_queue_config()
            assert queue_config

            # Values supplied by the `concurrency` section of the fixture.
            assert queue_config.max_concurrent_runs == 5
            assert queue_config.tag_concurrency_limits == expected_tag_limits

            # Values supplied by the run queue / run coordinator section.
            assert queue_config.max_user_code_failure_retries == 3
            assert queue_config.user_code_failure_retry_delay == 10

            # Value supplied by `concurrency > pools > op_run_buffer`.
            assert queue_config.op_concurrency_slot_buffer == 1


@pytest.mark.parametrize(
    "config_filename",
    (
        "error_run_coordinator_concurrency_mismatch.yaml",
        "error_run_queue_concurrency_mismatch.yaml",
    ),
)
def test_concurrency_config_mismatch(config_filename, caplog):
    """Loading a config that sets the same limit in two places raises a config error."""
    config_dir = file_relative_path(__file__, "./test_config")
    with environ({"DAGSTER_HOME": config_dir}), pytest.raises(
        DagsterInvalidConfigError, match="the `concurrency > "
    ):
        dagster_instance_config(config_dir, config_filename)
Loading