Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add job expiration dates #1983

Merged
merged 8 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions containers/teuthology-dev/teuthology.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ if [ "$TEUTHOLOGY_SUITE" != "none" ]; then
--filter-out "libcephfs,kclient" \
--force-priority \
--seed 349 \
${TEUTHOLOGY_SUITE_EXTRA_ARGS} \
$TEUTHOLOGY_CONF
DISPATCHER_EXIT_FLAG='--exit-on-empty-queue'
teuthology-queue -m $TEUTHOLOGY_MACHINE_TYPE -s | \
Expand Down
8 changes: 6 additions & 2 deletions docs/siteconfig.rst
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,16 @@ Here is a sample configuration with many of the options set and documented::
# itself from git. This is disabled by default.
automated_scheduling: false

# How often, in seconds, teuthology-worker should poll its child job
# How often, in seconds, teuthology-supervisor should poll its child job
# processes
watchdog_interval: 120

# How old a scheduled job can be, in seconds, before the dispatcher
# considers it 'expired', skipping it.
max_job_age: 1209600

# How long a scheduled job should be allowed to run, in seconds, before
# it is killed by the worker process.
# it is killed by the supervisor process.
max_job_time: 259200

# The template from which the URL of the repository containing packages
Expand Down
4 changes: 4 additions & 0 deletions scripts/suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@
When tests finish or time out, send an email
here. May also be specified in ~/.teuthology.yaml
as 'results_email'
--expire <datetime> Do not execute jobs in the run if they have not
started by this time. Valid formats include
ISO 8601, and relative offsets like '90s', '30m',
'1h', '3d', or '1w'
--rocketchat <rocketchat> Comma separated list of Rocket.Chat channels where
to send a message when tests finished or time out.
To be used with --sleep-before-teardown option.
Expand Down
8 changes: 5 additions & 3 deletions teuthology/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ def __init__(self, yaml_path=None):
self._conf = dict()

def load(self, conf=None):
if conf:
if conf is not None:
if isinstance(conf, dict):
self._conf = conf
else:
return
elif conf:
self._conf = yaml.safe_load(conf)
return
return
kshtsk marked this conversation as resolved.
Show resolved Hide resolved
if os.path.exists(self.yaml_path):
with open(self.yaml_path) as f:
self._conf = yaml.safe_load(f)
Expand Down Expand Up @@ -157,6 +158,7 @@ class TeuthologyConfig(YamlConfig):
'job_threshold': 500,
'lab_domain': 'front.sepia.ceph.com',
'lock_server': 'http://paddles.front.sepia.ceph.com/',
'max_job_age': 1209600, # 2 weeks
kshtsk marked this conversation as resolved.
Show resolved Hide resolved
'max_job_time': 259200, # 3 days
'nsupdate_url': 'http://nsupdate.front.sepia.ceph.com/update',
'results_server': 'http://paddles.front.sepia.ceph.com/',
Expand Down
28 changes: 28 additions & 0 deletions teuthology/dispatcher/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from teuthology.dispatcher import supervisor
from teuthology.exceptions import BranchNotFoundError, CommitNotFoundError, SkipJob, MaxWhileTries
from teuthology.lock import ops as lock_ops
from teuthology.util.time import parse_timestamp
from teuthology import safepath

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -234,6 +235,8 @@ def match(proc):

def prep_job(job_config, log_file_path, archive_dir):
job_id = job_config['job_id']
check_job_expiration(job_config)

