From 9d97b17bfc90553c8c8baa80c8eac30d23c0435d Mon Sep 17 00:00:00 2001 From: Aishwarya Mathuria Date: Mon, 26 Apr 2021 19:16:45 +0530 Subject: [PATCH 1/7] Remove usage of beanstalk from teuthology The following changes support the removal of Beanstalk from Teuthology. In place of Beanstalk, we will now be using Paddles for queue management in Teuthology. This PR has the corresponding changes for the paddles PR: https://github.com/ceph/paddles/pull/94/files. The changes include: 1. Removing all beanstalk related code 2. Teuthology scheduler and dispatcher using Paddles queue for scheduling and dispatching jobs 3. Adding support for Paddles queue management 4. Additional functionality of being able to change the priority of Teuthology jobs in the queued state in the teuthology-queue command Signed-off-by: Aishwarya Mathuria --- scripts/dispatcher.py | 27 ++ scripts/kill.py | 2 +- scripts/queue.py | 18 +- scripts/schedule.py | 2 +- teuthology/dispatcher/__init__.py | 43 +-- teuthology/dispatcher/supervisor.py | 2 +- teuthology/kill.py | 38 --- teuthology/{beanstalk.py => paddles_queue.py} | 119 ++++---- teuthology/report.py | 260 +++++++++++++++++- teuthology/schedule.py | 31 +-- 10 files changed, 390 insertions(+), 152 deletions(-) rename teuthology/{beanstalk.py => paddles_queue.py} (72%) diff --git a/scripts/dispatcher.py b/scripts/dispatcher.py index 45dd61b264..72aad8d0cf 100644 --- a/scripts/dispatcher.py +++ b/scripts/dispatcher.py @@ -1,4 +1,31 @@ import argparse +""" +usage: teuthology-dispatcher --help + teuthology-dispatcher --supervisor [-v] --bin-path BIN_PATH --job-config CONFIG --archive-dir DIR + teuthology-dispatcher [-v] [--archive-dir DIR] --log-dir LOG_DIR --machine-type MACHINE_TYPE + +Start a dispatcher for the specified machine type. Grab jobs from a paddles +queue and run the teuthology tests they describe as subprocesses. The +subprocess invoked is a teuthology-dispatcher command run in supervisor +mode. + +Supervisor mode: Supervise the job run described by its config. Reimage +target machines and invoke teuthology command. Unlock the target machines +at the end of the run. + +standard arguments: + -h, --help show this help message and exit + -v, --verbose be more verbose + -l, --log-dir LOG_DIR path in which to store logs + -a DIR, --archive-dir DIR path to archive results in + --machine-type MACHINE_TYPE the machine type for the job + --supervisor run dispatcher in job supervisor mode + --bin-path BIN_PATH teuthology bin path + --job-config CONFIG file descriptor of job's config file + --exit-on-empty-queue if the queue is empty, exit +""" + +import docopt import sys import teuthology.dispatcher.supervisor diff --git a/scripts/kill.py b/scripts/kill.py index 31acc8b1a4..e2a1a4ef09 100644 --- a/scripts/kill.py +++ b/scripts/kill.py @@ -12,7 +12,7 @@ teuthology-kill [-p] -o OWNER -m MACHINE_TYPE -r RUN Kill running teuthology jobs: -1. Removes any queued jobs from the beanstalk queue +1. Removes any queued jobs from the paddles queue 2. Kills any running jobs 3. Nukes any machines involved diff --git a/scripts/queue.py b/scripts/queue.py index 8ea5ca5c2c..285a0adac9 100644 --- a/scripts/queue.py +++ b/scripts/queue.py @@ -1,21 +1,22 @@ import docopt import teuthology.config -import teuthology.beanstalk +import teuthology.paddles_queue doc = """ usage: teuthology-queue -h - teuthology-queue [-s|-d|-f] -m MACHINE_TYPE - teuthology-queue [-r] -m MACHINE_TYPE - teuthology-queue -m MACHINE_TYPE -D PATTERN - teuthology-queue -p SECONDS [-m MACHINE_TYPE] + teuthology-queue -s -m MACHINE_TYPE + teuthology-queue [-d|-f] -m MACHINE_TYPE [-P PRIORITY] -u USER + teuthology-queue [-r] -m MACHINE_TYPE -u USER + teuthology-queue -m MACHINE_TYPE -D PATTERN -u USER + teuthology-queue -p SECONDS -m MACHINE_TYPE -u USER List Jobs in queue. If -D is passed, then jobs with PATTERN in the job name are deleted from the queue. Arguments: - -m, --machine_type MACHINE_TYPE [default: multi] + -m, --machine_type MACHINE_TYPE Which machine type queue to work on. optional arguments: @@ -28,9 +29,12 @@ -p, --pause SECONDS Pause queues for a number of seconds. A value of 0 will unpause. If -m is passed, pause that queue, otherwise pause all queues. + -P, --priority PRIORITY + Change priority of queued jobs + -u, --user USER User who owns the jobs """ def main(): args = docopt.docopt(doc) - teuthology.beanstalk.main(args) + teuthology.paddles_queue.main(args) diff --git a/scripts/schedule.py b/scripts/schedule.py index 58f7a46249..e9f0c1f5ff 100644 --- a/scripts/schedule.py +++ b/scripts/schedule.py @@ -21,7 +21,7 @@ Queue backend name, use prefix '@' to append job config to the given file path as yaml. - [default: beanstalk] + [default: paddles] -n , --name Name of suite run the job is part of -d , --description Job description -o , --owner Job owner diff --git a/teuthology/dispatcher/__init__.py b/teuthology/dispatcher/__init__.py index 59f8ae3279..cc5658e188 100644 --- a/teuthology/dispatcher/__init__.py +++ b/teuthology/dispatcher/__init__.py @@ -5,6 +5,7 @@ import subprocess import sys import yaml +import json from typing import Dict, List @@ -64,6 +65,12 @@ def load_config(archive_dir=None): else: teuth_config.archive_base = archive_dir +def clean_config(config): + result = {} + for key in config: + if config[key] is not None: + result[key] = config[key] + return result def main(args): archive_dir = args.archive_dir or teuth_config.archive_base @@ -75,7 +82,17 @@ def main(args): "There is already a teuthology-dispatcher process running:" f" {procs}" ) + verbose = args["--verbose"] + machine_type = args["--machine-type"] + log_dir = args["--log-dir"] + archive_dir = args["--archive-dir"] + exit_on_empty_queue = args["--exit-on-empty-queue"] + if archive_dir is None: + archive_dir = teuth_config.archive_base + + if machine_type is None and teuth_config.machine_type is None: + return # setup logging for disoatcher in {log_dir} loglevel = logging.INFO if args.verbose: @@ -88,8 +105,6 @@ def main(args): load_config(archive_dir=archive_dir) - connection = beanstalk.connect() - beanstalk.watch_tube(connection, args.tube) result_proc = None if teuth_config.teuthology_path is None: @@ -118,21 +133,19 @@ def main(args): if rc is not None: worst_returncode = max([worst_returncode, rc]) job_procs.remove(proc) - job = connection.reserve(timeout=60) + job = report.get_queued_job(machine_type) if job is None: if args.exit_on_empty_queue and not job_procs: log.info("Queue is empty and no supervisor processes running; exiting!") break continue - - # bury the job so it won't be re-run if it fails - job.bury() - job_id = job.jid - log.info('Reserved job %d', job_id) - log.info('Config is: %s', job.body) - job_config = yaml.safe_load(job.body) - job_config['job_id'] = str(job_id) - + job = clean_config(job) + report.try_push_job_info(job, dict(status='running')) + job_id = job.get('job_id') + log.info('Reserved job %s', job_id) + log.info('Config is: %s', job) + job_config = job + if job_config.get('stop_worker'): keep_running = False @@ -192,12 +205,6 @@ def main(args): status='fail', failure_reason=error_message)) - # This try/except block is to keep the worker from dying when - # beanstalkc throws a SocketError - try: - job.delete() - except Exception: - log.exception("Saw exception while trying to delete job") return worst_returncode diff --git a/teuthology/dispatcher/supervisor.py b/teuthology/dispatcher/supervisor.py index 83e6d997c5..72bca76049 100644 --- a/teuthology/dispatcher/supervisor.py +++ b/teuthology/dispatcher/supervisor.py @@ -136,7 +136,7 @@ def run_job(job_config, teuth_bin_path, archive_dir, verbose): '--archive', job_config['archive_path'], '--name', job_config['name'], ]) - if job_config['description'] is not None: + if 'description' in job_config: arg.extend(['--description', job_config['description']]) job_archive = os.path.join(job_config['archive_path'], 'orig.config.yaml') arg.extend(['--', job_archive]) diff --git a/teuthology/kill.py b/teuthology/kill.py index 137e49080e..ce3455cc8d 100755 --- a/teuthology/kill.py +++ b/teuthology/kill.py @@ -11,7 +11,6 @@ import teuthology.exporter -from teuthology import beanstalk from teuthology import report from teuthology.config import config from teuthology.lock import ops as lock_ops @@ -68,7 +67,6 @@ def kill_run(run_name, archive_base=None, owner=None, machine_type=None, "you must also pass --machine-type") if not preserve_queue: - remove_beanstalk_jobs(run_name, machine_type) remove_paddles_jobs(run_name) if kill_processes(run_name, run_info.get('pids')): return @@ -123,7 +121,6 @@ def find_run_info(serializer, run_name): if not os.path.isdir(job_dir): continue job_num += 1 - beanstalk.print_progress(job_num, job_total, 'Reading Job: ') job_info = serializer.job_info(run_name, job_id, simple=True) for key in job_info.keys(): if key in run_info_fields and key not in run_info: @@ -142,41 +139,6 @@ def remove_paddles_jobs(run_name): report.try_delete_jobs(run_name, job_ids) -def remove_beanstalk_jobs(run_name, tube_name): - qhost = config.queue_host - qport = config.queue_port - if qhost is None or qport is None: - raise RuntimeError( - 'Beanstalk queue information not found in {conf_path}'.format( - conf_path=config.yaml_path)) - log.info("Checking Beanstalk Queue...") - beanstalk_conn = beanstalk.connect() - real_tube_name = beanstalk.watch_tube(beanstalk_conn, tube_name) - - curjobs = beanstalk_conn.stats_tube(real_tube_name)['current-jobs-ready'] - if curjobs != 0: - x = 1 - while x != curjobs: - x += 1 - job = beanstalk_conn.reserve(timeout=20) - if job is None: - continue - job_config = yaml.safe_load(job.body) - if run_name == job_config['name']: - job_id = job.stats()['id'] - msg = "Deleting job from queue. ID: " + \ - "{id} Name: {name} Desc: {desc}".format( - id=str(job_id), - name=job_config['name'], - desc=job_config['description'], - ) - log.info(msg) - job.delete() - else: - print("No jobs in Beanstalk Queue") - beanstalk_conn.close() - - def kill_processes(run_name, pids=None): if pids: to_kill = set(pids).intersection(psutil.pids()) diff --git a/teuthology/beanstalk.py b/teuthology/paddles_queue.py similarity index 72% rename from teuthology/beanstalk.py rename to teuthology/paddles_queue.py index 76bc2c97ae..d66b9cd2f8 100644 --- a/teuthology/beanstalk.py +++ b/teuthology/paddles_queue.py @@ -1,6 +1,3 @@ -import beanstalkc -import json -import yaml import logging import pprint import sys @@ -9,6 +6,7 @@ from teuthology import report from teuthology.config import config + log = logging.getLogger(__name__) @@ -45,21 +43,28 @@ def callback(jobs_dict) log.info('No jobs in Beanstalk Queue') return - # Try to figure out a sane timeout based on how many jobs are in the queue - timeout = job_count / 2000.0 * 60 - for i in range(1, job_count + 1): - print_progress(i, job_count, "Loading") - job = connection.reserve(timeout=timeout) - if job is None or job.body is None: - continue - job_config = yaml.safe_load(job.body) - job_name = job_config['name'] - job_id = job.stats()['id'] - if pattern is not None and pattern not in job_name: - continue - processor.add_job(job_id, job_config, job) - end_progress() - processor.complete() +def stats_queue(machine_type): + stats = report.get_queue_stats(machine_type) + stats = report.get_queue_stats(machine_type) + if stats['paused'] is None: + log.info("%s queue is currently running with %s jobs queued", + stats['name'], + stats['count']) + else: + log.info("%s queue is paused with %s jobs queued", + stats['name'], + stats['count']) + + +def update_priority(machine_type, priority, user): + jobs = report.get_user_jobs_queue(machine_type, user) + for job in jobs: + job['priority'] = priority + report.try_push_job_info(job) + + +def pause_queue(machine_type, pause_duration, paused_by): + report.pause_queue(machine_type, paused_by, pause_duration) def print_progress(index, total, message=None): @@ -74,6 +79,29 @@ def end_progress(): sys.stderr.flush() +def walk_jobs(machine_type, processor, user): + log.info("Checking paddles queue...") + job_count = report.get_queue_stats(machine_type)['count'] + + jobs = report.get_user_jobs_queue(machine_type, user) + if job_count == 0: + log.info('No jobs in queue') + return + + # Try to figure out a sane timeout based on how many jobs are in the queue + timeout = job_count / 2000.0 * 60 + for i in range(1, job_count + 1): + print_progress(i, job_count, "Loading") + job = jobs[i-1] + if job is None: + continue + job_name = job['name'] + job_id = job['job_id'] + processor.add_job(job_id, job) + end_progress() + processor.complete() + + class JobProcessor(object): def __init__(self): self.jobs = OrderedDict() @@ -152,38 +180,13 @@ def process_job(self, job_id): job_id=job_id, job_name=job_name, )) - job_obj = self.jobs[job_id].get('job_obj') - if job_obj: - job_obj.delete() report.try_delete_jobs(job_name, job_id) -def pause_tube(connection, tube, duration): - duration = int(duration) - if not tube: - tubes = sorted(connection.tubes()) - else: - tubes = [tube] - - prefix = 'Unpausing' if duration == 0 else "Pausing for {dur}s" - templ = prefix + ": {tubes}" - log.info(templ.format(dur=duration, tubes=tubes)) - for tube in tubes: - connection.pause_tube(tube, duration) - - -def stats_tube(connection, tube): - stats = connection.stats_tube(tube) - result = dict( - name=tube, - count=stats['current-jobs-ready'], - paused=(stats['pause'] != 0), - ) - return result - - def main(args): machine_type = args['--machine_type'] + user = args['--user'] + priority = args['--priority'] status = args['--status'] delete = args['--delete'] runs = args['--runs'] @@ -191,25 +194,21 @@ def main(args): full = args['--full'] pause_duration = args['--pause'] try: - connection = connect() - if machine_type and not pause_duration: - # watch_tube needs to be run before we inspect individual jobs; - # it is not needed for pausing tubes - watch_tube(connection, machine_type) if status: - print(json.dumps(stats_tube(connection, machine_type))) + stats_queue(machine_type) elif pause_duration: - pause_tube(connection, machine_type, pause_duration) + pause_queue(machine_type, pause_duration, user) + elif priority: + update_priority(machine_type, priority, user) elif delete: - walk_jobs(connection, machine_type, - JobDeleter(delete)) + walk_jobs(machine_type, + JobDeleter(delete), user) elif runs: - walk_jobs(connection, machine_type, - RunPrinter()) + walk_jobs(machine_type, + RunPrinter(), user) else: - walk_jobs(connection, machine_type, - JobPrinter(show_desc=show_desc, full=full)) + walk_jobs(machine_type, + JobPrinter(show_desc=show_desc, full=full), + user) except KeyboardInterrupt: log.info("Interrupted.") - finally: - connection.close() diff --git a/teuthology/report.py b/teuthology/report.py index f0a4472017..1f8a848fba 100644 --- a/teuthology/report.py +++ b/teuthology/report.py @@ -6,6 +6,7 @@ import logging import random import socket +import threading from datetime import datetime import teuthology @@ -263,6 +264,39 @@ def report_run(self, run_name, dead=False): self.log.debug(" no jobs; skipped") return len(jobs) + def write_new_job(self, run_name, job_info): + """ + Report a new job to the results server. + + :param run_name: The name of the run. The run must already exist. + :param job_info: The job's info dict. Must be present since this is a new job + """ + if job_info is None or not isinstance(job_info, dict): + raise TypeError("Job info must be a dict") + run_uri = "{base}/runs/{name}/jobs/".format( + base=self.base_uri, name=run_name, + ) + job_json = json.dumps(job_info) + headers = {'content-type': 'application/json'} + response = self.session.post(run_uri, data=job_json, headers=headers) + + if response.status_code == 200: + resp_json = response.json() + job_id = resp_json['job_id'] + return job_id + else: + msg = response.text + self.log.error( + "POST to {uri} failed with status {status}: {msg}".format( + uri=run_uri, + status=response.status_code, + msg=msg, + )) + + response.raise_for_status() + return None + + def report_jobs(self, run_name, job_ids, dead=False): """ Report several jobs to the results server. @@ -292,12 +326,13 @@ def report_job(self, run_name, job_id, job_info=None, dead=False): set_status(job_info, 'dead') job_json = json.dumps(job_info) headers = {'content-type': 'application/json'} + job_uri = os.path.join(run_uri, job_id, '') inc = random.uniform(0, 1) with safe_while( sleep=1, increment=inc, action=f'report job {job_id}') as proceed: while proceed(): - response = self.session.post(run_uri, data=job_json, headers=headers) + response = self.session.put(job_uri, data=job_json, headers=headers) if response.status_code == 200: return @@ -314,15 +349,9 @@ def report_job(self, run_name, job_id, job_info=None, dead=False): else: msg = response.text - if msg and msg.endswith('already exists'): - job_uri = os.path.join(run_uri, job_id, '') - response = self.session.put(job_uri, data=job_json, - headers=headers) - if response.status_code == 200: - return - elif msg: + if msg: self.log.error( - "POST to {uri} failed with status {status}: {msg}".format( + "PUT to {uri} failed with status {status}: {msg}".format( uri=run_uri, status=response.status_code, msg=msg, @@ -352,6 +381,14 @@ def last_run(self): self.__last_run = None if os.path.exists(self.last_run_file): os.remove(self.last_run_file) + + def get_top_job(self, machine_type): + + uri = "{base}/queue/pop_queue?machine_type={machine_type}".format(base=self.base_uri, + machine_type=machine_type) + response = self.session.get(uri) + response.raise_for_status() + return response.json() def get_jobs(self, run_name, job_id=None, fields=None): """ @@ -457,6 +494,164 @@ def delete_run(self, run_name): response = self.session.delete(uri) response.raise_for_status() + def create_queue(self, machine_type): + """ + Create a queue on the results server + + :param machine_type: The machine type specified for the job + """ + uri = "{base}/queue/".format( + base=self.base_uri + ) + queue_info = {'machine_type': machine_type} + queue_json = json.dumps(queue_info) + headers = {'content-type': 'application/json'} + response = self.session.post(uri, data=queue_json, headers=headers) + + if response.status_code == 200: + self.log.info("Successfully created queue for {machine_type}".format( + machine_type=machine_type, + )) + else: + resp_json = response.json() + if resp_json: + msg = resp_json.get('message', '') + else: + msg = response.text + if msg and msg.endswith('already exists'): + return + self.log.error( + "POST to {uri} failed with status {status}: {msg}".format( + uri=uri, + status=response.status_code, + msg=msg, + )) + + response.raise_for_status() + + def update_queue(self, machine_type, paused, paused_by=None, pause_duration=None): + uri = "{base}/queue/".format( + base=self.base_uri + ) + if pause_duration is not None: + pause_duration = int(pause_duration) + queue_info = {'machine_type': machine_type, 'paused': paused, 'paused_by': paused_by, + 'pause_duration': pause_duration} + queue_json = json.dumps(queue_info) + headers = {'content-type': 'application/json'} + response = self.session.put(uri, data=queue_json, headers=headers) + + if response.status_code == 200: + self.log.info("Successfully updated queue for {machine_type}".format( + machine_type=machine_type, + )) + else: + msg = response.text + self.log.error( + "PUT to {uri} failed with status {status}: {msg}".format( + uri=uri, + status=response.status_code, + msg=msg, + )) + + response.raise_for_status() + + + def queue_stats(self, machine_type): + uri = "{base}/queue/stats/".format( + base=self.base_uri + ) + queue_info = {'machine_type': machine_type} + queue_json = json.dumps(queue_info) + + headers = {'content-type': 'application/json'} + response = self.session.post(uri, data=queue_json, headers=headers) + + if response.status_code == 200: + self.log.info("Successfully retrieved stats for queue {machine_type}".format( + machine_type=machine_type, + )) + return response.json() + else: + msg = response.text + self.log.error( + "POST to {uri} failed with status {status}: {msg}".format( + uri=uri, + status=response.status_code, + msg=msg, + )) + response.raise_for_status() + + def queued_jobs(self, machine_type, user): + uri = "{base}/queue/queued_jobs/".format( + base=self.base_uri + ) + request_info = {'machine_type': machine_type, + 'user': user} + request_json = json.dumps(request_info) + headers = {'content-type': 'application/json'} + response = self.session.post(uri, data=request_json, headers=headers) + + if response.status_code == 200: + self.log.info("Successfully retrieved jobs for user {user}".format( + user=user, + )) + return response.json() + else: + msg = response.text + self.log.error( + "POST to {uri} failed with status {status}: {msg}".format( + uri=uri, + status=response.status_code, + msg=msg, + )) + response.raise_for_status() + + +def create_machine_type_queue(machine_type): + reporter = ResultsReporter() + if not reporter.base_uri: + return + reporter.create_queue(machine_type) + + +def get_user_jobs_queue(machine_type, user): + reporter = ResultsReporter() + if not reporter.base_uri: + return + return reporter.queued_jobs(machine_type, user) + + +def pause_queue(machine_type, paused_by, pause_duration): + reporter = ResultsReporter() + if not reporter.base_uri: + return + paused = True + reporter.update_queue(machine_type, paused, paused_by, pause_duration) + paused = False + timer = threading.Timer(int(pause_duration), reporter.update_queue, [machine_type, paused, paused_by]) + timer.daemon = True + timer.start() + timer.join() + + +def is_queue_paused(machine_type): + reporter = ResultsReporter() + if not reporter.base_uri: + return + stats = reporter.queue_stats(machine_type) + if stats['paused'] != 0 and stats['paused'] is not None: + return True + return False + + +def get_queue_stats(machine_type): + reporter = ResultsReporter() + if not reporter.base_uri: + return + stats = reporter.queue_stats(machine_type) + return stats + def push_job_info(run_name, job_id, job_info, base_uri=None): """ @@ -480,6 +675,23 @@ def push_job_info(run_name, job_id, job_info, base_uri=None): ) +def get_queued_job(machine_type): + """ + Retrieve a job that is queued depending on priority + + """ + log = init_logging() + reporter = ResultsReporter() + if not reporter.base_uri: + return + if is_queue_paused(machine_type) == True: + log.info("Teuthology queue for machine type %s is currently paused", + machine_type) + return None + else: + return reporter.get_top_job(machine_type) + + def try_push_job_info(job_config, extra_info=None): """ Wrap push_job_info, gracefully doing nothing if: @@ -519,6 +731,36 @@ def try_push_job_info(job_config, extra_info=None): config.results_server) +def try_create_job(job_config, extra_info=None): + log = init_logging() + + if not config.results_server: + log.warning('No results_server in config; not reporting results') + return + + reporter = ResultsReporter() + if not reporter.base_uri: + return + + run_name = job_config['name'] + + if extra_info is not None: + job_info = extra_info.copy() + job_info.update(job_config) + else: + job_info = job_config + + try: + log.debug("Writing job info to %s", config.results_server) + job_id = reporter.write_new_job(run_name, job_info) + log.info("Job ID: %s", job_id) + if job_id is not None: + return job_id + except report_exceptions: + log.exception("Could not report results to %s", + config.results_server) + + def try_delete_jobs(run_name, job_ids, delete_empty_run=True): """ Using the same error checking and retry mechanism as try_push_job_info(), diff --git a/teuthology/schedule.py b/teuthology/schedule.py index d9af64efc4..4836884a9e 100644 --- a/teuthology/schedule.py +++ b/teuthology/schedule.py @@ -1,9 +1,9 @@ import os import yaml -import teuthology.beanstalk from teuthology.misc import get_user, merge_configs from teuthology import report +from teuthology.config import config def main(args): @@ -35,13 +35,13 @@ def main(args): backend = args['--queue-backend'] if args['--dry-run']: print('---\n' + yaml.safe_dump(job_config)) - elif backend == 'beanstalk': + elif backend == 'paddles': schedule_job(job_config, args['--num'], report_status) elif backend.startswith('@'): dump_job_to_file(backend.lstrip('@'), job_config, args['--num']) else: raise ValueError("Provided schedule backend '%s' is not supported. " - "Try 'beanstalk' or '@path-to-a-file" % backend) + "Try 'paddles' or '@path-to-a-file" % backend) def build_config(args): @@ -96,22 +96,19 @@ def schedule_job(job_config, num=1, report_status=True): """ num = int(num) job = yaml.safe_dump(job_config) - tube = job_config.pop('tube') - beanstalk = teuthology.beanstalk.connect() - beanstalk.use(tube) + + ''' + Add 'machine_type' queue to DB here. + ''' + report.create_machine_type_queue(job_config['machine_type']) while num > 0: - jid = beanstalk.put( - job, - ttr=60 * 60 * 24, - priority=job_config['priority'], - ) - print('Job scheduled with name {name} and ID {jid}'.format( - name=job_config['name'], jid=jid)) - job_config['job_id'] = str(jid) - if report_status: - report.try_push_job_info(job_config, dict(status='queued')) - num -= 1 + job_id = report.try_create_job(job_config, dict(status='queued')) + print('Job scheduled with name {name} and ID {job_id}'.format( + name=job_config['name'], job_id=job_id)) + job_config['job_id'] = str(job_id) + + num -= 1 def dump_job_to_file(path, job_config, num=1): """ From 99e5ca631ccc8cf4f58833aeb015be60f41b49ef Mon Sep 17 00:00:00 2001 From: Aishwarya Mathuria Date: Mon, 7 Jun 2021 19:12:23 +0530 Subject: [PATCH 2/7] Modify teuthology worker unit test and fix flake8 errors Previously the teuthology worker unit test used beanstalk, the test will be using Paddles now. Signed-off-by: Aishwarya Mathuria --- teuthology/dispatcher/__init__.py | 1 - teuthology/kill.py | 2 -- teuthology/paddles_queue.py | 3 --- teuthology/schedule.py | 3 --- 4 files changed, 9 deletions(-) diff --git a/teuthology/dispatcher/__init__.py b/teuthology/dispatcher/__init__.py index cc5658e188..c61f03ab02 100644 --- a/teuthology/dispatcher/__init__.py +++ b/teuthology/dispatcher/__init__.py @@ -5,7 +5,6 @@ import subprocess import sys import yaml -import json from typing import Dict, List diff --git a/teuthology/kill.py b/teuthology/kill.py index ce3455cc8d..1b0d247819 100755 --- a/teuthology/kill.py +++ b/teuthology/kill.py @@ -12,7 +12,6 @@ import teuthology.exporter from teuthology import report -from teuthology.config import config from teuthology.lock import ops as lock_ops log = logging.getLogger(__name__) @@ -116,7 +115,6 @@ def find_run_info(serializer, run_name): job_info = {} job_num = 0 jobs = serializer.jobs_for_run(run_name) - job_total = len(jobs) for (job_id, job_dir) in jobs.items(): if not os.path.isdir(job_dir): continue diff --git a/teuthology/paddles_queue.py b/teuthology/paddles_queue.py index d66b9cd2f8..fd1e09c45e 100644 --- a/teuthology/paddles_queue.py +++ b/teuthology/paddles_queue.py @@ -88,14 +88,11 @@ def walk_jobs(machine_type, processor, user): log.info('No jobs in queue') return - # Try to figure out a sane timeout based on how many jobs are in the queue - timeout = job_count / 2000.0 * 60 for i in range(1, job_count + 1): print_progress(i, job_count, "Loading") job = jobs[i-1] if job is None: continue - job_name = job['name'] job_id = job['job_id'] processor.add_job(job_id, job) end_progress() diff --git a/teuthology/schedule.py b/teuthology/schedule.py index 4836884a9e..4ccf780845 100644 --- a/teuthology/schedule.py +++ b/teuthology/schedule.py @@ -3,7 +3,6 @@ from teuthology.misc import get_user, merge_configs from teuthology import report -from teuthology.config import config def main(args): @@ -95,8 +94,6 @@ def schedule_job(job_config, num=1, report_status=True): :param num: The number of times to schedule the job """ num = int(num) - job = yaml.safe_dump(job_config) - ''' Add 'machine_type' queue to DB here. ''' From 72d4408c223a737c93f2ef1272bd7815812eaee3 Mon Sep 17 00:00:00 2001 From: Aishwarya Mathuria Date: Wed, 9 Jun 2021 19:06:37 +0530 Subject: [PATCH 3/7] Add retry for paddles calls and modify pause queue command 1. Add retry loop for the paddles calls. 2. Add run name as a parameter for updating priority of jobs in paddles. 3. Modify the pause queue command to run on server side with an optional pause duration parameter. Signed-off-by: Aishwarya Mathuria --- scripts/queue.py | 20 ++- teuthology/dispatcher/__init__.py | 16 +++ teuthology/paddles_queue.py | 31 +++-- teuthology/report.py | 208 +++++++++++++++++------------- 4 files changed, 167 insertions(+), 108 deletions(-) diff --git a/scripts/queue.py b/scripts/queue.py index 285a0adac9..a07598a92f 100644 --- a/scripts/queue.py +++ b/scripts/queue.py @@ -6,10 +6,12 @@ doc = """ usage: teuthology-queue -h teuthology-queue -s -m MACHINE_TYPE - teuthology-queue [-d|-f] -m MACHINE_TYPE [-P PRIORITY] -u USER - teuthology-queue [-r] -m MACHINE_TYPE -u USER - teuthology-queue -m MACHINE_TYPE -D PATTERN -u USER - teuthology-queue -p SECONDS -m MACHINE_TYPE -u USER + teuthology-queue [-d|-f] -m MACHINE_TYPE -U USER + teuthology-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME] + teuthology-queue [-r] -m MACHINE_TYPE -U USER + teuthology-queue -m MACHINE_TYPE -D PATTERN -U USER + teuthology-queue -p [-t SECONDS] -m MACHINE_TYPE -U USER + teuthology-queue -u -m MACHINE_TYPE -U USER List Jobs in queue. If -D is passed, then jobs with PATTERN in the job name are deleted from the @@ -26,12 +28,16 @@ -r, --runs Only show run names -f, --full Print the entire job config. Use with caution. -s, --status Prints the status of the queue - -p, --pause SECONDS Pause queues for a number of seconds. A value of 0 - will unpause. If -m is passed, pause that queue, + -t, --time SECONDS Pause queues for a number of seconds. + If -m is passed, pause that queue, otherwise pause all queues. + -p, --pause Pause queue + -u, --unpause Unpause queue -P, --priority PRIORITY Change priority of queued jobs - -u, --user USER User who owns the jobs + -U, --user USER User who owns the jobs + -R, --run_name RUN_NAME + Used to change priority of all jobs in the run. """ diff --git a/teuthology/dispatcher/__init__.py b/teuthology/dispatcher/__init__.py index c61f03ab02..db4d62e2b4 100644 --- a/teuthology/dispatcher/__init__.py +++ b/teuthology/dispatcher/__init__.py @@ -7,6 +7,7 @@ import yaml from typing import Dict, List +from time import sleep from teuthology import ( # non-modules @@ -369,3 +370,18 @@ def create_job_archive(job_name, job_archive_path, archive_dir): if not os.path.exists(run_archive): safepath.makedirs('/', run_archive) safepath.makedirs('/', job_archive_path) + + +def pause_queue(machine_type, paused, paused_by, pause_duration=None): + if paused == True: + report.pause_queue(machine_type, paused, paused_by, pause_duration) + ''' + If there is a pause duration specified + un-pause the queue after the time elapses + ''' + if pause_duration is not None: + sleep(int(pause_duration)) + paused = False + report.pause_queue(machine_type, paused, paused_by) + elif paused == False: + report.pause_queue(machine_type, paused, paused_by) diff --git a/teuthology/paddles_queue.py b/teuthology/paddles_queue.py index fd1e09c45e..99cfe77e6e 100644 --- a/teuthology/paddles_queue.py +++ b/teuthology/paddles_queue.py @@ -4,7 +4,7 @@ from collections import OrderedDict from teuthology import report -from teuthology.config import config +from teuthology.dispatcher import pause_queue log = logging.getLogger(__name__) @@ -44,7 +44,6 @@ def callback(jobs_dict) return def stats_queue(machine_type): - stats = report.get_queue_stats(machine_type) stats = report.get_queue_stats(machine_type) if stats['paused'] is None: log.info("%s queue is currently running with %s jobs queued", @@ -56,17 +55,16 @@ def stats_queue(machine_type): stats['count']) -def update_priority(machine_type, priority, user): - jobs = report.get_user_jobs_queue(machine_type, user) +def update_priority(machine_type, priority, user, run_name=None): + if run_name is not None: + jobs = report.get_user_jobs_queue(machine_type, user, run_name) + else: + jobs = report.get_user_jobs_queue(machine_type, user) for job in jobs: job['priority'] = priority report.try_push_job_info(job) -def pause_queue(machine_type, pause_duration, paused_by): - report.pause_queue(machine_type, paused_by, pause_duration) - - def print_progress(index, total, message=None): msg = "{m} ".format(m=message) if message else '' sys.stderr.write("{msg}{i}/{total}\r".format( @@ -183,20 +181,29 @@ def process_job(self, job_id): def main(args): machine_type = args['--machine_type'] user = args['--user'] + run_name = args['--run_name'] priority = args['--priority'] status = args['--status'] delete = args['--delete'] runs = args['--runs'] show_desc = args['--description'] full = args['--full'] - pause_duration = args['--pause'] + pause = args['--pause'] + unpause = args['--unpause'] + pause_duration = args['--time'] try: if status: stats_queue(machine_type) - elif pause_duration: - pause_queue(machine_type, pause_duration, user) + elif pause: + if pause_duration: + pause_queue(machine_type, pause, user, pause_duration) + else: + pause_queue(machine_type, pause, user) + elif unpause: + pause = False + pause_queue(machine_type, pause, user) elif priority: - update_priority(machine_type, priority, user) + update_priority(machine_type, priority, user, run_name) elif delete: walk_jobs(machine_type, JobDeleter(delete), user) diff --git a/teuthology/report.py b/teuthology/report.py index 1f8a848fba..f675145b58 100644 --- a/teuthology/report.py +++ b/teuthology/report.py @@ -6,7 +6,6 @@ import logging import random import socket -import threading from datetime import datetime import teuthology @@ -278,20 +277,25 @@ def write_new_job(self, run_name, job_info): ) job_json = json.dumps(job_info) headers = {'content-type': 'application/json'} - response = self.session.post(run_uri, data=job_json, headers=headers) - if response.status_code == 200: - resp_json = response.json() - job_id = resp_json['job_id'] - return job_id - else: - msg = response.text - self.log.error( - "POST to {uri} failed with status {status}: {msg}".format( - uri=run_uri, - status=response.status_code, - msg=msg, - )) + inc = random.uniform(0, 1) + with safe_while( + sleep=1, increment=inc, action=f'write job for {run_name}') as proceed: + while proceed(): + response = self.session.post(run_uri, data=job_json, headers=headers) + + if response.status_code == 200: + resp_json = response.json() + job_id = resp_json['job_id'] + return job_id + else: + msg = response.text + self.log.error( + "POST to {uri} failed with status {status}: {msg}".format( + uri=run_uri, + status=response.status_code, + msg=msg, + )) response.raise_for_status() return None @@ -386,9 +390,15 @@ def get_top_job(self, machine_type): uri = "{base}/queue/pop_queue?machine_type={machine_type}".format(base=self.base_uri, machine_type=machine_type) - response = self.session.get(uri) + inc = random.uniform(0, 1) + with safe_while( + sleep=1, increment=inc, action=f'get job from {machine_type}') as proceed: + while proceed(): + response = self.session.get(uri) + if response.status_code == 200: + return response.json() response.raise_for_status() - return response.json() + def get_jobs(self, run_name, job_id=None, fields=None): """ @@ -506,30 +516,36 @@ def create_queue(self, machine_type): queue_info = {'machine_type': machine_type} queue_json = json.dumps(queue_info) headers = {'content-type': 'application/json'} - response = self.session.post(uri, data=queue_json, headers=headers) - if response.status_code == 200: - self.log.info("Successfully created queue for {machine_type}".format( - machine_type=machine_type, - )) - else: - resp_json = response.json() - if resp_json: - msg = resp_json.get('message', '') - else: - msg = response.text - if msg and msg.endswith('already exists'): - return - self.log.error( - "POST to {uri} failed with status {status}: {msg}".format( - uri=uri, - status=response.status_code, - msg=msg, - )) + inc = random.uniform(0, 1) + with safe_while( + sleep=1, increment=inc, action=f'creating queue {machine_type}') as proceed: + while proceed(): + response = self.session.post(uri, data=queue_json, headers=headers) + + if response.status_code == 200: + self.log.info("Successfully created queue for {machine_type}".format( + machine_type=machine_type, + )) + return + else: + resp_json = response.json() + if resp_json: + msg = resp_json.get('message', '') + else: + msg = response.text + if msg and msg.endswith('already exists'): + return + self.log.error( + "POST to {uri} failed with status {status}: {msg}".format( + uri=uri, + status=response.status_code, + msg=msg, + )) response.raise_for_status() - def update_queue(self, machine_type, paused, paused_by=None, pause_duration=None): + def update_queue(self, machine_type, paused, paused_by, pause_duration=None): uri = "{base}/queue/".format( base=self.base_uri ) @@ -539,20 +555,26 @@ def update_queue(self, machine_type, paused, paused_by=None, pause_duration=None 'pause_duration': pause_duration} queue_json = json.dumps(queue_info) headers = {'content-type': 'application/json'} - response = self.session.put(uri, data=queue_json, headers=headers) - if response.status_code == 200: - self.log.info("Successfully updated queue for {machine_type}".format( - machine_type=machine_type, - )) - else: - msg = response.text - self.log.error( - "PUT to {uri} failed with status {status}: {msg}".format( - uri=uri, - status=response.status_code, - msg=msg, - )) + inc = random.uniform(0, 1) + with safe_while( + sleep=1, increment=inc, action=f'updating queue {machine_type}') as proceed: + while proceed(): + response = self.session.put(uri, data=queue_json, headers=headers) + + if response.status_code == 200: + self.log.info("Successfully updated queue for {machine_type}".format( + machine_type=machine_type, + )) + return + else: + msg = response.text + self.log.error( + "PUT to {uri} failed with status {status}: {msg}".format( + uri=uri, + status=response.status_code, + msg=msg, + )) response.raise_for_status() @@ -565,46 +587,60 @@ def queue_stats(self, machine_type): queue_json = json.dumps(queue_info) headers = {'content-type': 'application/json'} - response = self.session.post(uri, data=queue_json, headers=headers) + inc = random.uniform(0, 1) + with safe_while( + sleep=1, increment=inc, action=f'stats for queue {machine_type}') as proceed: + while proceed(): + response = self.session.post(uri, data=queue_json, headers=headers) - if response.status_code == 200: - self.log.info("Successfully retrieved stats for queue {machine_type}".format( - machine_type=machine_type, - )) - return response.json() - else: - msg = response.text - self.log.error( - "POST to {uri} failed with status {status}: {msg}".format( - uri=uri, - status=response.status_code, - msg=msg, - )) + if response.status_code == 200: + self.log.info("Successfully retrieved stats for queue {machine_type}".format( + machine_type=machine_type, + )) + return response.json() + else: + msg = response.text + self.log.error( + "POST to {uri} failed with status {status}: {msg}".format( + uri=uri, + status=response.status_code, + msg=msg, + )) response.raise_for_status() - def queued_jobs(self, machine_type, user): + def queued_jobs(self, machine_type, user, run_name): uri = "{base}/queue/queued_jobs/".format( base=self.base_uri ) - request_info = {'machine_type': machine_type, - 'user': user} + if run_name is not None: + filter_field = run_name + request_info = {'machine_type': machine_type, + 'run_name': run_name} + else: + filter_field = user + request_info = {'machine_type': machine_type, + 'user': user} request_json = json.dumps(request_info) headers = {'content-type': 'application/json'} - response = self.session.post(uri, data=request_json, headers=headers) + inc = random.uniform(0, 1) + with safe_while( + sleep=1, increment=inc, action=f'get queued jobs {filter_field}') as proceed: + while proceed(): + response = self.session.post(uri, data=request_json, headers=headers) - if response.status_code == 200: - self.log.info("Successfully retrieved jobs for user {user}".format( - user=user, - )) - return response.json() - else: - msg = response.text - self.log.error( - "POST to {uri} failed with status {status}: {msg}".format( - uri=uri, - status=response.status_code, - msg=msg, - )) + if response.status_code == 200: + self.log.info("Successfully retrieved jobs for {filter_field}".format( + filter_field=filter_field, + )) + return response.json() + else: + msg = response.text + self.log.error( + "POST to {uri} failed with status {status}: {msg}".format( + uri=uri, + status=response.status_code, + msg=msg, + )) response.raise_for_status() @@ -615,24 +651,18 @@ def create_machine_type_queue(machine_type): reporter.create_queue(machine_type) -def get_user_jobs_queue(machine_type, user): +def get_user_jobs_queue(machine_type, user, run_name=None): reporter = ResultsReporter() if not reporter.base_uri: return - return reporter.queued_jobs(machine_type, user) + return reporter.queued_jobs(machine_type, user, run_name) -def pause_queue(machine_type, paused_by, pause_duration): +def pause_queue(machine_type, paused, paused_by, pause_duration=None): reporter = ResultsReporter() if not reporter.base_uri: return - paused = True reporter.update_queue(machine_type, paused, paused_by, pause_duration) - paused = False - timer = threading.Timer(int(pause_duration), reporter.update_queue, [machine_type, paused, paused_by]) - timer.daemon = True - timer.start() - timer.join() def is_queue_paused(machine_type): From 0cf9a5e1109d175aab18305f0904ef0d7f216377 Mon Sep 17 00:00:00 2001 From: Aishwarya Mathuria Date: Mon, 14 Jun 2021 11:49:08 +0530 Subject: [PATCH 4/7] Add new test cases for dispatcher and scheduler Signed-off-by: Aishwarya Mathuria --- teuthology/test/test_dispatcher.py | 107 +++++++++++++++++++++++++++++ teuthology/test/test_schedule.py | 42 ++++++++++- 2 files changed, 148 insertions(+), 1 deletion(-) create mode 100644 teuthology/test/test_dispatcher.py diff --git a/teuthology/test/test_dispatcher.py b/teuthology/test/test_dispatcher.py new file mode 100644 index 0000000000..6b0dddfe2f --- /dev/null +++ b/teuthology/test/test_dispatcher.py @@ -0,0 +1,107 @@ +from teuthology import dispatcher +from unittest.mock import patch, Mock +from teuthology import report + +import unittest.mock as mock +import unittest + + +class TestDispatcher(unittest.TestCase): + + def test_mock_get_queue_job(self): + mock_get_patcher = patch('teuthology.dispatcher.report.get_queued_job') + machine_type = 'test_queue' + job_config = { + 'job_id': '1', + 'description': 'DESC', + 'email': 'EMAIL', + 'first_in_suite': False, + 'last_in_suite': True, + 'machine_type': 'test_queue', + 'name': 'NAME', + 'owner': 'OWNER', + 'priority': 99, + 'results_timeout': '6', + 'verbose': False, + } + + mock_get = mock_get_patcher.start() + mock_get.return_value = Mock(status_code = 200) + mock_get.return_value.json.return_value = job_config + + response = report.get_queued_job(machine_type) + + mock_get_patcher.stop() + + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json(), job_config) + + + @patch("teuthology.worker.fetch_teuthology") + @patch("teuthology.dispatcher.fetch_qa_suite") + @patch("teuthology.worker.fetch_qa_suite") + @patch("teuthology.dispatcher.report.get_queued_job") + @patch("teuthology.dispatcher.report.try_push_job_info") + @patch("teuthology.dispatcher.setup_log_file") + @patch("os.path.isdir") + @patch("os.getpid") + @patch("teuthology.dispatcher.teuth_config") + @patch("subprocess.Popen") + @patch("os.path.join") + @patch("teuthology.dispatcher.create_job_archive") + @patch("yaml.safe_dump") + def test_dispatcher_main(self, m_fetch_teuthology, m_fetch_qa_suite, + m_worker_fetch_qa_suite, m_get_queued_job, + m_try_push_job_info, + m_setup_log, + m_isdir, m_getpid, + m_t_config, m_popen, m_join, m_create_archive, m_yaml_dump): + + args = { + '--owner': 'the_owner', + '--archive-dir': '/archive/dir', + '--log-dir': '/worker/log', + '--name': 'the_name', + '--description': 'the_description', + '--machine-type': 'test_queue', + '--supervisor': False, + '--verbose': False + } + + m = mock.MagicMock() + job_id = {'job_id': '1'} + m.__getitem__.side_effect = job_id.__getitem__ + m.__iter__.side_effect = job_id.__iter__ + job = { + 'job_id': '1', + 'description': 'DESC', + 'email': 'EMAIL', + 'first_in_suite': False, + 'last_in_suite': True, + 'machine_type': 'test_queue', + 'name': 'NAME', + 'owner': 'OWNER', + 'priority': 99, + 'results_timeout': '6', + 'verbose': False, + 'stop_worker': True, + 'archive_path': '/archive/dir/NAME/1' + } + + m_fetch_teuthology.return_value = '/teuth/path' + m_fetch_qa_suite.return_value = '/suite/path' + m_isdir.return_value = True + mock_get_patcher = patch('teuthology.dispatcher.report.get_queued_job') + mock_get = mock_get_patcher.start() + mock_get.return_value = job + + mock_prep_job_patcher = patch('teuthology.dispatcher.prep_job') + mock_prep_job = mock_prep_job_patcher.start() + mock_prep_job.return_value = (job, '/teuth/bin/path') + m_yaml_dump.return_value = '' + + m_try_push_job_info.called_once_with(job, dict(status='running')) + dispatcher.main(args) + mock_get_patcher.stop() + + diff --git a/teuthology/test/test_schedule.py b/teuthology/test/test_schedule.py index dd0a68f845..1add70620a 100644 --- a/teuthology/test/test_schedule.py +++ b/teuthology/test/test_schedule.py @@ -1,8 +1,14 @@ from teuthology.schedule import build_config from teuthology.misc import get_user +from unittest.mock import patch, Mock +from teuthology import report +from teuthology import schedule +import unittest +import os -class TestSchedule(object): + +class TestSchedule(unittest.TestCase): basic_args = { '--verbose': False, '--owner': 'OWNER', @@ -43,3 +49,37 @@ def test_owner(self): job_dict = build_config(self.basic_args) assert job_dict['owner'] == 'scheduled_%s' % get_user() + + def test_dump_job_to_file(self): + path = 'teuthology/test/job' + job_config = { + 'description': 'DESC', + 'email': 'EMAIL', + 'first_in_suite': False, + 'last_in_suite': True, + 'machine_type': 'tala', + 'name': 'NAME', + 'owner': 'OWNER', + 'priority': 99, + 'results_timeout': '6', + 'verbose': False, + 'tube': 'tala', + } + schedule.dump_job_to_file(path, job_config) + + count_file_path = path + '.count' + assert os.path.exists(count_file_path) == True + + + def test_mock_create_queue(self): + mock_get_patcher = patch('teuthology.schedule.report.create_machine_type_queue') + machine_type = 'test_queue' + + mock_get = mock_get_patcher.start() + mock_get.return_value = Mock(status_code = 200) + + response = report.create_machine_type_queue(machine_type) + + mock_get_patcher.stop() + + self.assertEqual(response.status_code, 200) From e9122c6d790e4db7b86aed8f6ff8a4b3f0dad256 Mon Sep 17 00:00:00 2001 From: Aishwarya Mathuria Date: Mon, 4 Oct 2021 19:24:41 +0530 Subject: [PATCH 5/7] Add beanstalk as a possible queue backend for Teuthology Jobs along with Paddles With the use of the --queue-backend argument the user can specify which backend(paddles/beanstalk) they would like to use for maintaining the teuthology Jobs queue. In order to avoid overlapping Job IDs, when a job is being scheduled in beanstalk it is also written to paddles which returns a unique ID. This is the ID teuthology will treat as the Job ID throughout the run of the job. To differentiate between the 2 queue backends, the teuthology-queue command has been split into teuthology-paddles-queue command and teuthology-beanstalk-queue command. Signed-off-by: Aishwarya Mathuria --- scripts/beanstalk_queue.py | 35 ++++ scripts/dispatcher.py | 5 +- scripts/{queue.py => paddles_queue.py} | 18 +-- teuthology/beanstalk.py | 214 +++++++++++++++++++++++++ teuthology/config.py | 1 + teuthology/dispatcher/__init__.py | 51 ++++-- teuthology/dispatcher/supervisor.py | 6 + teuthology/kill.py | 42 +++++ teuthology/orchestra/run.py | 1 - teuthology/report.py | 67 ++++---- teuthology/schedule.py | 55 +++++-- teuthology/test/test_dispatcher.py | 3 +- 12 files changed, 425 insertions(+), 73 deletions(-) create mode 100644 scripts/beanstalk_queue.py rename scripts/{queue.py => paddles_queue.py} (69%) create mode 100644 teuthology/beanstalk.py diff --git a/scripts/beanstalk_queue.py b/scripts/beanstalk_queue.py new file mode 100644 index 0000000000..88a8242847 --- /dev/null +++ b/scripts/beanstalk_queue.py @@ -0,0 +1,35 @@ +import docopt + +import teuthology.config +import teuthology.beanstalk + +doc = """ +usage: teuthology-beanstalk-queue -h + teuthology-beanstalk-queue [-s|-d|-f] -m MACHINE_TYPE + teuthology-beanstalk-queue [-r] -m MACHINE_TYPE + teuthology-beanstalk-queue -m MACHINE_TYPE -D PATTERN + teuthology-beanstalk-queue -p SECONDS [-m MACHINE_TYPE] +List Jobs in queue. +If -D is passed, then jobs with PATTERN in the job name are deleted from the +queue. +Arguments: + -m, --machine_type MACHINE_TYPE [default: multi] + Which machine type queue to work on. +optional arguments: + -h, --help Show this help message and exit + -D, --delete PATTERN Delete Jobs with PATTERN in their name + -d, --description Show job descriptions + -r, --runs Only show run names + -f, --full Print the entire job config. Use with caution. + -s, --status Prints the status of the queue + -p, --pause SECONDS Pause queues for a number of seconds. A value of 0 + will unpause. If -m is passed, pause that queue, + otherwise pause all queues. +""" + + +def main(): + + args = docopt.docopt(doc) + print(args) + teuthology.beanstalk.main(args) diff --git a/scripts/dispatcher.py b/scripts/dispatcher.py index 72aad8d0cf..fffdb634b8 100644 --- a/scripts/dispatcher.py +++ b/scripts/dispatcher.py @@ -2,9 +2,9 @@ """ usage: teuthology-dispatcher --help teuthology-dispatcher --supervisor [-v] --bin-path BIN_PATH --job-config CONFIG --archive-dir DIR - teuthology-dispatcher [-v] [--archive-dir DIR] --log-dir LOG_DIR --machine-type MACHINE_TYPE + teuthology-dispatcher [-v] [--archive-dir DIR] --log-dir LOG_DIR --machine-type MACHINE_TYPE --queue-backend BACKEND -Start a dispatcher for the specified machine type. Grab jobs from a paddles +Start a dispatcher for the specified machine type. Grab jobs from a paddles/beanstalk queue and run the teuthology tests they describe as subprocesses. The subprocess invoked is a teuthology-dispatcher command run in supervisor mode. @@ -23,6 +23,7 @@ --bin-path BIN_PATH teuthology bin path --job-config CONFIG file descriptor of job's config file --exit-on-empty-queue if the queue is empty, exit + --queue-backend BACKEND choose between paddles and beanstalk """ import docopt diff --git a/scripts/queue.py b/scripts/paddles_queue.py similarity index 69% rename from scripts/queue.py rename to scripts/paddles_queue.py index a07598a92f..3c69d772e6 100644 --- a/scripts/queue.py +++ b/scripts/paddles_queue.py @@ -4,14 +4,14 @@ import teuthology.paddles_queue doc = """ -usage: teuthology-queue -h - teuthology-queue -s -m MACHINE_TYPE - teuthology-queue [-d|-f] -m MACHINE_TYPE -U USER - teuthology-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME] - teuthology-queue [-r] -m MACHINE_TYPE -U USER - teuthology-queue -m MACHINE_TYPE -D PATTERN -U USER - teuthology-queue -p [-t SECONDS] -m MACHINE_TYPE -U USER - teuthology-queue -u -m MACHINE_TYPE -U USER +usage: teuthology-paddles-queue -h + teuthology-paddles-queue -s -m MACHINE_TYPE + teuthology-paddles-queue [-d|-f] -m MACHINE_TYPE -U USER + teuthology-paddles-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME] + teuthology-paddles-queue [-r] -m MACHINE_TYPE -U USER + teuthology-paddles-queue -m MACHINE_TYPE -D PATTERN -U USER + teuthology-paddles-queue -p [-t SECONDS] -m MACHINE_TYPE -U USER + teuthology-paddles-queue -u -m MACHINE_TYPE -U USER List Jobs in queue. If -D is passed, then jobs with PATTERN in the job name are deleted from the @@ -36,7 +36,7 @@ -P, --priority PRIORITY Change priority of queued jobs -U, --user USER User who owns the jobs - -R, --run_name RUN_NAME + -R, --run-name RUN_NAME Used to change priority of all jobs in the run. """ diff --git a/teuthology/beanstalk.py b/teuthology/beanstalk.py new file mode 100644 index 0000000000..a1165becca --- /dev/null +++ b/teuthology/beanstalk.py @@ -0,0 +1,214 @@ +import beanstalkc +import yaml +import logging +import pprint +import sys +from collections import OrderedDict + +from teuthology.config import config +from teuthology import report + +log = logging.getLogger(__name__) + + +def connect(): + host = config.queue_host + port = config.queue_port + if host is None or port is None: + raise RuntimeError( + 'Beanstalk queue information not found in {conf_path}'.format( + conf_path=config.teuthology_yaml)) + return beanstalkc.Connection(host=host, port=port) + + +def watch_tube(connection, tube_name): + """ + Watch a given tube, potentially correcting to 'multi' if necessary. Returns + the tube_name that was actually used. + """ + if ',' in tube_name: + log.debug("Correcting tube name to 'multi'") + tube_name = 'multi' + connection.watch(tube_name) + connection.ignore('default') + return tube_name + + +def walk_jobs(connection, tube_name, processor, pattern=None): + """ + def callback(jobs_dict) + """ + log.info("Checking Beanstalk Queue...") + job_count = connection.stats_tube(tube_name)['current-jobs-ready'] + if job_count == 0: + log.info('No jobs in Beanstalk Queue') + return + + # Try to figure out a sane timeout based on how many jobs are in the queue + timeout = job_count / 2000.0 * 60 + for i in range(1, job_count + 1): + print_progress(i, job_count, "Loading") + job = connection.reserve(timeout=timeout) + if job is None or job.body is None: + continue + job_config = yaml.safe_load(job.body) + job_name = job_config['name'] + job_id = job.stats()['id'] + if pattern is not None and pattern not in job_name: + continue + processor.add_job(job_id, job_config, job) + end_progress() + processor.complete() + + +def print_progress(index, total, message=None): + msg = "{m} ".format(m=message) if message else '' + sys.stderr.write("{msg}{i}/{total}\r".format( + msg=msg, i=index, total=total)) + sys.stderr.flush() + + +def end_progress(): + sys.stderr.write('\n') + sys.stderr.flush() + + +class JobProcessor(object): + def __init__(self): + self.jobs = OrderedDict() + + def add_job(self, job_id, job_config, job_obj=None): + job_id = str(job_id) + + job_dict = dict( + index=(len(self.jobs) + 1), + job_config=job_config, + ) + if job_obj: + job_dict['job_obj'] = job_obj + self.jobs[job_id] = job_dict + + self.process_job(job_id) + + def process_job(self, job_id): + pass + + def complete(self): + pass + + +class JobPrinter(JobProcessor): + def __init__(self, show_desc=False, full=False): + super(JobPrinter, self).__init__() + self.show_desc = show_desc + self.full = full + + def process_job(self, job_id): + job_config = self.jobs[job_id]['job_config'] + job_index = self.jobs[job_id]['index'] + job_priority = job_config['priority'] + job_name = job_config['name'] + job_desc = job_config['description'] + print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format( + i=job_index, + pri=job_priority, + job_id=job_id, + job_name=job_name, + )) + if self.full: + pprint.pprint(job_config) + elif job_desc and self.show_desc: + for desc in job_desc.split(): + print('\t {}'.format(desc)) + + +class RunPrinter(JobProcessor): + def __init__(self): + super(RunPrinter, self).__init__() + self.runs = list() + + def process_job(self, job_id): + run = self.jobs[job_id]['job_config']['name'] + if run not in self.runs: + self.runs.append(run) + print(run) + + +class JobDeleter(JobProcessor): + def __init__(self, pattern): + self.pattern = pattern + super(JobDeleter, self).__init__() + + def add_job(self, job_id, job_config, job_obj=None): + job_name = job_config['name'] + if self.pattern in job_name: + super(JobDeleter, self).add_job(job_id, job_config, job_obj) + + def process_job(self, job_id): + job_config = self.jobs[job_id]['job_config'] + job_name = job_config['name'] + print('Deleting {job_name}/{job_id}'.format( + job_id=job_id, + job_name=job_name, + )) + job_obj = self.jobs[job_id].get('job_obj') + if job_obj: + job_obj.delete() + report.try_delete_jobs(job_name, job_id) + + +def pause_tube(connection, tube, duration): + duration = int(duration) + if not tube: + tubes = sorted(connection.tubes()) + else: + tubes = [tube] + + prefix = 'Unpausing' if duration == 0 else "Pausing for {dur}s" + templ = prefix + ": {tubes}" + log.info(templ.format(dur=duration, tubes=tubes)) + for tube in tubes: + connection.pause_tube(tube, duration) + + +def stats_tube(connection, tube): + stats = connection.stats_tube(tube) + result = dict( + name=tube, + count=stats['current-jobs-ready'], + paused=(stats['pause'] != 0), + ) + return result + + +def main(args): + machine_type = args['--machine_type'] + status = args['--status'] + delete = args['--delete'] + runs = args['--runs'] + show_desc = args['--description'] + full = args['--full'] + pause_duration = args['--pause'] + try: + connection = connect() + if machine_type and not pause_duration: + # watch_tube needs to be run before we inspect individual jobs; + # it is not needed for pausing tubes + watch_tube(connection, machine_type) + if status: + print(stats_tube(connection, machine_type)) + elif pause_duration: + pause_tube(connection, machine_type, pause_duration) + elif delete: + walk_jobs(connection, machine_type, + JobDeleter(delete)) + elif runs: + walk_jobs(connection, machine_type, + RunPrinter()) + else: + walk_jobs(connection, machine_type, + JobPrinter(show_desc=show_desc, full=full)) + except KeyboardInterrupt: + log.info("Interrupted.") + finally: + connection.close() diff --git a/teuthology/config.py b/teuthology/config.py index 30204aa466..0f5912e87f 100644 --- a/teuthology/config.py +++ b/teuthology/config.py @@ -144,6 +144,7 @@ class TeuthologyConfig(YamlConfig): 'archive_upload_key': None, 'archive_upload_url': None, 'automated_scheduling': False, + 'backend': 'paddles', 'reserve_machines': 5, 'ceph_git_base_url': 'https://github.com/ceph/', 'ceph_git_url': None, diff --git a/teuthology/dispatcher/__init__.py b/teuthology/dispatcher/__init__.py index db4d62e2b4..47e90797b3 100644 --- a/teuthology/dispatcher/__init__.py +++ b/teuthology/dispatcher/__init__.py @@ -68,6 +68,8 @@ def load_config(archive_dir=None): def clean_config(config): result = {} for key in config: + if key == 'status': + continue if config[key] is not None: result[key] = config[key] return result @@ -87,6 +89,7 @@ def main(args): log_dir = args["--log-dir"] archive_dir = args["--archive-dir"] exit_on_empty_queue = args["--exit-on-empty-queue"] + backend = args['--queue-backend'] if archive_dir is None: archive_dir = teuth_config.archive_base @@ -105,6 +108,10 @@ def main(args): load_config(archive_dir=archive_dir) + if backend == 'beanstalk': + connection = beanstalk.connect() + beanstalk.watch_tube(connection, machine_type) + result_proc = None if teuth_config.teuthology_path is None: @@ -133,19 +140,26 @@ def main(args): if rc is not None: worst_returncode = max([worst_returncode, rc]) job_procs.remove(proc) - job = report.get_queued_job(machine_type) - if job is None: - if args.exit_on_empty_queue and not job_procs: - log.info("Queue is empty and no supervisor processes running; exiting!") - break - continue - job = clean_config(job) - report.try_push_job_info(job, dict(status='running')) - job_id = job.get('job_id') - log.info('Reserved job %s', job_id) - log.info('Config is: %s', job) - job_config = job - + if backend == 'beanstalk': + job = connection.reserve(timeout=60) + if job is None: + continue + job.bury() + job_config = yaml.safe_load(job.body) + job_id = job_config.get('job_id') + log.info('Reserved job %s', job_id) + log.info('Config is: %s', job.body) + else: + job = report.get_queued_job(machine_type) + if job is None: + continue + job = clean_config(job) + report.try_push_job_info(job, dict(status='running')) + job_id = job.get('job_id') + log.info('Reserved job %s', job_id) + log.info('Config is: %s', job) + job_config = job + if job_config.get('stop_worker'): keep_running = False @@ -205,6 +219,13 @@ def main(args): status='fail', failure_reason=error_message)) + # This try/except block is to keep the worker from dying when + # beanstalkc throws a SocketError + if backend == 'beanstalk': + try: + job.delete() + except Exception: + log.exception("Saw exception while trying to delete job") return worst_returncode @@ -373,7 +394,7 @@ def create_job_archive(job_name, job_archive_path, archive_dir): def pause_queue(machine_type, paused, paused_by, pause_duration=None): - if paused == True: + if paused: report.pause_queue(machine_type, paused, paused_by, pause_duration) ''' If there is a pause duration specified @@ -383,5 +404,5 @@ def pause_queue(machine_type, paused, paused_by, pause_duration=None): sleep(int(pause_duration)) paused = False report.pause_queue(machine_type, paused, paused_by) - elif paused == False: + elif not paused: report.pause_queue(machine_type, paused, paused_by) diff --git a/teuthology/dispatcher/supervisor.py b/teuthology/dispatcher/supervisor.py index 72bca76049..55512dcf23 100644 --- a/teuthology/dispatcher/supervisor.py +++ b/teuthology/dispatcher/supervisor.py @@ -79,6 +79,12 @@ def main(args): def run_job(job_config, teuth_bin_path, archive_dir, verbose): safe_archive = safepath.munge(job_config['name']) if job_config.get('first_in_suite') or job_config.get('last_in_suite'): + if teuth_config.results_server: + try: + report.try_delete_jobs(job_config['name'], job_config['job_id']) + except Exception as e: + log.warning("Unable to delete job %s, exception occurred: %s", + job_config['job_id'], e) job_archive = os.path.join(archive_dir, safe_archive) args = [ os.path.join(teuth_bin_path, 'teuthology-results'), diff --git a/teuthology/kill.py b/teuthology/kill.py index 1b0d247819..cf71e8553e 100755 --- a/teuthology/kill.py +++ b/teuthology/kill.py @@ -13,6 +13,9 @@ from teuthology import report from teuthology.lock import ops as lock_ops +from teuthology import beanstalk +from teuthology.config import config +from teuthology import misc log = logging.getLogger(__name__) @@ -66,6 +69,7 @@ def kill_run(run_name, archive_base=None, owner=None, machine_type=None, "you must also pass --machine-type") if not preserve_queue: + remove_beanstalk_jobs(run_name, machine_type) remove_paddles_jobs(run_name) if kill_processes(run_name, run_info.get('pids')): return @@ -115,10 +119,13 @@ def find_run_info(serializer, run_name): job_info = {} job_num = 0 jobs = serializer.jobs_for_run(run_name) + job_total = len(jobs) for (job_id, job_dir) in jobs.items(): if not os.path.isdir(job_dir): continue job_num += 1 + if config.backend == 'beanstalk': + beanstalk.print_progress(job_num, job_total, 'Reading Job: ') job_info = serializer.job_info(run_name, job_id, simple=True) for key in job_info.keys(): if key in run_info_fields and key not in run_info: @@ -137,6 +144,41 @@ def remove_paddles_jobs(run_name): report.try_delete_jobs(run_name, job_ids) +def remove_beanstalk_jobs(run_name, tube_name): + qhost = config.queue_host + qport = config.queue_port + if qhost is None or qport is None: + raise RuntimeError( + 'Beanstalk queue information not found in {conf_path}'.format( + conf_path=config.yaml_path)) + log.info("Checking Beanstalk Queue...") + beanstalk_conn = beanstalk.connect() + real_tube_name = beanstalk.watch_tube(beanstalk_conn, tube_name) + + curjobs = beanstalk_conn.stats_tube(real_tube_name)['current-jobs-ready'] + if curjobs != 0: + x = 1 + while x != curjobs: + x += 1 + job = beanstalk_conn.reserve(timeout=20) + if job is None: + continue + job_config = yaml.safe_load(job.body) + if run_name == job_config['name']: + job_id = job_config['job_id'] + msg = "Deleting job from queue. ID: " + \ + "{id} Name: {name} Desc: {desc}".format( + id=str(job_id), + name=job_config['name'], + desc=job_config['description'], + ) + log.info(msg) + job.delete() + else: + print("No jobs in Beanstalk Queue") + beanstalk_conn.close() + + def kill_processes(run_name, pids=None): if pids: to_kill = set(pids).intersection(psutil.pids()) diff --git a/teuthology/orchestra/run.py b/teuthology/orchestra/run.py index bf6a069533..23bed6d170 100644 --- a/teuthology/orchestra/run.py +++ b/teuthology/orchestra/run.py @@ -182,7 +182,6 @@ def _raise_for_status(self): command=self.command, exitstatus=self.returncode, node=self.hostname, label=self.label ) - def _get_exitstatus(self): """ :returns: the remote command's exit status (return code). Note that diff --git a/teuthology/report.py b/teuthology/report.py index f675145b58..947c43bcbd 100644 --- a/teuthology/report.py +++ b/teuthology/report.py @@ -386,13 +386,13 @@ def last_run(self): if os.path.exists(self.last_run_file): os.remove(self.last_run_file) - def get_top_job(self, machine_type): + def get_top_job(self, queue): - uri = "{base}/queue/pop_queue?machine_type={machine_type}".format(base=self.base_uri, - machine_type=machine_type) + uri = "{base}/queue/pop_queue?queue_name={queue}".format(base=self.base_uri, + queue=queue) inc = random.uniform(0, 1) with safe_while( - sleep=1, increment=inc, action=f'get job from {machine_type}') as proceed: + sleep=1, increment=inc, action=f'get job from {queue}') as proceed: while proceed(): response = self.session.get(uri) if response.status_code == 200: @@ -504,7 +504,7 @@ def delete_run(self, run_name): response = self.session.delete(uri) response.raise_for_status() - def create_queue(self, machine_type): + def create_queue(self, queue): """ Create a queue on the results server @@ -513,19 +513,19 @@ def create_queue(self, machine_type): uri = "{base}/queue/".format( base=self.base_uri ) - queue_info = {'machine_type': machine_type} + queue_info = {'queue': queue} queue_json = json.dumps(queue_info) headers = {'content-type': 'application/json'} inc = random.uniform(0, 1) with safe_while( - sleep=1, increment=inc, action=f'creating queue {machine_type}') as proceed: + sleep=1, increment=inc, action=f'creating queue {queue}') as proceed: while proceed(): response = self.session.post(uri, data=queue_json, headers=headers) if response.status_code == 200: - self.log.info("Successfully created queue for {machine_type}".format( - machine_type=machine_type, + self.log.info("Successfully created queue {queue}".format( + queue=queue, )) return else: @@ -545,26 +545,26 @@ def create_queue(self, machine_type): response.raise_for_status() - def update_queue(self, machine_type, paused, paused_by, pause_duration=None): + def update_queue(self, queue, paused, paused_by, pause_duration=None): uri = "{base}/queue/".format( base=self.base_uri ) if pause_duration is not None: pause_duration = int(pause_duration) - queue_info = {'machine_type': machine_type, 'paused': paused, 'paused_by': paused_by, + queue_info = {'queue': queue, 'paused': paused, 'paused_by': paused_by, 'pause_duration': pause_duration} queue_json = json.dumps(queue_info) headers = {'content-type': 'application/json'} inc = random.uniform(0, 1) with safe_while( - sleep=1, increment=inc, action=f'updating queue {machine_type}') as proceed: + sleep=1, increment=inc, action=f'updating queue {queue}') as proceed: while proceed(): response = self.session.put(uri, data=queue_json, headers=headers) if response.status_code == 200: - self.log.info("Successfully updated queue for {machine_type}".format( - machine_type=machine_type, + self.log.info("Successfully updated queue {queue}".format( + queue=queue, )) return else: @@ -579,23 +579,23 @@ def update_queue(self, machine_type, paused, paused_by, pause_duration=None): response.raise_for_status() - def queue_stats(self, machine_type): + def queue_stats(self, queue): uri = "{base}/queue/stats/".format( base=self.base_uri ) - queue_info = {'machine_type': machine_type} + queue_info = {'queue': queue} queue_json = json.dumps(queue_info) headers = {'content-type': 'application/json'} inc = random.uniform(0, 1) with safe_while( - sleep=1, increment=inc, action=f'stats for queue {machine_type}') as proceed: + sleep=1, increment=inc, action=f'stats for queue {queue}') as proceed: while proceed(): response = self.session.post(uri, data=queue_json, headers=headers) if response.status_code == 200: - self.log.info("Successfully retrieved stats for queue {machine_type}".format( - machine_type=machine_type, + self.log.info("Successfully retrieved stats for queue {queue}".format( + queue=queue, )) return response.json() else: @@ -608,18 +608,18 @@ def queue_stats(self, machine_type): )) response.raise_for_status() - def queued_jobs(self, machine_type, user, run_name): + def queued_jobs(self, queue, user, run_name): uri = "{base}/queue/queued_jobs/".format( base=self.base_uri ) + request_info = {'queue': queue} if run_name is not None: filter_field = run_name - request_info = {'machine_type': machine_type, - 'run_name': run_name} + uri += "?run_name=" + str(run_name) else: filter_field = user - request_info = {'machine_type': machine_type, - 'user': user} + uri += "?user=" + str(user) + request_json = json.dumps(request_info) headers = {'content-type': 'application/json'} inc = random.uniform(0, 1) @@ -644,11 +644,14 @@ def queued_jobs(self, machine_type, user, run_name): response.raise_for_status() -def create_machine_type_queue(machine_type): +def create_machine_type_queue(queue): reporter = ResultsReporter() if not reporter.base_uri: return - reporter.create_queue(machine_type) + if ',' in queue: + queue = 'multi' + reporter.create_queue(queue) + return queue def get_user_jobs_queue(machine_type, user, run_name=None): @@ -714,12 +717,16 @@ def get_queued_job(machine_type): reporter = ResultsReporter() if not reporter.base_uri: return - if is_queue_paused(machine_type) == True: - log.info("Teuthology queue for machine type %s is currently paused", - machine_type) + if ',' in machine_type: + queue = 'multi' + else: + queue = machine_type + if is_queue_paused(queue) == True: + log.info("Teuthology queue %s is currently paused", + queue) return None else: - return reporter.get_top_job(machine_type) + return reporter.get_top_job(queue) def try_push_job_info(job_config, extra_info=None): diff --git a/teuthology/schedule.py b/teuthology/schedule.py index 4ccf780845..81dd4d548f 100644 --- a/teuthology/schedule.py +++ b/teuthology/schedule.py @@ -1,6 +1,7 @@ import os import yaml +import teuthology.beanstalk from teuthology.misc import get_user, merge_configs from teuthology import report @@ -22,11 +23,6 @@ def main(args): if args[opt]: raise ValueError(msg_fmt.format(opt=opt)) - if args['--first-in-suite'] or args['--last-in-suite']: - report_status = False - else: - report_status = True - name = args['--name'] if not name or name.isdigit(): raise ValueError("Please use a more descriptive value for --name") @@ -34,13 +30,15 @@ def main(args): backend = args['--queue-backend'] if args['--dry-run']: print('---\n' + yaml.safe_dump(job_config)) - elif backend == 'paddles': - schedule_job(job_config, args['--num'], report_status) elif backend.startswith('@'): dump_job_to_file(backend.lstrip('@'), job_config, args['--num']) + elif backend == 'paddles': + paddles_schedule_job(job_config, args['--num']) + elif backend == 'beanstalk': + beanstalk_schedule_job(job_config, args['--num']) else: raise ValueError("Provided schedule backend '%s' is not supported. " - "Try 'paddles' or '@path-to-a-file" % backend) + "Try 'paddles', 'beanstalk' or '@path-to-a-file" % backend) def build_config(args): @@ -86,9 +84,9 @@ def build_config(args): return job_config -def schedule_job(job_config, num=1, report_status=True): +def paddles_schedule_job(job_config, backend, num=1): """ - Schedule a job. + Schedule a job with Paddles as the backend. :param job_config: The complete job dict :param num: The number of times to schedule the job @@ -97,16 +95,44 @@ def schedule_job(job_config, num=1, report_status=True): ''' Add 'machine_type' queue to DB here. ''' - report.create_machine_type_queue(job_config['machine_type']) + queue = report.create_machine_type_queue(job_config['machine_type']) + job_config['queue'] = queue while num > 0: - job_id = report.try_create_job(job_config, dict(status='queued')) - print('Job scheduled with name {name} and ID {job_id}'.format( + print('Job scheduled in Paddles with name {name} and ID {job_id}'.format( name=job_config['name'], job_id=job_id)) job_config['job_id'] = str(job_id) - + + num -= 1 + + +def beanstalk_schedule_job(job_config, backend, num=1): + """ + Schedule a job with Beanstalk as the backend. + + :param job_config: The complete job dict + :param num: The number of times to schedule the job + """ + num = int(num) + tube = job_config.pop('tube') + beanstalk = teuthology.beanstalk.connect() + beanstalk.use(tube) + queue = report.create_machine_type_queue(job_config['machine_type']) + job_config['queue'] = queue + while num > 0: + job_id = report.try_create_job(job_config, dict(status='queued')) + job_config['job_id'] = str(job_id) + job = yaml.safe_dump(job_config) + _ = beanstalk.put( + job, + ttr=60 * 60 * 24, + priority=job_config['priority'], + ) + print('Job scheduled in Beanstalk with name {name} and ID {job_id}'.format( + name=job_config['name'], job_id=job_id)) num -= 1 + def dump_job_to_file(path, job_config, num=1): """ Schedule a job. @@ -134,4 +160,3 @@ def dump_job_to_file(path, job_config, num=1): num -= 1 with open(count_file_path, 'w') as f: f.write(str(jid)) - diff --git a/teuthology/test/test_dispatcher.py b/teuthology/test/test_dispatcher.py index 6b0dddfe2f..9a6d0ff564 100644 --- a/teuthology/test/test_dispatcher.py +++ b/teuthology/test/test_dispatcher.py @@ -65,7 +65,8 @@ def test_dispatcher_main(self, m_fetch_teuthology, m_fetch_qa_suite, '--description': 'the_description', '--machine-type': 'test_queue', '--supervisor': False, - '--verbose': False + '--verbose': False, + '--queue-backend': 'paddles' } m = mock.MagicMock() From 74bd369f06a9e635f6e3c76784225224b9ef5c63 Mon Sep 17 00:00:00 2001 From: Aishwarya Mathuria Date: Mon, 18 Apr 2022 13:19:40 +0530 Subject: [PATCH 6/7] scripts/queue: common teuthology-queue command for paddles and beanstalk queue Signed-off-by: Aishwarya Mathuria --- scripts/beanstalk_queue.py | 2 +- scripts/paddles_queue.py | 3 +- scripts/queue.py | 37 +++++ teuthology/config.py | 2 +- teuthology/kill.py | 1 + teuthology/paddles_queue.py | 218 ---------------------------- teuthology/queue/__init__.py | 106 ++++++++++++++ teuthology/{ => queue}/beanstalk.py | 96 ------------ teuthology/queue/paddles.py | 88 +++++++++++ teuthology/report.py | 28 ++-- 10 files changed, 253 insertions(+), 328 deletions(-) create mode 100644 scripts/queue.py delete mode 100644 teuthology/paddles_queue.py create mode 100644 teuthology/queue/__init__.py rename teuthology/{ => queue}/beanstalk.py (56%) create mode 100644 teuthology/queue/paddles.py diff --git a/scripts/beanstalk_queue.py b/scripts/beanstalk_queue.py index 88a8242847..a8a0661ecf 100644 --- a/scripts/beanstalk_queue.py +++ b/scripts/beanstalk_queue.py @@ -1,7 +1,7 @@ import docopt import teuthology.config -import teuthology.beanstalk +import teuthology.queue.beanstalk doc = """ usage: teuthology-beanstalk-queue -h diff --git a/scripts/paddles_queue.py b/scripts/paddles_queue.py index 3c69d772e6..8487fd938e 100644 --- a/scripts/paddles_queue.py +++ b/scripts/paddles_queue.py @@ -1,8 +1,7 @@ import docopt import teuthology.config -import teuthology.paddles_queue - +import teuthology.queue.paddles_queue doc = """ usage: teuthology-paddles-queue -h teuthology-paddles-queue -s -m MACHINE_TYPE diff --git a/scripts/queue.py b/scripts/queue.py new file mode 100644 index 0000000000..2c466a7be9 --- /dev/null +++ b/scripts/queue.py @@ -0,0 +1,37 @@ +import docopt + +import teuthology.config +import teuthology.queue.beanstalk +import teuthology.queue.paddles + +doc = """ +usage: teuthology-queue -h + teuthology-queue [-s|-d|-f] -m MACHINE_TYPE + teuthology-queue [-r] -m MACHINE_TYPE + teuthology-queue -m MACHINE_TYPE -D PATTERN + teuthology-queue -p SECONDS [-m MACHINE_TYPE] + +List Jobs in queue. +If -D is passed, then jobs with PATTERN in the job name are deleted from the +queue. + +Arguments: + -m, --machine_type MACHINE_TYPE [default: multi] + Which machine type queue to work on. + +optional arguments: + -h, --help Show this help message and exit + -D, --delete PATTERN Delete Jobs with PATTERN in their name + -d, --description Show job descriptions + -r, --runs Only show run names + -f, --full Print the entire job config. Use with caution. + -s, --status Prints the status of the queue + -p, --pause SECONDS Pause queues for a number of seconds. A value of 0 + will unpause. If -m is passed, pause that queue, + otherwise pause all queues. +""" + + +def main(): + args = docopt.docopt(doc) + teuthology.queue.main(args) diff --git a/teuthology/config.py b/teuthology/config.py index 0f5912e87f..d9216b24ab 100644 --- a/teuthology/config.py +++ b/teuthology/config.py @@ -144,7 +144,7 @@ class TeuthologyConfig(YamlConfig): 'archive_upload_key': None, 'archive_upload_url': None, 'automated_scheduling': False, - 'backend': 'paddles', + 'backend': 'beanstalk', 'reserve_machines': 5, 'ceph_git_base_url': 'https://github.com/ceph/', 'ceph_git_url': None, diff --git a/teuthology/kill.py b/teuthology/kill.py index cf71e8553e..8ce4ddad5a 100755 --- a/teuthology/kill.py +++ b/teuthology/kill.py @@ -11,6 +11,7 @@ import teuthology.exporter +from teuthology.queue import beanstalk from teuthology import report from teuthology.lock import ops as lock_ops from teuthology import beanstalk diff --git a/teuthology/paddles_queue.py b/teuthology/paddles_queue.py deleted file mode 100644 index 99cfe77e6e..0000000000 --- a/teuthology/paddles_queue.py +++ /dev/null @@ -1,218 +0,0 @@ -import logging -import pprint -import sys -from collections import OrderedDict - -from teuthology import report -from teuthology.dispatcher import pause_queue - - -log = logging.getLogger(__name__) - - -def connect(): - host = config.queue_host - port = config.queue_port - if host is None or port is None: - raise RuntimeError( - 'Beanstalk queue information not found in {conf_path}'.format( - conf_path=config.teuthology_yaml)) - return beanstalkc.Connection(host=host, port=port, parse_yaml=yaml.safe_load) - - -def watch_tube(connection, tube_name): - """ - Watch a given tube, potentially correcting to 'multi' if necessary. Returns - the tube_name that was actually used. - """ - if ',' in tube_name: - log.debug("Correcting tube name to 'multi'") - tube_name = 'multi' - connection.watch(tube_name) - connection.ignore('default') - return tube_name - - -def walk_jobs(connection, tube_name, processor, pattern=None): - """ - def callback(jobs_dict) - """ - log.info("Checking Beanstalk Queue...") - job_count = connection.stats_tube(tube_name)['current-jobs-ready'] - if job_count == 0: - log.info('No jobs in Beanstalk Queue') - return - -def stats_queue(machine_type): - stats = report.get_queue_stats(machine_type) - if stats['paused'] is None: - log.info("%s queue is currently running with %s jobs queued", - stats['name'], - stats['count']) - else: - log.info("%s queue is paused with %s jobs queued", - stats['name'], - stats['count']) - - -def update_priority(machine_type, priority, user, run_name=None): - if run_name is not None: - jobs = report.get_user_jobs_queue(machine_type, user, run_name) - else: - jobs = report.get_user_jobs_queue(machine_type, user) - for job in jobs: - job['priority'] = priority - report.try_push_job_info(job) - - -def print_progress(index, total, message=None): - msg = "{m} ".format(m=message) if message else '' - sys.stderr.write("{msg}{i}/{total}\r".format( - msg=msg, i=index, total=total)) - sys.stderr.flush() - - -def end_progress(): - sys.stderr.write('\n') - sys.stderr.flush() - - -def walk_jobs(machine_type, processor, user): - log.info("Checking paddles queue...") - job_count = report.get_queue_stats(machine_type)['count'] - - jobs = report.get_user_jobs_queue(machine_type, user) - if job_count == 0: - log.info('No jobs in queue') - return - - for i in range(1, job_count + 1): - print_progress(i, job_count, "Loading") - job = jobs[i-1] - if job is None: - continue - job_id = job['job_id'] - processor.add_job(job_id, job) - end_progress() - processor.complete() - - -class JobProcessor(object): - def __init__(self): - self.jobs = OrderedDict() - - def add_job(self, job_id, job_config, job_obj=None): - job_id = str(job_id) - - job_dict = dict( - index=(len(self.jobs) + 1), - job_config=job_config, - ) - if job_obj: - job_dict['job_obj'] = job_obj - self.jobs[job_id] = job_dict - - self.process_job(job_id) - - def process_job(self, job_id): - pass - - def complete(self): - pass - - -class JobPrinter(JobProcessor): - def __init__(self, show_desc=False, full=False): - super(JobPrinter, self).__init__() - self.show_desc = show_desc - self.full = full - - def process_job(self, job_id): - job_config = self.jobs[job_id]['job_config'] - job_index = self.jobs[job_id]['index'] - job_priority = job_config['priority'] - job_name = job_config['name'] - job_desc = job_config['description'] - print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format( - i=job_index, - pri=job_priority, - job_id=job_id, - job_name=job_name, - )) - if self.full: - pprint.pprint(job_config) - elif job_desc and self.show_desc: - for desc in job_desc.split(): - print('\t {}'.format(desc)) - - -class RunPrinter(JobProcessor): - def __init__(self): - super(RunPrinter, self).__init__() - self.runs = list() - - def process_job(self, job_id): - run = self.jobs[job_id]['job_config']['name'] - if run not in self.runs: - self.runs.append(run) - print(run) - - -class JobDeleter(JobProcessor): - def __init__(self, pattern): - self.pattern = pattern - super(JobDeleter, self).__init__() - - def add_job(self, job_id, job_config, job_obj=None): - job_name = job_config['name'] - if self.pattern in job_name: - super(JobDeleter, self).add_job(job_id, job_config, job_obj) - - def process_job(self, job_id): - job_config = self.jobs[job_id]['job_config'] - job_name = job_config['name'] - print('Deleting {job_name}/{job_id}'.format( - job_id=job_id, - job_name=job_name, - )) - report.try_delete_jobs(job_name, job_id) - - -def main(args): - machine_type = args['--machine_type'] - user = args['--user'] - run_name = args['--run_name'] - priority = args['--priority'] - status = args['--status'] - delete = args['--delete'] - runs = args['--runs'] - show_desc = args['--description'] - full = args['--full'] - pause = args['--pause'] - unpause = args['--unpause'] - pause_duration = args['--time'] - try: - if status: - stats_queue(machine_type) - elif pause: - if pause_duration: - pause_queue(machine_type, pause, user, pause_duration) - else: - pause_queue(machine_type, pause, user) - elif unpause: - pause = False - pause_queue(machine_type, pause, user) - elif priority: - update_priority(machine_type, priority, user, run_name) - elif delete: - walk_jobs(machine_type, - JobDeleter(delete), user) - elif runs: - walk_jobs(machine_type, - RunPrinter(), user) - else: - walk_jobs(machine_type, - JobPrinter(show_desc=show_desc, full=full), - user) - except KeyboardInterrupt: - log.info("Interrupted.") diff --git a/teuthology/queue/__init__.py b/teuthology/queue/__init__.py new file mode 100644 index 0000000000..2a0b6ff363 --- /dev/null +++ b/teuthology/queue/__init__.py @@ -0,0 +1,106 @@ +import logging +import pprint +import sys +from collections import OrderedDict + +from teuthology import report +from teuthology.config import config + +log = logging.getLogger(__name__) + +def print_progress(index, total, message=None): + msg = "{m} ".format(m=message) if message else '' + sys.stderr.write("{msg}{i}/{total}\r".format( + msg=msg, i=index, total=total)) + sys.stderr.flush() + +def end_progress(): + sys.stderr.write('\n') + sys.stderr.flush() + +class JobProcessor(object): + def __init__(self): + self.jobs = OrderedDict() + + def add_job(self, job_id, job_config, job_obj=None): + job_id = str(job_id) + + job_dict = dict( + index=(len(self.jobs) + 1), + job_config=job_config, + ) + if job_obj: + job_dict['job_obj'] = job_obj + self.jobs[job_id] = job_dict + + self.process_job(job_id) + + def process_job(self, job_id): + pass + + def complete(self): + pass + + +class JobPrinter(JobProcessor): + def __init__(self, show_desc=False, full=False): + super(JobPrinter, self).__init__() + self.show_desc = show_desc + self.full = full + + def process_job(self, job_id): + job_config = self.jobs[job_id]['job_config'] + job_index = self.jobs[job_id]['index'] + job_priority = job_config['priority'] + job_name = job_config['name'] + job_desc = job_config['description'] + print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format( + i=job_index, + pri=job_priority, + job_id=job_id, + job_name=job_name, + )) + if self.full: + pprint.pprint(job_config) + elif job_desc and self.show_desc: + for desc in job_desc.split(): + print('\t {}'.format(desc)) + + +class RunPrinter(JobProcessor): + def __init__(self): + super(RunPrinter, self).__init__() + self.runs = list() + + def process_job(self, job_id): + run = self.jobs[job_id]['job_config']['name'] + if run not in self.runs: + self.runs.append(run) + print(run) + + +class JobDeleter(JobProcessor): + def __init__(self, pattern): + self.pattern = pattern + super(JobDeleter, self).__init__() + + def add_job(self, job_id, job_config, job_obj=None): + job_name = job_config['name'] + if self.pattern in job_name: + super(JobDeleter, self).add_job(job_id, job_config, job_obj) + + def process_job(self, job_id): + job_config = self.jobs[job_id]['job_config'] + job_name = job_config['name'] + print('Deleting {job_name}/{job_id}'.format( + job_id=job_id, + job_name=job_name, + )) + report.try_delete_jobs(job_name, job_id) + + +def main(args): + if config.backend == 'paddles': + paddles.main(args) + else: + beanstalk.main(args) \ No newline at end of file diff --git a/teuthology/beanstalk.py b/teuthology/queue/beanstalk.py similarity index 56% rename from teuthology/beanstalk.py rename to teuthology/queue/beanstalk.py index a1165becca..90b1cbd6d3 100644 --- a/teuthology/beanstalk.py +++ b/teuthology/queue/beanstalk.py @@ -61,102 +61,6 @@ def callback(jobs_dict) processor.complete() -def print_progress(index, total, message=None): - msg = "{m} ".format(m=message) if message else '' - sys.stderr.write("{msg}{i}/{total}\r".format( - msg=msg, i=index, total=total)) - sys.stderr.flush() - - -def end_progress(): - sys.stderr.write('\n') - sys.stderr.flush() - - -class JobProcessor(object): - def __init__(self): - self.jobs = OrderedDict() - - def add_job(self, job_id, job_config, job_obj=None): - job_id = str(job_id) - - job_dict = dict( - index=(len(self.jobs) + 1), - job_config=job_config, - ) - if job_obj: - job_dict['job_obj'] = job_obj - self.jobs[job_id] = job_dict - - self.process_job(job_id) - - def process_job(self, job_id): - pass - - def complete(self): - pass - - -class JobPrinter(JobProcessor): - def __init__(self, show_desc=False, full=False): - super(JobPrinter, self).__init__() - self.show_desc = show_desc - self.full = full - - def process_job(self, job_id): - job_config = self.jobs[job_id]['job_config'] - job_index = self.jobs[job_id]['index'] - job_priority = job_config['priority'] - job_name = job_config['name'] - job_desc = job_config['description'] - print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format( - i=job_index, - pri=job_priority, - job_id=job_id, - job_name=job_name, - )) - if self.full: - pprint.pprint(job_config) - elif job_desc and self.show_desc: - for desc in job_desc.split(): - print('\t {}'.format(desc)) - - -class RunPrinter(JobProcessor): - def __init__(self): - super(RunPrinter, self).__init__() - self.runs = list() - - def process_job(self, job_id): - run = self.jobs[job_id]['job_config']['name'] - if run not in self.runs: - self.runs.append(run) - print(run) - - -class JobDeleter(JobProcessor): - def __init__(self, pattern): - self.pattern = pattern - super(JobDeleter, self).__init__() - - def add_job(self, job_id, job_config, job_obj=None): - job_name = job_config['name'] - if self.pattern in job_name: - super(JobDeleter, self).add_job(job_id, job_config, job_obj) - - def process_job(self, job_id): - job_config = self.jobs[job_id]['job_config'] - job_name = job_config['name'] - print('Deleting {job_name}/{job_id}'.format( - job_id=job_id, - job_name=job_name, - )) - job_obj = self.jobs[job_id].get('job_obj') - if job_obj: - job_obj.delete() - report.try_delete_jobs(job_name, job_id) - - def pause_tube(connection, tube, duration): duration = int(duration) if not tube: diff --git a/teuthology/queue/paddles.py b/teuthology/queue/paddles.py new file mode 100644 index 0000000000..f2ea8b84c8 --- /dev/null +++ b/teuthology/queue/paddles.py @@ -0,0 +1,88 @@ +import logging +import pprint +import sys +from collections import OrderedDict + +from teuthology import report +from teuthology.dispatcher import pause_queue + + +log = logging.getLogger(__name__) + + +def stats_queue(machine_type): + stats = report.get_queue_stats(machine_type) + if stats['paused'] is None: + log.info("%s queue is currently running with %s jobs queued", + stats['name'], + stats['count']) + else: + log.info("%s queue is paused with %s jobs queued", + stats['name'], + stats['count']) + + +def update_priority(machine_type, priority, user, run_name=None): + if run_name is not None: + jobs = report.get_user_jobs_queue(machine_type, user, run_name) + else: + jobs = report.get_user_jobs_queue(machine_type, user) + for job in jobs: + job['priority'] = priority + report.try_push_job_info(job) + + +def walk_jobs(machine_type, processor, user): + log.info("Checking paddles queue...") + job_count = report.get_queue_stats(machine_type)['count'] + + jobs = report.get_user_jobs_queue(machine_type, user) + if job_count == 0: + log.info('No jobs in queue') + return + + for i in range(1, job_count + 1): + print_progress(i, job_count, "Loading") + job = jobs[i-1] + if job is None: + continue + job_id = job['job_id'] + processor.add_job(job_id, job) + end_progress() + processor.complete() + + +def main(args): + machine_type = args['--machine_type'] + #user = args['--user'] + #run_name = args['--run_name'] + #priority = args['--priority'] + status = args['--status'] + delete = args['--delete'] + runs = args['--runs'] + show_desc = args['--description'] + full = args['--full'] + pause_duration = args['--pause'] + #unpause = args['--unpause'] + #pause_duration = args['--time'] + try: + if status: + stats_queue(machine_type) + if pause_duration: + pause_queue(machine_type, pause, user, pause_duration) + #else: + #pause_queue(machine_type, pause, user) + elif priority: + update_priority(machine_type, priority, run_name) + elif delete: + walk_jobs(machine_type, + JobDeleter(delete), user) + elif runs: + walk_jobs(machine_type, + RunPrinter(), user) + else: + walk_jobs(machine_type, + JobPrinter(show_desc=show_desc, full=full), + user) + except KeyboardInterrupt: + log.info("Interrupted.") diff --git a/teuthology/report.py b/teuthology/report.py index 947c43bcbd..5cb4e854cb 100644 --- a/teuthology/report.py +++ b/teuthology/report.py @@ -508,7 +508,7 @@ def create_queue(self, queue): """ Create a queue on the results server - :param machine_type: The machine type specified for the job + :param queue: The queue specified for the job """ uri = "{base}/queue/".format( base=self.base_uri @@ -613,10 +613,11 @@ def queued_jobs(self, queue, user, run_name): base=self.base_uri ) request_info = {'queue': queue} + filter_field = queue if run_name is not None: filter_field = run_name uri += "?run_name=" + str(run_name) - else: + elif user is not None: filter_field = user uri += "?user=" + str(user) @@ -653,36 +654,43 @@ def create_machine_type_queue(queue): reporter.create_queue(queue) return queue +def get_all_jobs_in_queue(queue, user=None, run_name=None): + reporter = ResultsReporter() + if not reporter.base_uri: + return + if ',' in queue: + queue = 'multi' + return reporter.queued_jobs(queue) -def get_user_jobs_queue(machine_type, user, run_name=None): +def get_user_jobs_queue(queue, user, run_name=None): reporter = ResultsReporter() if not reporter.base_uri: return - return reporter.queued_jobs(machine_type, user, run_name) + return reporter.queued_jobs(queue, user, run_name) -def pause_queue(machine_type, paused, paused_by, pause_duration=None): +def pause_queue(queue, paused, paused_by, pause_duration=None): reporter = ResultsReporter() if not reporter.base_uri: return - reporter.update_queue(machine_type, paused, paused_by, pause_duration) + reporter.update_queue(queue, paused, paused_by, pause_duration) -def is_queue_paused(machine_type): +def is_queue_paused(queue): reporter = ResultsReporter() if not reporter.base_uri: return - stats = reporter.queue_stats(machine_type) + stats = reporter.queue_stats(queue) if stats['paused'] != 0 and stats['paused'] is not None: return True return False -def get_queue_stats(machine_type): +def get_queue_stats(queue): reporter = ResultsReporter() if not reporter.base_uri: return - stats = reporter.queue_stats(machine_type) + stats = reporter.queue_stats(queue) return stats From be34af23de8479978ae338f8e9c06aa46f5c3b81 Mon Sep 17 00:00:00 2001 From: Aishwarya Mathuria Date: Wed, 4 May 2022 19:37:40 +0530 Subject: [PATCH 7/7] teuthology/queue: Single command for queue operations Makes the same teuthology-queue commands work regardless of the queue backend, Paddles or Beanstalk. Signed-off-by: Aishwarya Mathuria --- scripts/dispatcher.py | 1 - scripts/paddles_queue.py | 45 ------------ scripts/queue.py | 15 +++- teuthology/dispatcher/__init__.py | 7 +- teuthology/dispatcher/supervisor.py | 7 +- teuthology/kill.py | 2 - teuthology/queue/__init__.py | 106 ---------------------------- teuthology/queue/beanstalk.py | 16 ++--- teuthology/queue/paddles.py | 49 ++++++------- teuthology/queue/util.py | 101 ++++++++++++++++++++++++++ teuthology/report.py | 20 +++--- teuthology/schedule.py | 4 +- teuthology/test/test_dispatcher.py | 3 +- 13 files changed, 165 insertions(+), 211 deletions(-) delete mode 100644 scripts/paddles_queue.py create mode 100644 teuthology/queue/util.py diff --git a/scripts/dispatcher.py b/scripts/dispatcher.py index fffdb634b8..63779f1246 100644 --- a/scripts/dispatcher.py +++ b/scripts/dispatcher.py @@ -26,7 +26,6 @@ --queue-backend BACKEND choose between paddles and beanstalk """ -import docopt import sys import teuthology.dispatcher.supervisor diff --git a/scripts/paddles_queue.py b/scripts/paddles_queue.py deleted file mode 100644 index 8487fd938e..0000000000 --- a/scripts/paddles_queue.py +++ /dev/null @@ -1,45 +0,0 @@ -import docopt - -import teuthology.config -import teuthology.queue.paddles_queue -doc = """ -usage: teuthology-paddles-queue -h - teuthology-paddles-queue -s -m MACHINE_TYPE - teuthology-paddles-queue [-d|-f] -m MACHINE_TYPE -U USER - teuthology-paddles-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME] - teuthology-paddles-queue [-r] -m MACHINE_TYPE -U USER - teuthology-paddles-queue -m MACHINE_TYPE -D PATTERN -U USER - teuthology-paddles-queue -p [-t SECONDS] -m MACHINE_TYPE -U USER - teuthology-paddles-queue -u -m MACHINE_TYPE -U USER - -List Jobs in queue. -If -D is passed, then jobs with PATTERN in the job name are deleted from the -queue. - -Arguments: - -m, --machine_type MACHINE_TYPE - Which machine type queue to work on. - -optional arguments: - -h, --help Show this help message and exit - -D, --delete PATTERN Delete Jobs with PATTERN in their name - -d, --description Show job descriptions - -r, --runs Only show run names - -f, --full Print the entire job config. Use with caution. - -s, --status Prints the status of the queue - -t, --time SECONDS Pause queues for a number of seconds. - If -m is passed, pause that queue, - otherwise pause all queues. - -p, --pause Pause queue - -u, --unpause Unpause queue - -P, --priority PRIORITY - Change priority of queued jobs - -U, --user USER User who owns the jobs - -R, --run-name RUN_NAME - Used to change priority of all jobs in the run. -""" - - -def main(): - args = docopt.docopt(doc) - teuthology.paddles_queue.main(args) diff --git a/scripts/queue.py b/scripts/queue.py index 2c466a7be9..1d9112c22e 100644 --- a/scripts/queue.py +++ b/scripts/queue.py @@ -1,15 +1,16 @@ import docopt -import teuthology.config import teuthology.queue.beanstalk import teuthology.queue.paddles +from teuthology.config import config doc = """ usage: teuthology-queue -h teuthology-queue [-s|-d|-f] -m MACHINE_TYPE teuthology-queue [-r] -m MACHINE_TYPE teuthology-queue -m MACHINE_TYPE -D PATTERN - teuthology-queue -p SECONDS [-m MACHINE_TYPE] + teuthology-queue -p SECONDS [-m MACHINE_TYPE] [-U USER] + teuthology-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME] List Jobs in queue. If -D is passed, then jobs with PATTERN in the job name are deleted from the @@ -29,9 +30,17 @@ -p, --pause SECONDS Pause queues for a number of seconds. A value of 0 will unpause. If -m is passed, pause that queue, otherwise pause all queues. + -P, --priority PRIORITY + Change priority of queued jobs (only in Paddles queues) + -U, --user USER User who owns the jobs + -R, --run-name RUN_NAME + Used to change priority of all jobs in the run. """ def main(): args = docopt.docopt(doc) - teuthology.queue.main(args) + if config.backend == 'beanstalk': + teuthology.queue.beanstalk.main(args) + else: + teuthology.queue.paddles.main(args) diff --git a/teuthology/dispatcher/__init__.py b/teuthology/dispatcher/__init__.py index 47e90797b3..3ec1153fdf 100644 --- a/teuthology/dispatcher/__init__.py +++ b/teuthology/dispatcher/__init__.py @@ -84,9 +84,7 @@ def main(args): "There is already a teuthology-dispatcher process running:" f" {procs}" ) - verbose = args["--verbose"] machine_type = args["--machine-type"] - log_dir = args["--log-dir"] archive_dir = args["--archive-dir"] exit_on_empty_queue = args["--exit-on-empty-queue"] backend = args['--queue-backend'] @@ -111,6 +109,8 @@ def main(args): if backend == 'beanstalk': connection = beanstalk.connect() beanstalk.watch_tube(connection, machine_type) + elif backend == 'paddles': + report.create_machine_type_queue(machine_type) result_proc = None @@ -152,6 +152,9 @@ def main(args): else: job = report.get_queued_job(machine_type) if job is None: + if exit_on_empty_queue and not job_procs: + log.info("Queue is empty and no supervisor processes running; exiting!") + break continue job = clean_config(job) report.try_push_job_info(job, dict(status='running')) diff --git a/teuthology/dispatcher/supervisor.py b/teuthology/dispatcher/supervisor.py index 55512dcf23..5dd0e4e4ca 100644 --- a/teuthology/dispatcher/supervisor.py +++ b/teuthology/dispatcher/supervisor.py @@ -82,9 +82,8 @@ def run_job(job_config, teuth_bin_path, archive_dir, verbose): if teuth_config.results_server: try: report.try_delete_jobs(job_config['name'], job_config['job_id']) - except Exception as e: - log.warning("Unable to delete job %s, exception occurred: %s", - job_config['job_id'], e) + except Exception: + log.exception("Unable to delete job %s", job_config['job_id']) job_archive = os.path.join(archive_dir, safe_archive) args = [ os.path.join(teuth_bin_path, 'teuthology-results'), @@ -142,7 +141,7 @@ def run_job(job_config, teuth_bin_path, archive_dir, verbose): '--archive', job_config['archive_path'], '--name', job_config['name'], ]) - if 'description' in job_config: + if job_config.get('description') is not None: arg.extend(['--description', job_config['description']]) job_archive = os.path.join(job_config['archive_path'], 'orig.config.yaml') arg.extend(['--', job_archive]) diff --git a/teuthology/kill.py b/teuthology/kill.py index 8ce4ddad5a..2e464d6713 100755 --- a/teuthology/kill.py +++ b/teuthology/kill.py @@ -14,9 +14,7 @@ from teuthology.queue import beanstalk from teuthology import report from teuthology.lock import ops as lock_ops -from teuthology import beanstalk from teuthology.config import config -from teuthology import misc log = logging.getLogger(__name__) diff --git a/teuthology/queue/__init__.py b/teuthology/queue/__init__.py index 2a0b6ff363..e69de29bb2 100644 --- a/teuthology/queue/__init__.py +++ b/teuthology/queue/__init__.py @@ -1,106 +0,0 @@ -import logging -import pprint -import sys -from collections import OrderedDict - -from teuthology import report -from teuthology.config import config - -log = logging.getLogger(__name__) - -def print_progress(index, total, message=None): - msg = "{m} ".format(m=message) if message else '' - sys.stderr.write("{msg}{i}/{total}\r".format( - msg=msg, i=index, total=total)) - sys.stderr.flush() - -def end_progress(): - sys.stderr.write('\n') - sys.stderr.flush() - -class JobProcessor(object): - def __init__(self): - self.jobs = OrderedDict() - - def add_job(self, job_id, job_config, job_obj=None): - job_id = str(job_id) - - job_dict = dict( - index=(len(self.jobs) + 1), - job_config=job_config, - ) - if job_obj: - job_dict['job_obj'] = job_obj - self.jobs[job_id] = job_dict - - self.process_job(job_id) - - def process_job(self, job_id): - pass - - def complete(self): - pass - - -class JobPrinter(JobProcessor): - def __init__(self, show_desc=False, full=False): - super(JobPrinter, self).__init__() - self.show_desc = show_desc - self.full = full - - def process_job(self, job_id): - job_config = self.jobs[job_id]['job_config'] - job_index = self.jobs[job_id]['index'] - job_priority = job_config['priority'] - job_name = job_config['name'] - job_desc = job_config['description'] - print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format( - i=job_index, - pri=job_priority, - job_id=job_id, - job_name=job_name, - )) - if self.full: - pprint.pprint(job_config) - elif job_desc and self.show_desc: - for desc in job_desc.split(): - print('\t {}'.format(desc)) - - -class RunPrinter(JobProcessor): - def __init__(self): - super(RunPrinter, self).__init__() - self.runs = list() - - def process_job(self, job_id): - run = self.jobs[job_id]['job_config']['name'] - if run not in self.runs: - self.runs.append(run) - print(run) - - -class JobDeleter(JobProcessor): - def __init__(self, pattern): - self.pattern = pattern - super(JobDeleter, self).__init__() - - def add_job(self, job_id, job_config, job_obj=None): - job_name = job_config['name'] - if self.pattern in job_name: - super(JobDeleter, self).add_job(job_id, job_config, job_obj) - - def process_job(self, job_id): - job_config = self.jobs[job_id]['job_config'] - job_name = job_config['name'] - print('Deleting {job_name}/{job_id}'.format( - job_id=job_id, - job_name=job_name, - )) - report.try_delete_jobs(job_name, job_id) - - -def main(args): - if config.backend == 'paddles': - paddles.main(args) - else: - beanstalk.main(args) \ No newline at end of file diff --git a/teuthology/queue/beanstalk.py b/teuthology/queue/beanstalk.py index 90b1cbd6d3..c668e4f6bc 100644 --- a/teuthology/queue/beanstalk.py +++ b/teuthology/queue/beanstalk.py @@ -1,12 +1,10 @@ import beanstalkc import yaml import logging -import pprint -import sys -from collections import OrderedDict from teuthology.config import config -from teuthology import report +from teuthology.queue import util + log = logging.getLogger(__name__) @@ -47,7 +45,7 @@ def callback(jobs_dict) # Try to figure out a sane timeout based on how many jobs are in the queue timeout = job_count / 2000.0 * 60 for i in range(1, job_count + 1): - print_progress(i, job_count, "Loading") + util.print_progress(i, job_count, "Loading") job = connection.reserve(timeout=timeout) if job is None or job.body is None: continue @@ -57,7 +55,7 @@ def callback(jobs_dict) if pattern is not None and pattern not in job_name: continue processor.add_job(job_id, job_config, job) - end_progress() + util.end_progress() processor.complete() @@ -105,13 +103,13 @@ def main(args): pause_tube(connection, machine_type, pause_duration) elif delete: walk_jobs(connection, machine_type, - JobDeleter(delete)) + util.JobDeleter(delete)) elif runs: walk_jobs(connection, machine_type, - RunPrinter()) + util.RunPrinter()) else: walk_jobs(connection, machine_type, - JobPrinter(show_desc=show_desc, full=full)) + util.JobPrinter(show_desc=show_desc, full=full)) except KeyboardInterrupt: log.info("Interrupted.") finally: diff --git a/teuthology/queue/paddles.py b/teuthology/queue/paddles.py index f2ea8b84c8..489d638e2f 100644 --- a/teuthology/queue/paddles.py +++ b/teuthology/queue/paddles.py @@ -1,11 +1,7 @@ import logging -import pprint -import sys -from collections import OrderedDict from teuthology import report -from teuthology.dispatcher import pause_queue - +from teuthology.queue import util log = logging.getLogger(__name__) @@ -14,19 +10,17 @@ def stats_queue(machine_type): stats = report.get_queue_stats(machine_type) if stats['paused'] is None: log.info("%s queue is currently running with %s jobs queued", - stats['name'], - stats['count']) + stats['queue'], + stats['queued_jobs']) else: log.info("%s queue is paused with %s jobs queued", - stats['name'], - stats['count']) + stats['queue'], + stats['queued_jobs']) -def update_priority(machine_type, priority, user, run_name=None): +def update_priority(machine_type, priority, run_name=None): if run_name is not None: - jobs = report.get_user_jobs_queue(machine_type, user, run_name) - else: - jobs = report.get_user_jobs_queue(machine_type, user) + jobs = report.get_jobs_by_run(machine_type, run_name) for job in jobs: job['priority'] = priority report.try_push_job_info(job) @@ -34,55 +28,54 @@ def update_priority(machine_type, priority, user, run_name=None): def walk_jobs(machine_type, processor, user): log.info("Checking paddles queue...") - job_count = report.get_queue_stats(machine_type)['count'] + job_count = report.get_queue_stats(machine_type)['queued_jobs'] jobs = report.get_user_jobs_queue(machine_type, user) if job_count == 0: - log.info('No jobs in queue') + log.info('No jobs in Paddles queue') return for i in range(1, job_count + 1): - print_progress(i, job_count, "Loading") + util.print_progress(i, job_count, "Loading") job = jobs[i-1] if job is None: continue job_id = job['job_id'] processor.add_job(job_id, job) - end_progress() + util.end_progress() processor.complete() def main(args): machine_type = args['--machine_type'] - #user = args['--user'] - #run_name = args['--run_name'] - #priority = args['--priority'] + user = args['--user'] + run_name = args['--run-name'] status = args['--status'] delete = args['--delete'] runs = args['--runs'] show_desc = args['--description'] full = args['--full'] pause_duration = args['--pause'] - #unpause = args['--unpause'] - #pause_duration = args['--time'] + priority = args['--priority'] try: if status: stats_queue(machine_type) if pause_duration: - pause_queue(machine_type, pause, user, pause_duration) - #else: - #pause_queue(machine_type, pause, user) + if not user: + log.info('Please enter user to pause Paddles queue') + return + report.pause_queue(machine_type, user, pause_duration) elif priority: update_priority(machine_type, priority, run_name) elif delete: walk_jobs(machine_type, - JobDeleter(delete), user) + util.JobDeleter(delete), user) elif runs: walk_jobs(machine_type, - RunPrinter(), user) + util.RunPrinter(), user) else: walk_jobs(machine_type, - JobPrinter(show_desc=show_desc, full=full), + util.JobPrinter(show_desc=show_desc, full=full), user) except KeyboardInterrupt: log.info("Interrupted.") diff --git a/teuthology/queue/util.py b/teuthology/queue/util.py new file mode 100644 index 0000000000..2a7642e726 --- /dev/null +++ b/teuthology/queue/util.py @@ -0,0 +1,101 @@ +import logging +import pprint +import sys +from collections import OrderedDict + +from teuthology import report + +log = logging.getLogger(__name__) + + +def print_progress(index, total, message=None): + msg = "{m} ".format(m=message) if message else '' + sys.stderr.write("{msg}{i}/{total}\r".format( + msg=msg, i=index, total=total)) + sys.stderr.flush() + + +def end_progress(): + sys.stderr.write('\n') + sys.stderr.flush() + + +class JobProcessor(object): + def __init__(self): + self.jobs = OrderedDict() + + def add_job(self, job_id, job_config, job_obj=None): + job_id = str(job_id) + + job_dict = dict( + index=(len(self.jobs) + 1), + job_config=job_config, + ) + if job_obj: + job_dict['job_obj'] = job_obj + self.jobs[job_id] = job_dict + + self.process_job(job_id) + + def process_job(self, job_id): + pass + + def complete(self): + pass + + +class JobPrinter(JobProcessor): + def __init__(self, show_desc=False, full=False): + super(JobPrinter, self).__init__() + self.show_desc = show_desc + self.full = full + + def process_job(self, job_id): + job_config = self.jobs[job_id]['job_config'] + job_index = self.jobs[job_id]['index'] + job_priority = job_config['priority'] + job_name = job_config['name'] + job_desc = job_config['description'] + print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format( + i=job_index, + pri=job_priority, + job_id=job_id, + job_name=job_name, + )) + if self.full: + pprint.pprint(job_config) + elif job_desc and self.show_desc: + for desc in job_desc.split(): + print('\t {}'.format(desc)) + + +class RunPrinter(JobProcessor): + def __init__(self): + super(RunPrinter, self).__init__() + self.runs = list() + + def process_job(self, job_id): + run = self.jobs[job_id]['job_config']['name'] + if run not in self.runs: + self.runs.append(run) + print(run) + + +class JobDeleter(JobProcessor): + def __init__(self, pattern): + self.pattern = pattern + super(JobDeleter, self).__init__() + + def add_job(self, job_id, job_config, job_obj=None): + job_name = job_config['name'] + if self.pattern in job_name: + super(JobDeleter, self).add_job(job_id, job_config, job_obj) + + def process_job(self, job_id): + job_config = self.jobs[job_id]['job_config'] + job_name = job_config['name'] + print('Deleting {job_name}/{job_id}'.format( + job_id=job_id, + job_name=job_name, + )) + report.try_delete_jobs(job_name, job_id) diff --git a/teuthology/report.py b/teuthology/report.py index 5cb4e854cb..079b561f44 100644 --- a/teuthology/report.py +++ b/teuthology/report.py @@ -388,7 +388,7 @@ def last_run(self): def get_top_job(self, queue): - uri = "{base}/queue/pop_queue?queue_name={queue}".format(base=self.base_uri, + uri = "{base}/queue/pop_queue?queue={queue}".format(base=self.base_uri, queue=queue) inc = random.uniform(0, 1) with safe_while( @@ -545,13 +545,14 @@ def create_queue(self, queue): response.raise_for_status() - def update_queue(self, queue, paused, paused_by, pause_duration=None): + def update_queue(self, queue, paused_by, pause_duration=None): uri = "{base}/queue/".format( base=self.base_uri ) + if pause_duration is not None: pause_duration = int(pause_duration) - queue_info = {'queue': queue, 'paused': paused, 'paused_by': paused_by, + queue_info = {'queue': queue, 'paused_by': paused_by, 'pause_duration': pause_duration} queue_json = json.dumps(queue_info) headers = {'content-type': 'application/json'} @@ -594,9 +595,6 @@ def queue_stats(self, queue): response = self.session.post(uri, data=queue_json, headers=headers) if response.status_code == 200: - self.log.info("Successfully retrieved stats for queue {queue}".format( - queue=queue, - )) return response.json() else: msg = response.text @@ -668,12 +666,18 @@ def get_user_jobs_queue(queue, user, run_name=None): return return reporter.queued_jobs(queue, user, run_name) +def get_jobs_by_run(queue, run_name): + reporter = ResultsReporter() + if not reporter.base_uri: + return + return reporter.queued_jobs(queue, None, run_name) + -def pause_queue(queue, paused, paused_by, pause_duration=None): +def pause_queue(queue, paused_by, pause_duration=None): reporter = ResultsReporter() if not reporter.base_uri: return - reporter.update_queue(queue, paused, paused_by, pause_duration) + reporter.update_queue(queue, paused_by, pause_duration) def is_queue_paused(queue): diff --git a/teuthology/schedule.py b/teuthology/schedule.py index 81dd4d548f..3a370e86db 100644 --- a/teuthology/schedule.py +++ b/teuthology/schedule.py @@ -1,7 +1,7 @@ import os import yaml -import teuthology.beanstalk +import teuthology.queue.beanstalk from teuthology.misc import get_user, merge_configs from teuthology import report @@ -115,7 +115,7 @@ def beanstalk_schedule_job(job_config, backend, num=1): """ num = int(num) tube = job_config.pop('tube') - beanstalk = teuthology.beanstalk.connect() + beanstalk = teuthology.queue.beanstalk.connect() beanstalk.use(tube) queue = report.create_machine_type_queue(job_config['machine_type']) job_config['queue'] = queue diff --git a/teuthology/test/test_dispatcher.py b/teuthology/test/test_dispatcher.py index 9a6d0ff564..16c61618a0 100644 --- a/teuthology/test/test_dispatcher.py +++ b/teuthology/test/test_dispatcher.py @@ -66,7 +66,8 @@ def test_dispatcher_main(self, m_fetch_teuthology, m_fetch_qa_suite, '--machine-type': 'test_queue', '--supervisor': False, '--verbose': False, - '--queue-backend': 'paddles' + '--queue-backend': 'paddles', + '--exit-on-empty-queue': False } m = mock.MagicMock()