Skip to content

Commit

Permalink
Merge branch 'add-ssh-test-2' of github.com:ltalirz/aiida-core into add-ssh-test-2
Browse files Browse the repository at this point in the history
  • Loading branch information
ltalirz committed Feb 10, 2021
2 parents 1e6a510 + cf94ed4 commit e82704d
Show file tree
Hide file tree
Showing 46 changed files with 546 additions and 418 deletions.
81 changes: 52 additions & 29 deletions .molecule/default/files/polish/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
# For further information please visit http://www.aiida.net #
###########################################################################
"""Command line interface to dynamically create and run a WorkChain that can evaluate a reversed polish expression."""
import importlib
import sys
import time

import click

Expand Down Expand Up @@ -71,8 +74,16 @@
default=False,
help='Only evaluate the expression and generate the workchain but do not launch it'
)
@click.option(
'-r',
'--retries',
type=click.INT,
default=1,
show_default=True,
help='Number of retries for running via the daemon'
)
@decorators.with_dbenv()
def launch(expression, code, use_calculations, use_calcfunctions, sleep, timeout, modulo, dry_run, daemon):
def launch(expression, code, use_calculations, use_calcfunctions, sleep, timeout, modulo, dry_run, daemon, retries):
"""
Evaluate the expression in Reverse Polish Notation in both a normal way and by procedurally generating
a workchain that encodes the sequence of operators and gets the stack of operands as an input. Multiplications
Expand All @@ -96,11 +107,8 @@ def launch(expression, code, use_calculations, use_calcfunctions, sleep, timeout
If no expression is specified, a random one will be generated that adheres to these rules
"""
# pylint: disable=too-many-arguments,too-many-locals,too-many-statements,too-many-branches
import importlib
import sys
import time
from aiida.orm import Code, Int, Str
from aiida.engine import run_get_node, submit
from aiida.engine import run_get_node

