teuthology/queue: Single command for queue operations
Makes the same teuthology-queue commands work regardless of the queue backend, Paddles or Beanstalk.

Signed-off-by: Aishwarya Mathuria <amathuri@redhat.com>
amathuria committed Sep 7, 2022
1 parent 172089c commit a6c7093
Showing 17 changed files with 192 additions and 364 deletions.
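The heart of the change is a single entry point that routes the same parsed arguments to whichever backend is configured. A condensed sketch of that pattern, matching the scripts/queue.py diff below (module names taken from the diffs):

    # Condensed view of the pattern this commit introduces: one CLI, one docopt
    # usage string, and a branch on the configured backend.
    import docopt

    import teuthology.queue.beanstalk
    import teuthology.queue.paddles
    from teuthology.config import config


    def main(doc):
        args = docopt.docopt(doc)
        if config.backend == 'beanstalk':
            teuthology.queue.beanstalk.main(args)
        else:
            teuthology.queue.paddles.main(args)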
35 changes: 0 additions & 35 deletions scripts/beanstalk_queue.py

This file was deleted.

9 changes: 5 additions & 4 deletions scripts/dispatcher.py
@@ -1,9 +1,9 @@
"""
usage: teuthology-dispatcher --help
teuthology-dispatcher --supervisor [-v] --bin-path BIN_PATH --job-config CONFIG --archive-dir DIR
teuthology-dispatcher [-v] [--archive-dir DIR] --log-dir LOG_DIR --machine-type MACHINE_TYPE --queue-backend BACKEND
teuthology-dispatcher [-v] [--archive-dir DIR] [--exit-on-empty-queue] [--queue-backend BACKEND] --log-dir LOG_DIR --tube TUBE
Start a dispatcher for the specified machine type. Grab jobs from a paddles/beanstalk
Start a dispatcher for the specified tube. Grab jobs from a paddles/beanstalk
queue and run the teuthology tests they describe as subprocesses. The
subprocess invoked is a teuthology-dispatcher command run in supervisor
mode.
@@ -17,12 +17,13 @@
-v, --verbose be more verbose
-l, --log-dir LOG_DIR path in which to store logs
-a DIR, --archive-dir DIR path to archive results in
--machine-type MACHINE_TYPE the machine type for the job
-t, --tube TUBE which queue to read jobs from
--supervisor run dispatcher in job supervisor mode
--bin-path BIN_PATH teuthology bin path
--job-config CONFIG file descriptor of job's config file
--exit-on-empty-queue if the queue is empty, exit
--queue-backend BACKEND choose between paddles and beanstalk
--queue-backend BACKEND which backend will be used for the queue
[default: beanstalk]
"""

import docopt
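For reference, a minimal sketch of how this usage string is consumed, assuming the docstring above is the module's __doc__; the CLI shim hands everything to teuthology.dispatcher.main(), which switches to supervisor mode itself when --supervisor is passed (see the dispatcher/__init__.py hunks further down):

    # Minimal sketch of the dispatcher CLI shim.
    import docopt

    import teuthology.dispatcher


    def main():
        args = docopt.docopt(__doc__)
        return teuthology.dispatcher.main(args)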
2 changes: 1 addition & 1 deletion scripts/kill.py
@@ -12,7 +12,7 @@
teuthology-kill [-p] -o OWNER -m MACHINE_TYPE -r RUN
Kill running teuthology jobs:
1. Removes any queued jobs from the paddles queue
1. Removes any queued jobs from the queue
2. Kills any running jobs
3. Nukes any machines involved
45 changes: 0 additions & 45 deletions scripts/paddles_queue.py

This file was deleted.

17 changes: 13 additions & 4 deletions scripts/queue.py
@@ -1,15 +1,16 @@
import docopt

import teuthology.config
import teuthology.queue.beanstalk
import teuthology.queue.paddles
from teuthology.config import config

doc = """
usage: teuthology-queue -h
teuthology-queue [-s|-d|-f] -m MACHINE_TYPE
teuthology-queue [-s|-d|-f] -m MACHINE_TYPE
teuthology-queue [-r] -m MACHINE_TYPE
teuthology-queue -m MACHINE_TYPE -D PATTERN
teuthology-queue -p SECONDS [-m MACHINE_TYPE]
teuthology-queue -p SECONDS [-m MACHINE_TYPE] [-U USER]
teuthology-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME]
List Jobs in queue.
If -D is passed, then jobs with PATTERN in the job name are deleted from the
@@ -29,9 +30,17 @@
-p, --pause SECONDS Pause queues for a number of seconds. A value of 0
will unpause. If -m is passed, pause that queue,
otherwise pause all queues.
-P, --priority PRIORITY
Change priority of queued jobs (only in Paddles queues)
-U, --user USER User who owns the jobs
-R, --run-name RUN_NAME
Used to change priority of all jobs in the run.
"""


def main():
args = docopt.docopt(doc)
teuthology.queue.main(args)
if config.backend == 'beanstalk':
teuthology.queue.beanstalk.main(args)
else:
teuthology.queue.paddles.main(args)
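A hedged example tied to the new `-p SECONDS [-U USER]` form on the paddles backend, which records who paused the queue (the queue name and user are placeholders; the call matches the pause_queue() signature introduced in teuthology/report.py below):

    # Pause the 'smithi' queue for ten minutes, attributing the pause to a user.
    from teuthology import report

    report.pause_queue('smithi', paused_by='some-user', pause_duration=600)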
1 change: 0 additions & 1 deletion scripts/schedule.py
@@ -20,7 +20,6 @@
Queue backend name, use prefix '@'
to append job config to the given
file path as yaml.
[default: paddles]
-n <name>, --name <name> Name of suite run the job is part of
-d <desc>, --description <desc> Job description
-o <owner>, --owner <owner> Job owner
6 changes: 3 additions & 3 deletions scripts/worker.py
@@ -9,7 +9,7 @@ def main():

def parse_args():
parser = argparse.ArgumentParser(description="""
Grab jobs from a paddles queue and run the teuthology tests they
Grab jobs from a beanstalk queue and run the teuthology tests they
describe. One job is run at a time.
""")
parser.add_argument(
@@ -29,8 +29,8 @@ def parse_args():
required=True,
)
parser.add_argument(
'-m', '--machine-type',
help='which machine type the jobs will run on',
'-t', '--tube',
help='which beanstalk tube to read jobs from',
required=True,
)
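A rough sketch of the reserve loop the worker runs against a tube, using the helpers referenced in the diffs (teuthology.queue.beanstalk.connect() and watch_tube()) plus the standard beanstalkc job API; it assumes, as elsewhere in teuthology, that the job body is the YAML job config:

    import yaml

    from teuthology.queue import beanstalk


    def reserve_one(tube, timeout=60):
        connection = beanstalk.connect()
        beanstalk.watch_tube(connection, tube)     # read only from the requested tube
        job = connection.reserve(timeout=timeout)  # block up to `timeout` seconds
        if job is None or job.body is None:
            return None
        return yaml.safe_load(job.body)            # the queued job's config dict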

32 changes: 10 additions & 22 deletions teuthology/dispatcher/__init__.py
@@ -5,7 +5,6 @@
import yaml

from datetime import datetime
from time import sleep

from teuthology import setup_log_file, install_except_hook
from teuthology.queue import beanstalk
@@ -70,31 +69,32 @@ def main(args):
return supervisor.main(args)

verbose = args["--verbose"]
machine_type = args["--machine-type"]
tube = args["--tube"]
log_dir = args["--log-dir"]
archive_dir = args["--archive-dir"]
exit_on_empty_queue = args["--exit-on-empty-queue"]
backend = args['--queue-backend']

if backend is None:
backend = 'beanstalk'

if archive_dir is None:
archive_dir = teuth_config.archive_base

if machine_type is None and teuth_config.machine_type is None:
return
# setup logging for dispatcher in {log_dir}
loglevel = logging.INFO
if verbose:
loglevel = logging.DEBUG
log.setLevel(loglevel)
log_file_path = os.path.join(log_dir, f"dispatcher.{machine_type}.{os.getpid()}")
log_file_path = os.path.join(log_dir, f"dispatcher.{tube}.{os.getpid()}")
setup_log_file(log_file_path)
install_except_hook()

load_config(archive_dir=archive_dir)

if backend == 'beanstalk':
connection = beanstalk.connect()
beanstalk.watch_tube(connection, machine_type)
beanstalk.watch_tube(connection, tube)

result_proc = None

@@ -129,8 +129,11 @@ def main(args):
log.info('Reserved job %s', job_id)
log.info('Config is: %s', job.body)
else:
job = report.get_queued_job(machine_type)
job = report.get_queued_job(tube)
if job is None:
if exit_on_empty_queue and not job_procs:
log.info("Queue is empty and no supervisor processes running; exiting!")
break
continue
job = clean_config(job)
report.try_push_job_info(job, dict(status='running'))
@@ -220,18 +223,3 @@ def create_job_archive(job_name, job_archive_path, archive_dir):
if not os.path.exists(run_archive):
safepath.makedirs('/', run_archive)
safepath.makedirs('/', job_archive_path)


def pause_queue(machine_type, paused, paused_by, pause_duration=None):
if paused:
report.pause_queue(machine_type, paused, paused_by, pause_duration)
'''
If there is a pause duration specified
un-pause the queue after the time elapses
'''
if pause_duration is not None:
sleep(int(pause_duration))
paused = False
report.pause_queue(machine_type, paused, paused_by)
elif not paused:
report.pause_queue(machine_type, paused, paused_by)
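A condensed sketch of the paddles branch in the loop above: poll the queue by tube name and only stop when --exit-on-empty-queue was given and no supervisor subprocesses are left. `job_procs` stands in for the set of live supervisor processes the real loop tracks.

    from teuthology import report


    def poll_queue(tube, exit_on_empty_queue, job_procs):
        while True:
            job = report.get_queued_job(tube)
            if job is None:
                if exit_on_empty_queue and not job_procs:
                    break      # nothing queued, nothing running: shut down
                continue
            yield job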
7 changes: 3 additions & 4 deletions teuthology/dispatcher/supervisor.py
@@ -70,9 +70,8 @@ def run_job(job_config, teuth_bin_path, archive_dir, verbose):
if teuth_config.results_server:
try:
report.try_delete_jobs(job_config['name'], job_config['job_id'])
except Exception as e:
log.warning("Unable to delete job %s, exception occurred: %s",
job_config['job_id'], e)
except Exception:
log.exception("Unable to delete job %s", job_config['job_id'])
job_archive = os.path.join(archive_dir, safe_archive)
args = [
os.path.join(teuth_bin_path, 'teuthology-results'),
@@ -130,7 +129,7 @@ def run_job(job_config, teuth_bin_path, archive_dir, verbose):
'--archive', job_config['archive_path'],
'--name', job_config['name'],
])
if 'description' in job_config:
if job_config.get('description') is not None:
arg.extend(['--description', job_config['description']])
job_archive = os.path.join(job_config['archive_path'], 'orig.config.yaml')
arg.extend(['--', job_archive])
1 change: 1 addition & 0 deletions teuthology/orchestra/run.py
@@ -182,6 +182,7 @@ def _raise_for_status(self):
command=self.command, exitstatus=self.returncode,
node=self.hostname, label=self.label
)

def _get_exitstatus(self):
"""
:returns: the remote command's exit status (return code). Note that
106 changes: 0 additions & 106 deletions teuthology/queue/__init__.py
@@ -1,106 +0,0 @@
import logging
import pprint
import sys
from collections import OrderedDict

from teuthology import report
from teuthology.config import config

log = logging.getLogger(__name__)

def print_progress(index, total, message=None):
msg = "{m} ".format(m=message) if message else ''
sys.stderr.write("{msg}{i}/{total}\r".format(
msg=msg, i=index, total=total))
sys.stderr.flush()

def end_progress():
sys.stderr.write('\n')
sys.stderr.flush()

class JobProcessor(object):
def __init__(self):
self.jobs = OrderedDict()

def add_job(self, job_id, job_config, job_obj=None):
job_id = str(job_id)

job_dict = dict(
index=(len(self.jobs) + 1),
job_config=job_config,
)
if job_obj:
job_dict['job_obj'] = job_obj
self.jobs[job_id] = job_dict

self.process_job(job_id)

def process_job(self, job_id):
pass

def complete(self):
pass


class JobPrinter(JobProcessor):
def __init__(self, show_desc=False, full=False):
super(JobPrinter, self).__init__()
self.show_desc = show_desc
self.full = full

def process_job(self, job_id):
job_config = self.jobs[job_id]['job_config']
job_index = self.jobs[job_id]['index']
job_priority = job_config['priority']
job_name = job_config['name']
job_desc = job_config['description']
print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format(
i=job_index,
pri=job_priority,
job_id=job_id,
job_name=job_name,
))
if self.full:
pprint.pprint(job_config)
elif job_desc and self.show_desc:
for desc in job_desc.split():
print('\t {}'.format(desc))


class RunPrinter(JobProcessor):
def __init__(self):
super(RunPrinter, self).__init__()
self.runs = list()

def process_job(self, job_id):
run = self.jobs[job_id]['job_config']['name']
if run not in self.runs:
self.runs.append(run)
print(run)


class JobDeleter(JobProcessor):
def __init__(self, pattern):
self.pattern = pattern
super(JobDeleter, self).__init__()

def add_job(self, job_id, job_config, job_obj=None):
job_name = job_config['name']
if self.pattern in job_name:
super(JobDeleter, self).add_job(job_id, job_config, job_obj)

def process_job(self, job_id):
job_config = self.jobs[job_id]['job_config']
job_name = job_config['name']
print('Deleting {job_name}/{job_id}'.format(
job_id=job_id,
job_name=job_name,
))
report.try_delete_jobs(job_name, job_id)


def main(args):
if config.backend == 'paddles':
paddles.main(args)
else:
beanstalk.main(args)
16 changes: 7 additions & 9 deletions teuthology/queue/beanstalk.py
@@ -1,12 +1,10 @@
import beanstalkc
import yaml
import logging
import pprint
import sys
from collections import OrderedDict

from teuthology.config import config
from teuthology import report
from teuthology.queue import util


log = logging.getLogger(__name__)

@@ -47,7 +45,7 @@ def callback(jobs_dict)
# Try to figure out a sane timeout based on how many jobs are in the queue
timeout = job_count / 2000.0 * 60
for i in range(1, job_count + 1):
print_progress(i, job_count, "Loading")
util.print_progress(i, job_count, "Loading")
job = connection.reserve(timeout=timeout)
if job is None or job.body is None:
continue
@@ -57,7 +55,7 @@ def callback(jobs_dict)
if pattern is not None and pattern not in job_name:
continue
processor.add_job(job_id, job_config, job)
end_progress()
util.end_progress()
processor.complete()


@@ -105,13 +103,13 @@ def main(args):
pause_tube(connection, machine_type, pause_duration)
elif delete:
walk_jobs(connection, machine_type,
JobDeleter(delete))
util.JobDeleter(delete))
elif runs:
walk_jobs(connection, machine_type,
RunPrinter())
util.RunPrinter())
else:
walk_jobs(connection, machine_type,
JobPrinter(show_desc=show_desc, full=full))
util.JobPrinter(show_desc=show_desc, full=full))
except KeyboardInterrupt:
log.info("Interrupted.")
finally:
49 changes: 21 additions & 28 deletions teuthology/queue/paddles.py
@@ -1,11 +1,7 @@
import logging
import pprint
import sys
from collections import OrderedDict

from teuthology import report
from teuthology.dispatcher import pause_queue

from teuthology.queue import util

log = logging.getLogger(__name__)

@@ -14,75 +10,72 @@ def stats_queue(machine_type):
stats = report.get_queue_stats(machine_type)
if stats['paused'] is None:
log.info("%s queue is currently running with %s jobs queued",
stats['name'],
stats['count'])
stats['queue'],
stats['queued_jobs'])
else:
log.info("%s queue is paused with %s jobs queued",
stats['name'],
stats['count'])
stats['queue'],
stats['queued_jobs'])


def update_priority(machine_type, priority, user, run_name=None):
def update_priority(machine_type, priority, run_name=None):
if run_name is not None:
jobs = report.get_user_jobs_queue(machine_type, user, run_name)
else:
jobs = report.get_user_jobs_queue(machine_type, user)
jobs = report.get_jobs_by_run(machine_type, run_name)
for job in jobs:
job['priority'] = priority
report.try_push_job_info(job)


def walk_jobs(machine_type, processor, user):
log.info("Checking paddles queue...")
job_count = report.get_queue_stats(machine_type)['count']
job_count = report.get_queue_stats(machine_type)['queued_jobs']

jobs = report.get_user_jobs_queue(machine_type, user)
if job_count == 0:
log.info('No jobs in queue')
log.info('No jobs in Paddles queue')
return

for i in range(1, job_count + 1):
print_progress(i, job_count, "Loading")
util.print_progress(i, job_count, "Loading")
job = jobs[i-1]
if job is None:
continue
job_id = job['job_id']
processor.add_job(job_id, job)
end_progress()
util.end_progress()
processor.complete()


def main(args):
machine_type = args['--machine_type']
#user = args['--user']
#run_name = args['--run_name']
#priority = args['--priority']
user = args['--user']
run_name = args['--run-name']
status = args['--status']
delete = args['--delete']
runs = args['--runs']
show_desc = args['--description']
full = args['--full']
pause_duration = args['--pause']
#unpause = args['--unpause']
#pause_duration = args['--time']
priority = args['--priority']
try:
if status:
stats_queue(machine_type)
if pause_duration:
pause_queue(machine_type, pause, user, pause_duration)
#else:
#pause_queue(machine_type, pause, user)
if not user:
log.info('Please enter user to pause Paddles queue')
return
report.pause_queue(machine_type, user, pause_duration)
elif priority:
update_priority(machine_type, priority, run_name)
elif delete:
walk_jobs(machine_type,
JobDeleter(delete), user)
util.JobDeleter(delete), user)
elif runs:
walk_jobs(machine_type,
RunPrinter(), user)
util.RunPrinter(), user)
else:
walk_jobs(machine_type,
JobPrinter(show_desc=show_desc, full=full),
util.JobPrinter(show_desc=show_desc, full=full),
user)
except KeyboardInterrupt:
log.info("Interrupted.")
101 changes: 101 additions & 0 deletions teuthology/queue/util.py
@@ -0,0 +1,101 @@
import logging
import pprint
import sys
from collections import OrderedDict

from teuthology import report

log = logging.getLogger(__name__)


def print_progress(index, total, message=None):
msg = "{m} ".format(m=message) if message else ''
sys.stderr.write("{msg}{i}/{total}\r".format(
msg=msg, i=index, total=total))
sys.stderr.flush()


def end_progress():
sys.stderr.write('\n')
sys.stderr.flush()


class JobProcessor(object):
def __init__(self):
self.jobs = OrderedDict()

def add_job(self, job_id, job_config, job_obj=None):
job_id = str(job_id)

job_dict = dict(
index=(len(self.jobs) + 1),
job_config=job_config,
)
if job_obj:
job_dict['job_obj'] = job_obj
self.jobs[job_id] = job_dict

self.process_job(job_id)

def process_job(self, job_id):
pass

def complete(self):
pass


class JobPrinter(JobProcessor):
def __init__(self, show_desc=False, full=False):
super(JobPrinter, self).__init__()
self.show_desc = show_desc
self.full = full

def process_job(self, job_id):
job_config = self.jobs[job_id]['job_config']
job_index = self.jobs[job_id]['index']
job_priority = job_config['priority']
job_name = job_config['name']
job_desc = job_config['description']
print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format(
i=job_index,
pri=job_priority,
job_id=job_id,
job_name=job_name,
))
if self.full:
pprint.pprint(job_config)
elif job_desc and self.show_desc:
for desc in job_desc.split():
print('\t {}'.format(desc))


class RunPrinter(JobProcessor):
def __init__(self):
super(RunPrinter, self).__init__()
self.runs = list()

def process_job(self, job_id):
run = self.jobs[job_id]['job_config']['name']
if run not in self.runs:
self.runs.append(run)
print(run)


class JobDeleter(JobProcessor):
def __init__(self, pattern):
self.pattern = pattern
super(JobDeleter, self).__init__()

def add_job(self, job_id, job_config, job_obj=None):
job_name = job_config['name']
if self.pattern in job_name:
super(JobDeleter, self).add_job(job_id, job_config, job_obj)

def process_job(self, job_id):
job_config = self.jobs[job_id]['job_config']
job_name = job_config['name']
print('Deleting {job_name}/{job_id}'.format(
job_id=job_id,
job_name=job_name,
))
report.try_delete_jobs(job_name, job_id)
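An example of how both backends drive these shared processors: walk_jobs() calls add_job() once per queued job and complete() at the end. The job ids and config dicts below are made up; note that JobDeleter.process_job() really does call report.try_delete_jobs(), so it talks to the results server.

    from teuthology.queue import util

    printer = util.JobPrinter(show_desc=True)
    printer.add_job('101', {'priority': 50,
                            'name': 'sample-2022-09-07-rados',
                            'description': 'rados thrashing batch'})
    printer.complete()

    deleter = util.JobDeleter(pattern='2022-09-07')   # only matching job names are deleted
    deleter.add_job('102', {'name': 'sample-2022-09-07-rbd'})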
45 changes: 20 additions & 25 deletions teuthology/report.py
@@ -282,7 +282,6 @@ def write_new_job(self, run_name, job_info):
sleep=1, increment=inc, action=f'write job for {run_name}') as proceed:
while proceed():
response = self.session.post(run_uri, data=job_json, headers=headers)

if response.status_code == 200:
resp_json = response.json()
job_id = resp_json['job_id']
@@ -387,7 +386,7 @@ def last_run(self):

def get_top_job(self, queue):

uri = "{base}/queue/pop_queue?queue_name={queue}".format(base=self.base_uri,
uri = "{base}/queue/pop_queue?queue={queue}".format(base=self.base_uri,
queue=queue)
inc = random.uniform(0, 1)
with safe_while(
@@ -523,7 +522,6 @@ def create_queue(self, queue):
sleep=1, increment=inc, action=f'creating queue {queue}') as proceed:
while proceed():
response = self.session.post(uri, data=queue_json, headers=headers)

if response.status_code == 200:
self.log.info("Successfully created queue {queue}".format(
queue=queue,
@@ -546,13 +544,14 @@ def create_queue(self, queue):

response.raise_for_status()

def update_queue(self, queue, paused, paused_by, pause_duration=None):
def update_queue(self, queue, paused_by, pause_duration=None):
uri = "{base}/queue/".format(
base=self.base_uri
)

if pause_duration is not None:
pause_duration = int(pause_duration)
queue_info = {'queue': queue, 'paused': paused, 'paused_by': paused_by,
queue_info = {'queue': queue, 'paused_by': paused_by,
'pause_duration': pause_duration}
queue_json = json.dumps(queue_info)
headers = {'content-type': 'application/json'}
@@ -581,34 +580,28 @@ def update_queue(self, queue, paused, paused_by, pause_duration=None):


def queue_stats(self, queue):
uri = "{base}/queue/stats/".format(
base=self.base_uri
uri = "{base}/queue/stats/?queue={queue}".format(
base=self.base_uri,
queue=queue
)
queue_info = {'queue': queue}
queue_json = json.dumps(queue_info)

headers = {'content-type': 'application/json'}
inc = random.uniform(0, 1)
with safe_while(
sleep=1, increment=inc, action=f'stats for queue {queue}') as proceed:
while proceed():
response = self.session.post(uri, data=queue_json, headers=headers)

response = self.session.get(uri)
if response.status_code == 200:
self.log.info("Successfully retrieved stats for queue {queue}".format(
queue=queue,
))
return response.json()
else:
msg = response.text
self.log.error(
"POST to {uri} failed with status {status}: {msg}".format(
"GET to {uri} failed with status {status}: {msg}".format(
uri=uri,
status=response.status_code,
msg=msg,
))
response.raise_for_status()



def queued_jobs(self, queue, user, run_name):
uri = "{base}/queue/queued_jobs/".format(
base=self.base_uri
@@ -669,12 +662,18 @@ def get_user_jobs_queue(queue, user, run_name=None):
return
return reporter.queued_jobs(queue, user, run_name)

def get_jobs_by_run(queue, run_name):
reporter = ResultsReporter()
if not reporter.base_uri:
return
return reporter.queued_jobs(queue, None, run_name)


def pause_queue(queue, paused, paused_by, pause_duration=None):
def pause_queue(queue, paused_by, pause_duration=None):
reporter = ResultsReporter()
if not reporter.base_uri:
return
reporter.update_queue(queue, paused, paused_by, pause_duration)
reporter.update_queue(queue, paused_by, pause_duration)


def is_queue_paused(queue):
@@ -711,7 +710,7 @@ def push_job_info(run_name, job_id, job_info, base_uri=None):
reporter.report_job(run_name, job_id, job_info)


def get_queued_job(machine_type):
def get_queued_job(queue):
"""
Retrieve a job that is queued depending on priority
@@ -720,10 +719,6 @@ def get_queued_job(machine_type):
reporter = ResultsReporter()
if not reporter.base_uri:
return
if ',' in machine_type:
queue = 'multi'
else:
queue = machine_type
if is_queue_paused(queue) == True:
log.info("Teuthology queue %s is currently paused",
queue)
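A rough equivalent of the reworked queue_stats() above: stats are now fetched with a plain GET and a `queue` query parameter instead of a POSTed JSON body. The base URI and queue name are placeholders; the response keys follow the stats_queue() change in teuthology/queue/paddles.py.

    import requests

    base_uri = 'http://paddles.example.com'
    resp = requests.get(f'{base_uri}/queue/stats/', params={'queue': 'smithi'})
    resp.raise_for_status()
    stats = resp.json()   # expected keys include 'queue', 'queued_jobs', 'paused'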
11 changes: 7 additions & 4 deletions teuthology/schedule.py
@@ -1,9 +1,10 @@
import os
import yaml

import teuthology.beanstalk
import teuthology.queue.beanstalk
from teuthology.misc import get_user, merge_configs
from teuthology import report
from teuthology.config import config


def main(args):
@@ -27,7 +28,9 @@ def main(args):
if not name or name.isdigit():
raise ValueError("Please use a more descriptive value for --name")
job_config = build_config(args)
backend = args['--queue-backend']
backend = config.backend
if args['--queue-backend']:
backend = args['--queue-backend']
if args['--dry-run']:
print('---\n' + yaml.safe_dump(job_config))
elif backend.startswith('@'):
@@ -115,9 +118,9 @@ def beanstalk_schedule_job(job_config, backend, num=1):
"""
num = int(num)
tube = job_config.pop('tube')
beanstalk = teuthology.beanstalk.connect()
beanstalk = teuthology.queue.beanstalk.connect()
beanstalk.use(tube)
queue = report.create_machine_type_queue(job_config['machine_type'])
queue = report.create_machine_type_queue(tube)
job_config['queue'] = queue
while num > 0:
job_id = report.try_create_job(job_config, dict(status='queued'))
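A hedged sketch of the beanstalk scheduling path after this change: the paddles queue record is now created from the tube name rather than the job's machine_type, and the job itself is still recorded via try_create_job().

    import teuthology.queue.beanstalk
    from teuthology import report


    def schedule_one(job_config):
        tube = job_config.pop('tube')
        connection = teuthology.queue.beanstalk.connect()
        connection.use(tube)                      # publish to this beanstalk tube
        job_config['queue'] = report.create_machine_type_queue(tube)
        return report.try_create_job(job_config, dict(status='queued'))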
73 changes: 0 additions & 73 deletions teuthology/test/test_dispatcher.py
@@ -1,8 +1,6 @@
from teuthology import dispatcher
from unittest.mock import patch, Mock
from teuthology import report

import unittest.mock as mock
import unittest


@@ -35,74 +33,3 @@ def test_mock_get_queue_job(self):

self.assertEqual(response.status_code, 200)
self.assertEqual(response.json(), job_config)


@patch("teuthology.worker.fetch_teuthology")
@patch("teuthology.dispatcher.fetch_qa_suite")
@patch("teuthology.worker.fetch_qa_suite")
@patch("teuthology.dispatcher.report.get_queued_job")
@patch("teuthology.dispatcher.report.try_push_job_info")
@patch("teuthology.dispatcher.setup_log_file")
@patch("os.path.isdir")
@patch("os.getpid")
@patch("teuthology.dispatcher.teuth_config")
@patch("subprocess.Popen")
@patch("os.path.join")
@patch("teuthology.dispatcher.create_job_archive")
@patch("yaml.safe_dump")
def test_dispatcher_main(self, m_fetch_teuthology, m_fetch_qa_suite,
m_worker_fetch_qa_suite, m_get_queued_job,
m_try_push_job_info,
m_setup_log,
m_isdir, m_getpid,
m_t_config, m_popen, m_join, m_create_archive, m_yaml_dump):

args = {
'--owner': 'the_owner',
'--archive-dir': '/archive/dir',
'--log-dir': '/worker/log',
'--name': 'the_name',
'--description': 'the_description',
'--machine-type': 'test_queue',
'--supervisor': False,
'--verbose': False,
'--queue-backend': 'paddles'
}

m = mock.MagicMock()
job_id = {'job_id': '1'}
m.__getitem__.side_effect = job_id.__getitem__
m.__iter__.side_effect = job_id.__iter__
job = {
'job_id': '1',
'description': 'DESC',
'email': 'EMAIL',
'first_in_suite': False,
'last_in_suite': True,
'machine_type': 'test_queue',
'name': 'NAME',
'owner': 'OWNER',
'priority': 99,
'results_timeout': '6',
'verbose': False,
'stop_worker': True,
'archive_path': '/archive/dir/NAME/1'
}

m_fetch_teuthology.return_value = '/teuth/path'
m_fetch_qa_suite.return_value = '/suite/path'
m_isdir.return_value = True
mock_get_patcher = patch('teuthology.dispatcher.report.get_queued_job')
mock_get = mock_get_patcher.start()
mock_get.return_value = job

mock_prep_job_patcher = patch('teuthology.dispatcher.prep_job')
mock_prep_job = mock_prep_job_patcher.start()
mock_prep_job.return_value = (job, '/teuth/bin/path')
m_yaml_dump.return_value = ''

m_try_push_job_info.called_once_with(job, dict(status='running'))
dispatcher.main(args)
mock_get_patcher.stop()

