Skip to content

Commit

Permalink
add concurrency config schema
Browse files Browse the repository at this point in the history
  • Loading branch information
prha committed Jan 22, 2025
1 parent 0b5f237 commit 9f65155
Show file tree
Hide file tree
Showing 8 changed files with 277 additions and 25 deletions.
10 changes: 9 additions & 1 deletion python_modules/dagster/dagster/_core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,10 @@ def get_run_queue_config(self) -> Optional["RunQueueConfig"]:
if not isinstance(self.run_coordinator, QueuedRunCoordinator):
return None

return self.run_coordinator.get_run_queue_config()
run_coordinator_run_queue_config = self.run_coordinator.get_run_queue_config()
return run_coordinator_run_queue_config.with_concurrency_settings(
self.get_settings("concurrency")
)

@property
def run_launcher(self) -> "RunLauncher":
Expand Down Expand Up @@ -966,6 +969,11 @@ def auto_materialize_use_sensors(self) -> int:

@property
def global_op_concurrency_default_limit(self) -> Optional[int]:
default_limit = self.get_settings("concurrency").get("pools", {}).get("default_limit")
if default_limit is not None:
return default_limit

# fallback to the old settings
return self.get_settings("concurrency").get("default_op_concurrency_limit")

# python logs
Expand Down
191 changes: 168 additions & 23 deletions python_modules/dagster/dagster/_core/instance/config.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
import logging
import os
from collections.abc import Mapping
from collections.abc import Mapping, Sequence
from typing import TYPE_CHECKING, Any, Optional, cast

from dagster import (
Array,
Bool,
String,
_check as check,
)
from dagster._config import (
Field,
IntSource,
Noneable,
Permissive,
ScalarUnion,
Selector,
Shape,
StringSource,
validate_config,
)
Expand Down Expand Up @@ -55,7 +58,7 @@ def dagster_instance_config(
f" desired behavior, create an empty {config_filename} file in {base_dir}."
)

dagster_config_dict = merge_dicts(load_yaml_from_globs(config_yaml_path) or {}, overrides)
dagster_config_dict = merge_dicts(load_yaml_from_globs(config_yaml_path) or {}, overrides or {})

if "instance_class" in dagster_config_dict:
custom_instance_class_data = dagster_config_dict["instance_class"]
Expand Down Expand Up @@ -120,18 +123,7 @@ def dagster_instance_config(

# validate default op concurrency limits
if "concurrency" in dagster_config_dict:
default_concurrency_limit = dagster_config_dict["concurrency"].get(
"default_op_concurrency_limit"
)
if default_concurrency_limit is not None:
max_limit = get_max_concurrency_limit_value()
if default_concurrency_limit < 0 or default_concurrency_limit > max_limit:
raise DagsterInvalidConfigError(
f"Found value `{default_concurrency_limit}` for `default_op_concurrency_limit`, "
f"Expected value between 0-{max_limit}.",
[],
None,
)
validate_concurrency_config(dagster_config_dict)

dagster_config = validate_config(schema, dagster_config_dict)
if not dagster_config.success:
Expand All @@ -157,6 +149,90 @@ def run_queue_config_schema() -> Field:
)


def validate_concurrency_config(dagster_config_dict: Mapping[str, Any]):
concurrency_config = dagster_config_dict["concurrency"]
if "pools" in concurrency_config:
if concurrency_config.get("default_op_concurrency_limit") is not None:
raise DagsterInvalidConfigError(
"Found config for `default_op_concurrency_limit` which is incompatible with `pools` config. Use `pools > default_limit` instead.",
[],
None,
)

default_concurrency_limit = check.opt_inst(
pluck_config_value(concurrency_config, ["pools", "default_limit"]), int
)
if default_concurrency_limit is not None:
max_limit = get_max_concurrency_limit_value()
if default_concurrency_limit < 0 or default_concurrency_limit > max_limit:
raise DagsterInvalidConfigError(
f"Found value `{default_concurrency_limit}` for `pools > default_limit`, "
f"Expected value between 0-{max_limit}.",
[],
None,
)
elif "default_op_concurrency_limit" in concurrency_config:
default_concurrency_limit = check.opt_inst(
pluck_config_value(concurrency_config, ["default_op_concurrency_limit"]), int
)
if default_concurrency_limit is not None:
max_limit = get_max_concurrency_limit_value()
if default_concurrency_limit < 0 or default_concurrency_limit > max_limit:
raise DagsterInvalidConfigError(
f"Found value `{default_concurrency_limit}` for `default_op_concurrency_limit`, "
f"Expected value between 0-{max_limit}.",
[],
None,
)