safe_archive = safepath.munge(job_config['name'])
job_config['worker_log'] = log_file_path
archive_path_full = os.path.join(
Expand Down Expand Up @@ -308,6 +311,31 @@ def prep_job(job_config, log_file_path, archive_dir):
return job_config, teuth_bin_path


def check_job_expiration(job_config):
    """
    Skip a job that is too old or whose expiration date has passed.

    A job is considered expired when either:

    * its ``timestamp`` plus ``teuth_config.max_job_age`` seconds is in the
      past, or
    * its explicit ``expire`` value is in the past.

    An ``expire`` value that cannot be parsed is logged and ignored; it does
    not cause the job to be skipped.

    When the job is expired, its status is reported as 'dead' before
    SkipJob is raised.

    :param job_config: The job's config dict. Must contain ``job_id``; may
                       contain ``timestamp`` and/or ``expire``.
    :raises SkipJob:   If the job is expired
    """
    # NOTE(review): this check is called from both the dispatcher and the
    # supervisor; a reviewer suggested moving it, together with related
    # helpers (e.g. from schedule), into a dedicated job module.
    job_id = job_config['job_id']
    now = datetime.datetime.now(datetime.timezone.utc)
    # Human-readable explanation; stays None while the job is still valid.
    reason = None
    if timestamp_str := job_config.get('timestamp'):
        max_age = teuth_config.max_job_age
        deadline = parse_timestamp(timestamp_str) + \
            datetime.timedelta(seconds=max_age)
        if deadline < now:
            # Report the age limit rather than claiming the scheduling
            # timestamp "is in the past", which is always true.
            reason = (
                f"it was scheduled at {timestamp_str}, "
                f"exceeding max_job_age ({max_age}s)"
            )
    if reason is None and (expire_str := job_config.get('expire')):
        try:
            if parse_timestamp(expire_str) < now:
                reason = f"{expire_str} is in the past"
        except ValueError:
            # Best-effort: a malformed expiration must not block the job.
            log.warning(f"Failed to parse job expiration: {expire_str=}")
    if reason is None:
        return
    log.info(f"Skipping job {job_id} because it is expired: {reason}")
    report.try_push_job_info(
        job_config,
        # TODO: Add a 'canceled' status to paddles, and use that.
        dict(status='dead'),
    )
    raise SkipJob()


def lock_machines(job_config):
report.try_push_job_info(job_config, dict(status='running'))
fake_ctx = supervisor.create_fake_context(job_config, block=True)
Expand Down
29 changes: 15 additions & 14 deletions teuthology/dispatcher/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from urllib.parse import urljoin

from teuthology import exporter, kill, report, safepath
from teuthology import exporter, dispatcher, kill, report, safepath
from teuthology.config import config as teuth_config
from teuthology.exceptions import SkipJob, MaxWhileTries
from teuthology import setup_log_file, install_except_hook
Expand Down Expand Up @@ -37,6 +37,10 @@ def main(args):
f"supervisor.{job_config['job_id']}.log")
setup_log_file(log_file_path)
install_except_hook()
try:
dispatcher.check_job_expiration(job_config)
except SkipJob:
return 0

# reimage target machines before running the job
if 'targets' in job_config:
Expand All @@ -54,25 +58,22 @@ def main(args):
with open(args.job_config, 'w') as f:
yaml.safe_dump(job_config, f, default_flow_style=False)

try:
suite = job_config.get("suite")
if suite:
with exporter.JobTime().time(suite=suite):
return run_job(
job_config,
args.bin_path,
args.archive_dir,
args.verbose
)
else:
suite = job_config.get("suite")
if suite:
with exporter.JobTime().time(suite=suite):
return run_job(
job_config,
args.bin_path,
args.archive_dir,
args.verbose
)
except SkipJob:
return 0
else:
return run_job(
job_config,
args.bin_path,
args.archive_dir,
args.verbose
)


def run_job(job_config, teuth_bin_path, archive_dir, verbose):
Expand Down
29 changes: 29 additions & 0 deletions teuthology/dispatcher/test/test_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from teuthology import dispatcher
from teuthology.config import FakeNamespace
from teuthology.contextutil import MaxWhileTries
from teuthology.util.time import TIMESTAMP_FMT


