diff --git a/teuthology/run.py b/teuthology/run.py index eda7e43c3e..3107c83b61 100644 --- a/teuthology/run.py +++ b/teuthology/run.py @@ -4,6 +4,7 @@ import contextlib import sys import logging +from collections import OrderedDict from traceback import format_tb import teuthology @@ -122,13 +123,13 @@ def setup_config(config_paths): job_id = str(job_id) config['job_id'] = job_id - # targets must be >= than roles - if 'targets' in config and 'roles' in config: + # targets must be >= than nodes + if 'targets' in config and 'nodes' in config: targets = len(config['targets']) - roles = len(config['roles']) - assert targets >= roles, \ - '%d targets are needed for all roles but found %d listed.' % ( - roles, targets) + nodes = len(config['nodes']) + assert targets >= nodes, \ + '%d targets are needed for all nodes but found %d listed.' % ( + nodes, targets) return config @@ -179,26 +180,89 @@ def validate_tasks(config): return config["tasks"] +def get_nodes_request(config, machine_type): + """ + Examine each item in a job's 'nodes' stanza. Consolidate those with the + same requirements into 'node requests' so that we may later call + lock_many() as few times as possible. + + Each resulting request contains role lists, each of which will be mapped to + a target when the machines are locked. + """ + request = list() + os_specs = OrderedDict() + # Group node confs by 'spec'. We use an OrderedDict to contain them + # briefly, because it combines the functionality of an ordered set with + # key-value storage. + for item in config: + item['arch'] = item.get('arch') + item['machine_type'] = item.get('machine_type', machine_type) + spec_key = ( + item.get('os_type'), + item.get('os_version'), + item.get('arch'), + item.get('machine_type'), + ) + spec_roles = os_specs.get(spec_key, list()) + assert isinstance(spec_roles, list) + spec_roles.append(item['roles']) + os_specs[spec_key] = spec_roles + + # Build a 'request' for each 'spec' + for spec, roles in os_specs.items(): + os_type, os_version, arch, machine_type = spec + request.append(dict( + os_type=os_type or None, + os_version=os_version or None, + arch=arch or None, + machine_type=machine_type, + roles=roles, + )) + return request + + def get_initial_tasks(lock, config, machine_type): init_tasks = [ {'internal.check_packages': None}, {'internal.buildpackages_prep': None}, ] - if 'roles' in config and lock: - msg = ('You cannot specify targets in a config file when using the ' + - '--lock option') - assert 'targets' not in config, msg - init_tasks.append({'internal.lock_machines': ( - len(config['roles']), machine_type)}) + + target_conflict_msg = 'You cannot specify targets in a config file when' \ + 'using the --lock option' + if lock and ('roles' in config or 'nodes' in config): + assert 'targets' not in config, target_conflict_msg + if lock and 'roles' in config: + if 'nodes' in config: + log.warn( + "Config specifies both 'roles' and 'nodes'; " + "using 'nodes' and ignoring 'roles'" + ) + else: + # Convert old 'roles' stanza into new 'nodes' stanza, so that + # elsewhere in teuthology we can consolidate to one set of + # codepaths for node specification + nodes_config = list() + for node_roles in config['roles']: + nodes_config.append(dict( + roles=node_roles, + os_type=config.get("os_type"), + os_version=config.get("os_version"), + arch=config.get('arch'), + )) + config['nodes'] = nodes_config + del config['roles'] + if lock and 'nodes' in config: + nodes_request = get_nodes_request(config['nodes'], machine_type) + 
init_tasks.append({'internal.lock_machines': nodes_request}) init_tasks.append({'internal.save_config': None}) - if 'roles' in config: + if 'nodes' in config: init_tasks.append({'internal.check_lock': None}) init_tasks.append({'internal.add_remotes': None}) - if 'roles' in config: + if 'nodes' in config: init_tasks.extend([ {'console_log': None}, {'internal.connect': None}, @@ -207,8 +271,8 @@ def get_initial_tasks(lock, config, machine_type): {'internal.check_conflict': None}, ]) - if ('roles' in config and - not config.get('use_existing_cluster', False)): + if ('nodes' in config and + not config.get('use_existing_cluster', False)): init_tasks.extend([ {'internal.check_ceph_data': None}, {'internal.vm_setup': None}, @@ -217,10 +281,10 @@ def get_initial_tasks(lock, config, machine_type): if 'kernel' in config: init_tasks.append({'kernel': config['kernel']}) - if 'roles' in config: + if 'nodes' in config: init_tasks.append({'internal.base': None}) init_tasks.append({'internal.archive_upload': None}) - if 'roles' in config: + if 'nodes' in config: init_tasks.extend([ {'internal.archive': None}, {'internal.coredump': None}, @@ -237,7 +301,7 @@ def get_initial_tasks(lock, config, machine_type): {'kernel.install_latest_rh_kernel': None} ]) - if 'roles' in config: + if 'nodes' in config: init_tasks.extend([ {'pcp': None}, {'selinux': None}, @@ -357,6 +421,14 @@ def main(args): if suite_repo: teuth_config.ceph_qa_suite_git_url = suite_repo + # overwrite the config value of os_type if --os-type is provided + if os_type: + config["os_type"] = os_type + + # overwrite the config value of os_version if --os-version is provided + if os_version: + config["os_version"] = os_version + config["tasks"] = validate_tasks(config) init_tasks = get_initial_tasks(lock, config, machine_type) @@ -370,14 +442,6 @@ def main(args): # fetches the tasks and returns a new suite_path if needed config["suite_path"] = fetch_tasks_if_needed(config) - # overwrite the config value of os_type if --os-type is provided - if os_type: - config["os_type"] = os_type - - # overwrite the config value of os_version if --os-version is provided - if os_version: - config["os_version"] = os_version - # If the job has a 'use_shaman' key, use that value to override the global # config's value. 
if config.get('use_shaman') is not None: diff --git a/teuthology/task/internal/__init__.py b/teuthology/task/internal/__init__.py index 2e16423ade..c19fcd6eac 100644 --- a/teuthology/task/internal/__init__.py +++ b/teuthology/task/internal/__init__.py @@ -137,30 +137,24 @@ def add_remotes(ctx, config): """ ctx.cluster = cluster.Cluster() # Allow jobs to run without using nodes, for self-testing - if 'roles' not in ctx.config and 'targets' not in ctx.config: + if 'nodes' not in ctx.config and 'targets' not in ctx.config: return - remotes = [] - machs = [] - for name in ctx.config['targets'].iterkeys(): - machs.append(name) - for t, key in ctx.config['targets'].iteritems(): - t = misc.canonicalize_hostname(t) - try: - if ctx.config['sshkeys'] == 'ignore': - key = None - except (AttributeError, KeyError): - pass - rem = remote.Remote(name=t, host_key=key, keep_alive=True) - remotes.append(rem) - if 'roles' in ctx.config: - for rem, roles in zip(remotes, ctx.config['roles']): + for node_conf in ctx.config['nodes']: + remotes = [] + for name, key in node_conf['targets'].items(): + name = misc.canonicalize_hostname(name) + try: + if ctx.config['sshkeys'] == 'ignore': + key = None + except (AttributeError, KeyError): + pass + rem = remote.Remote(name=name, host_key=key, keep_alive=True) + remotes.append(rem) + roles = node_conf['roles'] assert all(isinstance(role, str) for role in roles), \ "Roles in config must be strings: %r" % roles ctx.cluster.add(rem, roles) log.info('roles: %s - %s' % (rem, roles)) - else: - for rem in remotes: - ctx.cluster.add(rem, rem.name) def connect(ctx, config): diff --git a/teuthology/task/internal/lock_machines.py b/teuthology/task/internal/lock_machines.py index 96b5eec632..c1e0830d41 100644 --- a/teuthology/task/internal/lock_machines.py +++ b/teuthology/task/internal/lock_machines.py @@ -24,59 +24,70 @@ def lock_machines(ctx, config): new machines. This is not called if the one has teuthology-locked machines and placed those keys in the Targets section of a yaml file. """ + ctx.config['targets'] = dict() + # Change the status during the locking process + report.try_push_job_info(ctx.config, dict(status='waiting')) + total_required = sum(map(lambda c: len(c['roles']), config)) + assert len(ctx.config['nodes']) == total_required + # Now lock all the machines we need + for request in config: + request['targets'] = do_lock_machines(ctx, request) + assert len(request['targets']) == len(request['roles']) + + # Then, map the resulting locked targets to each appropriate 'node conf' in + # ctx.config['nodes'] + for node_conf in ctx.config['nodes']: + + def request_matches(request): + def get_spec(obj): + keys = ('os_type', 'os_version', 'arch', 'machine_type') + return [obj.get(key) for key in keys] + return get_spec(node_conf) == get_spec(request) + request = filter(request_matches, config)[0] + node_conf['targets'] = dict() + key = request['targets'].keys()[0] + node_conf['targets'][key] = request['targets'].pop(key) + + # successfully locked machines, change status back to running + report.try_push_job_info(ctx.config, dict(status='running')) + try: + yield + finally: + # If both unlock_on_failure and nuke-on-error are set, don't unlock now + # because we're just going to nuke (and unlock) later. 
+ unlock_on_failure = ( + ctx.config.get('unlock_on_failure', False) + and not ctx.config.get('nuke-on-error', False) + ) + if get_status(ctx.summary) == 'pass' or unlock_on_failure: + log.info('Unlocking machines...') + for machine in ctx.config['targets'].iterkeys(): + teuthology.lock.ops.unlock_one( + ctx, machine, ctx.owner, ctx.archive) + + +def do_lock_machines(ctx, request): # It's OK for os_type and os_version to be None here. If we're trying # to lock a bare metal machine, we'll take whatever is available. If # we want a vps, defaults will be provided by misc.get_distro and # misc.get_distro_version in provision.create_if_vm - os_type = ctx.config.get("os_type") - os_version = ctx.config.get("os_version") - arch = ctx.config.get('arch') + os_type = request['os_type'] + os_version = request['os_version'] + arch = request['arch'] or ctx.config.get('arch') + machine_type = request['machine_type'] + total_requested = len(request['roles']) log.info('Locking machines...') - assert isinstance(config[0], int), 'config[0] must be an integer' - machine_type = config[1] - total_requested = config[0] - # We want to make sure there are always this many machines available - reserved = teuth_config.reserve_machines - assert isinstance(reserved, int), 'reserve_machines must be integer' - assert (reserved >= 0), 'reserve_machines should >= 0' - - # change the status during the locking process - report.try_push_job_info(ctx.config, dict(status='waiting')) all_locked = dict() requested = total_requested while True: - # get a candidate list of machines - machines = teuthology.lock.query.list_locks(machine_type=machine_type, up=True, - locked=False, count=requested + reserved) - if machines is None: - if ctx.block: - log.error('Error listing machines, trying again') - time.sleep(20) - continue - else: - raise RuntimeError('Error listing machines') - - # make sure there are machines for non-automated jobs to run - if len(machines) < reserved + requested and ctx.owner.startswith('scheduled'): - if ctx.block: - log.info( - 'waiting for more %s machines to be free (need %s + %s, have %s)...', - machine_type, - reserved, - requested, - len(machines), - ) - time.sleep(10) - continue - else: - assert 0, ('not enough machines free; need %s + %s, have %s' % - (reserved, requested, len(machines))) + if not wait_for_enough(ctx, machine_type, requested): + continue try: - newly_locked = teuthology.lock.ops.lock_many(ctx, requested, machine_type, - ctx.owner, ctx.archive, os_type, - os_version, arch) + newly_locked = teuthology.lock.ops.lock_many( + ctx, requested, machine_type, ctx.owner, ctx.archive, os_type, + os_version, arch) except Exception: # Lock failures should map to the 'dead' status instead of 'fail' set_status(ctx.summary, 'dead') @@ -92,46 +103,21 @@ def lock_machines(ctx, config): ) ) if len(all_locked) == total_requested: - vmlist = [] - for lmach in all_locked: - if teuthology.lock.query.is_vm(lmach): - vmlist.append(lmach) - if vmlist: - log.info('Waiting for virtual machines to come up') - keys_dict = dict() - loopcount = 0 - while len(keys_dict) != len(vmlist): - loopcount += 1 - time.sleep(10) - keys_dict = misc.ssh_keyscan(vmlist) - log.info('virtual machine is still unavailable') - if loopcount == 40: - loopcount = 0 - log.info('virtual machine(s) still not up, ' + - 'recreating unresponsive ones.') - for guest in vmlist: - if guest not in keys_dict.keys(): - log.info('recreating: ' + guest) - full_name = misc.canonicalize_hostname(guest) - provision.destroy_if_vm(ctx, full_name) - 
provision.create_if_vm(ctx, full_name) - if teuthology.lock.keys.do_update_keys(keys_dict)[0]: - log.info("Error in virtual machine keys") - newscandict = {} - for dkey in all_locked.iterkeys(): - stats = teuthology.lock.query.get_status(dkey) - newscandict[dkey] = stats['ssh_pub_key'] - ctx.config['targets'] = newscandict + locked_vms = dict() + for name in all_locked: + if teuthology.lock.query.is_vm(name): + locked_vms[name] = all_locked[name] + if locked_vms: + ctx.config['targets'].update( + wait_for_vms(ctx, locked_vms)) else: - ctx.config['targets'] = all_locked + ctx.config['targets'].update(all_locked) locked_targets = yaml.safe_dump( - ctx.config['targets'], + all_locked, default_flow_style=False ).splitlines() log.info('\n '.join(['Locked targets:', ] + locked_targets)) - # successfully locked machines, change status back to running - report.try_push_job_info(ctx.config, dict(status='running')) - break + return all_locked elif not ctx.block: assert 0, 'not enough machines are available' else: @@ -145,16 +131,68 @@ def lock_machines(ctx, config): ) log.warn('Could not lock enough machines, waiting...') time.sleep(10) - try: - yield - finally: - # If both unlock_on_failure and nuke-on-error are set, don't unlock now - # because we're just going to nuke (and unlock) later. - unlock_on_failure = ( - ctx.config.get('unlock_on_failure', False) - and not ctx.config.get('nuke-on-error', False) - ) - if get_status(ctx.summary) == 'pass' or unlock_on_failure: - log.info('Unlocking machines...') - for machine in ctx.config['targets'].iterkeys(): - teuthology.lock.ops.unlock_one(ctx, machine, ctx.owner, ctx.archive) + + +def wait_for_enough(ctx, machine_type, requested): + # We want to make sure there are always this many machines available + reserved = teuth_config.reserve_machines + assert isinstance(reserved, int), 'reserve_machines must be integer' + assert (reserved >= 0), 'reserve_machines should >= 0' + # get a candidate list of machines + machines = teuthology.lock.query.list_locks( + machine_type=machine_type, up=True, locked=False, count=requested + + reserved) + if machines is None: + if ctx.block: + log.error('Error listing machines, trying again') + time.sleep(20) + return False + else: + raise RuntimeError('Error listing machines') + + # make sure there are machines for non-automated jobs to run + if (len(machines) < reserved + requested and + ctx.owner.startswith('scheduled')): + if ctx.block: + log.info( + 'waiting for more %s machines to be free ' + '(need %s + %s, have %s)...', + machine_type, + reserved, + requested, + len(machines), + ) + time.sleep(10) + return False + else: + assert 0, ('not enough machines free; need %s + %s, have %s' % + (reserved, requested, len(machines))) + return True + + +def wait_for_vms(ctx, vm_dict): + log.info('Waiting for virtual machines to come up') + keys_dict = dict() + loopcount = 0 + while len(keys_dict) != len(vm_dict): + loopcount += 1 + time.sleep(10) + keys_dict = misc.ssh_keyscan(vm_dict.keys()) + log.info('virtual machine is still unavailable') + if loopcount == 40: + loopcount = 0 + log.info('virtual machine(s) still not up, ' + 'recreating unresponsive ones.') + for guest in vm_dict: + if guest not in keys_dict.keys(): + log.info('recreating: ' + guest) + full_name = misc.canonicalize_hostname(guest) + provision.destroy_if_vm(ctx, full_name) + provision.create_if_vm(ctx, full_name) + if teuthology.lock.keys.do_update_keys(keys_dict)[0]: + log.info("Error in virtual machine keys") + newscandict = {} + for dkey in 
vm_dict.keys(): + stats = teuthology.lock.query.get_status(dkey) + newscandict[dkey] = stats['ssh_pub_key'] + return newscandict diff --git a/teuthology/test/test_run.py b/teuthology/test/test_run.py index 7fda102809..84799f7aee 100644 --- a/teuthology/test/test_run.py +++ b/teuthology/test/test_run.py @@ -32,15 +32,15 @@ def test_setup_config(self, m_merge_configs): @patch("teuthology.run.merge_configs") def test_setup_config_targets_ok(self, m_merge_configs): - config = {"targets": range(4), "roles": range(2)} + config = {"targets": range(4), "nodes": range(2)} m_merge_configs.return_value = config result = run.setup_config(["some/config.yaml"]) assert result["targets"] == [0, 1, 2, 3] - assert result["roles"] == [0, 1] + assert result["nodes"] == [0, 1] @patch("teuthology.run.merge_configs") def test_setup_config_targets_invalid(self, m_merge_configs): - config = {"targets": range(2), "roles": range(4)} + config = {"targets": range(2), "nodes": range(4)} m_merge_configs.return_value = config with pytest.raises(AssertionError): run.setup_config(["some/config.yaml"]) @@ -100,14 +100,102 @@ def test_get_initial_tasks_invalid(self): assert excinfo.value.message.startswith("You cannot") def test_get_inital_tasks(self): - config = {"roles": range(2), "kernel": "the_kernel", "use_existing_cluster": False} + config = dict( + roles=['r1', 'r2'], + kernel='the_kernel', + use_existing_cluster=False, + ) result = run.get_initial_tasks(True, config, "machine_type") - assert {"internal.lock_machines": (2, "machine_type")} in result + assert { + 'internal.lock_machines': [dict( + os_version=None, os_type=None, arch=None, + machine_type='machine_type', roles=['r1', 'r2'], + )] + } in result assert {"kernel": "the_kernel"} in result # added because use_existing_cluster == False assert {'internal.vm_setup': None} in result assert {'internal.buildpackages_prep': None} in result + @pytest.mark.parametrize( + 'input_conf, machine_type, expected', + [ + [ + dict(nodes=[ + dict( + roles=['u16'], + os_type='ubuntu', + os_version='16.04', + arch='aarch64', + ), + ]), + 'mtype', + [ + dict( + os_type='ubuntu', + os_version='16.04', + arch='aarch64', + machine_type='mtype', + roles=[['u16']], + ), + ], + ], + [ + dict(nodes=[ + dict( + roles=['u16_1'], + os_type='ubuntu', + os_version='16.04', + arch='x86_64', + ), + dict( + roles=['u16_2'], + os_type='ubuntu', + os_version='16.04', + arch='x86_64', + ), + dict( + roles=['c7_1'], + os_type='centos', + os_version='7.3', + ), + dict( + roles=[], + ), + ]), + 'mtype', + [ + dict( + os_type='ubuntu', + os_version='16.04', + arch='x86_64', + machine_type='mtype', + roles=[['u16_1'], ['u16_2']], + ), + dict( + os_type='centos', + os_version='7.3', + arch=None, + machine_type='mtype', + roles=[['c7_1']], + ), + dict( + os_type=None, + os_version=None, + arch=None, + machine_type='mtype', + roles=[[]], + ), + ], + ], + ] + ) + def test_lock_request(self, input_conf, machine_type, expected): + print expected + result = run.get_initial_tasks(True, input_conf, machine_type) + print result + assert {'internal.lock_machines': expected} in result + @patch("teuthology.run.fetch_qa_suite") def test_fetch_tasks_if_needed(self, m_fetch_qa_suite): config = {"suite_path": "/some/suite/path", "suite_branch": "feature_branch"}
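
The core of this change is that a job's node requirements are now grouped by (os_type, os_version, arch, machine_type) before locking, so lock_many() runs once per distinct spec rather than once per job. Below is a minimal standalone sketch of that grouping, assuming the behaviour of get_nodes_request() above; the helper name group_nodes and the sample node entries are illustrative and not part of the patch.

from collections import OrderedDict

def group_nodes(nodes, machine_type):
    # Group node entries by spec; each group becomes one lock request whose
    # 'roles' value is a list of role lists, one per node in the group.
    specs = OrderedDict()
    for node in nodes:
        key = (
            node.get('os_type'),
            node.get('os_version'),
            node.get('arch'),
            node.get('machine_type', machine_type),
        )
        specs.setdefault(key, []).append(node['roles'])
    return [
        dict(os_type=os_type, os_version=os_version, arch=arch,
             machine_type=mtype, roles=roles)
        for (os_type, os_version, arch, mtype), roles in specs.items()
    ]

sample_nodes = [
    {'roles': ['mon.a', 'osd.0'], 'os_type': 'ubuntu', 'os_version': '16.04'},
    {'roles': ['osd.1'], 'os_type': 'ubuntu', 'os_version': '16.04'},
    {'roles': ['client.0'], 'os_type': 'centos', 'os_version': '7.3'},
]
# Expect two requests: the ubuntu/16.04 nodes collapse into one request with
# roles [['mon.a', 'osd.0'], ['osd.1']]; the centos node gets its own request.
print(group_nodes(sample_nodes, 'mtype'))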
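
Older configs that still use a top-level 'roles' stanza keep working: get_initial_tasks() rewrites them into an equivalent 'nodes' stanza, copying the job-wide os_type/os_version/arch onto every node. The sketch below mirrors that conversion; roles_to_nodes and legacy_config are illustrative names only, since the patch performs the conversion inline.

def roles_to_nodes(config):
    # Each entry in the legacy 'roles' list (one role list per node) becomes
    # a node conf that inherits the job-wide OS and arch settings.
    return [
        dict(
            roles=node_roles,
            os_type=config.get('os_type'),
            os_version=config.get('os_version'),
            arch=config.get('arch'),
        )
        for node_roles in config['roles']
    ]

legacy_config = {
    'roles': [['mon.a', 'osd.0'], ['osd.1', 'client.0']],
    'os_type': 'ubuntu',
    'os_version': '16.04',
}
# Produces two node entries, each carrying ubuntu/16.04 and arch=None.
print(roles_to_nodes(legacy_config))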
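
After locking, each locked target is handed back to the node conf it was locked for: lock_machines() matches specs again and pops one hostname off the matching request per node conf. The sketch below assumes node confs and requests already carry normalized spec keys (get_nodes_request() fills in machine_type and arch); assign_targets, the sample hostname, and the deterministic sorted() choice are illustrative, whereas the patch pops an arbitrary key.

SPEC_KEYS = ('os_type', 'os_version', 'arch', 'machine_type')

def assign_targets(node_confs, requests):
    # Give each node conf exactly one (hostname -> ssh host key) pair taken
    # from the lock request whose spec matches that node conf.
    for node_conf in node_confs:
        spec = [node_conf.get(k) for k in SPEC_KEYS]
        request = next(r for r in requests
                       if [r.get(k) for k in SPEC_KEYS] == spec)
        hostname = sorted(request['targets'])[0]
        node_conf['targets'] = {hostname: request['targets'].pop(hostname)}
    return node_confs

node_confs = [{'os_type': 'ubuntu', 'os_version': '16.04', 'arch': None,
               'machine_type': 'mtype', 'roles': ['mon.a']}]
requests = [{'os_type': 'ubuntu', 'os_version': '16.04', 'arch': None,
             'machine_type': 'mtype', 'roles': [['mon.a']],
             'targets': {'host1.example.com': 'ssh-rsa AAAA...'}}]
# The single locked target ends up attached to the single node conf.
print(assign_targets(node_confs, requests))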