using_concurrency_config = "runs" in concurrency_config or "pools" in concurrency_config
if using_concurrency_config:
conflicting_run_queue_fields = [
["max_concurrent_runs"],
["tag_concurrency_limits"],
["block_op_concurrency_limited_runs", "op_concurrency_slot_buffer"],
]
if "run_queue" in dagster_config_dict:
for field in conflicting_run_queue_fields:
if pluck_config_value(dagster_config_dict, ["run_queue", *field]) is not None:
raise DagsterInvalidConfigError(
f"Found config value for `{field}` in `run_queue` which is incompatible with the `concurrency > runs` config",
[],
None,
)

if "run_coordinator" in dagster_config_dict:
if (
pluck_config_value(dagster_config_dict, ["run_coordinator", "class"])
== "QueuedRunCoordinator"
):
for field in conflicting_run_queue_fields:
if (
pluck_config_value(
dagster_config_dict, ["run_coordinator", "config", *field]
)
is not None
):
raise DagsterInvalidConfigError(
f"Found config value for `{field}` in `run_coordinator` which is incompatible with the `concurrency > runs` config",
[],
None,
)


def pluck_config_value(config: Mapping[str, Any], path: Sequence[str]):
value = config
for part in path:
if not isinstance(value, dict):
return None

value = value.get(part)
if value is None:
return value

return value


def storage_config_schema() -> Field:
return Field(
Selector(
Expand Down Expand Up @@ -345,6 +421,83 @@ def secrets_loader_config_schema() -> Field:
)


def get_concurrency_config() -> Field:
return Field(
{
"pools": Field(
{
"default_limit": Field(
int,
is_required=False,
description="The default maximum number of concurrent operations for an unconfigured pool",
),
"granularity": Field(
str,
is_required=False,
description="The granularity of the concurrency enforcement of the pool. One of `run` or `op`.",
),
"op_run_buffer": Field(
int,
is_required=False,
description=(
"When the pool scope is set to `op`, this determines the number of runs "
"that can be launched with all of its steps blocked waiting for pool slots "
"to be freed."
),
),
}
),
"runs": Field(
{
"max_concurrent_runs": Field(
int,
is_required=False,
description=(
"The maximum number of runs that are allowed to be in progress at once."
" Defaults to 10. Set to -1 to disable the limit. Set to 0 to stop any runs"
" from launching. Any other negative values are disallowed."
),
),
"tag_concurrency_limits": Field(
config=Noneable(
Array(
Shape(
{
"key": String,
"value": Field(
ScalarUnion(
scalar_type=String,
non_scalar_schema=Shape(
{"applyLimitPerUniqueValue": Bool}
),
),
is_required=False,
),
"limit": Field(int),
}
)
)
),
is_required=False,
description=(
"A set of limits that are applied to runs with particular tags. If a value is"
" set, the limit is applied to only that key-value pair. If no value is set,"
" the limit is applied across all values of that key. If the value is set to a"
" dict with `applyLimitPerUniqueValue: true`, the limit will apply to the"
" number of unique values for that key."
),
),
}
),
"default_op_concurrency_limit": Field(
int,
is_required=False,
description="[Deprecated] The default maximum number of concurrent operations for an unconfigured concurrency key",
),
}
)


def dagster_instance_config_schema() -> Mapping[str, Field]:
return {
"local_artifact_storage": config_field_for_configurable_class(),
Expand Down Expand Up @@ -432,13 +585,5 @@ def dagster_instance_config_schema() -> Mapping[str, Field]:
),
}
),
"concurrency": Field(
{
"default_op_concurrency_limit": Field(
int,
is_required=False,
description="The default maximum number of concurrent operations for an unconfigured concurrency key",
),
}
),
"concurrency": get_concurrency_config(),
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,24 @@ def __new__(
check.int_param(op_concurrency_slot_buffer, "op_concurrency_slot_buffer"),
)

