Skip to content

Commit

Permalink
hoists the run queue config from the run coordinator up to the instance
Browse files Browse the repository at this point in the history
  • Loading branch information
prha committed Jan 8, 2025
1 parent 3b5969f commit 13f2c3c
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -279,10 +279,9 @@ def resolve_runQueuingSupported(self, _graphene_info: ResolveInfo):
return isinstance(self._instance.run_coordinator, QueuedRunCoordinator)

def resolve_runQueueConfig(self, _graphene_info: ResolveInfo):
from dagster._core.run_coordinator import QueuedRunCoordinator

if isinstance(self._instance.run_coordinator, QueuedRunCoordinator):
return GrapheneRunQueueConfig(self._instance.run_coordinator.get_run_queue_config())
run_queue_config = self._instance.get_run_queue_config()
if run_queue_config:
return GrapheneRunQueueConfig(run_queue_config)
else:
return None

Expand Down
9 changes: 8 additions & 1 deletion python_modules/dagster/dagster/_core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@
)
from dagster._core.remote_representation.external import RemoteSchedule
from dagster._core.run_coordinator import RunCoordinator
from dagster._core.run_coordinator.queued_run_coordinator import RunQueueConfig
from dagster._core.scheduler import Scheduler, SchedulerDebugInfo
from dagster._core.scheduler.instigation import (
InstigatorState,
Expand Down Expand Up @@ -788,7 +789,13 @@ def run_coordinator(self) -> "RunCoordinator":
self._run_coordinator.register_instance(self)
return self._run_coordinator

# run launcher
def get_run_queue_config(self) -> Optional["RunQueueConfig"]:
from dagster._core.run_coordinator.queued_run_coordinator import QueuedRunCoordinator

if not isinstance(self.run_coordinator, QueuedRunCoordinator):
return None

return self.run_coordinator.get_run_queue_config()

@property
def run_launcher(self) -> "RunLauncher":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,12 @@ def run_iteration(
fixed_iteration_time: Optional[float] = None, # used for tests
) -> DaemonIterator:
run_coordinator = workspace_process_context.instance.run_coordinator
run_queue_config = workspace_process_context.instance.get_run_queue_config()
if not isinstance(run_coordinator, QueuedRunCoordinator):
check.failed(f"Expected QueuedRunCoordinator, got {run_coordinator}")

run_queue_config = run_coordinator.get_run_queue_config()
if not run_queue_config:
check.failed("Got invalid run queue config")

instance = workspace_process_context.instance
runs_to_dequeue = self._get_runs_to_dequeue(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ def test_run_queue_key():

with instance_for_test(overrides={"run_queue": config}) as instance:
assert isinstance(instance.run_coordinator, QueuedRunCoordinator)
run_queue_config = instance.run_coordinator.get_run_queue_config()
run_queue_config = instance.get_run_queue_config()
assert run_queue_config
assert run_queue_config.max_concurrent_runs == 50
assert run_queue_config.tag_concurrency_limits == tag_rules

Expand All @@ -188,7 +189,8 @@ def test_run_queue_key():
}
) as instance:
assert isinstance(instance.run_coordinator, QueuedRunCoordinator)
run_queue_config = instance.run_coordinator.get_run_queue_config()
run_queue_config = instance.get_run_queue_config()
assert run_queue_config
assert run_queue_config.max_concurrent_runs == 50
assert run_queue_config.tag_concurrency_limits == tag_rules

Expand Down Expand Up @@ -226,7 +228,8 @@ def test_run_coordinator_key():
overrides={"run_queue": {"max_concurrent_runs": 50, "tag_concurrency_limits": tag_rules}}
) as instance:
assert isinstance(instance.run_coordinator, QueuedRunCoordinator)
run_queue_config = instance.run_coordinator.get_run_queue_config()
run_queue_config = instance.get_run_queue_config()
assert run_queue_config
assert run_queue_config.max_concurrent_runs == 50
assert run_queue_config.tag_concurrency_limits == tag_rules

Expand Down

0 comments on commit 13f2c3c

Please sign in to comment.