Skip to content

Commit

Permalink
Simplipy create_runner function signature
Browse files Browse the repository at this point in the history
  • Loading branch information
unkcpz committed Dec 21, 2024
1 parent b1f446a commit 36f1a86
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 25 deletions.
2 changes: 1 addition & 1 deletion src/aiida/engine/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def __init__(
), 'Must supply a persister if you want to submit using communicator'

set_event_loop_policy()
self._loop = loop if loop is not None else asyncio.get_event_loop()
self._loop = loop or asyncio.get_event_loop()
self._poll_interval = poll_interval
self._broker_submit = broker_submit
self._transport = transports.TransportQueue(self._loop)
Expand Down
48 changes: 25 additions & 23 deletions src/aiida/manage/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@

from typing import TYPE_CHECKING, Any, Optional, Union

if TYPE_CHECKING:
import asyncio
import asyncio
import kiwipy

if TYPE_CHECKING:
from kiwipy.rmq import RmqThreadCommunicator
from plumpy.process_comms import RemoteProcessThreadController

Expand Down Expand Up @@ -381,7 +382,6 @@ def get_runner(self, **kwargs) -> 'Runner':
"""Return a runner that is based on the current profile settings and can be used globally by the code.
:return: the global runner
"""
if self._runner is None:
self._runner = self.create_runner(**kwargs)
Expand All @@ -392,20 +392,25 @@ def set_runner(self, new_runner: 'Runner') -> None:
"""Set the currently used runner
:param new_runner: the new runner to use
"""
if self._runner is not None:
self._runner.close()

self._runner = new_runner

def create_runner(self, with_persistence: bool = True, **kwargs: Any) -> 'Runner':
"""Create and return a new runner
def create_runner(
self,
poll_interval: Union[int, float] | None = None,
loop: Optional[asyncio.AbstractEventLoop] = None,
communicator: Optional[kiwipy.Communicator] = None,
broker_submit: bool = False,
persister: Optional[AiiDAPersister] = None,
) -> 'Runner':
"""Create and return a new runner, with default settings from profile.
:param with_persistence: create a runner with persistence enabled
:return: a new runner instance
"""
from aiida.common import ConfigurationError
from aiida.engine import runners
Expand All @@ -415,23 +420,20 @@ def create_runner(self, with_persistence: bool = True, **kwargs: Any) -> 'Runner
raise ConfigurationError(
'Could not determine the current profile. Consider loading a profile using `aiida.load_profile()`.'
)
poll_interval = 0.0 if profile.is_test_profile else self.get_option('runner.poll.interval')

settings = {'broker_submit': False, 'poll_interval': poll_interval}
settings.update(kwargs)

if 'communicator' not in settings:
# Only call get_communicator if we have to as it will lazily create
try:
settings['communicator'] = self.get_communicator()
except ConfigurationError:
# The currently loaded profile does not define a broker and so there is no communicator
pass

if with_persistence and 'persister' not in settings:
settings['persister'] = self.get_persister()

return runners.Runner(**settings) # type: ignore[arg-type]
_default_poll_interval = 0.0 if profile.is_test_profile else self.get_option('runner.poll.interval')
_default_broker_submit = False
_default_communicator = self.get_communicator()
_default_persister = self.get_persister()

runner = runners.Runner(
poll_interval=poll_interval or _default_poll_interval,
loop=loop or asyncio.get_event_loop(),
communicator=communicator or _default_communicator,
broker_submit=broker_submit or _default_broker_submit,
persister=persister or _default_persister,
)
return runner

def create_daemon_runner(self, loop: Optional['asyncio.AbstractEventLoop'] = None) -> 'Runner':
"""Create and return a new daemon runner.
Expand Down
2 changes: 1 addition & 1 deletion tests/engine/test_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def test_calculation_future_broadcasts(self):

# No polling
future = processes.futures.ProcessFuture(
pk=process.pid, loop=runner.loop, communicator=manager.get_communicator()
pk=process.pid, loop=runner.loop, communicator=manager.get_coordinator()
)

run(process)
Expand Down

0 comments on commit 36f1a86

Please sign in to comment.