lib_expression = importlib.import_module('lib.expression')
lib_workchain = importlib.import_module('lib.workchain')
Expand Down Expand Up @@ -138,32 +146,15 @@ def launch(expression, code, use_calculations, use_calcfunctions, sleep, timeout
inputs['code'] = code

if daemon:
workchain = submit(workchains.Polish00WorkChain, **inputs)
start_time = time.time()
timed_out = True

while time.time() - start_time < timeout:
time.sleep(sleep)

if workchain.is_terminated:
timed_out = False
total_time = time.time() - start_time
# the daemon tests have been known to fail on Jenkins, when the result node cannot be found
# to mitigate this, we can retry multiple times
for _ in range(retries):
output = run_via_daemon(workchains, inputs, sleep, timeout)
if output is not None:
break

if timed_out:
click.secho('Failed: ', fg='red', bold=True, nl=False)
click.secho(
f'the workchain<{workchain.pk}> did not finish in time and the operation timed out', bold=True
)
sys.exit(1)

try:
result = workchain.outputs.result
except AttributeError:
click.secho('Failed: ', fg='red', bold=True, nl=False)
click.secho(f'the workchain<{workchain.pk}> did not return a result output node', bold=True)
click.echo(str(workchain.attributes))
if output is None:
sys.exit(1)
result, workchain, total_time = output

else:
start_time = time.time()
Expand All @@ -186,5 +177,37 @@ def launch(expression, code, use_calculations, use_calcfunctions, sleep, timeout
sys.exit(0)


def run_via_daemon(workchains, inputs, sleep, timeout):
    """Submit the work chain to the daemon and poll until it terminates or the timeout expires.

    :param workchains: module providing the ``Polish00WorkChain`` class to submit
    :param inputs: dictionary of inputs to pass to the work chain
    :param sleep: number of seconds to wait between successive polls
    :param timeout: maximum number of seconds to wait for the work chain to terminate
    :return: the tuple ``(result, workchain, total_time)`` on success, or ``None`` when the
        work chain timed out or did not produce a ``result`` output node
    """
    from aiida.engine import submit

    node = submit(workchains.Polish00WorkChain, **inputs)
    started = time.time()
    elapsed = None

    # Poll the process node until it reaches a terminal state or the deadline passes.
    while time.time() - started < timeout:
        time.sleep(sleep)
        if node.is_terminated:
            elapsed = time.time() - started
            break

    # ``elapsed`` is only set when the work chain terminated within the deadline.
    if elapsed is None:
        click.secho('Failed: ', fg='red', bold=True, nl=False)
        click.secho(f'the workchain<{node.pk}> did not finish in time and the operation timed out', bold=True)
        return None

    try:
        result = node.outputs.result
    except AttributeError:
        # Terminated, but without the expected ``result`` output — dump attributes to aid debugging.
        click.secho('Failed: ', fg='red', bold=True, nl=False)
        click.secho(f'the workchain<{node.pk}> did not return a result output node', bold=True)
        click.echo(str(node.attributes))
        return None

    return result, node, elapsed


if __name__ == '__main__':
    # Entry point when run as a script; click parses the CLI arguments,
    # hence no explicit arguments are passed here (pylint cannot see that).
    launch()  # pylint: disable=no-value-for-parameter
2 changes: 1 addition & 1 deletion .molecule/default/test_polish_workchains.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
set -e
declare -a EXPRESSIONS=({{ polish_expressions | map('quote') | join(' ') }})
for expression in "${EXPRESSIONS[@]}"; do
{{ venv_bin }}/verdi -p {{ aiida_backend }} run --auto-group -l polish -- "{{ polish_script }}" -X add! -C -F -d -t {{ polish_timeout }} "$expression"
{{ venv_bin }}/verdi -p {{ aiida_backend }} run --auto-group -l polish -- "{{ polish_script }}" -X add! -C -F -d -t {{ polish_timeout }} -r 2 "$expression"
done
args:
executable: /bin/bash
Expand Down
2 changes: 1 addition & 1 deletion aiida/calculations/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def validate_transfer_inputs(inputs, _):

for node_label, node_object in source_nodes.items():
if isinstance(node_object, orm.RemoteData):
if computer.name != node_object.computer.name:
if computer.label != node_object.computer.label:
error_message = (
f' > remote node `{node_label}` points to computer `{node_object.computer}`, '
f'not the one being used (`{computer}`)'
Expand Down
39 changes: 31 additions & 8 deletions aiida/cmdline/commands/cmd_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ def start(foreground, number):
If the NUMBER of desired workers is not specified, the default is used, which is determined by the configuration
option `daemon.default_workers`, which if not explicitly changed defaults to 1.
Returns exit code 0 if the daemon is OK, non-zero if there was an error.
"""
from aiida.engine.daemon.client import get_daemon_client

Expand All @@ -78,7 +80,9 @@ def start(foreground, number):
time.sleep(1)
response = client.get_status()

print_client_response_status(response)
retcode = print_client_response_status(response)
if retcode:
sys.exit(retcode)


@verdi_daemon.command()
Expand Down Expand Up @@ -115,24 +119,34 @@ def status(all_profiles):
@click.argument('number', default=1, type=int)
@decorators.only_if_daemon_running()
def incr(number):
"""Add NUMBER [default=1] workers to the running daemon."""
"""Add NUMBER [default=1] workers to the running daemon.
Returns exit code 0 if the daemon is OK, non-zero if there was an error.
"""
from aiida.engine.daemon.client import get_daemon_client

client = get_daemon_client()
response = client.increase_workers(number)
print_client_response_status(response)
retcode = print_client_response_status(response)
if retcode:
sys.exit(retcode)


@verdi_daemon.command()
@click.argument('number', default=1, type=int)
@decorators.only_if_daemon_running()
def decr(number):
"""Remove NUMBER [default=1] workers from the running daemon."""
"""Remove NUMBER [default=1] workers from the running daemon.
Returns exit code 0 if the daemon is OK, non-zero if there was an error.
"""
from aiida.engine.daemon.client import get_daemon_client

client = get_daemon_client()
response = client.decrease_workers(number)
print_client_response_status(response)
retcode = print_client_response_status(response)
if retcode:
sys.exit(retcode)


@verdi_daemon.command()
Expand All @@ -154,7 +168,10 @@ def logshow():
@click.option('--no-wait', is_flag=True, help='Do not wait for confirmation.')
@click.option('--all', 'all_profiles', is_flag=True, help='Stop all daemons.')
def stop(no_wait, all_profiles):
"""Stop the daemon."""
"""Stop the daemon.
Returns exit code 0 if the daemon was shut down successfully (or was not running), non-zero if there was an error.
"""
from aiida.engine.daemon.client import get_daemon_client

config = get_config()
Expand Down Expand Up @@ -190,7 +207,9 @@ def stop(no_wait, all_profiles):
if response['status'] == client.DAEMON_ERROR_NOT_RUNNING:
click.echo('The daemon was not running.')
else:
print_client_response_status(response)
retcode = print_client_response_status(response)
if retcode:
sys.exit(retcode)


@verdi_daemon.command()
Expand All @@ -205,6 +224,8 @@ def restart(ctx, reset, no_wait):
By default will only reset the workers of the running daemon. After the restart the same amount of workers will be
running. If the `--reset` flag is passed, however, the full daemon will be stopped and restarted with the default
number of workers that is started when calling `verdi daemon start` manually.
Returns exit code 0 if the result is OK, non-zero if there was an error.
"""
from aiida.engine.daemon.client import get_daemon_client

Expand All @@ -230,7 +251,9 @@ def restart(ctx, reset, no_wait):
response = client.restart_daemon(wait)

if wait:
print_client_response_status(response)
retcode = print_client_response_status(response)
if retcode:
sys.exit(retcode)


@verdi_daemon.command(hidden=True)
Expand Down
2 changes: 1 addition & 1 deletion aiida/cmdline/commands/cmd_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ def _print(communicator, body, sender, subject, correlation_id): # pylint: disa
echo.echo('') # add a new line after the interrupt character
echo.echo_info('received interrupt, exiting...')
try:
communicator.stop()
communicator.close()
except RuntimeError:
pass

Expand Down
2 changes: 1 addition & 1 deletion aiida/cmdline/commands/cmd_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def verdi_status(print_traceback, no_rmq):
with Capturing(capture_stderr=True):
with override_log_level(): # temporarily suppress noisy logging
comm = manager.create_communicator(with_orm=False)
comm.stop()
comm.close()
except Exception as exc:
message = f'Unable to connect to rabbitmq with URL: {profile.get_rmq_url()}'
print_status(ServiceStatus.ERROR, 'rabbitmq', message, exception=exc, print_traceback=print_traceback)
Expand Down
18 changes: 12 additions & 6 deletions aiida/cmdline/utils/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,29 @@ def print_client_response_status(response):
Print the response status of a call to the CircusClient through the DaemonClient
:param response: the response object
:return: an integer error code; non-zero means there was an error (FAILED, TIMEOUT), zero means OK (OK, RUNNING)
"""
from aiida.engine.daemon.client import DaemonClient

if 'status' not in response:
return
return 1

if response['status'] == 'active':
click.secho('RUNNING', fg='green', bold=True)
elif response['status'] == 'ok':
return 0
if response['status'] == 'ok':
click.secho('OK', fg='green', bold=True)
elif response['status'] == DaemonClient.DAEMON_ERROR_NOT_RUNNING:
return 0
if response['status'] == DaemonClient.DAEMON_ERROR_NOT_RUNNING:
click.secho('FAILED', fg='red', bold=True)
click.echo('Try to run \'verdi daemon start --foreground\' to potentially see the exception')
elif response['status'] == DaemonClient.DAEMON_ERROR_TIMEOUT:
return 2
if response['status'] == DaemonClient.DAEMON_ERROR_TIMEOUT:
click.secho('TIMEOUT', fg='red', bold=True)
else:
click.echo(response['status'])
return 3
# Unknown status, I will consider it as failed
click.echo(response['status'])
return -1


def get_daemon_status(client):
Expand Down
8 changes: 6 additions & 2 deletions aiida/engine/daemon/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,9 @@ def get_circus_socket_directory(self) -> str:
"""
if self.is_daemon_running:
try:
return open(self.circus_socket_file, 'r', encoding='utf8').read().strip()
with open(self.circus_socket_file, 'r', encoding='utf8') as fhandle:
content = fhandle.read().strip()
return content
except (ValueError, IOError):
raise RuntimeError('daemon is running so sockets file should have been there but could not read it')
else:
Expand All @@ -208,7 +210,9 @@ def get_daemon_pid(self) -> Optional[int]:
"""
if os.path.isfile(self.circus_pid_file):
try:
return int(open(self.circus_pid_file, 'r', encoding='utf8').read().strip())
with open(self.circus_pid_file, 'r', encoding='utf8') as fhandle:
content = fhandle.read().strip()
return int(content)
except (ValueError, IOError):
return None
else:
Expand Down
35 changes: 13 additions & 22 deletions aiida/engine/processes/calcjobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,14 @@ async def do_upload():

try:
logger.info(f'scheduled request to upload CalcJob<{node.pk}>')
ignore_exceptions = (plumpy.futures.CancelledError, PreSubmitException)
ignore_exceptions = (plumpy.futures.CancelledError, PreSubmitException, plumpy.process_states.Interruption)
skip_submit = await exponential_backoff_retry(
do_upload, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
)
except PreSubmitException:
raise
except plumpy.futures.CancelledError:
pass
except (plumpy.futures.CancelledError, plumpy.process_states.Interruption):
raise
except Exception:
logger.warning(f'uploading CalcJob<{node.pk}> failed')
raise TransportTaskException(f'upload_calculation failed {max_attempts} times consecutively')
Expand Down Expand Up @@ -139,15 +139,12 @@ async def do_submit():

try:
logger.info(f'scheduled request to submit CalcJob<{node.pk}>')
ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
result = await exponential_backoff_retry(
do_submit,
initial_interval,
max_attempts,
logger=node.logger,
ignore_exceptions=plumpy.process_states.Interruption
do_submit, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
)
except plumpy.process_states.Interruption:
pass
except (plumpy.futures.CancelledError, plumpy.process_states.Interruption): # pylint: disable=try-except-raise
raise
except Exception:
logger.warning(f'submitting CalcJob<{node.pk}> failed')
raise TransportTaskException(f'submit_calculation failed {max_attempts} times consecutively')
Expand Down Expand Up @@ -201,14 +198,11 @@ async def do_update():

try:
logger.info(f'scheduled request to update CalcJob<{node.pk}>')
ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
job_done = await exponential_backoff_retry(
do_update,
initial_interval,
max_attempts,
logger=node.logger,
ignore_exceptions=plumpy.process_states.Interruption
do_update, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
)
except plumpy.process_states.Interruption:
except (plumpy.futures.CancelledError, plumpy.process_states.Interruption): # pylint: disable=try-except-raise
raise
except Exception:
logger.warning(f'updating CalcJob<{node.pk}> failed')
Expand Down Expand Up @@ -270,14 +264,11 @@ async def do_retrieve():

try:
logger.info(f'scheduled request to retrieve CalcJob<{node.pk}>')
ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
result = await exponential_backoff_retry(
do_retrieve,
initial_interval,
max_attempts,
logger=node.logger,
ignore_exceptions=plumpy.process_states.Interruption
do_retrieve, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
)
except plumpy.process_states.Interruption:
except (plumpy.futures.CancelledError, plumpy.process_states.Interruption): # pylint: disable=try-except-raise
raise
except Exception:
logger.warning(f'retrieving CalcJob<{node.pk}> failed')
Expand Down
Loading

0 comments on commit e82704d

Please sign in to comment.