Skip to content

Commit

Permalink
Clean up and improve error handling (#86)
Browse files Browse the repository at this point in the history
* introduce `variables` arg; check covalent version in script; fix status error message

* new error-catching exec script

* clean up checks; add check cloudpickle version

* move job script creation to new module

* use %s formatting for app_log messages

* clean up tuple inside RuntimeError

* swap all os.path with pathlib

* clean up and improve error msgs

* recover logs without result; better defaults; cleaner init

* clean up docstrings and signatures

* allow poll_freq >=10 s

* separate file copying/execution logic

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* update changelog

* bump min covalent version

* edit error messages for consistency

* allow min poll_freq of 5 seconds

* bump min covalent version in test reqs

* allow bashrc_path=''; add todos

* fix tests vis a vis updates

* wrap import covalent for convenience

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* update covalent version spec

* clean up unused imports

* basic tests of exec script help messages

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* tests for exec script

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* string not None defaults so visible in config

* remove todo comment

* manually revert to re-add deleted line

* update copyright dates

* Update tests/requirements.txt

Co-authored-by: Sankalp Sanand <[email protected]>

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Sankalp Sanand <[email protected]>
  • Loading branch information
3 people authored Jan 19, 2024
1 parent f5085fb commit acfc10f
Show file tree
Hide file tree
Showing 8 changed files with 895 additions and 413 deletions.
14 changes: 12 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [UNRELEASED]

## Added
### Added

- add a new `variables` parameter for environment variables
- add a new error-catching python execution script (add new module)
- add checks inside submit script for `covalent` and `cloudpickle` versions
- clean up job script creation (add new module)
- export `COVALENT_CONFIG_DIR=/tmp` inside sbatch script to enable filelock

## Changed
### Changed

- update plugin defaults to use `BaseModel` instead of `dict`
- change to actually get errors from these checks
- use `Path` everywhere instead of `os.path` operations
- allow `poll_freq >= 10` seconds, instead of 60 seconds
- misc. cleanups and refactoring
- Aesthetics and string formatting
- remove addition of `COVALENT_CONFIG_DIR=/tmp` to sbatch script
- Removed the `sshproxy` interface.
- Updates __init__ signature kwargs replaced with parent for better documentation.
- Updated license to Apache
Expand Down
115 changes: 115 additions & 0 deletions covalent_slurm_plugin/exec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# Copyright 2024 Agnostiq Inc.
#
# This file is part of Covalent.
#
# Licensed under the Apache License 2.0 (the "License"). A copy of the
# License may be obtained with this software package or at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Use of this file is prohibited except in compliance with the License.
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""This script executes the electron function on the Slurm cluster."""

import os
import sys

import cloudpickle as pickle


def _import_covalent() -> None:
# Wrapped import for convenience in testing.
import covalent


def _check_setup() -> None:
"""Use these checks to create more informative error messages."""

import filelock

msg = ""
exception = None

try:
# covalent is needed because the @electron function
# executes inside `wrapper_fn` to apply deps
_import_covalent()

except ImportError as _exception:
msg = "The covalent SDK is not installed in the Slurm job environment."
exception = _exception

except filelock._error.Timeout as _exception:
config_location = os.getenv("COVALENT_CONFIG_DIR", "~/.config/covalent")
config_location = os.path.expanduser(config_location)
config_file = os.path.join(config_location, "covalent.conf")

msg = "\n".join(
[
f"Failed to acquire file lock '{config_file}.lock' on Slurm cluster filesystem. "
f"Consider overriding the current config location ('{config_location}'), e.g:",
' SlurmExecutor(..., variables={"COVALENT_CONFIG_DIR": "/tmp"})' "",
]
)
exception = _exception

# Raise the exception if one was caught
if exception:
raise RuntimeError(msg) from exception


def _execute() -> dict:
"""Load and execute the @electron function"""

func_filename = sys.argv[1]

with open(func_filename, "rb") as f:
function, args, kwargs = pickle.load(f)

result = None
exception = None

try:
result = function(*args, **kwargs)
except Exception as ex:
exception = ex

return {
"result": result,
"exception": exception,
}


def main():
"""Execute the @electron function on the Slurm cluster."""

output_data = {
"result": None,
"exception": None,
"result_filename": sys.argv[2],
}
try:
_check_setup()
output_data.update(**_execute())

except Exception as ex:
output_data.update(exception=ex)

finally:
_record_output(**output_data)


def _record_output(result, exception, result_filename) -> None:
"""Record the output of the @electron function"""

with open(result_filename, "wb") as f:
pickle.dump((result, exception), f)


if __name__ == "__main__":
main()
235 changes: 235 additions & 0 deletions covalent_slurm_plugin/job_script.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
# Copyright 2024 Agnostiq Inc.
#
# This file is part of Covalent.
#
# Licensed under the Apache License 2.0 (the "License"). A copy of the
# License may be obtained with this software package or at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Use of this file is prohibited except in compliance with the License.
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tools for formatting the Slurm job submission script."""

import re
from typing import Dict, List, Optional

SLURM_JOB_SCRIPT_TEMPLATE = """\
#!/bin/bash
{sbatch_directives}
{shell_env_setup}
{conda_env_setup}
if [ $? -ne 0 ] ; then
>&2 echo "Failed to activate conda env '$__env_name' on compute node."
exit 99
fi
remote_py_version=$(python -c "print('.'.join(map(str, __import__('sys').version_info[:2])))")
if [[ $remote_py_version != "{python_version}" ]] ; then
>&2 echo "Python version mismatch."
>&2 echo "Environment '$__env_name' (python=$remote_py_version) does not match task (python={python_version})."
exit 199
fi
covalent_version=$(python -c "import covalent; print(covalent.__version__)")
if [ $? -ne 0 ] ; then
>&2 echo "Covalent may not be installed in the compute environment."
>&2 echo "Please install covalent=={covalent_version} in the '$__env_name' conda env."
exit 299
elif [[ $covalent_version != "{covalent_version}" ]] ; then
>&2 echo "Covalent version mismatch."
>&2 echo "Environment '$__env_name' (covalent==$covalent_version) does not match task (covalent=={covalent_version})."
exit 299
fi
cloudpickle_version=$(python -c "import cloudpickle; print(cloudpickle.__version__)")
if [ $? -ne 0 ] ; then
>&2 echo "Cloudpickle may not be installed in the compute environment."
>&2 echo "Please install cloudpickle=={cloudpickle_version} in the '$__env_name' conda env."
exit 399
elif [[ $cloudpickle_version != "{cloudpickle_version}" ]] ; then
>&2 echo "Cloudpickle version mismatch."
>&2 echo "Environment '$__env_name' (cloudpickle==$cloudpickle_version) does not match task (cloudpickle=={cloudpickle_version})."
exit 399
fi
{run_commands}
wait
"""


class JobScript:
"""Formats an sbatch submit script for the Slurm cluster."""

def __init__(
self,
sbatch_options: Optional[Dict[str, str]] = None,
srun_options: Optional[Dict[str, str]] = None,
variables: Optional[Dict[str, str]] = None,
bashrc_path: Optional[str] = "",
conda_env: Optional[str] = "",
prerun_commands: Optional[List[str]] = None,
srun_append: Optional[str] = "",
postrun_commands: Optional[List[str]] = None,
use_srun: bool = True,
):
"""Create a job script formatter.
Args:
See `covalent_slurm_plugin.slurm.SlurmExecutor` for details.
"""

self._sbatch_options = sbatch_options or {}
self._srun_options = srun_options or {}
self._variables = variables or {}
self._bashrc_path = bashrc_path
self._conda_env = conda_env
self._prerun_commands = prerun_commands or []
self._srun_append = srun_append
self._postrun_commands = postrun_commands or []
self._use_srun = use_srun

@property
def sbatch_directives(self) -> str:
"""Get the sbatch directives."""
directives = []
for key, value in self._sbatch_options.items():
if len(key) == 1:
directives.append(f"#SBATCH -{key}" + (f" {value}" if value else ""))
else:
directives.append(f"#SBATCH --{key}" + (f"={value}" if value else ""))

return "\n".join(directives)

@property
def shell_env_setup(self) -> str:
"""Get the shell environment setup."""
setup_lines = [
f"source {self._bashrc_path}" if self._bashrc_path else "",
]
for key, value in self._variables.items():
setup_lines.append(f'export {key}="{value}"')

return "\n".join(setup_lines)

@property
def conda_env_setup(self) -> str:
"""Get the conda environment setup."""
setup_lines = []
if not self._conda_env or self._conda_env == "base":
conda_env_name = "base"
setup_lines.append("conda activate")
else:
conda_env_name = self._conda_env
setup_lines.append(f"conda activate {self._conda_env}")

setup_lines.insert(0, f'__env_name="{conda_env_name}"')

return "\n".join(setup_lines)

@property
def covalent_version(self) -> str:
"""Get the version of Covalent installed in the compute environment."""
import covalent

return covalent.__version__

@property
def cloudpickle_version(self) -> str:
"""Get the version of cloudpickle installed in the compute environment."""
import cloudpickle

return cloudpickle.__version__

def get_run_commands(
self,
remote_py_filename: str,
func_filename: str,
result_filename: str,
) -> str:
"""Get the run commands."""

# Commands executed before the user's @electron function.
prerun_cmds = "\n".join(self._prerun_commands)

# Command that executes the user's @electron function.
python_cmd = "python {remote_py_filename} {func_filename} {result_filename}"

if not self._use_srun:
# Invoke python directly.
run_cmd = python_cmd
else:
# Invoke python via srun.
srun_options = []
for key, value in self._srun_options.items():
if len(key) == 1:
srun_options.append(f"-{key}" + (f" {value}" if value else ""))
else:
srun_options.append(f"--{key}" + (f"={value}" if value else ""))

run_cmds = [
f"srun {' '.join(srun_options)} \\" if srun_options else "srun \\",
f" {self._srun_append} \\",
f" {python_cmd}",
]
if not self._srun_append:
# Remove (empty) commands appended to `srun` call.
run_cmds.pop(1)

run_cmd = "\n".join(run_cmds)

run_cmd = run_cmd.format(
remote_py_filename=remote_py_filename,
func_filename=func_filename,
result_filename=result_filename,
)

# Commands executed after the user's @electron function.
postrun_cmds = "\n".join(self._postrun_commands)

# Combine all commands.
run_commands = [prerun_cmds, run_cmd, postrun_cmds]
run_commands = [cmd for cmd in run_commands if cmd]

return "\n\n".join(run_commands)

def format(
self,
python_version: str,
remote_py_filename: str,
func_filename: str,
result_filename: str,
) -> str:
"""Render the job script."""
template_kwargs = {
"sbatch_directives": self.sbatch_directives,
"shell_env_setup": self.shell_env_setup,
"conda_env_setup": self.conda_env_setup,
"covalent_version": self.covalent_version,
"cloudpickle_version": self.cloudpickle_version,
"python_version": python_version,
"run_commands": self.get_run_commands(
remote_py_filename=remote_py_filename,
func_filename=func_filename,
result_filename=result_filename,
),
}
existing_keys = set(template_kwargs.keys())
required_keys = set(re.findall(r"\{(\w+)\}", SLURM_JOB_SCRIPT_TEMPLATE))

if missing_keys := required_keys - existing_keys:
raise ValueError(f"Missing required keys: {', '.join(missing_keys)}")

if extra_keys := existing_keys - required_keys:
raise ValueError(f"Unexpected keys: {', '.join(extra_keys)}")

return SLURM_JOB_SCRIPT_TEMPLATE.format(**template_kwargs)
Loading

0 comments on commit acfc10f

Please sign in to comment.