class TestDispatcher(object):
Expand Down Expand Up @@ -172,3 +173,31 @@ def test_main_loop_13925(
for i in range(len(jobs)):
push_call = m_try_push_job_info.call_args_list[i]
assert push_call[0][1]['status'] == 'dead'

@pytest.mark.parametrize(
    ["timestamp", "expire", "skip"],
    [
        [datetime.timedelta(days=-1), None, False],
        [datetime.timedelta(days=-30), None, True],
        [None, datetime.timedelta(days=1), False],
        [None, datetime.timedelta(days=-1), True],
        [datetime.timedelta(days=-1), datetime.timedelta(days=1), False],
        [datetime.timedelta(days=1), datetime.timedelta(days=-1), True],
    ]
)
@patch("teuthology.dispatcher.report.try_push_job_info")
def test_check_job_expiration(self, _, timestamp, expire, skip):
    """
    check_job_expiration() must raise SkipJob exactly for the cases marked
    ``skip``.

    ``timestamp`` and ``expire`` are offsets relative to the current UTC
    time; a falsy offset leaves the corresponding key out of the job
    config.
    """
    reference = datetime.datetime.now(datetime.timezone.utc)
    job_config = {"job_id": "1", "name": "job_name"}
    for key, offset in (("timestamp", timestamp), ("expire", expire)):
        if offset:
            job_config[key] = (reference + offset).strftime(TIMESTAMP_FMT)
    if not skip:
        # Must complete without raising.
        dispatcher.check_job_expiration(job_config)
        return
    with pytest.raises(dispatcher.SkipJob):
        dispatcher.check_job_expiration(job_config)
3 changes: 3 additions & 0 deletions teuthology/suite/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ def process_args(args):
elif key == 'subset' and value is not None:
# take input string '2/3' and turn into (2, 3)
value = tuple(map(int, value.split('/')))
elif key == 'expire' and value is None:
# Skip empty 'expire' values
continue
elif key in ('filter_all', 'filter_in', 'filter_out', 'rerun_statuses'):
if not value:
value = []
Expand Down
1 change: 1 addition & 0 deletions teuthology/suite/placeholder.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def _substitute(input_dict, values_dict):
# Template for the config that becomes the base for each generated job config
dict_templ = {
'branch': Placeholder('ceph_branch'),
'expire': Placeholder('expire'),
'sha1': Placeholder('ceph_hash'),
'teuthology_branch': Placeholder('teuthology_branch'),
'teuthology_sha1': Placeholder('teuthology_sha1'),
Expand Down
34 changes: 32 additions & 2 deletions teuthology/suite/run.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import copy
import datetime
import logging
import os
import pwd
Expand All @@ -8,7 +9,6 @@

from humanfriendly import format_timespan

from datetime import datetime
from tempfile import NamedTemporaryFile
from teuthology import repo_utils

Expand All @@ -24,6 +24,7 @@
from teuthology.suite.merge import config_merge
from teuthology.suite.build_matrix import build_matrix
from teuthology.suite.placeholder import substitute_placeholders, dict_templ
from teuthology.util.time import parse_offset, parse_timestamp, TIMESTAMP_FMT

log = logging.getLogger(__name__)

Expand All @@ -43,7 +44,7 @@ def __init__(self, args):
self.args = args
# We assume timestamp is a string; the fallback below produces one via strftime()
self.timestamp = self.args.timestamp or \
datetime.now().strftime('%Y-%m-%d_%H:%M:%S')
datetime.datetime.now().strftime(TIMESTAMP_FMT)
self.user = self.args.user or pwd.getpwuid(os.getuid()).pw_name

self.name = self.make_run_name()
Expand Down Expand Up @@ -86,6 +87,15 @@ def create_initial_config(self):

:returns: A JobConfig object
"""
now = datetime.datetime.now(datetime.timezone.utc)
expires = self.get_expiration()
if expires:
if now > expires:
util.schedule_fail(
f"Refusing to schedule because the expiration date is in the past: {self.args.expire}",
dry_run=self.args.dry_run,
)

self.os = self.choose_os()
self.kernel_dict = self.choose_kernel()
ceph_hash = self.choose_ceph_hash()
Expand Down Expand Up @@ -122,9 +132,29 @@ def create_initial_config(self):
suite_repo=config.get_ceph_qa_suite_git_url(),
suite_relpath=self.args.suite_relpath,
flavor=self.args.flavor,
expire=expires.strftime(TIMESTAMP_FMT) if expires else None,
)
return self.build_base_config()

def get_expiration(self, _base_time: datetime.datetime | None = None) -> datetime.datetime | None:
    """
    Resolve the run's ``expire`` argument into an absolute point in time.

    The value is first parsed as an absolute timestamp. If that fails with
    ValueError, it is parsed as a relative offset and added to the base
    time.

    :param _base_time: For testing, calculate relative offsets from this
                       base time instead of the current UTC time
    :returns:          The expiration as a datetime, or None if no
                       expiration was requested
    """
    log.info(f"Checking for expiration ({self.args.expire})")
    expires_str = self.args.expire
    if expires_str is None:
        return None
    try:
        expires = parse_timestamp(expires_str)
    except ValueError:
        # Not an absolute timestamp; treat it as an offset from the base
        # time. The current time is only needed on this path.
        if _base_time is None:
            _base_time = datetime.datetime.now(datetime.timezone.utc)
        expires = _base_time + parse_offset(expires_str)
    return expires

def choose_os(self):
os_type = self.args.distro
os_version = self.args.distro_version
Expand Down
4 changes: 4 additions & 0 deletions teuthology/suite/test/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from teuthology.config import config

def pytest_runtest_setup():
    """Load an empty dict into the shared teuthology config object.

    NOTE(review): presumably this keeps a developer's local config from
    leaking into the suite tests -- confirm against YamlConfig.load().
    """
    config.load({})
4 changes: 3 additions & 1 deletion teuthology/suite/test/test_placeholder.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ def test_substitute_placeholders(self):
suite_repo='https://example.com/ceph/suite.git',
suite_relpath='',
ceph_repo='https://example.com/ceph/ceph.git',
flavor='default'
flavor='default',
expire='expire',
)
output_dict = substitute_placeholders(dict_templ, input_dict)
assert output_dict['suite'] == 'suite'
Expand Down Expand Up @@ -50,6 +51,7 @@ def test_null_placeholders_dropped(self):
suite_relpath='',
ceph_repo='https://example.com/ceph/ceph.git',
flavor=None,
expire='expire',
)
output_dict = substitute_placeholders(dict_templ, input_dict)
assert 'os_type' not in output_dict
44 changes: 42 additions & 2 deletions teuthology/suite/test/test_run_.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
import contextlib
import yaml

from datetime import datetime
from datetime import datetime, timedelta, timezone
from mock import patch, call, ANY
from io import StringIO
from io import BytesIO

from teuthology.config import config, YamlConfig
from teuthology.exceptions import ScheduleFailError
from teuthology.suite import run
from teuthology.util.time import TIMESTAMP_FMT


class TestRun(object):
Expand Down Expand Up @@ -52,7 +53,7 @@ def test_email_addr(self, m_git_validate_sha1, m_choose_ceph_version,

@patch('teuthology.suite.run.util.fetch_repos')
def test_name(self, m_fetch_repos):
stamp = datetime.now().strftime('%Y-%m-%d_%H:%M:%S')
stamp = datetime.now().strftime(TIMESTAMP_FMT)
with patch.object(run.Run, 'create_initial_config',
return_value=run.JobConfig()):
name = run.Run(self.args).name
Expand Down Expand Up @@ -89,6 +90,45 @@ def test_branch_nonexistent(
with pytest.raises(ScheduleFailError):
self.klass(self.args)

@pytest.mark.parametrize(
    ["expire", "delta", "result"],
    [
        [None, timedelta(), False],
        ["1m", timedelta(), True],
        ["1m", timedelta(minutes=-2), False],
        ["1m", timedelta(minutes=2), True],
        ["7d", timedelta(days=-14), False],
    ]
)
@patch('teuthology.repo_utils.fetch_repo')
@patch('teuthology.suite.run.util.git_branch_exists')
@patch('teuthology.suite.run.util.package_version_for_hash')
@patch('teuthology.suite.run.util.git_ls_remote')
def test_get_expiration(
    self,
    m_git_ls_remote,
    m_package_version_for_hash,
    m_git_branch_exists,
    m_fetch_repo,
    expire,
    delta,
    result,
):
    """
    get_expiration() returns None when no expiration is set; otherwise it
    returns a time that lies after "now" exactly when ``result`` is True.

    ``delta`` shifts the base time handed to get_expiration().
    """
    # Stub out the git and package lookups.
    m_git_branch_exists.return_value = True
    m_package_version_for_hash.return_value = 'a_version'
    m_git_ls_remote.side_effect = 'hash'

    self.args.expire = expire
    run_obj = self.klass(self.args)
    reference = datetime.now(timezone.utc)
    computed = run_obj.get_expiration(_base_time=reference + delta)

    if expire is None:
        assert computed is None
        assert run_obj.base_config['expire'] is None
        return
    assert computed is not None
    assert (reference < computed) is result
    assert run_obj.base_config['expire']

@patch('teuthology.suite.run.util.fetch_repos')
@patch('requests.head')
@patch('teuthology.suite.run.util.git_branch_exists')
Expand Down
Loading