def with_concurrency_settings(
self, concurrency_settings: Mapping[str, Any]
) -> "RunQueueConfig":
run_settings = concurrency_settings.get("runs", {})
pool_settings = concurrency_settings.get("pools", {})
return RunQueueConfig(
max_concurrent_runs=run_settings.get("max_concurrent_runs", self.max_concurrent_runs),
tag_concurrency_limits=run_settings.get(
"tag_concurrency_limits", self.tag_concurrency_limits
),
max_user_code_failure_retries=self.max_user_code_failure_retries,
user_code_failure_retry_delay=self.user_code_failure_retry_delay,
should_block_op_concurrency_limited_runs=self.should_block_op_concurrency_limited_runs,
op_concurrency_slot_buffer=pool_settings.get(
"op_run_buffer", self.op_concurrency_slot_buffer
),
)


class QueuedRunCoordinator(RunCoordinator[T_DagsterInstance], ConfigurableClass):
"""Enqueues runs via the run storage, to be deqeueued by the Dagster Daemon process. Requires
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
concurrency:
runs:
max_concurrent_runs: 5
run_coordinator:
module: dagster.core.run_coordinator
class: QueuedRunCoordinator
config:
max_concurrent_runs: 6
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
concurrency:
runs:
max_concurrent_runs: 5
run_queue:
max_concurrent_runs: 6
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
concurrency:
pools:
granularity: op
op_run_buffer: 1
runs:
max_concurrent_runs: 5
tag_concurrency_limits:
- key: "dagster/solid_selection"
limit: 2
run_coordinator:
module: dagster.core.run_coordinator
class: QueuedRunCoordinator
config:
max_user_code_failure_retries: 3
user_code_failure_retry_delay: 10
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
concurrency:
pools:
granularity: op
op_run_buffer: 1
runs:
max_concurrent_runs: 5
tag_concurrency_limits:
- key: "dagster/solid_selection"
limit: 2
run_queue:
max_user_code_failure_retries: 3
user_code_failure_retry_delay: 10
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import pytest
from dagster import file_relative_path
from dagster._core.errors import DagsterInvalidConfigError
from dagster._core.instance.config import dagster_instance_config
from dagster._core.test_utils import environ
from dagster._core.test_utils import environ, instance_for_test


@pytest.mark.parametrize("config_filename", ("dagster.yaml", "something.yaml"))
Expand All @@ -10,3 +11,43 @@ def test_instance_yaml_config_not_set(config_filename, caplog):
with environ({"DAGSTER_HOME": base_dir}):
dagster_instance_config(base_dir, config_filename)
assert "No dagster instance configuration file" in caplog.text


@pytest.mark.parametrize(
"config_filename",
(
"merged_run_coordinator_concurrency.yaml",
"merged_run_queue_concurrency.yaml",
),
)
def test_concurrency_config(config_filename, caplog):
base_dir = file_relative_path(__file__, "./test_config")
with environ({"DAGSTER_HOME": base_dir}):
instance_config, _ = dagster_instance_config(base_dir, config_filename)
with instance_for_test(overrides=instance_config) as instance:
run_queue_config = instance.get_run_queue_config()
assert run_queue_config
assert run_queue_config.max_concurrent_runs == 5
assert run_queue_config.tag_concurrency_limits == [
{
"key": "dagster/solid_selection",
"limit": 2,
}
]
assert run_queue_config.max_user_code_failure_retries == 3
assert run_queue_config.user_code_failure_retry_delay == 10
assert run_queue_config.op_concurrency_slot_buffer == 1


@pytest.mark.parametrize(
"config_filename",
(
"error_run_coordinator_concurrency_mismatch.yaml",
"error_run_queue_concurrency_mismatch.yaml",
),
)
def test_concurrency_config_mismatch(config_filename, caplog):
base_dir = file_relative_path(__file__, "./test_config")
with environ({"DAGSTER_HOME": base_dir}):
with pytest.raises(DagsterInvalidConfigError, match="the `concurrency > "):
dagster_instance_config(base_dir, config_filename)

0 comments on commit 9f65155

Please sign in to comment.